I am using Spring's Kafka integration, pulling messages from Kafka in batches and committing offsets manually while processing them asynchronously on multiple threads. In the test environment, when there is a message backlog, each poll returns the configured number of records. In production, however, each poll returns only one record, and while writing the data to HBase I get a CommitFailedException.
The error roughly means that this batch took longer to consume than the poll time Kafka allows, so the consumer group was rebalanced.
Error message:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Relevant configuration and code:
listener:
  poll-timeout: 1500
consumer:
  max-poll-records: 500
  fetch-min-bytes: 1048576
  fetch.max.wait.ms: 2000
  enable-auto-commit: false
@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);
    // how offsets are committed: manual acknowledgment via the Acknowledgment argument
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}
@KafkaListener(topics = {kafkaTopConfig.UPLINKSTATUS_TOPIC}, groupId = kafkaTopConfig.GROUP_ID,
        containerFactory = "kafkaListenerContainerFactory",
        properties = "max.poll.interval.ms:60000")
public void uplinkStatusListener(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    log.info("uplinkStatusKafkaListener pulled {} records in this batch", records.size());
    ArrayList<UplinkStatusInfo> uplinkStatusInfos = new ArrayList<>();
    for (ConsumerRecord<?, ?> record : records) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            String message = (String) kafkaMessage.get();
            ack.acknowledge();
            UplinkStatusInfo uplinkStatusInfo = JSONObject.parseObject(message, UplinkStatusInfo.class);
            uplinkStatusInfos.add(uplinkStatusInfo);
            log.info("apConfigInfo reported message: {}, deviceId: {}", uplinkStatusInfo, uplinkStatusInfo.getDeviceId());
        }
    }
    CompletableFuture.runAsync(() -> {
        try {
            hbaseService.saveUplinkStatusInfo(uplinkStatusInfos);
        } catch (Exception e) {
            log.error("Failed to save uplinkstatus messages to HBase", e);
        }
    }, ThreadPoolUtil.getHbaseThreadPool());
}
I have tried increasing the poll time and reducing the fetch size and the number of records per poll, but neither solved the two problems under a message backlog: each poll still returns a single record, and the CommitFailedException still occurs.
Under the default configuration you have at most about 30 seconds to process one poll's worth of messages (that is, to commit the offset); otherwise Kafka considers the consumer lost and rebalances the group, and by the time your consumer gets around to committing, its assignment is already invalid.
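For reference, the exception message's own two suggestions (allow more time between polls, return fewer records per poll) can be set right on the listener; a minimal sketch with illustrative values, not tuned recommendations:

@KafkaListener(topics = {kafkaTopConfig.UPLINKSTATUS_TOPIC}, groupId = kafkaTopConfig.GROUP_ID,
        containerFactory = "kafkaListenerContainerFactory",
        // more time allowed between polls, fewer records to process per poll (illustrative values)
        properties = {"max.poll.interval.ms:300000", "max.poll.records:100"})
public void uplinkStatusListener(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    // ... process and acknowledge within that interval
}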
Also, the way you have written this is quite unsafe. It comes back to the point above: even though you cap how many records a single poll returns, with asynchronous, non-blocking processing the polls simply happen more often, so the net effect is the same.
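To make that concrete: in your code acknowledge() runs per record inside the loop, before anything is persisted, and the HBase write happens later on another thread, so the listener thread immediately polls again and a failed write loses data. A minimal sketch of a safer shape, assuming the same hbaseService and fastjson parsing as in the question (acknowledge only once, after the batch is persisted):

@KafkaListener(topics = {kafkaTopConfig.UPLINKSTATUS_TOPIC}, groupId = kafkaTopConfig.GROUP_ID,
        containerFactory = "kafkaListenerContainerFactory")
public void uplinkStatusListener(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    List<UplinkStatusInfo> uplinkStatusInfos = new ArrayList<>();
    for (ConsumerRecord<?, ?> record : records) {
        if (record.value() != null) {
            uplinkStatusInfos.add(JSONObject.parseObject((String) record.value(), UplinkStatusInfo.class));
        }
    }
    try {
        // block until the whole batch is persisted, then commit the offsets once
        hbaseService.saveUplinkStatusInfo(uplinkStatusInfos);
        ack.acknowledge();
    } catch (Exception e) {
        // no acknowledge: the uncommitted offsets will be redelivered after a rebalance or restart
        log.error("Failed to save uplink status batch to HBase, offsets not committed", e);
    }
}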
So if blocking happens, it's because the asynchronous save-to-HBase logic is slow and the thread pool runs out of capacity, right? One more strange thing, as I mentioned: when there is a backlog, many polls come back with only a handful of records, even though I have configured the fetch wait time, the minimum fetch bytes, and the max records per poll. I still can't find the reason why so few records are pulled each time.
You'd better add timing logs to see how long that step actually takes.
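For example, a hypothetical timing log around the save from the question:

long start = System.currentTimeMillis();
hbaseService.saveUplinkStatusInfo(uplinkStatusInfos);
// hypothetical timing log to see how long one batch actually takes
log.info("saveUplinkStatusInfo took {} ms for {} records",
        System.currentTimeMillis() - start, uplinkStatusInfos.size());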
The fetch wait time won't help here. It only applies when Kafka doesn't have enough messages to return: it's the maximum time the next fetch waits before coming back. As long as Kafka still has messages, the fetch doesn't block at all.
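In other words, a plain consumer-properties sketch of how those two settings interact (values just mirror the question's config):

Properties props = new Properties();
// the broker answers a fetch as soon as at least this many bytes are available...
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1048576);
// ...and only waits up to this long when that threshold is not yet met;
// with a standing backlog the threshold is met immediately, so the wait never applies
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 2000);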
OK, thanks.