kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错

l. 发表于: 2018-01-26   最后更新时间: 2018-01-26  
  •   21 订阅,178 游览

3个broker 都添加了如下配置:

message.max.bytes=4000000
replica.fetch.max.bytes=4194304
max.partition.fetch.bytes=4194304







发表于: 28天前   最后更新时间: 28天前   游览量:178
上一条: kafka 发送消息send报错,无法连接
下一条: 无法远程连接kafka写producer

评论…


  • 你发送的时候,在结束进程前,加个休眠试试。
    • 都设置了,刚刚还测出个神奇的问题,我把大小降到了800K左右,发了10条,不压缩只有2条左右能发过去,压缩能发过去四五条。我觉得还是有地方配置不对,就是不知道在哪
        • 这是生产者的配置
          # 可用性配置
          kafka.config.acks=all
          # 重试次数
          kafka.config.retries=3
          # 批次大小
          kafka.config.batchSize=4000000
          # 消息延迟发送的毫秒数
          kafka.config.lingerMs=1
          # 生产者等待发送到kafka的消息队列占用内容的大小
          kafka.config.bufferMemory=33554432
          # 实现Serializer接口的序列化类键
          kafka.config.keySerializerClass=org.apache.kafka.common.serialization.StringSerializer
          kafka.config.valueSerializerClass=org.apache.kafka.common.serialization.StringSerializer
            • public KafkaProducerThread(String topic, boolean isAsync, List<String> msgs) {
                Properties properties = new Properties();
                properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);// broker 集群地址
                // properties.put(ProducerConfig.CLIENT_ID_CONFIG, topic + "Producer");//
                // 自定义客户端id
                properties.put(ProducerConfig.ACKS_CONFIG, acks);
                properties.put(ProducerConfig.RETRIES_CONFIG, retries);
                properties.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
                properties.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
                properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
                properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);// key 序列化方式
                properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);// value 序列化方式

                this.producer = new KafkaProducer<String, String>(properties);
                this.topic = topic;
                this.isAsync = isAsync;
                this.msgs = msgs;
                log.info("初始化kafka连接...");
                log.info("即将向topic:" + topic);
                log.info("写入数据" + msgs.size() + "条");
                if (!"".equals(compressionTopics)) {
                 List<String> topics = Arrays.asList(compressionTopics.split(","));
                 if (topics.contains(topic)) {
                  properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
                 }
                }
               }

               @Override
               public void run() {
                try {
                 for (int i = 0; i < msgs.size(); i++) {
                  TimeUnit.MILLISECONDS.sleep(1);
                  String msg = msgs.get(i);
                  String key = i + "";
                  if (isAsync) {// 异步
                   // producer.send(new ProducerRecord<String, String>(this.topic, msg));
                   producer.send(new ProducerRecord<String, String>(this.topic, key, msg));
                  } else {// 同步
                   producer.send(new ProducerRecord<String, String>(this.topic, key, msg),
                     new MsgProducerCallback(System.currentTimeMillis(), key, msg));
                  }
                  TimeUnit.MILLISECONDS.sleep(1);
                 }
                } catch (Exception e) {
                 e.printStackTrace();
                }
                log.info("写入数据结束.");
               }
                • The message is 3376156 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.  终于报错了。
                  • 评论…
                    • in this conversation
                      提问