kafka 消费中出现大量的重复数据,其中有的数据重复,有的数据不重复。因为生产的数据量比较大,就没5000条就手动提交,并将consumer停止,通过定时任务再将consumer启动,这里reset的方式是earliest。通过测试发现offset有回滚的现象,如下图所示:这种情况应该如何排查,在图中还有出现Rebalance的情况,群组中一直都是3个消费者,对应着一个有三个分区的主题。这种Rebalance会不会是导致消息重复的原因。
Rebalance是导致你重复消费的原因。
1、默认情况下,单笔消息处理的时间与手动提交offset的动作,超过30秒(默认),就会报rebalance,那你的offset没提交成功,其他的消费者就会重新消费一遍消息。
2、你的流程很奇怪,生产和消费本来就是独立的流程,不应该有耦合关系。你生产你的,我消费我的,为何你的逻辑会绑定在一起。
生产者和消费者是隔离的,因为数据量比较大,会导致内存溢出的问题,所以就让消费者满足某个条件就停止该消费者,三个消费者组成的消费者群组,里面的某个消费者满足了条件就将其停止,然后就可能触发Rebalance,导致的消息重复的问题,能不能关闭Rebalance解决这个问题。
可以增加超时时间,但是治标不治本。
内存溢出是生产者溢出,还是消费者溢出?
消费者是一个批次一个批次的拉取消息的,如果当前批次没有消费完,是不会拉取新的消息的。
老师,是消费者溢出,我们通过增大超时时间,可以防止单个消费者因为处理时间超时导致的Rebalance问题,但是因为消费者一直在运行会导致内存溢出,所以我们就采用了满足某个条件就将消费者进行停止。再使用定时将消费者启动起来。我们不是因为这个消费者因为超时导致被踢出group,而是因为我们采用了满足某个条件就将消费者进行停止,从而导致的Rebalance,这种情况下怎么解决?
一个问题一个问题的解决
1、内存溢出的原因,默认一个批次拉取3000条,你溢出了,调大jvm,即可解决(因为在你代码不调整的情况下,还是默认拉取3000条,满足它就不会在溢出了)
2、offset提交的方式,拿到消息,立马提交,然后在慢慢处理,如果这个时候溢出,会导致消息丢失,不会重复。
3、减少批次的数量,原来一次拉取3000条,改成一次拉取1000条,也可以解决你的问题。
4、你的消费者停止逻辑写的有问题那。提交offset报rebalance,刚恢复的消费者怎么会报呢?
老师。我使用的kafka的版本是2.2.1,网上说该版本采用的旧重平衡协议eager协议,但是2,4之后就采用了改进的协议cooperative协议,如果有一个consumer 离开了消费群组,其他的消费者也可以继续使用其所消费的分区。可以通过参数来控制Rebalance的情况,通过参数session.timeout.ms,session.timeout.ms去调整。不知道这个能不能解决。
你都没认真看我的回答,只要触发Rebalance,你的消息要么丢,要么重复消费。跟算法有关系吗?
谢谢您的回答😂,我从网上找到的,也不知道是不是解决方式,下面是原文:
其中成员加入或成员离组是最常见的触发重平衡的情况。新成员加入这个场景必然发生重平衡,没办法优化(针对初始化多个消费者的情况有其他优化,即延迟进行重平衡),但消费者崩溃离组却可以优化。因为一个消费者崩溃离组通常不会影响到其他{partition - consumer}的分配情况。
因此在 kafka 2.3~2.4 推出一项优化,即此次介绍的Static Membership功能和一个consumer端的配置参数 group.instance.id。一旦配置了该参数,成员将自动成为静态成员,否则的话和以前一样依然被视为是动态成员。
静态成员的好处在于,其静态成员ID值是不变的,因此之前分配给该成员的所有分区也是不变的。即假设一个成员挂掉,在没有超时前静态成员重启回来是不会触发 Rebalance 的(超时时间为session.timeout.ms,默认10 sec)。在静态成员挂掉这段时间,broker会一直为该消费者保存状态(offset),直到超时或静态成员重新连接。
另外您说的第四条不是很明白,我的场景类似:一个群组中有三个消费者,我们设置了消费者停止的条件,其中有一个消费者满足了条件,停止了,但是另外两个并没满足,还在运行。这时就会触发Rebalance,导致另外两个消息的重复消费。
再次感谢您的回答,学到了很多,respect
👍👍👍
你的答案