kafka集群重启后,消费者提示无法与leader建立连接

▓千年祇园 卐 发表于: 2022-12-01   最后更新时间: 2022-12-02 00:26:35   1,625 游览

kafka集群重启后,zk 可以看到leader/follower节点,但在一个follower节点上用消费者脚本接收消息,提示无法与leader建立连接

kafka-console-consumer.sh --bootstrap-server 10.168.45.21:9092 --topic k

以下是终端消费者报错信息

Connection to node -2 (/10.168.45.21:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

注:三个节点用zkServer.sh status可以查看其角色follower/leader/follower

[2022-12-01 15:17:58,375] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.zk.KafkaZkClient)
[2022-12-01 15:17:58,397] ERROR Error while creating ephemeral at /brokers/ids/1, node already exists and owner '144199714271002624' does not match current session '72069007191048192' (kafka.zk.KafkaZkClient$CheckedEphemeral)
[2022-12-01 15:17:58,404] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
    at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
    at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:319)
    at kafka.Kafka$.main(Kafka.scala:109)
    at kafka.Kafka.main(Kafka.scala)
[2022-12-01 15:17:58,407] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)
[2022-12-01 15:17:58,407] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Stopping socket server request processors (kafka.network.SocketServer)
[2022-12-01 15:17:58,411] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Stopped socket server request processors (kafka.network.SocketServer)
[2022-12-01 15:17:58,412] INFO [ReplicaManager broker=1] Shutting down (kafka.server.ReplicaManager)
[2022-12-01 15:17:58,414] INFO [LogDirFailureHandler]: Shutting down (kafka.server.ReplicaManager$LogDirFailureHandler)
[2022-12-01 15:17:58,415] INFO [LogDirFailureHandler]: Shutdown completed (kafka.server.ReplicaManager$LogDirFailureHandler)
[2022-12-01 15:17:58,415] INFO [LogDirFailureHandler]: Stopped (kafka.server.ReplicaManager$LogDirFailureHandler)
[2022-12-01 15:17:58,415] INFO [ReplicaFetcherManager on broker 1] shutting down (kafka.server.ReplicaFetcherManager)
[2022-12-01 15:17:58,417] INFO [ReplicaFetcherManager on broker 1] shutdown completed (kafka.server.ReplicaFetcherManager)
[2022-12-01 15:17:58,417] INFO [ReplicaAlterLogDirsManager on broker 1] shutting down (kafka.server.ReplicaAlterLogDirsManager)
[2022-12-01 15:17:58,418] INFO [ReplicaAlterLogDirsManager on broker 1] shutdown completed (kafka.server.ReplicaAlterLogDirsManager)
[2022-12-01 15:17:58,418] INFO [ExpirationReaper-1-Fetch]: Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,474] INFO [ExpirationReaper-1-Fetch]: Stopped (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,474] INFO [ExpirationReaper-1-Fetch]: Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,475] INFO [ExpirationReaper-1-Produce]: Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,674] INFO [ExpirationReaper-1-Produce]: Stopped (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,674] INFO [ExpirationReaper-1-Produce]: Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,675] INFO [ExpirationReaper-1-DeleteRecords]: Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,675] INFO [ExpirationReaper-1-DeleteRecords]: Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,675] INFO [ExpirationReaper-1-DeleteRecords]: Stopped (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,675] INFO [ExpirationReaper-1-ElectLeader]: Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,675] INFO [ExpirationReaper-1-ElectLeader]: Stopped (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,675] INFO [ExpirationReaper-1-ElectLeader]: Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,681] INFO [ReplicaManager broker=1] Shut down completely (kafka.server.ReplicaManager)
[2022-12-01 15:17:58,682] INFO [BrokerToControllerChannelManager broker=1 name=alterIsr]: Shutting down (kafka.server.BrokerToControllerRequestThread)
[2022-12-01 15:17:58,682] INFO [BrokerToControllerChannelManager broker=1 name=alterIsr]: Stopped (kafka.server.BrokerToControllerRequestThread)
[2022-12-01 15:17:58,682] INFO [BrokerToControllerChannelManager broker=1 name=alterIsr]: Shutdown completed (kafka.server.BrokerToControllerRequestThread)
[2022-12-01 15:17:58,687] INFO Broker to controller channel manager for alterIsr shutdown (kafka.server.BrokerToControllerChannelManagerImpl)
[2022-12-01 15:17:58,687] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Shutting down (kafka.server.BrokerToControllerRequestThread)
[2022-12-01 15:17:58,687] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Stopped (kafka.server.BrokerToControllerRequestThread)
[2022-12-01 15:17:58,687] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Shutdown completed (kafka.server.BrokerToControllerRequestThread)
[2022-12-01 15:17:58,688] INFO Broker to controller channel manager for forwarding shutdown (kafka.server.BrokerToControllerChannelManagerImpl)
[2022-12-01 15:17:58,688] INFO Shutting down. (kafka.log.LogManager)
[2022-12-01 15:17:58,710] INFO Shutdown complete. (kafka.log.LogManager)
[2022-12-01 15:17:58,712] INFO [feature-zk-node-event-process-thread]: Shutting down (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
[2022-12-01 15:17:58,712] INFO [feature-zk-node-event-process-thread]: Stopped (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
[2022-12-01 15:17:58,712] INFO [feature-zk-node-event-process-thread]: Shutdown completed (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
[2022-12-01 15:17:58,712] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
[2022-12-01 15:17:58,816] INFO Session: 0x1000a6154f00000 closed (org.apache.zookeeper.ZooKeeper)
[2022-12-01 15:17:58,816] INFO EventThread shut down for session: 0x1000a6154f00000 (org.apache.zookeeper.ClientCnxn)
[2022-12-01 15:17:58,817] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
[2022-12-01 15:17:58,817] INFO [ThrottledChannelReaper-Fetch]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:17:59,449] INFO [ThrottledChannelReaper-Fetch]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:17:59,449] INFO [ThrottledChannelReaper-Fetch]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:17:59,449] INFO [ThrottledChannelReaper-Produce]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,449] INFO [ThrottledChannelReaper-Produce]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,449] INFO [ThrottledChannelReaper-Produce]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,449] INFO [ThrottledChannelReaper-Request]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,450] INFO [ThrottledChannelReaper-Request]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,450] INFO [ThrottledChannelReaper-Request]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,450] INFO [ThrottledChannelReaper-ControllerMutation]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,451] INFO [ThrottledChannelReaper-ControllerMutation]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,451] INFO [ThrottledChannelReaper-ControllerMutation]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,452] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Shutting down socket server (kafka.network.SocketServer)
[2022-12-01 15:18:00,477] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Shutdown completed (kafka.network.SocketServer)
[2022-12-01 15:18:00,477] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
[2022-12-01 15:18:00,477] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
[2022-12-01 15:18:00,477] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2022-12-01 15:18:00,479] INFO Broker and topic stats closed (kafka.server.BrokerTopicStats)
[2022-12-01 15:18:00,482] INFO App info kafka.server for 1 unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2022-12-01 15:18:00,482] INFO [KafkaServer id=1] shut down completed (kafka.server.KafkaServer)
[2022-12-01 15:18:00,482] ERROR Exiting Kafka. (kafka.Kafka$)
[2022-12-01 15:18:00,483] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)
发表于 2022-12-01
添加评论

[2022-12-01 15:17:58,397] ERROR Error while creating ephemeral at /brokers/ids/1, node already exists and owner '144199714271002624' does not match current session '72069007191048192' (kafka.zk.KafkaZkClient$CheckedEphemeral)

kafka没启来,/brokers/ids/1在zk中已经存在了,导致的。

为何你的zk数据没有被及时释放?

我的run.sh里对于start部分,依次启动3台zk,然后依次启动3台kafka; 对于stop部分,依次停止3台kafka, 然后sleep 12, 最后依次停止3台zookeeper.

启动zk形如:

/opt/local/kafka/zookeeper/apache-zookeeper-3.6.3-bin/bin/zkServer.sh start

启动kafka形如:

/opt/local/kafka/kafka_2.13-3.0.0/bin/kafka-server-start.sh -daemon /opt/local/kafka/kafka_2.13-3.0.0/config/server.properties

停止kafka形如:

/opt/local/kafka/kafka_2.13-3.0.0/bin/kafka-server-stop.sh

停止zk形如:

/opt/local/kafka/zookeeper/apache-zookeeper-3.6.3-bin/bin/zkServer.sh stop

这样的脚本的是否有问题呢

1、停止kafka的时候,休眠太简单了,判断下kafka进程是确认没了,感觉你的kafka没停掉。
2、清理zk的脏数据。

一般来说退出kafka和zk的正确姿势是怎样的?需要在run.sh脚本中每执行一个停止kafka命令,就判断kafka进程有木有?另外,就是zk的脏数据是如何产生的呢,应该怎么清理比较好~
非常感谢!

kafka有时候确实不生效(没解释原因),可以使用kill

kill掉kafka,例如:

> ps | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java... 
> kill -9 7564

在Windows上使用:

> wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
ProcessId
6016
> taskkill /pid 6016 /f

zk的停止没问题。

嗯嗯,所以比较可靠的方法是先停止kafka集群,等待集群中所有kafka实例退出后,再退出zk集群,这样就不会有上面我说的问题了吧?

zk里存储的都是临时数据,kafka关闭之后,就会释放的。
我是怀疑kafka还没有真正的关闭,导致的你的问题。

有这个可能,非常感谢~

你的答案

查看kafka相关的其他问题或提一个您自己的问题