关于物联网的 kafka 订阅讨论设计 有三个订阅者:A,B,C A 想订阅 modelId 的 所有 测点。B 想订阅 modelId 的 Ua,Ia 测点。C 想订阅 所有 Ua 的测点 方案一: 订阅 总的kafka topic,在各自 的消费组里 进行消息过滤。方案二: 在消息源端进行过滤,分别推送三个不同的 kafka topic
方案一:架构简单,生产方可长年不做任何改造,消费者按需过滤,工作量在消费者,一个消费者失误只会影响自己。
方案二:消息类型的再细分,坏处是逻辑在生产者,以后新增类型生产者和消费者都要改动代码,好处是可扩展性强,并发高,网络传输减少了,大规模场景下显而易见。
另外一套方案:
同一个topic,根据
key
规则,将消息分发到不同的消费者分区
,消费者订阅不同的分区来获取想要的数据。感谢回复, 第三种方案, pattion 的数量如何划分,根据什么样的 key 规则可以初始需求呢,这一点我没有底,望不啬赐教
分区逻辑是你自己写的,所以比较灵活,你的匹配逻辑比较简单,可以通过(point=?)来放到不同的分区中,分区的数量就基于你的量来绝对范围了,比如
Ua
发送到1-10
分区。分区逻辑的例子:
生产者发送消息的规则:
props.put("partitioner.class", "example.producer.SimplePartitioner");
逻辑类:
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class SimplePartitioner implements Partitioner { public SimplePartitioner (VerifiableProperties props) { } public int partition(Object key, int a_numPartitions) { int partition = 0; String stringKey = (String) key; int offset = stringKey.lastIndexOf('.'); if (offset > 0) { partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions; } return partition; } }
这段逻辑的关键,我们得到的IP地址,取得最后一个字节,并进行分区数模运算,得出相应的分区,好处是相同的源ip划分到相同的分区里。但是你在消费的时候,要知道如何处理。
第三种方案: 参考 mqtt tpoic 设计方案 /tenantId/modelId/productId/deviceId/point, 但是我不确定这个 topic 会不会膨胀到无法管理,基本上 一个项目上得有 5 个模型,每个模型有3个产品,然后 50个点位,,5350 = 750,如果设备再多一点,1000 个设备 ? 750*1000!boom! 看来设备Id 不能加. /tenantId/modelId/productId/point 750 个 topic! 不知道这种方案又怎样
如果你量庞大,消费者又不想复杂,你可以考虑选择第4种方式,借助kafka的
流
来将消息拆分。topicA(所有消息) -> 流 (将消息拆分发送到 topicB,和topicC)
这样,A就直接消费topicA,流来拆分消息(流量也在服务器消化了),消费者B和C分别消费topicB和topicC。
参考:Kafka Streams开发者指南
感谢,我想这个方案 和 方案二 不谋而合,也是我偏向的一种方案,我将向这个方案靠近验证
你的答案