如何实时监控kafka队列是否有新数据

克里斯蒂安 发表于: 2019-11-18   最后更新时间: 2019-11-18 14:21:40   3,357 游览

监控kafka队列是否有新数据进来

我有个人任务要从定时改为实时,有木有什么比较高效的办法能让我实时监听kafka的队列
只要有新数据进来我就立刻拿走,谢谢

发表于 2019-11-18

java客户端实例:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

其中

consumer.poll(100);

为长轮询,每个100ms去kafka里拉取消息,如果有,就将直接返回。

例子来自:https://www.orchome.com/451

谢谢,我现在就是这个干的了

结贴吧~!

星风 -> 半兽人 5年前

我用的springboot自带的监听注解 @KafkaListener(topics = "test") ,这种是一监听到新消息就会收到消息,还是有个默认的拉取消息间隔时间?

你的答案

查看kafka相关的其他问题或提一个您自己的问题