Kafka producer fails to send messages, causing java.lang.OutOfMemoryError: Java heap space. How can this be resolved?

kafka

I recently wrote some producer code myself: I instantiate a single KafkaProducer and have multiple threads call it to send messages, but it throws a Java heap space OutOfMemoryError. What could be causing this?
The error is as follows:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at org.apache.kafka.common.header.internals.RecordHeaders.<init>(RecordHeaders.java:50)
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:80)
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:118)
    at producer.KafkaProducerThread.<init>(KafkaProducerThread.java:37)
    at producer.kafkaProducer417.sendMessage(kafkaProducer417.java:70)
    at producer.kafkaProducer417.main(kafkaProducer417.java:81)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

The code is as follows:

public static void sendMessage(String key, String value) {
    try {
        executor.submit(new KafkaProducerThread(producer, key, value, config));
    } catch (Exception e) {
        logger.error(e.getMessage());
    }
}

public static void main(String[] args) {
    kafkaProducer417 test = new kafkaProducer417();
    test.createTopic();
    for (int i = 0; i < 10000000; i++) {
        test.sendMessage("data1", "hello world");
    }
}
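
Not shown above are the shared producer, thread pool, and config fields that sendMessage uses. A minimal sketch of how they are assumed to be set up follows (the broker address, pool size, and the KafkaProducerConfig constructor are illustrative, not the exact code):

private static final Logger logger = LoggerFactory.getLogger(kafkaProducer417.class);
private static final KafkaProducerConfig config = new KafkaProducerConfig();       // my own config holder; no-arg constructor assumed
private static final ExecutorService executor = Executors.newFixedThreadPool(10);  // fixed pool, backed by an unbounded work queue
private static final Producer<String, String> producer;

static {
    // a single KafkaProducer instance shared by every KafkaProducerThread task
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative broker address
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producer = new KafkaProducer<>(props);
}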

 public class KafkaProducerThread implements Runnable {

    private Producer<String, String> producer = null;
    private ProducerRecord<String, String> record = null;
    public KafkaProducerThread(Producer<String, String> producer, String key, String value, KafkaProducerConfig config) {
        this.producer = producer;
        // The ProducerRecord is built here (this is KafkaProducerThread.java:37 in the stack trace);
        // getTopic() is an assumed accessor on my own KafkaProducerConfig class.
        this.record = new ProducerRecord<>(config.getTopic(), key, value);
    }
    public void run(){
        try {
            this.producer.send(this.record,
                    new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if (e != null) {
                                logger.error(e.getMessage());
                            } else {
                                logger.info("The offset of the record we just sent is: " + metadata.offset());
                            }
                        }
                    });
        } catch (KafkaException e) {
            logger.error(e.getMessage());
        }
    }
}