kafka消费时,有一个分区的记录每次都lag
换了多个不同group去消费,情况如下:
test-group1: 全部消费
test-group12: 2分区lag
test-group123: 2分区lag
test-group124: 0、2分区lag,重启0分区消费完成,2分区还是一直lag
test-group125: 2分区lag
test-group126: 0、2分区lag,重启0分区消费完成,2分区还是一直lag
test-group127: 2分区lag
配置信息:
spring.kafka.consumer.bootstrap-servers = xx.xx.xxx.xxx:9092,xx.xx.xxx.xxx:9092,xx.xx.xxx.xxx:9092
spring.kafka.consumer.properties.sasl.mechanism = SCRAM-SHA-256
spring.kafka.consumer.properties.security.protocol = SASL_PLAINTEXT
##新建消费组时从什么位置开始消费 latest:最近位置 earliest:最早位置
spring.kafka.consumer.auto-offset-reset = earliest
##批量消费一次最大拉取的数据量
spring.kafka.consumer.max-poll-records = 1000
##是否开启自动提交
spring.kafka.consumer.enable-auto-commit = false
#自动提交的间隔时间,自动提交开启时生效
spring.kafka.consumer.auto-commit-interval = 2000
#连接超时时间
spring.kafka.consumer.session-timeout = 20000
#手动提交设置与poll的心跳数,如果消息队列中没有消息,等待毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
#300秒的提交间隔,如果程序大于300秒提交,会报错
spring.kafka.consumer.max-poll-interval = 300000
#设置拉取数据的大小,1M 1048576
spring.kafka.consumer.max-partition-fetch-bytes = 1048576
#消费组
spring.kafka.consumer.group-id = test-group123
#连接消费用户名
spring.kafka.consumer.username = test
#消费连接用户密码
spring.kafka.consumer.password = 123456
#是否开启批量消费,true表示批量消费
spring.kafka.listener.batch-listener = true
#设置消费的线程数
spring.kafka.listener.concurrencys = 3
#只限自动提交
spring.kafka.listener.poll-timeout = 1500
消费者监听配置:
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>(20);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
props.put("security.protocol", protocol);
props.put("sasl.mechanism", mechanism);
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required username=\""+username+"\" password=\""+password+"\";");
return props;
}
/**
*
* @return
*/
@Bean
@ConditionalOnMissingBean(name = "kafkaBatchListener")
public KafkaListenerContainerFactory<?> kafkaBatchListener() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory();
factory.setConcurrency(concurrency);
return factory;
}
private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//批量消费
factory.setBatchListener(batchListener);
//如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。
// 如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
//手动提交无需配置
factory.getContainerProperties().setPollTimeout(pollTimeout);
//设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
//低版本的spring-kafka,ackMode需要引入AbstractMessageListenerContainer.AckMode.MANUAL
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
return factory;
}
private ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
消费者消费:
@KafkaListener(containerFactory = "kafkaBatchListener",topics = {"ODS_TM_TABLE_DATA_TOTAL"})
public void batchListener1(List<ConsumerRecord<?,?>> records, Acknowledgment ack){
try {
records.forEach(record -> {
//TODO - 处理消息
log.info("receive {} msg:{}",record.topic(),record.value().toString());
});
} catch (Exception e) {
//TODO - 消息处理异常操作
log.error("kafka listen error:{}",e.getMessage());
} finally {
//手动提交偏移量
ack.acknowledge();
}
}
代码封装的太多了,我看不出来里面的逻辑,有2种可能。
1、当消息接收后,任何异常自己消化,不要往上抛。
2、进程结束,需要休眠或者优雅停机,否则有可能造成你还没提交offset成功,进程已经结束了。
感谢大佬抽空解答,原因找到了,是因为测试账号限流了。
你的答案