All 3 brokers have the following settings added:
message.max.bytes=4000000
replica.fetch.max.bytes=4194304
max.partition.fetch.bytes=4194304
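For reference, a sketch of where each of these limits applies (the side annotations are my reading of a typical setup, not something stated in the thread):

```properties
# Broker side (server.properties): largest record batch the broker will accept
message.max.bytes=4000000
# Broker side: must be >= message.max.bytes so follower replicas can replicate large batches
replica.fetch.max.bytes=4194304
# Consumer side: max bytes fetched per partition; must also cover the largest message
max.partition.fetch.bytes=4194304
# Producer side (NOT covered by the three settings above): cap on a single request,
# defaults to 1048576 (1 MB) in the Java producer
# max.request.size=4194304
```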
When you send, try adding a sleep before the process exits.
I added it; still no good. 3 MB just won't go through, but smaller messages work.
Your settings look right. Did you apply them on every node in the Kafka cluster?
Yes, all of them. I just hit something strange, too: I dropped the message size to about 800 KB and sent 10 messages. Without compression only about 2 of them got through; with compression four or five made it. I still think something is misconfigured somewhere, I just don't know where.
This is the producer config:
# Availability
kafka.config.acks=all
# Number of retries
kafka.config.retries=3
# Batch size
kafka.config.batchSize=4000000
# Milliseconds to delay sending (linger)
kafka.config.lingerMs=1
# Memory the producer's send buffer may occupy while waiting to send to Kafka
kafka.config.bufferMemory=33554432
# Key/value serializer classes implementing the Serializer interface
kafka.config.keySerializerClass=org.apache.kafka.common.serialization.StringSerializer
kafka.config.valueSerializerClass=org.apache.kafka.common.serialization.StringSerializer
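Notably, this config list has no entry for the producer-side `max.request.size`, which defaults to 1,048,576 bytes (1 MB) in the Java producer — the error that surfaces later in the thread points at exactly this setting. A minimal sketch of the same properties with that cap raised (plain string keys are used so the snippet stands alone; the value 4194304 is an assumption chosen to match the broker limits above):

```java
import java.util.Properties;

public class ProducerProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put("acks", "all");
        props.put("retries", "3");
        props.put("batch.size", "4000000");
        props.put("linger.ms", "1");
        props.put("buffer.memory", "33554432");
        // Missing from the config above: the producer-side request cap.
        // It defaults to 1048576 (1 MB), so a ~3 MB record is rejected
        // client-side no matter what the broker's message.max.bytes allows.
        props.put("max.request.size", "4194304"); // assumed value, matching the broker settings
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("max.request.size"));
    }
}
```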
How is your client code written?
public KafkaProducerThread(String topic, boolean isAsync, List<String> msgs) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); // broker cluster addresses
    // properties.put(ProducerConfig.CLIENT_ID_CONFIG, topic + "Producer"); // custom client id
    properties.put(ProducerConfig.ACKS_CONFIG, acks);
    properties.put(ProducerConfig.RETRIES_CONFIG, retries);
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
    properties.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass); // key serializer
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass); // value serializer
    // Compression must be configured BEFORE the producer is constructed; in the
    // original code this block ran after new KafkaProducer(...) and had no effect.
    if (!"".equals(compressionTopics)) {
        List<String> topics = Arrays.asList(compressionTopics.split(","));
        if (topics.contains(topic)) {
            properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
        }
    }
    this.producer = new KafkaProducer<String, String>(properties);
    this.topic = topic;
    this.isAsync = isAsync;
    this.msgs = msgs;
    log.info("Initializing Kafka connection...");
    log.info("About to write to topic: " + topic);
    log.info("Writing " + msgs.size() + " messages");
}

@Override
public void run() {
    try {
        for (int i = 0; i < msgs.size(); i++) {
            TimeUnit.MILLISECONDS.sleep(1);
            String msg = msgs.get(i);
            String key = i + "";
            if (isAsync) { // async, fire-and-forget
                producer.send(new ProducerRecord<String, String>(this.topic, key, msg));
            } else { // with a callback
                producer.send(new ProducerRecord<String, String>(this.topic, key, msg),
                        new MsgProducerCallback(System.currentTimeMillis(), key, msg));
            }
            TimeUnit.MILLISECONDS.sleep(1);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    log.info("Finished writing data.");
}
Thanks! The message-count problem from before is fixed now, but the 3 MB messages still won't go in. T.T
Still not going in...
Try this and see whether there's an exception:
producer.send(new ProducerRecord<String, String>(this.topic, key, msg)).get();
The message is 3376156 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. — Finally, an error!
See the error message? It's telling you to increase max.request.size.
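To make the numbers concrete: the serialized record was 3,376,156 bytes, while the Java producer's `max.request.size` defaults to 1,048,576 bytes (1 MB). So the broker-side limit of 4,000,000 bytes was never the bottleneck; the producer rejected the record locally before sending. A quick check:

```java
public class MaxRequestSizeCheck {
    public static void main(String[] args) {
        int serializedSize = 3376156;         // size reported in the error message
        int defaultMaxRequestSize = 1048576;  // Java producer default: 1 MB
        int brokerMessageMaxBytes = 4000000;  // broker-side message.max.bytes set earlier

        // The broker would have accepted the record...
        System.out.println(serializedSize <= brokerMessageMaxBytes); // true
        // ...but the producer rejects it client-side, before it is ever sent.
        System.out.println(serializedSize > defaultMaxRequestSize);  // true
    }
}
```

Raising `max.request.size` on the producer (as the error suggests) resolves it, since the broker and replica limits were already large enough.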
That did it. Thanks!