业务场景是这样的:
每当有一个任务下发的时候,我会启动一个线程,实例化一个consumer,consumer会订阅当前有任务的topic,当这些任务全部都消费完毕后,我会停止这个线程,但是最近发现kafka输出ERROR日志,日志如下:11:25:34 [Thread-18] ERROR o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=rzx-consumer-group] Offset commit failed on partition topic1566791132867-0 at offset 10: The coordinator is not aware of this member.
11:25:34 [Thread-18] WARN o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=rzx-consumer-group] Synchronous auto-commit of offsets {topic1566791132867-0=OffsetAndMetadata{offset=10, metadata=''}} failed: Commit cannot be completed since the group
has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processin
g. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
每个消息也都消费成功了,之后再处理相同topic的消息也不会重复消费,感觉是消费者配置的问题,目前的消费者配置为: props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 12695150);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 12695150);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
请问,需要怎么调整配置呢?
谢谢
线程关闭的时候,调用kakfa消费者的
close()
方法。调用了consumer.close();这个方法呢
kafka消费者提交offset时,kafka集群正在重新分配消费者,就会报该错误。
kafka客户端自动是独立的线程,按间隔自动提交,当最后一次提交offset前,kafka消费者已经通知k8s关闭,释放了消费者的身份。
建议:
每一次都new一个新的consumer是因为需要监听不同的topic,同一个consumer可以切换不同的topic吗?
你的答案