kafka生产者实例
相信你对kafka已经有一定的了解了,是时候编写一些代码啦。
JAVA客户端实例
Producers(生产者)
生产者类是用于创建新消息为一个特定的主题和可选的分区。
这个例子是java的,你需要引入相关的包。
import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig;
第一步是定义属性,如果使producer发现集群,序列化消息,如果适当的指导消息到一个特定的分区。
Properties props = new Properties(); props.put("metadata.broker.list", "broker1:9092,broker2:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "example.producer.SimplePartitioner"); props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props);
metadata.broker.list:broker集群的地址,不用配置全部的broker地址,它会关联到其他的broker。你也不用担心topic或分区在哪个broker下,它会找到对应的broker。
serializer.class:指定采用哪种序列化方式将消息传输给Broker,你也可以在发送消息的时候指定序列化类型,不指定则以此为默认序列化类型。
partitioner.clss:指定消息发送对应分区方式,若不指定,则随机发送到一个分区,也可以在发送消息的时候指定分区类型。
request.required.acks:指定消息是否确定已发送成功,如果不设置值,则默认为“发送或不确认‘,可能会导致数据丢失。
现在开始定义producer对象:
Producer<String, String> producer = new Producer<String, String>(config);
生产者实例是一个java泛型,有两个参数类型,第一个分区key的类型,第二个是消息的类型。
现在构建要发送的消息。
Random rnd = new Random(); long runtime = new Date().getTime(); String ip = “192.168.2.” + rnd.nextInt(255); String msg = runtime + “,www.example.com,” + ip;在这个例子中我们假消息的网站访问IP地址。以逗号分隔,时间戳,第二个是网站地址,第三是请求者的IP地址。我们这里使用Java类随机的最后八位字节的IP不同,所以我们可以看到分区是如何工作的。
最后写发送broker的消息类:
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg); producer.send(data);Topic是“page_visits”。这里我们通过IP作为分区键。注意,如果您不包括一个partition,即使你已经定义了一个partitioner,kafka将消息分配给一个随机的分区。
完整源码
import java.util.*; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class TestProducer { public static void main(String[] args) { long events = Long.parseLong(args[0]); Random rnd = new Random(); Properties props = new Properties(); props.put("metadata.broker.list", "broker1:9092,broker2:9092 "); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "example.producer.SimplePartitioner"); props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); for (long nEvents = 0; nEvents < events; nEvents++) { long runtime = new Date().getTime(); String ip = “192.168.2.” + rnd.nextInt(255); String msg = runtime + “,www.example.com,” + ip; KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg); producer.send(data); } producer.close(); } }
分区代码:
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class SimplePartitioner implements Partitioner { public SimplePartitioner (VerifiableProperties props) { } public int partition(Object key, int a_numPartitions) { int partition = 0; String stringKey = (String) key; int offset = stringKey.lastIndexOf('.'); if (offset > 0) { partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions; } return partition; } }
这段逻辑的关键,我们得到的IP地址,取得最后一个字节,并进行分区数模运算,得出相应的分区,好处是相同的源ip划分到相同的分区里。但是你在消费的时候,要知道如何处理。
运行此之前,确保您已经创建topic:"page_visits"。从命令行:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic page_visits
使用--partition选项创建1个以上的分区。
现在编译运行程序。
执行下面命令,确认是否已经有数据了。
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic page_visits --from-beginningMaven
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.1.1</version> <scope>compile</scope> <exclusions> <exclusion> <artifactId>jmxri</artifactId> <groupId>com.sun.jmx</groupId> </exclusion> <exclusion> <artifactId>jms</artifactId> <groupId>javax.jms</groupId> </exclusion> <exclusion> <artifactId>jmxtools</artifactId> <groupId>com.sun.jdmk</groupId> </exclusion> </exclusions> </dependency>
同一个生产者在多线程中未为同一个Topic发送消息,会报并发异常吗?
不会,客户端会统一加入到发送缓存队里,发送。
该获取获取kafka中的Logsize
该如何获取kafka中的Logsize,在网上找了好久,没发现一个Demo,站主可否指教下
直接去存储log的地方看吧。
server.properties server-1.properties server-2.properties
如果需要客户端可以访问的话,还需要。注意advertised.host.name一定要是客户端能路由到的;
所以相对于主机名,还是用IP比较稳妥。(我开始用的主机名,死活连接不上。感谢QQ群里的@段誉 @虚竹 @aDog )
e.x.:
advertised.host.name=192.168.221.136 advertised.port=9092