请教下kafka Stream的聚合怎么用(kafka0.10.2.0)?

l. 发表于: 2018-02-03   最后更新时间: 2018-02-03 12:35:39   3,880 游览
KStream<String, String> source = builder.stream("net");
source.map((key, value) -> distinctFun(value, new String[] {"ni"})).groupByKey().aggregate(
                () -> 0L,  // initial value
                (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
                TimeWindows.of(5000L).advanceBy(1000L) // intervals in milliseconds
        );

这个aggregate的参数报错,网上也没找到更多的用法介绍,有没有大神能帮我看下,给个demo看看么,谢谢了~

发表于 2018-02-03
l.
添加评论

在Kafka源码的streams/examples包中有相关的例子。
https://www.orchome.com/335

你的答案

查看kafka相关的其他问题或提一个您自己的问题