返回到文章

采纳

编辑于

Kafka streams 中处理数据, 之后的消息是否会等待前面的消息处理完

kafka

比如:在下述代码的RealTimeHandlerInstance::dataProcess部分,我处理这些消息会有一定的处理延时,那么当消息突然传来很多,之后的消息是否会等待前面消息处理完之后再处理?

KStream<String, String> kStream = streamsBuilder.stream("mqtt", Consumed.with(Serdes.String(), Serdes.String()));

        KTable<String, String> sameKeyAddStore =
                kStream.map((String k, String v) -> {

                    // 这里做了一些其他处理 

                    return new KeyValue<>(k1, k2);
                }).groupByKey().aggregate(
                        () -> "",
                        RealTimeHandlerInstance::dataProcess,
                        Materialized.as("SameKeyAddStore1")
                );

        //创建 kafkaStreams
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), streamsConfig);