kafka 没有提交偏移量为什么还消费不到消息

安东尼的不二情 发表于: 2019-12-09   最后更新时间: 2019-12-09 18:19:55   2,640 游览

kafka 没有提交偏移量为什么还消费不到消息

我设置了手动提交偏移量,并且没有提交,但是为什么拉取不到消息,只能重启应用,或者重新加入group的时候才消费的到。

 Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singleton(topic));
        ConsumerRecords<String, String> records = null;
        while (true) {
            records = consumer.poll(Duration.ofMillis(8100));
            logger.info("size {}",records.count());

            records.forEach(record -> {
                boolean success = true;
                try {
                    OrderAction orderAction = processor.processor(new MQContent(record.value()));
                    if (orderAction == OrderAction.Suspend) {
                        success = false;
                    }
                } catch (Exception e) {
                    success = false;
                }
                if (success) {
                    //手动提交offset
                    TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                    consumer.commitSync();
                }

            });
        }

    }
发表于 2019-12-09

是呀,kafka客户端已经将消息拉下来了,也丢给你了,你不提交offset,那客户端也不会重新去拉取了,它认为你在处理中。

哪有什么好点的重试策略吗

kafka客户端负责把消息丢给你的业务层,这个时候,该消息必须被提交,无论你业务层是否成功,而且kafka是基于offset下标的,如果后面的提交也会跳过之前的消息。另外设计第一原则是不要侵入到业务层。

如果业务处理这条消息失败,重新丢会队列或者其他的处理就可以了,简单不复杂。

你的答案

查看kafka相关的其他问题或提一个您自己的问题