代码如下:
public class Stream8 {
// private static final Logger log = Logger.getLogger(Stream.class);
public static void main(String[] args) {
Properties props = new Properties();
String auto_commit = args[0];
//consumer group
//指定一个应用ID,会在指定的目录下创建文件夹,里面存放.lock文件
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KProperties.kafka_server_URL +":"+ KProperties.kafka_server_port);
// props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,10485760);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_commit);
StreamsBuilder builder = new StreamsBuilder();
KStream<String,String> textLines = builder.stream(KProperties.topic); //接收第一个topic
//测试
// textLines.map((k,v) -> new KeyValue<>(v,"1")).to(KProperties.topic1, Produced.with(Serdes.String(),Serdes.String()));
textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
.map((k,v) -> new KeyValue<>(k,v + "," + v.toString().length())).to(KProperties.topic1, Produced.with(Serdes.String(),Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
在kafkaManager中看到新的topic,但是点开又是空的:
app1-Counts123-changelog
app1-Counts123-repartition
app1-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog
app1-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition
app1-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog
截图了发现无法上传图片
可以上传额,你用的什么游览器?
用命令直接消费对应的主题看看。
监控在没有消费者的情况下是无法显示是否有消息的。
这些主题用于存储相应的offset。
谢谢。
1. 用的谷歌浏览器上传图片,选择图片确定后就一直在那转啊转圈圈。
2. 我每次启动时都会生成不同的topic,测试了几次,发现自动生成了许多,看起来比较讨厌,不知道有没有好的解决方法
另外在上面程序中,/tmp目录下,看到有保存的一些文件
看着确实讨厌,但没有更好的办法,除非把offset存储改成zk存储。
谢谢
最后补充下问题解决方案:
将kafka集群的每个节点的配置文件中下面该属性注释放开,并修改对应的hostname,保存后重启kafka。
advertised.listeners=PLAINTEXT://dwtest-data2:9092
然后之前的那种情况就不会出现了。
你的答案