kafka集群本来正常,重启节点后,网络请求空闲率NetworkProcessorAvgIdlePercent
突然变的很低,低于1%,虽然节点正常,但是集群性能很低。
kafka版本:2.1.1-1.1.0
9 broker 40c 256g 物理机 40topic 每个40-80分区,3副本
主要用于日志收集,所有容器应用有个agent采集日志然后发送到kafka
broker配置如下:
broker.id=1
zookeeper.connect=
log.dirs=/data/data1/kafka_1.1.0_data_bak,/data/data2/kafka_1.1.0_data_bak,/data/data3/kafka_1.1.0_data_bak,/data/data4/kafka_1.1.0_data_bak,/data/data5/kafka_1.1.0_data_bak
message.max.bytes=52428800
num.network.threads=16
num.io.threads=32
listeners=PLAINTEXT://
socket.send.buffer.bytes=10485760
socket.receive.buffer.bytes=10485760
socket.request.max.bytes=104857600
num.partitions=40
log.segment.bytes=1073741824
log.retention.hours=120
log.retention.check.interval.ms=300000
log.cleaner.enable=true
log.flush.interval.messages=1000
log.flush.scheduler.interval.ms=1000
auto.create.topics.enable=false
default.replication.factor=3
replica.socket.receive.buffer.bytes=10485760
replica.fetch.max.bytes=52428800
num.replica.fetchers=4
zookeeper.session.timeout.ms=120000
zookeeper.connection.timeout.ms=60000
controlled.shutdown.enable=true
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000
auto.leader.rebalance.enable=true
num.recovery.threads.per.data.dir=1
delete.topic.enable=true
所有broker server.log日志一直报错:
Attempting to send response via channel for which there is no open connection, connection id xxxx-xxxx53695-854953 (kafka.network.Processor)
以下尝试均失败:
- 修改data过期时间为1小时,然后重启节点
- 重新搭建一个新集群,即删除zk和kafka所有日志,将自动创建topic改为手动,依次创建后,网络请求空闲率就越来越低
- 修改参数
num.network.threads
为40,请求空闲率虽然上升了,但是主机cpu打到98%,然后改为16
之前集群都是正常的,我猜是应用容器越来越多,采集的日志也越来越多,kafka支撑不住了,需要扩容吗?
也可能是应用agent连接kafka的连接超时时间太短吗,其中agent的连接kafka核心代码如下:
/** 重试等待时间,毫秒 */
private final String DEF_RETRY_BACKOFF_MS = "100";
/** 请求超时时间,毫秒 */
private final String DEF_REQUEST_TIMEOUT_MS = "600";
try{
reCreateProducer(properties);
}catch (Throwable t){
Logger.error("创建kafka生产者实例异常,进行重试创建...",t);
Thread redo = new Thread(new Runnable() {
int time = 1;
public void run() {
while (true){
if(time>5){
return ;
}
//构造实例
Logger.debug("重试创建kafka生产者实例{0}",time++);
try {
reCreateProducer(properties);
Thread.sleep(3000);
} catch (Throwable e) {
Logger.error("创建kafka生产者实例异常",e);
}
}
}
});
请论坛里的专家帮看下
另外主机参数配置了net.ipv4.tcp_tw_reuse = 1 net.ipv4.tcp_tw_recycle = 1,不知道和这有关没,之前没配置的时候有次主机句柄和连接数暴涨,主机都挂了了。
net.ipv4.tcp_tw_recycle = 1
你这个参数很面熟啊,容器化的话不能启用它。好面熟,我明天确认下是不是这个参数。
1、副本3个,日志2个副本就可以了吧。(相互备份,3倍消耗)
2、自动创建和手动创建topic应该没啥区别吧,就创建一次。
3、超时时间,是会影响,主要是如果老是超时,你就会要重试,一条消息如果重试几次,网络请求自然就更多了。
谢谢大佬回复,现在情况如下:
send response via channel for which there is no open connection,connection id broker ip:16667-应用ip:59454-1976888 (kafka.network.Processor)
里面的59454是应用连接kafka的端口吧,1976888
是啥,id吗4.使用
lsof -n|awk '{print $2}'|sort|uniq -c|sort -nr|grep
查看kafka进程打开的文件数竟然有300多万,但是主机配置ulimit -a
显示1024000
,主机监控报表显示句柄数正常,5万不到,执行命令的时候需要几分钟才能返回,是不是命令返回的是几分钟内打开的文件数总和?5.能否通过扩容节点解决目前的问题呢?
15秒-30秒,默认超时时间即可,集群越热,超时时间应越长,否则会导致雪崩。
是可以通过扩容解决目前的问题。
but,从你提供的数据分析(网络连接过多)我个人觉得你的客户端程序写的有问题,kakfa的生产者是长连接,一旦建立,就用该连接持续发送消息,你是不是发完一条消息就关了,重新建立?
应用的客户端运行了2年了,7月份出过一次故障把文件句柄耗光了,之前从未出现过这种问题。现在应用也没反应有啥问题,也能消费,但是总感觉是个不定时炸弹。
现在发现所有kafka进程有211个线程,32个线程cpu利用率都在99.1以上(和num.io.threads=32配置一致),其他线程cpu利用率低于0.5,通过jstack发现:
"kafka-request-handler-29" #111 daemon prio=5 os_prio=0 tid=0x00007f0d59653800 nid=0x7dd1 runnable [0x00007f0a46c4a000] java.lang.Thread.State: RUNNABLE at java.util.concurrent.locks.ReentrantReadWriteLock$Sync.tryReleaseShared(ReentrantReadWriteLock.java:440) at java.util.concurrent.locks.AbstractQueuedSynchronizer.releaseShared(AbstractQueuedSynchronizer.java:1341) at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.unlock(ReentrantReadWriteLock.java:881) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:252) at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256) at kafka.server.MetadataCache.kafka$server$MetadataCache$$getAliveEndpoint(MetadataCache.scala:107) at kafka.server.MetadataCache$$anonfun$kafka$server$MetadataCache$$getEndpoints$1.apply$mcVI$sp(MetadataCache.scala:57) at kafka.server.MetadataCache$$anonfun$kafka$server$MetadataCache$$getEndpoints$1.apply(MetadataCache.scala:56) at kafka.server.MetadataCache$$anonfun$kafka$server$MetadataCache$$getEndpoints$1.apply(MetadataCache.scala:56) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at kafka.server.MetadataCache.kafka$server$MetadataCache$$getEndpoints(MetadataCache.scala:56) at kafka.server.MetadataCache$$anonfun$kafka$server$MetadataCache$$getPartitionMetadata$1$$anonfun$apply$1.apply(MetadataCache.scala:84) at kafka.server.MetadataCache$$anonfun$kafka$server$MetadataCache$$getPartitionMetadata$1$$anonfun$apply$1.apply(MetadataCache.scala:69) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at kafka.server.MetadataCache$$anonfun$kafka$server$MetadataCache$$getPartitionMetadata$1.apply(MetadataCache.scala:69) at kafka.server.MetadataCache$$anonfun$kafka$server$MetadataCache$$getPartitionMetadata$1.apply(MetadataCache.scala:68) at scala.Option.map(Option.scala:146) at kafka.server.MetadataCache.kafka$server$MetadataCache$$getPartitionMetadata(MetadataCache.scala:68) at kafka.server.MetadataCache$$anonfun$getTopicMetadata$1$$anonfun$apply$8.apply(MetadataCache.scala:117) at kafka.server.MetadataCache$$anonfun$getTopicMetadata$1$$anonfun$apply$8.apply(MetadataCache.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at kafka.server.MetadataCache$$anonfun$getTopicMetadata$1.apply(MetadataCache.scala:116) at kafka.server.MetadataCache$$anonfun$getTopicMetadata$1.apply(MetadataCache.scala:116) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256) at kafka.server.MetadataCache.getTopicMetadata(MetadataCache.scala:115) at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:937) at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:1015) at kafka.server.KafkaApis.handle(KafkaApis.scala:107) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at java.lang.Thread.run(Thread.java:745)
这有哪些可能造成的呢。
kafka集群安装完成之后,就会进行一轮压测。压力固定的情况下,连接和openfile是相对持平的。
建议你也拿客户端进行一轮压测来观察,是否是客户端的问题。
您好,这个情况我也遇到了,请问您是如何解决的?
肯定是应用有问题。我啥也没做,发现应用容器重启下,网络空闲利用率就会上来,两周后集群回复到90%以上了,然后warn日志越来越少,最后就没有了。
你的答案