kafka streams output流收不到消息

我按照Kafka Stream演示程序这个例子。生成的topic 有好几个
__consumer_offsets
streams-count-input
streams-count-output
streams-word-count-counts-store1-changelog
streams-word-count-counts-store1-repartition
而且消费者streams-count-output接受不到消息,streams-word-count-counts-store1-repartition才能接受消息,这是为什么






发表于: 26天前   最后更新时间: 26天前   游览量:132
上一条: 到头了!
下一条: 已经是最后了!

评论…


  • 你的例子地址是?
    • StreamsBuilder builder = new StreamsBuilder();
              KStream<String,String> source = builder.stream("streams-count-input");
              source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                      .groupBy((key, value) -> value)
                      .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store1"))
                      .toStream()
                      .to("streams-count-output", Produced.with(Serdes.String(), Serdes.Long()));
              Topology topology = builder.build();
              System.out.println(topology);
              KafkaStreams kafkaStreams = new KafkaStreams(topology,properties);
              final CountDownLatch latch = new CountDownLatch(1);
              Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                  @Override
                  public void run() {
                      kafkaStreams.close();
                      latch.countDown();//计数减一
                  }
              });

              try {
                  kafkaStreams.start();
                  latch.await();
              } catch (InterruptedException e) {
                  System.exit(1);
              }
              System.exit(0);
        • 发现问题了,是这个value产生的是long类型,最后消费者乱码了。
          改成String类型就好了。
          stream.map((key, value)-> KeyValue.pair(key, value + "")).to("streams-count-output");
          liunx 生产者和消费者默认的key,value序列化器都是String类似吗?
          • 评论…
            • in this conversation