1. Kafka consumer error
2020-12-02 23:04:32.290 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1378)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1085)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2331)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2326)
2. Kafka version: kafka_2.11-2.4.0
3. Manual commit is configured. The configuration code is as follows:
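import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

// PropsConfig and MessageDeserializer are the poster's own classes (not shown).
@Configuration
@EnableKafka
public class KafkaConfig {

    @Autowired
    private PropsConfig propsConfig;

    @Bean
    KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // Enable batch listening
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(propsConfig.getPollTimeout());
        // Offsets are committed only when the listener acknowledges the batch
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getServers());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.CLIENT_ID_CONFIG, propsConfig.getClientId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // If there is no committed offset, start reading from the latest offset
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // Must be greater than session.timeout.ms
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
        // Session timeout set to 30 seconds (the Kafka 2.4 client default is 10 seconds)
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        propsMap.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // Maximum number of messages returned by each poll
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class.getName());
        return propsMap;
    }
}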
# Kafka configuration
kafka:
  bootstrap-servers: 192.168.54.150:9092
  consumer:
    topic: example_wk_new
    # Whether to commit offsets automatically
    enable-auto-commit: false
    properties:
      group-id: example_wk_new
      pollTimeout: 3000
      client-id: example_wk_new_1
  listener:
    # By default the application fails at startup if a listened topic does not exist (disabled here)
    missing-topics-fatal: false
What is going on here? What do I need to change?
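By default, if message processing takes longer than 30 seconds, Kafka considers the consumer dead and removes it from the group, which is why the offset commit fails.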
You can refer to the first answer to this question to adjust your parameters: https://www.orchome.com/893
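The thread does not name the exact settings, so here is a minimal sketch of the two that usually matter. In Kafka clients since 0.10.1, heartbeats are sent from a background thread, and the time allowed between two poll() calls is bounded by max.poll.interval.ms (default 300000 ms), so that limit may also be worth checking. The class name, method name, and example values below are hypothetical; the string keys are the standard consumer property names behind ConsumerConfig.MAX_POLL_RECORDS_CONFIG and ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the poster's code.
class PollTuningSketch {

    // Builds consumer overrides that bound the work done between two poll() calls.
    static Map<String, Object> pollTuning(int maxPollRecords, int maxPollIntervalMs) {
        Map<String, Object> props = new HashMap<>();
        // Fewer records per poll means less work per batch.
        props.put("max.poll.records", maxPollRecords);
        // Longest allowed gap between two poll() calls before the consumer leaves the group.
        props.put("max.poll.interval.ms", maxPollIntervalMs);
        return props;
    }

    public static void main(String[] args) {
        // Example values only: 200 records per poll, up to 10 minutes per batch.
        System.out.println(pollTuning(200, 600_000));
    }
}
```

In the configuration above, the result could be merged with propsMap.putAll(PollTuningSketch.pollTuning(200, 600_000)) before the map is returned from consumerConfigs().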
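So, to finish consuming within 30 seconds, I need fewer messages per poll. Could I lower the max.poll.records setting?
Or is there a setting that increases the allowed processing time? Or do I need to add partitions and more consumers?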
There is no need to adjust max.poll.records; just make sure no single message in a batch takes (blocks for) more than 30 seconds. See: https://www.orchome.com/535
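Because the listener above is a batch listener, a whole batch is handled between two polls, so it can help to estimate the worst-case batch time as well as the per-message time. The following is a rough sketch, assuming records are processed one after another; the class name and the 50 ms per-record cost are hypothetical figures, not measurements from this thread.

```java
// Hypothetical helper for back-of-the-envelope estimates.
class BatchBudgetSketch {

    // Worst-case time to process one batch when records are handled sequentially.
    static long worstCaseBatchMs(int maxPollRecords, long perRecordMs) {
        return (long) maxPollRecords * perRecordMs;
    }

    // Largest batch size whose worst-case processing time stays within the limit.
    static int maxRecordsWithin(long limitMs, long perRecordMs) {
        if (perRecordMs <= 0) {
            throw new IllegalArgumentException("perRecordMs must be positive");
        }
        return (int) Math.min(Integer.MAX_VALUE, limitMs / perRecordMs);
    }

    public static void main(String[] args) {
        // 1000 records per poll (as configured above) at 50 ms each: prints 50000
        System.out.println(worstCaseBatchMs(1000, 50));
        // Records that fit into a 30000 ms limit at 50 ms each: prints 600
        System.out.println(maxRecordsWithin(30_000, 50));
    }
}
```

If the estimate exceeds the limit that applies to the consumer, the options are the ones raised in the question: a smaller batch, a longer allowed interval, or faster per-record processing.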
And when this consumption attempt fails, are the messages automatically redelivered to another consumer?
Yes, the messages will be consumed again, because the offset was not committed first.
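Since uncommitted messages can be delivered again, one common way to cope is to make processing idempotent. The sketch below skips records whose topic, partition, and offset were already handled. It is an illustration only: the class and method names are hypothetical, and the in-memory set stands in for whatever durable store a real consumer would need, since a set in memory is lost on restart.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical duplicate filter keyed by topic, partition, and offset.
class DedupSketch {
    private final Set<String> seen = new HashSet<>();
    private int applied = 0;

    // Returns true if the record was processed, false if it was skipped as a duplicate.
    boolean handle(String topic, int partition, long offset) {
        String key = topic + "-" + partition + "-" + offset;
        if (!seen.add(key)) {
            return false; // already handled in an earlier delivery
        }
        applied++; // the real business logic would run here
        return true;
    }

    // Number of records that were actually processed.
    int applied() {
        return applied;
    }

    public static void main(String[] args) {
        DedupSketch sketch = new DedupSketch();
        System.out.println(sketch.handle("example_wk_new", 0, 42L)); // first delivery: true
        System.out.println(sketch.handle("example_wk_new", 0, 42L)); // redelivery: false
    }
}
```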
Thank you for the explanation.