kafka生产者发送消息是创建一个producer应该使用一个单例的还是说每循环一次就创建一个新的producer对象呢

野心永恒 发表于: 2019-07-16   最后更新时间: 2019-07-16 21:51:44   5,190 游览

我现在要使用Java向kafka发送1w条消息,创建一个producer应该使用一个单例的还是说每循环一次就创建一个新的producer对象呢?

for(Map<String, Object> url : allCrawlerUrls){
    Producer<String, Object> instance = null;
    try {
         instance = rzxKafkaProducer.getInstance();
        JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(url));
        jsonObject.put("topic", topic);
        jsonObject.put("merchantId", merchantId);
        jsonObject.put("putDate", StringUtil.getStringTime());
        jsonObject.put("ttId", taskId);
        ProducerRecord<String, Object> record = new ProducerRecord<>(topic, jsonObject.toJSONString());
        instance.send(record, new RzxProducerCallback((Integer) url.get("id")));
    } finally {
        instance.close();
    }
}

我现在是使用这样的循环方式向kafka发送消息(每次会创建一个新的producer对象),
这样的话就回有一个问题,我使用netstat -an查询TCP连接时有大量的Time_Wait连接,下面这样

  TCP    172.30.128.69:59257    172.31.124.236:9092    TIME_WAIT
  TCP    172.30.128.69:59298    172.31.124.236:9092    TIME_WAIT
  TCP    172.30.128.69:59299    172.31.124.236:9092    TIME_WAIT
  TCP    172.30.128.69:59302    172.31.124.236:9092    TIME_WAIT
  TCP    172.30.128.69:59303    172.31.124.236:9092    TIME_WAIT
  TCP    172.30.128.69:59306    172.31.124.236:9092    TIME_WAIT
  TCP    172.30.128.69:59307    172.31.124.236:9092    TIME_WAIT
  TCP    172.30.128.69:59310    172.31.124.236:9092    TIME_WAIT

这样有问题吗?

但是当我将producer改为单例对象时,在发送消息30秒后,回调函数会报错,报错信息为:30009 ms has passed since last append,我在网上查询说是配置文件的问题,这个我查看了配置文件也没有问题,进行了如下设置:

listeners=PLAINTEXT://172.31.124.236:9092
host.name=172.31.124.236

请问大神,应该使用哪一种的Producer创建方式呢?

发表于 2019-07-16
添加评论

1.不要每次new一个producer。
2.先在send后面加个休眠。

野心永恒 -> 半兽人 5年前

你好,今天我试着在send后休眠1毫秒,但是依旧会报超时的异常,是我休眠的时间太少了吗?

半兽人 -> 野心永恒 5年前

你循环里面写了close,是不是导致你单producer的问题在这。

野心永恒 -> 半兽人 5年前

在之前使用单例的producer时,没有进行close操作

半兽人 -> 野心永恒 5年前

如果还是有问题,那你拿官网的代码跑一把(ps:我不清楚每个人都做了哪些改动)。
另外,你挂个消费者命令看看消息的条数,丢失了多少条。

野心永恒 -> 半兽人 5年前

好的,谢谢大神。如果使用单例producer是不是就不用关心close操作了?程序停止的时候producer就会自动关闭。是这样吗?

半兽人 -> 野心永恒 5年前

嗯,注意要最后结束的时候掉下close(官方例子有,不要漏了)。
如果不想调close,就加一些休眠时间。(因为有些消息是还在缓存中,没来得及发送到kafka的时候,进程就结束了)

你的答案

查看kafka相关的其他问题或提一个您自己的问题