生产者在send的时候,有非阻塞和阻塞两种。
现在有业务需要,使用阻塞模式。需要根据send之后的响应进行其他的业务处理,或者接收到send的异常之后,将一些数据存在定时任务,尝试再往下走。
业务过程:
1 业务逻辑 ---> 2 kafka的send(),并且调用get() ----> 3 业务逻辑(根据第2步的响应进行处理)。
在第二步中,有个疑问点:
a: 业务服务器 跟 kafka服务器因为网络原因不连通,那么在send的时候,要如何捕获超时,如何设置不多次重试?
我封装的代码如下
import com.oristartech.kafka.core.domain.Message;
import com.oristartech.kafka.core.domain.Reply;
/**
* 生产者接口
* @author
*
*/
public interface ProducerHandler {
/**
* 发送消息
* @param topic
* @param value
* @return
*/
public void send(Message message) throws Exception;
public void send(Message message,SendCallback callback) throws Exception;
/**
* 同步发送,等待响应
* @param message
*/
public Reply sendSync(Message message);
}
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.oristartech.kafka.core.domain.Message;
import com.oristartech.kafka.core.domain.Reply;
import com.oristartech.kafka.core.exception.ProducerException;
/**
* 生产者接口实现抽象类,引入jar的项目在使用消息的时候,继承该类
*
* @author
*
*/
public abstract class AbstractProducer implements ProducerHandler {
private static final Logger logger = LoggerFactory.getLogger(AbstractProducer.class);
static KafkaProducer<String, String> kafkaProducer = null;
static KafkaProducer<String, String> kafkaProducer_sync = null;
// ExecutorService fixedThreadPool = Executors.newFixedThreadPool(20);//线程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
static String key = "e24d66f5bbec";
Random random = new Random();
/**
* 同步
*/
protected static Properties props_sync = new Properties();
static {
Properties properties = new Properties();
// 使用ClassLoader加载properties配置文件生成对应的输入流
InputStream in = AbstractProducer.class.getClassLoader()
.getResourceAsStream("config/message-producer.properties");
// 使用properties对象加载输入流
try {
properties.load(in);
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error("找不到config/message-producer.properties文件");
e.printStackTrace();
}
String bootstrap_servers = properties.getProperty("bootstrap.servers");
if (StringUtils.isEmpty(bootstrap_servers)) {
logger.error("找不到bootstrap.servers属性");
}
String producer_acks = properties.getProperty("producer.acks");
if (StringUtils.isEmpty(producer_acks)) {
producer_acks = "all";
}
String producer_retries = properties.getProperty("producer.retries");
if (StringUtils.isEmpty(producer_retries)) {
//producer_retries = "0";
producer_retries = ""+3;
}
String producer_batch_size = properties.getProperty("producer.batch.size");
if (StringUtils.isEmpty(producer_batch_size)) {
producer_batch_size = "16384";
}
String producer_linger_ms = properties.getProperty("producer.linger.ms");
if (StringUtils.isEmpty(producer_linger_ms)) {
producer_linger_ms = "1";
}
String producer_buffer_memory = properties.getProperty("producer.buffer.memory");
if (StringUtils.isEmpty(producer_buffer_memory)) {
producer_buffer_memory = "33554432";
}
String producer_metadata_fetch_timeout_ms = properties.getProperty("metadata.fetch.timeout.ms");
if (StringUtils.isEmpty(producer_metadata_fetch_timeout_ms)) {
producer_metadata_fetch_timeout_ms = "" + (60 * 5 * 1000);
}
//非阻塞
Properties props = new Properties();
props.put("bootstrap.servers", bootstrap_servers);// 服务器ip:端口号,集群用逗号分隔
//尝试50秒时间,发送不出去就当失败
props.put("max.block.ms", 50000);// 该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时
props.put("acks", producer_acks); // “所有”设置将导致记录的完整提交阻塞,最慢的,但最持久的设置。
props.put("retries", Integer.valueOf(producer_retries)); // 如果请求失败,生产者也会自动重试,即使设置成0
props.put("batch.size", Integer.valueOf(producer_batch_size));
props.put("linger.ms", Integer.valueOf(producer_linger_ms)); // 默认立即发送,这里这是延时毫秒数
props.put("buffer.memory", Integer.valueOf(producer_buffer_memory)); // 生产者缓冲大小,当缓冲区耗尽后,额外的发送调用将被阻塞。时间超过max.block.ms将抛出TimeoutException
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CustomPartitioner.class.getCanonicalName());//自定义分区函数
props.put("metadata.fetch.timeout.ms", Integer.valueOf(producer_metadata_fetch_timeout_ms));// 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。
kafkaProducer = new KafkaProducer<String, String>(props);
//阻塞
props_sync.put("bootstrap.servers", bootstrap_servers);// 服务器ip:端口号,集群用逗号分隔
props_sync.put("max.block.ms", 5000);// 该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时
props_sync.put("request.timeout.ms", 15000);// 该配置控制客户端等待请求响应的最长时间。如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 这应该大于replica.lag.time.max.ms,以减少由于不必要的生产者重试引起的消息重复的可能性。
//props.put("replica.lag.time.max.ms", 18000);//
props_sync.put("acks", producer_acks); // “所有”设置将导致记录的完整提交阻塞,最慢的,但最持久的设置。
props_sync.put("retries", Integer.valueOf(producer_retries)); // 如果请求失败,生产者也会自动重试,即使设置成0
props_sync.put("batch.size", Integer.valueOf(producer_batch_size));
props_sync.put("linger.ms", Integer.valueOf(producer_linger_ms)); // 默认立即发送,这里这是延时毫秒数
props_sync.put("buffer.memory", Integer.valueOf(producer_buffer_memory)); // 生产者缓冲大小,当缓冲区耗尽后,额外的发送调用将被阻塞。时间超过max.block.ms将抛出TimeoutException
props_sync.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props_sync.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props_sync.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CustomPartitioner.class.getCanonicalName());//自定义分区函数
props_sync.put("metadata.fetch.timeout.ms", Integer.valueOf(producer_metadata_fetch_timeout_ms));// 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。
}
//非阻塞
@Override
public void send(Message message) throws Exception {
send(message, null);
}
@Override
public void send(Message message, final SendCallback callback) throws Exception {
if(null == message) {
logger.error("消息对象为空");
return;
}
String topic = message.getTopic();
if(StringUtils.isEmpty(topic)) {
throw new ProducerException("主题(topic)不允许为空");
}
String key = message.getKey();
if(StringUtils.isEmpty(key)) {
throw new ProducerException("消息key不允许为空");
}
String value = JSONObject.toJSONString(message);
if(kafkaProducer == null) {
logger.error("kafkaProducer 为null...........");
return;
}
ProducerRecord<String, String> record = new ProducerRecord<String,String>(topic,key,value);
long startTime = System.currentTimeMillis();
kafkaProducer.send(record);
long endTime = System.currentTimeMillis();
logger.info("响应时间[" + ((endTime - startTime) / 1000.0) + "秒]" );
}
//同步、阻塞
@Override
public Reply sendSync(Message message) {
kafkaProducer_sync = new KafkaProducer<String, String>(props_sync);
Reply reply = new Reply();
if(null == message) {
logger.error("要发送的消息对象为空...............");
reply.setStatus(0);
reply.setMsg("要发送的消息对象不允许为空。");
return reply;
}
String topic = message.getTopic();
if(StringUtils.isEmpty(topic)) {
logger.error("要发送的消息主题为空...............");
reply.setStatus(0);
reply.setMsg("要发送的消息主题不允许为空。");
return reply;
}
String key = message.getKey();
if(StringUtils.isEmpty(key)) {
logger.error("要发送的消息key为空...............");
reply.setStatus(0);
reply.setMsg("要发送的消息key不允许为空。");
return reply;
}
String value = JSONObject.toJSONString(message);
System.out.println(value);
if(kafkaProducer == null) {
logger.error("kafkaProducer 为null...........");
reply.setStatus(0);
reply.setMsg("生产者Producer为空");
return reply;
}
long startTime = System.currentTimeMillis();
//这里通过Future.get()方法,阻塞当前线程,等待Kafka服务端的ACK响应
try {
reply.setStatus(1);
ProducerRecord<String, String> record = new ProducerRecord<String,String>(topic, key,value);
Future<RecordMetadata> future = kafkaProducer_sync.send(record);
RecordMetadata recordMetadata = future.get();
if (recordMetadata == null) {
reply.setStatus(0);
reply.setMsg("发送失败:recordMetadata为空");
return reply;
}
} catch (Exception e) {
reply.setStatus(0);
if(e instanceof java.util.concurrent.ExecutionException) {
if(e.getCause() != null && e.getCause() instanceof org.apache.kafka.common.errors.TimeoutException) {
reply.setMsg("发送超时:"+e.getMessage());
kafkaProducer_sync.close();
}else {
reply.setMsg("发送失败");
}
}else {
reply.setMsg("发送失败");
}
e.printStackTrace();
return reply;
}finally {
//kafkaProducer_sync.close();
}
long endTime = System.currentTimeMillis();
reply.setMsg("发送成功! 响应时间["+ ((endTime - startTime) / 1000.0) + "秒]");
logger.info("sendSync 响应时间[" + ((endTime - startTime) / 1000.0) + "秒]");
return reply;
}
}
其中,在 sendSync 中 ,是否有必要在入口第一步的时候,每次都初始化一次:
kafkaProducer_sync = new KafkaProducer
其次是, kafkaProducer_sync.close(); 在什么时机下使用才合适?
1. 如何考虑kafka服务的异常情况
2. kafkaProducer_sync = new KafkaProducer<String, String>(props_sync); 该在什么地方初始化才合适?
1、阻塞的可以直接捕获异常,根据你业务可以针对性的进行处理,落库也好,都可以。我个人觉得日志打出来就好,因为kafka发送出现异常少之又少,运行5年了,没出现过。不然你为了异常要额外添加很多处理逻辑。
2、kafkaProducer_sync一次初始化,和配置一起初始化。
3、 kafkaProducer_sync.close(); 在系统停止的时候调用。
你的答案