kafka在异常断开某一台服务器,会导致重复消费

Mr丶Dong° 发表于: 2018-01-30   最后更新时间: 2022-09-27 18:44:15   5,737 游览

当我开三台服务器去消费消费一个topic的时候,当我断开某一台服务器消费,那么会导致重复消费。

我建立了一个topic,并且建立了3个分区,我循环了1000次发送消息,但是输出却统计了1002条消息:

public static void main(String args[]) {
    String  topic = "dyq";
    if(args.length >0) {
        topic = args[0];
    }

    Properties props = new Properties();
    props.put("bootstrap.servers", "xxxxxxx:9092");
    props.put("auto.commit.interval.ms", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("group.id", "test");


    KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
    List<String> subscribedTopics =  new ArrayList<String>();
    //如果需要订阅多个Topic,则在这里add进去即可
    //subscribedTopics.add(topic1);
    subscribedTopics.add(topic);
    consumer.subscribe(subscribedTopics);
    try {
        int index = 0;
        while(true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    index++;
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                //数据库操作操作
                //....................
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset - 1)));
                System.out.println("我现在准备入库操作了  || 累计入库数 【" + index +"】" );
            }
        }
    } finally {
      consumer.close();
    }
}
发表于 2018-01-30
添加评论

你好,你的网站上有没有一个地方,是专门给你留言发问题的?现在都是在某篇文章下面给你留言的

半兽人 -> 愚思 6年前

有问答区的呀。

愚思 -> 半兽人 6年前

哦哦,我以为那个只是你网站的滚动消息呢。好了,我在那里提了问题,你有空时给我解答下哈,谢谢啦

props.put("enable.auto.commit", "false");

关闭自动提交,

Mr丶Dong° -> 半兽人 6年前

其实我发现一个问题, 我们入库的代码,如果是放在这句

consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset - 1)))

代码之前 ,当刚好执行数据入库,服务器挂掉,那么我们kafka的消息就并未提交,那么其他服务器就会多提交一条消息入库。反之,如果我们把入库代码放在kafka消息提交之后,再进行提交,那么当我们执行完了kafka消息提交之后,还未在我能数据入库这条消息的时候挂掉,那么就会丢失这一条消息。请我我说的这个是对的吗?因为我在并发测试的时候,把入库代码放在kafka消息提交之后处理,就丢失了一条消息。

半兽人 -> Mr丶Dong° 6年前

是这样的,后成功的会跳过这个offset。

Mr丶Dong° -> 半兽人 6年前

那么这个问题是没办法处理的吗?要么会多消费一条数据,要么会少消费一条数据~~

半兽人 -> Mr丶Dong° 6年前

没办法的,原理如此。

Mr丶Dong° -> 半兽人 6年前

我现在能想到的,就是要么在数据库端去做判断,不提交重复数据。要么就同一个group中,不做多台服务器去处理。请问我这个说法是否正确,或者有其他方式呢?

半兽人 -> Mr丶Dong° 6年前

默认都认为是处理成功,如果失败,重新丢回kafka中。

不管你做什么判断,后面的程序一旦提交成功,你的offset就变了,并且你offset提交的顺序不规则,你逻辑全乱了。不仅仅是丢消息,重复消费等问题了。

你的答案

查看kafka相关的其他问题或提一个您自己的问题