返回到文章

采纳

编辑于

kafka producer代码发送自定义消息类型

kafka

代码1:

       for(int i = 0; i < 2; i++){                  
           final int index = i;  
           PeopleInfo people =new PeopleInfo("Daming", i);
           people.getAge();
           people.getName();
           producer.send(new ProducerRecord<String, Object>(topicName, Integer.toString(i), people), new Callback() {  

             @Override  
             public void onCompletion(RecordMetadata metadata, Exception exception) {  
                 if (metadata != null) {  
                     System.out.println(index+"  发送成功:"+"checksum: "+metadata.checksum()+" offset: "+metadata.offset()+" partition: "+metadata.partition()+" topic: "+metadata.topic());  
                 }  
                 if (exception != null) {  
                     System.out.println(index+"异常:"+exception.getMessage());  
                 }  
             }  
         });  
       } 

代码2:

producer.send(new ProducerRecord<String, Object>("second",Integer.toString(11),people), new Callback() {  

            @Override  
            public void onCompletion(RecordMetadata metadata, Exception exception) {  
                if (metadata != null) {  
                    System.out.println(1+"  发送成功:"+"checksum: "+metadata.checksum()+" offset: "+metadata.offset()+" partition: "+metadata.partition()+" topic: "+metadata.topic());  
                }  
                if (exception != null) {  
                    System.out.println(1+"异常:"+exception.getMessage());  
                }  
            }  
        });

我想实现的是我用PeopleInfo这个类封装我所要发送的消息,然后通过kafka producer发送出去,我的序列化程序(Encoder)和反序列程序(Decoder)和消息封装类PeopleInfo均已测试过没有问题,可是当我用程序1时,kafka端能够收到消息,Consumer也能根据topic进行消费,producer的send方法中的callback也能进去执行。

可是,当我把代码1换成代码2的时候,也更换了topic名,kafka端就是收不到消息,callback方法也没有执行进去,为什么会出现这种情况呢?消息封装类和序列化类和反序列类都经过测试没有问题,用代码1完全可以发送出去,代码2就是不行,望大神指教!不胜感激!