代码如下:
public class Stream8 {
// private static final Logger log = Logger.getLogger(Stream.class);
public static void main(String[] args) {
Properties props = new Properties();
String auto_commit = args[0];
//consumer group
//指定一个应用ID,会在指定的目录下创建文件夹,里面存放.lock文件
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KProperties.kafka_server_URL +":"+ KProperties.kafka_server_port);
// props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,10485760);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_commit);
StreamsBuilder builder = new StreamsBuilder();
KStream<String,String> textLines = builder.stream(KProperties.topic); //接收第一个topic
//测试
// textLines.map((k,v) -> new KeyValue<>(v,"1")).to(KProperties.topic1, Produced.with(Serdes.String(),Serdes.String()));
textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
.map((k,v) -> new KeyValue<>(k,v + "," + v.toString().length())).to(KProperties.topic1, Produced.with(Serdes.String(),Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
在kafkaManager中看到新的topic,但是点开又是空的:
app1-Counts123-changelog
app1-Counts123-repartition
app1-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog
app1-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition
app1-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog