I wrote a simple consumer that synchronously commits the offset after every 500 consumed messages, but the offsets in the log clearly skip over some messages. The topic has only one partition.
Here is the code:
String[] getMessage(int num) throws Exception {
    Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    int count = 0;
    if (consumer != null) {
        try {
            while (count < num) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    message[count] = record.value();
                    // remember the offset of the record we just stored
                    currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset()));
                    count++;
                    if (count == num) {
                        break;
                    }
                }
            }
            // synchronously commit the offsets collected above
            consumer.commitSync(currentOffsets);
        } catch (Exception e) {
            logger.error(String.format("poll message occurs error:%s", e.toString()));
        }
    }
    return message;
}
And here is how the method is called from the main loop:
while (true) {
    message = getMessage(500);
}
The log shows the following:
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Group test committed offset 187709 for partition topic12-0
[org.apache.kafka.clients.consumer.internals.Fetcher]Ignoring fetched records for topic12-0 at offset 187710 since the current position is 187946
[org.apache.kafka.clients.consumer.internals.Fetcher]Sending fetch for partitions [topic12-0] to broker host-129-152:9092 (id: 45 rack: null)
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Group test committed offset 188209 for partition topic12-0
[org.apache.kafka.clients.consumer.internals.Fetcher]Ignoring fetched records for topic12-0 at offset 188446 since the current position is 188682
[org.apache.kafka.clients.consumer.internals.Fetcher]Sending fetch for partitions [topic12-0] to broker host-129-152:9092 (id: 45 rack: null)
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Group test committed offset 188945 for partition topic12-0
[org.apache.kafka.clients.consumer.internals.Fetcher]Ignoring fetched records for topic12-0 at offset 189182 since the current position is 189418
Could anyone tell me what is causing this?
Have you turned off automatic offset commits?
Yes, it's off.
I can't quite follow your code. I suspect the problem is around consumer.commitSync and the break; statement.
Take the example at https://www.orchome.com/451 as a reference: strip your own logic out first, confirm it works, and then add your logic back in.
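To make it concrete, here is a rough sketch of how I would guard that break (my own guess at a fix, not the example from that link; it reuses your consumer field and drops the null check and try/catch for brevity). Two points: seek() back to the first record you did not process, because poll() has already advanced the position past the whole batch it returned, and commit record.offset() + 1, since the committed value is the next offset to be read:

String[] getMessage(int num) {
    String[] message = new String[num];
    Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    int count = 0;
    while (count < num) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            if (count == num) {
                // Rewind so the leftover records of this batch are
                // re-fetched by the next poll() instead of being skipped.
                consumer.seek(new TopicPartition(record.topic(), record.partition()),
                        record.offset());
                break;
            }
            message[count++] = record.value();
            currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)); // next offset to read
        }
    }
    consumer.commitSync(currentOffsets);
    return message;
}

The skipping in your log matches the discarded tail of each batch: commit 187709 vs. position 187946 suggests the 236 records from 187710 onward were returned by poll() but thrown away by the break.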
OK. One more question: if the requirement is to consume 1000 messages at a time, is it valid to poll, take only 1000 records, and then commit the offset of the 1000th record?
Then you need to strictly control the size of each fetched batch and commit batch by batch. Keep the commit decoupled from your message-processing logic, otherwise it will interfere.
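For example (a sketch; the broker address and group id are placeholders taken from your log, and max.poll.records requires a 0.10.0+ client), capping each poll() at 1000 records makes the batch you process the same as the batch you commit:

Properties props = new Properties();
props.put("bootstrap.servers", "host-129-152:9092");  // placeholder broker
props.put("group.id", "test");
props.put("enable.auto.commit", "false");             // commit manually, per batch
props.put("max.poll.records", "1000");                // poll() returns at most 1000 records
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

Note that max.poll.records is only an upper bound: a single poll() can return fewer records, so to commit exactly every 1000 messages you still accumulate across polls and commit once the count is reached.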