kafka消费端一直报这个错误:o.s.k.listener.LoggingErrorHandler - Error while processing: null

无风三尺浪✅ 发表于: 2019-12-18   最后更新时间: 2019-12-18 23:46:31   8,471 游览

kafka消费端一直报这个错误:

o.s.k.listener.LoggingErrorHandler - Error while processing: null
o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250)
    at brave.kafka.clients.TracingConsumer.commitSync(TracingConsumer.java:146)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1327)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1188)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)
发表于 2019-12-18
添加评论

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

转译:

org.apache.kafka.client.consumer.CommitFailedException。无法完成提交,因为该组已经重新平衡并将分区分配给另一个成员。这意味着后续调用poll()的时间间隔长于配置的max.poll.interval.ms,这通常意味着poll循环花费了太多的时间进行消息处理。你可以通过增加会话超时或用max.poll.records减少poll()中返回的最大批次大小来解决这个问题。

这两个参数的介绍:

max.poll.interval.ms

使用消费者组管理时poll()调用之间的最大延迟。消费者在获取更多消息之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有调用,则该消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。

max.poll.records

在单次调用poll()中返回的最大消息数。

问题原因:

当你提交offset的时候,处理消息已经超过了30秒(默认),消费者已经从成员里踢出来了,重新进行消费者平衡,所以你无法提交。

建议里给了2个方式,通过加大max.poll.interval.ms超时时间,默认30秒,或者减少max.poll.records批次的消息数,少拿点消息,默认500条。

以下是java消费者客户端调整时间的例子:

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,60000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,60000);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,60000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,70000);
你的答案

查看kafka相关的其他问题或提一个您自己的问题