目前测试来看,只有相同集群内部可以从一个topic发送到另外一个topic。
但是对于不同集群的发送数据一致测试没通过。
主要业务是我要清洗source topic然后发送给对方kafka服务器
public static void main(String[] args) {
Properties props = new Properties();
String source_bootstrap_server = "server1:9092"; //源集群
String source_topic = "test"; //源集群的topic
String target_bootstrap_server = "server2:9092"; //目标集群
String target_topic = "test1"; //目标集群的topic
//consumer group
//指定一个应用ID,会在指定的目录下创建文件夹,里面存放.lock文件
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "king"); //对应源topic的消费者groupid
props.put(StreamsConfig.STATE_DIR_CONFIG, "./tmp/");
// props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KProperties.kafka_server_URL +":"+ KProperties.kafka_server_port);
// props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,10485760);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata2");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, source_bootstrap_server);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, target_bootstrap_server);
StreamsBuilder builder = new StreamsBuilder();
KStream<String,String> textLines = builder.stream(source_topic); //source topic
//测试
textLines.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
.map((k,v) -> new KeyValue<>(k,v + "," + v.toString().length()))
//targetTopic
.to(target_topic, Produced.with(Serdes.String(),Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
已解决。哈哈,我犯傻了。其实直接用消费者 + 生产者就好了。不需要用kafkaStream。
你的答案