kafka消费者自动提交报错,kafka需要怎么配置呢?

野心永恒 发表于: 2019-08-28   最后更新时间: 2019-08-28 14:25:57   5,004 游览

kafka消费者自动提交报错

  1. 业务场景是这样的:
    每当有一个任务下发的时候,我会启动一个线程,实例化一个consumer,consumer会订阅当前有任务的topic,当这些任务全部都消费完毕后,我会停止这个线程,但是最近发现kafka输出ERROR日志,日志如下:
    11:25:34 [Thread-18] ERROR o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=rzx-consumer-group] Offset commit failed on partition topic1566791132867-0 at offset 10: The coordinator is not aware of this member.
    11:25:34 [Thread-18] WARN  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=rzx-consumer-group] Synchronous auto-commit of offsets {topic1566791132867-0=OffsetAndMetadata{offset=10, metadata=''}} failed: Commit cannot be completed since the group
    has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processin
    g. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    
    每个消息也都消费成功了,之后再处理相同topic的消息也不会重复消费,感觉是消费者配置的问题,目前的消费者配置为:
         props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 12695150);
         props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 12695150);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    
    请问,需要怎么调整配置呢?
    谢谢
发表于 2019-08-28
添加评论

线程关闭的时候,调用kakfa消费者的close()方法。

野心永恒 -> 半兽人 5年前

调用了consumer.close();这个方法呢

半兽人 -> 野心永恒 5年前

kafka消费者提交offset时,kafka集群正在重新分配消费者,就会报该错误。
kafka客户端自动是独立的线程,按间隔自动提交,当最后一次提交offset前,kafka消费者已经通知k8s关闭,释放了消费者的身份。
建议:

  1. poll本身为长轮询,定时去拉取新消息,没必要每次new出来。
  2. 改为手动提交
野心永恒 -> 半兽人 5年前

每一次都new一个新的consumer是因为需要监听不同的topic,同一个consumer可以切换不同的topic吗?

你的答案

查看kafka相关的其他问题或提一个您自己的问题