kafka streams output流收不到消息

L 发表于: 2018-10-24   最后更新时间: 2018-10-24 17:03:40   1,899 游览

我按照Kafka Stream演示程序这个例子。生成的topic 有好几个
__consumer_offsets
streams-count-input
streams-count-output
streams-word-count-counts-store1-changelog
streams-word-count-counts-store1-repartition
而且消费者streams-count-output接受不到消息,streams-word-count-counts-store1-repartition才能接受消息,这是为什么

发表于 2018-10-24
L

你的例子地址是?

L -> 半兽人 5年前
StreamsBuilder builder = new StreamsBuilder();
        KStream<String,String> source = builder.stream("streams-count-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
                .groupBy((key, value) -> value)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store1"))
                .toStream()
                .to("streams-count-output", Produced.with(Serdes.String(), Serdes.Long()));
        Topology topology = builder.build();
        System.out.println(topology);
        KafkaStreams kafkaStreams = new KafkaStreams(topology,properties);
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                kafkaStreams.close();
                latch.countDown();//计数减一
            }
        });

        try {
            kafkaStreams.start();
            latch.await();
        } catch (InterruptedException e) {
            System.exit(1);
        }
        System.exit(0);
半兽人 -> L 5年前

没看出什么问题,稍后我测试下。

L -> 半兽人 5年前

发现问题了,是这个value产生的是long类型,最后消费者乱码了。
改成String类型就好了。
stream.map((key, value)-> KeyValue.pair(key, value + "")).to("streams-count-output");
liunx 生产者和消费者默认的key,value序列化器都是String类似吗?

半兽人 -> L 5年前

。。。嗯

你的答案

查看kafka相关的其他问题或提一个您自己的问题