Problems observed so far
- Topics are not auto-created; I have to create a topic manually before I can write data
- Producing works, but consuming does not: the consumer just waits forever
- Running kafka-consumer-groups.sh throws a TimeoutException
- The broker inexplicably complains that the data is too large, even though the payload is tiny.
Could anyone take a look and tell me what is wrong with the deployment manifest? Thanks.
Deployment manifest
apiVersion: v1
kind: Service
metadata:
  name: zookeeper
  namespace: default-platform
  labels:
    app: zookeeper
spec:
  clusterIP: None # headless Service
  ports:
    - port: 2181
      name: zookeeper
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper
  namespace: default-platform
  labels:
    app: zookeeper
spec:
  serviceName: zookeeper
  replicas: 1 # number of ZooKeeper nodes
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - name: zookeeper
          image: zookeeper:3.8 # ZooKeeper image
          ports:
            - containerPort: 2181 # ZooKeeper service port
          env:
            - name: ALLOW_ANONYMOUS_LOGIN
              value: "yes"
          volumeMounts:
            - name: zookeeper-data
              mountPath: /bitnami/zookeeper # ZooKeeper data directory
  volumeClaimTemplates:
    - metadata:
        name: zookeeper-data
        namespace: default-platform
      spec:
        accessModes: [ "ReadWriteMany" ]
        storageClassName: "managed-nfs-storage-delete" # StorageClass with Delete reclaim policy
        resources:
          requests:
            storage: 5Gi # storage per ZooKeeper node
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  namespace: default-platform
  labels:
    app: kafka
spec:
  ports:
    - port: 9092
      name: internal
      targetPort: 9092
    - port: 30121
      name: external
      targetPort: 30121
      nodePort: 30121
  type: NodePort
  selector:
    app: kafka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: default-platform
  labels:
    app: kafka
spec:
  serviceName: kafka
  replicas: 1 # number of Kafka brokers
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka
          image: bitnami/kafka:3.0.0
          ports:
            - containerPort: 9092 # Kafka service port
          resources:
            requests:
              cpu: "0.1"
              memory: "1Gi"
            limits:
              cpu: "2"
              memory: "2500Mi"
          env:
            - name: ALLOW_PLAINTEXT_LISTENER
              value: "yes"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
            - name: KAFKA_LISTENERS
              value: "INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:30121"
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "INTERNAL://kafka-0.kafka.default-platform.svc.cluster.local:9092,EXTERNAL://192.168.*.*:30121"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "INTERNAL"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: "zookeeper:2181"
          volumeMounts:
            - name: kafka-data
              mountPath: /var/lib/kafka/data # Kafka data directory
  volumeClaimTemplates:
    - metadata:
        name: kafka-data
        namespace: default-platform
      spec:
        accessModes: [ "ReadWriteMany" ]
        storageClassName: "managed-nfs-storage-delete" # StorageClass with Delete reclaim policy
        resources:
          requests:
            storage: 10Gi # storage per Kafka broker
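One thing that might be worth checking against the manifest above: the internal address in KAFKA_ADVERTISED_LISTENERS uses the per-pod DNS name `kafka-0.kafka.default-platform.svc.cluster.local`, and names of that form only resolve if a headless Service named `kafka` (matching the StatefulSet's `serviceName`) exists, while the manifest only defines a NodePort Service named `kafka-service`. A sketch of the checks, to be run from a pod in the same namespace (these commands need a live cluster):

```shell
# Does the advertised internal hostname resolve at all?
nslookup kafka-0.kafka.default-platform.svc.cluster.local

# Can the broker be reached on the internal listener?
kafka-topics.sh \
  --bootstrap-server kafka-0.kafka.default-platform.svc.cluster.local:9092 \
  --list
```

If the name does not resolve, clients that bootstrap successfully will still fail as soon as the broker hands back that advertised address, which would match the "producer works, consumer hangs" symptom.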
Error output for problem 3
I have no name!@kafka-0:/$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group
Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeGroups(api=FIND_COORDINATOR)
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeGroups(api=FIND_COORDINATOR)
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$describeConsumerGroups$1(ConsumerGroupCommand.scala:543)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeConsumerGroups(ConsumerGroupCommand.scala:542)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:558)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:367)
at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:72)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:59)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeGroups(api=FIND_COORDINATOR)
Error output for problem 4:
[2025-02-25 18:19:47,260] WARN [SocketServer listenerType=ZK_BROKER, nodeId=1020] Unexpected error from /10.244.6.0; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:105)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at kafka.network.Processor.poll(SocketServer.scala:989)
at kafka.network.Processor.run(SocketServer.scala:892)
at java.base/java.lang.Thread.run(Thread.java:829)
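For what it's worth, the rejected "size" in that warning is probably not a real message length: 1195725856 in hex is 0x47455420, which is the ASCII bytes "GET ", i.e. the start of an HTTP request. That usually means something (a probe, a browser, a monitoring agent) is speaking HTTP to the Kafka listener port, and the broker misreads the first four bytes as a frame length; it is typically unrelated to your actual data size. This can be verified in plain shell:

```shell
# 1195725856 rendered as a 32-bit hex value...
printf '0x%08X\n' 1195725856        # prints 0x47455420
# ...and those four bytes decoded as ASCII:
printf '%b\n' '\x47\x45\x54\x20'    # prints "GET "
```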
You are running this inside the container, so you can't use the localhost address, because Kafka advertises these external bindings:
- name: KAFKA_ADVERTISED_LISTENERS
  value: "INTERNAL://kafka-0.kafka.default-platform.svc.cluster.local:9092,EXTERNAL://192.168.*.*:30121"
So you should run:
kafka-consumer-groups.sh --bootstrap-server 192.168.*.*:30121 --describe --group my-consumer-group
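For completeness, from inside the kafka-0 container the internal advertised address would be the one to try instead; a sketch, assuming the internal hostname resolves (requires the running cluster):

```shell
# Inside the kafka-0 container: use the advertised INTERNAL address,
# since that is what the broker hands back to clients after bootstrap.
kafka-consumer-groups.sh \
  --bootstrap-server kafka-0.kafka.default-platform.svc.cluster.local:9092 \
  --describe --group my-consumer-group
```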
I tried that; whether I use the internal or the external address, I get the same error described in problem 3.