最近使用spring集成kafka的api来自动监听并生产消息,发送消息使用如下api:
org.springframework.kafka.core.kafkaTemplate.sendDefault("your msg"),做压力测试时,正常情况下高并发生产消息都OK;
但模拟在kafka集群挂掉的情况下继续高并发发送消息时,发现生产者所在的服务很快会挂掉,weblogic抛出异常java.lang.OutOfMemoryError: Java heap space
;
个人理解是生产者发送大量消息失败时GC并不能及时回收而导致的jvm堆内存溢出;
请教下,如何控制kafka生产者发送消息失败时,不至于影响系统内存或生产者所在的应用服务?
生产消息的部分代码如下:
private static ExecutorService exec = Executors.newFixedThreadPool(10);
exec.execute( new Runnable() {
@Override
public void run() {
//发送消息到kafka
KafkaProducerFactory.getKafkaTemplate().sendDefault("your msg");
}
});
应用场景可能不太一样解决方式也大有不同,就按照我之前的场景的统一回复下吧:
1、内存溢出的原因:
当kafka集群(或单机)服务挂了,生产者继续向kafka发送消息时,有两个超时设置会导致线程不被及时释放,另外还有一个缓冲区大小的设置也会导致异常抛出,三个参数分别如下:
即使用默认配置,当kafka挂了,线程调用send()方法向kafka发送消息至少会被阻塞60s,线程分分钟就会全部被阻塞,web容器在没有可用线程时收到的请求一般还会存放在队列中等待响应,线程得不到释放意味着内存同样无法被释放,所以很快内存就溢出了。
解决思路:
因此适当减少阻塞超时时长(测试设置为300ms)、增加生产者内存缓冲区,即便kafka挂了只要能即时释放线程及内存,应用服务就不至于挂掉,但阻塞时长过小有可能导致kafka网络波动时部分数据丢失,对数据有严格要求的场景并不适用。另外也可以从线程池里做限制,避免高并发场景下线程堵死的情况。
样例代码及分析:
package com.demo.kafka.util; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import com.demo.kafka.util.KafkaProducerFactory; public class KafkaAppender{ private static final Logger log = LoggerFactory.getLogger(KafkaAppender.class); // 线程数 private static int threadPoolNum = 20; private static ExecutorService exec = Executors.newFixedThreadPool(threadPoolNum); /** * 解决当kafka挂掉时生产者内存溢出的三种思路:</p> * 1.获取当前ExecutorService线程池活动线程数,当活动线程数等于创建线程池线程数量时,表所有线程均处于阻塞状态, * 此时return释放当前线程,不执行发送kafka; 特点:高并发时会丢失部分日志</p> * * 2.当活动线程数等于创建线程池线程数量时,执行TimeUnit.MILLISECONDS.sleep(1000) * 表示让出当前线程资源1秒,然后重新竞争发送; 特点:适用于kafka集群,或能及时恢复kafka服务的环境</p> * * 3.设置最大阻塞时长max.block.ms,和最大请求时长request.timeout.ms为300ms(默认60s) * 表示执行发送kafka超过300ms即认为发送失败,直接结束当前线程;(测试平均发送一条日志耗时为5ms) * */ public void sendMsg2Kafka(final String msg) { // 查询当前线程池活动线程数 int threadNum = ((ThreadPoolExecutor) exec).getActiveCount(); log.info("当前线程池活动线程数:{}", threadNum); // 1(供参考).当线程池没有可用线程时接结束该线程任务(丢弃日志) // while (threadNum == threadPoolNum) { // log.info("线程池可用线程数为0,丢弃该条日志"); // return; // } // 2(供参考).当线程池没有可用线程时,调用线程进入睡眠状态,并让出执行机会给其它线程1000ms // while (threadNum == threadPoolNum) { // try { // log.info("调用线程进入睡眠1000ms"); // TimeUnit.MILLISECONDS.sleep(1000); // } catch (InterruptedException e) { // log.error("Exception:", e); // } // } // 3. 发送消息到kafka exec.execute(new Runnable() { @Override public void run() { long start = System.nanoTime(); try { // 日志消息发送到kafka,并获获取返回结果 // KafkaProducerFactory是手动封装的一个获取KafkaTemplate的工厂类 ListenableFuture<SendResult<String, String>> result = KafkaProducerFactory.getKafkaTemplate().sendDefault(msg); // 解析回调函数确认是否发送成功,失败时打印失败信息及阻塞时长 if (result != null) { Long offsetIndex = result.get().getRecordMetadata().offset(); if (offsetIndex != null && offsetIndex >= 0) { // 发送成功 long end = System.nanoTime(); log.info("日志发送成功,offset:{},发送耗时:{}ms", offsetIndex,TimeUnit.NANOSECONDS.toMillis(end - start)); } else { // 发送失败 long end = System.nanoTime(); log.info("日志发送失败,阻塞时长:{}ms", TimeUnit.NANOSECONDS.toMillis(end - start)); } } } catch (Exception e) { // 发送异常 long end = System.nanoTime(); log.error("日志发送异常,阻塞时长:{}ms", TimeUnit.NANOSECONDS.toMillis(end - start), e); } } }); } }
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test [2019-10-10 17:02:09,513] ERROR Uncaught exception in thread 'kafka-producer-network-thread | console-producer': (org.apache.kafka.common.utils.KafkaThread) java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424) at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385) at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572) at org.apache.kafka.common.network.Selector.poll(Selector.java:483) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:331) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) at java.lang.Thread.run(Thread.java:748)
一直报这个错,是什么原因?
已经执行了
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
还是一样。问题解决了吗? 我也遇到 这种情况
有更多代码吗?和内存溢出的错误也提供下。
kafka启动脚本
kafka-server-start.sh
中指定了kafka启动时需要的最小内存,默认为1G
调大就好了。
你的答案