kakfa简单消费消息信息丢失问题Ignoring fetched records for topic12-0 at offset 187710 since the current position is 187946

桑代克 发表于: 2018-05-14   最后更新时间: 2018-05-14  
  •   0 订阅,65 游览

我写了一个简单的消费者,每次消费500条消息后,会同步提交偏移量,但是日志显示的偏移量存在明显略过了一部分消息的情况。我的topic只有一个分区。
代码如下:


String[] getMessage(int num) throws Exception{
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        int count = 0;
        if(consumer!=null){
            try {
                while (count < num) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        message[count] = record.value();
                        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset()));
                        count++;
                        if(count == num){
                            break;
                        }
                    }
                }
                consumer.commitSync(currentOffsets);
            }catch (Exception e) {
                logger.error(String.format("poll message occurs error:%s", e.toString()));
            }
        }
        return message;
    }

下面是在主函数中调用这个函数:

while(true){
   message = getMessage(500);
}

在日志中出现了如下情况:

[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Group test committed offset 187709 for partition topic12-0
[org.apache.kafka.clients.consumer.internals.Fetcher]Ignoring fetched records for topic12-0 at offset 187710 since the current position is 187946
[org.apache.kafka.clients.consumer.internals.Fetcher]Sending fetch for partitions [topic12-0] to broker host-129-152:9092 (id: 45 rack: null)
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Group test committed offset 188209 for partition topic12-0
[org.apache.kafka.clients.consumer.internals.Fetcher]Ignoring fetched records for topic12-0 at offset 188446 since the current position is 188682
[org.apache.kafka.clients.consumer.internals.Fetcher]Sending fetch for partitions [topic12-0] to broker host-129-152:9092 (id: 45 rack: null)
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Group test committed offset 188945 for partition topic12-0
[org.apache.kafka.clients.consumer.internals.Fetcher]Ignoring fetched records for topic12-0 at offset 189182 since the current position is 189418

请问大神这是什么原因造成的呢?







发表于: 9天前   最后更新时间: 9天前   游览量:65
上一条: 到头了!
下一条: 已经是最后了!

评论…


  • 自动提交offset关闭了吗?
    • 你代码我看不懂额。我怀疑是consumer.commitSync和break;这俩块地方。
      参考:http://orchome.com/451 的例子,把你的逻辑先去掉,没问题了在嵌入你的逻辑。
        • 嗯嗯,我想请问,如果要求一次消费1000条消息,这样子是不是poll信息后,仅取1000条,然后获得第1000条的偏移量提交,这种做法是可以的吧?
          • 评论…
            • in this conversation
              提问