kafka生产者发送消息失败导致内存溢出java.lang.OutOfMemoryError:Java heap space,请教如何解决?

桑代克 发表于: 2018-04-26   最后更新时间: 2018-04-26  
  •   0 订阅,851 游览

最近自己写生产者代码,实例化一个kafkaproducer,多个线程调用去发送消息,但是其会报堆栈溢出的错误,请问这个是什么原因?
错误如下:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at org.apache.kafka.common.header.internals.RecordHeaders.<init>(RecordHeaders.java:50)
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:80)
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:118)
    at producer.KafkaProducerThread.<init>(KafkaProducerThread.java:37)
    at producer.kafkaProducer417.sendMessage(kafkaProducer417.java:70)
    at producer.kafkaProducer417.main(kafkaProducer417.java:81)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

代码如下:

public static void sendMessage(String key, String value){
   try{
        executor.submit(new   KafkaProducerThread(producer,key,value,config));
      }catch(Exception e){
         logger.error(e.getMessage());
      }
public static void main(String[] args){
    kafkaProducer417 test = new kafkaProducer417();
    test.createTopic();
    for(int i=0;i<10000000;i++){
        test.sendMessage("data1","hello world");
    }
 }

 public class KafkaProducerThread implements Runnable {

    private Producer<String, String> producer = null;
    private ProducerRecord<String, String> record = null;
    public KafkaProducerThread(Producer<String, String> producer, String key, String value, KafkaProducerConfig config){}
    public void run(){
        try {
            this.producer.send(this.record,
                    new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if (e != null) {
                                logger.error(e.getMessage());
                            } else {
                                logger.info("The offset of the record we just sent is: " + metadata.offset());
                            }
                        }
                    });
        }catch(KafkaException e){
            logger.error("e.getMessage());
        }
    }
}






发表于: 3月前   最后更新时间: 3月前   游览量:851
上一条: 到头了!
下一条: 已经是最后了!

评论…


  • 先增加jvm。
    • 再想请问一个问题 错误:Expiring 6713 record(s) for topic1-1: 40006 ms has passed since last append是因为生产的速度太快吗?有什么建议的解决的方案吗?
        • 现在单条消息1KB的时候,落地速度在5Mb/s左右。配置为一个topic,一个partition,batch_size是16Kb,acks = 1,三个生产者线程发送,集群是2台服务器,每台是16G内存,2T硬盘,千兆网,redhat6.7系统;但当我消息达到2Kb时会发送失败,报上面的错误,请问是什么原因呢?还有生产速度为什么会这么慢呢?
            • 因为你2个集群,所以你的topic分区要大于2,依次提升分区数来测试性能。kafka客户端是以所对应的分区进行缓存批次发送的。这块可以提高性能。
              • 评论…
                • in this conversation
                  提问