使用kafkaStream时为什么在kafkaManager中生成好多假的TOPIC

King 发表于: 2018-04-12   最后更新时间: 2018-04-12  
  •   0 订阅,394 游览

代码如下:

public class Stream8 {

//    private static final Logger log = Logger.getLogger(Stream.class);

    public static void main(String[] args) {
        Properties props = new Properties();

        String auto_commit = args[0];
        //consumer group
        //指定一个应用ID,会在指定的目录下创建文件夹,里面存放.lock文件  
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KProperties.kafka_server_URL +":"+ KProperties.kafka_server_port);
//        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,10485760);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_commit);        

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String,String> textLines = builder.stream(KProperties.topic); //接收第一个topic

        //测试
//        textLines.map((k,v) -> new KeyValue<>(v,"1")).to(KProperties.topic1, Produced.with(Serdes.String(),Serdes.String()));
        textLines
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
        .map((k,v) -> new KeyValue<>(k,v + "," + v.toString().length())).to(KProperties.topic1, Produced.with(Serdes.String(),Serdes.String()));
         KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.start();

    }

}

在kafkaManager中看到新的topic,但是点开又是空的:

app1-Counts123-changelog
app1-Counts123-repartition
app1-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog
app1-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition
app1-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog







发表于: 3月前   最后更新时间: 3月前   游览量:394
上一条: 到头了!
下一条: 已经是最后了!

评论…


  • 最后补充下问题解决方案:
    将kafka集群的每个节点的配置文件中下面该属性注释放开,并修改对应的hostname,保存后重启kafka。
    advertised.listeners=PLAINTEXT://dwtest-data2:9092

    然后之前的那种情况就不会出现了。
    谢谢。

    1. 用的谷歌浏览器上传图片,选择图片确定后就一直在那转啊转圈圈。
    2. 我每次启动时都会生成不同的topic,测试了几次,发现自动生成了许多,看起来比较讨厌,不知道有没有好的解决方法
    另外在上面程序中,/tmp目录下,看到有保存的一些文件

    这些主题用于存储相应的offset。
    用命令直接消费对应的主题看看。
    监控在没有消费者的情况下是无法显示是否有消息的。
    截图了发现无法上传图片
  • 评论…
    • in this conversation
      提问