关于前段时间提问的“kafka 生产消息报错 RecordTooLargeException”这个问题,最近找到了答案,之所以怎么更改都会报错,是因为kafka connector里除了produce source record之外,还会有三个topic:
- config.storage.topic
- offset.storage.topic
- status.storage.topic
之前的问题实际是offset.storage.topic
里消息过大,抛出的异常,所以producer.override.max.request.size
配置了,也没有用。
那么问题来了,如何更改这三个topic的producer config,看了源码,关于max.request.size
的配置,是没有显式配置的,那么用的就是producer的默认配置,所以改kafka broker端producer.properties就行了吗?有没有其他方式?
请问下大佬,生效是
producer.
或者consumer.
(原生配置生效吗)? 加override的作用是什么 ?我看了官网的解释,但不太明白,我的没有生效。https://docs.confluent.io/platform/current/connect/references/allconfigs.html#override-the-worker-configuration
已解决
只要没写错,配置是有效的。
感谢回复,问题已解决
kafka服务端核心的2个上限配置:
replica.socket.receive.buffer.bytes
message.max.bytes
配置来自:Kafka Broker配置
第一个没改过,不知道改什么值合适,第二个早就改了. 看报错的log,明确表示是
max.request.size
值设置的过小。java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 3175237 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$SetCallbackFuture.get(KafkaOffsetBackingStore.java:228) at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$SetCallbackFuture.get(KafkaOffsetBackingStore.java:161) at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:498) at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:113) at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:47) at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:86) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
看kafka connector的源码。
org.apache.kafka.connect.storage.KafkaOffsetBackingStore
,在new 一个新的producer时,没有配置 max.request.size,所以用的是默认值1048576,导致producer 大于1048576时,会报错. 现在就是不知道,如何修改这个值的默认值。这不又回到上个问题了。
你在
connect-distributed.properties
中配置:producer.max.request.size=15728640
应该是生效的呀。
配置了,但只对于task生产source record时生效,在connect.log里可以看到两种producer config的log,一种是刚起connect service时,是默认值,一种是task启动时,是想要的值。
是不是之前多了
override
,之前的一直没有生效。producer.override.max.request.size
哈哈哈,我两个都配了,头都炸了
那现在好了吗?
没有,下午就测了,不行了 根据code来看,得改producer 的那个配置的默认值才行了
不可思议,你把启动命令贴一下,直接设置
max.request.size=15728640
也试试。
connect-distributed.properties
里设置了connector.client.config.override.policy=All producer.max.request.size=104857600 producer.override.max.request.size=104857600
不是,主要是task的producer生效了
根据log来看,是可以了,不知道是哪个参数生效的,我得一个个排除 :)
producer.max.request.size=104857600 producer.override.max.request.size=104857600 max.request.size=104857600
坐等你最终的结果。
结论:
max.request.size=104857600
是connector内部topic的prodcuer的配置.producer.max.request.size=104857600
是connector source record 的prodcuer的配置.你的答案