kakfa简单消费消息信息丢失问题Ignoring fetched records for topic12-0 at offset 187710 since the current position is 187946

桑代克 发表于: 2018-05-14   最后更新时间: 2018-05-14 14:37:55   3,200 游览

我写了一个简单的消费者,每次消费500条消息后,会同步提交偏移量,但是日志显示的偏移量存在明显略过了一部分消息的情况。我的topic只有一个分区。
代码如下:


String[] getMessage(int num) throws Exception{
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        int count = 0;
        if(consumer!=null){
            try {
                while (count < num) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        message[count] = record.value();
                        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset()));
                        count++;
                        if(count == num){
                            break;
                        }
                    }
                }
                consumer.commitSync(currentOffsets);
            }catch (Exception e) {
                logger.error(String.format("poll message occurs error:%s", e.toString()));
            }
        }
        return message;
    }

下面是在主函数中调用这个函数:

while(true){
   message = getMessage(500);
}

在日志中出现了如下情况:

[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Group test committed offset 187709 for partition topic12-0
[org.apache.kafka.clients.consumer.internals.Fetcher]Ignoring fetched records for topic12-0 at offset 187710 since the current position is 187946
[org.apache.kafka.clients.consumer.internals.Fetcher]Sending fetch for partitions [topic12-0] to broker host-129-152:9092 (id: 45 rack: null)
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Group test committed offset 188209 for partition topic12-0
[org.apache.kafka.clients.consumer.internals.Fetcher]Ignoring fetched records for topic12-0 at offset 188446 since the current position is 188682
[org.apache.kafka.clients.consumer.internals.Fetcher]Sending fetch for partitions [topic12-0] to broker host-129-152:9092 (id: 45 rack: null)
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Group test committed offset 188945 for partition topic12-0
[org.apache.kafka.clients.consumer.internals.Fetcher]Ignoring fetched records for topic12-0 at offset 189182 since the current position is 189418

请问大神这是什么原因造成的呢?

发表于 2018-05-14
添加评论

自动提交offset关闭了吗?

桑代克 -> 半兽人 6年前

关了呀

半兽人 -> 桑代克 6年前

你代码我看不懂额。我怀疑是consumer.commitSync和break;这俩块地方。

桑代克 -> 半兽人 6年前

嗯嗯,我想请问,如果要求一次消费1000条消息,这样子是不是poll信息后,仅取1000条,然后获得第1000条的偏移量提交,这种做法是可以的吧?

半兽人 -> 桑代克 6年前

那你要严格控制拉取批次的大小,然后已批次提交,提交和处理消息逻辑没有关联,否则将受到影响。

你的答案

查看kafka相关的其他问题或提一个您自己的问题