Kafka客户端consumer无法消费到消息

旧象 发表于: 2017-07-27   最后更新时间: 2017-07-27 17:22:43   13,040 游览

用的是0.10.1.1的API中最简单的第一个例子
kafka版本0.10.1.1
生产者可以生产消息,用命令行也是可以消费消息的,但就是在代码中无法消费
运行发现ConsumerRecords都获取到了,但是接下来的
for (ConsumerRecord record : records) {//一些输出}
进不去了
不知道问题出在哪里了???望大神指点一下
具体的consumer代码,如下:
public class MyConsumer {

public static void main(String[] args) throws IOException {
    Consumer<String, String> consumer = KFKUtil.getConsumer();
    consumer.subscribe(Arrays.asList("my-topic"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(500);
        System.out.println(records);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Message: " + record.value());
            System.out.println("Fetche message from partition " + record.partition() + ",offset = " + record.offset());
            // consumer.commitSync();//手动提交偏移量
        }
    }
}

}

发表于 2017-07-27
添加评论

我后面尝试了一下,先开启kafka消费者客户端,然后开启kafka生产者客户端,产生新的消息,发现这时的消费者客户端可以消费到新的消息,这里有点懵了。难道是偏移量的问题?前面没有消费到消息的情况是:我先打开Linux上的消费者,然后从windows上的生产者客户端生产消息到kafka集群,然后Linux上的消费者消费到了消息,然后再启动windows上的消费者客户端就消费不到消息了。请问是不是消息只能处理一次?consumer客户端如何控制从哪里开始消费消息呢?

半兽人 -> 旧象 7年前

长轮询拉取消息,poll(500),当没有消息时,阻塞在这里了。
根据消费者group,同一个group中,消息只会被其中一个消费者消费。不同的消费者组,均能收到消息。

auto.offset.reset 默认是lastest,即启动时offset为最新的,之前的消息不处理

auto.offset.reset 设置为earliest则会消费之前的消息

你的答案

查看kafka相关的其他问题或提一个您自己的问题