I'm consuming from broker offset 0, and poll count is often 0. What could cause this? Looking forward to a reply. Config: `max.partition.fetch.bytes=1048576`, `max.poll.records=500`
When you reset the offsets, you may have reset only one partition's offset and not the offsets of all partitions.
For example (a fragment; `consumer`, `topic` and `partitionNum` are defined elsewhere):
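```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

Map<TopicPartition, Long> topicPartitionMap = new HashMap<>();
for (int i = 0; i < partitionNum; i++) {
    topicPartitionMap.put(new TopicPartition(topic, i), 0L); // 0L: the map holds Long values
}
// Reset the offset of every partition, not just one.
// seek() only works for partitions currently assigned to this consumer.
topicPartitionMap.forEach((k, v) -> {
    consumer.seek(k, v);
    System.out.println(k + ", offsets:" + v);
});
```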
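Once the seek succeeds, consumption starts with every partition's offset set to the earliest position. Use the following command to check the offsets of all partitions for the consumer group: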
```shell
# Show consumption details for a consumer group (Kafka 0.10.1.0+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
```
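We have 12 partitions. Because the data is currently unevenly distributed across them, I wanted to measure how much a single Kafka partition delivers over a period of time, so I pointed the test at one partition only.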
This is how I did it:
```java
TopicPartition tp = new TopicPartition("test", 0);
consumer.seek(tp, 0);
```
When I started testing, I did reset each partition before testing it. Can uneven data distribution make the poll count 0?
```java
partitions.forEach(tp -> consumer.seek(tp, 0));
```
Check the offset positions. If they differ, there may simply be no messages left to consume.
There are messages.
```text
partition  CURRENT-OFFSET  LOG-END-OFFSET  LAG
0          89959331        97139009        7179678
```
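The LAG column is LOG-END-OFFSET minus CURRENT-OFFSET, where CURRENT-OFFSET is the group's last committed offset. A minimal stdlib-only sketch that parses one trimmed row like the one above and recomputes the lag; the class `PartitionLag` and the assumed column order (partition, current offset, log-end offset, lag) are illustrative, not part of the Kafka tooling:

```java
/** One row of `kafka-consumer-groups.sh --describe` output, trimmed to the columns quoted in this thread (hypothetical helper). */
final class PartitionLag {
    final int partition;
    final long currentOffset; // CURRENT-OFFSET: last offset committed by the group
    final long logEndOffset;  // LOG-END-OFFSET: end of the partition as seen by consumers

    PartitionLag(int partition, long currentOffset, long logEndOffset) {
        this.partition = partition;
        this.currentOffset = currentOffset;
        this.logEndOffset = logEndOffset;
    }

    /** LAG = LOG-END-OFFSET - CURRENT-OFFSET: messages in the partition not yet committed by the group. */
    long lag() {
        return logEndOffset - currentOffset;
    }

    /** Parses a whitespace-separated row such as "0 89959331 97139009 7179678"; the LAG column, if present, is ignored. */
    static PartitionLag parse(String row) {
        String[] cols = row.trim().split("\\s+");
        if (cols.length < 3) {
            throw new IllegalArgumentException("expected at least 3 columns: " + row);
        }
        return new PartitionLag(
                Integer.parseInt(cols[0]),
                Long.parseLong(cols[1]),
                Long.parseLong(cols[2]));
    }
}
```

For the row above, 97139009 - 89959331 = 7179678, matching the reported LAG. Because CURRENT-OFFSET is the committed offset, this lag reflects what the group has committed, which can trail what the consumer has already fetched.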
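Is something misconfigured? I measured producer throughput at roughly 27 MB of data per second. During the test I also adjusted the consumer's `max.partition.fetch.bytes`, but whether I raised or lowered it, `poll()` kept returning nothing, and frequently. The poll call is `consumer.poll(Duration.ofMillis(100))`.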
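For reference, both values quoted in the question are the kafka-clients defaults (`max.partition.fetch.bytes` = 1048576, `max.poll.records` = 500). The first caps how much data the broker returns per partition in one fetch (a larger first record batch is still returned so the consumer can make progress); the second only caps how many already-fetched records a single `poll()` hands back. The poll timeout is an argument to `poll(Duration)`, not a property. A minimal sketch that builds these two settings with plain `java.util.Properties`; the helper `FetchConfigSketch` is hypothetical:

```java
import java.util.Properties;

/** Hypothetical helper: builds the two consumer fetch settings discussed in this thread as string properties. */
final class FetchConfigSketch {
    static Properties fetchProperties(int maxPartitionFetchBytes, int maxPollRecords) {
        Properties props = new Properties();
        // Maximum data per partition the broker returns in one fetch response
        // (the first record batch is returned even if it is larger).
        props.setProperty("max.partition.fetch.bytes", Integer.toString(maxPartitionFetchBytes));
        // Maximum number of buffered records handed to the application by one poll() call.
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }
}
```

These properties would be merged into the full consumer configuration (bootstrap servers, deserializers, group id) before constructing the consumer.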
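I don't quite follow. First, your original issue was that some partitions were not being consumed. Have you confirmed that? Second, `poll()` often returns empty, so use the consumer group command to make sure there is always a backlog of messages. If no messages are left when the consumer fetches, an empty result is perfectly normal.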
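> First, your original issue was that some partitions were not being consumed. Have you confirmed that?

I'm consuming only partition 0 for this test, and these are partition 0's offsets, so there are messages. Partition 0 has plenty of messages, yet `poll()` still returns 0, and frequently.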
```text
partition  CURRENT-OFFSET  LOG-END-OFFSET  LAG
0          89959331        97139009        7179678
```
Part of the log:
```text
2022-07-10_19:53:44.093 Consumer Poll Counts ===> 0
2022-07-10_19:53:44.197 Consumer Poll Counts ===> 0
2022-07-10_19:53:44.301 Consumer Poll Counts ===> 0
2022-07-10_19:53:44.406 Consumer Poll Counts ===> 0
2022-07-10_19:53:44.510 Consumer Poll Counts ===> 0
2022-07-10_19:53:44.612 Consumer Poll Counts ===> 0
2022-07-10_19:53:44.713 Consumer Poll Counts ===> 0
2022-07-10_19:53:44.851 Consumer Poll Counts ===> 500
2022-07-10_19:53:45.028 Consumer Poll Counts ===> 0
2022-07-10_19:53:45.133 Consumer Poll Counts ===> 0
2022-07-10_19:53:45.237 Consumer Poll Counts ===> 0
2022-07-10_19:53:45.341 Consumer Poll Counts ===> 0
2022-07-10_19:53:45.444 Consumer Poll Counts ===> 0
2022-07-10_19:53:45.544 Consumer Poll Counts ===> 0
2022-07-10_19:53:45.645 Consumer Poll Counts ===> 0
2022-07-10_19:53:45.747 Consumer Poll Counts ===> 0
```
```java
TopicPartition tp = new TopicPartition("test", 0);
consumer.seek(tp, 0);
```
Kafka consumers pull messages themselves, so if nothing comes back, there are no messages to fetch. Your count must be tallied incorrectly.
```text
partition  CURRENT-OFFSET  LOG-END-OFFSET  LAG
0          89959331        97139009        7179678
```
But doesn't this show there are messages? That many messages haven't been fetched yet. The count I print is `records.count()` for each poll.
Do you experts have a chat group?
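This logic is sound. You don't need to tune any parameters: as long as there are messages, the consumer keeps fetching them. Check your code. Also, what did the command from my first reply return? I'd like to see how large the lag is.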
```text
partition  CURRENT-OFFSET  LOG-END-OFFSET  LAG
0          89959331        97139009        7179678
```
That's the output. The LAG is 7179678.
Here are the test code and log. The poll count drops to 0 at a fairly regular frequency, and I can't tell where the problem is. Thanks.
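```java
Properties kafkaProperties = KafkaConfigurer.getKafkaProperties(configPath);
KafkaConsumer<String, ByteBuffer> consumer = new KafkaConsumer<>(kafkaProperties);
TopicPartition partition0 = new TopicPartition("test", 0);
// subscribe() lets the group assign partitions: if this is the group's only member,
// it gets every partition of "test", not just partition 0
consumer.subscribe(Lists.newArrayList("test"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // seek() throws IllegalStateException for a partition not assigned to this consumer
        if (partitions.contains(partition0)) {
            consumer.seek(partition0, 0);
        }
    }
});
while (true) {
    // Waits up to 3 s for records; returns an empty batch if none arrive in time
    ConsumerRecords<String, ByteBuffer> records = consumer.poll(Duration.ofMillis(3000));
    log.info("========>" + records.count());
}
```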
```text
2022-07-10_21:23:51.122 [main] INFO a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:23:54.127 [main] INFO a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:23:57.133 [main] INFO a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:00.137 [main] INFO a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:02.533 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.534 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.534 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.535 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.536 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.536 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.537 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.538 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.538 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.539 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.539 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.539 [main] INFO a.m.q.c.ConsumerTest - ========>70
2022-07-10_21:24:05.540 [main] INFO a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:08.541 [main] INFO a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:11.544 [main] INFO a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:14.549 [main] INFO a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:16.965 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.966 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.967 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.967 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.968 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.969 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.969 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.970 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.971 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.971 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.972 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.972 [main] INFO a.m.q.c.ConsumerTest - ========>111
2022-07-10_21:24:19.974 [main] INFO a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:22.976 [main] INFO a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:25.977 [main] INFO a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:28.977 [main] INFO a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:31.428 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.429 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.430 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.431 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.431 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.432 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.433 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.433 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.434 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.435 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.435 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.436 [main] INFO a.m.q.c.ConsumerTest - ========>141
2022-07-10_21:24:34.438 [main] INFO a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:37.442 [main] INFO a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:40.445 [main] INFO a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:43.448 [main] INFO a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:45.281 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.282 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.283 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.283 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.284 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.285 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.285 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.286 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.286 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.287 [main] INFO a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.287 [main] INFO a.m.q.c.ConsumerTest - ========>500
```
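In this log each non-empty burst lands within a few milliseconds and adds up to roughly 5.6k records (11 × 500 + 70 = 5570, 11 × 500 + 111 = 5611, 11 × 500 + 141 = 5641), with bursts about 14 seconds apart. That shape is consistent with the consumer buffering one fetch response, handing it out in slices of at most `max.poll.records`, and then returning empty polls until the next fetch response arrives. A stdlib-only toy model of that slicing, assuming a single buffered fetch; `PollDrainModel` is hypothetical and is not kafka-clients code:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (not kafka-clients code): records from one fetch sit in a local buffer,
 * and each poll() returns at most maxPollRecords of them. Once the buffer is empty,
 * poll() returns 0 records until the next fetch arrives or the timeout expires.
 */
final class PollDrainModel {
    /** Sizes of the successive non-empty poll() results that drain one buffered fetch. */
    static List<Integer> pollSizes(int fetchedRecords, int maxPollRecords) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollRecords must be positive");
        }
        List<Integer> sizes = new ArrayList<>();
        int remaining = fetchedRecords;
        while (remaining > 0) {
            int n = Math.min(remaining, maxPollRecords); // one poll() hands out at most maxPollRecords
            sizes.add(n);
            remaining -= n;
        }
        return sizes;
    }

    /** Inverse view: total records implied by a burst of consecutive non-empty poll() counts in a log. */
    static int burstTotal(int... pollCounts) {
        int total = 0;
        for (int c : pollCounts) {
            total += c;
        }
        return total;
    }
}
```

`pollSizes(5570, 500)` yields eleven 500s followed by 70, the same shape as the first burst in the log.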
What's happening is that you fetch again before the offset has had time to be committed.
Switch to committing offsets manually, so the commit is guaranteed to succeed before the next fetch.
So you mean that if the offset isn't committed, the next fetch won't happen?
To be precise, the offset the client asks Kafka for is controlled by the client.
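The `KafkaConsumer` Javadoc separates two offsets per partition: the position (`consumer.position(tp)`), the offset of the next record to be given out, which advances automatically every time `poll()` returns records; and the committed offset (`consumer.committed(...)`), which only changes on a commit and is where the group resumes after a restart or rebalance. A stdlib-only toy model of those two counters; `OffsetModel` is hypothetical and only mirrors the documented semantics, it is not kafka-clients code:

```java
/**
 * Toy model (not kafka-clients code) of the two per-partition offsets a consumer tracks:
 * the fetch position, advanced by every poll() that returns records, and the committed
 * offset, changed only by a commit (auto-commit or commitSync/commitAsync).
 */
final class OffsetModel {
    private long position;  // next offset the client will request from the broker
    private long committed; // last offset stored for the group; used after restart/rebalance

    OffsetModel(long start) {
        this.position = start;
        this.committed = start;
    }

    /** A poll() that returned `count` records: the position moves, the committed offset does not. */
    void onPoll(int count) {
        position += count;
    }

    /** A commit of the current position. */
    void commit() {
        committed = position;
    }

    long position() {
        return position;
    }

    long committed() {
        return committed;
    }
}
```

In this model two polls of 500 records move the position to 1000 while the committed offset stays at 0 until `commit()` is called, so the next fetch does not wait on a commit.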