好的,我先按照您的点评和建议优化下重新测试下,现在看来确实太复杂了,看的头痛,假装不是自己写的然后心里默念哪个傻逼写的。
然后我们数据量是近pb级别的,所以就考虑的异步
是的,callback你是单例,并且大家都会共同触发你的逻辑。
线程池疑点是对的,并发操作异常那块producer看到的消息缓存队列很可能是同一个,就用1个试试。
不知道你的并发量有多大。你可以producer.send(record).get();
同步发送(性能会打折扣)。
你不想丢消息这么设计太复杂了,考虑过度,导致你的程序过度复杂。
producer如果失效本身会自动重连,无需你自己替换。
同步发送失败信息可以直接捕获,我们捕获后直接打日志,然后人工处理(我们已经4年了,没有人工处理过,只要不是kill -9,普通的关闭是不会有问题的),另外一种异步callback将失败的加到失败队列里,从上层重新发送。
你可以参考这个例子,同步的,我们支付业务用的,运行5年没出过问题,没丢过消息。
https://www.orchome.com/1056
onFail是我的逻辑,重试的逻辑。不知道这样设计有没有问题?我们系统不能丢一条数据,所以才设计的这个重试逻辑。
还有个疑点是自制的线程池那种方式应该没问题的吧?
还有那个getSignal..()的设计感觉也没问题?
另外,onComplete方法应该是kafka生产者自有的重试规则的次数试完之后仍然失败才会在这个回调方法里表现发送失败对吧?而不是试一次就回调一次
callback的对象只实例化了一次,所有的回调通知共用这个对象,重点关注下onFail()
的逻辑。
共享对象的300ms,一条300ms,那大家都得排着。
不知道你上面那个onFail()是不是你的逻辑,有休眠,有处理等等,还有重置生产传递对象,线程的可见性也要注意用volidate
修饰。
@Override
public void add(int partition, String value, Callback callback) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, partition, null, value.getBytes(Charsets.UTF_8));
if(callback != null) {
producer.send(record, new org.apache.kafka.clients.producer.Callback() {
@Override
public void onCompletion(RecordMetadata arg0, Exception arg1) {
if(arg1 == null) {
callback.onSuccess(arg0.serializedKeySize() + arg0.serializedValueSize());
}else {
callback.onFail(new MsgSendFailException(arg1));
}
}
});
} else {
producer.send(record);
}
}