我想查询kafka0.10版本的所有消费者组,网上没有查到用java api的方法,有没有Java API能够查询kafka0.10.版本的所有消费者组的方法?

烟花易冷 发表于: 2018-09-13   最后更新时间: 2018-09-13 09:27:40   4,389 游览

adminClient.listAllConsumerGroups(),这个方法可以吗?我用着调用查出来全是空的,可能是我的kafka上没有消费者组造成的,有用过的可以说一下吗?

发表于 2018-09-13
添加评论

试过了吗?

烟花易冷 -> 半兽人 5年前

我用了 kafka.admin.AdminClient.这个方法下面的几个方法

listAllGroupsFlattened(),listAllConsumerGroupsFlattened()

能够查出来消费者组,但是现在出现了一个新的问题,就是查出来的消费者组,其状态是“dead”的,是查不出来的,这个不知道怎么解决,暂时在查源码,不知道您有没有什么方法。

半兽人 -> 烟花易冷 5年前

额,只能获取活跃的。我之前看过monitor源码,也是这样,自己留了个缓存,记录之前的消费者组。

烟花易冷 -> 半兽人 5年前

还有个问题想请教一下,我用这个命令

./kafka-consumer-groups.sh --bootstrap-server 10.0.59.86:9092,10.0.59.87:9093,10.0.59.88:9094 --list --new-consumer

在kafka中查询了一下,没有消费者组,然后我又用这个命令查询了一下

bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 20 --broker-list 10.0.59.86:9092,10.0.59.87:9093,10.0.59.88:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

有offset可以查询得到,那是否证明着这个消费者组已经是”dead“的状态了?查询出来的offset这是没有过期被删除?

烟花易冷 -> 半兽人 5年前

还有,我想请教一下,kafka是用GroupCoordinator来进行group的管理的,那么是不是可以在java中实例化一个GroupCoordinator,然后进行kafka的链接,进行group的查询?因为只是知道GroupCoordinator这个的存在,没有看过了解太多,只是有这么一个想法,不知道原理,想请教一下。

半兽人 -> 烟花易冷 5年前

可以证明,offset不会被删,会持续传递。

半兽人 -> 烟花易冷 5年前

具体可以把源码down下来看看。
https://www.orchome.com/621

烟花易冷 -> 半兽人 5年前

又来拜访,请问一下,java能直接读取kafka0.10.2版本的_consumer_offset里面的offset数据吗?我现在已经将Group消费者组全部获取,但是其状态为“dead",我能根据他的groupId,topic,partition这些数据去查询他的offset吗?

半兽人 -> 烟花易冷 5年前
consumer.offsetsForTimes();
consumer.beginningOffsets();
consumer.endOffsets()
半兽人 -> 半兽人 5年前

3种获取offset,通过时间戳获取offset,或开始offset,获取结束offset。

烟花易冷 -> 半兽人 5年前

您好,还是有些不解,再次麻烦,第一种方法返回的是分区也就是topic的存在的第一个消息的offset,如果按照正常的情况,消费者应该消费的下一个消息就应该是这个,提交的也就是这个消息的offset,是这个意思吗?如果是按照我的理解,那么如果中间的消息断掉了,消费者组提交的offset是不是就与这个offset之间有着很大的差值了?如果您有时间,可以帮解答一下吗。。。。

烟花易冷 -> 半兽人 5年前

还是说这个方法查询出来的offset值,也包括了曾经消费过后提交过的offset,

半兽人 -> 烟花易冷 5年前

我给你发的这3个方法,属于查找定位offset,跟正常的消费者流程无关,只是通过这3种方法,来获取offset而已,你可以通过获取的offset重新消费你的消息。是2种流程。

烟花易冷 -> 半兽人 5年前

嗯嗯,刚看完API,了解了,那有没有相应的API,可以查询消费者提交的offset呢?

半兽人 -> 烟花易冷 5年前

你的意思是想看某个消费者组已经消费到哪个位置了吗?

烟花易冷 -> 半兽人 5年前

是的,我现在在用这个方法adminClient.describeConsumerGroup(),但有时候总是调不到。

半兽人 -> 烟花易冷 5年前

我现在有点忙,这种需求你是第一个额,活跃的消费者可以直接打印出来offset,稍后在帮你找,

烟花易冷 -> 半兽人 5年前

好的,麻烦您了,万分感谢

烟花易冷 -> 半兽人 5年前

你好,昨天通过查找API,已经解决了问题,在KafkaConsumer类下,有两个方法,commited和position方法。可以获取消费者组的当前偏移量,只是需要查询的速度慢了些,现在正在优化。

半兽人 -> 烟花易冷 5年前

好奇问一句,你们采集这个干什么呢。一般我们拿offset,主要是故障重放。其他时刻靠监控来告警。

烟花易冷 -> 半兽人 5年前

现在再做个监控,将这个offset取出来,可以进行一些消息的判断,读取了多少,有些时候,消费者组已经dead掉了,但是我们还想看看这个消费者组消费了多少数据,根据这些判断一些信息,而且以前在进行测试的时候,有一些消费者组被放弃了,现在想将这些全部都查看一下,就需要这些数据。

烟花易冷 -> 半兽人 5年前

打扰了,还有个问题需要咨询一下您,我在使用KafkaConsumer这个类的时候,Kafkaconsumer kafkaConsumer = new KafkaConsumer(props)的时候,其在控制台一直输出它的配置,有没有什么办法把它关掉呢?我在log4j.properties中使用这个log4j.logger.org.apache.kafka.clients.consumer=OFF没有效果,还有没有其他的办法吗?

半兽人 -> 烟花易冷 5年前

看看它是哪个包下的,log4j单独为它配置权限。

你好,有问题请教一下您,关于获取offset的值和kafka权限是否有关呢,比如用SCRAM做了权限认证配置后是否会影响到offset的值的获取呢

获取成功了吗 我有相同需求,获取不到消费组list和logsize 比较急用 请回复一下 谢谢

烟花易冷 -> zl 5年前

你可以说一下你是怎么查不的吗,我是自己找的源码,可以说一下你的具体是什么样子的情况吗

zl -> 烟花易冷 5年前

可以加一下 qq或者微信吗? 291928588是qq
我现在的问题是 adminClient 在win下面获取不到consumer  在centos下能够获取到consumer group
在调用adminclient.listgroupoffsets的时候 报只支持到v2 v1不支持 我的kafka版本是2.11_0.10.2

请问,用这个命令

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server A5-401-NF5280M4-2017-141:9092 --list

查询不到消费者组怎么办?

你的答案

查看kafka相关的其他问题或提一个您自己的问题