1、kafka生产者等待5分钟发送消息,远程主机会关闭连接
2、kafka版本3.1.0,部署环境用的阿里云服务器
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
Properties props = new Properties();
props.put("bootstrap.servers", "ip地址");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-DD-mm dd:HH:ss");
long current = System.currentTimeMillis();
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 3; i++) {
producer.send(new ProducerRecord<String, String>("alg-task-results", "0416", sdf.format(new Date()))).get(30, TimeUnit.SECONDS);
logger.info("发送第" + i + "个消息,花费" + (System.currentTimeMillis() - current) + "ms");
current = System.currentTimeMillis();
}
for (int i = 0; i < 10; i++) {
logger.info("等待第" + i + "次睡眠30s");
Thread.sleep(30 * 1000L);
}
current = System.currentTimeMillis();
logger.info("开始第二次发送");
for (int i = 0; i < 10; i++) {
long start = 0;
if(i == 0){
start = System.currentTimeMillis();
}
// 如果时间设置10秒,会报错超时,设置30秒会有报错,远程主机关闭
producer.send(new ProducerRecord<String, String>("alg-task-results", "0416", sdf.format(new Date()))).get(10, TimeUnit.SECONDS);
logger.info("发送第" + i + "个消息,花费" + (System.currentTimeMillis() - current) + "ms");
current = System.currentTimeMillis();
long end = 0;
if(i == 0){
end = System.currentTimeMillis();
System.err.println(end-start);
}
}
producer.close();
}
4、如果等待5分钟,再次发送消息,报错如下:
14:24:15.372 kafkaTest [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=producer-1, correlationId=6) and timeout 30000 to node 0: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='alg-task-results')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
14:24:15.593 kafkaTest [main] INFO Main - 开始第二次发送
14:24:15.594 kafkaTest [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=7) and timeout 30000 to node 0: {acks=-1,timeout=30000,partitionSizes=[alg-task-results-2=92]}
Exception in thread "main" java.util.concurrent.TimeoutException: Timeout after waiting for 10000 ms.
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:76)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
at Main.main(Main.java:53)
5、如果等待5分钟,修改超时时间为30秒,报错如下:
java.io.IOException: 远程主机强迫关闭了一个现有的连接。
at java.base/sun.nio.ch.SocketDispatcher.read0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:245)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:358)
at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:95)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
at java.base/java.lang.Thread.run(Thread.java:834)
怀疑是阿里云的防火墙,主动断开的连接。
可以看看kafka的日志,看看断开的相关信息。
max.poll.interval.ms:单次poll()操作后可以执行的最长时间,或者poll()调用之间的最大延迟,如果在这个延迟时间之内未收到下一次poll()操作,将认为客户端已失败,从而触发Rebalance操作,默认时间为5分钟。
你的答案