kafka poll records count 经常为0,只从一个partition中拉取数据,broker 中数据充足

Miss 发表于: 2022-07-10   最后更新时间: 2022-07-10 14:54:48   1,863 游览

从broker offset 为0 处开始消费,发现poll count经常为0,这是什么原因呢?期待回复

max.partition.fetch.bytes=1048576
max.poll.records=500
发表于 2022-07-10
添加评论

你重置offset,你可能只重置了其中一个分区的offset,而没有重置所有分区的offset。

例如(伪代码):

Map<TopicPartition, Long> topicParitionMap = new HashMap<>();
for (int i = 0; i < partitionNum; i++) 
  topicParitionMap.put(new TopicPartition(topic, i), 0);

// 重置所有分区的offset
topicParitionMap.forEach((k, v) -> {
    consumer.seek(k, v);
    System.out.println(k + ", offsets:" + v);
});

设置成功之后,开始消费的时候,所有的offset都指定了最早的位置,通过以下命令确定该消费者组,所有分区的offset位置:

## 显示某个消费组的消费详情(0.10.1.0版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
Miss -> 半兽人 2年前

现在我们是12个分区,因为目前的数据分布不均匀,所以我想测试下,从kafka 一个分区中一段时间的拉取量,就指定了从一个分中拉取进行测试。

我是通过这个方式:

TopicPartition tp = new TopicPartition("test",0);
consumer.seek(tp, 0);

刚开始的测试的时候,确实是每个分区重置去测试的。数据分布不均匀会导致拉取count 为0 吗?

partitions.forEach(tp -> {
    consumer.seek(tp, 0);
});
半兽人 -> Miss 2年前

你确认一下offset的位置,是不是位置不一样,所以已经没有消息了

Miss -> 半兽人 2年前

有消息的
partition CURRENT-OFFSET LOG-END-OFFSET LAG
0 89959331 97139009 7179678

Miss -> 半兽人 2年前

是哪里配置有问题吗,测试了producer 的吞吐量,大概每秒27M 的数据,测试的时候, 把 consumer max.partition.fetch.bytes 也进行了调整,无论调大调小,发现poll 的时候总是有空的,而且会频繁出现,consumer.poll(Duration.ofMillis(100))。

半兽人 -> Miss 2年前

没明白哎,首先你的意思第一个问题是有的分区没被消费,这个问题你确认了吗? 第二个问题是你的poll经常是空,所以通过查看消费者组命令来确保消息一直是充足的,否则消费者去拉取消息的时候,消息没有了,所以返回空是很正常的,

Miss -> 半兽人 2年前

首先你的意思第一个问题是有的分区没被消费,这个问题你确认了吗? 这个我是只消费 0 分区进行测试的,这个是0 分区的offset,所以消息是有的,0 分区消息充足,在这种情况下,poll 也是为0 ,频繁出现0

partition CURRENT-OFFSET LOG-END-OFFSET   LAG
0          89959331       97139009        7179678

部分日志:

2022-07-10_19:53:44.093 Consumer Poll Counts  ===> 0
2022-07-10_19:53:44.197 Consumer Poll Counts  ===> 0
2022-07-10_19:53:44.301 Consumer Poll Counts  ===> 0
2022-07-10_19:53:44.406 Consumer Poll Counts  ===> 0
2022-07-10_19:53:44.510 Consumer Poll Counts  ===> 0
2022-07-10_19:53:44.612 Consumer Poll Counts  ===> 0
2022-07-10_19:53:44.713 Consumer Poll Counts  ===> 0
2022-07-10_19:53:44.851 Consumer Poll Counts  ===> 500
2022-07-10_19:53:45.028 Consumer Poll Counts  ===> 0
2022-07-10_19:53:45.133 Consumer Poll Counts  ===> 0
2022-07-10_19:53:45.237 Consumer Poll Counts  ===> 0
2022-07-10_19:53:45.341 Consumer Poll Counts  ===> 0
2022-07-10_19:53:45.444 Consumer Poll Counts  ===> 0
2022-07-10_19:53:45.544 Consumer Poll Counts  ===> 0
2022-07-10_19:53:45.645 Consumer Poll Counts  ===> 0
2022-07-10_19:53:45.747 Consumer Poll Counts  ===> 0
TopicPartition tp = new TopicPartition("test",0);
consumer.seek(tp, 0);
半兽人 -> Miss 2年前

kafka是主动拉取消息,拉取不到消息说明没有消息。是你的count统计有问题。

Miss -> 半兽人 2年前
partition CURRENT-OFFSET LOG-END-OFFSET LAG
0 89959331 97139009 7179678

但是看这个的话 不是说明有消息吗?有这么多消息还没有拉取。我打印出的count 是每次poll 的recourds 的count.

Miss -> 半兽人 2年前

大佬你们有群不?

半兽人 -> Miss 2年前

这个逻辑是不会错的。任何参数你都不需要调整,有消息的情况下,是会持续拉取的。 你要检察下代码。 还有第一条提供你的命令,你查询的结果是什么?我想看看lag有多少

Miss -> 半兽人 2年前
partition CURRENT-OFFSET LOG-END-OFFSET LAG
0 89959331 97139009 7179678

这个就是的7179678 LAG

Miss -> 半兽人 2年前

这个是测试代码 和 log,发现poll count 为0 的频率 比较固定,不知道是哪里出的问题。谢谢了

Properties kafkaProperties = KafkaConfigurer.getKafkaProperties(configPath);

KafkaConsumer<String, ByteBuffer> consumer = new KafkaConsumer<>(kafkaProperties);
consumer.subscribe(Lists.newArrayList("test"),new ConsumerRebalanceListener(){

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        consumer.seek(new TopicPartition("test",0),0);

    }
});
while (true) {
    ConsumerRecords<String, ByteBuffer> records = consumer.poll(Duration.ofMillis(3000));
    log.info("========>" + records.count());
}
2022-07-10_21:23:51.122 [main] INFO  a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:23:54.127 [main] INFO  a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:23:57.133 [main] INFO  a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:00.137 [main] INFO  a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:02.533 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.534 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.534 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.535 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.536 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.536 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.537 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.538 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.538 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.539 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.539 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:02.539 [main] INFO  a.m.q.c.ConsumerTest - ========>70
2022-07-10_21:24:05.540 [main] INFO  a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:08.541 [main] INFO  a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:11.544 [main] INFO  a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:14.549 [main] INFO  a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:16.965 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.966 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.967 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.967 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.968 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.969 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.969 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.970 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.971 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.971 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.972 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:16.972 [main] INFO  a.m.q.c.ConsumerTest - ========>111
2022-07-10_21:24:19.974 [main] INFO  a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:22.976 [main] INFO  a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:25.977 [main] INFO  a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:28.977 [main] INFO  a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:31.428 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.429 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.430 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.431 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.431 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.432 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.433 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.433 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.434 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.435 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.435 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:31.436 [main] INFO  a.m.q.c.ConsumerTest - ========>141
2022-07-10_21:24:34.438 [main] INFO  a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:37.442 [main] INFO  a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:40.445 [main] INFO  a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:43.448 [main] INFO  a.m.q.c.ConsumerTest - ========>0
2022-07-10_21:24:45.281 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.282 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.283 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.283 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.284 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.285 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.285 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.286 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.286 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.287 [main] INFO  a.m.q.c.ConsumerTest - ========>500
2022-07-10_21:24:45.287 [main] INFO  a.m.q.c.ConsumerTest - ========>500
半兽人 -> Miss 2年前

你这个是offset还没来得及提交,就又去拉了。
改成手动提交offset,保证下次拉取前,offset提交成功。

Miss -> 半兽人 2年前

这个意思是offset 不提交,就不会下次拉取?

半兽人 -> Miss 2年前

准确的说,客户端请求kafka的offset的位置,这是由客户端控制。

你的答案

查看kafka相关的其他问题或提一个您自己的问题