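According to the cluster setting "buffer.memory", which defaults to 32 MB, the buffer filled up and blocked because messages were produced too fast, and messages were lost). However, with acks = 1 my tests did not seem to lose any messages.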
while (isRunning) {
    // Stop after roughly 100000 messages.
    if (times > 100000)
        break;
    // produce() is asynchronous: it only places the message in the local queue.
    RdKafka::ErrorCode resp = kafka_producer_->produce(kafka_topic_.get(),
                                                       partition_,
                                                       ProducerConfig::Instance()->getProducerConfigMsgflags(),
                                                       data,
                                                       data_len,
                                                       (key.empty()) ? nullptr : &key,
                                                       nullptr);
    .....
    printf("%% %d messages (%zu bytes) producer success !!!\n",
           times, times * jsonStr.size());
    times++;
}
101637 LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #0)
101638 % "LOG": event is a log message: LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #0)
101639 LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #1)
101640 % "LOG": event is a log message: LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #1)
101641 LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #2)
101642 % "LOG": event is a log message: LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #2)
101643 LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #3)
101644 % "LOG": event is a log message: LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #3)
101645 LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #4)
101646 % "LOG": event is a log message: LOG-5-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out ProduceRequest in flight (after 60712ms, timeout #4)
101647 LOG-4-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out 1429 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
101648 % "LOG": event is a log message: LOG-4-REQTMOUT: [thrd:server1:9292/bootstrap]: server1:9292/2: Timed out 1429 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
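Judging by the error log, a network timeout caused the producer's buffer to fill up, which led to the message loss. Should I check the return value of produce() and retry manually when it is ERR__QUEUE_FULL?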
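To make the question concrete, here is a minimal sketch of the manual retry I have in mind. It is not librdkafka code: SendStatus and send_with_retry are hypothetical names made up for this post, and the two callbacks stand in for the real calls. I am assuming that try_send would wrap kafka_producer_->produce(...) and map RdKafka::ERR__QUEUE_FULL to QueueFull, and that serve would wrap kafka_producer_->poll(...) so delivery reports can run and free queue slots.

```cpp
#include <functional>

// Hypothetical status codes for this sketch. With librdkafka one would map
// RdKafka::ERR_NO_ERROR -> Ok, RdKafka::ERR__QUEUE_FULL -> QueueFull,
// and every other error code -> Failed.
enum class SendStatus { Ok, QueueFull, Failed };

// Tries to enqueue one message, retrying only while the local queue is full.
//   try_send: makes one enqueue attempt (assumed to wrap produce()).
//   serve:    gives the client a chance to drain its queue (assumed to wrap poll()).
// Returns the status of the last attempt. QueueFull means every attempt was
// used up (or max_attempts was <= 0 and nothing was tried).
inline SendStatus send_with_retry(const std::function<SendStatus()>& try_send,
                                  const std::function<void()>& serve,
                                  int max_attempts) {
    SendStatus status = SendStatus::QueueFull;
    for (int attempt = 0; attempt < max_attempts; ++attempt) {
        status = try_send();
        if (status != SendStatus::QueueFull) {
            return status;  // enqueued, or failed for a reason retrying will not fix
        }
        serve();  // let pending acks be processed so that slots are released
    }
    return status;
}
```

In the loop above, the idea would be to call send_with_retry instead of calling produce() directly and to print "producer success" only when it returns SendStatus::Ok. Capping the number of attempts keeps the loop from spinning forever if the broker never answers.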
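The producer sends at the same rate in both cases, so why does the buffer fill up with acks=-1 but not with acks=1?
My understanding is as follows:
When the producer sends a message and receives an acknowledgement from the broker, it deletes that message. If no acknowledgement arrives, the message goes back into the buffer and waits to be resent.
With acks = -1, the acknowledgements from all the brokers arrive very slowly, so old messages in the buffer cannot be cleared. The buffer stays full and blocks, and the messages that come afterwards are lost.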
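To check whether this reasoning is at least self-consistent, I wrote a toy model of it. It is only a sketch of my assumption, not of how librdkafka really works: simulate, SimResult and all the parameters are invented for illustration. A message holds one buffer slot from the moment it is accepted until its ack arrives a fixed number of ticks later, and a short ack latency stands in for acks=1 while a long one stands in for acks=-1.

```cpp
#include <cstddef>
#include <deque>

struct SimResult {
    std::size_t accepted;   // messages that found a free buffer slot
    std::size_t rejected;   // messages refused because the buffer was full
    std::size_t peak_used;  // highest number of occupied slots seen
};

// Toy model: on every tick the application tries to enqueue `per_tick`
// messages. Each accepted message keeps its slot until its ack arrives
// `ack_latency_ticks` ticks later. Nothing here calls librdkafka.
inline SimResult simulate(std::size_t capacity, std::size_t per_tick,
                          std::size_t ack_latency_ticks, std::size_t ticks) {
    std::deque<std::size_t> release_at;  // tick at which each held slot is freed
    SimResult r{0, 0, 0};
    for (std::size_t now = 0; now < ticks; ++now) {
        // Acks that have arrived by this tick free their slots.
        while (!release_at.empty() && release_at.front() <= now) {
            release_at.pop_front();
        }
        for (std::size_t i = 0; i < per_tick; ++i) {
            if (release_at.size() < capacity) {
                release_at.push_back(now + ack_latency_ticks);
                ++r.accepted;
            } else {
                ++r.rejected;  // plays the role of a "queue full" error
            }
        }
        if (release_at.size() > r.peak_used) {
            r.peak_used = release_at.size();
        }
    }
    return r;
}
```

With a capacity of 100 slots and 10 messages per tick, simulate(100, 10, 2, 1000) never rejects anything, while simulate(100, 10, 50, 1000) fills the buffer and starts rejecting. In this model the buffer needs roughly rate times ack latency slots, so the same send rate can be fine with fast acks and overflow with slow ones. Whether the real client behaves this way is exactly what I am asking.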
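I do not really understand how the acks setting affects the producer. Could the original poster explain this in detail, along with the whole flow of producing a message?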