返回到文章

采纳

编辑于
l.

请教下kafka Stream的聚合怎么用(kafka0.10.2.0)?

kafka

KStream<String, String> source = builder.stream("net");
source.map((key, value) -> distinctFun(value, new String[] {"ni"})).groupByKey().aggregate(
                () -> 0L,  // initial value
                (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
                TimeWindows.of(5000L).advanceBy(1000L) // intervals in milliseconds
        );

这个aggregate的参数报错,网上也没找到更多的用法介绍,有没有大神能帮我看下,给个demo看看么,谢谢了~