您好!
我有一个问题想问一下,生产者在执行producer = new KafkaProducer<>(props);这一段代码的时候并不建立连接?在第一次执行send方法的时候才去建立连接呢?
而且不是说send方法是异步的吗?我有一次将kafka地址配置错误了,在执行send方法前一直没有报错,在第一次执行send方法的时候程序会阻塞在那里,一直到60s后,会报一个发送超时的错误。代码如下:
props.put("bootstrap.servers", getUrl());
// leader收到的来自所有follower复制成功的确认数量,acks=0时只要消息被立即发送到socket buffer中,就认为发送成功,retries(失败重试) 配置也会失效
props.put("acks", "0");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
LOG.info("IOT-KAFKA建立连接@" + getUrl());
发送代码:
producer.send(new ProducerRecord<>(TOPIC, reqData), this);
回调:
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
LOG.info("send kafka error " + exception.getMessage());
} else {
LOG.info("send to " + metadata.topic() + " successed ");
}
}
send只是正常的发送报错。不是在send才创建的连接,你可以把debug日志打开,把send代码注销掉。加上休眠,即可看到连接kafka的相关日志。
我也是遇到同样的问题
你报的啥错?
你的答案