提问说明
1、根据流量监控,发现周六从中午12点开始,消费就开始有延时了,然后在周日凌晨快2点延时开始恢复。我查看kafka 的日志server.log,发现在这个时间段内日志在不断报错,报错如下:
[2020-06-06 12:59:59,754] WARN [SocketServer brokerId=2] Unexpected error from /10.93.222.159; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1818848801 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at kafka.network.Processor.poll(SocketServer.scala:890)
at kafka.network.Processor.run(SocketServer.scala:789)
at java.lang.Thread.run(Thread.java:748)
2、我的kakfa版本是2.3,用的是rsyslog写kafka,rsyslog是8.25版本,用的是librdkakfa是0.9.1版本。
3、我观察写入的流量没有问题,就是消费开始突然有延时,然后自己恢复了。而期间这么久,日志里就报这个错,在延时开始恢复的时候这个报错也没了,时间都对的上。这个报错我看是说写入日志量超过了kafka接收的单次最大量。
所以有两个问题:
- 报这个错是不是日志没法写入就丢了呢
- 这种大量报写入失败,会影响消费吗
kafka默认配置一次socket请求最多处理100MB数据,属性是
server.properteis
中的socket.request.max.bytes
控制。你的请求数量大小约1.7G
,远远大于kafka单次接收的能力了,所以发出warning(警告)。socket.request.max.bytes
batchsize
(不清楚rsyslog是不是对应这个)最后,生产者管生产,是不会影响消费者的(前提你的内存是足够的,因为消费者的消息会优先从内存里获取)。
大佬,请教您一个问题,执行这个命令时报了一个错误
这个错误可能是什么原因引起的呢
请到问题专区提问,这是别人的问题区额。
ps:你的消费者组不存在
我今天研究了下librdkafka,发现它的默认配置 batch.size=1000000,batch.num.messages=10000,按照这个默认配置算,根本达不到kakfa里socket.request.max.bytes配置的100M。我有多个rsyslog服务器在发送数据,难道这个将多个写入同一个topic的请求合并算了吗?
客户端会把topic相同的,分区也相同的合并,对kafka节点来说,是总的。
我没太懂你的意思,客户端合并是什么意思呢,我这里的rsyslog是分布在每一台服务器上,每个都是单独发给kafka,只不过topic是相同的
librdkafka
就是客户端,简单点客户端发送消息到kafka服务端,kafka单节点同一时刻接收的socket总和超过了kafka设置默认最大的100m,所以发出警告。(socket总和里,包含了客户端发送的,副本同步的等)
谢谢您,我大概明白了,还有几个小细节想请教下您。
1、这个socket总和里只有一个topic的数据还是包含多个topic呢。
2、还有这个socket总和是否有一个队列存放数据,kafka间隔一段时间接收一次数据呢。如果是的话,是否可以调小这个间隔时间来接收数据呢
1、包含多个
2、如果调大socket,注意jvm也要跟着增加,防止内存溢出。
kafka间隔一段时间接受消息?没有吧,只能是在客户端控制输入,调小batch的消息数量,或增加kafka节点,分解压力。
多谢,明白了
你的答案