kafk设置批量消费,但是在生产环境下好多都是拉取一条消息进行消费,然后存入Hbase中会报CommitFaileExecption错误信息

周末多睡会 发表于: 2021-06-16   最后更新时间: 2021-06-16 15:50:39   2,827 游览

我现在用的是spring集成的kafka,设置批量从kafka拉取消息,采用异步多线程手动提交的方式,在测试环境下有消息挤压情况下,每次会拉去设置拉取的消息数量,但是在生产环境下,就每次拉取一条数据~,然后存入Hbase中会报CommitFaileExecption错误消息, 这错误消息意思大概是这批消息消费时间超过了kafka设置的poll时间,进行了消费者Rebalancing

错误信息:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

相关配置和代码

listener:
  poll-timeout: 1500
consumer:
  max-poll-records: 500
  fetch-min-bytes: 1048576
  fetch.max.wait.ms: 2000
  enable-auto-commit: false
@Bean
 public KafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory) {
     ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
             new ConcurrentKafkaListenerContainerFactory<>();
     factory.setConsumerFactory(consumerFactory);
     factory.setBatchListener(true);
     //设置提交偏移量的方式
     factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
     return factory;
 }
@KafkaListener(topics = {kafkaTopConfig.UPLINKSTATUS_TOPIC}, groupId = kafkaTopConfig.GROUP_ID,
            containerFactory = "kafkaListenerContainerFactory",
            properties = "max.poll.interval.ms:60000")
    public void uplinkStatusListener(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        log.info("uplinkStatusKafkaListener批量拉取消息数量:{}", records.size());
        ArrayList<UplinkStatusInfo> uplinkStatusInfos = new ArrayList<>();
        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                String message = (String) kafkaMessage.get();
                ack.acknowledge();
                UplinkStatusInfo uplinkStatusInfo = JSONObject.parseObject(message, UplinkStatusInfo.class);
                uplinkStatusInfos.add(uplinkStatusInfo);
                log.info("apConfigInfo上报消息:{}, deviceId: {}", uplinkStatusInfo, uplinkStatusInfo.getDeviceId());
            }
        }

        CompletableFuture.runAsync(() -> {
            try {
                hbaseService.saveUplinkStatusInfo(uplinkStatusInfos);
            } catch (Exception e) {
                log.error("uplinkstatus上报消息存入Hbase失败: {}", e);
            }
        }, ThreadPoolUtil.getHbaseThreadPool());
    }

自己通过增大poll时间,减少消息拉取大小和数量,都未解决在有消息挤压情况下,每次拉取数据为一条和CommitFaileExecption错误的问题。

发表于 2021-06-16
  • 补充一下完整的错误信息。半兽人 3年前
    @半兽人 大佬错误信息是这个的: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.周末多睡会
    @半兽人 我采用异步多线程的方式,如果消费一批数据采用手动提交的话,不是应该直接更新kafka的offset,而不会等待这批消息存入Hbase中,也不会管这批消息是否会丢,应该不会出现消费者`rebalanced`, 我这样理解是对的吗?周末多睡会
    @周末多睡会 你理解的是对的,我现在关注的点是你的代码是否按照你的逻辑实现的(ps:我没使用过这种手动提交)。半兽人
    @半兽人 我理解出现CommitFailedException错误原因主要是consumer消费这一批数据到入库比较耗时,超过了max.poll.interval.ms设置的时间,导致Broker以为consuemr服务已不可用,然后重新分配新的consumer进行消费的,周末多睡会
    @半兽人 你说的我的代码是否按照我的逻辑实现,是指手动提交offset吗?我在测试环境下,就没有出现在有消息挤压的情况下,大量consumer只拉取一条消息进行消费和CommitFailedException的错误, 但是在生产上这两者都会出现,周末多睡会
¥5.0

默认配置情况下,1条消息处理时长最大是30秒(也就是提交一下offset),否则kafka则认为该消费者已经失连了,会重新平衡消费者,而这时你的消费者在提交,已经失效了。

  1. 确认一下你的异步流程在丢消息进去的时候,是否有阻塞。因为即使你一次拿100条还是500条,你不阻塞的情况下,也会把队列很快加满的。
  2. 多打一些日志,把你异步队列正在处理数量,每次提交offset的时间,都打印一下。

你这种写法很不安全,还是上面说的,kafka虽然你一个批次拉取的数量控制了,但是异步非阻塞的处理情况下,拉取的频繁了,效果是一样的。

大佬 如果产生阻塞,就是因为异步提交入库的逻辑比较耗时,导致线程池资源不够用对吧, 还有一点很奇怪的就是我提到的,在有消息挤压的情况下,会出来很多拉取消息为几条的情况~ 但是配置的有等待拉取时间,以及拉取的字节数和拉取消息的数量,但是一直找不到拉取很少几条数据的原因

这个耗时你最好打点日志来看。
等待拉取时间是没用的,这个时间的意思是,当kafka里没有消息了,下一次尝试拉取的时间,如果kafka里一直有消息,就不会阻塞了。

好的 谢谢

你的答案

查看kafka相关的其他问题或提一个您自己的问题