KafkaUtils.getKafkaConsumer().poll(100) 卡住就不动了,感觉是阻塞了,也没有报错,请问是什么原因呢

专心大萝卜 发表于: 2020-07-09   最后更新时间: 2020-07-09 20:12:49   2,438 游览

提问说明

代码执行到ConsumerRecords<String, String> records = KafkaUtils.getKafkaConsumer().poll(100);

就不动了,感觉是阻塞了,也没有报错,请问是什么原因呢

 Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.101.176:2181,192.168.101.177:2181,192.168.101.178:2181");// 服务器ip:端口号,集群用逗号分隔
        //props.put("zookeeper.connect", "192.168.101.176:2181,192.168.101.177:2181,192.168.101.178:2181");// 服务器ip:端口号,集群用逗号分隔
//        props.put("zookeeper.session.timeout.ms", "4000");
//        props.put("zookeeper.sync.time.ms", "200");
        //props.put("bootstrap.servers", "192.168.106.28:9092");// 服务器ip:端口号,集群用逗号分隔
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset","earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<String,String>(props);
        consumer.subscribe(Arrays.asList("groupid-gwkeepalive"));
发表于 2020-07-09
添加评论

poll(100)方法是长轮询,持续向kafka拉取消息,永远不会结束,是正确的。
可参考:https://www.orchome.com/451

你的答案

查看kafka相关的其他问题或提一个您自己的问题