使用sarama kafka go客户端进行消费者消费,发现在提交消费者位移时,最后几条消息可能无法正确提交位移,造成重复消费问题
func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
i := 0
for m := range claim.Messages() {
msg := m.Value
// do something...
if err := h.f(context.Background(), msg); err != nil {
// noop
} else {
sess.MarkMessage(m, "")
i++
if i%3 == 0 {
sess.Commit()
}
}
}
return nil
}
如以上代码所示,如果此次 pull 的消息总数为8条,则有2条消息位移无法正确提交,尽管已经消费。有什么好的办法也能将最后2条消息提交么?