业务系统连接不上kafka服务处理

√锋²º¹8ヾ/❤ 发表于: 2018-08-11   最后更新时间: 2018-08-12 12:49:36   2,140 游览

生产者在send的时候,有非阻塞和阻塞两种。

现在有业务需要,使用阻塞模式。需要根据send之后的响应进行其他的业务处理,或者接收到send的异常之后,将一些数据存在定时任务,尝试再往下走。

业务过程:

1 业务逻辑 ---> 2 kafka的send(),并且调用get() ----> 3 业务逻辑(根据第2步的响应进行处理)。

在第二步中,有个疑问点:

a: 业务服务器 跟 kafka服务器因为网络原因不连通,那么在send的时候,要如何捕获超时,如何设置不多次重试?

我封装的代码如下


import com.oristartech.kafka.core.domain.Message;
import com.oristartech.kafka.core.domain.Reply;

/**
 * 生产者接口
 * @author 
 *
 */
public interface ProducerHandler {

    /**
     * 发送消息
     * @param topic
     * @param value
     * @return
     */
    public void send(Message message) throws Exception;

    public void send(Message message,SendCallback callback) throws Exception;

    /**
     * 同步发送,等待响应
     * @param message
     */
    public Reply sendSync(Message message);
}


import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.oristartech.kafka.core.domain.Message;
import com.oristartech.kafka.core.domain.Reply;
import com.oristartech.kafka.core.exception.ProducerException;

/**
 * 生产者接口实现抽象类,引入jar的项目在使用消息的时候,继承该类
 * 
 * @author 
 *
 */
public abstract class AbstractProducer implements ProducerHandler {

    private static final Logger logger = LoggerFactory.getLogger(AbstractProducer.class);

    static KafkaProducer<String, String> kafkaProducer = null;
    static KafkaProducer<String, String> kafkaProducer_sync = null;

    // ExecutorService fixedThreadPool = Executors.newFixedThreadPool(20);//线程池
    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
    static String key = "e24d66f5bbec";
    Random random = new Random();

    /**
     * 同步
     */
    protected static Properties props_sync = new Properties();
    static {
        Properties properties = new Properties();
        // 使用ClassLoader加载properties配置文件生成对应的输入流
        InputStream in = AbstractProducer.class.getClassLoader()
                .getResourceAsStream("config/message-producer.properties");
        // 使用properties对象加载输入流
        try {
            properties.load(in);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            logger.error("找不到config/message-producer.properties文件");
            e.printStackTrace();
        }

        String bootstrap_servers = properties.getProperty("bootstrap.servers");
        if (StringUtils.isEmpty(bootstrap_servers)) {
            logger.error("找不到bootstrap.servers属性");
        }

        String producer_acks = properties.getProperty("producer.acks");
        if (StringUtils.isEmpty(producer_acks)) {
            producer_acks = "all";
        }
        String producer_retries = properties.getProperty("producer.retries");
        if (StringUtils.isEmpty(producer_retries)) {
            //producer_retries = "0";
            producer_retries = ""+3;
        }
        String producer_batch_size = properties.getProperty("producer.batch.size");
        if (StringUtils.isEmpty(producer_batch_size)) {
            producer_batch_size = "16384";
        }
        String producer_linger_ms = properties.getProperty("producer.linger.ms");
        if (StringUtils.isEmpty(producer_linger_ms)) {
            producer_linger_ms = "1";
        }
        String producer_buffer_memory = properties.getProperty("producer.buffer.memory");
        if (StringUtils.isEmpty(producer_buffer_memory)) {
            producer_buffer_memory = "33554432";
        }

        String producer_metadata_fetch_timeout_ms = properties.getProperty("metadata.fetch.timeout.ms");
        if (StringUtils.isEmpty(producer_metadata_fetch_timeout_ms)) {
            producer_metadata_fetch_timeout_ms = "" + (60 * 5 * 1000);
        }



        //非阻塞
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap_servers);// 服务器ip:端口号,集群用逗号分隔
        //尝试50秒时间,发送不出去就当失败
        props.put("max.block.ms", 50000);// 该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时
        props.put("acks", producer_acks); // “所有”设置将导致记录的完整提交阻塞,最慢的,但最持久的设置。
        props.put("retries", Integer.valueOf(producer_retries)); // 如果请求失败,生产者也会自动重试,即使设置成0
        props.put("batch.size", Integer.valueOf(producer_batch_size));
        props.put("linger.ms", Integer.valueOf(producer_linger_ms)); // 默认立即发送,这里这是延时毫秒数
        props.put("buffer.memory", Integer.valueOf(producer_buffer_memory)); // 生产者缓冲大小,当缓冲区耗尽后,额外的发送调用将被阻塞。时间超过max.block.ms将抛出TimeoutException
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CustomPartitioner.class.getCanonicalName());//自定义分区函数
        props.put("metadata.fetch.timeout.ms", Integer.valueOf(producer_metadata_fetch_timeout_ms));// 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。

        kafkaProducer = new KafkaProducer<String, String>(props);

        //阻塞
        props_sync.put("bootstrap.servers", bootstrap_servers);// 服务器ip:端口号,集群用逗号分隔
        props_sync.put("max.block.ms", 5000);// 该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时
        props_sync.put("request.timeout.ms", 15000);// 该配置控制客户端等待请求响应的最长时间。如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 这应该大于replica.lag.time.max.ms,以减少由于不必要的生产者重试引起的消息重复的可能性。
        //props.put("replica.lag.time.max.ms", 18000);//
        props_sync.put("acks", producer_acks); // “所有”设置将导致记录的完整提交阻塞,最慢的,但最持久的设置。
        props_sync.put("retries", Integer.valueOf(producer_retries)); // 如果请求失败,生产者也会自动重试,即使设置成0
        props_sync.put("batch.size", Integer.valueOf(producer_batch_size));
        props_sync.put("linger.ms", Integer.valueOf(producer_linger_ms)); // 默认立即发送,这里这是延时毫秒数
        props_sync.put("buffer.memory", Integer.valueOf(producer_buffer_memory)); // 生产者缓冲大小,当缓冲区耗尽后,额外的发送调用将被阻塞。时间超过max.block.ms将抛出TimeoutException
        props_sync.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props_sync.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props_sync.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CustomPartitioner.class.getCanonicalName());//自定义分区函数

        props_sync.put("metadata.fetch.timeout.ms", Integer.valueOf(producer_metadata_fetch_timeout_ms));// 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。


    }

    //非阻塞
    @Override
    public void send(Message message) throws Exception {
        send(message, null);
    }

    @Override
    public void send(Message message, final SendCallback callback) throws Exception {
         if(null == message) {
             logger.error("消息对象为空");
             return;
         }

         String topic = message.getTopic();
         if(StringUtils.isEmpty(topic)) {
             throw new ProducerException("主题(topic)不允许为空");
         }

         String key = message.getKey();
         if(StringUtils.isEmpty(key)) {
             throw new ProducerException("消息key不允许为空");
         }

         String value = JSONObject.toJSONString(message);


         if(kafkaProducer == null) {
             logger.error("kafkaProducer 为null...........");
             return;
         }

         ProducerRecord<String, String> record = new ProducerRecord<String,String>(topic,key,value);
         long startTime = System.currentTimeMillis();

         kafkaProducer.send(record);
         long endTime = System.currentTimeMillis();
         logger.info("响应时间[" + ((endTime - startTime) / 1000.0) + "秒]" );

    }

    //同步、阻塞
    @Override
    public Reply sendSync(Message message) {
         kafkaProducer_sync = new KafkaProducer<String, String>(props_sync);
         Reply reply = new Reply();
         if(null == message) {
             logger.error("要发送的消息对象为空...............");
             reply.setStatus(0);
             reply.setMsg("要发送的消息对象不允许为空。");
             return reply;
         }

         String topic = message.getTopic();
         if(StringUtils.isEmpty(topic)) {
             logger.error("要发送的消息主题为空...............");
             reply.setStatus(0);
             reply.setMsg("要发送的消息主题不允许为空。");
             return reply;
         }
         String key = message.getKey();
         if(StringUtils.isEmpty(key)) {
             logger.error("要发送的消息key为空...............");
             reply.setStatus(0);
             reply.setMsg("要发送的消息key不允许为空。");
             return reply;
         }

         String value = JSONObject.toJSONString(message);
         System.out.println(value);
         if(kafkaProducer == null) {
             logger.error("kafkaProducer 为null...........");
             reply.setStatus(0);
             reply.setMsg("生产者Producer为空");
             return reply;
         }
         long startTime = System.currentTimeMillis();
        //这里通过Future.get()方法,阻塞当前线程,等待Kafka服务端的ACK响应
         try {
             reply.setStatus(1);
             ProducerRecord<String, String> record = new ProducerRecord<String,String>(topic, key,value);
             Future<RecordMetadata> future = kafkaProducer_sync.send(record);
             RecordMetadata recordMetadata = future.get();
             if (recordMetadata == null) {
                 reply.setStatus(0);
                 reply.setMsg("发送失败:recordMetadata为空");
                 return reply;
             }
         } catch (Exception e) {
             reply.setStatus(0);
             if(e instanceof java.util.concurrent.ExecutionException) {
                 if(e.getCause() != null && e.getCause() instanceof org.apache.kafka.common.errors.TimeoutException) {
                     reply.setMsg("发送超时:"+e.getMessage());
                     kafkaProducer_sync.close();
                 }else {
                     reply.setMsg("发送失败");
                 }
             }else {
                 reply.setMsg("发送失败");
             }
             e.printStackTrace();
             return reply;
         }finally {
             //kafkaProducer_sync.close();
        }
         long endTime = System.currentTimeMillis();
         reply.setMsg("发送成功! 响应时间["+ ((endTime - startTime) / 1000.0) + "秒]");
          logger.info("sendSync 响应时间[" + ((endTime - startTime) / 1000.0) + "秒]");
          return reply;
    }

}

其中,在 sendSync 中 ,是否有必要在入口第一步的时候,每次都初始化一次:

kafkaProducer_sync = new KafkaProducer(props_sync);

其次是, kafkaProducer_sync.close(); 在什么时机下使用才合适?

发表于 2018-08-11
添加评论

1. 如何考虑kafka服务的异常情况
2. kafkaProducer_sync = new KafkaProducer<String, String>(props_sync);  该在什么地方初始化才合适?

1、阻塞的可以直接捕获异常,根据你业务可以针对性的进行处理,落库也好,都可以。我个人觉得日志打出来就好,因为kafka发送出现异常少之又少,运行5年了,没出现过。不然你为了异常要额外添加很多处理逻辑。
2、kafkaProducer_sync一次初始化,和配置一起初始化。
3、 kafkaProducer_sync.close();   在系统停止的时候调用。

你的答案

查看kafka相关的其他问题或提一个您自己的问题