我想实现kafka精准一次消费,这样可行吗

发表于: 2023-03-19   最后更新时间: 2023-03-19 22:20:38   1,111 游览

⼿动维护偏移量,将偏移量存储到Redis中,使⽤先消费后保存偏移量⽅式保证⾄少⼀次消费;使⽤消息添加ID,将ID放⼊Redis的Set集合,如果消息ID重复,则不再消费,保证消费的幂等性。

发表于 2023-03-19
添加评论

那我们来理一下这个里面涉及到的漏洞问题:

  1. 当你消费消息之后,在上传redis的过程中,程序crash,会导致重复消费。
  2. 当你先放到redis,去消费的时候,程序crash,会导致消息loss。

这个过程,我并没有增加提交offset到kafka服务器的场景,其实这个动作,相当于redis。

所以,依然没有解决这个问题。

我的建议是每次消费1条,提交1条,来保证在极端情况下,只会丢1条消息。
如果你实在是想保证幂等,只有利用redis除了记录offset id,也要增加一步消息确认消费的步骤,确定消费完了。

消息的偏移量会被保存在redis中,首先先检查redis中是否有该主题的偏移量,如果有,从redis记录的偏移量开始消费,如果没有,从最新或者最旧的偏移量消费。
当从redis获得偏移量后,得到偏移量对应的broker的消息,程序会先判断消息的id是否在redis的set集合里面,如果id在,说明此前消费过,直接从下一条消息开始消费;如果不在,会先进行消费消息将统计的指标存储在外部存储中,存储后也就是消费完成再把消息的id存在set集合中。 最后提交偏移量到redis中。
在这个过程中,如果把消息的id存在set集合之前程序挂了,仍然会导致消费重复。如果在最后提交偏移量到redis之前程序挂了,由于消息id已经存到set里面了,所以不会导致重复消费。
所以我觉得这样依旧解决不了问题,如果把消息的id存在set集合之前程序挂了,也就是消费了,没有保存消息id,也没有保存消息偏移量,我想恢复不让它重复消费,也就是从下一条偏移量开始消费,但是我又怎么确定它确实是在消息的id存在set集合之前程序挂了呢而不是在程序的其他地方挂了呢?

半兽人 -> 1年前

增加一步消息确认消费的步骤,大概的步骤如下:

  1. offset存储到redis
  2. 程序开始消费
  3. 程序消费完成后,将消费完成的结果在写给redis(多的这一步)
  4. commit给kafka集群

这个虽然稳健一点,但是如果第三步的时候crash了,依旧会造成重复消费(你可以加一步待确认消息,告警或者通过人工处理来确认这笔消息是否被消费国,来弥补这个问题)

所以总结下来,kill -9或者程序crash的情况下,幂等保证不了的。

这种极端的条件建议不要过度的设计了,我们使用了7、8年了,还没有出现过这种问题。

-> 半兽人 1年前

好的,感谢

你的答案

查看kafka相关的其他问题或提一个您自己的问题