使用sarama手动提交消费者位移

▓千年祇园 卐 发表于: 2023-11-13   最后更新时间: 2023-11-13 17:01:43   386 游览

使用sarama kafka go客户端进行消费者消费,发现在提交消费者位移时,最后几条消息可能无法正确提交位移,造成重复消费问题

func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    i := 0
    for m := range claim.Messages() {
        msg := m.Value
     // do something...
        if err := h.f(context.Background(), msg); err != nil {
            // noop
        } else {
            sess.MarkMessage(m, "")
            i++
            if i%3 == 0 {
                sess.Commit()
            }
        }
    }
    return nil
}

如以上代码所示,如果此次 pull 的消息总数为8条,则有2条消息位移无法正确提交,尽管已经消费。有什么好的办法也能将最后2条消息提交么?

发表于 2023-11-13
添加评论

这个代码很有深度,奇数提交来保证当突然宕机,最大损失的消息数只有3个。

如果我理解对的话,go的水平有限,只给你描述一下代码逻辑:

1、加个判断,来确认是剩余的消息数。例如:小于3,则提交Offset。

你好,是这样的,上述代码中 claim.Messages()是chan类型,类似于数据管道,数据会源源不断地进来,理论上,如果再进来一条消息,即9条消息,上述最后2条消息也就被消费了。这里主要模拟后续没有消息进来,就像是到第8条消息处理完客户端异常退出了,下次重启时,这2条消息会被重复消费。提到claim.Messages()是chan类型,这样的话,比较难以判断通道中剩余的消息数,不像是在一个已知的容器,有固定大小。

你的答案

查看kafka相关的其他问题或提一个您自己的问题