LL,最后是怎么解决的,现在我的情况和你一样,只有设置了分区和初始化的位置才能消费到消息
我给你的肯定可行的。
是不是这个消费者同名了,被其他人消费走了。你换个消费者组id试试。
参考:https://www.orchome.com/6794
给我的感觉就是@KafkaListener(id = Config.KAFKA_JSON_ID, topics = Config.KAFKA_JSON_TOPICS)
这个注解pull
不来消息,类似于请求参数不足,Kafka服务端没查到对应的数据,所以没消息返回。
还是消费不到。。
只有换成这样才行
@KafkaListener(id=Config.KAFKA_JSON_ID, topics = Config.KAFKA_JSON_TOPICS,
topicPartitions = {
@TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitionOffsets = @PartitionOffset(partition = "0" , initialOffset = "0")),
@TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitionOffsets = @PartitionOffset(partition = "1" , initialOffset = "0")),
@TopicPartition(topic = Config.KAFKA_JSON_TOPICS, partitionOffsets = @PartitionOffset(partition = "2" , initialOffset = "0"))
})
有点奇怪的是,返回的数据是成对的。
2021-04-30 12:40:01.371 INFO 44386 --- [consumer2-0-C-1] top.sync.kafka.kafka.SingleDataConsumer : topic.quick.consumer receive : ConsumerRecord(topic = binlog, partition = 2, leaderEpoch = 0, offset = 57, CreateTime = 1619757601362, serialized key size = -1, serialized value size = 60, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = "{\"event\":\"test.role.delete\",\"value\":[61,\"伟大\"]}")
2021-04-30 12:40:01.377 INFO 44386 --- [consumer2-0-C-1] top.sync.kafka.kafka.SingleDataConsumer : topic.quick.consumer receive : ConsumerRecord(topic = binlog, partition = 1, leaderEpoch = 0, offset = 59, CreateTime = 1619757601362, serialized key size = -1, serialized value size = 50, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"event":"test.role.delete","value":[61,"伟大"]})