KafkaOffsetMonitor:监控消费者和延迟的队列

一个小应用程序来监视kafka消费者的进度和它们的延迟的队列。

KafkaOffsetMonitor是用来实时监控Kafka集群中的consumer以及在队列中的位置(偏移量)。

你可以查看当前的消费者组,每个topic队列的所有partition的消费情况。可以很快地知道每个partition中的消息是否很快被消费以及相应的队列消息增长速度等信息。这些可以debug kafka的producer和consumer,你完全知道你的系统将会发生什么。

这个web管理平台保留的partition offset和consumer滞后的历史数据(具体数据保存多少天我们可以在启动的时候配置),所以你可以很轻易了解这几天consumer消费情况。

KafkaOffsetMonitor这款软件是用Scala代码编写的,消息等历史数据是保存在名为offsetapp.db数据库文件中,该数据库是SQLLite文件,非常的轻量级。虽然我们可以在启动KafkaOffsetMonitor程序的时候指定数据更新的频率和数据保存的时间,但是不建议更新很频繁,或者保存大量的数据,因为在KafkaOffsetMonitor图形展示的时候会出现图像展示过慢,或者是直接导致内存溢出了。

所有的关于消息的偏移量、kafka集群的数量等信息都是从Zookeeper中获取到的,日志大小是通过计算得到的。

消费者组列表

screenshot

消费组的topic列表

screenshot

图中参数含义解释如下:

topic:创建时topic名称
partition:分区编号
offset:表示该parition已经消费了多少条message
logSize:表示该partition已经写了多少条message
Lag:表示有多少条message没有被消费。
Owner:表示消费者
Created:该partition创建时间
Last Seen:消费状态刷新最新时间。

topic的历史位置

screenshot

Offset存储位置

kafka能灵活地管理offset,可以选择任意存储和格式来保存offset。KafkaOffsetMonitor目前支持以下流行的存储格式。

  • kafka0.8版本以前,offset默认存储在zookeeper中(基于Zookeeper)
  • kafka0.9版本以后,offset默认存储在内部的topic中(基于Kafka内部的topic)
  • Storm Kafka Spout(默认情况下基于Zookeeper)

KafkaOffsetMonitor每个运行的实例只能支持单一类型的存储格式。

下载

可以到github下载KafkaOffsetMonitor源码。

https://github.com/quantifind/KafkaOffsetMonitor

编译KafkaOffsetMonitor命令:

sbt/sbt assembly

不过不建议你自己去下载,因为编译的jar包里引入的都是外部的css和js,所以打开必须联网,都是国外的地址,你编译的时候还要修改js路径,我已经搞定了,你直接下载就好了。

百度云盘:https://pan.baidu.com/s/1kUZJrCV

启动

编译完之后,将会在KafkaOffsetMonitor根目录下生成一个类似KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar的jar文件。这个文件包含了所有的依赖,我们可以直接启动它:

java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar \
     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
     --offsetStorage kafka \
     --zk zk-server1,zk-server2 \
     --port 8080 \
     --refresh 10.seconds \
     --retain 2.days

启动方式2,创建脚本,因为您可能不是一个kafka集群。用脚本可以启动多个。

vim mobile_start_en.sh
        nohup java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb 
       --offsetStorage kafka
       --zk 127.0.0.1:2181  
       --port 8080      
       --refresh 10.seconds      
       --retain 2.days 1>mobile-logs/stdout.log 2>mobile-logs/stderr.log &

各个参数的含义:

  • offsetStorage:有效的选项是"zookeeper","kafka","storm"。0.9版本以后,offset存储的位置在kafka。
  • zk: zookeeper的地址
  • prot 端口号
  • refresh 刷新频率,更新到DB。
  • retain 保留DB的时间
  • dbName 在哪里存储记录(默认'offsetapp')





发表于: 3年前   最后更新时间: 2年前   游览量:16644
上一条: kafka审核
下一条: Kafka Manager

评论…


  • kafka 1.1.0 启动之后,打开页面,菜单能出来,页面没内容,点那个菜单都没反应,后台没报错,什么情况
    你好,我的kafka版本 kafka_2.12-1.1.0 ,按照你的流程部署,
    点击:
    Kafka Cluster Visualization
    Loading ...一直这样(我昨天做的SASL认证,是不是需要额外处理什么)
    topic list 我点击了是正常的能看到所有的 topic
    其他的点击没啥反应不显示东西
    另外我看了下日志报了 空指针异常:
    INFO  ConsumerFetcherManager:68 - [ConsumerFetcherManager-1541677023239] Added fetcher for partitions ArrayBuffer()
    2018-11-08 19:38:51 WARN  ConsumerFetcherManager$LeaderFinderThread:89 - [KafkaOffsetMonitor-1541677023090_server1.hzguode.com-1541677023185-3dacb5b0-leader-finder-thread], Failed to find leader for Set([__consumer_offsets,16], [__consumer_offsets,2], [__consumer_offsets,39], [__consumer_offsets,15], [__consumer_offsets,33], [__consumer_offsets,38], [__consumer_offsets,9], [__consumer_offsets,7], [__consumer_offsets,31], [__consumer_offsets,48], [__consumer_offsets,24], [__consumer_offsets,19], [__consumer_offsets,11], [__consumer_offsets,22], [__consumer_offsets,26], [__consumer_offsets,17], [__consumer_offsets,10], [__consumer_offsets,35], [__consumer_offsets,36], [__consumer_offsets,34], [__consumer_offsets,49], [__consumer_offsets,44], [__consumer_offsets,23], [__consumer_offsets,40], [__consumer_offsets,47], [__consumer_offsets,45], [__consumer_offsets,43], [__consumer_offsets,46], [__consumer_offsets,37], [__consumer_offsets,28], [__consumer_offsets,27], [__consumer_offsets,25], [__consumer_offsets,6], [__consumer_offsets,5], [__consumer_offsets,3], [__consumer_offsets,18], [__consumer_offsets,14], [__consumer_offsets,41], [__consumer_offsets,42], [__consumer_offsets,4], [__consumer_offsets,12], [__consumer_offsets,21], [__consumer_offsets,30], [__consumer_offsets,1], [__consumer_offsets,32], [__consumer_offsets,13], [__consumer_offsets,8], [__consumer_offsets,20], [__consumer_offsets,0], [__consumer_offsets,29])
    java.lang.NullPointerException
            at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)
            at kafka.cluster.Broker.connectionString(Broker.scala:62)
            at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
            at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
            at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
            at scala.collection.AbstractTraversable.map(Traversable.scala:105)
            at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
            at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

    麻烦大神抽空帮忙给看下 搞了一上午 头大....
    进入某个topic后Unable to find Active Consumers,其实有个消费端,测试好多条发送消息和接受消息,在zk上也有consumer信息。
    在kafka-manager上能看得到,kafkaMonitor上没有,这是怎么回事呀
    请问要监控的kafka有ssl证书的话,KafkaOffsetMonitor可以正常监控吗?还是启动方式会不一样呢?谢谢你!
    你好,我按照你的方法运行了kafkaoffsetmonitor,但是发现界面没内容,而服务器上面的stdout.log日志打印了很多,kafka版本是2.11-0.10.2.0,下面是我的运行命令:
    java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --offsetStorage kafka --zk zk1:2181,zk2:2181,zk3:2181/log/kafka --port 8090 --refresh 10.seconds --retain 2.days 1>mobile-logs/stdout.log 2>mobile-logs/stderr.log &
    你好,我这里看不到consumer group 成员,只能看到topic list,
    KafkaOffsetMonitor适用于Kafka 0.10.x版本吗?
    KafkaOffsetMonitor监控页面访问地址,ip:port.
    我通过百度云下载的kafkamanager,我想监控kafkaspout消费组,但是在ui上面没有看到。
    环境:storm 1.1.0 ,kafka-2.11-0.10,storm-kafka-client-1.1.0。不知道有碰到过的没?
    • 有的。我使用kafka-consumer-group.sh --new-consumer --bootstrap-server apa2:9092 --list 命令可以看到消费组,但是通过kafka-consumer-group.sh --describe --new-consumer --bootstrap-server apa2:9092  --group kafkaspout22 却没有显示更详细的信息。
        • 使用--offsetStorage zookeeper 也看不到。但是可以看到console-consumer的消费组。
          不知道使用kafka manager有没有监控过storm kafkaspout消费组?希望能指导一下具体如何配置。
            这个可以支持在有kerberos和acl的集群上运行么
          • 评论…
            • in this conversation