目前我们java应用系统有十几个消费线程(每个线程对应一个消费组),消费kafka不同的topic数据然后插入对应的数据库。但是业务方经常投诉数据同步不及时,运维过程中通过kafka命令:
./kafka-consumer-groups.sh --bootstrap-server 10.199.0.114:9001 --group xxxxxxxxx --describe --command-config ../config/consumer.properties
发现有部分消费组有大量的数据堆积,同时kafka消费组也异常退出了,如下图所示:
但是排查我们java应用程序,该消费组对应的消费线程是正常running的,且无任务异常日志,并且通过程序日志和jstack 命令发现,消费线程在poll的时候已经长时间阻塞,不消费数据了。
最后运维通过重启kafka消费线程,发现又能正常消费数据,并且调用kafka命令数据堆积也慢慢减少,消费组也恢复正常了。。。。。。。
最后最后运行一段时间后同样的现场又会在不同的消费线程上复现,重启消费线程同样能解决。。。
望大佬帮忙分析下!!!
我只想到了一个会导致你这种现象的发生:
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records){ // 这里,你的业务死锁了 } }
如果是业务发生死锁,堆栈线程不可能是这么显示的吧
首先这里要捕获异常,否则循环会终止,导致重复或漏消息。
另外如果这里过了,那下次轮寻的时候就会察觉失效了
最后确保客户端版本和服务端保持一致
你的答案