kafka生产者客户端(0.10.1.1API)

KafkaProducer 0.10.1.1 API 和0.9.0.0的API没有什么区别。

kafka客户端发布record(消息)到kafka集群。

新的生产者是线程安全的,在线程之间共享单个生产者实例,通常单例比多个实例要快。

一个简单的例子,使用producer发送一个有序的key/value(键值对),放到java的main方法里就能直接运行,

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

生产者的缓冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会泄露这些资源。

send()方法是异步的,添加消息到缓冲区等待发送,并立即返回。生产者将单个的消息批量在一起发送来提高效率。

ack是判别请求是否为完整的条件(就是是判断是不是成功发送了)。我们指定了“all”将会阻塞消息,这种设置性能最低,但是是最可靠的。

retries,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。

producer(生产者)缓存每个分区未发送消息。缓存的大小是通过 batch.size 配置指定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。

默认缓冲可立即发送,即便缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置linger.ms大于0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。这类似于TCP的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区,这个设置将增加1毫秒的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是 linger.ms=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。

buffer.memory 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。

key.serializervalue.serializer示例,将用户提供的key和value对象ProducerRecord转换成字节,你可以使用附带的ByteArraySerializaerStringSerializer处理简单的string或byte类型。

send()

public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)

异步发送一条消息到topic,并调用callback(当发送已确认)。

send是异步的,并且一旦消息被保存在等待发送的消息缓存中,此方法就立即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。

发送的结果是一个RecordMetadata,它指定了消息发送的分区,分配的offset和消息的时间戳。如果topic使用的是CreateTime,则使用用户提供的时间戳或发送的时间(如果用户没有指定指定消息的时间戳)如果topic使用的是LogAppendTime,则追加消息时,时间戳是broker的本地时间。

由于send调用是异步的,它将为分配消息的此消息的RecordMetadata返回一个Future。如果future调用get(),则将阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常。

如果要模拟一个简单的阻塞调用,你可以调用get()方法。

 byte[] key = "key".getBytes();
 byte[] value = "value".getBytes();
 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
 producer.send(record).get();

完全无阻塞的话,可以利用回调参数提供的请求完成时将调用的回调通知。

 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
 producer.send(myRecord,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null)
                           e.printStackTrace();
                       System.out.println("The offset of the record we just sent is: " + metadata.offset());
                   }
               });

发送到同一个分区的消息回调保证按一定的顺序执行,也就是说,在下面的例子中 callback1 保证执行 callback2 之前:

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);

注意:callback一般在生产者的I/O线程中执行,所以是相当的快的,否则将延迟其他的线程的消息发送。如果你需要执行阻塞或计算昂贵(消耗)的回调,建议在callback主体中使用自己的Executor来并行处理。

pecified by:
send in interface Producer<K,V>
Parameters:

record - 发送的记录(消息)
callback - 用户提供的callback,服务器来调用这个callback来应答结果(null表示没有callback)。

Throws:

InterruptException - 如果线程在阻塞中断。
SerializationException - 如果key或value不是给定有效配置的serializers。
TimeoutException - 如果获取元数据或消息分配内存话费的时间超过max.block.ms。
KafkaException - Kafka有关的错误(不属于公共API的异常)。







发表于: 1年前   最后更新时间: 8月前   游览量:47197
上一条: kafka的生态系统
下一条: kafka生产者API

评论…


  • 您好,请问一下为什么往kafka中生产了20条信息,但是用消费端去查看的时候却有400条信息呢?而且其他的信息都是那20条信息重复的。
    • 这种组名是随机的,你要指定消费者组,这样保证消费者都是一个组的,就不会重复消费。
      bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
        • emmm,因为我需要在生产者生产数据的同时需要往不同的topic中添加数据,所以我去监控一个配置文件去实时读写,而问题是topic改变之后。新topic有新的数据加入,而替换之前的原topic也有新的数据加入。这是为什么?
            • 怎么会呢,你的核心逻辑是这个。“my-topic”不同,发送的主题就不同。
              producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
                没有用过消息中间件,感觉看不懂哈
                你好我再看client源码的时候发现,在send()过程有个检测metadata的时候也会触发max.block.ms,而且这个操作是阻塞当前线程,也就是我服务端挂了,当前线程会被阻塞,这样不是不能达到异步效果吗。kafka service挂了,当前业务也受到了影响
                • 不影响的,如果你异步发送,阻塞也是异步阻塞的,不会涉及到主线程的。并且是当ack=all的时候,才会异步阻塞。
                    • 不好意思今天才看见,我最近看了下源码,阻塞是在获取元数据(Metadata )的时候才会触发,第一次启动连不上服务端,内存中Metadata 对应的topic为空造成了强行当前线程阻塞,在站点运行过程中不会触发。发送是异步发送,但是发送前有段元数据的检测是造成阻塞的原因
                        producer发送到kafka,能否通过API检查是否被一个consumer消费完成呢?
                        你好,请问一下,我想用producer发送消息,是否能够在Callback得到Consumer处理后的信息,比如最简单的是producer发送Jack,Consumer处理后的信息为 Hello ,Jack!,在producer能够打印出来。这个的想法是生产者生产消息后,通过kafka分到不同的Consumer,实现业务的解耦合。
                        关于batch.size和buffer.memory不知道这样理解对不对:
                        生产者每次讲需要发送的消息在linger.ms设置的时间间隔内缓存在buffer.memory,然后再从缓存中发送到服务器?
                        那么缓存每个分区未发送消息,即batch.size指定的空间是什么啊?
                        • batch.size: 当多个消息要发送到相同分区的时,生产者尝试将消息批量打包在一起,以减少请求交互。这样有助于客户端和服务端的性能提升。该配置的默认批次大小(以字节为单位):不会打包大于此配置大小的消息。发送到broker的请求将包含多个批次,每个分区一个,用于发送数据。

                          buffer.memory: 生产者要用的总内存,用来缓存等待发送到服务器的消息的内存总字节数。
                            你好,我的producer每次生产数据需要很长一段时间,并且Java客户端的topic在终端上看不到..消费者也不能去,是什么原因呢
                            • 老哥那么晚还看博客,实属难得,问题描述的的确不清楚,现在问题解决了,因为用的是腾讯云环境,远程客户端没法方位,所以要配置/config/server.properties中的host.name = 你的公网地址

                                您好:
                                我在对kafka的生产者和消费者做压力测试,生产者拼命的发数据到kafka,消费者拼命地消费数据,但消费者是从头开始消费数据,也就是说消费者走的磁盘IO,没有使用缓存数据,这时由于消费者在疯狂读磁盘,导致磁盘的资源都被他用掉,磁盘IO队列用满,生产者无法向磁盘写数据,几十秒后生产者抛出大量超时异常。

                                即使消费者不会占据磁盘的全部IO,只要消费者消费了kafka较老的数据使用了磁盘IO,就会影响到生产者的性能。

                                请问有什么办法解决以上的问题么?
                                org.apache.kafka.common.config.ConfigException: Invalid value all for configuratio
                                n retries: Not a number of type INT  我用kafka生产者的时候,出现这个错误。但是我的retries配置的是1,所以不太清楚什么情况。
                                即遍缓冲空间还没有满,调整为“即便缓存空间还没有满”。
                              • 评论…
                                • in this conversation