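I recently upgraded Kafka to 2.1.1 and found that the poll method is now deprecated. Which API should be used to fetch messages in 2.1.1?
Kafka client dependency in pom.xml: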
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
The poll call currently used in the project:
ConsumerRecords<K, V> records = kafkaConsumer.poll(10 * 1000);
In 2.1.1, call poll(Duration) instead of the deprecated poll(long):

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    // poll(Duration) takes the timeout as a java.time.Duration rather than a long
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
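For the call in the question, the change should come down to wrapping the old millisecond value in a java.time.Duration. As far as I know, poll(long) has been deprecated since 2.0, and poll(Duration) also bounds the time spent waiting for metadata and partition assignment, which the old overload did not. Below is a minimal sketch of the timeout conversion using only the JDK. PollTimeouts and fromLegacyMillis are hypothetical names made up for illustration, not part of kafka-clients, and the actual poll calls appear only in comments.

```java
import java.time.Duration;

class PollTimeouts {
    // Hypothetical helper: wraps a legacy millisecond timeout in a Duration.
    static Duration fromLegacyMillis(long timeoutMs) {
        return Duration.ofMillis(timeoutMs);
    }

    public static void main(String[] args) {
        // Before (deprecated): kafkaConsumer.poll(10 * 1000);
        Duration timeout = fromLegacyMillis(10 * 1000);
        // After:               kafkaConsumer.poll(timeout);

        System.out.println(timeout);                                 // prints PT10S
        System.out.println(timeout.equals(Duration.ofSeconds(10)));  // prints true
    }
}
```

In practice you would probably skip the helper and write kafkaConsumer.poll(Duration.ofSeconds(10)) directly; the helper is only there to make the old and new values easy to compare.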