When Kafka is configured with "request.required.acks" = -1, sending messages in a loop loses messages.

kafka

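(My guess: according to the cluster setting "buffer.memory", which defaults to 32 MB, the producer sends so fast that the buffer fills up and blocks, and messages are lost.) With acks = 1, however, I did not seem to lose any messages in my tests.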

The code is as follows:

    while (isRunning) {
        // Stop once the counter exceeds 100000.
        if (times > 100000)
            break;

        // Enqueue one message; resp holds librdkafka's return code.
        RdKafka::ErrorCode resp = kafka_producer_->produce(kafka_topic_.get(),
                                                           partition_,
                                                           ProducerConfig::Instance()->getProducerConfigMsgflags(),
                                                           data,
                                                           data_len,
                                                           (key.empty()) ? nullptr : &key,
                                                           nullptr);

        .....

        printf("%% %d messages (%lu bytes) producer success !!!",
               (times*1), (times*jsonStr.size()));
        times++;
    }

Error output:

LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #0)
% "LOG": event is a log message: LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #0)
LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #1)
% "LOG": event is a log message: LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #1)
LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #2)
% "LOG": event is a log message: LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #2)
LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #3)
% "LOG": event is a log message: LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #3)
LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #4)
% "LOG": event is a log message: LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #4)
LOG-4-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out 1429 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
% "LOG": event is a log message: LOG-4-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out 1429 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests

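Questions:

  1. Judging from the error log, a network timeout caused the producer buffer to fill up, which led to the message loss. Should I check the return value of produce() and retry manually when I get ERR__QUEUE_FULL?

  2. The producer sends at the same rate in both cases, so why does the buffer fill up with acks=-1 but not with acks=1?
    My understanding is as follows:

    When the producer sends a message and receives an acknowledgement from the broker, it deletes that message. If it does not receive an acknowledgement, the message goes back into the buffer and waits to be resent.

    With acks = -1, acknowledgements arrive slowly because every broker has to confirm, so old messages cannot be cleared from the buffer. The buffer blocks and later messages are lost.

  3. I am not clear on how the acks setting affects the producer. Could the author explain it in detail, along with the whole flow of producing a message?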
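
To make question 1 concrete, here is a minimal sketch of the manual retry I have in mind. It does not use librdkafka itself: `FakeProducer`, `SendResult`, `produce_with_retry` and `count_dropped_without_retry` are hypothetical names made up for illustration, and the fake queue is only a rough model of the behaviour I described in question 2 (slots are freed only when acknowledgements come back). In the real client, I assume the matching pieces would be `produce()` returning `RdKafka::ERR__QUEUE_FULL` and a call to `poll()` on the producer handle so that delivery reports are served and queue space is released.

```cpp
#include <cassert>
#include <cstddef>

// Stand-in for RdKafka::ErrorCode; only the two outcomes this sketch needs.
enum class SendResult { Ok, QueueFull };

// Hypothetical model of a producer with a bounded local queue.
// Slots are freed only by poll(), which plays the role of broker
// acknowledgements arriving (slowly for acks=-1, faster for acks=1).
class FakeProducer {
public:
    FakeProducer(std::size_t capacity, std::size_t acks_per_poll)
        : capacity_(capacity), acks_per_poll_(acks_per_poll) {
        assert(capacity > 0);
    }

    // Enqueue one message, or report that the local queue is full.
    SendResult produce() {
        if (queued_ >= capacity_) return SendResult::QueueFull;
        ++queued_;
        return SendResult::Ok;
    }

    // Free up to acks_per_poll_ slots, as if that many acks had arrived.
    void poll() {
        std::size_t freed = queued_ < acks_per_poll_ ? queued_ : acks_per_poll_;
        queued_ -= freed;
        acked_ += freed;
    }

    std::size_t queued() const { return queued_; }
    std::size_t acked() const { return acked_; }

private:
    std::size_t capacity_;
    std::size_t acks_per_poll_;
    std::size_t queued_ = 0;
    std::size_t acked_ = 0;
};

// Try to enqueue one message. On QueueFull, poll so that pending
// acknowledgements can free space, then try again, up to max_retries
// extra attempts. Returns true if the message was eventually enqueued.
template <typename Producer>
bool produce_with_retry(Producer& producer, int max_retries) {
    for (int attempt = 0; attempt <= max_retries; ++attempt) {
        if (producer.produce() == SendResult::Ok) return true;
        producer.poll();
    }
    return false;  // caller decides: log, block longer, or give up
}

// What my current loop effectively does: ignore the return value.
// Counts how many of `total` messages were rejected by the full queue.
std::size_t count_dropped_without_retry(FakeProducer& producer, std::size_t total) {
    std::size_t dropped = 0;
    for (std::size_t i = 0; i < total; ++i) {
        if (producer.produce() != SendResult::Ok) ++dropped;
    }
    return dropped;
}
```

In this model, with a queue of 10 slots and one acknowledgement per poll, sending 100 messages without checking the return value rejects 90 of them, while `produce_with_retry` gets all 100 enqueued. If acknowledgements never arrive, the retry gives up after `max_retries` and returns false, so it bounds the wait rather than guaranteeing delivery. As far as I can tell, the size of the local queue in librdkafka is controlled by `queue.buffering.max.messages` and `queue.buffering.max.kbytes`, not by "buffer.memory".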