我使用 Java Producer API 向 Kafka V. 0.8 发送字符串消息。
如果消息大小约为 15 MB,我会收到一个 MessageSizeTooLargeException
异常。
我试图将 message.max.bytes
设置为 40 MB,但我仍然遇到异常。 小消息工作没有问题。
(异常出现在生产者中,我在这个应用程序中没有消费者。)
我怎么样才能摆脱这个异常?
我的生产者配置
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
错误日志:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
你需要调整三个(或四个)属性:
消费者端:
fetch.message.max.bytes
- 这将决定消费者可以获取的最大消息大小。Broker方面:
replica.fetch.max.byte
- 这将允许kafka之中的副本在集群内发送消息,并确保消息被正确复制。如果这个值太小,那么消息将永远不会被复制,因此,消费者将永远不会看到消息,因为消息永远不会被提交(完全复制)。Broker方面:
message.max.bytes
- 这是broker可以从生产者接收的最大消息大小。Broker方面 (每个topic):
max.message.bytes
- 这是broker允许附加到主题上的最大的消息大小。这个大小在压缩前是有效的。(默认是broker的message.max.byte
。)第2点比较难发现,因为你不会从Kafka那里得到任何异常、信息或警告,所以当你发送大的信息时,一定要考虑到这一点。
你的答案