SpringBoot整合Kafka(开启SSL状态报错不开启SSL可以正常使用) 项目启动报错Java heap space

胡Ba一.ˇ° 发表于: 2019-04-11   最后更新时间: 2019-04-28 11:00:55   5,807 游览

提问说明

SpringBoot项目整合了Kafka,Kafka Broker都开启了SSL(没有开启验证主机名),并且在Linux下用producer以及consumer的命令行可以在SSL状态下使用,但是在SpringBoot中启动就会报错.导致无法启动.证书也都有

 spring:
  kafka:
    ssl:
      key-store-location: file:C:/Users/eatheryu/Desktop/ca/server/server.keystore.jks
      key-store-password: 1111111
      key-password: 1111111
      trust-store-location: file:C:/Users/eatheryu/Desktop/ca/client/client.truststore.jks
      trust-store-password: 1111111
      key-store-type: JKS

    bootstrap-servers: 111.111.111.111:8089
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 30
      ssl:
        protocol: SSL

    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
      group-id: test3
      auto-offset-reset: latest
      max-poll-records: 20
      ssl:
        protocol: SSL

报错信息如下:


Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-11 16:25:41.095 ERROR 18668 --- [  restartedMain] o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.OutOfMemoryError: Java heap space
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161) ~[spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at com.example.kafka.KafkaApplication.main(KafkaApplication.java:10) [classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) [spring-boot-devtools-2.1.0.RELEASE.jar:2.1.0.RELEASE]
Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[na:1.8.0_191]
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[na:1.8.0_191]
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:425) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:274) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1774) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1742) ~[kafka-clients-2.0.0.jar:na]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.checkTopics(AbstractMessageListenerContainer.java:275) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:135) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:257) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:289) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:238) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161) ~[spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]

纠结很久了但还不知道错误在哪里

发表于 2019-04-11
添加评论

看样子大概率是 SSL的问题。
https://stackoverflow.com/questions/58218493/kafka-and-ssl-java-lang-outofmemoryerror-java-heap-space-when-using-kafka-top
看下 kafka Server.log 就会发现 SSL认证失败。

猜测原因是 SSL 认证失败, 数据始终没有发送出去, 最后 OOM。
希望对其他人有所帮助。

看的人不少就是没人回答.....目前问题还未解决..一点思路没有

老兄,这个调整下jvm。你的内存不足了。

关掉ssl验证立马就好啦

你先调整下试试。

jvm初始值我从128调到了512...还是报错....有一点.我把连接broker的ip地址换成hosts配置过的服务器名称,然后开启ssl通过服务器名称验证,这样子报的是Timeout expired while fetching topic metadata,好像都提示Broker may not be available. 都连不上... 用ip是能连上.但是会报jvm堆溢出....烦

Caused by: java.lang.OutOfMemoryError: null 设置了jvm内存后会是这样子...

那还不是因为设置的小了。。

我调到2048都不行.....

有毒。你是不是死循环了,内存有多少吃多少。

冤枉啊...我的代码都是正常的...

Producer代码

@RequestMapping("/send")
public String sendMessage(){
    String msg="producer";
        ListenableFuture test = kafkaTemplate.send(topic,msg);
        test.addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("发送失败");
            }

            @Override
            public void onSuccess(Object o) {
                System.out.println("发送成功");
            }
        });
    return "success";
}

Consumer代码

@KafkaListener(topics = "${pro.topic2}", groupId = "${spring.kafka.consumer.group-id}", containerFactory = "kafkaListenerContainerFactory", errorHandler = "consumerAwareErrorHandler")
public void listen2(ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment) {
    System.out.println("testInQT====key======" + consumerRecord.key() + "=====Value===" + consumerRecord.value() + "=====offset=====" + consumerRecord.offset() + "consumerRecord.partition();" + consumerRecord.partition());
    acknowledgment.acknowledge();
}

这个代码不开ssl就可以跑通,开了就报异常...

你的答案

查看kafka相关的其他问题或提一个您自己的问题