kafka生产者实例

半兽人 发表于: 2015-02-09   最后更新时间: 2016-10-25 22:23:53  
{{totalSubscript}} 订阅, 19,073 游览

kafka生产者实例

相信你对kafka已经有一定的了解了,是时候编写一些代码啦。
JAVA客户端实例

Producers(生产者)

生产者类是用于创建新消息为一个特定的主题和可选的分区。
这个例子是java的,你需要引入相关的包。

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

第一步是定义属性,如果使producer发现集群,序列化消息,如果适当的指导消息到一个特定的分区。

Properties props = new Properties();
 
props.put("metadata.broker.list", "broker1:9092,broker2:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);

metadata.broker.list:broker集群的地址,不用配置全部的broker地址,它会关联到其他的broker。你也不用担心topic或分区在哪个broker下,它会找到对应的broker。

serializer.class:指定采用哪种序列化方式将消息传输给Broker,你也可以在发送消息的时候指定序列化类型,不指定则以此为默认序列化类型。

partitioner.clss:指定消息发送对应分区方式,若不指定,则随机发送到一个分区,也可以在发送消息的时候指定分区类型。

request.required.acks:指定消息是否确定已发送成功,如果不设置值,则默认为“发送或不确认‘,可能会导致数据丢失。


现在开始定义producer对象:

Producer<String, String> producer = new Producer<String, String>(config);

生产者实例是一个java泛型,有两个参数类型,第一个分区key的类型,第二个是消息的类型。
现在构建要发送的消息。

Random rnd = new Random();

long runtime = new Date().getTime();

String ip = “192.168.2.” + rnd.nextInt(255);

String msg = runtime + “,www.example.com,” + ip;
在这个例子中我们假消息的网站访问IP地址。以逗号分隔,时间戳,第二个是网站地址,第三是请求者的IP地址。我们这里使用Java类随机的最后八位字节的IP不同,所以我们可以看到分区是如何工作的。

最后写发送broker的消息类:

KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);


producer.send(data);
Topic是“page_visits”。这里我们通过IP作为分区键。注意,如果您不包括一个partition,即使你已经定义了一个partitioner,kafka将消息分配给一个随机的分区。


完整源码

import java.util.*;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();
 
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");
 
        ProducerConfig config = new ProducerConfig(props);
 
        Producer<String, String> producer = new Producer<String, String>(config);
 
        for (long nEvents = 0; nEvents < events; nEvents++) { 
               long runtime = new Date().getTime();  
               String ip = “192.168.2.” + rnd.nextInt(255); 
               String msg = runtime + “,www.example.com,” + ip; 
               KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
               producer.send(data);
        }
        producer.close();
    }
}

分区代码:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
 
public class SimplePartitioner implements Partitioner {
    public SimplePartitioner (VerifiableProperties props) {
 
    }
 
    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
           partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
        }
       return partition;
  }
 
}

这段逻辑的关键,我们得到的IP地址,取得最后一个字节,并进行分区数模运算,得出相应的分区,好处是相同的源ip划分到相同的分区里。但是你在消费的时候,要知道如何处理。

运行此之前,确保您已经创建topic:"page_visits"。从命令行:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic page_visits

使用--partition选项创建1个以上的分区。

现在编译运行程序。

执行下面命令,确认是否已经有数据了。

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic page_visits --from-beginning
Maven
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.9.2</artifactId>
  <version>0.8.1.1</version>
  <scope>compile</scope>
  <exclusions>
    <exclusion>
      <artifactId>jmxri</artifactId>
      <groupId>com.sun.jmx</groupId>
    </exclusion>
    <exclusion>
      <artifactId>jms</artifactId>
      <groupId>javax.jms</groupId>
    </exclusion>
    <exclusion>
      <artifactId>jmxtools</artifactId>
      <groupId>com.sun.jdmk</groupId>
    </exclusion>
  </exclusions>
</dependency>



更新于 2016-10-25

自由如风 4年前

同一个生产者在多线程中未为同一个Topic发送消息,会报并发异常吗?

半兽人 -> 自由如风 4年前

不会,客户端会统一加入到发送缓存队里,发送。

天南地北 6年前

该获取获取kafka中的Logsize

该如何获取kafka中的Logsize,在网上找了好久,没发现一个Demo,站主可否指教下

半兽人 -> 天南地北 6年前

直接去存储log的地方看吧。

江湖浪子 8年前
server.properties
server-1.properties
server-2.properties

如果需要客户端可以访问的话,还需要。注意advertised.host.name一定要是客户端能路由到的;
所以相对于主机名,还是用IP比较稳妥。(我开始用的主机名,死活连接不上。感谢QQ群里的@段誉 @虚竹 @aDog )
e.x.:

advertised.host.name=192.168.221.136
advertised.port=9092
查看history更多相关的文章或提一个关于history的问题,也可以与我们一起分享文章