Kafka ReplicaFetcherThread fails to fetch
[2016-06-15 17:19:09,277] WARN [ReplicaFetcherThread-0-2], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@4e52c0e7 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 2 was disconnected before the response was read
at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
at scala.Option.foreach(Option.scala:257)
at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Below is the state-change.log:
[2016-06-15 17:20:05,176] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:2,0,LeaderEpoch:63,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:0,2) for partition __consumer_offsets-3 in response to UpdateMetadata request sent by controller 3 epoch 21 with correlation id 54 (state.change.logger)
These are the logs from about 4 minutes later; it recovered on its own:
[2016-06-15 17:19:56,843] INFO re-registering broker info in ZK for broker 0 (kafka.server.KafkaHealthcheck$SessionExpireListener)
[2016-06-15 17:19:56,844] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-06-15 17:19:56,865] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-06-15 17:19:56,866] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(192.168.80.98,9093,PLAINTEXT) (kafka.utils.ZkUtils)
[2016-06-15 17:19:56,866] INFO done re-registering broker (kafka.server.KafkaHealthcheck$SessionExpireListener)
[2016-06-15 17:19:56,866] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)
What is causing this?
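The first error means the connection was closed before the response was read. Its cause is most likely the issue described below.
The broker re-created /brokers/ids/0 in ZooKeeper, which means the broker's ZooKeeper session was lost, either because the Kafka node crashed or because its session expired (ZooKeeper removes an ephemeral znode once the owning application stops sending heartbeats).
So this looks like a network problem to me. Take a look at your firewall settings.
Some firewalls drop long-lived connections (this is easy to overlook and is usually an oversight by the ops team).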
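One rough way to check whether the fetch errors really line up with ZooKeeper session loss is to pull both kinds of events out of the broker log and compare their timestamps. The sketch below is only an illustration: the two search strings are copied from the log lines posted above, while the function names and the 300-second window are my own assumptions, not anything Kafka provides.

```python
import re
from datetime import datetime, timedelta

# Kafka log lines start with a timestamp such as [2016-06-15 17:19:09,277]
TS = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),(\d{3})\]")


def parse_ts(line):
    """Return the timestamp at the start of a log line, or None if absent."""
    m = TS.match(line)
    if not m:
        return None
    base = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S")
    return base + timedelta(milliseconds=int(m.group(2)))


def find_events(lines, needle):
    """Timestamps of all lines that contain the given substring."""
    events = []
    for line in lines:
        if needle in line:
            ts = parse_ts(line)
            if ts is not None:
                events.append(ts)
    return events


def correlate(lines, window_seconds=300):
    """Pair each fetch error with every broker re-registration that
    happened within window_seconds of it (window size is an assumption)."""
    fetch_errors = find_events(lines, "Error in fetch")
    reregistrations = find_events(lines, "re-registering broker info in ZK")
    window = timedelta(seconds=window_seconds)
    return [
        (err, rereg)
        for err in fetch_errors
        for rereg in reregistrations
        if abs(rereg - err) <= window
    ]


# Two lines taken from the logs in this thread.
sample = [
    "[2016-06-15 17:19:09,277] WARN [ReplicaFetcherThread-0-2], Error in fetch "
    "kafka.server.ReplicaFetcherThread$FetchRequest@4e52c0e7 (kafka.server.ReplicaFetcherThread)",
    "[2016-06-15 17:19:56,843] INFO re-registering broker info in ZK for broker 0 "
    "(kafka.server.KafkaHealthcheck$SessionExpireListener)",
]

for err, rereg in correlate(sample):
    gap = (rereg - err).total_seconds()
    print(f"fetch error at {err}, re-registration at {rereg}, {gap:.3f}s apart")
```

To run it against a real broker, replace `sample` with the lines of your server.log. If most fetch errors have a re-registration nearby, that is consistent with the session-loss explanation; if they do not, the disconnects probably have a different cause.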
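The firewall is turned off, and the error still occurs (though not very often).
I ran into this problem too. It recovers on its own after a short while, and I cannot find what causes it...
Did you ever pin down the cause?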