kafka V2.1.1 poll 方法过时

守护轻鸟 发表于: 2019-09-20   最后更新时间: 2019-09-21 12:36:57   3,608 游览

最近把kafka版本升到了2.1.1 版本,发现poll方法过时了,请问2.1.1版本拉取消息应该用哪个api?

kafka client pom

 <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.1</version>
</dependency>

项目中的用到的poll方法

 ConsumerRecords<K, V> records = kafkaConsumer.poll(10 * 1000);
发表于 2019-09-20
添加评论
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<string, string=""> consumer = new KafkaConsumer&lt;&gt;(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<string, string=""> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<string, string=""> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
你的答案

查看kafka相关的其他问题或提一个您自己的问题