返回到文章

采纳

编辑于 1年前

使用sarama手动提交消费者位移

kafka

使用sarama kafka go客户端进行消费者消费,发现在提交消费者位移时,最后几条消息可能无法正确提交位移,造成重复消费问题

func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    i := 0
    for m := range claim.Messages() {
        msg := m.Value
     // do something...
        if err := h.f(context.Background(), msg); err != nil {
            // noop
        } else {
            sess.MarkMessage(m, "")
            i++
            if i%3 == 0 {
                sess.Commit()
            }
        }
    }
    return nil
}

如以上代码所示,如果此次 pull 的消息总数为8条,则有2条消息位移无法正确提交,尽管已经消费。有什么好的办法也能将最后2条消息提交么?