您好!
我有一个问题想问一下,生产者在执行producer = new KafkaProducer<>(props);这一段代码的时候并不建立连接?在第一次执行send方法的时候才去建立连接呢?
而且不是说send方法是异步的吗?我有一次将kafka地址配置错误了,在执行send方法前一直没有报错,在第一次执行send方法的时候程序会阻塞在那里,一直到60s后,会报一个发送超时的错误。代码如下:
props.put("bootstrap.servers", getUrl());
// leader收到的来自所有follower复制成功的确认数量,acks=0时只要消息被立即发送到socket buffer中,就认为发送成功,retries(失败重试) 配置也会失效
props.put("acks", "0");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
LOG.info("IOT-KAFKA建立连接@" + getUrl());
发送代码:
producer.send(new ProducerRecord<>(TOPIC, reqData), this);
回调:
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
LOG.info("send kafka error " + exception.getMessage());
} else {
LOG.info("send to " + metadata.topic() + " successed ");
}
}