我有个人任务要从定时改为实时,有木有什么比较高效的办法能让我实时监听kafka的队列只要有新数据进来我就立刻拿走,谢谢
java客户端实例:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
其中
consumer.poll(100);
为长轮询,每个100ms去kafka里拉取消息,如果有,就将直接返回。
例子来自:https://www.orchome.com/451
谢谢,我现在就是这个干的了
结贴吧~!
我用的springboot自带的监听注解 @KafkaListener(topics = "test") ,这种是一监听到新消息就会收到消息,还是有个默认的拉取消息间隔时间?
找不到想要的答案?提一个您自己的问题。
0 声望
这家伙太懒,什么都没留下
java客户端实例:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
其中
为长轮询,每个100ms去kafka里拉取消息,如果有,就将直接返回。
例子来自:https://www.orchome.com/451
谢谢,我现在就是这个干的了
结贴吧~!
我用的springboot自带的监听注解 @KafkaListener(topics = "test") ,这种是一监听到新消息就会收到消息,还是有个默认的拉取消息间隔时间?
你的答案