在
                            
                            中
                            :
                            
StreamsBuilder builder = new StreamsBuilder();
        KStream<String,String> source = builder.stream("streams-count-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
                .groupBy((key, value) -> value)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store1"))
                .toStream()
                .to("streams-count-output", Produced.with(Serdes.String(), Serdes.Long()));
        Topology topology = builder.build();
        System.out.println(topology);
        KafkaStreams kafkaStreams = new KafkaStreams(topology,properties);
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                kafkaStreams.close();
                latch.countDown();//计数减一
            }
        });
        try {
            kafkaStreams.start();
            latch.await();
        } catch (InterruptedException e) {
            System.exit(1);
        }
        System.exit(0);