lioY3

0 声望

这家伙太懒,什么都没留下

个人动态
  • 半兽人 回复 lioY3Kafka Java客户端消费不到消息 中 :

    3、4也很重要,降低并发消费者先。

    1年前
  • lioY3 回复 半兽人Kafka Java客户端消费不到消息 中 :

    感谢站长回复。我新开了一个消费者demo,换了一个group消费第三方Kafka集群,参数如正文没做调整,去掉了发送数据到目标Kafka的操作,依然出现正文提到的报错1、2、3、4。我接下来尝试一下站长提出的解决方案1和2,但是目前的配置时间是不是已经很长了,有没有可能是和第三方的Kafka集群间的网络问题造成的呢?

    spring.kafka.consumer.max-poll-records=50
    spring.kafka.consumer.properties.max.poll.interval.ms=300000
    spring.kafka.consumer.properties.session.timeout.ms=60000
    spring.kafka.consumer.properties.request.timeout.ms=18500000
    
    1年前
  • 半兽人 回复 lioY3Kafka Java客户端消费不到消息 中 :

    Caused by: java.lang.IllegalStateException: Correlation id for response (922079) does not match request (922069), request header: RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=consumer-quanzhougaozhi-ivehcletransfer-group-0516-1, correlationId=922069)

    请求和收到的不匹配,可能是异常太多,多线程springboot错乱导致的。

    所以我们聚焦核心异常:

    核心异常是:org.apache.kafka.clients.consumer.CommitFailedException

    Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

    默认情况下,消息处理时间超过了30秒,kafka将该消费者从组中移除了,认为其已经无效。所以将组中移除,所以提交offset失败了。
    而失败,会导致重新选举消费者,而你的消费者程序有12个,就是此时导致的错乱。

    解决:

    1. 你收到一批消息之后,将消息发送到新的kafka集群的时间,不能超过30秒,调大超时时间(治标不治本),参考这篇文章的第一个回答:https://www.orchome.com/893
    2. 如上所示,如果你调大了超时时间,依旧出现CommitFailedException,那你就需要设置获取消息的数量大小了,就是pull少一点的kafka消息,这样发送到新kafka的量少了,超时的问题就得到了缓解。
    3. 根据你的参数,spring.kafka.listener.concurrency=12,并发也是导致你异常加重的原因之一,越小的话同时处理的消息少,不容易阻塞在提交新集群那里,异常的概率越低。另外你现在是2个消费者程序,可以通过增加消费者程序来解决并发的问题,而不是通过调这个参数。
    4. spring.kafka.listener.concurrency=12话说回来,你一共12个分区,1个消费者程序12个并发的话,1个消费者程序全占满了,另外一个消费者程序可能分到个别甚至是0个消费者分区。比如你现在2个消费者,concurrency不应该超过6个,3个话不能超过4个,我建议你默认1个就行了,处理不会慢太多,可以增加消费者程序来提高并发。
    1年前