kafka发送失败数据获取问题

Icuiacd 发表于: 2019-11-04   最后更新时间: 2019-11-04 16:15:03   2,822 游览

使用producer.send(record, new Callback() {});异步发送数据,手动关停集群,如何获取发送失败的数据

已知可以每次发送调用get方法阻塞

RecordMetadata metadata=producer.send(record).get();

但是吞吐量就降低了

发表于 2019-11-04

1、关闭项目是不会导致消息丢失的(kill而不是kill -9)。
2、更安全的方式是在项目停止之前,调用一下producer.close();,防止生产者中还有缓存的消息未发送。
3、强制关闭(kill -9)任何方式都会丢失消息,不在此次讨论范围内,必丢消息,此种场景几乎不存在。

Icuiacd -> 半兽人 5年前

谢谢,但实际我想问的不是如何不丢数据,而是如何获取到发送失败的数据,语言是java,写入采用带callback的send方法,异常的时候可以捕获到exception,但是无法获取到对应的数据

Icuiacd -> 半兽人 5年前
for (int i = 0; i < 999999999; i++) {
    ProducerRecord<String, String> record=new ProducerRecord<String, String>("topicname", 0, "key", "测试汉字测试汉字测试汉字测试汉字测试汉字测试汉字测试汉字"+i);
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            // TODO Auto-generated method stub
            System.out.println(metadata.offset());
            if(metadata.offset()==-1L){
                log.info(metadata.toString());
                log.info(exception.toString()+"\n"+exception.getMessage());
                exception.printStackTrace();
            }
        }
    });
}
Icuiacd -> Icuiacd 5年前
-1
[2019-11-04 17:03:25,476] INFO topicname-0@-1 (kafka.ProducerFor7667)
[2019-11-04 17:03:25,476] INFO org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for testcaoyh-0: 30085 ms has passed since last append
Expiring 1 record(s) for topicname-0: 30085 ms has passed since last append (kafka.ProducerFor7667)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for topicname-0: 30085 ms has passed since last append
半兽人 -> Icuiacd 5年前

1、new callback(m) 时,可以把你原始消息(m)也传递到类里。
2、callback将失败的加到失败队列里,从上层重新发送(不要在callback中重新发送)。

Icuiacd -> 半兽人 5年前

谢谢,解决了

怎么解决的,可以分享下吗

你的答案

查看kafka相关的其他问题或提一个您自己的问题