这个代码不对, 你应该使用kafkaConmsumer.endOffsets
来读取 LEO(更准确的说应该是ISR里的HW), 如若你要使用seekToEnd()
, 你在这个方法之前一定要先提前调用consumer.poll(0)
应消费的元数据保存在__consumer_offstes
中的位置, 在这之后, 你才可以正常使用seekToBegin/seekToEnd
不好意思,回复完了。确实是因为没有消息写进来,看似没有订阅成功,应该是kafka sdk里面的订阅是个lazy操作,已经可以每次开始消费都从最新的offset了,问题解决了。就是1看不懂
1、因为你们用了同一个名字,所以你指定的offset的就报了刚才的错误呀,不是其成员。
2、消费者启成功后,生产者是否有持续的消息写到该topic呢?