支持的版本
>= 0.9
kafka客户端发布record(消息)
到kafka集群。
新的生产者是线程安全的,在线程之间共享单个生产者实例,通常单例比多个实例要快。
一个简单的例子,使用producer发送一个有序的key/value(键值对),放到java的main
方法里就能直接运行,
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
生产者的缓冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会丢失这些消息。
send()
方法是异步的,添加消息到缓冲区等待发送,并立即返回。生产者将单个的消息批量在一起发送来提高效率。
ack
是判别请求是否为完整的条件(就是是判断是不是成功发送了)。我们指定了“all”将会阻塞消息,这种设置性能最低,但是是最可靠的。
retries
,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。
producer
(生产者)缓存每个分区未发送的消息。缓存的大小是通过 batch.size
配置指定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。
默认缓冲可立即发送,即便缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置linger.ms
大于0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。这类似于TCP的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区,这个设置将增加1毫秒的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是 linger.ms=0
。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
buffer.memory
控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms
设定,之后它将抛出一个TimeoutException。
key.serializer
和value.serializer
示例,将用户提供的key和value对象ProducerRecord转换成字节,你可以使用附带的ByteArraySerializaer或StringSerializer处理简单的string或byte类型。
幂等和事务
从Kafka 0.11
开始,KafkaProducer又支持两种模式:幂等生产者
和事务生产者
。幂等生产者加强了Kafka的交付语义,从至少一次交付到精确一次交付。特别是生产者的重试将不再引入重复。事务性生产者允许应用程序原子地将消息发送到多个分区(和主题!)。
要启用幂等(idempotence)
,必须将enable.idempotence
配置设置为true
。 如果设置,则retries(重试)
配置将默认为Integer.MAX_VALUE
,acks配置将默认为all
。API没有变化,所以无需修改现有应用程序即可利用此功能。
此外,如果send(ProducerRecord)
即使在无限次重试的情况下也会返回错误(例如消息在发送前在缓冲区中过期),那么建议关闭生产者,并检查最后产生的消息的内容,以确保它不重复。最后,生产者只能保证单个会话内发送的消息的幂等性
。
要使用事务生产者
和attendant API
,必须设置transactional.id
。如果设置了transactional.id
,幂等性会和幂等所依赖的生产者配置一起自动启用。此外,应该对包含在事务中的topic进行耐久性配置。特别是,replication.factor
应该至少是3
,而且这些topic的min.insync.replicas
应该设置为2
。最后,为了实现从端到端的事务性保证,消费者也必须配置为只读取已提交
的消息。
transactional.id
的目的是实现单个生产者实例的多个会话之间的事务恢复。它通常是由分区、有状态的应用程序中的分片标识符派生的。因此,它对于在分区应用程序中运行的每个生产者实例来说应该是唯一的。
所有新的事务性API都是阻塞的,并且会在失败时抛出异常。下面的例子说明了新的API是如何使用的。它与上面的例子类似,只是所有100条消息都是一个事务的一部分。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
如示例所示,每个生产者只能有一个未完成的事务。在beginTransaction()
和commitTransaction()
调用之间发送的所有消息都将是单个事务的一部分。当指定transactional.id
时,生产者发送的所有消息都必须是事务的一部分。
事务生产者使用异常来传递错误状态。特别是,不需要为producer.send()
指定回调,也不需要在返回的Future上调用.get()
:如果任何producer.send()
或事务性调用在事务过程中遇到不可恢复的错误,就会抛出KafkaException
。
该客户端可以与0.10.0
或更高版本的broker进行通信。旧的或较新的broker可能不支持某些客户端功能。例如,事务性API需要 0.11.0
或更新版本的broker。当调用在运行的broker版本中不可用的API时,您将收到UnsupportedVersionException
。
send()
public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)
异步发送一条消息到topic,并调用callback
(当发送已确认)。
send是异步的,并且一旦消息被保存在等待发送的消息缓存
中,此方法就立即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。
发送的结果是一个RecordMetadata,它指定了消息发送的分区,分配的offset和消息的时间戳。如果topic使用的是CreateTime,则使用用户提供的时间戳或发送的时间(如果用户没有指定指定消息的时间戳)如果topic使用的是LogAppendTime,则追加消息时,时间戳是broker的本地时间。
由于send调用是异步的,它将为分配消息的此消息的RecordMetadata
返回一个Future。如果future调用get(),则将阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常。
如果要模拟一个简单的阻塞调用,你可以调用get()
方法。
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();
完全无阻塞的话,可以利用回调参数提供的请求完成时将调用的回调通知。
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null)
e.printStackTrace();
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
});
发送到同一个分区的消息回调保证按一定的顺序执行,也就是说,在下面的例子中 callback1
保证执行 callback2
之前:
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
注意:callback一般在生产者的I/O线程中执行,所以是相当的快的,否则将延迟其他的线程的消息发送。如果你需要执行阻塞或计算昂贵(消耗)的回调,建议在callback主体中使用自己的Executor来并行处理。
pecified by:
send in interface Producer<K,V>
Parameters:
record - 发送的记录(消息)
callback - 用户提供的callback,服务器来调用这个callback来应答结果(null表示没有callback)。
Throws:
InterruptException - 如果线程在阻塞中断。
SerializationException - 如果key或value不是给定有效配置的serializers。
TimeoutException - 如果获取元数据或消息分配内存花费的时间超过max.block.ms。
KafkaException - Kafka有关的错误(不属于公共API的异常)。
生产者的缓冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会丢失这些消息。
请问,为什么不关闭生产者为什么会丢失消息
当进程结束时,kafka还有些消息在缓存中来不及发送,所以调用一下
close()
,告诉kafka生产者客户端立即发送。否则进程直接结束了,那你消息就没了。
1、如果这么说的话,假设
close()
之前,后台还有消息没发送,这时候客户端挂掉了,那么也会导致消息丢失?那有什么办法可以解决吗?2、
send()
是否可以理解为只是保存消息1、除了优雅停机,否则消息一定会丢。
2、是的,不是立即发送,你可以通过配置
linger.ms=0
立即发送,但是吞吐量就会打折。参见:https://www.orchome.com/511
好的,明白了,谢谢
消息一定会丢有点绝对了
1、同时发送100条消息,每条消息100字节,batch.size设置的1000字节,那么就会产生部分成功,部分失败这种现象
2、send方法返回的是个Future对象,如果显示调用future.get() 消息也不会丢的
我们先达成一致,部分成功部分失败等价消息会丢。
除了优雅停机,那其他的情况基本是程序崩溃了吧,相当于(kill -9)。假设现在程序在运行,我直接拔电源,那么消息会不会丢?
kafka 官方建议调用producer.send后使用close()方法,看了一下代码close方法会会将缓存队列状态置为关闭,唤醒io线程将内存中的数据发往broker,如果每次send后都调close方法会不会有问题,我理解应该是服务停止时才调用close方法避免消息丢失,求大神解答
对 只有进程退出,最后一次才需要执行。
kafka同步推送消息, 异常
这是kafka服务挂起拒绝接收请求的原因么
timeout了。应该先check下网络通不通哦。
网络是通的,定时脚本任务, 有的任务能推送, 有的任务失败, 查看打印日志, 就是这个日志, 定位好久, 没有找到哪里问题
springframework提供生成producer的DefaultKafkaProducerFactory工厂类,
它实现了DisposableBean的destroy方法,在应用结束时该destroy方法被调用,
在该方法中对producer进行优雅关闭,尽量将缓冲池的中的消息发送到kafka,避免消息的丢失。
———自己的观察:实际项目里关于生产者使用完如何关闭(【总结不对的地方请尽情指出】)
step8,kafka_2.13-2.40版本,输入的topic是streams-plaintext-input 而不是streams-file-input。
如果使用streams-file-input,运行./bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo指令会报错,从而无法继续
你发错地方了额。
。。。
最新的kafka官网文档在使用命令行工具的时候都使用的--bootstrap-server 或者broker-list,你的文档里有些-zookeeper的,我试了-zookeeper,命令并不报错,但是使用之后出现命令行工具的结果不对的现象,比如查看所有的topic时查不到,请问这两种方式的本质区别是什么
有些查不到的现象是zookeeper里面的数据 已经放到了kafka自身存储了。如消费者列表
我具体说一下,是我通过--bootstrap-server参数创建的topic,通过--bootstrap-server可以查到topic列表,这时我用--zookeeper参数去创建一个topic,然后再查topic列表就查不到了,无论通过--bootstrap-server还是--zookeeper参数,终端也没有报错。。。
给我的直观感受就是--zookeeper有毒,使用一次之后,各种命令行工具的运行结果好像都有点问题
到0.9后zk弱化了。向下不兼容了
使用生产者发生消息大概30秒后,就开始报这样的错误,请问是哪里处理问题呢?
你发送消息的时候用同步试试,估计你一开始就没成功过.
.get()
关于可重试,当我们使用了异步发送消息的时候,是kafka内部会帮我们进行重试直到最后重试结束才触发回调还是kafka依旧会触发回调..然后再进行重试(比如说三次重试三次回调)??
这是客户端的能力,不是kafka内部的能力。
您好,我在windows环境写了简单的producer,跟您上面给的例子一样,运行也没有报错,但是在虚拟机linux里的kafka自带的consumer消费不到,listener也设置了虚拟机的ip端口,请问是哪里出了问题呢?
虚拟机和windows能ping通,网络应该没问题
发个帖子吧,把server.config和程序的核心贴一下。
有个问题麻烦下:
在生产者发送消息报错:1514 ms has passed since batch creation plus linger time。不知道哪里的问题,检查了配置也没发现问题。
https://www.orchome.com/511
看下这个。