Using the annotation @KafkaListener(topics = "binlog", id = "consumer") I can't consume any messages.
But with this form:
@KafkaListener(topics = "binlog", id = "consumer",
    topicPartitions = {
        @TopicPartition(topic = "binlog", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))
    })
messages do come through, but every time the consumer restarts it pulls from offset 0 again. Could someone help me figure out why messages are only received after the offset is set, and whether there is a way to start consuming from the latest offset automatically? This has been bothering me for two days.
Dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>org.example</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!--kafka-->
        <!-- <dependency>-->
        <!--     <groupId>org.springframework.kafka</groupId>-->
        <!--     <artifactId>spring-kafka</artifactId>-->
        <!--     <version>2.2.4.RELEASE</version>-->
        <!-- </dependency>-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.4.1.RELEASE</version>
        </dependency>
        <!-- elasticsearch http api client -->
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.3</version>
        </dependency>
        <!--fastjson-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.51</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Code:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Consumes records one at a time.
 */
@Component
public class SingleDataConsumer {

    private static final Logger logger = LoggerFactory.getLogger(SingleDataConsumer.class);

    @KafkaListener(topics = Config.KAFKA_JSON_TOPICS)
    public void listener(ConsumerRecord<?, ?> record) {
        logger.info("topic.quick.consumer receive : " + record.toString());
    }
}
Configuration:
#=============== consumer =======================
# Default consumer group id
spring.kafka.consumer.group-id=consumer
#spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
#spring.kafka.consumer.auto-commit-interval=100
# Deserializers for the message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
OP, how did you solve this in the end? I'm in exactly the same situation now: I can only consume messages after specifying the partition and the initial offset.
Try deleting this part first:
initialOffset = "0"
If that still works, then delete the whole
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")
segment as well; it pins both the partition and the starting position.
@PartitionOffset requires both parameters, so they can't be deleted...
Using the annotation like this doesn't work either:
@KafkaListener(topics = Config.KAFKA_JSON_TOPICS, id = Config.KAFKA_JSON_ID,
    topicPartitions = {
        @TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitions = {"0", "1", "2"})
    })
Change it to this:
@KafkaListener(id = Config.KAFKA_JSON_ID, topics = Config.KAFKA_JSON_TOPICS)
Still consuming nothing...
It only works when I change it to this:
@KafkaListener(id = Config.KAFKA_JSON_ID, topics = Config.KAFKA_JSON_TOPICS,
    topicPartitions = {
        @TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")),
        @TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0")),
        @TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "0"))
    })
One odd thing: the records come back in pairs.
2021-04-30 12:40:01.371  INFO 44386 --- [consumer2-0-C-1] top.sync.kafka.kafka.SingleDataConsumer : topic.quick.consumer receive : ConsumerRecord(topic = binlog, partition = 2, leaderEpoch = 0, offset = 57, CreateTime = 1619757601362, serialized key size = -1, serialized value size = 60, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = "{\"event\":\"test.role.delete\",\"value\":[61,\"伟大\"]}")
2021-04-30 12:40:01.377  INFO 44386 --- [consumer2-0-C-1] top.sync.kafka.kafka.SingleDataConsumer : topic.quick.consumer receive : ConsumerRecord(topic = binlog, partition = 1, leaderEpoch = 0, offset = 59, CreateTime = 1619757601362, serialized key size = -1, serialized value size = 50, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"event":"test.role.delete","value":[61,"伟大"]})
My impression is that
@KafkaListener(id = Config.KAFKA_JSON_ID, topics = Config.KAFKA_JSON_TOPICS)
simply can't pull any messages, as if the request were missing parameters: the Kafka broker finds no matching data, so nothing is returned. The form I gave you definitely works.
Could the consumer group name be colliding with someone else's, so another consumer is taking the messages? Try a different consumer group id.
Reference: https://www.orchome.com/6794
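If it helps, @KafkaListener can also carry its own groupId, so you can isolate just this one listener without touching the properties file. A minimal sketch, assuming spring-kafka 2.4.x; the class name DebugConsumer and group name binlog-debug are made up:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DebugConsumer {

    private static final Logger logger = LoggerFactory.getLogger(DebugConsumer.class);

    // groupId overrides spring.kafka.consumer.group-id for this listener only,
    // so it cannot collide with whatever group another consumer is using
    @KafkaListener(id = "consumer-debug", groupId = "binlog-debug", topics = "binlog")
    public void listener(ConsumerRecord<?, ?> record) {
        logger.info("debug receive : " + record.toString());
    }
}

If this fresh group consumes fine while the original one doesn't, that points at a group id collision.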
Good point, I'll look into it.
Hello, I'm running into the same problem as the OP. Messages are only consumed when I set
topicPartitions = {
    @TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")),
    @TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0")),
    @TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "0"))
}
Producing messages works fine.
Remove
initialOffset = "0"
and give the consumer a different group id (currently spring.kafka.consumer.group-id=consumer).
Then keep producing and see whether the messages get consumed.
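Something like this in application.properties (a sketch only; consumer-v2 is a made-up group name):

# a fresh group id has no committed offsets yet
spring.kafka.consumer.group-id=consumer-v2
# with no committed offsets, 'latest' makes the group start at the end of the log
spring.kafka.consumer.auto-offset-reset=latest

Note that auto-offset-reset only applies when the group has no committed offset, which is why renaming the group matters for this test.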
Use the command-line tools to check this consumer group's actual state:

## Show a consumer group's details (only when offsets are stored in ZooKeeper)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test

## Show a consumer group's details (from 0.9 up to, but not including, 0.10.1.0)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group

## Show a consumer group's details (0.10.1.0 and later)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

Commands from: kafka命令大全 (the Kafka command reference)
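In the --describe output, compare CURRENT-OFFSET against LOG-END-OFFSET for each partition: if LAG keeps growing while your listener logs nothing, the group is registered but not fetching; if the group does not appear at all, the listener never joined it.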
In version 2.4.1, @PartitionOffset does require the initialOffset parameter.
What does that parameter actually mean?
It defines the offset the consumer starts from, I think.
Exactly. And since you know it defines the consumer's starting position, setting it to "0" means you always restart from the very beginning.
Besides, do you have any special requirement that forces you to consume specific partitions?
If not, just follow the solution above and stop specifying partitions.
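For anyone finding this later, the working setup from this thread is plain group management, i.e. dropping topicPartitions entirely. A minimal sketch (put the method in a @Component class such as SingleDataConsumer above, together with a fresh group id and auto-offset-reset=latest in the properties file):

// group management: the broker assigns the partitions of "binlog" to this listener,
// and committed offsets (not initialOffset) decide where it resumes after a restart
@KafkaListener(id = "consumer", topics = "binlog")
public void listener(ConsumerRecord<?, ?> record) {
    logger.info("receive : " + record.toString());
}

And if you genuinely must pin partitions, the 2.4.x @PartitionOffset javadoc says a negative initialOffset is interpreted relative to the current end of the partition (when relativeToCurrent is false), e.g. initialOffset = "-10" to re-read roughly the last ten records instead of restarting from 0.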