kafka 没有提交偏移量为什么还消费不到消息
我设置了手动提交偏移量,并且没有提交,但是为什么拉取不到消息,只能重启应用,或者重新加入group的时候才消费的到。
Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections.singleton(topic));
ConsumerRecords<String, String> records = null;
while (true) {
records = consumer.poll(Duration.ofMillis(8100));
logger.info("size {}",records.count());
records.forEach(record -> {
boolean success = true;
try {
OrderAction orderAction = processor.processor(new MQContent(record.value()));
if (orderAction == OrderAction.Suspend) {
success = false;
}
} catch (Exception e) {
success = false;
}
if (success) {
//手动提交offset
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
consumer.commitSync();
}
});
}
}
是呀,kafka客户端已经将消息拉下来了,也丢给你了,你不提交offset,那客户端也不会重新去拉取了,它认为你在处理中。
哪有什么好点的重试策略吗
kafka客户端负责把消息丢给你的业务层,这个时候,该消息必须被提交,无论你业务层是否成功,而且kafka是基于offset下标的,如果后面的提交也会跳过之前的消息。另外设计第一原则是不要侵入到业务层。
如果业务处理这条消息失败,重新丢会队列或者其他的处理就可以了,简单不复杂。
感谢
你的答案