Kafka deployed via K8s can produce but cannot consume

Posted: 2025-02-26   Last updated: 2025-02-27 09:45:41   93 views

Problems observed so far

  1. Topics are not auto-created; I have to create each topic manually before writing data
  2. Data can be produced, but it cannot be consumed; the consumer just waits forever (the commands involved are sketched right after this list)
  3. Running kafka-consumer-groups.sh throws a TimeoutException
  4. The broker inexplicably complains that the data is too large, even though the data is actually very small.
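
For reference, this is the kind of produce/consume flow being described, sketched with the standard CLI tools shipped in the image (the topic name is only an example, and the bootstrap address mirrors the one in the error output below):

kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic --partitions 1 --replication-factor 1
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning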

Could someone please help me figure out where my deployment manifests are wrong? Thanks.

Deployment manifests

apiVersion: v1
kind: Service
metadata:
  name: zookeeper
  namespace: default-platform
  labels:
    app: zookeeper
spec:
  clusterIP: None  # Headless Service
  ports:
  - port: 2181
    name: zookeeper
  selector:
    app: zookeeper

---

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper
  namespace: default-platform
  labels:
    app: zookeeper
spec:
  serviceName: zookeeper
  replicas: 1  # number of Zookeeper nodes
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
      - name: zookeeper
        image: zookeeper:3.8  # Zookeeper image
        ports:
        - containerPort: 2181  # Zookeeper service port
        env:
        - name: ALLOW_ANONYMOUS_LOGIN
          value: "yes"
        volumeMounts:
        - name: zookeeper-data
          mountPath: /bitnami/zookeeper  # Zookeeper data directory
  volumeClaimTemplates:
  - metadata:
      name: zookeeper-data
      namespace: default-platform
    spec:
      accessModes: [ "ReadWriteMany" ]
      storageClassName: "managed-nfs-storage-delete"  # StorageClass with the Delete reclaim policy
      resources:
        requests:
          storage: 5Gi  # storage size per Zookeeper node

---

apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  namespace: default-platform
  labels:
    app: kafka
spec:
  ports:
  - port: 9092
    name: internal
    targetPort: 9092
  - port: 30121
    name: external
    targetPort: 30121
    nodePort: 30121
  type: NodePort
  selector:
    app: kafka

---

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: default-platform
  labels:
    app: kafka
spec:
  serviceName: kafka
  replicas: 1  # number of nodes in the Kafka cluster
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: bitnami/kafka:3.0.0
        ports:
        - containerPort: 9092  # Kafka service port
        resources:
          requests:
            cpu: "0.1"
            memory: "1Gi"
          limits:
            cpu: "2"
            memory: "2500Mi"
        env:
          - name: ALLOW_PLAINTEXT_LISTENER
            value: "yes"
          - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
            value: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
          - name: KAFKA_LISTENERS
            value: "INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:30121"
          - name: KAFKA_ADVERTISED_LISTENERS
            value: "INTERNAL://kafka-0.kafka.default-platform.svc.cluster.local:9092,EXTERNAL://192.168.*.*:30121"
          - name: KAFKA_INTER_BROKER_LISTENER_NAME
            value: "INTERNAL"
          - name: KAFKA_ZOOKEEPER_CONNECT
            value: "zookeeper:2181"
        volumeMounts:
        - name: kafka-data
          mountPath: /var/lib/kafka/data  # Kafka data directory
  volumeClaimTemplates:
  - metadata:
      name: kafka-data
      namespace: default-platform
    spec:
      accessModes: [ "ReadWriteMany" ]
      storageClassName: "managed-nfs-storage-delete"  # StorageClass with the Delete reclaim policy
      resources:
        requests:
          storage: 10Gi  # storage size per Kafka node
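
As a quick sanity check while debugging, whether the hostname advertised for the INTERNAL listener above actually resolves inside the cluster can be verified from the broker pod (a minimal sketch; it assumes getent is available in the bitnami/kafka image):

kubectl -n default-platform exec -it kafka-0 -- getent hosts kafka-0.kafka.default-platform.svc.cluster.local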

Error output for problem 3

I have no name!@kafka-0:/$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group

Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeGroups(api=FIND_COORDINATOR)
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeGroups(api=FIND_COORDINATOR)
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
        at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$describeConsumerGroups$1(ConsumerGroupCommand.scala:543)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeConsumerGroups(ConsumerGroupCommand.scala:542)
        at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:558)
        at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:367)
        at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:72)
        at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:59)
        at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeGroups(api=FIND_COORDINATOR)

Error output for problem 4:

[2025-02-25 18:19:47,260] WARN [SocketServer listenerType=ZK_BROKER, nodeId=1020] Unexpected error from /10.244.6.0; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:105)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
        at kafka.network.Processor.poll(SocketServer.scala:989)
        at kafka.network.Processor.run(SocketServer.scala:892)
        at java.base/java.lang.Thread.run(Thread.java:829)
半兽人 answered:

You are running this inside the container, so you cannot use localhost as the bootstrap address, because Kafka advertises the following listeners:

- name: KAFKA_ADVERTISED_LISTENERS
  value: "INTERNAL://kafka-0.kafka.default-platform.svc.cluster.local:9092,EXTERNAL://192.168.*.*:30121

So you should run:

 kafka-consumer-groups.sh --bootstrap-server 192.168.*.*:30121 --describe --group my-consumer-group
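
Or, from inside the cluster, the INTERNAL advertised listener can be used as the bootstrap address instead (a sketch based on the KAFKA_ADVERTISED_LISTENERS value above; it assumes that hostname resolves):

 kafka-consumer-groups.sh --bootstrap-server kafka-0.kafka.default-platform.svc.cluster.local:9092 --describe --group my-consumer-group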
? replied to 半兽人, 14 days ago:

I tried that; whether I use the internal or the external address, I still get the same error described in problem 3.
