常见的异步方式:
创建异步线程
每个新创建一个线程来执行异步任务,任务结束线程也终止。
线程的创建成本比较大,不建议使用。
使用Queue,producer/consumer方式
在内部创建一个Queue,worker线程直接将异步处理的任务放入queue,一个或多个异步线程从queue中消费并执行任务。
线程池
用线程池来替换每次创建线程,减少线程创建的成本,线程被复用,一次创建多处使用。
和使用Queue类似,也是通过BlockingQueue
实现,但策略上更复杂,向线程池提交Callable&Runnable任务,由线程池调度执行。
参考:java.util.concurrent.ThreadPoolExecutor#execute
spring @Async注解
通过注解来来简化了异步编程,只需要在需要异步的方法上使用@Async
注解即可。
其本质也是在线程池功能上扩展的,将异步执行方法封装为一个Callable
,然后提交给线程池。
org.springframework.aop.interceptor.AsyncExecutionInterceptor:
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
Callable<Object> task = new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
}
};
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
背景和场景
产生的背景
在项目中使用了@Async
来执行异步任务,但在线上运行时出现了一次OOM的故障。通过分析发现是,线程池队列设置的比较大,当时的JVM内存给的也比较少(2048M),异步任务方法参数中传了大量的数据,任务执行被后端数据库阻塞(后端数据库变慢),最后导致缓存了大量的数据被放到线程池队列。其实JVM内存配置合适,线程池队列数合适,并配置合适的RejectedExecutionHandler
策略。
产生这个组件,1) 旨在替换内存队列的异步方式 2) 用来方便扩展集成分布式MQ
异步隔离
除了上面背景和场景,开发这个组件的另一个初衷就是有效异步隔离和作为一个降级备份方案。
也是主要实现了文件队列方式的一个原因。
当我们使用分布式MQ时,难免分布式MQ宕机或者其他网络等原因导致不能生产消息,或者阻塞影响到本身的业务,出现这种情况时可以降级到本地文件队列。
本地文件队列的优点是速度快,只要文件系统不出问题可以认为不会被阻塞。缺点是本地文件队列生产的消息必须自己来消费,出现故障时消息消费会延迟,文件系统的损坏也会导致消息丢失。主要看使用的姿势,更看重哪一方面了。
基本架构设计思路
采用producer/consumer生产消费设计模式。
参考了@Async
思路,定义一个注解@AsyncExecutable
, 使用Spring拦截器拦截注解了@AsyncExecutable
的方法,可以使用AOP或者BeanPostProcessor来应用拦截器。
producer
拦截器拦截到@AsyncExecutable
方法后,将该方法所有的参数和方法信息作为Message,并序列化Message,序列化采用Kryo或者Json,将序列化后的信息放入队列。
class Message {
String beanName;
String klassName;
String methodName;
Class<?>[] argTypes;
Object[] args;
boolean hasTransactional = true;
}
consumer
有1个调度主线程和worker线程组成,主线程负责从队列中拉取消息,并分发到worker线程,worker线程采用线程池,使用了spring提供的TaskExecutor。
worker线程反序列化消息为Message对象,并根据Message中的方法信息在spring ApplicationContext中查找到spring 管理的bean,并通过反射来调用。
队列
队列抽象了一个BlockableQueue
, 通过BlockableQueue
具体实现来扩展,可以是内存,文件,或分布式MQ。
public interface BlockableQueue<T> {
String DefaultQueueName = "fileQueue";
/**
* push一个消息到队列
*
* @param t
* @return
*/
boolean offer(T t);
/**
* 从队列pop一个消息,如果队列中无可用消息,则阻塞
*
* @return
* @throws InterruptedException
*/
T take() throws InterruptedException;
/**
* 从队列pop一个消息,如果队列中无可用消息,则返回null
*
* @return
*/
T poll();
/**
* 队列中消息数量
*
* @return
*/
int size();
}
通用的默认实现:
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class DefaultBlockableQueue implements BlockableQueue<byte[]> {
final Lock lock = new ReentrantLock();
final Condition notEmpty = lock.newCondition();
private Queue<byte[]> queue = null;
public FileBlockableQueue(Queue<byte[]> queue) {
this.queue = queue;
}
@Override
public boolean offer(byte[] bytes) {
lock.lock();
try {
boolean v = queue.offer(bytes);
notEmpty.signal();
return v;
} finally {
lock.unlock();
}
}
@Override
public byte[] take() throws InterruptedException {
lock.lock();
try {
while (queue.size() == 0) {
notEmpty.await();
}
byte[] bytes = queue.poll();
return bytes;
} finally {
lock.unlock();
}
}
@Override
public byte[] poll() {
return queue.poll();
}
@Override
public int size() {
return queue.size();
}
}
本文实现了一个文件队列,采用去哪儿文件队列实现,这是一个fork:https://github.com/tietang/fqueue
对编程模型来说不用关心异步细节,只需要在需要异步的方法上注解@AsyncExecutable
即可。