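We run two Kafka clusters in production:
- Cluster A version: 0.10.0.1
- Cluster B version: 0.10.2.1
Cluster B recently ran short of resources, so I took 3 machines out of cluster A in order to add them to cluster B. The decommissioning steps were:
- Migrate the topic replicas on those three machines to other brokers
- Run bin/kafka-server-stop.sh on each of the three machines to shut down the Kafka service
- Clean up the Kafka installation directory, data directory, and log directory on the three machines.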
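
For readers following along, the first step (moving replicas off the retiring brokers) can be sketched as below. This is only an illustration, not necessarily how the migration was done here: the topic name and broker ids are hypothetical, and the helper simply builds the JSON plan that bin/kafka-reassign-partitions.sh accepts via --reassignment-json-file.

```python
import json


def build_reassignment(assignments, retiring, survivors):
    """Build a reassignment plan that moves replicas off retiring brokers.

    assignments: {(topic, partition): [broker ids]} (current replica lists)
    retiring:    set of broker ids being decommissioned
    survivors:   list of broker ids that stay in the cluster
    """
    plan = {"version": 1, "partitions": []}
    for (topic, partition), replicas in sorted(assignments.items()):
        # Surviving brokers that do not already host this partition.
        spare = [b for b in survivors if b not in replicas]
        new_replicas = []
        for broker in replicas:
            if broker in retiring:
                if not spare:
                    raise ValueError(f"no spare broker for {topic}-{partition}")
                # Naive choice: first free survivor (no load balancing).
                new_replicas.append(spare.pop(0))
            else:
                new_replicas.append(broker)
        plan["partitions"].append(
            {"topic": topic, "partition": partition, "replicas": new_replicas}
        )
    return plan


if __name__ == "__main__":
    # Hypothetical layout: brokers 10-12 are leaving, 1-3 remain.
    current = {("orders", 0): [1, 10], ("orders", 1): [11, 2]}
    print(json.dumps(build_reassignment(current, {10, 11, 12}, [1, 2, 3]), indent=2))
```

The printed JSON would be saved to a file and passed to the reassignment tool with --execute, then checked with --verify before the brokers are stopped.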
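After the decommissioning, cluster A ran normally.
Then I added the three machines to cluster B one by one. After they were added, server.log on all three machines contained WARN entries like the following: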
[2021-10-15 11:12:12,140] INFO [Kafka Server 12], started (kafka.server.KafkaServer)
[2021-10-15 11:12:12,323] WARN Attempting to send response via channel for which there is no open connection, connection id 5 (kafka.network.Processor)
[2021-10-15 11:12:18,445] WARN Attempting to send response via channel for which there is no open connection, connection id 1 (kafka.network.Processor)
[2021-10-15 11:12:29,527] WARN Attempting to send response via channel for which there is no open connection, connection id 3 (kafka.network.Processor)
[2021-10-15 11:12:31,585] WARN Attempting to send response via channel for which there is no open connection, connection id 1 (kafka.network.Processor)
[2021-10-15 11:12:31,728] WARN Attempting to send response via channel for which there is no open connection, connection id 10 (kafka.network.Processor)
[2021-10-15 11:12:57,526] WARN Attempting to send response via channel for which there is no open connection, connection id 0 (kafka.network.Processor)
At the same time, the brokers in cluster A started logging ERRORs like the following:
[2021-10-15 11:13:02,187] ERROR Closing socket for xxx:9092-xxxx:49691 because of error (kafka.network.Processor)
kafka.network.InvalidRequestException: Error getting request for apiKey: 3 and apiVersion: 2
at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:95)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:87)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:488)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)
at kafka.network.Processor.run(SocketServer.scala:413)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Invalid version for API key 3: 2
at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(ProtoUtils.java:31)
at org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:44)
at org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:60)
at org.apache.kafka.common.requests.MetadataRequest.parse(MetadataRequest.java:96)
at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:48)
at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:92)
... 10 more
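Was the way I took the brokers out of cluster A wrong? Why was there nothing abnormal after the three brokers were taken offline, yet adding them to cluster B makes cluster A report errors?
Has anyone run into this kind of situation before?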
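You probably connected to the wrong ZooKeeper (zk).
I've checked several times and the zk is not wrong: all three brokers are configured with cluster B's zk directory. Besides zk, is there any other metadata that needs to be cleaned up?
Kafka identifies a node by its broker (its broker.id) and binds it to a cluster through zk.
Starting the brokers in B makes A react, which suggests the problem is in zk.
The error you posted is an API version mismatch, which means the members of a cluster are running different versions.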
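
To make the "apiKey: 3 and apiVersion: 2" in the stack trace concrete, here is a minimal sketch of how a Kafka request header is laid out and checked. In the Kafka protocol, API key 3 is the Metadata request. The version limits in the dictionaries are an assumption that should be checked against the protocol documentation for each release: as far as I know a 0.10.0.x broker accepts Metadata up to v1, while 0.10.1+ peers may send v2.

```python
import struct

# A few well-known API keys from the Kafka protocol.
API_NAMES = {0: "Produce", 1: "Fetch", 3: "Metadata", 18: "ApiVersions"}

# ASSUMPTION: highest Metadata version each broker line accepts.
MAX_VERSIONS_0_10_0 = {3: 1}
MAX_VERSIONS_0_10_2 = {3: 2}


def parse_request_header(payload: bytes):
    """Decode api_key, api_version, correlation_id and client_id.

    `payload` is the request body after the 4-byte size prefix.
    All integers are big-endian, as in the Kafka wire format.
    """
    api_key, api_version, correlation_id, id_len = struct.unpack_from(">hhih", payload, 0)
    # A negative length encodes a null client_id.
    client_id = None if id_len < 0 else payload[10:10 + id_len].decode("utf-8")
    return api_key, api_version, correlation_id, client_id


def is_supported(api_key, api_version, max_versions):
    """True if the broker described by `max_versions` can parse this request."""
    return api_key in max_versions and 0 <= api_version <= max_versions[api_key]


if __name__ == "__main__":
    # Hypothetical Metadata v2 request header with client id "demo".
    header = struct.pack(">hhih", 3, 2, 7, 4) + b"demo"
    key, version, _, client = parse_request_header(header)
    print(API_NAMES[key], version, client)
    print("0.10.0 broker accepts it:", is_supported(key, version, MAX_VERSIONS_0_10_0))
    print("0.10.2 broker accepts it:", is_supported(key, version, MAX_VERSIONS_0_10_2))
```

Under that assumption, the ERROR on cluster A means some newer peer (a broker or a client) is sending Metadata v2 to a 0.10.0.1 broker. The source address in the "Closing socket for ..." line shows which host that is.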
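It's caused by stale data in your zk. Either delete it or switch to a new broker.id.
P.S. Please add the zk connection settings of both Kafka clusters. If they are internal IPs, there's no harm in posting them publicly. Keep them as real as possible, because I need to confirm which cluster is which.
Both clusters share a single zk, configured with different root directories.
Cluster A's zk configuration:
Cluster B's zk configuration: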
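
The two configuration values did not survive in this thread. As a rough sketch of the check being discussed, the snippet below splits a zookeeper.connect string into its host list and root directory (chroot) and compares two of them. The host names and paths are hypothetical. Note that the chroot is written once, after the last host:port, not after every host.

```python
def parse_zk_connect(value):
    """Split 'h1:2181,h2:2181/chroot' into (sorted hosts, chroot path)."""
    hosts_part, _, chroot = value.strip().partition("/")
    hosts = sorted(h.strip() for h in hosts_part.split(",") if h.strip())
    # No chroot (or a bare trailing slash) means the zk root; return "" for it.
    root = ("/" + chroot).rstrip("/")
    return hosts, root


def shares_namespace(connect_a, connect_b):
    """True if two connect strings point at the same chroot of the same ensemble."""
    hosts_a, root_a = parse_zk_connect(connect_a)
    hosts_b, root_b = parse_zk_connect(connect_b)
    return root_a == root_b and bool(set(hosts_a) & set(hosts_b))


def broker_znode(connect, broker_id):
    """Path where a broker with this id registers itself under the chroot."""
    _, root = parse_zk_connect(connect)
    return f"{root}/brokers/ids/{broker_id}"


if __name__ == "__main__":
    # Hypothetical values standing in for the two missing configurations.
    cluster_a = "zk1:2181,zk2:2181,zk3:2181/kafka-a"
    cluster_b = "zk1:2181,zk2:2181,zk3:2181/kafka-b"
    print(shares_namespace(cluster_a, cluster_b))
    print(broker_znode(cluster_b, 12))
```

If the two strings really differ only in the chroot, the clusters do not share broker registrations, and stale entries would have to be looked for under each cluster's own /brokers/ids path.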
I don't see any other problem; this is beyond what I know.
We'll keep checking whether something else is wrong. Thanks a lot!