The Kafka broker log shows the consumers being disconnected, but the consumers themselves report no exceptions:
[2019-10-25 04:41:28,835] INFO [GroupCoordinator 0]: Member consumer-3-c80f9b26-ce89-4685-8afa-d5766e32fd47 in group DataAcquireGroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-10-25 04:41:28,835] INFO [GroupCoordinator 0]: Preparing to rebalance group DataAcquireGroup in state PreparingRebalance with old generation 30 (__consumer_offsets-33) (reason: removing member consumer-3-c80f9b26-ce89-4685-8afa-d5766e32fd47 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2019-10-25 04:41:28,965] INFO [GroupCoordinator 0]: Member consumer-4-36f437e5-186b-470f-92cd-f02483b8fa7d in group DataAcquireGroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-10-25 04:41:28,975] INFO [GroupCoordinator 0]: Member consumer-1-a0eaa01b-1fdf-40be-a9d3-b7e527bcb7fe in group DataAcquireGroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-10-25 04:41:28,987] INFO [GroupCoordinator 0]: Member consumer-5-cf609397-08a1-4eef-b6cf-19a63ad24ad3 in group DataAcquireGroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-10-25 04:41:28,989] INFO [GroupCoordinator 0]: Member consumer-2-331356e1-34ee-49a7-b352-ef2729a001bb in group DataAcquireGroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
Consumer code:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        ToolsUtil.linkedBlockingQueue.put(record);
    }
}
Consumer configuration:
bootstrap.servers=127.0.0.1:9092
group.id=DataAcquireGrouphuihui
enable.auto.commit=true
auto.commit.interval.ms=1000
max.poll.interval.ms=60000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.records=100
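The consumers ran normally for 10+ hours before the problem appeared, with no exceptions or warnings in the output. The producer is working normally.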
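If everything looked normal and the process was simply killed, that is usually caused by an OOM, and the system log will show it.
Also, is the consumer program you start running as a daemon process?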
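First, the process was not killed. The logs show no output at all, and CPU usage spiked to 100%. It is started as an ordinary Java program. Tracing with print statements shows that execution reaches consumer.poll(100); and then stops making progress, while the process stays alive. Any help would be appreciated.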
consumer.poll(100) is a long poll: each call waits up to 100 ms for messages from Kafka.
For the 100% CPU, you need to find out which thread is running at 100%.
See: https://www.orchome.com/833
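To make the "find the hot thread" step concrete, here is a minimal sketch that measures per-thread CPU time from inside the JVM using the standard java.lang.management.ThreadMXBean. The class name HotThreads, the Usage holder, and the sample method are hypothetical names made up for this illustration and are not part of Kafka or of the original poster's code. It only sees threads in the JVM it runs in, so it would have to be called from within the consumer process (for example from a small watchdog thread). From outside the process, the usual route is top -H -p <pid> followed by jstack <pid>.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HotThreads {

    /** One report row: thread name and CPU time used during the sample window. */
    public static final class Usage {
        public final String name;
        public final long cpuNanos;

        Usage(String name, long cpuNanos) {
            this.name = name;
            this.cpuNanos = cpuNanos;
        }
    }

    /**
     * Samples CPU time of all live Java threads over intervalMillis and
     * returns them sorted by CPU consumed in that window, busiest first.
     * Returns an empty list if the JVM does not support thread CPU timing.
     */
    public static List<Usage> sample(long intervalMillis) throws InterruptedException {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<Usage> rows = new ArrayList<>();
        if (!mx.isThreadCpuTimeSupported()) {
            return rows;
        }
        if (!mx.isThreadCpuTimeEnabled()) {
            mx.setThreadCpuTimeEnabled(true);
        }

        // First reading: accumulated CPU time per thread id.
        Map<Long, Long> before = new HashMap<>();
        for (long id : mx.getAllThreadIds()) {
            before.put(id, mx.getThreadCpuTime(id));
        }

        Thread.sleep(intervalMillis);

        // Second reading: the difference is the CPU used inside the window.
        for (long id : mx.getAllThreadIds()) {
            Long start = before.get(id);
            long end = mx.getThreadCpuTime(id);     // -1 if the thread has died
            ThreadInfo info = mx.getThreadInfo(id); // null if the thread has died
            if (start == null || start < 0 || end < 0 || info == null) {
                continue;
            }
            rows.add(new Usage(info.getThreadName(), end - start));
        }
        rows.sort(Comparator.comparingLong((Usage u) -> u.cpuNanos).reversed());
        return rows;
    }

    public static void main(String[] args) throws InterruptedException {
        for (Usage u : sample(1000)) {
            System.out.printf("%-40s cpu=%d ms%n", u.name, u.cpuNanos / 1_000_000);
        }
    }
}
```

If the thread at the top of the list is the one calling consumer.poll(100), a stack dump of that thread should show where inside poll it is spinning.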
Did you ever track down the cause?