如题, 在注重吞吐量的场景, 如何提高发送消息的速度?
我目前的做法是,打算将ack
设置为1, 设大以下配置参数的值linger.ms
、batch.size
、buffer.memory
、 max.in.flight.requests.per.connection
。
以及代码中处理逻辑如下:
// 多线程中每个线程都如此发送
List<Foo> fooList = getFooList();
for (List<Foo> partitionFooList : Lists.partition(fooList, 50)) {
List<Future<RecordMetadata> futureList = new ArrayList<>();
for (Foo foo : partitionFooList) {
ProducerRecord<String, Foo> record = new ProducerRecord<>(topic, foo);
Future<RecordMetadata> future = kafkaProducer.send(record);
futureList.add(future);
}
try {
for (Futrue<RecordMetadata> future : futureList) {
future.get();
}
} catch (InterruptedException | ExecutionException e) {
log.error("error while send message for foo", e);
}
}
这是我目前的做法, 请问你有什么更好的解决办法吗? 以及linger.ms
、batch.size
、buffer.memory
、 max.in.flight.requests.per.connection
增大到多少合适,有什么好的度量或者考察的方法吗?
你这些调整确实是针对吞吐进行提高的。
但是,从你的程序看下来,其实你是阻塞式的提交(
get()
),等待消息结果通知,性能会大大压缩,ack=1
在副本有多个的情况下,会提高结果的。你发现没有,你的程序和kafka集群其实只有一个连接,你所有的都共用这一个。
kafkaProducer.send
,由它来统一批次消息,缓存消息,然后发送给kafka的。
当然,一个通道并不意味着慢,如果你纯异步提交的话,已经可以达到非常高的吞吐了(接近带宽和kafka),如果你的机器和网络特别强劲,调整你提供的参数,加大批次数量,也可提升性能。
参考:
https://www.orchome.com/42
https://www.orchome.com/303#item-4
你的意思我总结下来主要是以下这一点:
1)取消阻塞? 我之所以使用
get()
是因为想提高消息发送可靠性,这一点你有什么更好的办法吗?可是如果不这么做的话,我发送失败自身都是无感知的。
https://www.orchome.com/303#item-4
这里有异步callback的方式。
我想请教一下,发送相同数目的消息, 异步回调的方式会比同步发送的方式快很多吗? 有没有相关的基准测试。
5-10倍的差距。
我现在测试环境采用异步回调的方式发送消息,多线程发送,每个线程需要花7~8 s才能发送1000个消息,这是不是算发送消息算慢的?
我单个消息一般在5KB~10KB消息之间, 起了8个线程去发送消息
慢,不过跟你的消息大小有关。
你测试的方式不对吧,你先用默认生产者发送,1个线程,写个轮询,测试一下。
起多线程我上面已经说过了,其实发送的时候,还是共用一个
kafkaProducer
。你要有对比。
你的意思是开多个生产者去发送吗?
调了一下午的参数,kafka都快不起来 T T。
没那么玄幻的,默认参数已经满足99%的场景了。
你先去测试异步和同步,加上你说的开多个生产者(但是我记得多个通道,效果其实不明显)。
最后在找一个场景去调优(你已经确定同步还是异步之后)。
增大了
batch.size
之后,快很多, 但是Failed to allocate memory within the configured max blocking time 60000 ms. 出现异常以下是我的生产者配置参数:
properties: linger.ms: 30 max.in.flight.requests: 10 buffer.memory: 536870912 batch.size: 1048576
我试过默认参数,发送消息还是很慢。
你的答案