kafka发送数据到队列前丢失

妄青山 发表于: 2022-09-26   最后更新时间: 2022-09-26 17:41:19   1,170 游览

我用python 生成了4000多条数据,最后通过kafka send()出去的只有3000多条,有时候2000条。

我尝试了几个方法解决:

  1. 每次send数据前time_sleep一段时间(大概0.001秒),就不会丢失数据。
  2. 使用同步发送数据也就是send().get()之后也不会丢失数据
  3. 使用异步回调发送数据也不会丢失。

以下是丢失数据的代码。4000条数据如果都成功经过send的话sum应该会累加到4000,可是最终只到了2000-3000。

求大佬解答!!

input_path = event.src_path
producer.send("Runparse", input_path)
sum = sum +1
print(sum)
发表于 2022-09-26
添加评论

客户端发送消息,消息先到缓存区打包成一个批次之后,批量发给kafka,而这个时候,你的python进程已经结束了,所以导致这部分缓存的消息还没来得及发送,所以你加了休眠,就不会丢消息了。

你可以在循环结束之后,追加如下代码,这样生产者就会将未发送的消息立即发送给kafka了:

producer.close();
妄青山 -> 半兽人 2年前

谢大佬,可能我没讲清楚,是这么个情况,具体我是使用python watchdog 在不断监控文件夹下产生文件的情况,并不是循环发送数据,文件夹新增一个文件我就会send一次文件路径到kafka中,我将1000个左右的文件移动到监控的文件夹下,sum的数量就为正常的1000,并都可以正常传输到kafka,但数量增加到1500个左右后就会丢失数据,sum的数量就少于1500。

我怀疑是不是send没有成功,某个环节将数据丢弃了。

半兽人 -> 妄青山 2年前

只有这一种情况的,你加了休眠,就不丢了呀。
核心其实就是producer所处的进程(线程也一样)结束了,所以它也跟着结束了,但是消息还没来得及发。

妄青山 -> 半兽人 2年前

回大佬,同样的程序,用的同一个kafka集群,我在不同的电脑上进行了测试,在我本机测试的时候,数据并没有发生数据丢失,我看移动数据的速度大概有300个每秒左右,然后在第二台电脑上移动速度为每秒2800个左右。在第一台电脑上测试了几万条数据都成功发送,第二台电脑上的话就发生了丢失情况。如果按照大佬说的,producer所处的进程(线程)结束了,没来得及发送出去,应该是两台电脑都会发生这种情况吧(程序都是一样的
所以我怀疑是因为生成文件的速率太快了,send来不及发送就将数据丢弃了。

半兽人 -> 妄青山 2年前

两台电脑都会发生这种情况吧

是的,另外一台电脑可能结束的没那么果断,如果数据都是一致的情况下,多试几次就可能复现了。

核心就是这个原因导致的,程序逻辑就要看你自己找原因了。

有个点你要注意,如果你的进程持续watch永远不停止,producer理论是长连接,producer是单例,也不会销毁,所以最终消息还是会发送成功。所以如果你每次都是new producer,那么你的设计可能要在改改了。

妄青山 -> 半兽人 2年前

谢大佬,明天我去复现试试看。

你的答案

查看kafka相关的其他问题或提一个您自己的问题