kafka判断当前topic内是否包含需要的数据

一如乞人不需要形象 发表于: 2020-06-08   最后更新时间: 2020-06-08 20:32:10   2,596 游览

业务需要判断当前topic内是否包含需要的数据。
当前逻辑是得到该topic所有partitionInfo,startOffset,endOffset.

//get topic partitions info
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = getTopicPartiton(partitionInfoList);

//Get end offset for each partition
Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);

然后for循环所有的partition,consumer.seek来去始末的message来判断是否是业务需要。

 for (TopicPartition topicPartition : topicPartitions) {

    // skip when startOffset equal endOffset
    if (beginningOffsets.get(topicPartition).equals(endOffsets.get(topicPartition))) continue;

    //if need seek , must pool fisrt
    consumer.assign(Arrays.asList(topicPartition));
    consumer.poll(duration);

    //Get begin offset message
    consumer.seekToBeginning(Arrays.asList(topicPartition));
    getOffsetValue(consumer);

    //Get end offset message
    consumer.seek(topicPartition, endOffsets.get(topicPartition) - 1);
    getOffsetValue(consumer);
}

Q1:这样玩可以吗?或者有什么好的意见?

Q2:这种情况需要并发吗?还是影响不大,都行

发表于 2020-06-08

Q1

  1. 如果你的业务类型不同,可以使用不同的topic。
  2. 我很好奇你的offset是怎么获取的,当你第一次启动的时候,你从什么时间开始消费?难道你的客户端要存储这些信息,不会很复杂嘛。
  3. 如果是我,在topic有不同业务的情况下,我持续获取消息,只处理我需要的消息,跳过不符合类型的消息,依靠kafka来管理offset。

Q2:你这么玩,那么复杂,还不能分布式消费,哪里还有并发。

Q1: 业务上有正常的消费(订阅topic,有就消费)和回滚消费(指定内容的一些关键字大小在某个范围内消费),对于回滚的消费,因为topic内数据有不同的删除策略,而message传进去的时间我们无法确定,所以得确定当前topic内的message符合这个范围。
offset通过seekToBeging方式直接跳到最开始的可有offset,也可以用beginningOffsets方法获取offset,再seek的方式呀
Q2:感觉可以分布式消费呢,,就是每个消费者指定parition,且只读offset始末位置的值,是不是太没必要了 😂

Q1: 好吧,我已经把我的方案告诉你了。

Q2:分布式我指的是相同的程序,部署多个,部署在不同的机器上处理,而不是在一个程度上多线程并行处理,2个概念。你这种是为了写而写,无法扩展,全部限定死了,光想想就觉得累...

嘿嘿嘿,无时无刻不觉得自己的code贼low

加油,多写写慢慢感悟。

  • 书读百遍其义自见

大神,我的这些理解正确吗?还有麻烦解答一下里面的疑问。谢谢🙏😊

  1. 一个kafka cluster就是一个worker
  2. Distributed Mode需要每个cluster都配置好server.properties,distributed.properties,以及在libs里放入相关的jar, 并且每个cluster都需要执行distributed.sh
  3. 基于2,call任意一个server的api,都能启动一个新connector,具体工作在哪个server,是kafka自己均衡的,比如有6个connector,3个cluster,一般来说每个cluster2个connector work.
  4. 基于2,Distributed模式是如何保证每个cluster间配置的一致性性的,还是压根不保证,得自己保证这些,我起了两个cluster(一个配置了jar,一个没有),call一个server(无jar)可以看到另一个server(有jar)上起的connector状态,但call这个server(无jar)起connector会报错,没有指定的connctor.

发新的提问一直出不来,不知道是bug还是什么。

我估计是你发表情了,所以发不出来。

你的答案

查看kafka相关的其他问题或提一个您自己的问题