Tlink

0 reputation

This user is too lazy to leave anything behind.

Activity
  • 岁月神偷 replied to Tlink in "KafkaOffsetMonitor: monitoring consumers and lagging queues":

    Did you solve it? I keep hitting this same error too.

    4 years ago
  • 细水长流 downvoted a comment in "KafkaOffsetMonitor: monitoring consumers and lagging queues"!

    I'm on version 2.1.1 and in the same situation as you: same error, and the web UI only shows topics.

    5 years ago
  • 一一一一一 replied to Tlink in "KafkaOffsetMonitor: monitoring consumers and lagging queues":
    package com.quantifind.kafka.core
    import java.nio.ByteBuffer
    import com.quantifind.kafka.OffsetGetter.OffsetInfo
    import com.quantifind.kafka.OffsetGetter
    import com.quantifind.utils.ZkUtilsWrapper
    import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
    import kafka.common.{OffsetAndMetadata, TopicAndPartition}
    import kafka.consumer.{ConsumerConnector, KafkaStream}
    import kafka.message.MessageAndMetadata
    import kafka.utils.Logging
    import org.I0Itec.zkclient.ZkClient
    import scala.collection._
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future
    import scala.util.control.NonFatal
    class KafkaOffsetGetter(theZkClient: ZkClient, zkUtils: ZkUtilsWrapper = new ZkUtilsWrapper) extends OffsetGetter {
      import KafkaOffsetGetter._
      override val zkClient = theZkClient
      override def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] = {
        try {
          zkUtils.getLeaderForPartition(zkClient, topic, pid) match {
            case Some(bid) =>
              val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(bid))
              consumerOpt flatMap { consumer =>
                  val topicAndPartition = TopicAndPartition(topic, pid)
                  offsetMap.get(GroupTopicPartition(group, topicAndPartition)) map { offsetMetaData =>
                    val request =
                      OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
                    val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
                    OffsetInfo(group = group,
                      topic = topic,
                      partition = pid,
                      offset = offsetMetaData.offset,
                      logSize = logSize,
                      owner = Some("NA"))
                  }
              }
            case None =>
              error("No broker for partition %s - %s".format(topic, pid))
              None
          }
        } catch {
          case NonFatal(t) =>
            error(s"Could not parse partition info. group: [$group] topic: [$topic]", t)
            None
        }
      }
      override def getGroups: Seq[String] = {
        topicAndGroups.groupBy(_.group).keySet.toSeq
      }
      override def getTopicList(group: String): List[String] = {
        topicAndGroups.filter(_.group == group).groupBy(_.topic).keySet.toList
      }
      override def getTopicMap: Map[String, scala.Seq[String]] = {
        topicAndGroups.groupBy(_.topic).mapValues(_.map(_.group).toSeq)
      }
      override def getActiveTopicMap: Map[String, Seq[String]] = {
        getTopicMap
      }
    }
    object KafkaOffsetGetter extends Logging {
      val ConsumerOffsetTopic = "__consumer_offsets"
      val offsetMap: mutable.Map[GroupTopicPartition, OffsetAndMetadata] = mutable.HashMap()
      val topicAndGroups: mutable.Set[TopicAndGroup] = mutable.HashSet()
      def startOffsetListener(consumerConnector: ConsumerConnector) = {
        Future {
          try {
            logger.info("Staring Kafka offset topic listener")
            val offsetMsgStream: KafkaStream[Array[Byte], Array[Byte]] = consumerConnector
              .createMessageStreams(Map(ConsumerOffsetTopic -> 1))
              .get(ConsumerOffsetTopic).map(_.head) match {
              case Some(s) => s
              case None => throw new IllegalStateException("Cannot create a consumer stream on offset topic")
            }
            val it = offsetMsgStream.iterator()
            while (true) {
              try {
                val offsetMsg: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
                val commitKey: GroupTopicPartition = readMessageKey(ByteBuffer.wrap(offsetMsg.key()))
                val commitValue: OffsetAndMetadata = readMessageValue(ByteBuffer.wrap(offsetMsg.message()))
                info("Processed commit message: " + commitKey + " => " + commitValue)
                offsetMap += (commitKey -> commitValue)
                topicAndGroups += TopicAndGroup(commitKey.topicPartition.topic, commitKey.group)
                info(s"topicAndGroups = $topicAndGroups")
              } catch {
                case e: RuntimeException =>
                  // offsetMsg.key() or offsetMsg.message() occasionally throws an NPE
                  warn("Failed to process one of the commit messages due to an exception. The 'bad' message will be skipped", e)
              }
            }
          } catch {
            case e: Throwable =>
              fatal("Offset topic listener aborted due to unexpected exception", e)
              System.exit(1)
          }
        }
      }
      // massive code stealing from kafka.server.OffsetManager
      import org.apache.kafka.common.protocol.types.Type.{INT32, INT64, STRING}
      import org.apache.kafka.common.protocol.types.{Field, Schema, Struct}
      private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema)
      private val OFFSET_COMMIT_KEY_SCHEMA_V0 = new Schema(new Field("group", STRING),
                                                           new Field("topic", STRING),
                                                           new Field("partition", INT32))
      private val KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("group")
      private val KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("topic")
      private val KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("partition")
      private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
                                                             new Field("metadata", STRING, "Associated metadata.", ""),
                                                             new Field("timestamp", INT64))
      private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
                                                             new Field("metadata", STRING, "Associated metadata.", ""),
                                                             new Field("commit_timestamp", INT64),
                                                             new Field("expire_timestamp", INT64))
      private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
                                                             new Field("metadata", STRING, "Associated metadata.", ""),
                                                             new Field("commit_timestamp", INT64))
      private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(new Field("offset", INT64),
                                                             new Field("leader_epoch", INT32),
                                                             new Field("metadata", STRING, "Associated metadata.", ""),
                                                             new Field("commit_timestamp", INT64))
      private val VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
      private val VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
      private val VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
      private val VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
      private val VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
      private val VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
      private val VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
      private val VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
      private val VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
      private val VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
      private val VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
      private val VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
      private val VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
      // private val VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
      // map of versions to schemas
      private val OFFSET_SCHEMAS = Map(0 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V0),
                                       1 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V1),
                                       2 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V2),
                                       3 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V3))
      private def schemaFor(version: Int) = {
        val schemaOpt = OFFSET_SCHEMAS.get(version)
        schemaOpt match {
          case Some(schema) => schema
          case _ => throw new RuntimeException("Unknown offset schema version " + version)
        }
      }
      case class MessageValueStructAndVersion(value: Struct, version: Short)
      case class TopicAndGroup(topic: String, group: String)
      case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {
        def this(group: String, topic: String, partition: Int) =
          this(group, new TopicAndPartition(topic, partition))
        override def toString =
          "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
      }
      /**
       * Decodes an offset message's key
       *
       * @param buffer input byte-buffer
       * @return a GroupTopicPartition object
       */
      private def readMessageKey(buffer: ByteBuffer): GroupTopicPartition = {
        val version = buffer.getShort()
        val keySchema = schemaFor(version).keySchema
        val key = keySchema.read(buffer).asInstanceOf[Struct]
        val group = key.get(KEY_GROUP_FIELD).asInstanceOf[String]
        val topic = key.get(KEY_TOPIC_FIELD).asInstanceOf[String]
        val partition = key.get(KEY_PARTITION_FIELD).asInstanceOf[Int]
        GroupTopicPartition(group, TopicAndPartition(topic, partition))
      }
      /**
       * Decodes an offset message's payload and retrieves the offset and metadata from it
       *
       * @param buffer input byte-buffer
       * @return an offset-metadata object from the message
       */
      private def readMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
        val structAndVersion = readMessageValueStruct(buffer)
        if (structAndVersion.value == null) { // tombstone
          null
        } else {
          if (structAndVersion.version == 0) {
            val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
            val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V0).asInstanceOf[String]
            val timestamp = structAndVersion.value.get(VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]
            OffsetAndMetadata(offset, metadata, timestamp)
          } else if (structAndVersion.version == 1) {
            val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
            val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V1).asInstanceOf[String]
            val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
            // not supported in 0.8.2
            // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
            OffsetAndMetadata(offset, metadata, commitTimestamp)
          } else if (structAndVersion.version == 2) {
            val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
            val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V2).asInstanceOf[String]
            val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
            // the v2 schema has no expire_timestamp field
            OffsetAndMetadata(offset, metadata, commitTimestamp)
          } else if (structAndVersion.version == 3) {
            val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
            val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V3).asInstanceOf[String]
            val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
            // the v3 schema has no expire_timestamp field
            OffsetAndMetadata(offset, metadata, commitTimestamp)
          } else {
            throw new IllegalStateException("Unknown offset message version: " + structAndVersion.version)
          }
        }
      }
      private def readMessageValueStruct(buffer: ByteBuffer): MessageValueStructAndVersion = {
        if(buffer == null) { // tombstone
          MessageValueStructAndVersion(null, -1)
        } else {
          val version = buffer.getShort()
          val valueSchema = schemaFor(version).valueSchema
          val value = valueSchema.read(buffer).asInstanceOf[Struct]
          MessageValueStructAndVersion(value, version)
        }
      }
    }
    

    Modify the source code.
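
    Note that the patched getter only returns data once startOffsetListener is consuming the internal __consumer_offsets topic and filling offsetMap. A minimal bootstrap sketch, not part of the original patch; the ZooKeeper address and group id below are placeholders, and it assumes the old high-level consumer API:

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    object OffsetListenerBootstrap {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("zookeeper.connect", "localhost:2181") // placeholder ZooKeeper address
        props.put("group.id", "offset-monitor-internal") // hypothetical group id for the monitor itself
        props.put("exclude.internal.topics", "false")    // must be false to consume __consumer_offsets
        val connector = Consumer.create(new ConsumerConfig(props))
        KafkaOffsetGetter.startOffsetListener(connector) // begins populating offsetMap and topicAndGroups
      }
    }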

    5 years ago
  • 一一一一一 replied to Tlink in "KafkaOffsetMonitor: monitoring consumers and lagging queues":
    (same patched KafkaOffsetGetter source as in the previous reply)

    Modify the source code; replacing this class with the version above fixes it.

    5 years ago
  • Maybe so downvoted a comment in "KafkaOffsetMonitor: monitoring consumers and lagging queues"!

    I'm on version 2.1.1 and in the same situation as you: same error, and the web UI only shows topics.

    5 years ago
  • Tlink replied to ..... in "KafkaOffsetMonitor: monitoring consumers and lagging queues":

    I'm on version 2.1.1 and in the same situation as you: same error, and the web UI only shows topics.

    5 years ago
  • Tlink replied to 半兽人 in "KafkaOffsetMonitor: monitoring consumers and lagging queues":

    In 2.1.1 the offsets are stored in Kafka itself, right? With --offsetStorage kafka I still only see the topics; nothing else shows up.
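
    For reference, a typical launch line with Kafka-based offset storage looks roughly like this; the jar name, ZooKeeper address, and port are placeholders, not taken from this thread:

    java -cp KafkaOffsetMonitor-assembly-0.3.0.jar \
         com.quantifind.kafka.offsetapp.OffsetGetterWeb \
         --offsetStorage kafka \
         --zk localhost:2181 \
         --port 8080 \
         --refresh 10.seconds \
         --retain 2.days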

    5 years ago