kafka消费时,有一个分区的记录每次都延迟lag,消费不干净

亂舞ぁ蒼蠅 发表于: 2021-08-25   最后更新时间: 2021-08-25 17:41:03   2,364 游览

kafka消费时,有一个分区的记录每次都lag

kafka消费时,有一个分区的记录每次都延迟lag,消费不干净

换了多个不同group去消费,情况如下:

test-group1:    全部消费
test-group12:   2分区lag
test-group123:   2分区lag
test-group124:   0、2分区lag,重启0分区消费完成,2分区还是一直lag
test-group125:   2分区lag
test-group126:   0、2分区lag,重启0分区消费完成,2分区还是一直lag
test-group127:   2分区lag

配置信息:

spring.kafka.consumer.bootstrap-servers = xx.xx.xxx.xxx:9092,xx.xx.xxx.xxx:9092,xx.xx.xxx.xxx:9092
spring.kafka.consumer.properties.sasl.mechanism = SCRAM-SHA-256
spring.kafka.consumer.properties.security.protocol = SASL_PLAINTEXT
##新建消费组时从什么位置开始消费  latest:最近位置   earliest:最早位置
spring.kafka.consumer.auto-offset-reset = earliest
##批量消费一次最大拉取的数据量
spring.kafka.consumer.max-poll-records = 1000
##是否开启自动提交
spring.kafka.consumer.enable-auto-commit = false
#自动提交的间隔时间,自动提交开启时生效
spring.kafka.consumer.auto-commit-interval = 2000
#连接超时时间
spring.kafka.consumer.session-timeout = 20000
#手动提交设置与poll的心跳数,如果消息队列中没有消息,等待毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
#300秒的提交间隔,如果程序大于300秒提交,会报错
spring.kafka.consumer.max-poll-interval = 300000
#设置拉取数据的大小,1M 1048576
spring.kafka.consumer.max-partition-fetch-bytes = 1048576
#消费组
spring.kafka.consumer.group-id = test-group123
#连接消费用户名
spring.kafka.consumer.username = test
#消费连接用户密码
spring.kafka.consumer.password = 123456

#是否开启批量消费,true表示批量消费
spring.kafka.listener.batch-listener = true
#设置消费的线程数
spring.kafka.listener.concurrencys = 3
#只限自动提交
spring.kafka.listener.poll-timeout = 1500

消费者监听配置:

private Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>(20);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
    props.put("security.protocol", protocol);
    props.put("sasl.mechanism", mechanism);
    props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required username=\""+username+"\" password=\""+password+"\";");

    return props;
}



/**
 *
 * @return
 */
@Bean
@ConditionalOnMissingBean(name = "kafkaBatchListener")
public KafkaListenerContainerFactory<?> kafkaBatchListener() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory();
    factory.setConcurrency(concurrency);
    return factory;
}

private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    //批量消费
    factory.setBatchListener(batchListener);
    //如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。
    // 如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
    //手动提交无需配置
    factory.getContainerProperties().setPollTimeout(pollTimeout);
    //设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
    //低版本的spring-kafka,ackMode需要引入AbstractMessageListenerContainer.AckMode.MANUAL
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    return factory;
}

private ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

消费者消费:

@KafkaListener(containerFactory = "kafkaBatchListener",topics = {"ODS_TM_TABLE_DATA_TOTAL"})
public void batchListener1(List<ConsumerRecord<?,?>> records, Acknowledgment ack){

    try {
        records.forEach(record -> {

            //TODO - 处理消息
            log.info("receive {} msg:{}",record.topic(),record.value().toString());

        });
    } catch (Exception e) {

        //TODO - 消息处理异常操作
        log.error("kafka listen error:{}",e.getMessage());

    } finally {
        //手动提交偏移量
        ack.acknowledge();
    }

}
发表于 2021-08-25

代码封装的太多了,我看不出来里面的逻辑,有2种可能。
1、当消息接收后,任何异常自己消化,不要往上抛。
2、进程结束,需要休眠或者优雅停机,否则有可能造成你还没提交offset成功,进程已经结束了。

感谢大佬抽空解答,原因找到了,是因为测试账号限流了。

你的答案

查看kafka相关的其他问题或提一个您自己的问题