本文是Netty文集中“Netty 源码解析”系列的文章。主要对Netty的重要流程以及类进行源码解析,以使得我们更好的去使用Netty。Netty是一个非常优秀的网络框架,对其源码解读的过程也是不断学习的过程。
Netty的优雅关闭操作
Netty是通过『eventLoopGroup.shutdownGracefully()』操作来实现它的优雅关闭的。
我们先来看下shutdownGracefully方法的doc说明:
/**
* Signals this executor that the caller wants the executor to be shut down. Once this method is called,
* {@link #isShuttingDown()} starts to return {@code true}, and the executor prepares to shut itself down.
* Unlike {@link #shutdown()}, graceful shutdown ensures that no tasks are submitted for <i>'the quiet period'</i>
* (usually a couple seconds) before it shuts itself down. If a task is submitted during the quiet period,
* it is guaranteed to be accepted and the quiet period will start over.
*
* @param quietPeriod the quiet period as described in the documentation
* @param timeout the maximum amount of time to wait until the executor is {@linkplain #shutdown()}
* regardless if a task was submitted during the quiet period
* @param unit the unit of {@code quietPeriod} and {@code timeout}
*
* @return the {@link #terminationFuture()}
*/
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
调用者希望执行器进行关闭的信号。一旦这个方法被调用了,『isShuttingDown()』方法将开始都会返回true,同时执行器准备关闭它自己。不像『shutdown()』方法,优雅关闭会确保在它关闭它自己之前没有任务在’the quiet period’(平静期,即,gracefulShutdownQuietPeriod属性)内提交。如果一个任务在平静期内提交了,它会保证任务被接受并且重新开始平静期。
如果你现在,对这段描述有些许困惑,没关系,请继续往下看,gracefulShutdownQuietPeriod(即,quietPeriod参数)、gracefulShutdownStartTime(即,timeout参数)主要会在『confirmShutdown()』方法中使用,下面会结合方法的实现场景来说明gracefulShutdownStartTime、gracefulShutdownQuietPeriod的含义。
源码解析
// AbstractEventExecutorGroup#shutdownGracefully
public Future<?> shutdownGracefully() {
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}
static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
// MultithreadEventExecutorGroup#shutdownGracefully
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
for (EventExecutor l: children) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
return terminationFuture();
}
遍历EventExecutor[]数组,取出EventExecutor执行shutdownGracefully操作。因为优雅关闭的流程主要是在各个NioEventLoop线程各自完成的,它是一个异步操作,因此此时返回该异步操作的Future,它是一个无返回结果的DefaultPromise对象。
① 确保 quietPeriod、unit的为有效值,即『quietPeriod >= 0』、『unit != null』。同时,确保timeout、quietPeriod之间的正确性,即『quietPeriod <= timeout』。
② 如果该NioEventLoop已经执行过关闭操作了,可能是『shutdownGracefully()』这样的优雅关闭,也有可能是『shutdown() or shutdownNow()』,当然后两种方法已经不建议使用了(Deprecated)。那么直接返回该异步操作的Future对象。
③ 使用自旋锁(『自旋 + CAS』)的方式修改当前NioEventLoop所关联的线程的状态(volatile修饰的成员变量state)。因为此方法可能被多线程同时调用,所以使用了自旋锁的方式来保证NioEventLoop所关联的线程状态(state成员变量)的修改是原子性的。
之前我们说过,NioEventLoop所关联的线程总共有5个状态,分别是:
private static final int ST_NOT_STARTED = 1; // 线程还未启动
private static final int ST_STARTED = 2; // 线程已经启动
private static final int ST_SHUTTING_DOWN = 3; // 线程正在关闭
private static final int ST_SHUTDOWN = 4; // 线程已经关闭
private static final int ST_TERMINATED = 5; // 线程已经终止
其中,在正常的线程状态流为:ST_NOT_STARTED ——> ST_STARTED ——> ST_SHUTTING_DOWN ——> ST_TERMINATED。
而ST_SHUTDOWN这个线程状态是已经弃用的『shutdown() or shutdownNow()』所会设置的线程状态,但是无论怎样在此步骤中,线程的状态至少为会置为ST_SHUTTING_DOWN,或者说正常情况下都是会设置为ST_SHUTTING_DOWN的。
补充简单说明下两个知识点:
a) 自旋锁(Spin lock):由它自己去占有CPU运行的时间,然后去尝试进行更新,直到更新成功完成。也因为它是占用CPU资源的方式,所以自旋锁实现的操作是非常简短的,不然其他线程可能会一直在自旋等待该自旋锁。也正式因为自旋锁是不会释放CPU的,也就是线程无需被挂起,这样就没有线程上下文切换的问题了。
因此,自旋锁一般用于在多核处理器中预计线程持有锁的时间很短(即锁操作所需的时间非常的短)情况,甚至时间短于两次线程上下文的切换的开销。
b) volatile的可见性:volatile除了保证单个变量的读/写具有原子性外,还有有一个很重要的特性就是对线程内存可见性的保证(即,对一个 volatile 变量的读,总是能看到(任意线程)对这个 volatile 变量最后的写入)。因为此处修改state字段(本文是Netty服务端主线程)的线程和使用该字段的线程(NioEventLoop所关联线程)不是同一个线程。因此通过volatile来修饰state字段来实现,通过主线程修改了EventLoop所关联的线程状态后,在NioEventLoop的事件循环中能立即正确感知其线程状态的变化,从而做出相应的操作。
④ 根据传入的参数,设置成员变量gracefulShutdownQuietPeriod、gracefulShutdownTimeout。这里分别为默认值,gracefulShutdownQuietPeriod为2秒,gracefulShutdownTimeout为15秒。
⑤ 如果NioEventLoop所关联的线程之前的状态为ST_NOT_STARTED,则说明该线程还未被启动过,那么启动该线程。
Q:为什么我们在执行关闭操作的时候,还需要特意去启动那些未启动的NioEventLoop线程了?
A:是这样的,在基于NIO的网络传输模式中,会在构建NioEventLoopGroup的时候就预先将一定数量的NioEventLoop给创建好(默认为操作系统可运行处理器数的2倍),而NioEventLoop在初始化的时候就会将其上的Selector给开启了。同时Selector的关闭是在『doStartThread()』方法中最后会去完成的事。关于『doStartThread()』方法将在后面详细展开。
好了,在完成将NioEventLoop所关联的线程状态修改为’ST_SHUTTING_DOWN’,也就说明关闭流程的开始。那么,接下来我们来看看NioEventLoop中是如果完成优雅的关闭的。
我们先来看看doStartThread()方法:
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
① 这里executor.execute方法底层会通过ThreadPerTaskExecutor.execute(Runnable)方法来创建并启动执行任务的唯一线程。然后启动的线程就会执行我们通过executor.execute方法提交上来的这个任务(具体的这块说明请见Netty 源码解析 ——— 服务端启动流程 (上))。
② 在Runnable任务中,会将当前的线程设置为NioEventLoop所关联的线程,即对成员变量thread赋值为Thread.currentThread()。然后执行『SingleThreadEventExecutor.this.run();』这里实际调用的是『NioEventLoop#run()』方法来进行事件循环操作。
③ 当事件循环操作退出后(当NioEventLoop需要关闭时,事件循环才会退出),进行关闭的后续操作。
当NioEventLoop已经处于使用状态(即,上面有Channel与其绑定),那么此时它会处于事件循环操作中;若NioEventLoop没有处于使用状态(即,该NioEventLoop已经被初始化构建好了,但还没有任何一个Channel与其绑定过),那么在执行shutdownGracefully()后,也会因为调用了doStartThread()方法,此时该NioEventLoop也会处于事件循环中。
那么,接下来我们就来看看NioEventLoop中事件循环对于优雅关闭都完成了哪些操作了?
『NioEventLoop#run()』:
protected void run() {
for (;;) {
try {
......
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
此处,我们仅对与优雅关闭流程相关的部分进行展开。
事件循环首先会对Selector上注册的Channel所就绪的I/O事件做处理,然后处理taskQueue中的任务以及时间已经到达的定时/周期性任务。最后,在每次事件循环的最后都会判断一次当前的线程状态,如果发现当前的线程状态处于正在关闭的状态(即,state >= ST_SHUTTING_DOWN)则会开始处理关闭流程,即:
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
注意,事件循环中将正常的工作流程放在了一个try-catch中,将关闭流程放在了另一个try-catch中,这是为了它们之间能够不会互相影响。这样即便工作流程抛出异常了,每次事件循环的最后依旧能够去处理关闭事件。
关闭流程主要分为两步:
① 『closeAll()』:
private void closeAll() {
selectAgain();
Set<SelectionKey> keys = selector.keys();
Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
for (SelectionKey k: keys) {
Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
channels.add((AbstractNioChannel) a);
} else {
k.cancel();
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, k, null);
}
}
for (AbstractNioChannel ch: channels) {
ch.unsafe().close(ch.unsafe().voidPromise());
}
}
获取该注册到这个Selector所有Channel所对应的SelectionKey,然后获取SelectionKey附加对象attachment(),若attachment是一个AbstractNioChannel对象则先让放入到channels集合中,否则直接调用『k.cancel(),即selectionKey.cancel()』操作将这个SelectableChannel从Selector上注销。最后遍历channels集合,依次取出AbstractNioChannel,进行AbstractNioChannel的关闭操作(『ch.unsafe().close(ch.unsafe().voidPromise());』)
- 如设置了Socket#SO_LINGER配置项(即,config().getSoLinger() > 0),则说明当需要关闭socket时,如果这时send buffer里还有数据没有发送完,则先尝试把send buffer中的数据发送完了再关闭socket。所以此时会先执行doDeregister()操作,将当前的SocketChannel从Selector上注销,然后将close()操作作为一个任务放到另一个执行器去执行,也就是说不在当前的NioEventLoop的线程上去执行当前SocketChannel的关闭操作,因为此时SocketChannel不会马上关闭,它需要尝试在l_linger time时间内将发送缓存区中的数据发送出去并等待对方的确认。在l_linger time时间之后socket才会真正的被关闭。
- 如果没有设置Socket#SO_LINGER配置项,则直接在NioEventLoop线程上进行SocketChannel/ServerSocektChannel的close()操作。并将outboundBuffer中所有还未发送出去的消息标志为操作失败(fail flush),然后关闭outboundBuffer,释放相关资源。在关闭socket之后,将SocketChannel/ServerSocketChannel从Selector上注销(即,『selectionKey.cancel()』。selectionKey表示一个SocketChannel/ServerSocketChannel注册到Selector的关联关系)。
- 触发‘channelInactive’事件和‘channelUnregistered’事件,这两个事件都会在ChannelPipeline中得以传播。但这两个事件的触发会被封装为一个任务提交至当前的NioEventLoop的taskQueue在随后被执行,这么做的原因是为了确保eventLoop 的close 操作不会因为调用’channelInactive’事件和‘channelUnregistered’事件而“堵塞”,因为这里的close操作涉及到 socket 的关闭和 selectionKey.cancel() 操作,这两步涉及NIO 网络的操作是很重要的。‘channelInactive’事件和‘channelUnregistered’事件都是入站事件,它们会依次顺序调用ChannelPipeline中的ChannelInboundHandler的channelInactive()方法以及channelUnregistered()方法。并且,ChannelPipeline中的head在处理‘channelUnregistered’事件时除了将该事件传播给ChannelPipeline中的下一个ChannelInboundHandler外,还会触发一个destroy()操作
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
// Remove all handlers sequentially if channel is closed and unregistered.
if (!channel.isOpen()) {
destroy();
}
}
该destroy()操作会删除ChannelPipeline中的所有的handler(除了head、tail之外),并触发每个Handler的handlerRemoved()方法。注意,这里handler的移除操作是先顺序移除head到tail间所有的ChannelInboundHandler,然后在顺序移除tail到head间所有的ChannelOutboundHandler。
② 『confirmShutdown()』:
protected boolean confirmShutdown() {
if (!isShuttingDown()) {
return false;
}
if (!inEventLoop()) {
throw new IllegalStateException("must be invoked from an event loop");
}
cancelScheduledTasks();
if (gracefulShutdownStartTime == 0) {
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
}
if (runAllTasks() || runShutdownHooks()) {
if (isShutdown()) {
// Executor shut down - no new tasks anymore.
return true;
}
// There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
// terminate if the quiet period is 0.
// See https://github.com/netty/netty/issues/4241
if (gracefulShutdownQuietPeriod == 0) {
return true;
}
wakeup(true);
return false;
}
final long nanoTime = ScheduledFutureTask.nanoTime();
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
}
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
// Check if any tasks were added to the queue every 100ms.
// TODO: Change the behavior of takeTask() so that it returns on timeout.
wakeup(true);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
return false;
}
// No tasks were added for last quiet period - hopefully safe to shut down.
// (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
return true;
}
首先,先简单的描述下『runAllTasks()』和『runShutdownHooks()』所会完成的操作:
a) runAllTasks():首先会将已经到运行时间的定时/周期性任务放入taskQueue中,然后依次执行taskQueue中的任务。当且仅当taskQueue中的任务都执行完了,该方法会返回true,并且会将最后一个任务执行完后此时的系统时间赋值为成员变量lastExecutionTime;否则,如果该taskQueue中没有要执行的任务,那么该方法会返回false。
b) runShutdownHooks():执行用户自定义的所有shutdownHook,比如我们通过(『nioEventloop.addShutdownHook(runnable)』方法来提交我们希望该NioEventLoop被关闭时所要执行的一些操作)。当shutdownHook都执行完了该方法会返回true,并且会在执行完最后一个showdownHook后将此时的系统时间赋值为成员变量lastExecutionTime;否则,如果没有任何需要执行的shutdownHook,即shutdownHooks集合为空,那么该方法将返回false。
接下来,我们来判断在什么条件下confirmShutdown()方法将返回true,以至于可以退出NioEventLoop的事件循环,继续doStartThread()的后续操作以完成最后的优雅关闭流程。
我们分两种情况来讨论:
① gracefulShutdownQuietPeriod == 0
如果taskQueue中待执行的任务,或者有到期的定时/周期性任务,再或者有用户自定义的shutdownHook任务,那么会在执行完任务后退出confirmShutdown方法,并返回true;否则,如果没有任务待执行的任务,那么‘nanoTime - lastExecutionTime > gracefulShutdownQuietPeriod’也会使得confirmShutdown()方法退出,并返回true。
② gracefulShutdownQuietPeriod > 0
- 从『if (runAllTasks() || runShutdownHooks())』这个判断语句中,我们能够确保只有在taskQueue中所有的任务都被执行完了,并且shutdownHooks集合中所有的shutdownHook也都执行完了之后,这个判断语句才会返回true。也就是说,当该if语句返回true时,我们能够确保所有的任务和shutdownHook都已经执行完了。
- 『nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout』:接下来我们判断,执行完上面所有任务(包括taskQueue中的任务、可执行的定时/周期性任务、所有的shutdownHook任务)所需的时间是否已经超过了优雅关闭的超时时间(gracefulShutdownTimeout),如果已经超过了,那么则退出confirmShutdown方法,并返回true。否则,继续下面的步骤
- 『nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod』:如果‘当前时间距离最后一次执行任务的时间’小于等于’优雅退出的平静期(gracefulShutdownQuietPeriod)’。则使NioEventLoop线程睡眠100ms后,退出confirmShutdown方法,并返回false,这时说明关闭操作是未被批准的,那么NioEventLoop的事件循环并不会退出,并且会在下次事件循的最后再次调用confirmShutdown()方法进行关闭操作的确认,也就是会从新执行步骤1;否则,如果‘当前时间距离最后一次执行任务的时间’大于’优雅退出的平静期(gracefulShutdownQuietPeriod)’,则退出confirmShutdown方法,并返回true。此时说明,在一个优雅退出的平静期(gracefulShutdownQuietPeriod)内都没有任何的任务被提交至该NioEventLoop线程上,那么我们就有希望能够安全的进行关闭。为什么说是有希望了?这是因为我们实在没有办法保证在此时用户不会通过execute()来提交一个任务。
我们用一个流程图来说明gracefulShutdownQuietPeriod、gracefulShutdownTimeout在confirmShutdown操作中起到的作用和关系(注意,下面并不是confirmShutdown()方法流程图):
好了,在结束NioEventLoop的事件循环后,我们继续来看doStartThread()的后续操作。
首先会将变量success设置为true,接下就是执行finally块中的代码了:
① 如果当前NioEventLoop线程的状态还不是处于关闭相关的状态的话,则通过自旋锁的方式将当前NioEventLoop线程的状态修改为’ST_SHUTTING_DOWN’。从我们当前优雅关闭的流程来说,当前NioEventLoop线程的此时就是ST_SHUTTING_DOWN了。
② 判断,如果NioEventLoop事件循环结束了,但是‘gracefulShutdownStartTime’成员变量却为0,则说明事件循环不是因为confirmShutdown()方法而导致的结束,那么就打印一个错误日志,告知当前的EventExecutor的实现是由问题的,因为事件循环的终止必须是通过调用confirmShutdown()方法来实现的,也就是说,事件循环能够正确退出,也就是因为关闭操作被确认了。
③ 此时会通过自旋锁的方式再次调用一次『confirmShutdown()』,以确保所有的NioEventLoop中taskQueue中所有的任务以及用户自定义的所有shutdownHook也都执行了。之后才会进行关闭操作。
④ cleanup():
protected void cleanup() {
try {
selector.close();
} catch (IOException e) {
logger.warn("Failed to close a selector.", e);
}
}
会将当前NioEventLoop所关联的Selector关闭。
⑤ 修改NioEventLoop线程的状态为’ST_TERMINATED’。注意,在此操作完成之后,所有提交至该NioEventLoop显示的任务都会被拒绝,也就是该NioEventLoop不会再接收任何的任务了。
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
public boolean isShutdown() {
return state >= ST_SHUTDOWN;
}
⑥ threadLock.release():threadLock是一个初始化资源为0的信号量,此操作会使得信号量的资源+1。那么这种情况下,如果有用户操作了awaitTermination方法的话(该方法底层会通过『threadLock.tryAcquire(timeout, unit)』来阻塞的尝试获取信号量的资源),该方法就会结束阻塞并返回,当然它也可以因为设置的等待超时间已到而返回。
⑦ 此时会再次判断该NioEventLoop的taskQueue是否为空,如果为非空,只会打印警告日志,告知用户,当前NioEventLoop在退出时仍有未完成的任务。而这个任务可能是在步骤③完成后,步骤⑤完成之前,又有用户提交上来的。
⑧ 设置该优雅关闭异步操作为成功完成。
后记
好了,整个NioEventLoopGroup的整个优雅关闭流程就分析完了,一句简单『nioEventLoopGroup.shutdownGracefully()』操作背后竟然有着如此复杂的关闭流程,再次佩服Netty为我们将复杂的流程给封闭化,而提供最为简便的API供用户来更好更方便的去使用它。
若文章有任何错误,望大家不吝指教:)