kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错

l. 发表于: 2018-01-26   最后更新时间: 2018-01-26 18:19:50   4,597 游览

3个broker 都添加了如下配置:

message.max.bytes=4000000
replica.fetch.max.bytes=4194304
max.partition.fetch.bytes=4194304

发表于 2018-01-26
l.
添加评论

你发送的时候,在结束进程前,加个休眠试试。

l. -> 半兽人 6年前

我加了  不行3M就是不行  换成小的就可以

半兽人 -> l. 6年前

你配置的没有错,Kafka集群所有节点都设置了吗?

l. -> 半兽人 6年前

都设置了,刚刚还测出个神奇的问题,我把大小降到了800K左右,发了10条,不压缩只有2条左右能发过去,压缩能发过去四五条。我觉得还是有地方配置不对,就是不知道在哪

l. -> 半兽人 6年前

这是生产者的配置
# 可用性配置
kafka.config.acks=all
# 重试次数
kafka.config.retries=3
# 批次大小
kafka.config.batchSize=4000000
# 消息延迟发送的毫秒数
kafka.config.lingerMs=1
# 生产者等待发送到kafka的消息队列占用内容的大小
kafka.config.bufferMemory=33554432
# 实现Serializer接口的序列化类键
kafka.config.keySerializerClass=org.apache.kafka.common.serialization.StringSerializer
kafka.config.valueSerializerClass=org.apache.kafka.common.serialization.StringSerializer

半兽人 -> l. 6年前

你的客户端怎么写的

l. -> 半兽人 6年前
public KafkaProducerThread(String topic, boolean isAsync, List<String> msgs) {

  Properties properties = new Properties();
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);// broker 集群地址
  // properties.put(ProducerConfig.CLIENT_ID_CONFIG, topic + "Producer");//
  // 自定义客户端id
  properties.put(ProducerConfig.ACKS_CONFIG, acks);
  properties.put(ProducerConfig.RETRIES_CONFIG, retries);
  properties.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  properties.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
  properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);// key 序列化方式
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);// value 序列化方式

  this.producer = new KafkaProducer<String, String>(properties);
  this.topic = topic;
  this.isAsync = isAsync;
  this.msgs = msgs;
  log.info("初始化kafka连接...");
  log.info("即将向topic:" + topic);
  log.info("写入数据" + msgs.size() + "条");
  if (!"".equals(compressionTopics)) {
   List<String> topics = Arrays.asList(compressionTopics.split(","));
   if (topics.contains(topic)) {
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
   }
  }
 }

@Override
public void run() {
 try {
  for (int i = 0; i < msgs.size(); i++) {
   TimeUnit.MILLISECONDS.sleep(1);
   String msg = msgs.get(i);
   String key = i + "";
   if (isAsync) {// 异步
    // producer.send(new ProducerRecord<String, String>(this.topic, msg));
    producer.send(new ProducerRecord<String, String>(this.topic, key, msg));
   } else {// 同步
    producer.send(new ProducerRecord<String, String>(this.topic, key, msg),
      new MsgProducerCallback(System.currentTimeMillis(), key, msg));
   }
   TimeUnit.MILLISECONDS.sleep(1);
  }
 } catch (Exception e) {
  e.printStackTrace();
 }
 log.info("写入数据结束.");
}
半兽人 -> l. 6年前
producer.close();
log.info("写入数据结束.");

在结束前,加上这个。
l. -> 半兽人 6年前

谢谢大神,刚才那个条数不对的问题解决了,但是3M的还是入不进去 T.T

半兽人 -> l. 6年前

还不进。。。
这样,看看有没有异常。
producer.send(new ProducerRecord<String, String>(this.topic, key, msg)).get();

l. -> 半兽人 6年前

The message is 3376156 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.  终于报错了。

半兽人 -> l. 6年前

看到错误提示了吧,让你调整max.request.size

l. -> 半兽人 6年前

可以了  谢谢大神

你的答案

查看kafka相关的其他问题或提一个您自己的问题