如果一条消息的处理时长超过了30秒(默认),则会触发rebalance,继续加大耗时,但是业务阻塞时间未知,还是会触发,导致消息丢失。而且加大耗时是很多参数要调,如offsets.commit.timeout.ms
,request.timeout.ms
,session.timeout.ms
等。
建议解决方案:
kafka支持动态控制消费的流量,分别在poll(long)
中使用pause(Collection)
和resume(Collection)
来暂停消费指定分配的分区,重新开始消费指定暂停的分区。