kafka生产者发送消息失败导致内存溢出java.lang.OutOfMemoryError:Java heap space,请教如何解决?

桑代克 发表于: 2018-04-26   最后更新时间: 2018-04-26 09:21:54   10,412 游览

最近自己写生产者代码,实例化一个kafkaproducer,多个线程调用去发送消息,但是其会报堆栈溢出的错误,请问这个是什么原因?
错误如下:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at org.apache.kafka.common.header.internals.RecordHeaders.<init>(RecordHeaders.java:50)
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:80)
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:118)
    at producer.KafkaProducerThread.<init>(KafkaProducerThread.java:37)
    at producer.kafkaProducer417.sendMessage(kafkaProducer417.java:70)
    at producer.kafkaProducer417.main(kafkaProducer417.java:81)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

代码如下:

public static void sendMessage(String key, String value){
   try{
        executor.submit(new   KafkaProducerThread(producer,key,value,config));
      }catch(Exception e){
         logger.error(e.getMessage());
      }
public static void main(String[] args){
    kafkaProducer417 test = new kafkaProducer417();
    test.createTopic();
    for(int i=0;i<10000000;i++){
        test.sendMessage("data1","hello world");
    }
 }

 public class KafkaProducerThread implements Runnable {

    private Producer<String, String> producer = null;
    private ProducerRecord<String, String> record = null;
    public KafkaProducerThread(Producer<String, String> producer, String key, String value, KafkaProducerConfig config){}
    public void run(){
        try {
            this.producer.send(this.record,
                    new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if (e != null) {
                                logger.error(e.getMessage());
                            } else {
                                logger.info("The offset of the record we just sent is: " + metadata.offset());
                            }
                        }
                    });
        }catch(KafkaException e){
            logger.error("e.getMessage());
        }
    }
}
发表于 2018-04-26
添加评论

先增加jvm。

桑代克 -> 半兽人 6年前

应该就是jvm内存的问题,谢谢谢谢

桑代克 -> 半兽人 6年前

再想请问一个问题 错误:Expiring 6713 record(s) for topic1-1: 40006 ms has passed since last append是因为生产的速度太快吗?有什么建议的解决的方案吗?

半兽人 -> 桑代克 6年前

描述一下你现在的吞吐量和集群情况。

桑代克 -> 半兽人 6年前

现在单条消息1KB的时候,落地速度在5Mb/s左右。配置为一个topic,一个partition,batch_size是16Kb,acks = 1,三个生产者线程发送,集群是2台服务器,每台是16G内存,2T硬盘,千兆网,redhat6.7系统;但当我消息达到2Kb时会发送失败,报上面的错误,请问是什么原因呢?还有生产速度为什么会这么慢呢?

半兽人 -> 桑代克 6年前

因为你2个集群,所以你的topic分区要大于2,依次提升分区数来测试性能。kafka客户端是以所对应的分区进行缓存批次发送的。这块可以提高性能。

半兽人 -> 半兽人 5年前

kafka启动脚本kafka-server-start.sh中指定了kafka启动时需要的最小内存,默认为1G

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

调大就好了。

你的答案

查看kafka相关的其他问题或提一个您自己的问题