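The administrative Java client for Kafka supports managing and inspecting topics, brokers, configurations, and ACLs.
The minimum broker version required is 0.10.0.0. Methods with stricter requirements specify the minimum broker version they need.
This client was introduced in 0.11.0.0, and the API is still evolving. We will try to evolve the API in a compatible manner, but we reserve the right to make breaking changes in minor releases if necessary. Once the API is considered stable, we will update the InterfaceStability annotation and this notice.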
The available methods are:
- alterClientQuotas
- alterConfigs
- alterConsumerGroupOffsets
- alterPartitionReassignments
- alterReplicaLogDirs
- alterUserScramCredentials
- close
- createAcls
- createDelegationToken
- createPartitions
- createTopics
- deleteAcls
- deleteConsumerGroupOffsets
- deleteConsumerGroups
- deleteRecords
- deleteTopics
- describeAcls
- describeClientQuotas
- describeCluster
- describeConfigs
- describeConsumerGroups
- describeDelegationToken
- describeFeatures
- describeLogDirs
- describeReplicaLogDirs
- describeTopics
- describeUserScramCredentials
- electLeaders
- electPreferredLeaders
- expireDelegationToken
- incrementalAlterConfigs
- listConsumerGroupOffsets
- listConsumerGroups
- listOffsets
- listPartitionReassignments
- listTopics
- renewDelegationToken
Examples
Creating topics
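    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.CreateTopicsResult;
    import org.apache.kafka.clients.admin.NewTopic;

    // bootstrapServers is a host:port list, e.g. localhost:9092
    private void createTopics(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("connections.max.idle.ms", 10000);
        properties.put("request.timeout.ms", 5000);
        // try-with-resources closes the client when the block exits
        try (AdminClient client = AdminClient.create(properties)) {
            // Each topic is created with 1 partition and a replication factor of 1
            CreateTopicsResult result = client.createTopics(Arrays.asList(
                    new NewTopic("topic1", 1, (short) 1),
                    new NewTopic("topic2", 1, (short) 1),
                    new NewTopic("topic3", 1, (short) 1)
            ));
            try {
                // Block until all three topics have been created
                result.all().get();
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        }
    }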
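The examples in this section repeat the same three client settings. One way to avoid the repetition is to build them in a single place. The following is a minimal sketch, not part of the Kafka API: the class name `AdminClientProps` and the method `build` are hypothetical, and the timeout values are simply the ones used in the example above.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): builds the settings used by the examples.
final class AdminClientProps {
    private AdminClientProps() {
    }

    // bootstrapServers is a host:port list, e.g. localhost:9092
    static Properties build(String bootstrapServers) {
        if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
            throw new IllegalArgumentException("bootstrapServers must not be empty");
        }
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        // Same values as in the examples; adjust them for your environment
        properties.put("connections.max.idle.ms", 10000);
        properties.put("request.timeout.ms", 5000);
        return properties;
    }
}
```

The returned object can be passed straight to the client factory, for example `AdminClient.create(AdminClientProps.build("localhost:9092"))`.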
Listing topics
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.ListTopicsResult;

    private void listTopics(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("connections.max.idle.ms", 10000);
        properties.put("request.timeout.ms", 5000);
        try (AdminClient client = AdminClient.create(properties)) {
            ListTopicsResult result = client.listTopics();
            try {
                // listings() resolves to a collection of TopicListing objects
                result.listings().get().forEach(topic -> {
                    System.out.println(topic);
                });
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        }
    }
Output
(name=topic1, internal=false)
(name=topic2, internal=false)
(name=topic3, internal=false)
...
Increasing partitions
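    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.CreatePartitionsResult;
    import org.apache.kafka.clients.admin.NewPartitions;

    // Wrapped in a method, like the earlier examples, so that bootstrapServers is defined
    private void createPartitions(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("connections.max.idle.ms", 10000);
        properties.put("request.timeout.ms", 5000);
        try (AdminClient client = AdminClient.create(properties)) {
            // Maps each topic name to its new total partition count
            Map<String, NewPartitions> newPartitions = new HashMap<>();
            // Increase topic1 to a total of 2 partitions
            newPartitions.put("topic1", NewPartitions.increaseTo(2));
            CreatePartitionsResult rs = client.createPartitions(newPartitions);
            try {
                // Block until the partition change has been applied
                rs.all().get();
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        }
    }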
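Each example above blocks on the result future and wraps both checked exceptions in an IllegalStateException. A possible refinement, sketched below with only java.util.concurrent types, restores the thread's interrupt flag and rethrows with the underlying cause instead of the ExecutionException wrapper. The class name `AdminFutures` and the method `await` are hypothetical. The helper accepts a plain `Future`, which the `KafkaFuture` returned by `all()` implements.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper (not part of Kafka): waits for a result and simplifies error handling.
final class AdminFutures {
    private AdminFutures() {
    }

    static <T> T await(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the admin operation", e);
        } catch (ExecutionException e) {
            // Surface the real failure rather than the ExecutionException wrapper
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new IllegalStateException("Admin operation failed", cause);
        }
    }
}
```

With this helper, the inner try/catch in the examples could be reduced to a single call such as `AdminFutures.await(rs.all());`.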
For more information about the Admin API, see the javadoc.