出于某些监控的需要,需要根据topic名称自动获取当前topic下所有的消费组名称。根据网上资料,使用了AdminClient客户端获取
DescribeConsumerGroupsResult groupResult = AdminClient.describeConsumerGroups(groupIdList);
Map<String, ConsumerGroupDescription> stringConsumerGroupDescriptionMap = groupResult.all().get();
但是获取到的结果中 ConsumerGroupDescription 下的members字段皆为空,且state为empty,如下图所示
想请教下,为什么获取不到信息,或有什么方法可以直接获取到topic下的所有消费组
感谢
listConsumerGroups
参考:使用Java管理kafka集群
这个命令有使用,但是正如问题描述的,我这里是查不到消费组下topic的信息的。不知道是否是因为我这里消费时,是用consumer.assign()方法手动指定消费位置,导致kafka无法管理消费组导致的
你的消费者组不活跃了,自然就找不到了:
public List<String> activeConsumerByTopic(String topicName) { List<String> lists = new ArrayList<>(); try (AdminClient client = KafkaAdminFactory.getInstance()) { try { // get all consumer groupId List<String> groupIds = client.listConsumerGroups().all().get().stream().map(s -> s.groupId()).collect(Collectors.toList()); // Here you get all the descriptions for the groups Map<String, ConsumerGroupDescription> groups = client.describeConsumerGroups(groupIds).all().get(); for (final String groupId : groupIds) { ConsumerGroupDescription descr = groups.get(groupId); // find if any description is connected to the topic with topicName Optional<TopicPartition> tp = descr.members().stream(). map(s -> s.assignment().topicPartitions()). flatMap(coll -> coll.stream()). filter(s -> s.topic().equals(topicName)).findAny(); if (tp.isPresent()) { // you found the consumer, so collect the group id somewhere lists.add(descr.groupId()); } } } catch (InterruptedException | ExecutionException e) { throw new IllegalStateException(e); } } return lists; }
你可以安装一个我重写的kafka监控,看看是否可以获取到,参考:KafkaOffsetMonitor raft版:监控消费者和延迟的队列
如果能达到你的预期,参考核心功能代码:
https://github.com/orchome/KafkaOffsetMonitor/blob/main/src/main/java/www/orchome/com/kafka/core/KafkaService.java
非常感谢回答,我会尝试的
补充一下最后的解决办法,也许会有人用到
我们的Kafka监控用的是kafka manager,是由雅虎公司开源发布的,现已更名为CMAK,
该监控系统提供了一套API,用以获取kafk集群的一些信息,
其中便有获取所有消费组的接口,定义如下:
GET /api/status/:cluster/consumersSummary 其中:cluster为kafka客户端名称
使用代码调用API的话,需要加上Http的基础验证,使用方法百度一下即可
其他详细API地址为:https://github.com/yahoo/CMAK/blob/master/conf/routes
你的答案