返回到文章

采纳

编辑于 4年前

使用Java管理kafka集群

使用Java管理kafka集群
kafka
kafka
接口

Kafka的管理Java客户端,支持管理和检查topic、broker、配置和ACL。

所需的最小broker版本是0.10.0.0。有更严格要求的方法将指定所需的最小 broker 版本。

这个客户端是在0.11.0.0中引入的,API还在不断发展。我们将尝试以兼容的方式演进API,但我们保留在必要时在次要版本中进行破坏性更改的权利。一旦API被认为是稳定的,我们将更新InterfaceStability注解和本通知。

方法如下

  • alterClientQuotas
  • alterConfigs
  • alterConsumerGroupOffsets
  • alterPartitionReassignments
  • alterReplicaLogDirs
  • alterUserScramCredentials
  • close
  • close
  • createAcls
  • createDelegationToken
  • createPartitions
  • createTopics
  • deleteAcls
  • deleteConsumerGroupOffsets
  • deleteConsumerGroups
  • deleteRecords
  • deleteTopics
  • describeAcls
  • describeClientQuotas
  • describeCluster
  • describeConfigs
  • describeConsumerGroups
  • describeDelegationToken
  • describeFeatures
  • describeLogDirs
  • describeReplicaLogDirs
  • describeTopics
  • describeUserScramCredentials
  • describeUserScramCredentials
  • electLeaders
  • electPreferredLeaders
  • electPreferredLeaders
  • expireDelegationToken
  • incrementalAlterConfigs
  • listConsumerGroupOffsets
  • listConsumerGroups
  • listOffsets
  • listPartitionReassignments
  • listPartitionReassignments
  • listPartitionReassignments
  • listPartitionReassignments
  • listTopics
  • renewDelegationToken

示例

创建Topic

// bootstrapServers 如 localhost:9092
private void createTopics(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", bootstrapServers);
    properties.put("connections.max.idle.ms", 10000);
    properties.put("request.timeout.ms", 5000);
    try (AdminClient client = AdminClient.create(properties)) {
        CreateTopicsResult result = client.createTopics(Arrays.asList(
                new NewTopic("topic1", 1, (short) 1),
                new NewTopic("topic2", 1, (short) 1),
                new NewTopic("topic3", 1, (short) 1)
        ));
        try {
            result.all().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}

topic列表

private void listTopics(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", bootstrapServers);
    properties.put("connections.max.idle.ms", 10000);
    properties.put("request.timeout.ms", 5000);
    try (AdminClient client = AdminClient.create(properties)) {
        ListTopicsResult result = client.listTopics();
        try {
            result.listings().get().forEach(topic -> {
                System.out.println(topic);
            });
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}

输出

(name=topic1, internal=false)
(name=topic2, internal=false)
(name=topic3, internal=false)
...

增加分区

Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServers);
properties.put("connections.max.idle.ms", 10000);
properties.put("request.timeout.ms", 5000);

try (AdminClient client = AdminClient.create(properties)) {
    Map newPartitions = new HashMap<>();
    // 增加到2个
    newPartitions.put("topic1", NewPartitions.increaseTo(2));
    CreatePartitionsResult rs = client.createPartitions(newPartitions);
    try {
        rs.all().get();
    } catch (InterruptedException | ExecutionException e) {
        throw new IllegalStateException(e);
    }
}

有关Admin API的更多信息,请参见javadoc.