我现在要使用Java向kafka发送1w条消息,创建一个producer应该使用一个单例的还是说每循环一次就创建一个新的producer对象呢?
for(Map<String, Object> url : allCrawlerUrls){
Producer<String, Object> instance = null;
try {
instance = rzxKafkaProducer.getInstance();
JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(url));
jsonObject.put("topic", topic);
jsonObject.put("merchantId", merchantId);
jsonObject.put("putDate", StringUtil.getStringTime());
jsonObject.put("ttId", taskId);
ProducerRecord<String, Object> record = new ProducerRecord<>(topic, jsonObject.toJSONString());
instance.send(record, new RzxProducerCallback((Integer) url.get("id")));
} finally {
instance.close();
}
}
我现在是使用这样的循环方式向kafka发送消息(每次会创建一个新的producer对象),
这样的话就回有一个问题,我使用netstat -an
查询TCP连接时有大量的Time_Wait连接,下面这样
TCP 172.30.128.69:59257 172.31.124.236:9092 TIME_WAIT
TCP 172.30.128.69:59298 172.31.124.236:9092 TIME_WAIT
TCP 172.30.128.69:59299 172.31.124.236:9092 TIME_WAIT
TCP 172.30.128.69:59302 172.31.124.236:9092 TIME_WAIT
TCP 172.30.128.69:59303 172.31.124.236:9092 TIME_WAIT
TCP 172.30.128.69:59306 172.31.124.236:9092 TIME_WAIT
TCP 172.30.128.69:59307 172.31.124.236:9092 TIME_WAIT
TCP 172.30.128.69:59310 172.31.124.236:9092 TIME_WAIT
这样有问题吗?
但是当我将producer改为单例对象时,在发送消息30秒后,回调函数会报错,报错信息为:30009 ms has passed since last append,我在网上查询说是配置文件的问题,这个我查看了配置文件也没有问题,进行了如下设置:
listeners=PLAINTEXT://172.31.124.236:9092
host.name=172.31.124.236
请问大神,应该使用哪一种的Producer创建方式呢?