java.net.SocketTimeoutException kafka写入消费失败

小白晒太阳 发表于: 2017-06-12   最后更新时间: 2017-06-12 13:11:29   5,252 游览

今天又出现这个问题了,看来跟advertiesed.listeners 无关。

消费端:

sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602964627]-[WARN] Fetching topic metadata with correlation id 2 for topics [Set(sms)] from broker [id:176,host:10.17.24.176,port:9092] failed
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
    at kafka.utils.Utils$.read(Utils.scala:380)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

生产者:

2017-06-12 10:46:52[qtp958382397-99:591474422]-[WARN] Failed to send producer request with correlation id 234660 to broker 176 with data for partitions [sms,3]
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
    at kafka.utils.Utils$.read(Utils.scala:380)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
    at scala.collection.Iterator$class.foreach(Iterator.scala:772)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at com.sohu.tv.ugc.kafka.util.service.impl.ProducerServiceImpl.sendMessage(ProducerServiceImpl.java:78)
    at com.sohu.sms.api.controller.SMSController.sendMessage(SMSController.java:151)
    at com.sohu.sms.api.controller.SMSController.send(SMSController.java:123)
    at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.web.method.support.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:219)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:132)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:104)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandleMethod(RequestMappingHandlerAdapter.java:745)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:686)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:80)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:925)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:856)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:936)
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:838)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:812)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:812)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1669)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:88)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:577)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:223)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:215)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
    at org.eclipse.jetty.server.Server.handle(Server.java:499)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:311)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
    at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
    at java.lang.Thread.run(Thread.java:744)

搞不明白是为啥,服务原本跑了一个星期都是OK的,但是今天突然就出问题了。
看kafka server.log打印
[2017-06-12 10:45:51,425] INFO Rolled new log segment for 'sms-3' in 2 ms. (kafka.log.Log),也没有错误信息

今天又出现这个问题了,看来跟advertiesed.listeners 无关。
消费端:

sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602964627]-[WARN] Fetching topic metadata with correlation id 2 for topics [Set(sms)] from broker [id:176,host:10.17.24.176,port:9092] failed
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
    at kafka.utils.Utils$.read(Utils.scala:380)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

生产者:

2017-06-12 10:46:52[qtp958382397-99:591474422]-[WARN] Failed to send producer request with correlation id 234660 to broker 176 with data for partitions [sms,3]
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
    at kafka.utils.Utils$.read(Utils.scala:380)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
    at scala.collection.Iterator$class.foreach(Iterator.scala:772)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at com.sohu.tv.ugc.kafka.util.service.impl.ProducerServiceImpl.sendMessage(ProducerServiceImpl.java:78)
    at com.sohu.sms.api.controller.SMSController.sendMessage(SMSController.java:151)
    at com.sohu.sms.api.controller.SMSController.send(SMSController.java:123)
    at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.web.method.support.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:219)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:132)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:104)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandleMethod(RequestMappingHandlerAdapter.java:745)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:686)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:80)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:925)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:856)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:936)
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:838)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:812)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:812)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1669)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:88)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:577)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:223)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:215)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
    at org.eclipse.jetty.server.Server.handle(Server.java:499)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:311)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
    at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
    at java.lang.Thread.run(Thread.java:744)

搞不明白是为啥,服务原本跑了一个星期都是OK的,但是今天突然就出问题了。
看kafka server.log打印
[2017-06-12 10:45:51,425] INFO Rolled new log segment for 'sms-3' in 2 ms. (kafka.log.Log),也没有错误信息

发表于 2017-06-12
添加评论

超时,是网络问题,可能是你集群中的某台有问题,ping hostname保证ok

ping都没有问题,是正常的

防火墙全关了么

服务原本跑了一个星期都是OK的

所以才有理由怀疑。
1、环境发生了变化
2、集群中的某台有问题,可以看下kafka日志。

kafka日志就打印了那一条log,没有错误信息。
我发现了一个可疑的点,我的kafka集群中有2个broker所在的机器存在大量的CLOSE_WAIT。

CLOSE_WAIT 385
ESTABLISHED 8

均来自机器A ,我在机器A上做了这样的事情,每分钟用java去执行bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker -zookeeper ${zk} -topic ${topic} -group ${group}kafka自带的脚本。

感觉像是请求太频繁,kafka broker 连接未释放导致再有新的请求就出问题了。

一分钟一次还好啦。你把你的配置发一下

什么配置,是指kafka 配置吗,在https://www.orchome.com/570有贴过

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

你配认证了?

加了DESCRIBE权限,给上面说的机器A。

你的答案

查看kafka相关的其他问题或提一个您自己的问题