压测时出现了消费者被踢出消费组的问题,导致消息在大约 1 分钟后被重复消费。从日志看,从消费开始到被踢出组只有一秒钟左右,应该不是消费时间过长导致消费者被踢出组。
导致重复消费的直接原因,是调用 ack.acknowledge() 时抛出了下面的异常。
现在想知道为什么会抛这个异常?
/**
 * Builds the listener container factory registered under the bean name
 * {@code ack_manual_immediate}.
 *
 * <p>The consumer properties come from {@code consumerConfigs(kafkaProperties)} with
 * auto-commit forced off, and the container is switched to
 * {@link ContainerProperties.AckMode#MANUAL_IMMEDIATE} so that offsets are committed only
 * when the listener calls {@code Acknowledgment.acknowledge()}.
 *
 * <p>NOTE(review): a commit issued after the consumer has left the group fails with
 * {@code CommitFailedException}. If the records returned by a single poll can take longer
 * than {@code max.poll.interval.ms} to process in total, consider lowering
 * {@code max.poll.records} or raising {@code max.poll.interval.ms} in the base consumer
 * configuration — confirm against the values produced by {@code consumerConfigs}.
 *
 * @param kafkaProperties the Kafka properties used to derive the base consumer configuration
 * @return a concurrent listener container factory using manual, immediate offset commits
 */
@Bean("ack_manual_immediate")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaAckContainerFactory(KafkaProperties kafkaProperties) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    Map<String, Object> props = consumerConfigs(kafkaProperties);
    // Override the base configuration: offsets are committed by the listener, never automatically.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    // Diamond operator instead of a raw type, so the factory stays typed as <String, String>.
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
    // MANUAL_IMMEDIATE commits as soon as the listener acknowledges a record;
    // MANUAL would instead queue the acknowledgments and commit them together.
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    // Start the listener containers automatically.
    factory.setAutoStartup(true);
    // Do not fail startup when a configured topic does not exist.
    factory.setMissingTopicsFatal(false);
    return factory;
}
/**
 * Consumes a message from {@code KafkaTopic.TRANS_POST_PROCESSING} and books the
 * accounting entry for the referenced order.
 *
 * <p>The record value is read as a JSON object carrying {@code transOrderNo} and
 * {@code transStatus}. Accounting is skipped (and the record acknowledged) when the message
 * status or the stored order status is not {@code SUCCESS}, when the order does not exist,
 * or when the stored accounting status is not {@code UN_ACCOUNT}. Orders flagged as activity
 * transactions whose amount equals a positive, in-effect cash-back activity amount are not
 * booked either. Otherwise the account is charged, the order's accounting status is updated,
 * and on success a message is published to {@code KafkaTopic.TRANS_ACCOUNT_SUCC}.
 *
 * <p>Any exception raised while processing is logged and the record is still acknowledged,
 * so a failing record is not redelivered by this listener. An exception thrown by the final
 * {@code ack.acknowledge()} itself is not caught and propagates to the container.
 *
 * @param record the consumed record; its value is the JSON payload described above
 * @param ack    handle used to commit the record's offset manually
 */
@KafkaListener(containerFactory = "ack_manual_immediate", topics = {KafkaTopic.TRANS_POST_PROCESSING}, groupId = KafkaTopic.TRANS_POST_PROCESSING + "_ACCOUNT")
public void onMsgReceive(ConsumerRecord<String, String> record, Acknowledgment ack) {
    log.info("订单[{}]交易成功记账消费开始", record.value());
    // Declared before the try so the catch block and the closing logs can reference it;
    // it stays null if the payload cannot be parsed.
    String transOrderNo = null;
    try {
        // Parse the payload once, inside the try, so a malformed message follows the same
        // "log and acknowledge" path as every other processing failure.
        JSONObject payload = new JSONObject(record.value());
        transOrderNo = payload.getStr("transOrderNo");
        String transStatus = payload.getStr("transStatus");
        if (!TransStatusEnum.SUCCESS.name().equals(transStatus)) {
            log.info("订单[{}]状态为[{}]不做记账处理", record.value(), transStatus);
            ack.acknowledge();
            return;
        }
        var transInfoPo = transInfoService.getById(transOrderNo);
        if (transInfoPo == null) {
            log.error("订单[{}]不存在", transOrderNo);
            ack.acknowledge();
            return;
        }
        // Re-check against the stored order: the message status alone is not trusted.
        if (!TransStatusEnum.SUCCESS.name().equals(transInfoPo.getTransStatus())) {
            log.error("订单[{}]状态为[{}]不做记账处理", transOrderNo, transInfoPo.getTransStatus());
            ack.acknowledge();
            return;
        }
        // Only orders that have not been booked yet are processed, which keeps redelivered
        // messages from charging the account a second time.
        if (!StrUtil.equals(transInfoPo.getAccountStatus(), AccountStatusEnum.UN_ACCOUNT.name())) {
            log.error("订单[{}]记账状态为[{}]不做记账处理", transOrderNo, transInfoPo.getAccountStatus());
            ack.acknowledge();
            return;
        }
        // Non-zero-amount activation transactions are not booked.
        boolean accountFlag = true;
        if (StrUtil.equals(transInfoPo.getActivityTransFlag(), YesOrNotEnum.YES.getValue())) {
            // NOTE(review): terminalInfoPo is dereferenced without a null check; a missing
            // terminal currently ends up in the catch block below — confirm that is intended.
            var terminalInfoPo = terminalInfoService.getById(transInfoPo.getDeviceSn());
            var activityCashBackPo = activityCashBackService.getOne(new LambdaQueryWrapperX<ActivityCashBackPo>()
                    .eq(ActivityCashBackPo::getActivityCashNo, terminalInfoPo.getActivityCashNo())
                    .eq(ActivityCashBackPo::getPolicyNo, terminalInfoPo.getPolicyNo())
                    .eq(ActivityCashBackPo::getEffectiveStatus, EffectiveStatusEnum.IN_EFFECT.getValue())
            );
            if (activityCashBackPo != null) {
                if (transInfoPo.getTransAmount().compareTo(activityCashBackPo.getActivityAmount()) == 0
                        && activityCashBackPo.getActivityAmount().compareTo(BigDecimal.ZERO) > 0
                ) {
                    log.info("订单[{}]-[{}]非0元激活[{}]不记账", transOrderNo, transInfoPo.getTransAmount(), activityCashBackPo.getActivityAmount());
                    accountFlag = false;
                }
            }
        }
        if (accountFlag) {
            var res = accountHandler.chargeAccount(transInfoPo);
            if (res.hasResult()) {
                // Read the result only after hasResult() confirms it is present.
                var result = res.getResult();
                AccountStatusEnum accountStatus = StrUtil.equals(result.getResCode(), "200") ? AccountStatusEnum.ACCOUNT_SUCCESS : AccountStatusEnum.ACCOUNT_FAIL;
                transInfoService.lambdaUpdate()
                        .set(TransInfoPo::getAccountStatus, accountStatus.name())
                        .set(TransInfoPo::getAccountMsg, result.getResMsg())
                        .set(TransInfoPo::getAccountTime, new Date())
                        .eq(TransInfoPo::getTransOrderNo, transOrderNo)
                        // Compare by name(), matching how the status is written and checked
                        // everywhere else in this method.
                        .eq(TransInfoPo::getAccountStatus, AccountStatusEnum.UN_ACCOUNT.name())
                        .eq(TransInfoPo::getTransStatus, TransStatusEnum.SUCCESS.name())
                        .update();
                // Publish to the "accounting succeeded" topic: TRANS_ACCOUNT_SUCC.
                if (accountStatus == AccountStatusEnum.ACCOUNT_SUCCESS) {
                    var jsonObject = new JSONObject();
                    jsonObject.set("transOrderNo", transOrderNo);
                    producer.sendMessage(KafkaTopic.TRANS_ACCOUNT_SUCC, jsonObject.toString());
                }
            } else {
                // Without a result there is nothing to persist; leave the order as UN_ACCOUNT.
                log.warn("订单[{}]记账无返回结果", transOrderNo);
            }
        }
    } catch (Exception e) {
        // Deliberate best-effort: log the failure and fall through to acknowledge the record.
        log.error("订单[{}]交易成功记账消费异常:", transOrderNo, e);
    }
    log.info("订单[{}]交易成功记账消费结束1", transOrderNo);
    ack.acknowledge();
    log.info("订单[{}]交易成功记账消费结束2", transOrderNo);
}
2022-11-07 16:35:49.283 [org.springframework.kafka.KafkaListenerEndpointContainer#6-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer - [error,149] - Error handler threw an exception
102 org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void cn.eeepay.payment.core.modules.kafka.consumer.TransSuccessAccountConsumer.onMsgReceive(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
103 at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:206)
104 at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:133)
105 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2625)
106 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2494)
107 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2405)
108 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2284)
109 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1958)
110 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1353)
111 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1344)
112 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1236)
113 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
114 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
115 at java.base/java.lang.Thread.run(Thread.java:833)
//设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次 factory.getContainerProperties().setAckMode((ContainerProperties.AckMode.MANUAL_IMMEDIATE));
MANUAL_IMMEDIATE 这种提交方式虽然是每消费一条就提交一次,但是 Kafka 每次 poll 都会拉取一批消息,最多 max.poll.records 条(默认 500 条)。比如说在消费到第 300 条数据并手动提交时,如果第 1~300 条数据消费的总时长已经超过了 max.poll.interval.ms,消费者就会被踢出消费者组,此时提交偏移量就会抛出 CommitFailedException。
我这边也出现了相同的场景。。。应该也是这个原因
你的答案