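Kafka consumer suddenly stops consuming, with errors in the log
The Kafka cluster has 3 nodes and topics use 2 replicas. Recently the consumer module intermittently stops consuming messages. The error log from one of the consumers is: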
2021-09-23 10:28:20.460 ERROR [Thread-10] [] o.a.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Offset commit failed on partition risk_gateway_etl-0 at offset 2579636: This is not the correct coordinator.
2021-09-23 10:28:20.460 INFO [Thread-10] [] o.a.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Group coordinator 172.16.196.85:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
2021-09-23 10:28:20.560 INFO [Thread-10] [] o.a.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Discovered group coordinator 172.16.196.84:9092 (id: 2147483647 rack: null)
2021-09-23 10:28:20.562 ERROR [Thread-10] [] o.a.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Offset commit failed on partition risk_gateway_etl-0 at offset 2579636: The coordinator is not aware of this member.
The other consumer logged the following errors at the same time:
2021-09-23 10:28:19.247 ERROR [Thread-10] [] o.a.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Offset commit failed on partition risk_gateway_etl-2 at offset 2577133: The request timed out.
2021-09-23 10:28:19.247 INFO [Thread-10] [] o.a.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Group coordinator 172.16.196.85:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
2021-09-23 10:28:19.479 INFO [Thread-10] [] o.a.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Discovered group coordinator 172.16.196.85:9092 (id: 2147483645 rack: null)
2021-09-23 10:28:20.459 ERROR [Thread-10] [] o.a.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Offset commit failed on partition risk_gateway_etl-2 at offset 2577133: This is not the correct coordinator.
2021-09-23 10:28:20.459 INFO [Thread-10] [] o.a.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Group coordinator 172.16.196.85:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
2021-09-23 10:28:20.560 INFO [Thread-10] [] o.a.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Discovered group coordinator 172.16.196.84:9092 (id: 2147483647 rack: null)
2021-09-23 10:28:20.562 ERROR [Thread-10] [] o.a.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=gw_clm_consumer] Offset commit failed on partition risk_gateway_etl-2 at offset 2577133: The coordinator is not aware of this member.
Also, in the log of one of the brokers in the cluster I found the following:
[2021-09-23 10:28:10,998] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions elk-0(kafka.server.ReplicaFetcherManager)
[2021-09-23 10:28:10,998] INFO [ReplicaFetcherManager on broker 1] Added fetcher for partitions List([elk-0, initOffset 10040090963 to broker BrokerEndPoint(0,172.16.196.84,9092)] ) (kafka.server.ReplicaFetcherManager)
[2021-09-23 10:28:10,998] INFO [ReplicaAlterLogDirsManager on broker 1] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)
[2021-09-23 10:28:11,007] INFO [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-09-23 10:28:11,007] INFO [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error sending fetch request (sessionId=1693859521, epoch=21103032) to node 2: java.nio.channels.ClosedSelectorException. (org.apache.kafka.clients.FetchSessionHandler)
[2021-09-23 10:28:11,008] INFO [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2021-09-23 10:28:11,008] INFO [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Shutdown completed(kafka.server.ReplicaFetcherThread)
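I looked through some material on this but found nothing useful. I hope someone here has the answer.
This error usually appears at offset-commit time when processing the fetched messages took longer than the allowed timeout (30 seconds here). Kafka then treats the consumer as dead and removes it from the group. Because the consumer is no longer a group member, its offset commit fails.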
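To make the timing argument concrete, here is a hypothetical helper that is not from the answer or the Kafka API. It compares the estimated time to process one fetched batch with the allowed interval. It assumes the limit in play is max.poll.interval.ms, which in clients since 0.10.1 bounds the time between two poll() calls. The batch size and per-record cost are made-up inputs you would measure yourself.

```java
// Hypothetical helper (not part of kafka-clients): estimates whether one poll()
// batch can be processed before the member is considered failed and evicted.
public class PollIntervalCheck {

    /**
     * Returns true if the estimated batch processing time stays below maxPollIntervalMs.
     * recordsPerBatch and msPerRecord are assumed, application-measured inputs.
     */
    static boolean fitsInPollInterval(int recordsPerBatch, long msPerRecord, long maxPollIntervalMs) {
        long batchMs = recordsPerBatch * msPerRecord; // int * long is computed as long
        return batchMs < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // 500 records at 70 ms each = 35 000 ms, which exceeds a 30 000 ms limit
        System.out.println(fitsInPollInterval(500, 70, 30_000)); // false
        // The same batch fits within a 60 000 ms limit
        System.out.println(fitsInPollInterval(500, 70, 60_000)); // true
    }
}
```

If the check fails, the options are to raise the interval, as the answer suggests, or to make each batch cheaper to process.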
You can resolve this by increasing the timeouts:
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 60000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 70000);
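A minimal sketch of those four settings as a self-contained Properties object. To avoid depending on kafka-clients, it uses the plain string keys that the ConsumerConfig constants stand for. The class and method names are hypothetical. Other required settings such as bootstrap.servers, group.id and the deserializers are deliberately left out. request.timeout.ms is kept above the other values, matching the answer's 70000. The assumption here is that some older client versions refuse to start otherwise. Also note that the broker only accepts a session.timeout.ms within its group.min.session.timeout.ms / group.max.session.timeout.ms range.

```java
import java.util.Properties;

// Hypothetical sketch: only the timeout-related consumer settings from the answer.
public class ConsumerTimeouts {

    static Properties timeoutProps() {
        Properties props = new Properties();
        // Max time without heartbeats before the coordinator evicts the member
        props.put("session.timeout.ms", "60000");
        // Max time between two poll() calls before the member is considered failed
        props.put("max.poll.interval.ms", "60000");
        // Max time the broker waits for fetch.min.bytes of data before answering a fetch
        props.put("fetch.max.wait.ms", "60000");
        // Kept larger than the values above (assumed requirement of some older clients)
        props.put("request.timeout.ms", "70000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = timeoutProps();
        long request = Long.parseLong(props.getProperty("request.timeout.ms"));
        long session = Long.parseLong(props.getProperty("session.timeout.ms"));
        long fetchWait = Long.parseLong(props.getProperty("fetch.max.wait.ms"));
        // Sanity check mirroring the answer's choice of 70000 > 60000
        System.out.println(request > session && request > fetchWait); // true
    }
}
```

String values are used so that Properties.getProperty can read them back. getProperty returns null for non-String values such as the Integer 60000 in the answer, although the Kafka consumer itself accepts either form.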
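Related errors, fixes and references: https://www.orchome.com/6742
I checked my configuration and it is currently set to 25 seconds, so I'll try setting all four of these parameters.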