我按照Kafka Stream演示程序这个例子。生成的topic 有好几个
__consumer_offsets
streams-count-input
streams-count-output
streams-word-count-counts-store1-changelog
streams-word-count-counts-store1-repartition
而且消费者streams-count-output接受不到消息,streams-word-count-counts-store1-repartition才能接受消息,这是为什么
昵称
0 声望
这家伙太懒,什么都没留下
你的例子地址是?
https://www.orchome.com/936
KStream<String,String> source = builder.stream("streams-count-input");
.groupBy((key, value) -> value)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store1"))
.toStream()
.to("streams-count-output", Produced.with(Serdes.String(), Serdes.Long()));
System.out.println(topology);
final CountDownLatch latch = new CountDownLatch(1);
@Override
public void run() {
kafkaStreams.close();
latch.countDown();//计数减一
}
});
try {
kafkaStreams.start();
latch.await();
} catch (InterruptedException e) {
System.exit(1);
}
System.exit(0);
没看出什么问题,稍后我测试下。
发现问题了,是这个value产生的是long类型,最后消费者乱码了。
改成String类型就好了。
stream.map((key, value)-> KeyValue.pair(key, value + "")).to("streams-count-output");
liunx 生产者和消费者默认的key,value序列化器都是String类似吗?
。。。嗯
你的答案