kafka指定消费,查不出数据

ighack 发表于: 2017-04-14   最后更新时间: 2017-04-14 09:44:43   8,484 游览

代码如下。查不出数据recods没有内容

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", false);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("max.poll.records", maxPollRecords);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //consumer.subscribe(Arrays.asList(topic));

        TopicPartition p = new TopicPartition(topic,2);
        consumer.assign(Arrays.asList(p));
        consumer.seek(p,488430);
        //while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                String V = record.value();
            }
       // }
发表于 2017-04-14
添加评论
ighack -> 半兽人 7年前

我说的不是在正常情况下,无法消费消息。而在指定一个offset的时候。执行完

ConsumerRecords<String, String> records = consumer.poll(100);
records里没有数据。
如果不是指定消费的话。还是可以正常取值的
半兽人 -> ighack 7年前

consumer.seek(p,488430);
这句去掉呢?指定的offset位置不在你指定分区上呢?

ighack -> 半兽人 7年前

我确定在这个分区,并且在这个offset上面。这是我在生产端得到的回应。并且前期我也验证过。在生产端得到的回应。和消费端取出的消息是对应的。出就是说在

Future<RecordMetadata> meat = producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i),  Integer.toString(i)));
RecordMetadata m = meat.get();
m.partition();
m.offset()
和消费端
record.partition()
record.offset()
是对应的

半兽人 -> ighack 7年前

我的意思是 你不指定offset的时候,能不能消费到

ighack -> 半兽人 7年前

经我在次确认。不指定offset可以正常取值。如果指定offset就没有办法取值了

半兽人 -> ighack 7年前

我怀疑的就是在指定分区的情况,kafka并没有支持这一点。

ighack -> 半兽人 7年前
  1. TopicPartition p = new TopicPartition(topic,2);

  2.         consumer.assign(Arrays.asList(p));

  3.         consumer.seek(p,488430);
  4. 这不就是为了指定一个分区上的offset。我看到这个方法说明指了以后。就是为了下次的
  5. consumer.poll(100)
冥佳 -> ighack 7年前

把while循环打开就可以拿到数据了,反正我是这样实现的!

ighack -> 冥佳 7年前

是可以消费数据。而我发现第一次consumer.poll(100)就取不到的,第二次才可以取到数据。不知道为什么

你的答案

查看kafka相关的其他问题或提一个您自己的问题