我们现在有个业务,消费kafka持久化到DB,此时如果DB挂了,消费会报错,期望的是,每次拉完数据,检测DB是否正常,如果DB挂了,那么等待一段时间再重新拉数据,可以用Acknowledgment.nack来实现吗?例如:
public void listen(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
if (isConsumeNeedStop) {
ack.nack(0, 10000);
LOGGER.info(errorInfo);
} else {
try {
records.forEach(record -> consume(record));
} finally {
ack.acknowledge();
}
}
}
如果可以的话,nack方法的第一个参数填什么?
如果没报超时,可以实现:
nack的方法
index我在想是不是一个消息批次的offset位置,就是说一个批次比如有1000条,你是全部重复消费还是怎么样。
我现在没办法尝试,我希望你测试后可以给我分享下结果。
这边尝试过的同事跟我反馈是,offset设置0,就是全部重试,offset是多少,就丢掉多少数据
当前批次的吗
是的
那你应该是完美解决你的问题了,可以通过
record.offset()
定位你当前失败的位置。你的答案