参数设成手工提交数据。由于某种错误有10个数据一直都没有提交成功。那会这些数据应该会实在次消费到。但会在什么时候在次消费到呢?好像只有重新启动程序才可以消费到
重新分配的消费者可重新消费。
什么叫重新分配的消费者啊。我的程序一直没关闭。希望能把没有提交的数据重新消费掉。说明了就是业务处理失败的。无限消费没有提交的数据。直到该条数据被正确处理掉。就提交
指定offset提交是不是这样的啊Map<TopicPartition,OffsetAndMetadata> offset = new HashMap<TopicPartition,OffsetAndMetadata>();offset.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()));consumer.commitSync(offset);
还有就是offset 99 ,100这两个。我在100的位置提交了。那么是不是99的数据也取不出来了啊,还是说只是100的提交了。99的数据还是会取出来的
刚做了个测试,如果我从kafka中取出5条数据,分别为1,2,3,4,5,如果消费者在执行一些逻辑在执行1,2,3,4的时候都失败了未提交commit,然后消费5做逻辑成功了提交了commit,那么offset也会被移动到5那一条数据那里,1,2,3,4 相当于也会丢失
如果是做消费者取出数据执行一些操作,全部都失败的话,然后重启消费者,这些数据会从失败的时候重新开始读取
所以消费者还是应该自己做容错机制
你来决定何时commit。你的逻辑会很复杂。
找不到想要的答案?提一个您自己的问题。
0 声望
这家伙太懒,什么都没留下
重新分配的消费者可重新消费。
什么叫重新分配的消费者啊。我的程序一直没关闭。希望能把没有提交的数据重新消费掉。说明了就是业务处理失败的。无限消费没有提交的数据。直到该条数据被正确处理掉。就提交
指定offset提交是不是这样的啊
Map<TopicPartition,OffsetAndMetadata> offset = new HashMap<TopicPartition,OffsetAndMetadata>();
offset.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()));
consumer.commitSync(offset);
还有就是offset 99 ,100这两个。我在100的位置提交了。那么是不是99的数据也取不出来了啊,还是说只是100的提交了。99的数据还是会取出来的
刚做了个测试,如果我从kafka中取出5条数据,分别为1,2,3,4,5,如果消费者在执行一些逻辑在执行1,2,3,4的时候都失败了未提交commit,然后消费5做逻辑成功了提交了commit,那么offset也会被移动到5那一条数据那里,1,2,3,4
相当于也会丢失
如果是做消费者取出数据执行一些操作,全部都失败的话,然后重启消费者,这些数据会从失败的时候重新开始读取
所以消费者还是应该自己做容错机制
你来决定何时commit。你的逻辑会很复杂。
你的答案