最近自己写生产者代码,实例化一个kafkaproducer,多个线程调用去发送消息,但是其会报堆栈溢出的错误,请问这个是什么原因?
错误如下:
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at org.apache.kafka.common.header.internals.RecordHeaders.<init>(RecordHeaders.java:50)
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:80)
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:118)
at producer.KafkaProducerThread.<init>(KafkaProducerThread.java:37)
at producer.kafkaProducer417.sendMessage(kafkaProducer417.java:70)
at producer.kafkaProducer417.main(kafkaProducer417.java:81)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
代码如下:
public static void sendMessage(String key, String value){
try{
executor.submit(new KafkaProducerThread(producer,key,value,config));
}catch(Exception e){
logger.error(e.getMessage());
}
public static void main(String[] args){
kafkaProducer417 test = new kafkaProducer417();
test.createTopic();
for(int i=0;i<10000000;i++){
test.sendMessage("data1","hello world");
}
}
public class KafkaProducerThread implements Runnable {
private Producer<String, String> producer = null;
private ProducerRecord<String, String> record = null;
public KafkaProducerThread(Producer<String, String> producer, String key, String value, KafkaProducerConfig config){}
public void run(){
try {
this.producer.send(this.record,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
logger.error(e.getMessage());
} else {
logger.info("The offset of the record we just sent is: " + metadata.offset());
}
}
});
}catch(KafkaException e){
logger.error("e.getMessage());
}
}
}