在
中
:
// Build a word-count topology: read lines from "streams-count-input", split them into
// lower-cased words, count occurrences per word, and write the running counts to
// "streams-count-output".
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("streams-count-input");
source
        // "\\W+" is the regex \W+ (one or more non-word characters); the backslash must be
        // escaped in a Java string literal. Locale.ROOT keeps lower-casing independent of
        // the JVM's default locale.
        .flatMapValues(value -> Arrays.asList(value.toLowerCase(java.util.Locale.ROOT).split("\\W+")))
        // Re-key each record by the word itself so that equal words are grouped together.
        // NOTE(review): no explicit Grouped serdes are given here, so this presumably relies
        // on the default serdes configured in `properties` — confirm they are String serdes.
        .groupBy((key, value) -> value)
        // Count per word, materialized in the state store named "counts-store1".
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store1"))
        .toStream()
        .to("streams-count-output", Produced.with(Serdes.String(), Serdes.Long()));

Topology topology = builder.build();
System.out.println(topology);

KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);
final CountDownLatch latch = new CountDownLatch(1);

// On JVM shutdown (e.g. Ctrl-C), close the streams client and release the waiting main thread.
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
        kafkaStreams.close();
        latch.countDown(); // decrement the latch so latch.await() below returns
    }
});

try {
    kafkaStreams.start();
    // Block the main thread until the shutdown hook counts the latch down.
    latch.await();
} catch (InterruptedException e) {
    // Restore the interrupt status before exiting with a failure code.
    Thread.currentThread().interrupt();
    System.exit(1);
}
System.exit(0);