返回到文章

采纳

编辑于

kafka生产者发送消息是创建一个producer应该使用一个单例的还是说每循环一次就创建一个新的producer对象呢

kafka

我现在要使用Java向kafka发送1w条消息,创建一个producer应该使用一个单例的还是说每循环一次就创建一个新的producer对象呢?

for(Map<String, Object> url : allCrawlerUrls){
    Producer<String, Object> instance = null;
    try {
         instance = rzxKafkaProducer.getInstance();
        JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(url));
        jsonObject.put("topic", topic);
        jsonObject.put("merchantId", merchantId);
        jsonObject.put("putDate", StringUtil.getStringTime());
        jsonObject.put("ttId", taskId);
        ProducerRecord<String, Object> record = new ProducerRecord<>(topic, jsonObject.toJSONString());
        instance.send(record, new RzxProducerCallback((Integer) url.get("id")));
    } finally {
        instance.close();
    }
}

我现在是使用这样的循环方式向kafka发送消息(每次会创建一个新的producer对象),
这样的话就回有一个问题,我使用netstat -an查询TCP连接时有大量的Time_Wait连接,下面这样

  TCP    172.30.128.69:59257    172.31.124.236:9092    TIME_WAIT
  TCP    172.30.128.69:59298    172.31.124.236:9092    TIME_WAIT
  TCP    172.30.128.69:59299    172.31.124.236:9092    TIME_WAIT
  TCP    172.30.128.69:59302    172.31.124.236:9092    TIME_WAIT
  TCP    172.30.128.69:59303    172.31.124.236:9092    TIME_WAIT
  TCP    172.30.128.69:59306    172.31.124.236:9092    TIME_WAIT
  TCP    172.30.128.69:59307    172.31.124.236:9092    TIME_WAIT
  TCP    172.30.128.69:59310    172.31.124.236:9092    TIME_WAIT

这样有问题吗?

但是当我将producer改为单例对象时,在发送消息30秒后,回调函数会报错,报错信息为:30009 ms has passed since last append,我在网上查询说是配置文件的问题,这个我查看了配置文件也没有问题,进行了如下设置:

listeners=PLAINTEXT://172.31.124.236:9092
host.name=172.31.124.236

请问大神,应该使用哪一种的Producer创建方式呢?