这个项目是处理实时数据的,接到上游的实时数据之后通过Spring Integration分发到kafka里面。
数据是一条一条过来的,每过来一条数据就立即发往Kafka集群。
然后就报
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-1"
这种错误。
这是因为实时数据量比较大,一条一条地向kafka集群发送数据网络速度跟不上,数据积压在内存里面导致了OOM吗?
求大神指导!
是的,不清楚你业务逻辑的实现方式,但是OOM你必须提高JVM的配置。
你现还未报发送kafka超时类的错误,说明在规定时间内消息还是可以到kafka的。
JVM初始化内存已经调大了,还是发生OOM内存溢出。现在我将linger.ms调整为100开启批处理,这样减少了网络通讯的次数从而提高生产者的效率,但是本地无法重现OOM异常,所以难以验证这样开启批处理是否有效。
这是原来的代码,现在
kafkaTemplate.flush();
方法注释掉了。public void sendMessages(Map<string,string> message) { String ric = ""; if(null != message && message.size() != 0) { ric = message.get("marketDataCode"); } ObjectMapper mapper = new ObjectMapper(); String dataOfRic = null; try { dataOfRic = mapper.writeValueAsString(message); } catch (JsonProcessingException e) { e.printStackTrace(); } kafkaTemplate.send(config.getTopic(), ric.getBytes(), dataOfRic.getBytes()); kafkaTemplate.flush(); }
这里是多线程调用一个生产者实例的,生产者链接上集群之后不关闭。
我不太明白数据堆积在什么地方,Kafka生产者客户端有缓存吗?
kafkaTemplate.flush(); 每次调用会严重影响效率。
kafka客户端是按批次发送的,所以效率会非常高。
linger.ms 非 0 的时候客户端才真的开启批处理吧,linger.ms=0不也是立即发送的吗
kafka的消息发送依靠
linger.ms
和batch.size
,哪个达到了就立即执行,这些批次队列是new出来的,直接就丢到传输层了,新来的就会到新new的batch上。而你调用1条消息1次flush,与上面的逻辑相互执行,flush是应该在关闭应用的时候执行。
高并发场景下,瞬时的消息量非常高了。
现在不再调用1条消息1次flush了,现在其实算是低负载,所以设置linger.ms=100,batch.size为默认值,这样的话可以让batch积攒到一定的大小再发送,这样RTT的消耗就少了。
flush方法会使所有缓存记录立即被发送,现在想要批处理,所以不用flush了。
去掉flush是好了吧?
好像并没有,从内存监控结果来看内存还是在一直增加,不过去掉flush之后性能确实有所提高。
怎么样了
好像还是有问题的,内存监控发现内存还是在增加。kafkaTemplate调用生产者相关的Demo有吗?方便的话发我参考一下,我邮箱2860748715@qq.com
内存增加,GC回收就可以,只要不满就没问题。我没有使用过springboot。
Uncaught exception in kafka-coordinator-heartbeat-thread kafka消费者运行一段时间之后报了这样的错误,这是又为啥啊?
不同的问题,重新提吧,要详细描述。
报oom的是kafka客户端,还是你自己的多线程程序?
thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread,看这句话应该是kafka生产者客户端的网络线程抛出的异常
你的答案