现象描述
突然客户群里反馈,线上某功能处理出现严重拥堵。再处理不好就要切换渠道。这个功能就是一个通知功能,客户依赖通知结果去完成他的业务逻辑。但是这个通知非常缓慢,严重拥堵。
背景描述
常有这样一个需求场景,为了提高请求的吞吐量,在一个请求链路中某些业务逻辑是可以异步执行。实现方式大体上分为两种:
- 开辟单独的线程去处理异步逻辑。
- 引入MQ将异步逻辑发送到MQ,其他服务接受到消息后处理。
本文讨论的是第一种情况。Spring 提供了一个注解@Async 作用就是开辟独立线程去异步处理。但是在不深入了解注解实现的情况下使用,往往就造成一些问题。
一个业务系统使用了@Async 实现了一个通知功能,于是出现了上述的现象描述。
代码是这样的。
@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 1000 * 5, multiplier = 3), include = CallbackFailException.class)
@Async
public String doCallback(CallBackMessage callBackMessage) {
......do samething
}
@Retryable 这个注解的作用是完成重试机制,当执行过程中遇到指定异常类型是触发重试,可以指定重试的次数,重试间隔时间。这个不是本文的重点不做讨论。
@Retryable 和 @Async 一起使用的目的就是异步的完成通知,如果通知失败触发重试机制。
问题分析
现象是通知出现了积压,大量通知阻塞。我们来看@Async的实现原理。既然需要开辟新线程去执行,我们看Spring 是如果实现的。如果不自定义异步方法的线程池,Spring 默认使SimpleAsyncTaskExecutor,但是这个线程池不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。它会根据CPU核心数设置一个最大值,如果超过这个值就会阻塞其他线程。并发大的时候会产生严重的性能问题.
相关源码:
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
this.concurrencyThrottle.beforeAccess();
doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
}
else {
doExecute(taskToUse);
}
}
protected void beforeAccess() {
if (this.concurrencyLimit == NO_CONCURRENCY) {
throw new IllegalStateException(
"Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
}
if (this.concurrencyLimit > 0) {
boolean debug = logger.isDebugEnabled();
synchronized (this.monitor) {
boolean interrupted = false;
while (this.concurrencyCount >= this.concurrencyLimit) {
if (interrupted) {
throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
"but concurrency limit still does not allow for entering");
}
if (debug) {
logger.debug("Concurrency count " + this.concurrencyCount +
" has reached limit " + this.concurrencyLimit + " - blocking");
}
try {
this.monitor.wait();
}
catch (InterruptedException ex) {
// Re-interrupt current thread, to allow other threads to react.
Thread.currentThread().interrupt();
interrupted = true;
}
}
if (debug) {
logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
}
this.concurrencyCount++;
}
}
}
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
如果异步执行的业务逻辑耗时较长,则会出现大量的阻塞,这次线上问题就是因为通知是发给第三方系统,请求响应超时时间设置过长,恰好部分客户服务出现问题导致通知返回时间非常长,触发了重试通知,重试时又是相同的问题。导致大量的通知积压。
解决方案
- 首先要使用自定义的线程池替换默认的 SimpleAsyncTaskExecutor 具体如下:
@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(16);
threadPoolTaskExecutor.setMaxPoolSize(32);
threadPoolTaskExecutor.setQueueCapacity(10000);
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
}
这样@Async就会使用自定义的线程池,如果@Async使用很多,还可以定义多个线程池,然后再指定使用具体的线程池。当然你线程池里面可以设置拒绝的策略,这里就不做讨论。
- 其次如果需要异步执行的业务逻辑非常耗时,不建议使用@Async,使用MQ去处理。如果异步任务中需要请求其他的服务,也注意要设置请求超时时间,以防其他服务出现异常时带崩你的服务。