Kafka consumer service died on its tenth day of running; after a restart it consumes some data and then dies again

佐岸   Posted: 2021-11-03   Last updated: 2021-11-04 14:01:04   2,678 views

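On the tenth day of running, the consumer service died and could no longer consume data, which caused a backlog.

Kafka runs as a single node.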

| 2021-11-03T21:51:45.330+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService
| 2021-11-03T21:51:45.330+0800 |INFO |org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe@(KafkaConsumer.java:1082)|[Consumer clientId=consumer-local1-26, groupId=local1] Unsubscribed all topics or patterns and assigned partitions
| 2021-11-03T21:51:45.330+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService
| 2021-11-03T21:51:45.331+0800 |INFO |org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe@(KafkaConsumer.java:1082)|[Consumer clientId=consumer-local1-5, groupId=local1] Unsubscribed all topics or patterns and assigned partitions
| 2021-11-03T21:51:45.331+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService
| 2021-11-03T21:51:45.331+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer.info@(LogAccessor.java:292)|local1: partitions revoked: [mtalog_GameOnlineAmountEvent-1]
| 2021-11-03T21:51:45.331+0800 |INFO |org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe@(KafkaConsumer.java:1082)|[Consumer clientId=consumer-local1-8, groupId=local1] Unsubscribed all topics or patterns and assigned partitions
| 2021-11-03T21:51:45.331+0800 |INFO |org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup@(AbstractCoordinator.java:979)|[Consumer clientId=consumer-local1-50, groupId=local1] Member consumer-local1-50-32068794-b01d-4a48-ac69-dcf88b7f7e0f sending LeaveGroup request to coordinator 172.29.2.215:9092 (id: 2147483647 rack: null) due to the consumer unsubscribed from all topics
| 2021-11-03T21:51:45.332+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService
| 2021-11-03T21:51:45.332+0800 |INFO |org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe@(KafkaConsumer.java:1082)|[Consumer clientId=consumer-local1-50, groupId=local1] Unsubscribed all topics or patterns and assigned partitions
| 2021-11-03T21:51:45.332+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService
| 2021-11-03T21:51:45.332+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService
| 2021-11-03T21:51:45.332+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService
| 2021-11-03T21:51:45.332+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer.info@(LogAccessor.java:292)|local1: partitions revoked: [mtalog_GamePlayerLoginRecordEvent-1]
| 2021-11-03T21:51:45.333+0800 |INFO |org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup@(AbstractCoordinator.java:979)|[Consumer clientId=consumer-local1-18, groupId=local1] Member consumer-local1-18-b2311c16-154a-4577-b2d1-3b37f778950e sending LeaveGroup request to coordinator 172.29.2.215:9092 (id: 2147483647 rack: null) due to the consumer unsubscribed from all topics
| 2021-11-03T21:51:45.333+0800 |INFO |org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe@(KafkaConsumer.java:1082)|[Consumer clientId=consumer-local1-18, groupId=local1] Unsubscribed all topics or patterns and assigned partitions
| 2021-11-03T21:51:45.333+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService
| 2021-11-03T21:51:45.373+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.375+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.377+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.386+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.389+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.390+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.391+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.393+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.395+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.397+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.401+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.402+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.452+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.487+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.487+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.515+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService 'applicationTaskExecutor'
| 2021-11-03T21:51:45.517+0800 |INFO |com.zaxxer.hikari.HikariDataSource.close@(HikariDataSource.java:350)|HikariPool-2 - Shutdown initiated...
| 2021-11-03T21:51:45.532+0800 |INFO |com.zaxxer.hikari.HikariDataSource.close@(HikariDataSource.java:352)|HikariPool-2 - Shutdown completed.
| 2021-11-03T21:51:45.533+0800 |INFO |com.zaxxer.hikari.HikariDataSource.close@(HikariDataSource.java:350)|HikariPool-1 - Shutdown initiated...
| 2021-11-03T21:51:45.543+0800 |INFO |com.zaxxer.hikari.HikariDataSource.close@(HikariDataSource.java:352)|HikariPool-1 - Shutdown completed.

Error message:

2021-11-02T17:09:21.940+0800 | [main] | ERROR | org.springframework.boot.SpringApplication.reportFailure@(SpringApplication.java:837) - Application run failed
java.lang.IllegalStateException: Failed to execute CommandLineRunner
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798)
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:779)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:322)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
    at com.chujian.boss.MatConsumerApplication.main(MatConsumerApplication.java:17)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:107)
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)
    at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88)
Caused by: java.lang.NullPointerException: null
    at com.chujian.boss.kafka.StartKafka.lambda$run$0(StartKafka.java:40)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at com.chujian.boss.kafka.StartKafka.run(StartKafka.java:39)
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:795)
    ... 13 common frames omitted
2021-11-02T17:09:21.942+0800 | [Thread-3] | WARN  | com.alibaba.nacos.common.http.HttpClientBeanHolder.shutdown@(HttpClientBeanHolder.java:108) - [HttpClientBeanHolder] Start destroying common HttpClient
2021-11-02T17:09:21.942+0800 | [Thread-14] | WARN  | com.alibaba.nacos.common.notify.NotifyCenter.shutdown@(NotifyCenter.java:145) - [NotifyCenter] Start destroying Publisher
2021-11-02T17:09:21.942+0800 | [Thread-14] | WARN  | com.alibaba.nacos.common.notify.NotifyCenter.shutdown@(NotifyCenter.java:162) - [NotifyCenter] Destruction of the end
2021-11-02T17:09:21.942+0800 | [Thread-3] | WARN  | com.alibaba.nacos.common.http.HttpClientBeanHolder.shutdown@(HttpClientBeanHolder.java:114) - [HttpClientBeanHolder] Destruction of the end

The code is as follows:

public class StartKafka implements CommandLineRunner {

  @Resource
  private KafkaListenerEndpointRegistry registry;

  @Resource
  private KafkaTopicConfig config;

  @Autowired
  private AdminClient adminClient;

  @Override
  public void run(String... args) {
    config.getTopics().forEach(s -> {
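      // This appears to be StartKafka.java:40 from the stack trace above:
      // getListenerContainer(id) returns null when no listener is registered under that id.
      registry.getListenerContainer(s.getName()).start();
      System.out.printf("Started {%s} %n", s);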
      adminClient.createTopics(Collections.singletonList(s.toNewTopic()));
    });
  }
}

Kafka is currently a single node, with a replication factor of 1 and 2 partitions.

Running the command:

/data/soft/kafka/kafka_2.12-2.2.2/bin/kafka-consumer-groups.sh --describe --bootstrap-server 172.29.2.215:9092 --group local1

returns the following:

Consumer group 'local1' has no active members.

Could this be related to the data in Kafka?

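Posted on 2021-11-03
  • Please add how you start the consumer. (半兽人, 3 years ago)
    @半兽人 It is started with Docker. (佐岸)
  • Did your program crash (did the process exit)? (半兽人, 3 years ago)
    @半兽人 Yes, the service stopped. The same architecture works fine in other projects; only this project has the problem. I suspect something in the Kafka configuration is wrong. (佐岸)
    @佐岸 When the process exits like this, an out-of-memory error (OOM) is the most likely cause. Processing messages concurrently can trigger it. Try giving the JVM more memory. (半兽人)
    @半兽人 I checked the log and there is no OOM, but I will try increasing it. (佐岸)
    @佐岸 Another possibility: once you receive a message, catch every exception raised while handling it. Do not let it propagate upward, or it will break the consumer loop. (半兽人)
    @佐岸 You can still edit the question. Please don't write the details here, they are hard to read. (半兽人)
  • So far, the information you have provided is incomplete. The error happens during consumption, yet I have not seen the relevant code. Your very first step already fails, so this information tells us nothing. I still mainly suspect that an exception in your message handling propagates upward and terminates the thread. (半兽人, 3 years ago)
    @半兽人 How should I investigate that? What information do I need to provide? I have checked the database side, including the insert logs, and have not found any error so far. (佐岸)
  • Problem solved. In summary: one topic name in the configuration file did not match the topic created in Kafka. When the listeners were started automatically, that topic did not exist in Kafka, so startup failed and the thread terminated abnormally. Thanks. (佐岸, 3 years ago)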
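
As an illustration of 半兽人's suggestion above (catch exceptions inside the message handler instead of letting them propagate), here is a minimal sketch in plain Java. It is not taken from the poster's project and makes no claim about how spring-kafka itself reacts to listener exceptions; `GuardedHandler`, `processAll`, and the sample messages are hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Sketch only: handle each message in its own try/catch so one bad message cannot end the loop. */
final class GuardedHandler {

    /**
     * Applies the handler to every message. A failing message is logged and
     * collected, and processing continues with the next one.
     */
    static List<String> processAll(List<String> messages, Consumer<String> handler) {
        List<String> failed = new ArrayList<>();
        for (String message : messages) {
            try {
                handler.accept(message);
            } catch (RuntimeException e) {
                // Log and keep going; rethrowing here would abort the whole loop.
                System.err.println("failed to handle message '" + message + "': " + e);
                failed.add(message);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // "x" is not a number, so the handler throws for it; "1" and "3" are still processed.
        List<String> failed = processAll(Arrays.asList("1", "x", "3"), m -> Integer.parseInt(m));
        System.out.println("failed: " + failed);
    }
}
```

Collecting the failed messages rather than discarding them leaves room to retry or park them later; whether that is appropriate depends on the application.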

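Problem solved. In summary: one topic name in the configuration file did not match the topic created in Kafka. When the listeners were started automatically, that topic did not exist in Kafka, so the thread terminated abnormally.

It was in fact the code error shown in the stack trace above, which I overlooked at first. What misled me was that after this error appeared, the service still consumed a batch of data and wrote it to the database normally before the consumer stopped.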
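
The failure described here (a configured name with no matching listener) can be guarded against in the startup loop. Below is a minimal sketch in plain Java, not the poster's actual fix. It assumes the lookup behaves like `registry.getListenerContainer(...)`, which returns null for an unknown id; `SafeListenerStartup`, `Container`, `startAll`, and the topic name `mtalog_TypoEvent` are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Sketch only: a null-safe version of the startup loop. */
final class SafeListenerStartup {

    /** Hypothetical stand-in for a listener container; only start() matters here. */
    interface Container {
        void start();
    }

    /**
     * Starts every container the lookup can find and returns the names that
     * had no registered listener, instead of failing on the first missing one.
     */
    static List<String> startAll(List<String> topicNames, Function<String, Container> lookup) {
        List<String> missing = new ArrayList<>();
        for (String name : topicNames) {
            Container container = lookup.apply(name);
            if (container == null) {
                // A name mismatch ends up here rather than as a NullPointerException.
                missing.add(name);
                continue;
            }
            container.start();
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, Container> registered = new LinkedHashMap<>();
        registered.put("mtalog_GameOnlineAmountEvent",
                () -> System.out.println("started mtalog_GameOnlineAmountEvent"));

        // "mtalog_TypoEvent" is a made-up name representing the misconfigured entry.
        List<String> missing = startAll(
                Arrays.asList("mtalog_GameOnlineAmountEvent", "mtalog_TypoEvent"),
                registered::get);
        System.out.println("no listener registered for: " + missing);
    }
}
```

Reporting every missing name at once, rather than stopping at the first, makes a configuration mismatch visible in a single startup log line.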

半兽人 -> 佐岸, 3 years ago

Congratulations! Go ahead and accept your own answer.
