I have a Flume KafkaSource that consumes from Kafka and commits offsets manually. The Kafka version is 2.1.0-cdh6.2.1, the topic has 1 partition, and the topic grows by roughly ten million records per day. A single partition was chosen to guarantee ordering.
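For context, here is a minimal sketch of what such an agent config might look like. The log below shows a custom `DirectKafkaSouce`, so the type and property names here are assumptions modeled on the stock Flume KafkaSource, where `kafka.consumer.*` keys are passed through to the underlying Kafka consumer:

```properties
# Hypothetical Flume agent config (illustrative only; the actual source
# is a custom DirectKafkaSouce whose property names may differ)
a1.sources = source1
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = hnbigdata006:9092
a1.sources.source1.kafka.topics = A_PREPAY_FLOW
a1.sources.source1.kafka.consumer.group.id = group61
# offsets are committed manually by the source, not auto-committed
a1.sources.source1.kafka.consumer.enable.auto.commit = false
```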
Under heavy data volume, the following errors appear:
2020-12-24 17:13:38,725 (PollableSourceRunner-DirectKafkaSouce-source1) [ERROR - org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:812)] [Consumer clientId=consumer-1, groupId=group61] Offset commit failed on partition A_PREPAY_FLOW-0 at offset 431122116: The request timed out.
2020-12-24 17:13:38,727 (PollableSourceRunner-DirectKafkaSouce-source1) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown(AbstractCoordinator.java:706)] [Consumer clientId=consumer-1, groupId=group61] Group coordinator hnbigdata006:9092 (id: 2147483323 rack: null) is unavailable or invalid, will attempt rediscovery
2020-12-24 17:13:38,830 (PollableSourceRunner-DirectKafkaSouce-source1) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)] [Consumer clientId=consumer-1, groupId=group61] Discovered group coordinator hnbigdata006:9092 (id: 2147483323 rack: null)
2020-12-24 17:13:38,830 (PollableSourceRunner-DirectKafkaSouce-source1) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown(AbstractCoordinator.java:706)] [Consumer clientId=consumer-1, groupId=group61] Group coordinator hnbigdata006:9092 (id: 2147483323 rack: null) is unavailable or invalid, will attempt rediscovery
2020-12-24 17:13:38,932 (PollableSourceRunner-DirectKafkaSouce-source1) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)] [Consumer clientId=consumer-1, groupId=group61] Discovered group coordinator hnbigdata006:9092 (id: 2147483323 rack: null)
Could someone please help me figure this out?
By default Kafka gives a consumer about 30 seconds to process; if processing takes longer, the consumer is kicked out of the group and a rebalance is triggered. In other words, you need to commit manually within 30 seconds.
Increase the Kafka timeout settings:
session.timeout.ms
offsets.commit.timeout.ms
request.timeout.ms

My business processing seems to complete within 30 seconds. Could this problem be related to my broker node failing?
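For reference, a sketch of raising the three timeouts suggested above; the values are illustrative, not recommendations. Note that `session.timeout.ms` and `request.timeout.ms` are consumer-side settings, while `offsets.commit.timeout.ms` is a broker-side setting:

```properties
# Consumer side (values illustrative)
session.timeout.ms=60000
request.timeout.ms=120000

# Broker side (server.properties): how long the broker waits for an
# offset commit to be replicated before failing the request
offsets.commit.timeout.ms=10000
```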
Buddy, if the broker went down, isn't a timeout exactly what you'd expect?
But my data is already written to the database before the offset is committed. If the offset commit fails here, i.e. writing the offset to __consumer_offsets fails, then the next round of consumption will fetch the same batch again, right? So the data gets duplicated?
Yes, that leads to duplicate consumption.
The alternative is to commit the offset first and then process the message, but then if the program is force-killed or crashes, messages are lost.
I chose the latter (the program is thoroughly stress-tested and threads/concurrency are never changed casually, so it won't crash).
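The trade-off above can be sketched with a toy model. Nothing here talks to Kafka; the failed commit and the crash are simulated so the consequence of each ordering is visible:

```java
// Toy model of the two commit orderings (no real Kafka involved).
// Process-then-commit -> at-least-once: duplicates on commit failure.
// Commit-then-process -> at-most-once: loss on crash.
public class CommitOrdering {

    // Process-then-commit: the record reaches the sink even when the
    // commit fails, so a restart re-reads it -> duplicate.
    static int atLeastOnceDuplicates(int total, int failAtOffset) {
        int committed = 0;
        int written = 0;
        for (int i = 0; i < total; i++) {
            written++;                                 // 1. persist the record
            if (i == failAtOffset) break;              // 2. commit times out, consumer dies
            committed = i + 1;                         // 2. commit succeeded
        }
        // after restart, records from `committed` onward are written again
        return written - committed;                    // duplicated records
    }

    // Commit-then-process: the offset moves forward even though the
    // write never happened, so a crash loses the record.
    static int atMostOnceLost(int total, int crashAtOffset) {
        int committed = 0;
        int written = 0;
        for (int i = 0; i < total; i++) {
            committed = i + 1;                         // 1. commit the offset first
            if (i == crashAtOffset) break;             // 2. crash before the write
            written++;                                 // 2. persist the record
        }
        return committed - written;                    // lost records
    }

    public static void main(String[] args) {
        System.out.println("duplicates=" + atLeastOnceDuplicates(5, 2)); // prints duplicates=1
        System.out.println("lost=" + atMostOnceLost(5, 2));              // prints lost=1
    }
}
```

Either ordering is a trade-off; exactly-once would additionally require making the database write and the offset commit atomic (e.g. storing the offset in the same transaction as the data).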