kafka 在异常断开某一台服务器,会导致重复消费

根本你的方法,进行消息消费,但是当我开三台服务器去消费消费一个topic的时候,当我断开某一台服务器消费,那么会导致重复消费。
我建立了一个topic,并且建立了3个分区,我循环了1000次发送消息,但是输出却统计了1002条消息

 public static void main(String args[]) {
            String  topic = "dyq";
            if(args.length >0) {
                topic = args[0];
            }

            Properties props = new Properties();
            props.put("bootstrap.servers", "xxxxxxx:9092");
            props.put("auto.commit.interval.ms", "false");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("group.id", "test");


            KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
            List<String> subscribedTopics =  new ArrayList<String>();
            //如果需要订阅多个Topic,则在这里add进去即可
            //subscribedTopics.add(topic1);
            subscribedTopics.add(topic);
            consumer.subscribe(subscribedTopics);
            try {
                int index = 0;
                while(true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            index++;
                        }
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        //数据库操作操作
                        //....................
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset - 1)));
                        System.out.println("我现在准备入库操作了  || 累计入库数 【" + index +"】" );
                    }
                }
            } finally {
              consumer.close();
            }
        }





发表于: 10月前   最后更新时间: 10月前   游览量:2397
上一条: 无法远程连接kafka写producer
下一条: kafka-stream运行报错

评论…


  • 你好,你的网站上有没有一个地方,是专门给你留言发问题的?现在都是在某篇文章下面给你留言的
    props.put("enable.auto.commit", "false");
    关闭自动提交
    • 其实我发现一个问题,  我们入库的代码,如果是放在这句consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset - 1)))代码之前 ,当刚好执行数据入库,服务器挂掉,那么我们kafka的消息就并未提交,那么其他服务器就会多提交一条消息入库。反之,如果我们把入库代码放在kafka消息提交之后,再进行提交,那么当我们执行完了kafka消息提交之后,还未在我能数据入库这条消息的时候挂掉,那么就会丢失这一条消息。请我我说的这个是对的吗?因为我在并发测试的时候,把入库代码放在kafka消息提交之后处理,就丢失了一条消息
        • 我现在能想到的,就是要么在数据库端去做判断,不提交重复数据。要么就同一个group中,不做多台服务器去处理。请问我这个说法是否正确,或者有其他方式呢?
            • 默认都认为是处理成功,如果失败,重新丢回kafka中。
              不管你做什么判断,第一是后面的程序一旦提交成功,你的offset就变了,并且你offset提交的顺序不规则,你逻辑全乱了。不仅仅是丢消息,重复消费等问题了。
              • 评论…
                • in this conversation