项目是kafka接口项目,共3台服务器,每台16个实例,供其他项目写入、查询kafka。
客户端是2.0.0版本,kafka服务端共5台服务器。
项目在运行过程中一直再报Direct buffer memory 异常,起初只是以为资源不够,遂尝试了加实例、加堆外内存,目前每个实例已经配置了-XX:MaxDirectMemorySize=10G
的大小依然不够....
尝试dump内存后发现了大量的Batch
项目里的producer相关代码如下,producer会存储在map中,保证只有一个
@Controller
@RequestMapping("/kafka")
public class KafkaController {
@ApiOperation(value = "kafka新增接口", produces = "application/json")
@RequestMapping(method = {RequestMethod.GET, RequestMethod.POST}, value = "/api/insert")
@ResponseBody
public KafkaBaseNewView kafkaInsert(HttpServletRequest request, HttpServletResponse response,Kfkaparam kafkaparam...){
...
List<List<String>> lists = ...待写入数据
for (List<String> strings : lists) {
KafkaProducerUtils.send(kafkaHost, topicName, partition, strings);
}
}
public static void send(String kafkaHost, String topic, Integer patition, List<String> list){
ProducerRecord<String, String> producerRecord = new ProducerRecord(topic,patition,String.valueOf(messageNo),s);
KafkaProducer kafkaProducer = null;
if(KAFKA_PRODUCER_MAP.containsKey(kafkaHost)){
kafkaProducer = KAFKA_PRODUCER_MAP.get(kafkaHost);
}else{
Properties prop = new Properties();
prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576);// 默认参数事16384即16KB
prop.put(ProducerConfig.LINGER_MS_CONFIG, 100);// 该参数是控制消息发送延时的 默认参数是0
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
prop.put("sasl.jaas.config", "xxx");
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.mechanism", "PLAIN");
kafkaProducer = new KafkaProducer<>(prop);
KAFKA_PRODUCER_MAP.put(kafkaHost, kafkaProducer);
}
kafkaProducer.send(producerRecord);
}
报错信息为:
[api-kafka] 2021-12-09 19:35:40 [http-nio-10120-exec-498] ERROR o.a.c.c.C.[.[localhost].[/].[dispatcherServlet] | Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: Direct buffer memory] with root cause
[api-kafka] 2021-12-09 19:42:28 [http-nio-10120-exec-515] ERROR o.a.c.c.C.[.[localhost].[/].[dispatcherServlet] | Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: Direct buffer memory] with root cause
[api-kafka] 2021-12-09 19:46:37 [http-nio-10120-exec-472] ERROR o.a.c.c.C.[.[localhost].[/].[dispatcherServlet] | Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: Direct buffer memory] with root cause
不知是否是项目达到了瓶颈还是配置有问题,烦请指点,感谢!
看了下,是你的消息堵塞的太严重了,来不及发送,越堆越多,导致的oom。
减少延迟,因为你的消息量级已经够了,不要等待0.1秒了,改成
1
或者0
。prop.put(ProducerConfig.LINGER_MS_CONFIG, 100);// 该参数是控制消息发送延时的 默认参数是0
batch.size,虽然加大了,但是你带宽不够,来不及发送。
prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576);// 默认参数事16384即16KB
acks=0
acks=0 如果设置为0,那么生产者将不等待任何消息确认。消息将立刻添加到socket缓冲区并考虑发送。在这种情况下不能保障消息被服务器接收到。并且重试机制不会生效(因为客户端不知道故障了没有)。每个消息返回的offset始终设置为-1。
更多生产者配置,参考:Kafka Producer配置
最后
其实就是消息太多,而来不及发送,导致的。
很大可能是kafka服务器节点的硬件(如网络)到达了瓶颈,你可以通过增加分区数/或者kafka节点数来分摊压力,提高发送速度,减少堵塞的消息量。
非常感谢,也就是说主要还是kafka服务端写入数据太慢导致的么?
假如有topic A01,现在我再增加topic A02,可以缓解这种现象么?这种拥堵现象对于每个topic是独立的还是会影响该集群下的所有topic呢?
你没有提供你的每秒kafka生产者上送的数据量每秒多少,而kafka服务器每秒接收多少,没有具体的数据支撑。你kafka的带宽和磁盘的吞吐量等等的数据都没有体现。所以这些量化这些数据,才能帮助你确认瓶颈。
但是,从你的表现来看,就是如此。
你增加topic A02确实是可以缓解(你增加topic和扩容topicA01分区效果是一样的)。
但是,如果你的5个服务器节点已经到了极限,你怎么做都是徒劳的,只能增加kafka节点数。
如果没到瓶颈,kafka节点也可以通过增加接收消息的数量来提高吞吐。
参考:Kafka Broker配置 检索关键字
max
。谢谢 我会根据建议进行尝试的
你的答案