最近自己写生产者代码,实例化一个kafkaproducer,多个线程调用去发送消息,但是其会报堆栈溢出的错误,请问这个是什么原因?
错误如下:
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at org.apache.kafka.common.header.internals.RecordHeaders.<init>(RecordHeaders.java:50)
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:80)
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:118)
at producer.KafkaProducerThread.<init>(KafkaProducerThread.java:37)
at producer.kafkaProducer417.sendMessage(kafkaProducer417.java:70)
at producer.kafkaProducer417.main(kafkaProducer417.java:81)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
代码如下:
public static void sendMessage(String key, String value){
try{
executor.submit(new KafkaProducerThread(producer,key,value,config));
}catch(Exception e){
logger.error(e.getMessage());
}
public static void main(String[] args){
kafkaProducer417 test = new kafkaProducer417();
test.createTopic();
for(int i=0;i<10000000;i++){
test.sendMessage("data1","hello world");
}
}
public class KafkaProducerThread implements Runnable {
private Producer<String, String> producer = null;
private ProducerRecord<String, String> record = null;
public KafkaProducerThread(Producer<String, String> producer, String key, String value, KafkaProducerConfig config){}
public void run(){
try {
this.producer.send(this.record,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
logger.error(e.getMessage());
} else {
logger.info("The offset of the record we just sent is: " + metadata.offset());
}
}
});
}catch(KafkaException e){
logger.error("e.getMessage());
}
}
}
先增加jvm。
应该就是jvm内存的问题,谢谢谢谢
再想请问一个问题 错误:Expiring 6713 record(s) for topic1-1: 40006 ms has passed since last append是因为生产的速度太快吗?有什么建议的解决的方案吗?
描述一下你现在的吞吐量和集群情况。
现在单条消息1KB的时候,落地速度在5Mb/s左右。配置为一个topic,一个partition,batch_size是16Kb,acks = 1,三个生产者线程发送,集群是2台服务器,每台是16G内存,2T硬盘,千兆网,redhat6.7系统;但当我消息达到2Kb时会发送失败,报上面的错误,请问是什么原因呢?还有生产速度为什么会这么慢呢?
因为你2个集群,所以你的topic分区要大于2,依次提升分区数来测试性能。kafka客户端是以所对应的分区进行缓存批次发送的。这块可以提高性能。
kafka启动脚本
kafka-server-start.sh
中指定了kafka启动时需要的最小内存,默认为1Gexport KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
调大就好了。
你的答案