kafka配置“request.required.acks”=-1时,循环发送消息会出现消息丢失的情况。

剑枫寒 发表于: 2019-09-02   最后更新时间: 2019-09-02 22:51:02   5,320 游览

根据群集配置“buffer.memory”,默认为32M,因为生产速度太快,所以缓冲区已满阻塞,导致消息丢失),但是当acks = 1时,我测试时似乎没有丢失消息

代码如下:

 while ( isRunning ) {
        if (times > 100000)
            break;
    RdKafka::ErrorCode resp = kafka_producer_->produce(kafka_topic_.get(),
                                                       partition_,
                                                       ProducerConfig::Instance()->getProducerConfigMsgflags(),
                                                       data,
                                                       data_len,
                                                       (key.empty())?nullptr:&key,
                                                       nullptr);

        .....

        printf("%% %d messages (%lu bytes) producer success !!!",
               (times*1), (times*jsonStr.size()));
        times ++;
    }

报错信息:

101637 LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #0)
101638 % "LOG": event is a log message: LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in fli       ght (after 60712ms, timeout #0)
101639 LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #1)
101640 % "LOG": event is a log message: LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in fli       ght (after 60712ms, timeout #1)
101641 LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #2)
101642 % "LOG": event is a log message: LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in fli       ght (after 60712ms, timeout #2)
101643 LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #3)
101644 % "LOG": event is a log message: LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in fli       ght (after 60712ms, timeout #3)
101645 LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #4)
101646 % "LOG": event is a log message: LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in fli       ght (after 60712ms, timeout #4)
101647 LOG-4-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out 1429 in-flight, 0 retry-queued, 0 out-queue, 0 partial       ly-sent requests
101648 % "LOG": event is a log message: LOG-4-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out 1429 in-flight, 0 ret       ry-queued, 0 out-queue, 0 partially-sent requests

问题:

  1. 根据错误日志来看,是由于网略超时,导致生产者缓冲区满了,引起了消息丢失。是不是要判断返回值,假如遇到:ERR__QUEUE_FULL 时要进行手动重试?

  2. 因为生产者是同样的生产速度,为什么acks=-1时会引起缓冲区满了,而acks=1时就不会引起缓冲区满。
    我的理解如下:

    当生产者推送一条消息并从代理broker接收到确认时,它将删除此消息。 如果它没有收到确认,它将重新加入缓冲区并等待重新发送。

    Acks = -1因为接收到的所有代理的ack非常慢,所以缓冲区中的旧消息无法清除,导致缓冲区被阻塞,后面的消息丢失。

  3. 不太清楚acks这个配置参数对于生产者的影响是什么,楼主可否详细告知一下,以及生产消息的整个流程。

发表于 2019-09-02
添加评论

生产者需要leader确认请求完成之前接收的应答数。此配置控制了发送消息的耐用性,支持以下配置:

  • acks=0 如果设置为0,那么生产者将不等待任何消息确认。消息将立刻添加到socket缓冲区并考虑发送。在这种情况下不能保障消息被服务器接收到。并且重试机制不会生效(因为客户端不知道故障了没有)。每个消息返回的offset始终设置为-1。

  • acks=1,这意味着leader写入消息到本地日志就立即响应,而不等待所有follower应答。在这种情况下,如果响应消息之后但follower还未复制之前leader立即故障,那么消息将会丢失。

  • acks=all 这意味着leader将等待所有副本同步后应答消息。此配置保障消息不会丢失(只要至少有一个同步的副本或者)。这是最强壮的可用性保障。等价于acks=-1。

参考:https://www.orchome.com/511

副本数越多,阻塞时间越长。导致缓冲区满,建议调大缓存和超时时间即可。建议压测到一定安全范围内,失败的消息会非常偶发,打印告警打印即可,无需特意重试。

剑枫寒 -> 半兽人 5年前

我就是不太清楚,生产者发送一个消息、和收到ack响应、和socket阻塞时间,以及缓冲区为什么会满之间的关系,一条消息异步发送的整个流程

剑枫寒 -> 半兽人 5年前

我设置 linger.ms = 500 、debug = msg 、acks = 1、message.max.bytes = 1000000(default)、batch.num.messages = 10000(default)
没有出现保存,也没有出现消息丢失,请问下面的Log是什么意思大佬?

191862 LOG-7-PRODUCE: [thrdserver1:9292/bootstrap]: server1:9292/3: test_sdk7 [1]: No more space in current MessageSet (1634 messag e(s), 999553 bytes)

半兽人 -> 剑枫寒 5年前

异步消息,只是你这块不阻塞,消息其实在缓存队列里等待发送。ack响应时间影响了缓存里的消息确认完毕是否可以释放。

你的答案

查看kafka相关的其他问题或提一个您自己的问题