kafkaStream如何从一个kafka集群的topic消费数据然后发送到另一个kafka集群的topic里面?

King 发表于: 2018-04-23   最后更新时间: 2018-04-23 17:39:11   7,509 游览

目前测试来看,只有相同集群内部可以从一个topic发送到另外一个topic。
但是对于不同集群的发送数据一致测试没通过。

主要业务是我要清洗source topic然后发送给对方kafka服务器
    public static void main(String[] args) {
        Properties props = new Properties();

        String source_bootstrap_server = "server1:9092"; //源集群
        String source_topic = "test"; //源集群的topic
        String target_bootstrap_server = "server2:9092"; //目标集群
        String target_topic = "test1";  //目标集群的topic

        //consumer group
        //指定一个应用ID,会在指定的目录下创建文件夹,里面存放.lock文件  
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "king"); //对应源topic的消费者groupid
        props.put(StreamsConfig.STATE_DIR_CONFIG, "./tmp/");
//        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KProperties.kafka_server_URL +":"+ KProperties.kafka_server_port);
//        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,10485760);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata2");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, source_bootstrap_server);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, target_bootstrap_server);


        StreamsBuilder builder = new StreamsBuilder();
        KStream<String,String> textLines = builder.stream(source_topic); //source topic

        //测试
        textLines.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
        .map((k,v) -> new KeyValue<>(k,v + "," + v.toString().length()))
        //targetTopic
        .to(target_topic, Produced.with(Serdes.String(),Serdes.String()));
         KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.start();

    }
发表于 2018-04-23
添加评论

已解决。哈哈,我犯傻了。其实直接用消费者 + 生产者就好了。不需要用kafkaStream。

你的答案

查看kafka相关的其他问题或提一个您自己的问题