使用Java管理kafka集群

原创
半兽人 发表于: 2020-12-23   最后更新时间: 2021-08-12 10:38:22  
{{totalSubscript}} 订阅, 11,816 游览

Kafka的管理Java客户端,支持管理和检查topic、broker、配置和ACL。

所需的最小broker版本是0.10.0.0。有更严格要求的方法将指定所需的最小 broker 版本。

这个客户端是在0.11.0.0中引入的,API还在不断发展。我们将尝试以兼容的方式演进API,但我们保留在必要时在次要版本中进行破坏性更改的权利。一旦API被认为是稳定的,我们将更新InterfaceStability注解和本通知。

方法如下

  • alterClientQuotas
  • alterConfigs
  • alterConsumerGroupOffsets
  • alterPartitionReassignments
  • alterReplicaLogDirs
  • alterUserScramCredentials
  • close
  • close
  • createAcls
  • createDelegationToken
  • createPartitions
  • createTopics
  • deleteAcls
  • deleteConsumerGroupOffsets
  • deleteConsumerGroups
  • deleteRecords
  • deleteTopics
  • describeAcls
  • describeClientQuotas
  • describeCluster
  • describeConfigs
  • describeConsumerGroups
  • describeDelegationToken
  • describeFeatures
  • describeLogDirs
  • describeReplicaLogDirs
  • describeTopics
  • describeUserScramCredentials
  • describeUserScramCredentials
  • electLeaders
  • electPreferredLeaders
  • electPreferredLeaders
  • expireDelegationToken
  • incrementalAlterConfigs
  • listConsumerGroupOffsets
  • listConsumerGroups
  • listOffsets
  • listPartitionReassignments
  • listPartitionReassignments
  • listPartitionReassignments
  • listPartitionReassignments
  • listTopics
  • renewDelegationToken

示例

创建Topic

// bootstrapServers 如 localhost:9092
private void createTopics(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", bootstrapServers);
    properties.put("connections.max.idle.ms", 10000);
    properties.put("request.timeout.ms", 5000);
    try (AdminClient client = AdminClient.create(properties)) {
        CreateTopicsResult result = client.createTopics(Arrays.asList(
                new NewTopic("topic1", 1, (short) 1),
                new NewTopic("topic2", 1, (short) 1),
                new NewTopic("topic3", 1, (short) 1)
        ));
        try {
            result.all().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}

topic列表

private void listTopics(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", bootstrapServers);
    properties.put("connections.max.idle.ms", 10000);
    properties.put("request.timeout.ms", 5000);
    try (AdminClient client = AdminClient.create(properties)) {
        ListTopicsResult result = client.listTopics();
        try {
            result.listings().get().forEach(topic -> {
                System.out.println(topic);
            });
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}

输出

(name=topic1, internal=false)
(name=topic2, internal=false)
(name=topic3, internal=false)
...

增加分区

Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServers);
properties.put("connections.max.idle.ms", 10000);
properties.put("request.timeout.ms", 5000);

try (AdminClient client = AdminClient.create(properties)) {
    Map newPartitions = new HashMap<>();
    // 增加到2个
    newPartitions.put("topic1", NewPartitions.increaseTo(2));
    CreatePartitionsResult rs = client.createPartitions(newPartitions);
    try {
        rs.all().get();
    } catch (InterruptedException | ExecutionException e) {
        throw new IllegalStateException(e);
    }
}

有关Admin API的更多信息,请参见javadoc.

更新于 2021-08-12

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章