Kafka streams 中处理数据, 之后的消息是否会等待前面的消息处理完

北兮! 发表于: 2018-04-13   最后更新时间: 2018-04-13 11:38:18   2,641 游览

比如:在下述代码的RealTimeHandlerInstance::dataProcess部分,我处理这些消息会有一定的处理延时,那么当消息突然传来很多,之后的消息是否会等待前面消息处理完之后再处理?

KStream<String, String> kStream = streamsBuilder.stream("mqtt", Consumed.with(Serdes.String(), Serdes.String()));

        KTable<String, String> sameKeyAddStore =
                kStream.map((String k, String v) -> {

                    // 这里做了一些其他处理 

                    return new KeyValue<>(k1, k2);
                }).groupByKey().aggregate(
                        () -> "",
                        RealTimeHandlerInstance::dataProcess,
                        Materialized.as("SameKeyAddStore1")
                );

        //创建 kafkaStreams
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), streamsConfig);
添加评论
你的答案

查看kafka相关的其他问题或提一个您自己的问题