
Why does using Kafka Streams create so many phantom topics in kafkaManager?

kafka

The code is as follows:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class Stream8 {

//    private static final Logger log = Logger.getLogger(Stream.class);

    public static void main(String[] args) {
        Properties props = new Properties();

        // Value for auto.offset.reset, passed on the command line
        String autoOffsetReset = args[0];
        // The application ID also serves as the consumer group ID; a folder named
        // after it is created under the state directory, containing a .lock file
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KProperties.kafka_server_URL + ":" + KProperties.kafka_server_port);
//        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream(KProperties.topic); // consume the source topic

        // Test:
//        textLines.map((k, v) -> new KeyValue<>(v, "1")).to(KProperties.topic1, Produced.with(Serdes.String(), Serdes.String()));
        textLines
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
            .map((k, v) -> new KeyValue<>(k, v + "," + v.length()))
            .to(KProperties.topic1, Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close the streams client cleanly on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

}

New topics appear in kafkaManager, but they are empty when opened:

app1-Counts123-changelog
app1-Counts123-repartition
app1-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog
app1-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition
app1-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog
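My current guess is that these are Kafka Streams internal topics rather than fake ones: the names look like they follow the pattern `<application.id>-<store name>-changelog` / `-repartition`, where the store name is either explicit (e.g. `Counts123` from something like `Materialized.as("Counts123")`) or auto-generated (e.g. `KSTREAM-AGGREGATE-STATE-STORE-0000000003`). A minimal sketch of that naming convention (the helper class and method names are my own, for illustration only):

```java
// Sketch (my assumption): how Kafka Streams appears to derive internal topic names
// from application.id plus the state-store/operator name.
public class InternalTopicNames {

    // Backing topic for a state store's changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Topic used to re-shuffle records before a stateful operation
    static String repartitionTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-repartition";
    }

    public static void main(String[] args) {
        System.out.println(changelogTopic("app1", "Counts123"));   // app1-Counts123-changelog
        System.out.println(repartitionTopic("app1", "Counts123")); // app1-Counts123-repartition
    }
}
```

Note that the topology posted above has no aggregation at all, so these names presumably come from an earlier version of the `app1` application that did use stateful operations; internal topics persist in the cluster until deleted.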