使用kafkaStream时为什么在kafkaManager中生成好多假的TOPIC

King 发表于: 2018-04-12   最后更新时间: 2018-04-12 19:24:40   3,557 游览

代码如下:

public class Stream8 {

//    private static final Logger log = Logger.getLogger(Stream.class);

    public static void main(String[] args) {
        Properties props = new Properties();

        String auto_commit = args[0];
        //consumer group
        //指定一个应用ID,会在指定的目录下创建文件夹,里面存放.lock文件  
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KProperties.kafka_server_URL +":"+ KProperties.kafka_server_port);
//        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,10485760);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_commit);        

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String,String> textLines = builder.stream(KProperties.topic); //接收第一个topic

        //测试
//        textLines.map((k,v) -> new KeyValue<>(v,"1")).to(KProperties.topic1, Produced.with(Serdes.String(),Serdes.String()));
        textLines
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
        .map((k,v) -> new KeyValue<>(k,v + "," + v.toString().length())).to(KProperties.topic1, Produced.with(Serdes.String(),Serdes.String()));
         KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.start();

    }

}

在kafkaManager中看到新的topic,但是点开又是空的:

app1-Counts123-changelog
app1-Counts123-repartition
app1-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog
app1-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition
app1-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog

发表于 2018-04-12
添加评论

截图了发现无法上传图片

半兽人 -> King 6年前

可以上传额,你用的什么游览器?

用命令直接消费对应的主题看看。
监控在没有消费者的情况下是无法显示是否有消息的。

这些主题用于存储相应的offset。

谢谢。



1. 用的谷歌浏览器上传图片,选择图片确定后就一直在那转啊转圈圈。

2. 我每次启动时都会生成不同的topic,测试了几次,发现自动生成了许多,看起来比较讨厌,不知道有没有好的解决方法

另外在上面程序中,/tmp目录下,看到有保存的一些文件




半兽人 -> King 6年前

看着确实讨厌,但没有更好的办法,除非把offset存储改成zk存储。

King -> 半兽人 6年前

谢谢

最后补充下问题解决方案:
将kafka集群的每个节点的配置文件中下面该属性注释放开,并修改对应的hostname,保存后重启kafka。
advertised.listeners=PLAINTEXT://dwtest-data2:9092

然后之前的那种情况就不会出现了。

你的答案

查看kafka相关的其他问题或提一个您自己的问题