业务需要判断当前topic内是否包含需要的数据。
当前逻辑是得到该topic所有partitionInfo,startOffset,endOffset.
//get topic partitions info
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = getTopicPartiton(partitionInfoList);
//Get end offset for each partition
Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
然后for循环所有的partition,consumer.seek来去始末的message来判断是否是业务需要。
for (TopicPartition topicPartition : topicPartitions) {
// skip when startOffset equal endOffset
if (beginningOffsets.get(topicPartition).equals(endOffsets.get(topicPartition))) continue;
//if need seek , must pool fisrt
consumer.assign(Arrays.asList(topicPartition));
consumer.poll(duration);
//Get begin offset message
consumer.seekToBeginning(Arrays.asList(topicPartition));
getOffsetValue(consumer);
//Get end offset message
consumer.seek(topicPartition, endOffsets.get(topicPartition) - 1);
getOffsetValue(consumer);
}
Q1:这样玩可以吗?或者有什么好的意见?
Q2:这种情况需要并发吗?还是影响不大,都行
Q1
Q2:你这么玩,那么复杂,还不能分布式消费,哪里还有并发。
Q1: 业务上有正常的消费(订阅topic,有就消费)和回滚消费(指定内容的一些关键字大小在某个范围内消费),对于回滚的消费,因为topic内数据有不同的删除策略,而message传进去的时间我们无法确定,所以得确定当前topic内的message符合这个范围。
offset通过seekToBeging方式直接跳到最开始的可有offset,也可以用beginningOffsets方法获取offset,再seek的方式呀
Q2:感觉可以分布式消费呢,,就是每个消费者指定parition,且只读offset始末位置的值,是不是太没必要了 😂
Q1: 好吧,我已经把我的方案告诉你了。
Q2:分布式我指的是相同的程序,部署多个,部署在不同的机器上处理,而不是在一个程度上多线程并行处理,2个概念。你这种是为了写而写,无法扩展,全部限定死了,光想想就觉得累...
嘿嘿嘿,无时无刻不觉得自己的code贼low
加油,多写写慢慢感悟。
大神,我的这些理解正确吗?还有麻烦解答一下里面的疑问。谢谢🙏😊
发新的提问一直出不来,不知道是bug还是什么。
我估计是你发表情了,所以发不出来。
你的答案