比如:在下述代码的RealTimeHandlerInstance::dataProcess
部分,我处理这些消息会有一定的处理延时,那么当消息突然传来很多,之后的消息是否会等待前面消息处理完之后再处理?
KStream<String, String> kStream = streamsBuilder.stream("mqtt", Consumed.with(Serdes.String(), Serdes.String()));
KTable<String, String> sameKeyAddStore =
kStream.map((String k, String v) -> {
// 这里做了一些其他处理
return new KeyValue<>(k1, k2);
}).groupByKey().aggregate(
() -> "",
RealTimeHandlerInstance::dataProcess,
Materialized.as("SameKeyAddStore1")
);
//创建 kafkaStreams
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), streamsConfig);