兄弟,你确定版本对了吗?我版本和客户端都改成2.1了,模拟的一切正常。
Properties properties = new Properties();
properties.put("bootstrap.servers", "172.168.xx.xx:9092");
properties.put("connections.max.idle.ms", 10000);
properties.put("request.timeout.ms", 5000);
try (AdminClient client = AdminClient.create(properties)) {
Map newPartitions = new HashMap<>();
newPartitions.put("topic1", NewPartitions.increaseTo(2));
CreatePartitionsResult rs = client.createPartitions(newPartitions);
try {
rs.all().get();
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
}
你好,不好意思啊,我再问下。kafka服务端是2.10的版本。 Java里使用的kafka-clients2.1
的。
Map newPartitions=new HashMap<>();
newPartitions.put(topic, NewPartitions.increaseTo(getConcurrency(topic)));
CreatePartitionsResult result = client.createPartitions(newPartitions);
System.out.println("topic修改分区结果:"+result.all().get());
这样还是会有报错:org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support CREATE_PARTITIONS
你好,我是想问java代码里有没有增加分区的实现?你发的脚本语句是可以增加分区,只是在Java代码里不会实现。请问有方法吗?谢谢
分区扩容
# kafka版本 < 2.2
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
# kafka版本 >= 2.2
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2