有什么方法可以在java中获得特定主题的消费者列表吗?到现在为止,我能够获得主题列表:
final ListTopicsResult listTopicsResult = adminClient.listTopics();
KafkaFuture<Set<String>> kafkaFuture = listTopicsResult.names();
Set<String> map = kafkaFuture.get();
但我还没有找到一种方法来获得每个主题的消费者列表。
有什么方法可以在java中获得特定主题的消费者列表吗?到现在为止,我能够获得主题列表:
final ListTopicsResult listTopicsResult = adminClient.listTopics();
KafkaFuture<Set<String>> kafkaFuture = listTopicsResult.names();
Set<String> map = kafkaFuture.get();
但我还没有找到一种方法来获得每个主题的消费者列表。
我最近正在为我的kafka客户端工具解决同样的问题。挺麻烦的,但我从代码中发现的唯一方法是如下:
Properties props = ...//here you put your properties AdminClient kafkaClient = AdminClient.create(props); //Here you get all the consumer groups List<String> groupIds = kafkaClient.listConsumerGroups().all().get(). stream().map(s -> s.groupId()).collect(Collectors.toList()); //Here you get all the descriptions for the groups Map<String, ConsumerGroupDescription> groups = kafkaClient. describeConsumerGroups(groupIds).all().get(); for (final String groupId : groupIds) { ConsumerGroupDescription descr = groups.get(groupId); //find if any description is connected to the topic with topicName Optional<TopicPartition> tp = descr.members().stream(). map(s -> s.assignment().topicPartitions()). flatMap(coll -> coll.stream()). filter(s -> s.topic().equals(topicName)).findAny(); if (tp.isPresent()) { //you found the consumer, so collect the group id somewhere } }
用的是kafkadmin客户端,参考来自:
你的答案