。莪

0 声望

这家伙太懒,什么都没留下

个人动态
  • 赞了 。莪kafka 消费者组 10个消费者分布在不同的机器上,因为机器上线是串行的,发生重复消费。 的评论!

    目前看我对应分区的偏移量正确提交了,成功日志正常打印了。

    public void run() {
        // 设置线程Name
        threadName = Thread.currentThread().getName() + BuildUtils.buildUniqueNo();
        Thread.currentThread().setName(threadName);
        LOGGER.info("UserComponentConsumer init threadName:{}!", threadName);
        try {
            // 监控消费者
            Thread.sleep(50);
            // 订阅Topic
            consumer.subscribe(Collections.singletonList(componentGroupConfig.getTopic()), new UserComponentRebalanceHandler());
            // init 偏移量
            consumerInitOffset(new ArrayList<>(consumer.assignment()), consumer);
            // 循环接收消息
            while (consumerOpenFlag && !Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (TopicPartition partition : records.partitions()) {
                    // 依次处理分区批量消息
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    long firstOffset = partitionRecords.get(0).offset(); // 分区批量消息 头偏移量
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); // 分区批量消息 尾偏移量
                    try {
                        // 消息解析
                        messageDispose(partitionRecords);
                    } catch (Exception exception) {
                        LOGGER.error("UserComponentConsumer messageDispose threadName:{} topic:{}, partition:{} firstOffset:{} lastOffset: {} error:{}", threadName, partition.topic(), partition.partition(), firstOffset, lastOffset, exception);
                    }
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset))); // 同步提交
                    LOGGER.info("UserComponentConsumer threadName:{} topic:{}, partition:{} firstOffset:{} lastOffset: {}, GroupId:{} !!", threadName, partition.topic(), partition.partition(), firstOffset, lastOffset, componentGroupConfig.getGroupId());
                }
            }
        } catch (WakeupException ex) {
            LOGGER.error("UserComponentConsumer threadName:{} WakeupException:{}", threadName, ex);
        } catch (InterruptedException ex) {
            LOGGER.error("UserComponentConsumer threadName:{} current thread of Interrupted, InterruptedException:{} !!", threadName, ex);
            // 恢复线程的中断状态
            Thread.currentThread().interrupt();
        } catch (Exception ex) {
            LOGGER.error("UserComponentConsumer threadName:{} Exception error:{} ", threadName, ex);
        } finally {
            // 消费者退出
            consumer.close();
        }
    }
    
    11月前
  • 。莪 赞了 在 kafka 消费者组 10个消费者分布在不同的机器上,因为机器上线是串行的,发生重复消费。 的评论!

    目前看我对应分区的偏移量正确提交了,成功日志正常打印了。

    public void run() {
        // 设置线程Name
        threadName = Thread.currentThread().getName() + BuildUtils.buildUniqueNo();
        Thread.currentThread().setName(threadName);
        LOGGER.info("UserComponentConsumer init threadName:{}!", threadName);
        try {
            // 监控消费者
            Thread.sleep(50);
            // 订阅Topic
            consumer.subscribe(Collections.singletonList(componentGroupConfig.getTopic()), new UserComponentRebalanceHandler());
            // init 偏移量
            consumerInitOffset(new ArrayList<>(consumer.assignment()), consumer);
            // 循环接收消息
            while (consumerOpenFlag && !Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (TopicPartition partition : records.partitions()) {
                    // 依次处理分区批量消息
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    long firstOffset = partitionRecords.get(0).offset(); // 分区批量消息 头偏移量
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); // 分区批量消息 尾偏移量
                    try {
                        // 消息解析
                        messageDispose(partitionRecords);
                    } catch (Exception exception) {
                        LOGGER.error("UserComponentConsumer messageDispose threadName:{} topic:{}, partition:{} firstOffset:{} lastOffset: {} error:{}", threadName, partition.topic(), partition.partition(), firstOffset, lastOffset, exception);
                    }
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset))); // 同步提交
                    LOGGER.info("UserComponentConsumer threadName:{} topic:{}, partition:{} firstOffset:{} lastOffset: {}, GroupId:{} !!", threadName, partition.topic(), partition.partition(), firstOffset, lastOffset, componentGroupConfig.getGroupId());
                }
            }
        } catch (WakeupException ex) {
            LOGGER.error("UserComponentConsumer threadName:{} WakeupException:{}", threadName, ex);
        } catch (InterruptedException ex) {
            LOGGER.error("UserComponentConsumer threadName:{} current thread of Interrupted, InterruptedException:{} !!", threadName, ex);
            // 恢复线程的中断状态
            Thread.currentThread().interrupt();
        } catch (Exception ex) {
            LOGGER.error("UserComponentConsumer threadName:{} Exception error:{} ", threadName, ex);
        } finally {
            // 消费者退出
            consumer.close();
        }
    }
    
    11月前
  • 赞了 。莪kafka 消费者组 10个消费者分布在不同的机器上,因为机器上线是串行的,发生重复消费。 的评论!

    kafka消费者是批量拉取消息的,比如一次拉取2000条。

    1. 如果你拉取了2000条,直接提交offset,那如果你消费者消费第1000条,报错了,那么就会丢失1000条。
    2. 如果你拉取了2000条,想先消费在提交offset,那当你先消费了1000条时,报错了,重启消费者后会重新消费这2000条,因为你没有提交offset,那么会重新消费者2000条,就出现重复消费。

    说到这里,简单的消费者逻辑你已经理解了,问题的核心在于你什么时候提交这个offset。

    11月前
  • 。莪 回复 半兽人kafka 消费者组 10个消费者分布在不同的机器上,因为机器上线是串行的,发生重复消费。 中 :

    目前看我对应分区的偏移量正确提交了,成功日志正常打印了。

    public void run() {
        // 设置线程Name
        threadName = Thread.currentThread().getName() + BuildUtils.buildUniqueNo();
        Thread.currentThread().setName(threadName);
        LOGGER.info("UserComponentConsumer init threadName:{}!", threadName);
        try {
            // 监控消费者
            Thread.sleep(50);
            // 订阅Topic
            consumer.subscribe(Collections.singletonList(componentGroupConfig.getTopic()), new UserComponentRebalanceHandler());
            // init 偏移量
            consumerInitOffset(new ArrayList<>(consumer.assignment()), consumer);
            // 循环接收消息
            while (consumerOpenFlag && !Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (TopicPartition partition : records.partitions()) {
                    // 依次处理分区批量消息
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    long firstOffset = partitionRecords.get(0).offset(); // 分区批量消息 头偏移量
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); // 分区批量消息 尾偏移量
                    try {
                        // 消息解析
                        messageDispose(partitionRecords);
                    } catch (Exception exception) {
                        LOGGER.error("UserComponentConsumer messageDispose threadName:{} topic:{}, partition:{} firstOffset:{} lastOffset: {} error:{}", threadName, partition.topic(), partition.partition(), firstOffset, lastOffset, exception);
                    }
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset))); // 同步提交
                    LOGGER.info("UserComponentConsumer threadName:{} topic:{}, partition:{} firstOffset:{} lastOffset: {}, GroupId:{} !!", threadName, partition.topic(), partition.partition(), firstOffset, lastOffset, componentGroupConfig.getGroupId());
                }
            }
        } catch (WakeupException ex) {
            LOGGER.error("UserComponentConsumer threadName:{} WakeupException:{}", threadName, ex);
        } catch (InterruptedException ex) {
            LOGGER.error("UserComponentConsumer threadName:{} current thread of Interrupted, InterruptedException:{} !!", threadName, ex);
            // 恢复线程的中断状态
            Thread.currentThread().interrupt();
        } catch (Exception ex) {
            LOGGER.error("UserComponentConsumer threadName:{} Exception error:{} ", threadName, ex);
        } finally {
            // 消费者退出
            consumer.close();
        }
    }
    
    11月前
  • 半兽人 回复 。莪kafka 消费者组 10个消费者分布在不同的机器上,因为机器上线是串行的,发生重复消费。 中 :

    没有那么多玄幻的情况,首先你要确保你提交的offset是正确的分区并且是成功的。

    11月前
  • 。莪 回复 半兽人kafka 消费者组 10个消费者分布在不同的机器上,因为机器上线是串行的,发生重复消费。 中 :

    我是通过一个线程包装成一个消费者,在关闭程序的时候是通过优雅停机,在线程里面循环判断是否中断,再退出消费者,正常情况下都是这批消息处理完,提交偏移量之后,下一次循环进行退出,理论上不应该出现重复数据。

    11月前
  • 。莪 回复 半兽人kafka 消费者组 10个消费者分布在不同的机器上,因为机器上线是串行的,发生重复消费。 中 :

    我是拉取2000条,全部处理完成之后再进行偏移量提交。

    11月前
  • 半兽人 回复 。莪kafka 消费者组 10个消费者分布在不同的机器上,因为机器上线是串行的,发生重复消费。 中 :

    kafka消费者是批量拉取消息的,比如一次拉取2000条。

    1. 如果你拉取了2000条,直接提交offset,那如果你消费者消费第1000条,报错了,那么就会丢失1000条。
    2. 如果你拉取了2000条,想先消费在提交offset,那当你先消费了1000条时,报错了,重启消费者后会重新消费这2000条,因为你没有提交offset,那么会重新消费者2000条,就出现重复消费。

    说到这里,简单的消费者逻辑你已经理解了,问题的核心在于你什么时候提交这个offset。

    11月前