利用spring boot集成的kafka在消费kafka 消息时,如果出现kafka集群崩溃或数据库连接故障时,有什么好的方法进行消息逻辑幂等性处理吗?我用到的方法是一直重复消费该条消息,但是该处理逻辑会导致后面的消息出现严重挤压,如果将失败的消息发送到失败主题中去,会出现入库数据存在时差错误,目前kafka即可群数据量较大,不方便存储位移到redis或者数据库中,大家有什么好的方法吗?
发表于 2021-03-16
首先,消费者不支持幂等的,要靠你自己来判断。
其次:
如果数据库有问题(阻塞),当下
失败的消息
就无法处理,那后续的消息也无法处理,你应该直到数据库恢复,重新消费这些失败的消息。数据库是正常的,那你这个消息因为某种原因,就它失败了,放到异常topic中,我认为是合理的,当下,无论你反复的处理,也是异常的,人工干预一下即可。
一般只要粗暴的关注第
方案1
,停止offset提交,从失败的地方开始重新消费。建议:不要写过多的逻辑,反而引起系统的不稳定(我们运行6、7年了,基本没出过异常)。
我们数据库是采用的HBASe数据库,当发送失败消息到另一个主题中时,可能会把原来hbase正确的数据替换掉,能否设置一下消费重试次数,如果达到次数重试阈值的话,则停止消费,或出现kafka集群故障或者网络延迟大规模故障的话,该方案显得可行吗?
我们目前数据库采用的是hbase数据库,采用方案发送失败消息到另一个主题中,让程序消费失败主题的数据,可能会存在将正确数据进行更新;目前我们的方案是一直seek某条位移消息,当kafka集群大规模故障或网络出现延迟故障时,kafka集群消息会出现一定的挤压,你说的方案和我目前采用的方案一致,现在能否设置一下重试消费次数阈值?如果达到重试阈值,就停止消费;若达不到则进行重试;麻烦请回复一下,这样是否妥当。
kafka消费者的消息是没有重试的,批量丢到客户端之后,标记消息是否已消费是靠offset:
如果没提交,那消费者重启,则还是从该位置进行消费(重复消费)。
所以一切的核心还是至于你的拿到消息之后,如果处理这条消息的。
所以你自己实现逻辑:
最后,我还是认为失败应该是几年都不会发生的事情,如果有阻塞(业务),你肯定需要提前准备好相对充足的资源来满足你的业务。
你的答案