kafka KTable漏消息的问题?

KaL 发表于: 2021-03-16   最后更新时间: 2021-03-16 17:41:33   1,160 游览
StreamsBuilder sb = new StreamsBuilder();
KTable<Integer, Integer> example =    
    sb.stream(inputs, Consumed.with(Serdes.Integer(), Serdes.Integer()))    
        .groupByKey()    
        .aggregate(        
            () -> 0,        
            (key, value, currentValue) -> {          
                logger.info("During Aggregation {}", currentValue)
                return currentValue
            },        
            Materialized.with(Serdes.Integer(), Serdes.Integer())    
        );

example.toStream().peek(loggger.info("At the end {}", currentValue))

我在0.0001 秒里放出了这三个event

(1, 1), (1, 2), (1, 3)

log到了

"During Aggregation 1" 
"During Aggregation 2" 
"During Aggregation 3"

但我只有"At the end 3"
漏掉了"At the end 1", "At the end 2"

是kafka KTable有什么短时间内不update的情况吗,有什么方法能够把"At the end 1", "At the end 2" 也打印出来?

添加评论
你的答案

查看kafka相关的其他问题或提一个您自己的问题