使用注解@KafkaListener(topics= {"binlog"}, id="consumer") 消费不到消息。

古塔山后的晓霞 发表于: 2021-04-30   最后更新时间: 2021-04-30 12:05:37   5,354 游览

使用注解@KafkaListener(topics= "binlog", id="consumer")消费不到消息

但是使用

@KafkaListener(topics = "binlog", id="consumer",
            topicPartitions = {
                @TopicPartition(topic = "binlog", partitionOffsets = @PartitionOffset(partition = "0" , initialOffset = "0"))
            })

这种写法可以获取到,但是每次consumer重启都从offset 0 开始抓数据,各位大神帮忙看看什么原因要设置offset之后才能正常获取消息,有没有办法让他自动获取最大的偏移量开始接收消息.困扰两天了

依赖环境:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>org.example</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!--kafka-->
<!--        <dependency>-->
<!--            <groupId>org.springframework.kafka</groupId>-->
<!--            <artifactId>spring-kafka</artifactId>-->
<!--            <version>2.2.4.RELEASE</version>-->
<!--        </dependency>-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.4.1.RELEASE</version>
        </dependency>
        <!-- elasticsearch http api client -->
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.3</version>
        </dependency>
        <!--fastjson-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.51</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

代码

/**
 *实现单数据消费逻辑
 */
@Component
public class SingleDataConsumer {

    private static final Logger logger =     LoggerFactory.getLogger(SingleDataConsumer.class);

    @KafkaListener(topics = Config.KAFKA_JSON_TOPICS)
    public void listener(ConsumerRecord<?, ?> record){
        logger.info("topic.quick.consumer receive : " + record.toString());
    }
}

配置信心

#=============== consumer  =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=consumer
#spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
#spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
添加评论

LL,最后是怎么解决的,现在我的情况和你一样,只有设置了分区和初始化的位置才能消费到消息

你把这个删了试试...

initialOffset = "0"

如果也可以,那你最后把

 partitionOffsets = @PartitionOffset(partition = "0" , initialOffset = "0")

整段都删除了,这里指定了分区,和初始化的位置。

@PartitionOffset需要两个参数,不能删...
注解这样使用也不行

@KafkaListener(topics = Config.KAFKA_JSON_TOPICS, id=Config.KAFKA_JSON_ID,
            topicPartitions = {
                @TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitions = {"0", "1", "2"})
            })
@KafkaListener(id = Config.KAFKA_JSON_ID, topics = Config.KAFKA_JSON_TOPICS)

换成这样吧。

还是消费不到。。
只有换成这样才行

@KafkaListener(id=Config.KAFKA_JSON_ID, topics = Config.KAFKA_JSON_TOPICS,
topicPartitions = {
    @TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitionOffsets = @PartitionOffset(partition = "0" , initialOffset = "0")),
        @TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitionOffsets = @PartitionOffset(partition = "1" , initialOffset = "0")),
        @TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitionOffsets = @PartitionOffset(partition = "2" , initialOffset = "0"))
})

有点奇怪的是,返回的数据是成对的。

2021-04-30 12:40:01.371  INFO 44386 --- [consumer2-0-C-1] top.sync.kafka.kafka.SingleDataConsumer  : topic.quick.consumer receive : ConsumerRecord(topic = binlog, partition = 2, leaderEpoch = 0, offset = 57, CreateTime = 1619757601362, serialized key size = -1, serialized value size = 60, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = "{\"event\":\"test.role.delete\",\"value\":[61,\"伟大\"]}")
2021-04-30 12:40:01.377  INFO 44386 --- [consumer2-0-C-1] top.sync.kafka.kafka.SingleDataConsumer  : topic.quick.consumer receive : ConsumerRecord(topic = binlog, partition = 1, leaderEpoch = 0, offset = 59, CreateTime = 1619757601362, serialized key size = -1, serialized value size = 50, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"event":"test.role.delete","value":[61,"伟大"]})

给我的感觉就是@KafkaListener(id = Config.KAFKA_JSON_ID, topics = Config.KAFKA_JSON_TOPICS)这个注解pull不来消息,类似于请求参数不足,Kafka服务端没查到对应的数据,所以没消息返回。

我给你的肯定可行的。
是不是这个消费者同名了,被其他人消费走了。你换个消费者组id试试。
参考:https://www.orchome.com/6794

有道理,我排查一下。

-> 半兽人 2年前

您好,我目前也是遇到和楼主一样的问题,只有设置

topicPartitions = {
    @TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitionOffsets = @PartitionOffset(partition = "0" , initialOffset = "0")),
        @TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitionOffsets = @PartitionOffset(partition = "1" , initialOffset = "0")),
        @TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitionOffsets = @PartitionOffset(partition = "2" , initialOffset = "0"))
}

才会消费消息,生产消息是可以的

半兽人 -> 2年前

你去掉initialOffset = "0",在给消费者换个名字:

spring.kafka.consumer.group-id=consumer

然后你继续生产,看看是否消费的到。

用命令工具,查看你这个消费者的具体情况:

## 显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test

## 显示某个消费组的消费详情(0.9版本 - 0.10.1.0 之前)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group

## 显示某个消费组的消费详情(0.10.1.0版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

命令来自:kafka命令大全

-> 半兽人 2年前

2.4.1 版本的PartitionOffset是需要这个initialOffset参数的

半兽人 -> 2年前

这个参数的意思是什么?

-> 半兽人 2年前

定义从第几个开始往下消费消息吧

半兽人 -> 2年前

对呀,你都知道是定义消费者的位置,那你永远都从最早的开始。

另外,你有什么特殊需求,要指定消费某个分区吗?
你就按照上面的解决方式,不要指定分区了。

你的答案

查看kafka相关的其他问题或提一个您自己的问题