Failed to allocate memory within the configured max blocking time 60000 ms

netman525 发表于: 2017-11-17   最后更新时间: 2021-08-27 16:52:09   11,159 游览

你好,线上报错。

查阅了相关文档,有几个配置需要修改。初步考虑配置成如下这样:

batch.size=4000
linger.ms=1
acks=1
retries=0

其他配置都是默认,还有那些地方需要修改呢?大神线上怎么配置的?

发表于 2017-11-17
添加评论

你这个错误是达到的消息,比发送的消息要快的多了。这几个参数确实可以缓解,topic中副本最好主备就好。

netman525 -> 半兽人 7年前

大神的意思是副本设置为2个,并且加上这些配置吗?

半兽人 -> netman525 7年前

对的。

netman525 -> 半兽人 7年前

acks=1已经表示Leader接收即返回成功,麻烦问下这里减少副本有什么作用吗?

半兽人 -> netman525 7年前

减少kafka之间的负载。

netman525 -> 半兽人 7年前

按照您的意思,副本设置为2,Poducer 的配置

batch.size=4000
linger.ms=1
acks=1
retries=0

我看了下kafka-manager中的Bytes In 在500k,Bytes Out在700k的时候会报错:

kafka-producer-network-thread | producer-1","info":"send kafka message failed, error:Expiring 18 record(s) for camera-0 due to 30002 ms has passed since batch creation plus linger time

半兽人 -> netman525 7年前

什么环境 都超30秒了。

netman525 -> 半兽人 7年前

线上环境的,而且这个值一直在增加。 Expiring 4 record(s) for camera-5 due to 29717795 ms has passed since last append.

半兽人 -> netman525 7年前

抱歉,最近在忙,你可以描述一下你的整个环境、和场景。

netman525 -> 半兽人 7年前

好的,非常感谢。

目前我们的业务场景是针对链路的信息采集,包括controller、dubbo、druid等的采集,每个请求会发50条数据,并且数据中包含SQL语句,参数等,所以单条数据量比较大。通过kafka-manager可以看到每个broker Bytes In =500k,Bytes Out = 700k,而其中针对交易的场景数据量会比较大,所以交易的应用会报上述错误。按照上述参数调整后。

副本设置为2,Poducer 的配置

batch.size=4000
linger.ms=1
acks=1
retries=0

依然会报如下错误:

error:Failed to allocate memory within the configured max blocking time 60000 ms 等。

半兽人 -> netman525 7年前

你的集群有几个,topic分区多少?

半兽人 -> 半兽人 7年前

buffer.memory:增加大发送总缓存。
ack=0,日志类消息不要确认了。
linger.ms=0,不要等待了。

netman525 -> 半兽人 7年前

1个集群3个broker, topic 10个分区

netman525 -> netman525 7年前

buffer.memory默认: 33554432(32M) ,扩大到多少呢?

半兽人 -> netman525 7年前

1个集群3个broker说的有问题额。1个broker=1个集群。
你们用的什么客户端写的?一定要使用非阻塞式发送。

netman525 -> 半兽人 7年前

客户端用java。    非阻塞式?  send()发送我们都是让其在异步线程池中进行的。

半兽人 -> netman525 7年前

send().get()是阻塞。单send非阻塞,不需要异步线程吧,kafka这吞吐很高额。

netman525 -> 半兽人 7年前

使用的方法是:

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

我们在10个异步线程池调用send()方法。 但是初始化时,只建立了1个

producer = new KafkaProducer<>(properties);

所以有可能会造成上述错误? 按照您的提示,去掉线程池再试试。

半兽人 -> netman525 7年前

你的并发数多少?

netman525 -> 半兽人 7年前

并发数是1000多,会产生20几万的数据量.同时会向kafka发送数据。

半兽人 -> netman525 7年前

单台并发多少,如果你消息的实时性要求不高,那就增大阻塞时间,和提高buffer的量来缓解。
还有,你该增加集群和分区数了。

netman525 -> 半兽人 7年前

可以考虑在一个应用创建多个KafkaProducer,发送的时候,与KafkaProducer的个数取模,发送数据。你试过这样的方式没? 打算压测试试

半兽人 -> netman525 7年前

试过,当并发到50的时候 会新建一个连接。

netman525 -> 半兽人 7年前

这里怎么去判断并发50,建立新连接呢? >_<

半兽人 -> netman525 7年前

代码逻辑实现的  +1 和 -1 ,当有发送+1,当发送完成-1,取差即是并发。

netman525 -> 半兽人 7年前

Hi,我环境里试过了,为什么起2个KafkaProducer,线程kafka-producer-network-thread | producer-1和 kafka-producer-network-thread | producer-2会死锁呢?

RecordAccumulator.append()在这里会死锁?

半兽人 -> netman525 7年前

是你写的有问题吧。

半兽人 -> netman525 7年前

贴出来看看。

netman525 -> 半兽人 7年前
/
  初始化producers个数
 /

private static int KAFKA_PRODUCES_COUNT = Integer.parseInt(XxlConfClient.get("zzjr-camera.kafka.producers", "1"));
private static KafkaProducer<String, String>[] producers = new KafkaProducer[KAFKA_PRODUCES_COUNT];

static{

    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG,XxlConfClient.get("zzjr-camera.kafka.batch.size", "4000"));
    properties.put(ProducerConfig.LINGER_MS_CONFIG,XxlConfClient.get("zzjr-camera.kafka.linger.ms", "0"));
    properties.put(ProducerConfig.ACKS_CONFIG,XxlConfClient.get("zzjr-camera.kafka.acks", "0"));
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,XxlConfClient.get("zzjr-camera.kafka.buffer.memory", "67108864"));
    properties.put(ProducerConfig.RETRIES_CONFIG, XxlConfClient.get("zzjr-camera.kafka.retries", "0"));
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, XxlConfClient.get("zzjr-camera.kafka.compression.type", "none"));

    //初始化kafkaProducers
    for(int i =0; i< KAFKA_PRODUCES_COUNT; i++){
        producers[i] = new KafkaProducer<>(properties);
    }
}

/
  kafka发送消息
  @param entity
  @param <T>
 /

public static <T> void send(final T entity){

    try {
        int partition = new Random().nextInt(PARTITION);
        ProducerRecord record = new ProducerRecord<>(TOPIC,
                                                     partition,
                                                     "",
        );
        producers[new Random().nextInt(KAFKA_PRODUCES_COUNT)].send(record, new SendCallback(record, 0));
    } catch (Exception e) {
        logger.error("Method KafkaProducerClient.send() error case:"+ e.getMessage());
    }
}

通过VisualVM 看到线程kafka-producer-network-thread | producer-1 和 kafka-producer-network-thread | producer-2 阻塞了。

半兽人 -> netman525 7年前

你为什么要指定分区呢?

半兽人 -> 半兽人 7年前

随机分区就好了,new SendCallback()这个也去了吧,不需要回调。

netman525 -> 半兽人 7年前

1.我这里随机数计算分区和你说的去掉分区效果一样吗?
2.new SendCallback()去掉后看不到消息发送成功与否了吧?

半兽人 -> netman525 7年前

1、如果你kafka端调整,那新增加的分区 永远写不进去。
2、是的,如果你收到了报错的消息,然后呢?你能处理么。

半兽人 -> 半兽人 7年前

你看不到是哪条消息报的错。

netman525 -> 半兽人 7年前

目前我将分区扩大到20个,使用1个kafkaProducer发送数据,目前只是在抢购高峰期会出现这种错误:Expiring 3 record(s) for camera-5 due to 30009 ms has passed since batch creation plus linger time.  根据 https://issues.apache.org/jira/browse/KAFKA-4557 描述,准备提升kafka版本至0.10.2.0。 然后再提高kafkaProducer里的request.timeout.ms至60000ms。

netman525 -> 半兽人 7年前

1、同意你说的,增加分区后,会动态分区到新增的分区,不会存在重启等等。
2、我的回调是这么做的

/*
  producer回调
 */
static class SendCallback implements Callback {

    ProducerRecord<String, String> record;
    int sendSeq = 0;

    public SendCallback(ProducerRecord record, int sendSeq){
        this.record = record;
        this.sendSeq = sendSeq;
    }

    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e){
        //send success
        if (null == e) {
            return;
        }

        //send failed

        logger.error("send kafka message failed, error:" + e.getMessage());

        if (sendSeq < 1) {
            producers[new Random().nextInt(KAFKA_PRODUCES_COUNT)].send(record, new SendCallback(record, ++sendSeq));
        }
    }
}
半兽人 -> netman525 7年前

测试过回调吗?无法重发的。所以我后续就把回调去了。

netman525 -> 半兽人 7年前

还没测试到这一步,目前其他部分发送数据都还报错。

目前我将分区扩大到20个,使用1个kafkaProducer发送数据,目前只是在抢购高峰期会出现这种错误:

Expiring 3 record(s) for camera-5 due to 30009 ms has passed since batch creation plus linger time.

根据 https://issues.apache.org/jira/browse/KAFKA-4557 描述,准备提升kafka版本至0.10.2.0。

然后再提高kafkaProducer里的request.timeout.ms至60000ms。

这里你还有其他建议吗? > - <

半兽人 -> netman525 7年前

随机可以稍微优化一下,用轮询。
https://www.orchome.com/685

netman525 -> 半兽人 7年前

你好,方便加下的你微信吗?  有些其他问题,想向你请教下。

netman525 -> netman525 7年前

hi,麻烦再请假下:上述的问题根源,看了几天源码,做了测试,大概能够找到问题了。增加了 request.timeout.ms =60000 和 buffer.memory = 96M。 但是在抢购高峰期,依然会出现

Expiring 26 record(s) for camera-15: 60001 ms has passed since batch creation plus linger time。

目前集群

brokers=5
partition=50

针对抢购大概3分钟内出现的超时问题,可以将request.timeout.ms=5分钟吗?
是不是需要更大的buffer.memory ?
有其他办法吗?

netman525 -> netman525 7年前

还可以选择 增加linger.time、建立多个kafkaproducer ?

半兽人 -> netman525 7年前

buffer.memory持续增大,你的消息体积很大吗?

netman525 -> 半兽人 7年前

不算大,消息内容基本就是SQL、请求时间以及封装的JSON格式的数据。 抢购2分钟时间产生超过100万条数据

你的答案

查看kafka相关的其他问题或提一个您自己的问题