KStream<String, String> source = builder.stream("net");
source.map((key, value) -> distinctFun(value, new String[] {"ni"})).groupByKey().aggregate(
() -> 0L, // initial value
(aggKey, value, aggregate) -> aggregate + 1L, // aggregating value
TimeWindows.of(5000L).advanceBy(1000L) // intervals in milliseconds
);
这个aggregate的参数报错,网上也没找到更多的用法介绍,有没有大神能帮我看下,给个demo看看么,谢谢了~
在Kafka源码的streams/examples包中有相关的例子。
https://www.orchome.com/335
你的答案