On the tenth day of running, the consumer service died and could no longer consume data, which caused a data backlog
Single-node Kafka
| 2021-11-03T21:51:45.330+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService
| 2021-11-03T21:51:45.330+0800 |INFO |org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe@(KafkaConsumer.java:1082)|[Consumer clientId=consumer-local1-26, groupId=local1] Unsubscribed all topics or patterns and assigned partitions
| 2021-11-03T21:51:45.330+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService
| 2021-11-03T21:51:45.331+0800 |INFO |org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe@(KafkaConsumer.java:1082)|[Consumer clientId=consumer-local1-5, groupId=local1] Unsubscribed all topics or patterns and assigned partitions
| 2021-11-03T21:51:45.331+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService
| 2021-11-03T21:51:45.331+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer.info@(LogAccessor.java:292)|local1: partitions revoked: [mtalog_GameOnlineAmountEvent-1]
| 2021-11-03T21:51:45.331+0800 |INFO |org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe@(KafkaConsumer.java:1082)|[Consumer clientId=consumer-local1-8, groupId=local1] Unsubscribed all topics or patterns and assigned partitions
| 2021-11-03T21:51:45.331+0800 |INFO |org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup@(AbstractCoordinator.java:979)|[Consumer clientId=consumer-local1-50, groupId=local1] Member consumer-local1-50-32068794-b01d-4a48-ac69-dcf88b7f7e0f sending LeaveGroup request to coordinator 172.29.2.215:9092 (id: 2147483647 rack: null) due to the consumer unsubscribed from all topics
| 2021-11-03T21:51:45.332+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService
| 2021-11-03T21:51:45.332+0800 |INFO |org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe@(KafkaConsumer.java:1082)|[Consumer clientId=consumer-local1-50, groupId=local1] Unsubscribed all topics or patterns and assigned partitions
| 2021-11-03T21:51:45.332+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService
| 2021-11-03T21:51:45.332+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService
| 2021-11-03T21:51:45.332+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService
| 2021-11-03T21:51:45.332+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer.info@(LogAccessor.java:292)|local1: partitions revoked: [mtalog_GamePlayerLoginRecordEvent-1]
| 2021-11-03T21:51:45.333+0800 |INFO |org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup@(AbstractCoordinator.java:979)|[Consumer clientId=consumer-local1-18, groupId=local1] Member consumer-local1-18-b2311c16-154a-4577-b2d1-3b37f778950e sending LeaveGroup request to coordinator 172.29.2.215:9092 (id: 2147483647 rack: null) due to the consumer unsubscribed from all topics
| 2021-11-03T21:51:45.333+0800 |INFO |org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe@(KafkaConsumer.java:1082)|[Consumer clientId=consumer-local1-18, groupId=local1] Unsubscribed all topics or patterns and assigned partitions
| 2021-11-03T21:51:45.333+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService
| 2021-11-03T21:51:45.373+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.375+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.377+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.386+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.389+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.390+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.391+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.393+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.395+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.397+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.401+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.402+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.452+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.487+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.487+0800 |INFO |org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.info@(LogAccessor.java:292)|local1: Consumer stopped
| 2021-11-03T21:51:45.515+0800 |INFO |org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.shutdown@(ExecutorConfigurationSupport.java:218)|Shutting down ExecutorService 'applicationTaskExecutor'
| 2021-11-03T21:51:45.517+0800 |INFO |com.zaxxer.hikari.HikariDataSource.close@(HikariDataSource.java:350)|HikariPool-2 - Shutdown initiated...
| 2021-11-03T21:51:45.532+0800 |INFO |com.zaxxer.hikari.HikariDataSource.close@(HikariDataSource.java:352)|HikariPool-2 - Shutdown completed.
| 2021-11-03T21:51:45.533+0800 |INFO |com.zaxxer.hikari.HikariDataSource.close@(HikariDataSource.java:350)|HikariPool-1 - Shutdown initiated...
| 2021-11-03T21:51:45.543+0800 |INFO |com.zaxxer.hikari.HikariDataSource.close@(HikariDataSource.java:352)|HikariPool-1 - Shutdown completed.
Error message:
2021-11-02T17:09:21.940+0800 | [main] | ERROR | org.springframework.boot.SpringApplication.reportFailure@(SpringApplication.java:837) - Application run failed
java.lang.IllegalStateException: Failed to execute CommandLineRunner
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:779)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:322)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
at com.chujian.boss.MatConsumerApplication.main(MatConsumerApplication.java:17)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:107)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88)
Caused by: java.lang.NullPointerException: null
at com.chujian.boss.kafka.StartKafka.lambda$run$0(StartKafka.java:40)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at com.chujian.boss.kafka.StartKafka.run(StartKafka.java:39)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:795)
... 13 common frames omitted
2021-11-02T17:09:21.942+0800 | [Thread-3] | WARN | com.alibaba.nacos.common.http.HttpClientBeanHolder.shutdown@(HttpClientBeanHolder.java:108) - [HttpClientBeanHolder] Start destroying common HttpClient
2021-11-02T17:09:21.942+0800 | [Thread-14] | WARN | com.alibaba.nacos.common.notify.NotifyCenter.shutdown@(NotifyCenter.java:145) - [NotifyCenter] Start destroying Publisher
2021-11-02T17:09:21.942+0800 | [Thread-14] | WARN | com.alibaba.nacos.common.notify.NotifyCenter.shutdown@(NotifyCenter.java:162) - [NotifyCenter] Destruction of the end
2021-11-02T17:09:21.942+0800 | [Thread-3] | WARN | com.alibaba.nacos.common.http.HttpClientBeanHolder.shutdown@(HttpClientBeanHolder.java:114) - [HttpClientBeanHolder] Destruction of the end
The code is as follows:
public class StartKafka implements CommandLineRunner {
    @Resource
    private KafkaListenerEndpointRegistry registry;
    @Resource
    private KafkaTopicConfig config;
    @Autowired
    private AdminClient adminClient;

    @Override
    public void run(String... args) {
        config.getTopics().forEach(s -> {
            // Likely the line the stack trace reports as StartKafka.java:40:
            // getListenerContainer returns null when no container has this id.
            registry.getListenerContainer(s.getName()).start();
            System.out.printf("Started {%s} %n", s);
            adminClient.createTopics(Collections.singletonList(s.toNewTopic()));
        });
    }
}
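The `NullPointerException` at `StartKafka.java:40` is consistent with `registry.getListenerContainer(...)` returning `null` for an id that has no registered container. Below is a minimal sketch of a guard, written in plain Java so that it runs without Spring. The `lookup` function is only a stand-in for `registry::getListenerContainer`, and `ContainerStarter`, `startKnown`, and the ids used in `main` are hypothetical names for illustration, not part of the original project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class ContainerStarter {

    /**
     * Starts every container the lookup can resolve and returns the ids that
     * resolved to null, instead of throwing a NullPointerException.
     * `lookup` is an assumption standing in for registry::getListenerContainer.
     */
    public static List<String> startKnown(List<String> ids,
                                          Function<String, Runnable> lookup) {
        List<String> unknown = new ArrayList<>();
        for (String id : ids) {
            Runnable container = lookup.apply(id);
            if (container == null) {
                unknown.add(id);   // report the id rather than failing the whole runner
                continue;
            }
            container.run();       // stands in for container.start()
        }
        return unknown;
    }

    public static void main(String[] args) {
        // Hypothetical registry with a single known id.
        Map<String, Runnable> registry = new HashMap<>();
        List<String> started = new ArrayList<>();
        registry.put("topicA", () -> started.add("topicA"));

        List<String> unknown = startKnown(Arrays.asList("topicA", "topicB"), registry::get);
        System.out.println("started=" + started + " unknown=" + unknown);
    }
}
```

With a guard like this, an unknown id ends up in the returned list, which can be logged or turned into a clear startup error that names the offending topic.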
Kafka is currently a single node, with a replication factor of 1 and 2 partitions.
Running the command:
/data/soft/kafka/kafka_2.12-2.2.2/bin/kafka-consumer-groups.sh --describe --bootstrap-server 172.29.2.215:9092 --describe --group local1
returns the following:
Consumer group 'local1' has no active members.
Could this be related to the data in Kafka?
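Problem solved. Summary: one topic name in the configuration file did not match the name of the topic created in Kafka. When the listeners were started automatically, that topic did not actually exist in Kafka, so the thread terminated abnormally.
It was in fact the code problem shown in the error above, which I overlooked at first. After all, once this error appeared, a batch of data was still consumed and written to the database normally before the consumers stopped.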
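A mismatch like this can be caught at startup by comparing the configured topic names with the names that exist on the broker. Here is a minimal sketch in plain Java. `TopicNameCheck` and `missingTopics` are hypothetical names, the misspelled topic is made up for illustration, and in the real service the `existing` set would presumably come from something like `adminClient.listTopics().names().get()`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

public class TopicNameCheck {

    /**
     * Returns the configured topic names that are absent from the broker,
     * preserving configuration order. The comparison is exact (case-sensitive).
     */
    public static Set<String> missingTopics(Collection<String> configured,
                                            Set<String> existing) {
        Set<String> missing = new LinkedHashSet<>(configured);
        missing.removeAll(existing);
        return missing;
    }

    public static void main(String[] args) {
        // Topic names taken from the log above; assumed to exist on the broker.
        Set<String> existing = new HashSet<>(Arrays.asList(
                "mtalog_GameOnlineAmountEvent",
                "mtalog_GamePlayerLoginRecordEvent"));

        // Hypothetical configuration containing one misspelled name.
        Set<String> missing = missingTopics(
                Arrays.asList("mtalog_GameOnlineAmountEvent",
                              "mtalog_GamePlayerLoginRecordEvnt"),
                existing);

        if (!missing.isEmpty()) {
            System.out.println("Configured topics not found on broker: " + missing);
        }
    }
}
```

Failing fast with the list of missing names would make this kind of configuration typo visible immediately, rather than as a bare `NullPointerException`.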
Congratulations! Go ahead and accept your own answer.