需求:当轮询kafka的consumer重启的时候,可以从最新的offset开始消费
遇到问题:(kafka版本 2.12-2.2.0)当代码执行到poll方法时报错,且没有重置offset。在kafka服务端找不到这个consumer,是否assign不能注册consumer?可以使用subscribe()方法注册consumer的同时执行seekToEnd()吗?
更正问题!!!
通过assign()方法订阅分区失败,kafka服务器查不到对应consumer,导致后面poll方法报错。为什么assign分区会失败呢
相关代码
List<TopicPartition> finalTopicPartitions = new ArrayList<>();
for (String topicName : topicNames) {
List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(topicName);
List<TopicPartition> topicPartitions = partitionInfos.stream().map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition())).collect(Collectors.toList());
finalTopicPartitions.addAll(topicPartitions);
}
kafkaConsumer.assign(finalTopicPartitions);
kafkaConsumer.seekToEnd(finalTopicPartitions);
// 轮询kafka,拉取任务
while (true) {
...
// 从kafka获取content,并根据策略筛选条件进行筛选
ConsumerRecords records = kafkaConsumer.poll(pollTimeout);
}
相关配置
kafka:
"auto.offset.reset": latest
报错信息
[ERROR] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.handle(ConsumerCoordinator.java:1167)] [Consumer clientId=consumer-1034-1, groupId=1034] Offset commit failed on partition test-0 at offset 15: The coordinator is not aware of this member.
你的
消费者组名
被占用了,还有其他的程序在使用。假设程序A和程序B以同一个
消费者组名
订阅这个分区的话,只有一个程序能消费到这我理解,怎么会有占用呢?然后我刚才去除别的实例的干扰,又查了下kafka服务端,发现虽然我调用了assign()方法,但是服务端并没有消费者,是否说明没订阅成功,为什么会没订阅成功呢?上面补充了查询kafka服务端的图
1、因为你们用了同一个名字,所以你指定的offset的就报了刚才的错误呀,不是其成员。
2、消费者启成功后,生产者是否有持续的消息写到该topic呢?
不好意思,回复完了。确实是因为没有消息写进来,看似没有订阅成功,应该是kafka sdk里面的订阅是个lazy操作,已经可以每次开始消费都从最新的offset了,问题解决了。就是1看不懂
谢谢
这个代码不对, 你应该使用
kafkaConmsumer.endOffsets
来读取 LEO(更准确的说应该是ISR里的HW), 如若你要使用seekToEnd()
, 你在这个方法之前一定要先提前调用consumer.poll(0)
应消费的元数据保存在__consumer_offstes
中的位置, 在这之后, 你才可以正常使用seekToBegin/seekToEnd
你的答案