开篇
这篇文章的目的主要是分析下Dubbo当中关于线程池的策略和线程模型,主要从源码角度出发并结合网上一些现成的文章来进行阐述。
个人阅读过源码以后的第一感觉就是成熟的框架也是靠使用了基础的线程池来实现的,唯一的遗憾就是这篇文章没有理顺dubbo的配置和真正线程池参数之间的关系,这个后面再补充一篇类似的文章。
Dubbo线程池策略
resources目录下的com.alibaba.dubbo.common.threadpool.ThreadPool的文件
fixed=com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool
cached=com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool
limited=com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool
说明:
- dubbo的线程池策略通过SPI配置文件对外提供,在com.alibaba.dubbo.common.threadpool.ThreadPool文件当中定义。
- dubbo的线程池策略对外提供了三种策略,分别是fixed、cached、limited三类。
- 每类策略的定义见下述源码。
CachedThreadPool
public class CachedThreadPool implements ThreadPool {
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
说明:
- CachedThreadPool的实现可以理解为JDK当中Executors.newCachedThreadPool()方法。
FixedThreadPool
public class FixedThreadPool implements ThreadPool {
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
说明:
- FixedThreadPool的实现可以理解为JDK当中Executors.newFixedThreadPool()方法。
LimitedThreadPool
public class LimitedThreadPool implements ThreadPool {
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
说明:
- LimitedThreadPool的实现可以理解为JDK当中Executors.newCachedThreadPool()方法。
- LimitedThreadPool的区别在于线程池中的线程永远不会过期,因为alive时间为最大值。
NamedThreadFactory
public class NamedThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
private final AtomicInteger mThreadNum = new AtomicInteger(1);
private final String mPrefix;
private final boolean mDaemon;
private final ThreadGroup mGroup;
public NamedThreadFactory() {
this("pool-" + POOL_SEQ.getAndIncrement(), false);
}
public NamedThreadFactory(String prefix) {
this(prefix, false);
}
public NamedThreadFactory(String prefix, boolean daemon) {
mPrefix = prefix + "-thread-";
mDaemon = daemon;
SecurityManager s = System.getSecurityManager();
mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
}
public Thread newThread(Runnable runnable) {
String name = mPrefix + mThreadNum.getAndIncrement();
Thread ret = new Thread(mGroup, runnable, name, 0);
ret.setDaemon(mDaemon);
return ret;
}
public ThreadGroup getThreadGroup() {
return mGroup;
}
}
说明:
- 提供了线程池中线程创建的工厂,推荐使用线程池时候一定要自定义线程池工厂,便于定位线程的用途。
Dubbo线程模型
resources目录下的com.alibaba.dubbo.remoting.Dispatcher文件
all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher
说明:
- Dubbo线程模型在resources目录下的com.alibaba.dubbo.remoting.Dispatcher文件当中。
- 提供5类线程模型,分别是all、direct、message、execution、connection。
- 每类的用途见下面的介绍。
Dubbo线程模型对比
根据请求的消息类被IO线程处理还是被业务线程池处理,Dubbo提供了下面几种线程模型:
- all : (AllDispatcher类)所有消息都派发到业务线程池,这些消息包括请求/响应/连接事件/断开事件/心跳等,这些线程模型如下图:
- direct : (DirectDispacher类)所有消息都不派发到业务线程池,全部在IO线程上直接执行,模型如下图:
- message : (MessageOnlyDispatcher类)只有请求响应消息派发到业务线程池,其他连接断开事件/心跳等消息,直接在IO线程上执行,模型图如下:
- execution:(ExecutionDispatcher类)只把请求类消息派发到业务线程池处理,但是响应和其它连接断开事件,心跳等消息直接在IO线程上执行,模型如下图:
- connection:(ConnectionOrderedDispatcher类)在IO线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到业务线程池处理,模型如下图:
AllDispatcher
public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
}
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
public void connected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {}
}
public void disconnected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {}
}
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {}
}
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {}
}
private ExecutorService getExecutorService() {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
return cexecutor;
}
}
说明:
- AllChannelHandler的connected、disconnected、received统一通过业务线程池处理。
- cexecutor.execute(new ChannelEventRunnable())统一提交。
ConnectionOrderedDispatcher
public class ConnectionOrderedDispatcher implements Dispatcher {
public static final String NAME = "connection";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new ConnectionOrderedChannelHandler(handler, url);
}
}
public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
protected final ThreadPoolExecutor connectionExecutor;
private final int queuewarninglimit;
public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
connectionExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
new NamedThreadFactory(threadName, true),
new AbortPolicyWithReport(threadName, url)
); // FIXME There's no place to release connectionExecutor!
queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
}
public void connected(Channel channel) throws RemotingException {
try {
checkQueueLength();
connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {}
}
public void disconnected(Channel channel) throws RemotingException {
try {
checkQueueLength();
connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {}
}
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {}
}
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {}
}
private void checkQueueLength() {
if (connectionExecutor.getQueue().size() > queuewarninglimit) {
logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
}
}
}
说明:
- ConnectionOrderedDispatcher的connected和disconnected事件通过connectionExecutor实现。
- ConnectionOrderedDispatcher的received事件通过单独的executor去实现。
- ConnectionOrderedDispatcher的连接处理和消息处理通过不同的executor处理。
DirectDispatcher
public class DirectDispatcher implements Dispatcher {
public static final String NAME = "direct";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return handler;
}
}
说明:
- DirectDispatcher内部的处理没有用到线程池,统一由IO线程去处理。
ExecutionDispatcher
public class ExecutionDispatcher implements Dispatcher {
public static final String NAME = "execution";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new ExecutionChannelHandler(handler, url);
}
}
public class ExecutionChannelHandler extends WrappedChannelHandler {
public ExecutionChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
public void connected(Channel channel) throws RemotingException {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
}
public void disconnected(Channel channel) throws RemotingException {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
}
public void received(Channel channel, Object message) throws RemotingException {
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {}
}
public void caught(Channel channel, Throwable exception) throws RemotingException {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
}
}
说明:
- ExecutionDispatcher的实现和AllDispatcher几乎相同。
- ExecutionDispatcher的连接和消息处理统一由业务线程池处理。
MessageOnlyDispatcher
public class MessageOnlyDispatcher implements Dispatcher {
public static final String NAME = "message";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new MessageOnlyChannelHandler(handler, url);
}
}
public class MessageOnlyChannelHandler extends WrappedChannelHandler {
public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {}
}
}
说明:
- MessageOnlyDispatcher的消息处理通过业务线程池去执行。
- MessageOnlyDispatcher的连接事件通过IO线程去执行。