kafka 消费者组 10个消费者分布在不同的机器上,因为机器上线是串行的,发生重复消费。串行上线机器A,在A机器上的消费者先退出再加入,这个时候导致了再均衡。提交方式=手动提交。消费者:会分配多个分区。 问题: 消费者组里面的消费者重复的原因。 kafka再均衡过程中,正在拉去数据 | 正在处理处理 | 正在提交数据 不同状态下的消费者是怎么参与再均衡的。
kafka消费者是批量拉取消息的,比如一次拉取2000条。
说到这里,简单的消费者逻辑你已经理解了,问题的核心在于你什么时候提交这个offset。
我是拉取2000条,全部处理完成之后再进行偏移量提交。
我是通过一个线程包装成一个消费者,在关闭程序的时候是通过优雅停机,在线程里面循环判断是否中断,再退出消费者,正常情况下都是这批消息处理完,提交偏移量之后,下一次循环进行退出,理论上不应该出现重复数据。
没有那么多玄幻的情况,首先你要确保你提交的offset是正确的分区并且是成功的。
目前看我对应分区的偏移量正确提交了,成功日志正常打印了。
public void run() { // 设置线程Name threadName = Thread.currentThread().getName() + BuildUtils.buildUniqueNo(); Thread.currentThread().setName(threadName); LOGGER.info("UserComponentConsumer init threadName:{}!", threadName); try { // 监控消费者 Thread.sleep(50); // 订阅Topic consumer.subscribe(Collections.singletonList(componentGroupConfig.getTopic()), new UserComponentRebalanceHandler()); // init 偏移量 consumerInitOffset(new ArrayList<>(consumer.assignment()), consumer); // 循环接收消息 while (consumerOpenFlag && !Thread.currentThread().isInterrupted()) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); for (TopicPartition partition : records.partitions()) { // 依次处理分区批量消息 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); long firstOffset = partitionRecords.get(0).offset(); // 分区批量消息 头偏移量 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); // 分区批量消息 尾偏移量 try { // 消息解析 messageDispose(partitionRecords); } catch (Exception exception) { LOGGER.error("UserComponentConsumer messageDispose threadName:{} topic:{}, partition:{} firstOffset:{} lastOffset: {} error:{}", threadName, partition.topic(), partition.partition(), firstOffset, lastOffset, exception); } consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset))); // 同步提交 LOGGER.info("UserComponentConsumer threadName:{} topic:{}, partition:{} firstOffset:{} lastOffset: {}, GroupId:{} !!", threadName, partition.topic(), partition.partition(), firstOffset, lastOffset, componentGroupConfig.getGroupId()); } } } catch (WakeupException ex) { LOGGER.error("UserComponentConsumer threadName:{} WakeupException:{}", threadName, ex); } catch (InterruptedException ex) { LOGGER.error("UserComponentConsumer threadName:{} current thread of Interrupted, InterruptedException:{} !!", threadName, ex); // 恢复线程的中断状态 Thread.currentThread().interrupt(); } catch (Exception ex) { LOGGER.error("UserComponentConsumer threadName:{} Exception error:{} ", threadName, ex); } finally { // 消费者退出 consumer.close(); } }
你的答案