StreamsBuilder sb = new StreamsBuilder();
KTable<Integer, Integer> example =
sb.stream(inputs, Consumed.with(Serdes.Integer(), Serdes.Integer()))
.groupByKey()
.aggregate(
() -> 0,
(key, value, currentValue) -> {
logger.info("During Aggregation {}", currentValue)
return currentValue
},
Materialized.with(Serdes.Integer(), Serdes.Integer())
);
example.toStream().peek(loggger.info("At the end {}", currentValue))
我在0.0001 秒里放出了这三个event
(1, 1), (1, 2), (1, 3)
log到了
"During Aggregation 1"
"During Aggregation 2"
"During Aggregation 3"
但我只有"At the end 3"
漏掉了"At the end 1", "At the end 2"
是kafka KTable有什么短时间内不update的情况吗,有什么方法能够把"At the end 1", "At the end 2" 也打印出来?
你的答案