TimeoutException when producing data to Kafka

sunny, posted 2016-06-17, last updated 2021-08-23 23:11:09, 49,150 views

While the producer thread is running, some records are produced successfully and others fail with the exceptions below:

record size: 11371
records produced: 11371
record size: 11392

org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for testTopic9-1

16/06/17 11:02:23 ERROR internals.RecordBatch: Error executing user-provided callback on message for topic-partition testTopic9-1:

produced record at offset: 16
records produced: 11392
record size: 11450
records produced: 11450
record size: 11434
records produced: 11434
record size: 11410

org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for testTopic9-0

16/06/17 11:02:24 ERROR internals.RecordBatch: Error executing user-provided callback on message for topic-partition testTopic9-0:

records produced: 11410
record size: 11397
records produced: 11397

org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for testTopic9-1

16/06/17 11:02:25 ERROR internals.RecordBatch: Error executing user-provided callback on message for topic-partition testTopic9-1:
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for testTopic9-0

16/06/17 11:02:25 ERROR internals.RecordBatch: Error executing user-provided callback on message for topic-partition testTopic9-0:

The main producer configuration parameters are as follows:

#The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.
#This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.
#The buffer.memory controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms after which it throws a TimeoutException.
#Total memory (in bytes) available to the producer for buffering; set here to ~1G (default: 33554432).
buffer.memory=1000000000


# Maximum size of a request, in bytes; set here to ~100M (default: 1048576). This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size, which may differ from this. This setting limits the number of record batches the producer will send in a single request, to avoid sending huge requests.
# When the batch send size is set to 3M, it grows to 17M after business wrapping and serialization.
max.request.size=100000000


#Size of the TCP send buffer (SO_SNDBUF) used when sending data; set here to ~100M (default: 131072).
send.buffer.bytes=100000000

#Maximum blocking time in milliseconds (sends block when buffer.memory is exhausted); default: 60000. Controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block, whether because the buffer is full or because metadata is unavailable. Blocking in user-supplied serializers or the partitioner does not count against this timeout.
max.block.ms=60000

#The producer config block.on.buffer.full has been deprecated and will be removed in a future release. Currently its default value has been changed to false. The KafkaProducer will no longer throw BufferExhaustedException but instead will use the max.block.ms value to block, after which it will throw a TimeoutException. If the block.on.buffer.full property is set to true explicitly, it will set max.block.ms to Long.MAX_VALUE, and metadata.fetch.timeout.ms will not be honoured.
metadata.fetch.timeout.ms=60000

#Compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, snappy, or lz4. Compression is applied to full batches of data, so the efficacy of batching also affects the compression ratio (more batching means better compression).
compression.type=none

#Number of retries after a failed send; default: 0. Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries can change the ordering of records: if two records are sent to the same partition and the first fails and is retried while the second succeeds, the second record may appear first.
retries=2

#  Per-partition batch buffer size, in bytes (not a record count); a background thread flushes the batches to the Kafka cluster (default: 16384).
#The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
#No attempt will be made to batch records larger than this size.
#Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.
#A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.
batch.size=10
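
For context, here is a minimal sketch of how a properties file like the one above might be wired into a producer. The file name, bootstrap address, and String serializers below are illustrative assumptions, not taken from the original post:

import java.io.FileInputStream;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.load(new FileInputStream("producer.properties")); // the settings listed above
        // Required settings not shown in the snippet above -- placeholders:
        props.put("bootstrap.servers", "broker-host:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("testTopic9", "key", "value"));
        }
    }
}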

Update: the consumer side reports a heap-space error. JVM options: -Xmx3550m -Xms3550m -Xmn2g -Xss128k

INFO consumer.SimpleConsumer: Reconnect due to error:
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
    at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:108)
    at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:29)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
16/06/17 11:23:24 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1466133684358] Added fetcher for partitions ArrayBuffer([[testTopic9,1], initOffset 177 to broker BrokerEndPoint(3,WINDOWS-87VRQR5,9093)] )
16/06/17 11:23:26 WARN consumer.ConsumerFetcherThread: [ConsumerFetcherThread-CSVIndexToSolrConsumer_WINDOWS-87VRQR5-1466133684211-dd910f5a-0-3], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@4b572e1c
java.lang.OutOfMemoryError: Java heap space
無名 -> sunny, 8 years ago

There is a lot of error output here, but very little of it is useful.

I'll assume you have multiple producer machines, some of which report timeouts while others work, and that all of them share the same network environment.

Judging from the logs, java.nio.channels.ClosedChannelException is the telling error: this is clearly a network problem.

Troubleshooting (see the connectivity sketch below):
1. Check that every producer machine can reach the cluster addresses.
2. Check the cluster configuration for mistakes.
3. Check the firewall; some firewalls allow short-lived connections but kill long-lived ones.
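
For step 1, a minimal TCP reachability sketch to run from each producer machine; broker-host and port 9092 are placeholders for your actual broker address:

import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPing {
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket()) {
            // Placeholder address -- replace with the broker's advertised host:port.
            socket.connect(new InetSocketAddress("broker-host", 9092), 5000);
            System.out.println("TCP connection to broker OK");
        }
    }
}

Note that a successful connect only proves basic reachability; it will not expose a firewall that kills long-lived connections (step 3).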

sunny -> 無名, 8 years ago

The consumer side is fine now after increasing its heap space. The producer runs only a single thread, sending data to one topic (two partitions, replication factor 3) on a 3-broker cluster, about 3M of data per send. I've read the official configuration docs but am not sure whether the advertised.listeners / listeners parameters are involved. My config:

listeners=PLAINTEXT://127.0.0.1:9092  
host.name=127.0.0.1
無名 -> sunny, 8 years ago

That configuration is wrong; I'm surprised it works at all. Are your producer and the broker on the same machine?
advertised.listeners binds the externally advertised (outward-facing) address.
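
For reference, a sketch of a listener setup for a broker that remote clients must reach; 192.168.1.10 is a placeholder for the broker's actual, externally reachable IP:

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.1.10:9092

With listeners bound to 127.0.0.1 as above, only clients on the broker's own machine can connect, which matches a remote producer timing out while requesting metadata.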

sunny -> 無名, 8 years ago

The producer is on its own host; the consumer and the broker cluster share one physical machine. I've now narrowed it down further: while producing, the metadata fetch comes back empty, which triggers the exceptions below:

org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for testTopic9-1
16/06/17 17:43:33 ERROR internals.RecordBatch: Error executing user-provided callback on message for topic-partition testTopic9-1:
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for testTopic9-0
16/06/17 17:43:33 ERROR internals.RecordBatch: Error executing user-provided callback on message for topic-partition testTopic9-0:

Part of the code:

kafkaProducer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            // On failure, metadata is null -- do not call metadata.offset() here.
            e.printStackTrace();
            // record the failure in the error log
            return;
        }
        System.out.println("produced record at offset: " + metadata.offset());
    }
});
sunny -> 無名, 8 years ago

What would the correct configuration look like?

無名 -> sunny, 8 years ago

That means your producer simply cannot reach the brokers. Have a look at this article: https://www.orchome.com/141

sunny -> 無名, 8 years ago

One more detail: the error message includes LEADER_NOT_AVAILABLE.
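
A quick way to check leader availability from the client side is the producer's own partitionsFor() call. A sketch, reusing the kafkaProducer instance and topic name from the earlier snippet (the call blocks for up to max.block.ms):

// Prints the current leader of each partition; a missing leader matches LEADER_NOT_AVAILABLE.
for (org.apache.kafka.common.PartitionInfo p : kafkaProducer.partitionsFor("testTopic9")) {
    System.out.println("partition " + p.partition() + " leader: " + p.leader());
}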

無名 -> sunny, 8 years ago

First work through the article I linked and make sure the configuration between the brokers and the application machine is correct. Verify every single step!

sunny -> 無名, 8 years ago

OK, thanks. It was indeed the producer failing to connect; it produces for a while and then stops.
