使用sarama kafka go客户端进行消费者消费,发现在提交消费者位移时,最后几条消息可能无法正确提交位移,造成重复消费问题
func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
i := 0
for m := range claim.Messages() {
msg := m.Value
// do something...
if err := h.f(context.Background(), msg); err != nil {
// noop
} else {
sess.MarkMessage(m, "")
i++
if i%3 == 0 {
sess.Commit()
}
}
}
return nil
}
如以上代码所示,如果此次 pull 的消息总数为8条,则有2条消息位移无法正确提交,尽管已经消费。有什么好的办法也能将最后2条消息提交么?
这个代码很有深度,奇数提交来保证当突然宕机,最大损失的消息数只有3个。
如果我理解对的话,go的水平有限,只给你描述一下代码逻辑:
1、加个判断,来确认是剩余的消息数。例如:小于3,则提交Offset。
你好,是这样的,上述代码中 claim.Messages()是chan类型,类似于数据管道,数据会源源不断地进来,理论上,如果再进来一条消息,即9条消息,上述最后2条消息也就被消费了。这里主要模拟后续没有消息进来,就像是到第8条消息处理完客户端异常退出了,下次重启时,这2条消息会被重复消费。提到claim.Messages()是chan类型,这样的话,比较难以判断通道中剩余的消息数,不像是在一个已知的容器,有固定大小。
你的答案