kafka频繁报 OutOfMemoryError: Direct buffer memory 异常

G1-JVM 发表于: 2021-12-10   最后更新时间: 2021-12-10 20:26:38   3,088 游览

项目是kafka接口项目,共3台服务器,每台16个实例,供其他项目写入、查询kafka。

客户端是2.0.0版本,kafka服务端共5台服务器。

项目在运行过程中一直再报Direct buffer memory 异常,起初只是以为资源不够,遂尝试了加实例、加堆外内存,目前每个实例已经配置了-XX:MaxDirectMemorySize=10G的大小依然不够....

尝试dump内存后发现了大量的Batch

项目里的producer相关代码如下,producer会存储在map中,保证只有一个

@Controller
@RequestMapping("/kafka")
public class KafkaController {

@ApiOperation(value = "kafka新增接口", produces = "application/json")
@RequestMapping(method = {RequestMethod.GET, RequestMethod.POST}, value = "/api/insert")
@ResponseBody
public KafkaBaseNewView kafkaInsert(HttpServletRequest request, HttpServletResponse response,Kfkaparam kafkaparam...){

    ...

    List<List<String>> lists = ...待写入数据
    for (List<String> strings : lists) {
        KafkaProducerUtils.send(kafkaHost, topicName, partition, strings);
    }

}


public static void send(String kafkaHost, String topic, Integer patition, List<String> list){

    ProducerRecord<String, String> producerRecord = new ProducerRecord(topic,patition,String.valueOf(messageNo),s);

    KafkaProducer kafkaProducer = null;

    if(KAFKA_PRODUCER_MAP.containsKey(kafkaHost)){
        kafkaProducer = KAFKA_PRODUCER_MAP.get(kafkaHost);
    }else{
        Properties prop = new Properties();
        prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576);// 默认参数事16384即16KB
        prop.put(ProducerConfig.LINGER_MS_CONFIG, 100);// 该参数是控制消息发送延时的 默认参数是0
        prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        prop.put("sasl.jaas.config", "xxx");
        prop.put("security.protocol", "SASL_PLAINTEXT");
        prop.put("sasl.mechanism", "PLAIN");
        kafkaProducer = new KafkaProducer<>(prop);
        KAFKA_PRODUCER_MAP.put(kafkaHost, kafkaProducer);
    }

    kafkaProducer.send(producerRecord);

}

报错信息为:

[api-kafka] 2021-12-09 19:35:40 [http-nio-10120-exec-498] ERROR o.a.c.c.C.[.[localhost].[/].[dispatcherServlet] | Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: Direct buffer memory] with root cause
[api-kafka] 2021-12-09 19:42:28 [http-nio-10120-exec-515] ERROR o.a.c.c.C.[.[localhost].[/].[dispatcherServlet] | Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: Direct buffer memory] with root cause
[api-kafka] 2021-12-09 19:46:37 [http-nio-10120-exec-472] ERROR o.a.c.c.C.[.[localhost].[/].[dispatcherServlet] | Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: Direct buffer memory] with root cause

不知是否是项目达到了瓶颈还是配置有问题,烦请指点,感谢!

发表于 2021-12-10
  • 发送的代码,在多补充一下。半兽人 2年前
  • 已经追加了G1-JVM 2年前
¥20.0

看了下,是你的消息堵塞的太严重了,来不及发送,越堆越多,导致的oom。

  • 减少延迟,因为你的消息量级已经够了,不要等待0.1秒了,改成1或者0

    prop.put(ProducerConfig.LINGER_MS_CONFIG, 100);// 该参数是控制消息发送延时的 默认参数是0
    
  • batch.size,虽然加大了,但是你带宽不够,来不及发送。

    prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576);// 默认参数事16384即16KB
    
  • acks=0
    acks=0 如果设置为0,那么生产者将不等待任何消息确认。消息将立刻添加到socket缓冲区并考虑发送。在这种情况下不能保障消息被服务器接收到。并且重试机制不会生效(因为客户端不知道故障了没有)。每个消息返回的offset始终设置为-1。

  • 更多生产者配置,参考:Kafka Producer配置

最后

其实就是消息太多,而来不及发送,导致的。

很大可能是kafka服务器节点的硬件(如网络)到达了瓶颈,你可以通过增加分区数/或者kafka节点数来分摊压力,提高发送速度,减少堵塞的消息量。

G1-JVM -> 半兽人 2年前

非常感谢,也就是说主要还是kafka服务端写入数据太慢导致的么?
假如有topic A01,现在我再增加topic A02,可以缓解这种现象么?这种拥堵现象对于每个topic是独立的还是会影响该集群下的所有topic呢?

半兽人 -> G1-JVM 2年前

你没有提供你的每秒kafka生产者上送的数据量每秒多少,而kafka服务器每秒接收多少,没有具体的数据支撑。你kafka的带宽和磁盘的吞吐量等等的数据都没有体现。所以这些量化这些数据,才能帮助你确认瓶颈。

但是,从你的表现来看,就是如此。

你增加topic A02确实是可以缓解(你增加topic和扩容topicA01分区效果是一样的)。

但是,如果你的5个服务器节点已经到了极限,你怎么做都是徒劳的,只能增加kafka节点数。

如果没到瓶颈,kafka节点也可以通过增加接收消息的数量来提高吞吐。

  • message.max.bytes

参考:Kafka Broker配置 检索关键字max

G1-JVM -> 半兽人 2年前

谢谢 我会根据建议进行尝试的

你的答案

查看kafka相关的其他问题或提一个您自己的问题