我想在开始生产和消费之前确保kafka服务是否正在运行。kafka服务运行在windows中,这是我在eclipse中的kafka服务器代码......
Properties properties = new Properties();
properties.setProperty("broker.id", "1");
properties.setProperty("port", "9092");
properties.setProperty("log.dirs", "D://workspace//");
properties.setProperty("zookeeper.connect", "localhost:2181");
Option<String> option = Option.empty();
KafkaConfig config = new KafkaConfig(properties);
KafkaServer kafka = new KafkaServer(config, new CurrentTime(), option);
kafka.startup();
在这种情况下,如果(kafka != null)
是不够的,因为它始终是true
的。那么有没有什么方法可以知道我的kafka服务器正在运行,并准备好接收生产者。我有必要检查这一点,因为这会导致一些起始数据包的丢失。
使用AdminClient API
Properties properties = new Properties(); properties.put("bootstrap.servers", "localhost:9092"); properties.put("connections.max.idle.ms", 10000); properties.put("request.timeout.ms", 5000); try (AdminClient client = KafkaAdminClient.create(properties)) { ListTopicsResult topics = client.listTopics(); Set<String> names = topics.names().get(); if (names.isEmpty()) { // case: if no topic found. } return true; } catch (InterruptedException | ExecutionException e) { // Kafka is not available }
你的答案