用的是0.10.1.1的API中最简单的第一个例子
kafka版本0.10.1.1
生产者可以生产消息,用命令行也是可以消费消息的,但就是在代码中无法消费
运行发现ConsumerRecords都获取到了,但是接下来的
for (ConsumerRecord
进不去了
不知道问题出在哪里了???望大神指点一下
具体的consumer代码,如下:
public class MyConsumer {
public static void main(String[] args) throws IOException {
Consumer<String, String> consumer = KFKUtil.getConsumer();
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
System.out.println(records);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Message: " + record.value());
System.out.println("Fetche message from partition " + record.partition() + ",offset = " + record.offset());
// consumer.commitSync();//手动提交偏移量
}
}
}
}
我后面尝试了一下,先开启kafka消费者客户端,然后开启kafka生产者客户端,产生新的消息,发现这时的消费者客户端可以消费到新的消息,这里有点懵了。难道是偏移量的问题?前面没有消费到消息的情况是:我先打开Linux上的消费者,然后从windows上的生产者客户端生产消息到kafka集群,然后Linux上的消费者消费到了消息,然后再启动windows上的消费者客户端就消费不到消息了。请问是不是消息只能处理一次?consumer客户端如何控制从哪里开始消费消息呢?
长轮询拉取消息,poll(500),当没有消息时,阻塞在这里了。
根据消费者group,同一个group中,消息只会被其中一个消费者消费。不同的消费者组,均能收到消息。
auto.offset.reset 默认是lastest,即启动时offset为最新的,之前的消息不处理
auto.offset.reset 设置为earliest则会消费之前的消息
你的答案