kafka获取topic下所有消费组信息

G1-JVM 发表于: 2022-07-07   最后更新时间: 2022-07-07 14:59:22   2,383 游览

出于某些监控的需要,需要根据topic名称自动获取当前topic下所有的消费组名称。根据网上资料,使用了AdminClient客户端获取

DescribeConsumerGroupsResult groupResult = AdminClient.describeConsumerGroups(groupIdList);

Map<String, ConsumerGroupDescription> stringConsumerGroupDescriptionMap = groupResult.all().get();

但是获取到的结果中 ConsumerGroupDescription 下的members字段皆为空,且state为empty,如下图所示

想请教下,为什么获取不到信息,或有什么方法可以直接获取到topic下的所有消费组

感谢

发表于 2022-07-07
¥1.0

listConsumerGroups

参考:使用Java管理kafka集群

G1-JVM -> 半兽人 2年前

这个命令有使用,但是正如问题描述的,我这里是查不到消费组下topic的信息的。不知道是否是因为我这里消费时,是用consumer.assign()方法手动指定消费位置,导致kafka无法管理消费组导致的

半兽人 -> G1-JVM 2年前

你的消费者组不活跃了,自然就找不到了:

public List<String> activeConsumerByTopic(String topicName) {
        List<String> lists = new ArrayList<>();
        try (AdminClient client = KafkaAdminFactory.getInstance()) {
            try {
                // get all consumer groupId
                List<String> groupIds = client.listConsumerGroups().all().get().stream().map(s -> s.groupId()).collect(Collectors.toList());
                // Here you get all the descriptions for the groups
                Map<String, ConsumerGroupDescription> groups = client.describeConsumerGroups(groupIds).all().get();
                for (final String groupId : groupIds) {
                    ConsumerGroupDescription descr = groups.get(groupId);
                    // find if any description is connected to the topic with topicName
                    Optional<TopicPartition> tp = descr.members().stream().
                            map(s -> s.assignment().topicPartitions()).
                            flatMap(coll -> coll.stream()).
                            filter(s -> s.topic().equals(topicName)).findAny();
                    if (tp.isPresent()) {
                        // you found the consumer, so collect the group id somewhere
                        lists.add(descr.groupId());
                    }
                }
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        }
        return lists;
    }

你可以安装一个我重写的kafka监控,看看是否可以获取到,参考:KafkaOffsetMonitor raft版:监控消费者和延迟的队列

如果能达到你的预期,参考核心功能代码:
https://github.com/orchome/KafkaOffsetMonitor/blob/main/src/main/java/www/orchome/com/kafka/core/KafkaService.java

G1-JVM -> 半兽人 2年前

非常感谢回答,我会尝试的

补充一下最后的解决办法,也许会有人用到
我们的Kafka监控用的是kafka manager,是由雅虎公司开源发布的,现已更名为CMAK,
该监控系统提供了一套API,用以获取kafk集群的一些信息,
其中便有获取所有消费组的接口,定义如下:
GET /api/status/:cluster/consumersSummary 其中:cluster为kafka客户端名称
使用代码调用API的话,需要加上Http的基础验证,使用方法百度一下即可
其他详细API地址为:https://github.com/yahoo/CMAK/blob/master/conf/routes

你的答案

查看kafka相关的其他问题或提一个您自己的问题