1.启动kafka。
//启动zookeeper server (用&是为了能退出命令行):
bin/zookeeper-server-start.sh config/zookeeper.properties &
//启动kafka server:
bin/kafka-server-start.sh config/server.properties &
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.1.1</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.10.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.10</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.10</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>com.yammer.metrics</groupId> <artifactId>metrics-core</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>com.yammer.metrics</groupId> <artifactId>metrics-annotation</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <dependency> <groupId>zkclient</groupId> <artifactId>zkclient</artifactId> <version>0.3</version> </dependency> </dependencies>
2.新建一个生产者例子
import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class KafkaTest { public static void main(String[] args) { Properties props = new Properties(); props.put("zk.connect", "10.103.22.47:2181"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("metadata.broker.list", "10.103.22.47:9092"); props.put("request.required.acks", "1"); //props.put("partitioner.class", "com.xq.SimplePartitioner"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); String ip = "192.168.2.3"; String msg ="this is a messageuuu!"; KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", ip,msg); producer.send(data); producer.close(); } }
3.新建一个消费者例子
import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.Message; import kafka.message.MessageAndMetadata; public class ConsumerSample { public static void main(String[] args) { // specify some consumer properties Properties props = new Properties(); props.put("zookeeper.connect", "10.103.22.47:2181"); props.put("zookeeper.connectiontimeout.ms", "1000000"); props.put("group.id", "test_group"); // Create the connection to the cluster ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector connector = Consumer.createJavaConsumerConnector(consumerConfig); Map<String,Integer> topics = new HashMap<String,Integer>(); topics.put("test", 2); Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = connector.createMessageStreams(topics); List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test"); ExecutorService threadPool = Executors.newFixedThreadPool(2); for (final KafkaStream<byte[], byte[]> stream : streams) { threadPool.submit(new Runnable() { public void run() { for (MessageAndMetadata msgAndMetadata : stream) { // process message (msgAndMetadata.message()) System.out.println("topic: " + msgAndMetadata.topic()); Message message = (Message) msgAndMetadata.message(); ByteBuffer buffer = message.payload(); byte[] bytes = new byte[message.payloadSize()]; buffer.get(bytes); String tmp = new String(bytes); System.out.println("message content: " + tmp); } } }); } } }
相关推荐
Kafka 是一种高吞吐量 的分布式发布订阅消息系统,本实例利用C#开发,同步数据,亲测可用!
Kafka客户端开发实例java源码
storm与kafka整合的客户端开发实例java源码
2. 高级-etcd、contex、kafka消费实例、logagent 3. 实战-商品秒杀架构设计与开发 4. 实战-商品秒杀开发与接入层实现 总共18课时,网上收集的资料,只共用于学习,不能用于商业用途,否则后果自负。
开发使用,特此分享,所...Kafka Eagle监控系统是一款用来监控Kafka集群的工具,支持管理多个Kafka集群、管理Kafka主题(包含查看、删除、创建等)、消费者组合消费者实例监控、消息阻塞告警、Kafka集群健康状态查看等
发展要构建开发版本,您需要Kafka的最新版本以及一系列上游Confluent项目,您必须从其相应的快照分支中进行构建。 有关此过程的指导,请参见。 您可以使用标准生命周期阶段在Maven中构建kafka-connect-jdbc。常问...
此外,Kafka集群由多个Kafka实例组成,每个实例(server)成为broker。无论是Kafka集群,还是Producer和Consumer都依赖于Zookeeper集群保存一些meta信息,来保证系统可用性。 Kafka的特性包括高吞吐量、可持久化、...
Kafka REST代理 Kafka REST代理为Kafka集群提供了RESTful接口。 它使生成和使用消息,查看群集状态以及执行管理操作变...以下内容假设您已经使用默认设置和一些已创建的主题运行Kafka和REST Proxy实例。 # Get a list
该适配器旨在替代,该允许多个socket.io实例使用而不是Redis进行通信。 为什么选择卡夫卡? Kafka是一种非常快速,可扩展的分布式消息总线,旨在处理低延迟的大量数据。 它最初由LinkedIn开发,现在是Apache ...
为了便于本地开发,您也可以禁用Kafka部分并完全通过Redis使用Spider,尽管由于爬网请求的序列化,所以不建议这样做。依存关系请参阅每个子项目中的requirements.txt以了解Pip包的依赖性。 运行集群所需的其他重要...
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献...
DBPRO IoT框架 物联网框架的开发,用于基于流的传感器数据分析。...为了实际获得任何可观察到的输出,您必须运行必要的实例(Kafka,InfluxDB,Grafana)并在相应的Java类中设置其地址。 建于 分布式流媒体平台
卡夫卡面料网 该存储库保存用于测试启用kafka的网络的配置和示例链代码。 介绍 Hyperledger Fabric引入了Kafka作为订购者之间的主要共识机制。... 4个Kafka经纪人实例。 3个Zookeper实例。 技术文档 资料夹结构 ./
breeze是一个基于Java开发的结构化日志异步记录和预警框架。实际上你可以使用它来实现任何结构化的(或理解为POJO的)数据,但breeze提供的预警能力仍然最适用于记录日志(特别是具有业务含义的日志)的场景。 ...
1. 高级-项目实战-日志收集系统kafka库实战 2. 高级-etcd、contex、kafka消费实例、logagent 3. 实战-商品秒杀架构设计与开发 以上这几个项目所需要的工具包
此版本的一个BOSH部署对应于一个服务实例。 在development目录中有一个清单清单示例。 必须在支持全局云配置(246或更高版本)的BOSH版本上使用它。 请注意,此版本仅用于演示目的,不用于生产用途。 自述...
第12天-高级-etcd、contex、kafka消费实例、logagent 第13天-实战-日志管理平台开发 第14天-实战-商品秒杀架构设计与开发 第15天-实战-商品秒杀开发与接入层实现 第16天-实战-商品秒杀逻辑层实现 第17天-实战-商品...
Api Example 以API接口开发为例,探索开发的... Kafka操作实例 RabbitMQ操作实例 Elasticsearch操作实例 项目目录结构 APIExample ├── src │ ├── main │ │ ├── java │ │ │ └── com │ │ │ └─
Kafka操作实例 RabbitMQ操作实例 Elasticsearch操作实例 项目目录结构 APIExample ├── src │ ├── main │ │ ├── java │ │ │ └── com │ │ │ └── fengwenyi │ │ │ └── api_example │ │...
一、基于nginx+lua完成商品详情页访问流量实时上报kafka的开发 ==================================== 在nginx这一层,接收到访问请求的时候,就把请求的流量上报发送给kafka 这样的话,storm才能去消费kafka中的...