Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-1"这样的问题有人遇到过吗

林飞 发表于: 2019-10-13   最后更新时间: 2019-10-13 22:17:05   6,172 游览

这个项目是处理实时数据的,接到上游的实时数据之后通过Spring Integration分发到kafka里面。

数据是一条一条过来的,每过来一条数据就立即发往Kafka集群。

然后就报

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-1"

这种错误。

这是因为实时数据量比较大,一条一条地向kafka集群发送数据网络速度跟不上,数据积压在内存里面导致了OOM吗?

求大神指导!

发表于 2019-10-13
添加评论

是的,不清楚你业务逻辑的实现方式,但是OOM你必须提高JVM的配置。
你现还未报发送kafka超时类的错误,说明在规定时间内消息还是可以到kafka的。

林飞 -> 半兽人 5年前

JVM初始化内存已经调大了,还是发生OOM内存溢出。现在我将linger.ms调整为100开启批处理,这样减少了网络通讯的次数从而提高生产者的效率,但是本地无法重现OOM异常,所以难以验证这样开启批处理是否有效。

这是原来的代码,现在kafkaTemplate.flush();方法注释掉了。

public void sendMessages(Map<string,string> message) {
     String ric = ""; 
     if(null != message && message.size() != 0) {
      ric = message.get("marketDataCode");
     }
   ObjectMapper mapper = new ObjectMapper();
   String dataOfRic = null;
   try {
    dataOfRic = mapper.writeValueAsString(message);
   } catch (JsonProcessingException e) {
    e.printStackTrace();
   }
         kafkaTemplate.send(config.getTopic(), ric.getBytes(), dataOfRic.getBytes()); 
         kafkaTemplate.flush();
  }

这里是多线程调用一个生产者实例的,生产者链接上集群之后不关闭。
我不太明白数据堆积在什么地方,Kafka生产者客户端有缓存吗?

半兽人 -> 林飞 5年前

kafkaTemplate.flush(); 每次调用会严重影响效率。
kafka客户端是按批次发送的,所以效率会非常高。

林飞 -> 半兽人 5年前

linger.ms 非 0 的时候客户端才真的开启批处理吧,linger.ms=0不也是立即发送的吗

半兽人 -> 林飞 5年前

kafka的消息发送依靠linger.msbatch.size,哪个达到了就立即执行,这些批次队列是new出来的,直接就丢到传输层了,新来的就会到新new的batch上。
而你调用1条消息1次flush,与上面的逻辑相互执行,flush是应该在关闭应用的时候执行。
高并发场景下,瞬时的消息量非常高了。

林飞 -> 半兽人 5年前

现在不再调用1条消息1次flush了,现在其实算是低负载,所以设置linger.ms=100,batch.size为默认值,这样的话可以让batch积攒到一定的大小再发送,这样RTT的消耗就少了。

flush方法会使所有缓存记录立即被发送,现在想要批处理,所以不用flush了。

八荒 -> 林飞 5年前

去掉flush是好了吧?

林飞 -> 八荒 5年前

好像并没有,从内存监控结果来看内存还是在一直增加,不过去掉flush之后性能确实有所提高。

半兽人 -> 林飞 5年前

怎么样了

林飞 -> 半兽人 5年前

好像还是有问题的,内存监控发现内存还是在增加。kafkaTemplate调用生产者相关的Demo有吗?方便的话发我参考一下,我邮箱2860748715@qq.com

半兽人 -> 林飞 5年前

内存增加,GC回收就可以,只要不满就没问题。我没有使用过springboot。

林飞 -> 半兽人 5年前

Uncaught exception in kafka-coordinator-heartbeat-thread kafka消费者运行一段时间之后报了这样的错误,这是又为啥啊?

半兽人 -> 林飞 5年前

不同的问题,重新提吧,要详细描述。

报oom的是kafka客户端,还是你自己的多线程程序?

林飞 -> 八荒 5年前

thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread,看这句话应该是kafka生产者客户端的网络线程抛出的异常

你的答案

查看kafka相关的其他问题或提一个您自己的问题