The broker does not support CREATE_PARTITIONS

稻草人 发表于: 2019-06-13   最后更新时间: 2020-12-23 17:42:19   4,000 游览
AdminClient client = AdminClient.create(props);
List<ConfigEntry> entities = new ArrayList<>();
for (String key : topicParam.getConfig().keySet()) {
    ConfigEntry entity = new ConfigEntry(key, topicParam.getConfig().get(key));
    entities.add(entity);
}
Map<ConfigResource, Config> configs = new HashMap<>();
configs.put(new ConfigResource(ConfigResource.Type.TOPIC,"topic", new Config(entities));
AlterConfigsResult rs = client.alterConfigs(configs);
rs.all().get();

adminclient在增加分区的时候爆出来的 ,请问 我调脚本就可以操作这是什么情况

发表于 2019-06-13
添加评论

你好,我也遇到这个问题,请问你这边有解决方案吗?谢谢

他贴的这个错误不完整,完整的是

org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support CREATE_PARTITIONS

的话,kafka版本太低不支持哦。

你好,我这边报错的全部内容确实和你发得一样。目前的版本是kafka_2.10-0.10.1.0, 请问还有没有其它方式给topic增加分区?谢谢

分区扩容

# kafka版本 < 2.2
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2

# kafka版本 >= 2.2
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2

你好,我是想问java代码里有没有增加分区的实现?你发的脚本语句是可以增加分区,只是在Java代码里不会实现。请问有方法吗?谢谢

你的客户端和服务端保持一致,解决版本问题应该就可以了。

你好,不好意思啊,我再问下。kafka服务端是2.10的版本。 Java里使用的kafka-clients2.1的。

Map newPartitions=new HashMap<>();
newPartitions.put(topic, NewPartitions.increaseTo(getConcurrency(topic)));
CreatePartitionsResult result = client.createPartitions(newPartitions);
System.out.println("topic修改分区结果:"+result.all().get());

这样还是会有报错:org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support CREATE_PARTITIONS

兄弟,你确定版本对了吗?我版本和客户端都改成2.1了,模拟的一切正常。

Properties properties = new Properties();
properties.put("bootstrap.servers", "172.168.xx.xx:9092");
properties.put("connections.max.idle.ms", 10000);
properties.put("request.timeout.ms", 5000);

try (AdminClient client = AdminClient.create(properties)) {
    Map newPartitions = new HashMap<>();
    newPartitions.put("topic1", NewPartitions.increaseTo(2));
    CreatePartitionsResult rs = client.createPartitions(newPartitions);
    try {
        rs.all().get();
    } catch (InterruptedException | ExecutionException e) {
        throw new IllegalStateException(e);
    }
}
*. -> 半兽人 2年前

大佬你好,我增加topic分区时,也出现了这个问题,但我的kafka版本是3.0.0

你的答案

查看kafka相关的其他问题或提一个您自己的问题