kafka producer异步发送在失败回调里close,会失去前面retry的request的正确回调结果

冰点 发表于: 2018-01-25   最后更新时间: 2021-11-15 09:59:33   14,125 游览

我们使用producer异步发送,在失败的callback里close(0)来中止任何后序发送中的数据,在正常情况时效果非常好,但有个异常情况无法解决:

我们的in.flight为1,当一个request因为网络抖动长时间写不成功时,会得到如下WARN日志然后request进行重试:

Got error produce response with correlation id 19889 on topic-partition test1_local-0, retrying (5 attempts left). Error: NETWORK_EXCEPTION

但这个重试的request会耽误后序的batch,导致后序的batch抛出异常:

TimeoutException: Expiring 309 record(s) for test3_local-3: 30001 ms has passed since batch creation plus linger time

发送失败的这个后序batch在我们的callback里显式调用了close(0),问题来了,close(0)之后,前面retrying的request实际上都能retry成功写入broker,但在callback里显示失败:

IllegalStateException: Producer is closed forcefully.

由于我们的数据发送的序号是根据回调的成功来递增的,重新建立producer之后,会选择从最后一个成功回调的数据序号继续发送,导致retry的数据全部重复了一遍。

请问博主,解决这个异常的场景的思路在哪里呢?

发表于 2018-01-25
添加评论

问题解决了,问题在于我们用同一个producer发送很多topic-partition,发送中的partition不会做超时检查,但指向同一个broker而且因为前面request导致等待未发送的partition的batch会被检查超时。我在这里回答,给其他人提供相同问题的解决办法,producer尽量减少发送的topic-partition,即使同一个topic如果partition多于broker数,也会容易出相同问题。

Jack -> 冰点 6年前

大哥,请问一下“同一个producer发送很多topic-partition”这句话是什么意思?是说topic的partition数少设一点,不要超过broker个数就可以了吗?
producer.config对应的配置文件和consumer.config对应的配置文件还有什么其他配置项是需要注意的吗?谢谢

冰点 -> Jack 6年前
“同一个producer发送很多topic-partition”这句话是说,要尽量少的共享使用同一个producer对象并发的发送多个partition,比如多线程使用同一个producer对象进行同步发送,或者单线程使用同一个producer对象进行异步发送。
12345 -> 冰点 6年前

遇到一样的问题,谢谢提醒。
还有一个问题,kafka估计有一个返回的验证机制,发送完后立即关闭进程。会导致错误。
这个社区有群么?

半兽人 -> 12345 6年前

QQ群:472582182

你的答案

查看kafka相关的其他问题或提一个您自己的问题