OrcHome OrcHome
OrcHome个人中心.
l. 修改资料 更换头像
关注(0) 粉丝(0) 积分(0)
关注

暂无关注.....

粉丝

暂无粉丝.....


l. kafka stream 有没有办法将一条消息拆成多条 发表评论:
蠢了,分词不就这样的么,请大佬忽视这个。
15天前
发表了 kafka stream 有没有办法将一条消息拆成多条

15天前
发表了 请教下kafka Stream的聚合怎么用(kafka0.10.2.0)?

17天前
l. kafka-stream运行报错 发表评论:
我用的kafka版本是0.10.2.0
20天前
l. kafka-stream运行报错 发表评论:
求教大神,这个怎么解决啊
20天前
发表了 kafka-stream运行报错

20天前
l. 回复 半兽人 kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 中:
可以了  谢谢大神
21天前
半兽人 回复 l. kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 中:
看到错误提示了吧,让你调整max.request.size
24天前
l. 回复 半兽人 kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 中:
The message is 3376156 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.  终于报错了。
24天前
半兽人 回复 l. kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 中:
还不进。。。
这样,看看有没有异常。
producer.send(new ProducerRecord<String, String>(this.topic, key, msg)).get();
24天前
l. 回复 半兽人 kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 中:
谢谢大神,刚才那个条数不对的问题解决了,但是3M的还是入不进去 T.T
24天前
半兽人 回复 l. kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 中:
producer.close();
log.info("写入数据结束.");

在结束前,加上这个。

24天前
l. 回复 半兽人 kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 中:
public KafkaProducerThread(String topic, boolean isAsync, List<String> msgs) {
  Properties properties = new Properties();
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);// broker 集群地址
  // properties.put(ProducerConfig.CLIENT_ID_CONFIG, topic + "Producer");//
  // 自定义客户端id
  properties.put(ProducerConfig.ACKS_CONFIG, acks);
  properties.put(ProducerConfig.RETRIES_CONFIG, retries);
  properties.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  properties.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
  properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);// key 序列化方式
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);// value 序列化方式

  this.producer = new KafkaProducer<String, String>(properties);
  this.topic = topic;
  this.isAsync = isAsync;
  this.msgs = msgs;
  log.info("初始化kafka连接...");
  log.info("即将向topic:" + topic);
  log.info("写入数据" + msgs.size() + "条");
  if (!"".equals(compressionTopics)) {
   List<String> topics = Arrays.asList(compressionTopics.split(","));
   if (topics.contains(topic)) {
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
   }
  }
 }

 @Override
 public void run() {
  try {
   for (int i = 0; i < msgs.size(); i++) {
    TimeUnit.MILLISECONDS.sleep(1);
    String msg = msgs.get(i);
    String key = i + "";
    if (isAsync) {// 异步
     // producer.send(new ProducerRecord<String, String>(this.topic, msg));
     producer.send(new ProducerRecord<String, String>(this.topic, key, msg));
    } else {// 同步
     producer.send(new ProducerRecord<String, String>(this.topic, key, msg),
       new MsgProducerCallback(System.currentTimeMillis(), key, msg));
    }
    TimeUnit.MILLISECONDS.sleep(1);
   }
  } catch (Exception e) {
   e.printStackTrace();
  }
  log.info("写入数据结束.");
 }

25天前
半兽人 回复 l. kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 中:
你的客户端怎么写的
25天前
l. 回复 半兽人 kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 中:
这是生产者的配置
# 可用性配置
kafka.config.acks=all
# 重试次数
kafka.config.retries=3
# 批次大小
kafka.config.batchSize=4000000
# 消息延迟发送的毫秒数
kafka.config.lingerMs=1
# 生产者等待发送到kafka的消息队列占用内容的大小
kafka.config.bufferMemory=33554432
# 实现Serializer接口的序列化类键
kafka.config.keySerializerClass=org.apache.kafka.common.serialization.StringSerializer
kafka.config.valueSerializerClass=org.apache.kafka.common.serialization.StringSerializer

25天前
l. 回复 半兽人 kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 中:
都设置了,刚刚还测出个神奇的问题,我把大小降到了800K左右,发了10条,不压缩只有2条左右能发过去,压缩能发过去四五条。我觉得还是有地方配置不对,就是不知道在哪
25天前
半兽人 回复 l. kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 中:
你配置的没有错,Kafka集群所有节点都设置了吗?
25天前
l. 回复 半兽人 kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 中:
我加了  不行3M就是不行  换成小的就可以
25天前
发表了 kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错

25天前
l. 回复 半兽人 kafka能支持多大的producer并发发送消息,跟那些参数有关? 中:
大神  能加你QQ么   想请教你一些问题
1月前
半兽人 回复 l. kafka能支持多大的producer并发发送消息,跟那些参数有关? 中:
真实场景不就是这样么。跟服务器设置的openfile,kakfa的内存数等有关。
1月前
l. 回复 半兽人 kafka能支持多大的producer并发发送消息,跟那些参数有关? 中:
我是单机上启的1万个线程。
如果每个连接都是长连接,服务端这边应该会受到socket端口个数限制吧
1月前
半兽人 回复 l. kafka能支持多大的producer并发发送消息,跟那些参数有关? 中:
你的测试很可能不贴合实际的环境,1万个线程是共用一个物理连接,还是1万个物理连接?
每个服务器的吞吐量多少,你建了多少个分区?

1月前
l. 回复 半兽人 kafka能支持多大的producer并发发送消息,跟那些参数有关? 中:
我尝试过将num.network.threads、socket.receive.buffer.bytes、socket.request.max.bytes
、socket.send.buffer.bytes这些参数调大,但都没什么用  

1月前
发表了 kafka能支持多大的producer并发发送消息,跟那些参数有关?

1月前

l. kafka stream 有没有办法将一条消息拆成多条 发表评论:
蠢了,分词不就这样的么,请大佬忽视这个。
15天前
l. kafka-stream运行报错 发表评论:
我用的kafka版本是0.10.2.0
20天前
l. kafka-stream运行报错 发表评论:
求教大神,这个怎么解决啊
20天前
l. 回复 半兽人 kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 评论:
可以了  谢谢大神
21天前
半兽人 回复 l. kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 评论:
看到错误提示了吧,让你调整max.request.size
24天前
l. 回复 半兽人 kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 评论:
The message is 3376156 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.  终于报错了。
24天前
半兽人 回复 l. kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 评论:
还不进。。。
这样,看看有没有异常。
producer.send(new ProducerRecord<String, String>(this.topic, key, msg)).get();
24天前
l. 回复 半兽人 kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 评论:
谢谢大神,刚才那个条数不对的问题解决了,但是3M的还是入不进去 T.T
24天前
半兽人 回复 l. kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 评论:
producer.close();
log.info("写入数据结束.");

在结束前,加上这个。

24天前
l. 回复 半兽人 kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 评论:
public KafkaProducerThread(String topic, boolean isAsync, List<String> msgs) {
  Properties properties = new Properties();
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);// broker 集群地址
  // properties.put(ProducerConfig.CLIENT_ID_CONFIG, topic + "Producer");//
  // 自定义客户端id
  properties.put(ProducerConfig.ACKS_CONFIG, acks);
  properties.put(ProducerConfig.RETRIES_CONFIG, retries);
  properties.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  properties.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
  properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);// key 序列化方式
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);// value 序列化方式

  this.producer = new KafkaProducer<String, String>(properties);
  this.topic = topic;
  this.isAsync = isAsync;
  this.msgs = msgs;
  log.info("初始化kafka连接...");
  log.info("即将向topic:" + topic);
  log.info("写入数据" + msgs.size() + "条");
  if (!"".equals(compressionTopics)) {
   List<String> topics = Arrays.asList(compressionTopics.split(","));
   if (topics.contains(topic)) {
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
   }
  }
 }

 @Override
 public void run() {
  try {
   for (int i = 0; i < msgs.size(); i++) {
    TimeUnit.MILLISECONDS.sleep(1);
    String msg = msgs.get(i);
    String key = i + "";
    if (isAsync) {// 异步
     // producer.send(new ProducerRecord<String, String>(this.topic, msg));
     producer.send(new ProducerRecord<String, String>(this.topic, key, msg));
    } else {// 同步
     producer.send(new ProducerRecord<String, String>(this.topic, key, msg),
       new MsgProducerCallback(System.currentTimeMillis(), key, msg));
    }
    TimeUnit.MILLISECONDS.sleep(1);
   }
  } catch (Exception e) {
   e.printStackTrace();
  }
  log.info("写入数据结束.");
 }

25天前
半兽人 回复 l. kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 评论:
你的客户端怎么写的
25天前
l. 回复 半兽人 kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 评论:
这是生产者的配置
# 可用性配置
kafka.config.acks=all
# 重试次数
kafka.config.retries=3
# 批次大小
kafka.config.batchSize=4000000
# 消息延迟发送的毫秒数
kafka.config.lingerMs=1
# 生产者等待发送到kafka的消息队列占用内容的大小
kafka.config.bufferMemory=33554432
# 实现Serializer接口的序列化类键
kafka.config.keySerializerClass=org.apache.kafka.common.serialization.StringSerializer
kafka.config.valueSerializerClass=org.apache.kafka.common.serialization.StringSerializer

25天前
l. 回复 半兽人 kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 评论:
都设置了,刚刚还测出个神奇的问题,我把大小降到了800K左右,发了10条,不压缩只有2条左右能发过去,压缩能发过去四五条。我觉得还是有地方配置不对,就是不知道在哪
25天前
半兽人 回复 l. kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 评论:
你配置的没有错,Kafka集群所有节点都设置了吗?
25天前
l. 回复 半兽人 kafka单条消息3.22M,按照网上的解答修改了broker的配置,但还是发不出去,也不报错 评论:
我加了  不行3M就是不行  换成小的就可以
25天前
l. 回复 半兽人 kafka能支持多大的producer并发发送消息,跟那些参数有关? 评论:
大神  能加你QQ么   想请教你一些问题
1月前
半兽人 回复 l. kafka能支持多大的producer并发发送消息,跟那些参数有关? 评论:
真实场景不就是这样么。跟服务器设置的openfile,kakfa的内存数等有关。
1月前
l. 回复 半兽人 kafka能支持多大的producer并发发送消息,跟那些参数有关? 评论:
我是单机上启的1万个线程。
如果每个连接都是长连接,服务端这边应该会受到socket端口个数限制吧
1月前
半兽人 回复 l. kafka能支持多大的producer并发发送消息,跟那些参数有关? 评论:
你的测试很可能不贴合实际的环境,1万个线程是共用一个物理连接,还是1万个物理连接?
每个服务器的吞吐量多少,你建了多少个分区?

1月前
l. 回复 半兽人 kafka能支持多大的producer并发发送消息,跟那些参数有关? 评论:
我尝试过将num.network.threads、socket.receive.buffer.bytes、socket.request.max.bytes
、socket.send.buffer.bytes这些参数调大,但都没什么用  

1月前