我现在要使用Java向kafka发送1w条消息,创建一个producer应该使用一个单例的还是说每循环一次就创建一个新的producer对象呢?
for(Map<String, Object> url : allCrawlerUrls){
Producer<String, Object> instance = null;
try {
instance = rzxKafkaProducer.getInstance();
JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(url));
jsonObject.put("topic", topic);
jsonObject.put("merchantId", merchantId);
jsonObject.put("putDate", StringUtil.getStringTime());
jsonObject.put("ttId", taskId);
ProducerRecord<String, Object> record = new ProducerRecord<>(topic, jsonObject.toJSONString());
instance.send(record, new RzxProducerCallback((Integer) url.get("id")));
} finally {
instance.close();
}
}
我现在是使用这样的循环方式向kafka发送消息(每次会创建一个新的producer对象),
这样的话就回有一个问题,我使用netstat -an
查询TCP连接时有大量的Time_Wait连接,下面这样
TCP 172.30.128.69:59257 172.31.124.236:9092 TIME_WAIT
TCP 172.30.128.69:59298 172.31.124.236:9092 TIME_WAIT
TCP 172.30.128.69:59299 172.31.124.236:9092 TIME_WAIT
TCP 172.30.128.69:59302 172.31.124.236:9092 TIME_WAIT
TCP 172.30.128.69:59303 172.31.124.236:9092 TIME_WAIT
TCP 172.30.128.69:59306 172.31.124.236:9092 TIME_WAIT
TCP 172.30.128.69:59307 172.31.124.236:9092 TIME_WAIT
TCP 172.30.128.69:59310 172.31.124.236:9092 TIME_WAIT
这样有问题吗?
但是当我将producer改为单例对象时,在发送消息30秒后,回调函数会报错,报错信息为:30009 ms has passed since last append,我在网上查询说是配置文件的问题,这个我查看了配置文件也没有问题,进行了如下设置:
listeners=PLAINTEXT://172.31.124.236:9092
host.name=172.31.124.236
请问大神,应该使用哪一种的Producer创建方式呢?
1.不要每次new一个producer。
2.先在send后面加个休眠。
你好,今天我试着在send后休眠1毫秒,但是依旧会报超时的异常,是我休眠的时间太少了吗?
你循环里面写了close,是不是导致你单producer的问题在这。
在之前使用单例的producer时,没有进行close操作
如果还是有问题,那你拿官网的代码跑一把(ps:我不清楚每个人都做了哪些改动)。
另外,你挂个消费者命令看看消息的条数,丢失了多少条。
好的,谢谢大神。如果使用单例producer是不是就不用关心close操作了?程序停止的时候producer就会自动关闭。是这样吗?
嗯,注意要最后结束的时候掉下close(官方例子有,不要漏了)。
如果不想调close,就加一些休眠时间。(因为有些消息是还在缓存中,没来得及发送到kafka的时候,进程就结束了)
你的答案