L

0 声望

这家伙太懒,什么都没留下

个人动态
  • 半兽人 回复 Lkafka streams output流收不到消息 中 :

    。。。嗯

    5年前
  • L 回复 半兽人kafka streams output流收不到消息 中 :

    发现问题了,是这个value产生的是long类型,最后消费者乱码了。
    改成String类型就好了。
    stream.map((key, value)-> KeyValue.pair(key, value + "")).to("streams-count-output");
    liunx 生产者和消费者默认的key,value序列化器都是String类似吗?

    5年前
  • 半兽人 回复 Lkafka streams output流收不到消息 中 :

    没看出什么问题,稍后我测试下。

    5年前
  • L 回复 半兽人kafka streams output流收不到消息 中 :
    StreamsBuilder builder = new StreamsBuilder();
            KStream<String,String> source = builder.stream("streams-count-input");
            source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
                    .groupBy((key, value) -> value)
                    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store1"))
                    .toStream()
                    .to("streams-count-output", Produced.with(Serdes.String(), Serdes.Long()));
            Topology topology = builder.build();
            System.out.println(topology);
            KafkaStreams kafkaStreams = new KafkaStreams(topology,properties);
            final CountDownLatch latch = new CountDownLatch(1);
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    kafkaStreams.close();
                    latch.countDown();//计数减一
                }
            });

            try {
                kafkaStreams.start();
                latch.await();
            } catch (InterruptedException e) {
                System.exit(1);
            }
            System.exit(0);
    5年前