springboot+kafka @KafkaListener方式消费,偶尔有重复的消息?

请叫我马里奥 发表于: 2019-01-05   最后更新时间: 2019-01-05 18:14:29   20,640 游览
  • 版本
    spring-boot :2.1.0.RELEASE
    spring-kafka:2.2.0.RELEASE
    kafka版本:kafka_2.12-2.1.0,一台机器启用3个实例

  • 配置

    @Bean
    public KafkaConsumer listener(){
        return new KafkaConsumer();
    }

    @Bean
    public KafkaListenerContainerFactory messageListenerContainer(){
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConcurrency(1);
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory consumerFactory(){
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,url);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"zlink");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

        DefaultKafkaConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory(props);
        return consumerFactory;
    }


    @Bean
    public ProducerFactory producerFactory(){
        Map<String,Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,url);
        props.put(ProducerConfig.ACKS_CONFIG,"all");
        props.put(ProducerConfig.RETRIES_CONFIG,0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG,1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");


        return new DefaultKafkaProducerFactory(props);
    }

    @Bean
    public KafkaTemplate kafkaTemplate(){
        return new KafkaTemplate(producerFactory());
    }

    @Bean
    public NewTopic topic(){
        return new NewTopic("zlink-device-online",3, (short) 3);
    }

    @Bean
    public KafkaAdmin kafkaAdmin(){
        Map<String,Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,url);
        return new KafkaAdmin(config);
    }
  • listener
@KafkaListener(topics = ConstantCode.KAFKA_TOPIC_DEVICE_ONLINE)
    public void listen(ConsumerRecord<String,String> record){
        String data = record.value();
        Integer partition = record.partition();
        Long timestamp = record.timestamp();
        Long offset = record.offset();
        log.info("kafka consumer message:{};partition:{};offset:{};timestamp:{}",data,partition,offset,timestamp);
        gatewayService.gatewayOnline(data);
    }

问题:
gatewayService.gatewayOnline(data)处理消息需要800ms的时间。
第一步:发送500条消息
第二步:过十分钟后在发送500条消息
之后会隔两三分钟发送500条消息,总发送12次的500条消息,最后的结果是第二步所有消息会收到两次,其他的都是一次,请问一下会是什么原因导致的?应该怎么解决?

发表于 2019-01-05
添加评论

springboot没用过,有些代码看不懂,我先大概猜测下。
自动提交offset时,gatewayService.gatewayOnline(data); 如果抛异常,listen会不会重新来,导致没提交成功。

从日志看没有报错信息,如果有异常,应该会打印的

已经解决了,中间有一个重新分配分区的日志,之前一直没看到。原因就是一次获取500条消息的处理时间大于了max.poll.interval.ms这个值

你的答案

查看kafka相关的其他问题或提一个您自己的问题