感谢站长回复。我新开了一个消费者demo,换了一个group消费第三方Kafka集群,参数如正文没做调整,去掉了发送数据到目标Kafka的操作,依然出现正文提到的报错1、2、3、4。我接下来尝试一下站长提出的解决方案1和2,但是目前的配置时间是不是已经很长了,有没有可能是和第三方的Kafka集群间的网络问题造成的呢?
spring.kafka.consumer.max-poll-records=50
spring.kafka.consumer.properties.max.poll.interval.ms=300000
spring.kafka.consumer.properties.session.timeout.ms=60000
spring.kafka.consumer.properties.request.timeout.ms=18500000
Caused by: java.lang.IllegalStateException: Correlation id for response (922079) does not match request (922069), request header: RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=consumer-quanzhougaozhi-ivehcletransfer-group-0516-1, correlationId=922069)
请求和收到的不匹配,可能是异常太多,多线程springboot错乱导致的。
所以我们聚焦核心异常:
核心异常是:org.apache.kafka.clients.consumer.CommitFailedException
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
默认情况下,消息处理时间超过了30秒,kafka将该消费者从组中移除了,认为其已经无效。所以将组中移除,所以提交offset失败了。
而失败,会导致重新选举消费者,而你的消费者程序有12个,就是此时导致的错乱。
CommitFailedException
,那你就需要设置获取消息的数量大小了,就是pull少一点的kafka消息,这样发送到新kafka的量少了,超时的问题就得到了缓解。spring.kafka.listener.concurrency=12
,并发也是导致你异常加重的原因之一,越小的话同时处理的消息少,不容易阻塞在提交新集群那里,异常的概率越低。另外你现在是2个消费者程序,可以通过增加消费者程序来解决并发的问题,而不是通过调这个参数。spring.kafka.listener.concurrency=12
话说回来,你一共12个分区,1个消费者程序12个并发的话,1个消费者程序全占满了,另外一个消费者程序可能分到个别甚至是0个消费者分区。比如你现在2个消费者,concurrency
不应该超过6个,3个话不能超过4个,我建议你默认1个就行了,处理不会慢太多,可以增加消费者程序来提高并发。