Kafka consumer: manual ack commit fails

渊虹 · Posted: 2022-11-08 · Last updated: 2022-11-08 14:56:42 · 2,411 views

During a load test, a consumer was kicked out of its group, which led to duplicate consumption (about one minute later). According to the logs, only about one second elapsed between the start of consumption and the kick-out, so it should not be a case of a single record taking too long to process.

The duplicate consumption is caused by the exception below being thrown at the moment ack.acknowledge(); is called.

What I want to know is: why is this exception thrown?


 @Bean("ack_manual_immediate")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaAckContainerFactory(KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        Map<String, Object> props = consumerConfigs(kafkaProperties);
        //重新设置为:禁止自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(props));
        //设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
        factory.getContainerProperties().setAckMode((ContainerProperties.AckMode.MANUAL_IMMEDIATE));
        //是否自动启动
        factory.setAutoStartup(true);
        //防止topic不存在的时候报错
        factory.setMissingTopicsFatal(false);
        return factory;

    }
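For diagnosing the kick-out under load, a rebalance listener registered on this same factory (just before return factory;) makes partition revocations visible in the logs rather than only as a failed commit. A minimal sketch, assuming the standard kafka-clients types ConsumerRebalanceListener and TopicPartition plus java.util.Collection are imported:

    // Diagnostic sketch (assumption, not in the original config): log partition
    // revocation/assignment so a rebalance during the load test shows up explicitly.
    factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            log.warn("Partitions revoked (rebalance; consumer may have been kicked out): {}", partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            log.info("Partitions assigned: {}", partitions);
        }
    });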
@KafkaListener(containerFactory = "ack_manual_immediate", topics = {KafkaTopic.TRANS_POST_PROCESSING}, groupId = KafkaTopic.TRANS_POST_PROCESSING + "_ACCOUNT")
public void onMsgReceive(ConsumerRecord<String, String> record, Acknowledgment ack) {
    log.info("Order [{}]: accounting for successful transaction started", record.value());
    // parse the payload once instead of once per field
    JSONObject payload = new JSONObject(record.value());
    String transOrderNo = payload.getStr("transOrderNo");
    String transStatus = payload.getStr("transStatus");
    try {
        if (!TransStatusEnum.SUCCESS.name().equals(transStatus)) {
            log.info("Order [{}]: status is [{}], skipping accounting", record.value(), transStatus);
            ack.acknowledge();
            return;
        }
        var transInfoPo = transInfoService.getById(transOrderNo);
        if (transInfoPo == null) {
            log.error("Order [{}] does not exist", transOrderNo);
            ack.acknowledge();
            return;
        }
        if (!TransStatusEnum.SUCCESS.name().equals(transInfoPo.getTransStatus())) {
            log.error("Order [{}]: status is [{}], skipping accounting", transOrderNo, transInfoPo.getTransStatus());
            ack.acknowledge();
            return;
        }
        if (!StrUtil.equals(transInfoPo.getAccountStatus(), AccountStatusEnum.UN_ACCOUNT.name())) {
            log.error("Order [{}]: accounting status is [{}], skipping accounting", transOrderNo, transInfoPo.getAccountStatus());
            ack.acknowledge();
            return;
        }
        // non-zero-amount activation: no accounting
        boolean accountFlag = true;
        if (StrUtil.equals(transInfoPo.getActivityTransFlag(), YesOrNotEnum.YES.getValue())) {
            var terminalInfoPo = terminalInfoService.getById(transInfoPo.getDeviceSn());
            var activityCashBackPo = activityCashBackService.getOne(new LambdaQueryWrapperX<ActivityCashBackPo>()
                    .eq(ActivityCashBackPo::getActivityCashNo, terminalInfoPo.getActivityCashNo())
                    .eq(ActivityCashBackPo::getPolicyNo, terminalInfoPo.getPolicyNo())
                    .eq(ActivityCashBackPo::getEffectiveStatus, EffectiveStatusEnum.IN_EFFECT.getValue())
            );
            if (activityCashBackPo != null
                    && transInfoPo.getTransAmount().compareTo(activityCashBackPo.getActivityAmount()) == 0
                    && activityCashBackPo.getActivityAmount().compareTo(BigDecimal.ZERO) > 0) {
                log.info("Order [{}]-[{}]: non-zero-amount activation [{}], skipping accounting", transOrderNo, transInfoPo.getTransAmount(), activityCashBackPo.getActivityAmount());
                accountFlag = false;
            }
        }
        if (accountFlag) {
            var res = accountHandler.chargeAccount(transInfoPo);
            // check hasResult() before dereferencing getResult(), to avoid an NPE
            if (res.hasResult()) {
                AccountStatusEnum accountStatus = StrUtil.equals(res.getResult().getResCode(), "200")
                        ? AccountStatusEnum.ACCOUNT_SUCCESS : AccountStatusEnum.ACCOUNT_FAIL;
                transInfoService.lambdaUpdate()
                        .set(TransInfoPo::getAccountStatus, accountStatus.name())
                        .set(TransInfoPo::getAccountMsg, res.getResult().getResMsg())
                        .set(TransInfoPo::getAccountTime, new Date())
                        .eq(TransInfoPo::getTransOrderNo, transOrderNo)
                        .eq(TransInfoPo::getAccountStatus, AccountStatusEnum.UN_ACCOUNT.name())
                        .eq(TransInfoPo::getTransStatus, TransStatusEnum.SUCCESS.name())
                        .update();
                // publish to the "accounting succeeded" topic: TRANS_ACCOUNT_SUCC
                if (accountStatus == AccountStatusEnum.ACCOUNT_SUCCESS) {
                    var jsonObject = new JSONObject();
                    jsonObject.set("transOrderNo", transOrderNo);
                    producer.sendMessage(KafkaTopic.TRANS_ACCOUNT_SUCC, jsonObject.toString());
                }
            }
        }
    } catch (Exception e) {
        log.error("Order [{}]: accounting consumer threw an exception:", transOrderNo, e);
    }
    log.info("Order [{}]: accounting consumer finished (1)", transOrderNo);

    ack.acknowledge();

    log.info("Order [{}]: accounting consumer finished (2)", transOrderNo);
}
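With MANUAL_IMMEDIATE and Spring Kafka's default syncCommits=true, acknowledge() commits synchronously on the consumer thread, which is why the CommitFailedException in the stack trace below surfaces inside the listener method itself. A hedged variant of the final ack that logs the failure explicitly instead of letting it reach the container's error handler; note it does not prevent redelivery, since after the rebalance the records are re-fetched anyway:

    // Sketch (assumption, not the posted code): surface the failed commit in our
    // own logs. CommitFailedException is org.apache.kafka.clients.consumer.CommitFailedException.
    try {
        ack.acknowledge();
    } catch (CommitFailedException e) {
        log.warn("Order [{}]: offset commit failed; record will be redelivered after the rebalance", transOrderNo, e);
    }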
2022-11-07 16:35:49.283   [org.springframework.kafka.KafkaListenerEndpointContainer#6-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer - [error,149] - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void cn.eeepay.payment.core.modules.kafka.consumer.TransSuccessAccountConsumer.onMsgReceive(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
        at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:206)
        at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:133)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2625)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2494)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2405)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2284)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1958)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1353)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1344)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1236)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:833)
Answered 2022-11-08
// offset commit mode: MANUAL_IMMEDIATE commits each record as it is acknowledged; MANUAL batches the commits
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

Even though MANUAL_IMMEDIATE commits after each record is acknowledged, Kafka still fetches up to max.poll.records per poll (500 by default). Suppose, for example, that by the time the manual commit for record 300 happens, the total time spent consuming records 1-300 has already exceeded max.poll.interval.ms; at that point the consumer has been kicked out of the group, and the commit fails.
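So the lever is keeping (records per poll) x (worst-case time per record) below max.poll.interval.ms. A minimal sketch against the consumer props built in the factory above, using the standard kafka-clients ConsumerConfig keys; the concrete numbers are illustrative, not recommendations:

    // Sketch (assumption): shrink the batch and/or widen the processing window so
    // that one poll's worth of records cannot exceed the poll interval.
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);          // default: 500
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000); // default: 300000 (5 minutes)

Alternatively, make per-record processing cheaper or hand the heavy accounting work off to a separate thread pool, so the consumer thread gets back to poll() in time.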

骑火箭的牛 -> 渊虹 · 1 year ago

I've run into the same scenario here... most likely the same cause.
