我们使用producer异步发送,在失败的callback里close(0)来中止任何后序发送中的数据,在正常情况时效果非常好,但有个异常情况无法解决:
我们的in.flight为1,当一个request因为网络抖动长时间写不成功时,会得到如下WARN日志然后request进行重试:
Got error produce response with correlation id 19889 on topic-partition test1_local-0, retrying (5 attempts left). Error: NETWORK_EXCEPTION
但这个重试的request会耽误后序的batch,导致后序的batch抛出异常:
TimeoutException: Expiring 309 record(s) for test3_local-3: 30001 ms has passed since batch creation plus linger time
发送失败的这个后序batch在我们的callback里显式调用了close(0),问题来了,close(0)之后,前面retrying的request实际上都能retry成功写入broker,但在callback里显示失败:
IllegalStateException: Producer is closed forcefully.
由于我们的数据发送的序号是根据回调的成功来递增的,重新建立producer之后,会选择从最后一个成功回调的数据序号继续发送,导致retry的数据全部重复了一遍。
请问博主,解决这个异常的场景的思路在哪里呢?