我为 Netty 贡献源码 | 且看 Netty 如何应对 TCP 连接的正常关闭,异常关闭,半关闭场景(上)

本系列Netty源码解析文章基于 4.1.56.Final版本

写在前面.....

本文是笔者肉眼盯 Bug 系列的第三弹,前两弹分别是:

而在本篇文章中笔者又用肉眼盯出了 Netty 在处理 TCP 连接半关闭时的一个 Bug。

image.png
image.png

那么在接下来的内容中,笔者会随着源码深入的解读慢慢的为大家一层一层地拨开迷雾,带大家来一步一步分析这个 Bug 产生的原因以及造成的影响,并逐步带大家把这个 Bug 修复掉。

下面就让我们一起带着怀疑,审视,欣赏,崇敬,敬畏的态度来一起品读世界顶级程序员编写出的代码。由衷的感谢他们在这一领域做出的贡献。

本文概要.png

在笔者前边关于 Netty Reactor 的系列文章中,我们详细的分析了 Reactor 的创建启动运行,以及接收网络连接接收网络数据,然后通过 pipeline 对 IO 事件的编排处理,最后到发送网络数据的一整套流程实现。相信大家通过对这一系列文章的阅读思考,已经对 Reactor 在 Netty 中的实现有了一个全面并且深刻的认识。

那么现在就到了关闭连接的时刻了,在本文中笔者将带大家一起剖析下关闭连接在 Netty 中的整个实现逻辑。

在 Netty 中对于用户关闭连接的处理分为三大模块:

  1. 处理正常的 TCP 连接关闭。

  2. 处理异常的 TCP 连接关闭。

  3. 处理 TCP 连接半关闭的场景。

接下来,笔者就带大家从这三个连接关闭场景来全面分析下 Netty 是如何处理连接关闭的。

首先我们来看下最简单的场景 --- 正常的TCP连接关闭。

1. 正常 TCP 连接关闭

在进入源码实现之前,我们先来回顾下 TCP 连接关闭的整个流程,其实 Netty 中针对连接关闭的整个源码实现流程也是按照图中 TCP 连接关闭的四次挥手步骤进行的。

TCP连接关闭.png
  1. 首先 Netty 客户端在对应的 ChannelHandler 中调用 ctx.channel().close() 方法主动关闭连接,内核会向服务端发送一个 FIN 包,随即客户端连接进入 FIN_WAIT1 状态。
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

   @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
       // 客户端连接进入 FIN_WAIT1 状态
       ctx.channel().close();
    }
}
  1. 服务端内核协议栈在接收到客户端发送过来的 FIN 包后,会自动回复客户端一个 ACK 包,随后会将文件结束符 EOF 插入到 Socket 接收缓冲区中的末尾。服务端连接状态进入 CLOSE_WAIT ,客户端接收到 ACK 包后进入FIN_WAIT2 状态。

  2. 当服务端内核协议栈将 EOF 插入到 Socket 的接收缓冲区时,这时 OP_READ 事件活跃,Reactor 线程随即会处理 channel 上的 OP_READ 事件,只不过此时从 channel 中读取到的字节数为 -1 ,表示对端发起了 channel 关闭请求。服务端开始执行连接关闭流程。

  3. 由于客户端调用的是 ctx.channel().close() 方法来关闭连接,相当于将 TCP 连接的读写通道同时关闭,所以客户端在 FIN_WAIT2 状态下无法在接收服务端发送的数据,但此时服务端处于 CLOSE_WAIT 状态下仍可向客户端发送数据,只不过客户端在接收到数据后会丢弃并发送 RST 报文给服务端。

  4. 服务端在 CLOSE_WAIT 状态下,调用 ctx.channel().close() 向客户端发送 FIN 包,随即进入 LAST_ACK 状态。

  5. 客户端在收到来自服务端的 FIN 包后,回复 ACK 包给服务端,完成四次挥手,随即进入 TIME_WAIT 状态,服务端在收到客户端的 ACK 包后结束 LAST_ACK 状态进入 CLOSE 状态。

Netty 中对于连接关闭的处理主要在第 3 步和第 5 步,剩下的逻辑均由内核协议栈处理完成。

从上述 TCP 关闭连接的四次挥手步骤中,我们可以看出 Netty 对于关闭连接的响应是通过处理 OP_READ 事件来完成的,而对于 OP_READ 事件的处理,笔者已经在 Netty如何高效接收网络数据 一文中详细介绍过了,这里我们直接来到 OP_READ 事件的处理函数中,聚焦于连接关闭逻辑的处理。

Reactor线程运行时结构.png

当 Reactor 线程轮询到 Channel 上有 OP_READ 事件活跃时,就会来到 NioEventLoop#processSelectedKey 函数中去处理活跃的 IO 事件,在本文的语义中 OP_READ 事件就表示连接关闭事件。

public final class NioEventLoop extends SingleThreadEventLoop {

   private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
      
                  .................省略..............

        try {
            int readyOps = k.readyOps();

                  .................省略..............

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                //处理 OP_READ 事件,本文中表示连接关闭事件
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
}

最终会在 AbstractNioByteChannel#read 方法中完成对 OP_READ 事件的处理,下图中置灰的逻辑处理模块即为 Netty 在整个 OP_READ 事件处理中关于连接关闭事件的处理位置。

Netty 中关于 OP_READ 事件的处理一共分为两大模块,一块是针对接收连接上网络数据的处理。另一块则是本文的主题,针对连接关闭事件的处理。

连接关闭事件处理.png
public abstract class AbstractNioByteChannel extends AbstractNioChannel {

        @Override
        public final void read() {
            final ChannelConfig config = config();

            ..........省略连接半关闭处理........

            ..........省略获取allocHandle过程.......

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    //记录本次读取了多少字节数
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    //如果本次没有读取到任何字节,则退出循环 进行下一轮事件轮询
                    // -1 表示客户端主动关闭了连接close或者shutdownOutput 这里均会返回-1
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        //当客户端主动关闭连接时(客户端发送fin1),会触发read就绪事件,这里从channel读取的数据会是-1
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }

                    .........省略.............

                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    //此时客户端发送fin1(fi_wait_1状态)主动关闭连接,服务端接收到fin,并回复ack进入close_wait状态
                    //在服务端进入close_wait状态 需要调用close 方法向客户端发送fin_ack,服务端才能结束close_wait状态
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                 ............省略...............
            } finally {
                 ............省略...............
            }
        }
    }

}

在前边 TCP 连接关闭的步骤 3 中我们提到,当服务端的内核协议栈接收到来自客户端的 FIN 包后,内核协议栈会向 Socket 的接收缓冲区插入文件结束符 EOF ,表示客户端已经主动发起了关闭连接流程,这时 NioSocketChannel 上的 OP_READ 事件活跃,随即 Reactor 线程会在 AbstractNioByteChannel#read 方法中处理 OP_READ 事件。

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

    @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
        //读到EOF后,这里会返回-1
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }

}

Reactor 线程会通过 ByteBuf#writeBytes 方法读取 NioSocketChannel 中的数据,由于此时底层 Socket 接收缓冲区中只有一个 EOF 并没有其他接收数据,所以这里的 ByteBuf#writeBytes 方法会返回 -1。表示客户端已经发起了连接关闭流程,此时服务端连接状态为 CLOSE_WAIT ,客户端连接状态为 FIN_WAIT2 。

Netty处理TCP关闭流程.png
     boolean close = false;
     close = allocHandle.lastBytesRead() < 0;
     if (close) {
           closeOnRead(pipeline);
     }

当本次 read loop 从 Channel 中读取到的字节数为 -1 时,则进入 closeOnRead 方法,服务端开始关闭连接流程。

从上述 Netty 处理 TCP 正常关闭流程( Socket 接收缓冲区中只有 EOF ,没有其他正常接收数据)可以看出,这种情况下只会触发 ChannelReadComplete 事件而不会触发 ChannelRead 事件。

2. Netty 对 TCP 连接正常关闭的处理

       private void closeOnRead(ChannelPipeline pipeline) {
           //判断服务端连接接收方向是否关闭,这里肯定是没有关闭的
           if (!isInputShutdown0()) {
                if (isAllowHalfClosure(config())) {
                      .....省略TCP连接半关闭处理逻辑.......
                } else {
                    //如果不支持半关闭,则服务端直接调用close方法向客户端发送fin,结束close_wait状态进如last_ack状态
                    close(voidPromise());
                }
            } else {
                    .....省略TCP连接半关闭处理逻辑.......
            }
        }

众所周知 TCP 是一个面向连接的、可靠的、基于字节流的全双工传输层通信协议,既然它是全双工的,那就意味着 TCP 连接同时有一个读通道和写通道。

image.png

这里的 isInputShutdown0 方法是用来判断 TCP 连接上的读通道是否关闭,那么在当前情况下,服务端的读通道肯定还没有关闭,因为目前 Netty 还没有调用任何关闭连接的系统调用。

    @Override
    protected boolean isInputShutdown0() {
        return isInputShutdown();
    }

    @Override
    public boolean isInputShutdown() {
        return javaChannel().socket().isInputShutdown() || !isActive();
    }

至于这里为什么要对读通道是否关闭进行判断,笔者会在本文 TCP 连接半关闭相关处理章节为大家详细解释。

由于本小节介绍的是 TCP 连接正常关闭的场景,并不是半关闭,所以这里的 isAllowHalfClosure = false 。Reactor 线程进入 close 方法,执行真正的关闭流程。

2.1 close 方法发起 TCP 连接关闭流程

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

      @Override
      public void close(final ChannelPromise promise) {
            assertEventLoop();

            ClosedChannelException closedChannelException =
                    StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");

            close(promise, closedChannelException, closedChannelException, false);
      }

      private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify) {

                      .........省略...........

      }

}

这里正是 netty 关闭 channel 的核心逻辑所在,而关闭 channel 的行为又分为主动关闭和被动关闭,如本例中,客户端主动调用 ctx.channel().close() 发起关闭流程为主动关闭方,而服务端则是被动关闭方。

而主动关闭方和被动关闭方在这里的传参是不一样的,我们先来看被动关闭方也就是本例中服务端在调用 close 方法的传参。

        @Override
        public void close(final ChannelPromise promise) {
            assertEventLoop();

            ClosedChannelException closedChannelException =
                    StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
            close(promise, closedChannelException, closedChannelException, false);
        }
  • ChannelPromise promise:服务端作为被动关闭方,这里传入的 ChannelPromise 类型为 VoidChannelPromise ,表示调用方对处理结果并不关心,VoidChannelPromise 不可添加 Listener ,不可修改操作结果状态。
public final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPromise {

    @Override
    public VoidChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
        fail();
        return this;
    }

    @Override
    public boolean isDone() {
        return false;
    }

   @Override
    public boolean setUncancellable() {
        return true;
    }

    @Override
    public VoidChannelPromise setFailure(Throwable cause) {
        fireException0(cause);
        return this;
    }

    @Override
    public boolean trySuccess() {
        return false;
    }
   
}

而作为主动关闭方的客户端则需要监听 Channel 关闭的结果,所以这里传递的 ChannelPromise 参数为 DefaultChannelPromise 。

        ChannelFuture channelFuture = ctx.channel().close();
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                  ...........省略.......
            }
        });
    @Override
    public ChannelFuture close() {
        return close(newPromise());
    }

    @Override
    public ChannelPromise newPromise() {
        return new DefaultChannelPromise(channel(), executor());
    }
  • Throwable cause:当 Channel 关闭之后,需要清理 Channel 写入缓冲队列 ChannelOutboundBuffer 中的待发送数据,这里会将异常 cause 传递给用户的 writePromise ,通知用户 Channel 已经关闭,write 操作失败。这里传入的异常类型为 StacklessClosedChannelException 。
write事件传播流程.png

如图中所示,当用户调用 ctx.writeAndFlush(msg) 发送数据时,由于是异步发送 Netty 会在图中的第 2 步直接返回一个 ChannelFuture 给用户,发送成功或者发送失败都会通知这个 ChannelFuture 。如果在数据发送之前连接就关闭了,那么 Netty 就会把 StacklessClosedChannelException 异常通知给用户持有的这个 ChannelFuture。相关数据的发送细节,感兴趣的读者可以在回顾下笔者的 一文搞懂 Netty 发送数据全流程 这篇文章。

  • ClosedChannelException closeCause:这个参数和 Throwable cause 参数的作用差不多,都是用于在连接关闭的时候如果此时还有待发送数据未发送。就通知用户这里在参数中指定的异常。唯一不同的是 Throwable cause 负责通知给 Channel 发送数据缓冲队列 ChannelOutboundBuffer 中的 flushedEntry 队列。ClosedChannelException closeCause 负责通知给 ChannelOutboundBuffer 中的 unflushedEntry 队列。
ChannelOutboundBuffer结构.png

这里大家只需要理解个大概,稍微有个印象就行,笔者后面还会详细介绍。

  • boolean notify:由于在关闭 Channel 之后,会清理 Channel 对应的发送缓冲队列 ChannelOutboundBuffer 中存储的待发送数据,同时也会释放其中用于存储待发送数据用的 ByteBuffer ,当 ChannelOutboundBuffer 中的内存占用低于低水位线的时候,会触发 ChannelWritabilityChanged 事件。这里的参数 boolean notify 决定是否触发 ChannelWritabilityChanged 事件,由于当前是关闭操作,所以 notify = false ,不需要触发 ChannelWritabilityChanged 事件。

在介绍完 close 方法的各个参数之后,接下来我们来看一下具体的关闭逻辑:

2.1.1 连接关闭之前的校验工作

      // channel的关闭流程是否已经开始
      private boolean closeInitiated;

      // 关闭channel操作的指定future,来判断关闭流程进度 每个channel对应一个CloseFuture
      // 连接关闭之后,netty 会通知这个CloseFuture
      private final CloseFuture closeFuture = new CloseFuture(this);

      private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify) {
            if (!promise.setUncancellable()) {
                //关闭操作如果被取消则直接返回
                return;
            }

            if (closeInitiated) {
                //如果此时channel已经开始关闭流程,则进入这里
                if (closeFuture.isDone()) {               
                    //如果channel已经关闭 则设置promise为success,如果promise是voidPromise类型则会跳过
                    safeSetSuccess(promise);
                } else if (!(promise instanceof VoidChannelPromise)) { 
                    //如果promise不是voidPromise,则会在关闭完成后 通过closeFuture设置promise success
                    closeFuture.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            promise.setSuccess();
                        }
                    });
                }
                // 直接返回,防止重复关闭
                return;
            }
  
            //当前channel现在开始进入正在关闭状态
            closeInitiated = true;

            .......关闭channel.........
        }

Netty 这里使用一个 boolean closeInitiated 变量来防止 Reactor 线程来重复执行关闭流程,因为 Channel 的关闭操作可以在多个业务线程中发起,这样就会导致多个业务线程向 Reactor 线程提交多个关闭 Channel 的任务。

除此之外,Netty 还为每一个 Channel 创建了一个 CloseFuture closeFuture,用来表示 Channel 关闭的相关进度状态。当 Channel 完成关闭后,Netty 会设置 closeFuture 为 success 状态,并通知 closeFuture 上注册的 listener 。

如果 closeInitiated == true 说明当前 Channel 的关闭操作已经开始,如果有多个业务线程先后提交过来多个关闭任务,Reactor 线程则会首先通过 closeFuture.isDone() 判断当前 Channel 是否已经完成关闭 ,如果 Channel 已经关闭,则会在 closeFuture 上注册的 listener 中设置关闭任务对应的 Promie 为 success ,进而通知到业务线程。

     protected final void safeSetSuccess(ChannelPromise promise) {
            if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
                logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
            }
    }

从这里也可以看出 VoidChannelPromise 表示一个空的 Promise ,不能对其设置 success 或者 fail , 更不能对其添加 listener 。一般用于不关心操作结果的场景。

如果此时 Channel 的关闭流程虽然已经开始但还未完成的情况下,则将关闭任务对应 Promise (在业务线程中持有)的通知动作封装成 ChannelFutureListener 并添加到 closeFuture 中。当 Channel 关闭后,closeFuture 会被设置为 success ,并通知其中注册的 ChannelFutureListener 。

image.png

2.1.2 Channel关闭前的准备工作

        private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify) {
            
            ...........省略连接关闭之前的校验工作........

            //当前channel是否active,这里肯定是active的
            final boolean wasActive = isActive();
            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            //将channel对应的写缓冲区channelOutboundBuffer设置为null 表示channel要关闭了,不允许继续发送数据
            //此时如果还在write数据,则直接释放bytebuffer,并立马 fail 相关writeFuture 并抛出newClosedChannelException异常
            //此时如果执行flush,则会直接返回
            this.outboundBuffer = null; 
            //如果开启了SO_LINGER,则需要先将channel从reactor中取消掉。避免reactor线程空转浪费cpu
            Executor closeExecutor = prepareToClose();

            .............省略关闭Channel逻辑流程.......
        }

通过 isActive() 获取 Channel 的状态 boolean wasActive ,由于此时我们还没有关闭 Channel,所以 Channel 现在的状态肯定是 active 的。之所以在关闭流程的一开始就获取 Channel 是否 active 的状态,是因为当我们关闭 Channel 之后,需要通过这个状态来判断 channel 是否是第一次从 active 变为 inactive ,如果是第一次,则会触发 ChannelInactive 事件在 Channel 对应的 pipeline 中传播。

在 Channel 关闭之前,还会将 Channel 对应的写入缓冲队列 ChannelOutboundBuffer 设置为 null ,表示 Channel 即将要关闭了,不允许业务线程在继续发送数据。

一文搞懂 Netty 发送数据全流程 一文中我们提到过,如果 Channel 准备关闭的时候,用户还在向 Channel 写入数据,则直接释放 bytebuffer ,并立马 fail 掉相关 ChannelPromise 并抛出 newClosedChannelException 异常。

        @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();
            //获取当前channel对应的待写入数据缓冲队列(支持用户异步写入的核心关键)
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            // outboundBuffer == null说明channel准备关闭了,直接标记发送失败。
            if (outboundBuffer == null) {
                try {
                    ReferenceCountUtil.release(msg);
                } finally {
                    safeSetFailure(promise,
                            newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
                }
                return;
            }

            .............省略.........
         }

如果此时用户还在执行 Channel 的 flush 操作发送数据,那么发送流程直接会 return 掉,停止发送数据。

        @Override
        public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            //channel以关闭
            if (outboundBuffer == null) {
                return;
            }

            .........省略........
       }

2.1.3 针对 SO_LINGER 选项的处理

        @Override
        protected Executor prepareToClose() {
            try {
                if (javaChannel().isOpen() && config().getSoLinger() > 0) {
                    //在设置SO_LINGER后,channel会延时关闭,在延时期间我们仍然可以进行读写,这样会导致io线程eventloop不断的循环浪费cpu资源
                    //所以需要在延时关闭期间 将channel注册的事件全部取消。
                    doDeregister();

                    /**
                     * 设置了SO_LINGER,不管是阻塞socket还是非阻塞socket,在关闭的时候都会发生阻塞,所以这里不能使用Reactor线程来
                     * 执行关闭任务,否则Reactor线程就会被阻塞。
                     * */
                    return GlobalEventExecutor.INSTANCE;
                }
            } catch (Throwable ignore) {
            }
            //在没有设置SO_LINGER的情况下,可以使用Reactor线程来执行关闭任务
            return null;
        }
    }

要理解这段逻辑,首先我们需要理解 SO_LINGER 这个 Socket 选项,他会影响 Socket 的关闭行为。

在默认情况下,当我们调用 Socket 的 close 方法后 ,close 方法会立即返回,剩下的事情会交给内核协议栈帮助我们处理,如果此时 Socket 对应的发送缓冲区还有数据待发送,接下来内核协议栈会将 Socket 发送缓冲区的数据发送出去,随后会向对端发送 FIN 包关闭连接。注意:此时应用程序是无法感知到这些数据是否已经发送到对端的,因为应用程序在调用 close 方法后就立马返回了,剩下的这些都是内核在替我们完成。接着主动关闭方就进入了 TCP 四次挥手的关闭流程最后进入TIME_WAIT状态。

TCP连接关闭.png

而 SO_LINGER 选项会控制调用 close 方法关闭 Socket 的行为。

  struct linger {
      int l_onoff;   // linger active
      int l_linger;  // how many seconds to linger for
  };
  • l_onoff:表示是否开启 SO_LINGER 选项。0 表示关闭。默认情况下是关闭的。

  • int l_linger:如果开启了 SO_LINGER 选项,则该参数表示应用程序调用 close 方法后需要阻塞等待多长时间。单位为秒。

这两个参数的不同组合会影响到 Socket 的关闭行为:

  • l_onoff = 0 时 l_linger 的值会被忽略,属于我们上边讲述的默认关闭行为。

  • l_onoff = 1,l_linger > 0:这种情况下,应用程序调用 close 方法后就不会立马返回,无论 Socket 是阻塞模式还是非阻塞模式,应用程序都会阻塞在这里。直到以下两个条件其中之一发生,才会解除阻塞返回。随后进行正常的四次挥手关闭流程。

    • 当 Socket 发送缓冲区的数据全部发送出去,并等到对端 ACK 后,close 方法返回。
    • 应用程序在 close 方法上的阻塞时间到达 l_linger 设置的值后,close 方法返回。
so_linger关闭.png
  • l_onoff = 1,l_linger = 0:这种情况下,当应用程序调用 close 方法后会立即返回,随后内核直接清空 Socket 的发送缓冲区,并向对端发送 RST 包,主动关闭方直接跳过四次挥手进入 CLOSE 状态,注意这种情况下是不会有 TIME_WAIT 状态的。
RST关闭连接.png

Netty 也提供了 SO_LINGER 选项的设置,由于一般关闭连接的行为都是由客户端发起,我们以 Netty 客户端代码为例说明:

public final class EchoClient {

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.SO_LINGER, 2)
              ..........省略........
        }
}
public class DefaultSocketChannelConfig extends DefaultChannelConfig
                                        implements SocketChannelConfig {

    @Override
    public SocketChannelConfig setSoLinger(int soLinger) {
        try {
            if (soLinger < 0) {
                javaSocket.setSoLinger(false, 0);
            } else {
                javaSocket.setSoLinger(true, soLinger);
            }
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
        return this;
    }

}

默认情况下 SO_LINGER 选项是关闭的,在 JDK 底层设置 SO_LINGER 选项的方法 setSoLinger 中,参数 on 对应 l_onoff ,参数 linger 对应 l_linger ,单位为秒。

public void setSoLinger(boolean on, int linger) throws SocketException 

当我们理解了 SO_LINGER 选项的工作原理及其应用之后,现在回过头来在看 prepareToClose 方法的逻辑就很容易理解了。

        @Override
        protected Executor prepareToClose() {
            try {
                if (javaChannel().isOpen() && config().getSoLinger() > 0) {
                    //在设置SO_LINGER后,channel会延时关闭,在延时期间我们仍然可以进行读写,这样会导致io线程eventloop不断的循环浪费cpu资源
                    //所以需要在延时关闭期间 将channel注册的事件全部取消。
                    doDeregister();

                    /**
                     * 设置了SO_LINGER,不管是阻塞socket还是非阻塞socket,在关闭的时候都会发生阻塞,所以这里不能使用Reactor线程来
                     * 执行关闭任务,否则Reactor线程就会被阻塞。
                     * */
                    return GlobalEventExecutor.INSTANCE;
                }
            } catch (Throwable ignore) {
            }
            //在没有设置SO_LINGER的情况下,可以使用Reactor线程来执行关闭任务
            return null;
        }

首先我们来关注下 prepareToClose 方法的返回值,它会返回一个 Executor ,这个 Executor 用于执行真正的 Channel 关闭任务。

大家这里可能会有疑问,Channel 上的 IO 操作之前不都是由 Reactor 线程负责执行吗?为什么这里需要用一个单独的 Executor 来执行呢?

原因就是如果我们设置了 SO_LINGER 选项 config().getSoLinger() > 0 ,如果继续采用 Reactor 线程执行 Channel 关闭的动作,那么在这种情况下底层Socket 的 close 方法会阻塞 Reactor 线程,直到 Socket 发送缓冲区中的数据全部发送出去并收到对端 ACK ,或者 linger 指定的超时时间到达。

由于 Reactor 线程负责多个 Channel 上的 IO 处理,如果被阻塞在这里,就会影响其他 Channel 上的 IO 处理,降低吞吐。所以当我们设置了 SO_LINGER 选项时,就不能使用 Reactor 线程来执行 Channel 关闭的动作,而是用GlobalEventExecutor.INSTANCE来负责执行 Channel 的关闭动作。

如果我们没有设置 SO_LINGER 选项,底层 Socket 的 close 方法会立即返回并不会阻塞,所以这种情况下,依然会使用 Reactor 线程来执行 Channel 的关闭动作。

prepareToClose 方法这种情况下会返回 null ,表示默认采用 Reactor 线程来执行 Channel 的关闭。

这里还有一个重要的点需要和大家强调的是,当我们设置了 SO_LINGER 选项之后,Channel 的关闭动作会被阻塞并延时关闭,在延时关闭期间,Reactor 线程依然可以响应 OP_READ 事件和 OP_WRITE 事件,这可能会导致 Reactor 线程不断的自旋循环浪费 CPU 资源,所以基于这个原因,netty 这里需要将 Channel 从 Reactor 上注销掉。这样 Reactor 线程就不会在响应 Channel 上的 IO 事件了。

2.1.4 doDeregister 注销 Channel

public abstract class AbstractNioChannel extends AbstractChannel {

   //channel注册到Selector后获得的SelectKey
    volatile SelectionKey selectionKey;

    @Override
    protected void doDeregister() throws Exception {
        eventLoop().cancel(selectionKey());
    }

    protected SelectionKey selectionKey() {
        assert selectionKey != null;
        return selectionKey;
    }
}
public final class NioEventLoop extends SingleThreadEventLoop {
    //记录socketChannel从Selector上注销的个数 达到256个 则需要将无效selectKey从SelectedKeys集合中清除掉
    private int cancelledKeys;

    private static final int CLEANUP_INTERVAL = 256;
    /**
     * 将socketChannel从selector中注销 取消监听IO事件
     * */
    void cancel(SelectionKey key) {
        key.cancel();
        cancelledKeys ++;
        // 当从selector中注销的socketChannel数量达到256个,设置needsToSelectAgain为true
        // 在io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain 中重新做一次轮询,将失效的selectKey移除,
        // 以保证selectKeySet的有效性
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }

}

Channel 在向 Reactor 中的 Selector 注册成功后,会得到一个 SelectionKey 。这个 SelectionKey 可以理解成 Channel 在 Selector 中的模型。

当 Channel 需要将自己从 Selector 中注销掉时,直接可以通过调用对应的 SelectionKey#cancel 方法。此时调用 SelectionKey#isValid 将会返回 false 。

SelectionKey#cancel 方法调用后,Selector 会将要取消的这个 SelectionKey 加入到 Selector 中的 cancelledKeys 集合中。

public abstract class AbstractSelector extends Selector {

    private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();

    void cancel(SelectionKey k) {                      
        synchronized (cancelledKeys) {
            cancelledKeys.add(k);
        }
    }
}

随后在 Selector 的下一次轮询过程中,会将 cancelledKeys 集合中的 SelectionKey 从 Selector 中所有的 KeySet 中移除。这里的 KeySet 包括Selector用于存放 IO 就绪 SelectionKey 的 selectedKeys 集合,以及用于存放所有在 Selector 上注册的 Channel 对应 SelectionKey 的 keys 集合。

public abstract class SelectorImpl extends AbstractSelector {

    protected Set<SelectionKey> selectedKeys = new HashSet();
    protected HashSet<SelectionKey> keys = new HashSet();
    
     .....................省略...............
}

这里需要注意的是当我们调用 SelectionKey#cancel 方法后,该 SelectionKey 并不会立马从 Selector 中删除,只不过此时调用 SelectionKey#isValid 方法会返回 false 。需要等到下次轮询 selector.selectNow() 的时候,被取消掉的 SelectionKey 才会从 Selector 中被删除掉。

当在本次轮询期间,假如有大量的 Channel 从 Selector 中注销,就绪集合 selectedKeys 中依然会保存这些 Channel 对应 SelectionKey 直到下次轮询。那么当然会影响本次轮询结果 selectedKeys 的有效性,增加了许多不必要的遍历开销。

所以 netty 在 NioEventLoop#cancel 方法中做了一个优化来保证 Selector 中的 IO 就绪集合 selectedKeys 的有效性,当 Selector 中注销的 Channel 数量 cancelledKeys 超过 CLEANUP_INTERVAL = 256 个时,就会将 needsToSelectAgain 标志设置为 true 。

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {

            ......循环处理Selector中的IO就绪集合selectedKeys.....

            if (needsToSelectAgain) {
                selectedKeys.reset(i + 1);
                selectAgain();
                i = -1;
            }
        }
    }

当 Reactor 线程在循环遍历处理 Selector 中的 IO 活跃 Channel 时,如果
needsToSelectAgain = true ,那么就会立马执行一次 selector.selectNow() ,目的就是为了清除 Selector 中已经注销的 Selectionkey ,从而保证IO就绪集合 selectedKeys 的有效性。

    private void selectAgain() {
        needsToSelectAgain = false;
        try {
            selector.selectNow();
        } catch (Throwable t) {
            logger.warn("Failed to update SelectionKeys.", t);
        }
    }

2.1.5 Channel 的关闭

image.png

prepareToClose 方法返回的 closeExecutor 是用来执行 Channel 关闭操作的,当我们开启了 SO_LINGER 选项时,closeExecutor = GlobalEventExecutor.INSTANCE ,避免了 Reactor 线程的阻塞。

由 GlobalEventExecutor 负责执行 doClose0 方法关闭 Channel 底层的 Socket,并通知 closeFuture 关闭结果。

        private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify) {
            
            ...........省略重进入关闭流程处理........

            ...........省略Channel关闭前的准备工作........

            Executor closeExecutor = prepareToClose();
            if (closeExecutor != null) {
                closeExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 在GlobalEventExecutor中执行channel的关闭任务,设置closeFuture,promise success
                            doClose0(promise);
                        } finally {
                            // reactor线程中执行
                            invokeLater(new Runnable() {
                                @Override
                                public void run() {
                                    if (outboundBuffer != null) {
                                        // cause = closeCause = ClosedChannelException, notify = false
                                        // 此时channel已经关闭,需要清理对应channelOutboundBuffer中的待发送数据flushedEntry
                                        outboundBuffer.failFlushed(cause, notify);
                                        //循环清理channelOutboundBuffer中的unflushedEntry
                                        outboundBuffer.close(closeCause);
                                    }
                                    //这里的active = true
                                    //关闭channel后,会将channel从reactor中注销,首先触发ChannelInactive事件,然后触发ChannelUnregistered
                                    fireChannelInactiveAndDeregister(wasActive);
                                }
                            });
                        }
                    }
                });
            } else {
                 ...........省略在Reactor中Channel关闭的逻辑........
            }
        }

当 Channel 的关闭操作在 closeExecutor 线程中执行完毕之后,此时 Channel 从物理上就已经关闭了,但是 Channel 中还有一些遗留的东西需要清理,比如 Channel 对应的写入缓冲队列 ChannelOutboundBuffer 中的待发送数据需要被清理掉,并通知用户线程由于 Channel 已经关闭,导致数据发送失败。

同时 Netty 也需要让用户感知到 Channel 已经关闭的事件,所以还需要在关闭 Channel 对应的 pipeline 中触发 ChannelInactive 事件和 ChannelUnregistered 事件。

而以上列举的这两点清理 Channel 的相关工作则需要在 Reactor 线程中完成,不能在 closeExecutor 线程中完成。这是处于线程安全的考虑,因为在 Channel 关闭之前,对于 ChannelOutboundBuffer 以及 pipeline 的操作均是由 Reactor 线程来执行的,Channel 关闭之后相关的清理工作理应继续由 Reactor 线程负责,避免多线程执行产生线程安全问题。

2.1.5.1 doClose0 关闭 Channel

        // 关闭channel操作的指定future,来判断关闭流程进度 每个channel一个
        private final CloseFuture closeFuture = new CloseFuture(this);

        private void doClose0(ChannelPromise promise) {
            try {
                // 关闭channel,此时服务端向客户端发送fin2,服务端进入last_ack状态,客户端收到fin2进入time_wait状态
                doClose();
                // 设置clostFuture的状态为success,表示channel已经关闭
                // 调用shutdownOutput则不会通知closeFuture
                closeFuture.setClosed();
                // 通知用户promise success,关闭操作已经完成
                safeSetSuccess(promise);
            } catch (Throwable t) {
                closeFuture.setClosed();
                // 通知用户线程关闭失败
                safeSetFailure(promise, t);
            }
        }

首先调用 doClose() 方法关闭底层 JDK 中的 SocketChannel 。

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

    @Override
    protected void doClose() throws Exception {
        super.doClose();
        javaChannel().close();
    }

}

这里大家需要注意的一个点是,在 JDK 底层 SocketChannel 的关闭方法中,同样也会将该 Channel 关联的所有 SelectionKey 取消掉。因为在 prepareToClose 方法中我们提到,只有我们设置了 SO_LINGER 选项时,才会在 prepareToClose 方法中调用 doDeregister 方法将 Channel 关联的 SelectionKey 从 Selector 中取消掉。

而当我们没有设置 SO_LINGER 选项时,则不会提前调用 doDeregister 方法取消。所以需要在这里真正关闭 Channel 的地方,将其关联的所有 SelectionKey 取消掉。

    public final void close() throws IOException {
        synchronized (closeLock) {
            if (!open)
                return;
            open = false;
            implCloseChannel();
        }
    }

    protected final void implCloseChannel() throws IOException {
        implCloseSelectableChannel();
        synchronized (keyLock) {
            int count = (keys == null) ? 0 : keys.length;
            //关闭与该Channel相关的所有SelectionKey
            for (int i = 0; i < count; i++) {
                SelectionKey k = keys[i];
                if (k != null)
                    k.cancel();
            }
        }
    }

当我们调用了 doClose() 方法后,此时服务端的内核协议栈就会向客户端发出 FIN 包,服务端结束 CLOSE_WAIT 状态进入 LAST_ACK 状态。客户端收到服务端的 FIN 包后,向服务端回复 ACK 包,随后客户端进入 TIME_WAIT 状态。服务端收到客户端的 ACK 包后结束 LAST_ACK 状态进入 CLOSE 状态。

TCP连接关闭.png

当调用 doClose() 完成 Channel 的关闭后,就会调用 closeFuture.setClosed() 通知 Channel 的 closeFuture 关闭成功。

static final class CloseFuture extends DefaultChannelPromise {

        boolean setClosed() {
            return super.trySuccess();
        }

}

随后调用 safeSetSuccess(promise) 通知用户的 promise 关闭成功。

image.png

2.1.5.2 清理 ChannelOutboundBuffer

这里大家需要注意:清空 ChannelOutboundBuffer 的操作是在 Reactor 线程中执行的。

ChannelOutboundBuffer结构.png
       if (outboundBuffer != null) {
                // Fail all the queued messages
                // cause = closeCause = ClosedChannelException, notify = false
                // 此时channel已经关闭,需要清理对应channelOutboundBuffer中的待发送数据flushedEntry
                outboundBuffer.failFlushed(cause, notify);
                //循环清理channelOutboundBuffer中的unflushedEntry
                outboundBuffer.close(closeCause);
       }

当 Channel 关闭之后,此时 Channel 中的写入缓冲队列 ChannelOutboundBuffer 中可能会有一些待发送数据,这时就需要将这些待发送数据从 ChannelOutboundBuffer 中清除掉。

通过调用 ChannelOutboundBuffer#failFlushed 方法,循环遍历 flushedEntry 指针到 tailEntry 指针之间的 Entry 对象,将其从 ChannelOutboundBuffer 链表中删除,并释放 Entry 对象中封装的 byteBuffer ,通知用户的 promise 写入失败。并回收 Entry 对象实例。

public final class ChannelOutboundBuffer {

    void failFlushed(Throwable cause, boolean notify) {
        if (inFail) {
            return;
        }

        try {
            inFail = true;
            for (;;) {
                // 循环清除channelOutboundBuffer中的待发送数据
                // 将entry从buffer中删除,并释放entry中的bytebuffer,通知promise failed
                if (!remove0(cause, notify)) {
                    break;
                }
            }
        } finally {
            inFail = false;
        }
    }

    private boolean remove0(Throwable cause, boolean notifyWritability) {
        Entry e = flushedEntry;
        if (e == null) {
            //清空当前reactor线程缓存的所有待发送数据
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;

        ChannelPromise promise = e.promise;
        int size = e.pendingSize;
        //从channelOutboundBuffer中删除该Entry节点
        removeEntry(e);

        if (!e.cancelled) {
            // only release message, fail and decrement if it was not canceled before.
            //释放msg所占用的内存空间
            ReferenceCountUtil.safeRelease(msg);
            //编辑promise发送失败,并通知相应的Lisener
            safeFail(promise, cause);
            //由于msg得到释放,所以需要降低channelOutboundBuffer中的内存占用水位线,并根据notifyWritability决定是否触发ChannelWritabilityChanged事件
            decrementPendingOutboundBytes(size, false, notifyWritability);
        }

        // recycle the entry
        //回收Entry实例对象
        e.recycle();

        return true;
    }
}

在 remove0 方法中 netty 会将已经关闭的 Channel 对应的 ChannelOutboundBuffer 中还没来得及 flush 进 Socket 发送缓存区中的数据全部清除掉。这部分数据就是上图中 flushedEntry 指针到 tailEntry 指针之间的 Entry对象。

Entry 对象中封装了用户待发送数据的 ByteBuffer,以及用于通知用户发送结果的 promise 实例。

这里需要将这些还未来得及 flush 的 Entry 节点从 ChannelOutboundBuffer 中全部清除,并释放这些 Entry 节点中包裹的发送数据 msg 所占用的内存空间。并标记对应的 promise 为失败同时通知对应的用户 listener 。

以上的清理逻辑主要是应对在 Channel 即将关闭之前,用户极限调用 flush 操作想要发送数据的情况。

另外还有一种情况 Netty 这里需要考虑处理,由于在关闭 Channel 之前,用户可能还会向 ChannelOutboundBuffer 中 write 数据,但还未来得及调用 flush 操作,这就导致了 ChannelOutboundBuffer 中在 unflushedEntry 指针与 tailEntry 指针之间还可能会有数据。

之前我们清理的是 flushedEntry 指针与 tailEntry 指针之间的数据,这里大家需要注意区分。

所以还需要调用 ChannelOutboundBuffer#close 方法将这一部分数据全部清理掉。

public final class ChannelOutboundBuffer {

  void close(final Throwable cause, final boolean allowChannelOpen) {
        if (inFail) {
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    close(cause, allowChannelOpen);
                }
            });
            return;
        }

        inFail = true;

        if (!allowChannelOpen && channel.isOpen()) {
            throw new IllegalStateException("close() must be invoked after the channel is closed.");
        }

        if (!isEmpty()) {
            throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
        }

        // Release all unflushed messages.
        //循环清理channelOutboundBuffer中的unflushedEntry,因为在执行关闭之前有可能用户有一些数据write进来,需要清理掉
        try {
            Entry e = unflushedEntry;
            while (e != null) {
                // Just decrease; do not trigger any events via decrementPendingOutboundBytes()
                int size = e.pendingSize;
                TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);

                if (!e.cancelled) {
                    //释放unflushedEntry中的bytebuffer
                    ReferenceCountUtil.safeRelease(e.msg);
                    //通知unflushedEntry中的promise failed
                    safeFail(e.promise, cause);
                }
                e = e.recycleAndGetNext();
            }
        } finally {
            inFail = false;
        }
        //清理channel用于缓存JDK nioBuffer的 threadLocal缓存NIO_BUFFERS
        clearNioBuffers();
    }

}

当我们清理完 ChannelOutboundBuffer 中的残留数据之后,ChannelOutboundBuffer 中的内存水位线就会下降,由于当前是关闭操作,所以这里的 notifyWritability = false ,不需要触发 ChannelWritabilityChanged 事件。

关于对 ChannelOutboundBuffer 的详细操作,笔者已经在 一文搞懂 Netty 发送数据全流程 一文中详细介绍过了,忘记的同学可以在回顾下这篇文章。

2.1.5.3 触发 ChannelInactive 事件和 ChannelUnregistered 事件

在 Channel 关闭之后并清理完 ChannelOutboundBuffer 中遗留的待发送数据,就该在 Channel 的 pipeline 中触发 ChannelInactive 事件和 ChannelUnregistered 事件了。同样以下的这些操作也都是在 Reactor 线程中执行的。

       private void fireChannelInactiveAndDeregister(final boolean wasActive) {
            //wasActive && !isActive() 条件表示 channel的状态第一次从active变为 inactive
            //这里的wasActive = true  isActive()= false
            deregister(voidPromise(), wasActive && !isActive());
        }

这里传递进来的参数 wasActive = true ,在我们关闭 Channel 之前会通过 isActive() 先获取一次,在该方法中通过 wasActive && !isActive() 判断 Channel 是否是第一次从 active 状态变为 inactive 状态。如果是,则触发后续的 ChannelInactive 事件。

        private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
            if (!promise.setUncancellable()) {
                return;
            }

            if (!registered) {
                safeSetSuccess(promise);
                return;
            }

            invokeLater(new Runnable() {
                @Override
                public void run() {
                    try {
                        //将channel从reactor中注销,reactor不在监听channel上的事件
                        doDeregister();
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception occurred while deregistering a channel.", t);
                    } finally {
                        if (fireChannelInactive) {
                            //当channel被关闭后,触发ChannelInactive事件
                            pipeline.fireChannelInactive();
                        }

                        if (registered) {
                            //如果channel没有注册,则不需要触发ChannelUnregistered
                            registered = false;
                            //随后触发ChannelUnregistered
                            pipeline.fireChannelUnregistered();
                        }
                        //通知deRegisterPromise
                        safeSetSuccess(promise);
                    }
                }
            });
        }

注意这里又会调用 doDeregister() 方法将 Channel 从 Reactor 上注销,到目前为止,我们已经看到有三个地方执行注销 Channel 的操作了。

  • 第一次是在 prepareToClose() 方法中,当我们设置了 SO_LINGER 选项后,为了防止 Reactor 线程在延时关闭期间,还在不停的自旋循环响应 OP_READ 事件和 OP_WRITE 事件从而造成浪费 CPU 资源,我们需要 doDeregister() 方法将 Channel 从 Reactor 上取消。

  • 第二次是在真正的关闭 Channel 的时候,JDK 底层在关闭 SocketChannel 的时候又会将 Channel 从 Selector 上取消。应对关闭 SO_LINGER 选项的情况

  • 第三次就是在本小节中,触发 ChannelUnregistered 事件之前,又会调用 doDeregister() 方法将 Channel 从 Reactor 上取消。

这里大家可能会有疑问,这第三次注销操作是应对哪种情况呢?

首先 JDK NIO 底层在将 Channel 从 Selector 上注销的时候做了防重处理,多次调用注销操作是没有影响的。

另外这个方法可能会在用户的 ChannelHandler 中被调用,因为用户的行为我们无法预知,用户可能在 Channel 关闭前调用,所以这里还是需要调用一次 doDeregister() 方法。为的就是应对用户在 ChannelHandler 中主动注销 Channel 同时不希望 Channel 关闭的场景。

        // 仅仅是注销 Channel,但是 Channel 不会关闭
        ctx.deregister();
        ctx.channel().deregister();

在调用完 doDeregister() 方法之后,netty 紧接着就会在 Channel 的 pipeline 中触发 ChannelInactive 事件以及 ChannelUnregistered 事件,并且这两个事件只会被触发一次。

在接收连接的时候,当 Channel 向 Reactor 注册成功之后,是先触发 ChannelRegistered 事件后触发 ChannelActive 事件。

在关闭连接的时候,当 Channel 从 Reactor 中取消注册之后,是先触发 ChannelInactive 事件后触发 ChannelUnregistered 事件

这里大家还需要注意的一个点是,以上的逻辑会封装在 Runnable 中被提交到 Reactor 的任务队列中延迟执行那么这里为什么要延迟执行呢

这里延后 deRegister 操作的原因是用于处理一种极端的异常情况,前边我们提到 Channel 的 deregister() 操作是可以在用户的 ChannelHandler 中执行的,用户行为是不可预知的。

我们想象一下这样的一个场景:假如当前 pipeline 中还有事件传播(比如正在处理编码解码),与此同时 deregister() 方法可能会在某个事件回调中被用户调用,导致其它事件在传播的过程中,Channel 被从 Reactor 上注销掉了。

并且同时 channel 又注册到新的 Reactor 上。如果此时旧的 Reactor 正在处理 pipeline 上的事件而旧 Reactor 还未处理完的数据理应继续在旧的 Reactor 中处理,如果此时我们立马执行 deRegister ,未处理完的数据就会在新的 Reactor 上处理,这样就会导致一个 handler 被多个 Reactor 线程处理导致线程安全问题。所以需要延后 deRegister 的操作。


到这里呢,关于 netty 如何处理 TCP 连接正常关闭的逻辑,笔者就为大家全部介绍完了,不过还留了一个小小的尾巴,就是当我们未设置 SO_LINGER 选项时,Channel 的关闭操作会直接在 Reactor 线程中执行。closeExecutor 这种情况下会是 null 。

        private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify) {
            
            ...........省略重进入关闭流程处理........

            ...........省略Channel关闭前的准备工作........

            Executor closeExecutor = prepareToClose();
            if (closeExecutor != null) {
                ...........省略在closeExecutor中Channel关闭的逻辑........
            } else {
                try {
                    // Close the channel and fail the queued messages in all cases.
                    doClose0(promise);
                } finally {
                    if (outboundBuffer != null) {
                        // Fail all the queued messages.
                        outboundBuffer.failFlushed(cause, notify);
                        outboundBuffer.close(closeCause);
                    }
                }

                // 此时 Channel 已经关闭,如果此时用户还在执行 flush 操作
                // netty 则会在 flush 方法的处理中处理 Channel 关闭的情况
                // 所以这里 deRegister 操作需要延后到 flush 方法处理完之后
                if (inFlush0) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            fireChannelInactiveAndDeregister(wasActive);
                        }
                    });
                } else {
                    fireChannelInactiveAndDeregister(wasActive);
                }                 
            }
        }

这里可以看到其实逻辑都是一样的。都是先调用 doClose0 关闭 JDK NIO 底层的 SocketChannel ,然后清理 ChannelOutboundBuffer 中遗留的待发送数据,最后触发 ChannelInactive 事件和 ChannelUnregistered 事件。

image.png

3. TCP 连接的异常关闭

image.png

在本文前边的内容中,我们介绍了 TCP 数据包中的 SYN 包,FIN 包,ACK 包的使用场景,它们都是通过 TCP 首部协议中的 8 位控制位来标识,不同的控制位代表不同的含义。

第二小节介绍的内容均属于 TCP 在正常情况下进行的连接的建立,发送数据,关闭连接。

而现实中情况往往是复杂的,TCP 连接不可能总是处于正常的状态,那么当 TCP 连接出现异常时,就需要有一种机制让我们来强制关闭连接,这个就是本小节要介绍的 RST 包用于异常情况下强制关闭 TCP 连接。

由于 RST 包是用来处理 TCP 连接的异常情况的,所以当本端发送一个 RST 包给对端之后,并不需要对端回复 ACK 确认包。

通讯方不管是发出或者是收到一个 RST 包 ,都会导致内存,端口等连接资源被释放,并且跳过正常的 TCP 四次挥手关闭流程直接强制关闭,Socket 缓冲区的数据来不及处理直接被丢弃。

当通讯端收到一个 RST 包后,如果仍然对 Socket 进行读取,那么就会抛出 connection has been reset by the peer 异常,如果仍然对 Socket 进行写入,就会抛出 broken pipe 异常。应用程序通过这样的方式来感知内核是否收到 RST 包。

发送 RST 强制关闭连接,这将导致之前已经发送但尚未送达的、或是已经进入对端 Socket 接收缓冲区但还未被对端应用程序处理的数据被无条件丢弃,导致对端应用程序可能会出现异常

说了这么多,那么究竟会有哪些场景导致需要发送 RST 来强制关闭连接呢?下面笔者就来为大家一一梳理下:

3.1 TCP 连接队列已满

TCP连接过程.png

我们先根据上面这副图来看一下一个正常的 TCP 连接建立的过程:

  1. 客户端向服务端发送 SYN 包请求建立 TCP 连接。客户端连接状态变为 SYN_SENT 状态。

  2. 服务端收到 SYN 包之后,服务端连接状态变为 SYN_RECV 状态。随后会创建轻量级 request_sock 结构来表示连接信息(里面能唯一确定某个客户端发来的 SYN 的信息),并将这个 request_sock 结构放入 TCP 的半连接队列 SYN_Queue 中,TCP 内核协议栈发送 SYN+ACK 包给客户端。

  3. 客户端的 TCP 内核协议栈收到服务端发送过来的 SYN+ACK 后,随即回复
    ACK 包给服务端。此时客户端连接状态变为 ESTANLISHED 状态。

  4. 服务端收到客户端的 ACK 包之后,从半连接队列中查找是否有代表该客户端连接的轻量级 request_sock 结构,如果有,连接状态变为 ESTABLISHED 状态,随后会从半连接队列 SYN-Queue 中将 request_socket 结构取出移动到全连接队列 ACCEPT-Queue 中。

  5. 用户进程的 accpet 系统调用根据监听 Socket 克隆出一个真正的连接 Socket 然后返回。

从 TCP 建立连接的过程我们看到,这里涉及到两个重要的队列,一个存放客户端 SYN 信息的半连接队列 SYN-Queue ,另一个是存放完成三次握手的客户端连接信息的全连接队列 ACCEPT-Queue 。

那么只要是队列它就会有长度的限制,就可能会满。那么在这两个连接队列已满的状况下,又会发生什么情况呢?

3.1.1 半连接队列 SYN-Queue 已满

SYN FLOOD.png

假设现在有大量的客户端在向服务端发送 SYN 包请求建立连接,但是这些客户端比较坏,在收到服务端的 SYN+ACK 包之后就是不回复 ACK 包给服务端,而服务端一直收不到客户端的 ACK 包,所以就会重传 SYN+ACK 包给客户端,重传次数由内核参数 tcp_synack_retries 限制,默认为 5 次。

$ cat /proc/sys/net/ipv4/tcp_synack_retries
5

这 5 次的重传时间间隔为 1s , 2s , 4s , 8s , 16s ,总共 31s ,而第 5 次重传的 SYN+ACK 包发出后还要等 32s 才能知道第 5 次也超时了,所以,总共需要 1s + 2s + 4s+ 8s+ 16s + 32s = 63s ,TCP 才会把断开这个连接,并从半连接队列中移除对应的 request_sock 。

我们可以看到 TCP 内核协议栈需要等待 63s 的时间才能断开这个半连接,假设这 63s 内又有大量的客户端这样子搞事情,那么很快服务端的半连接队列 SYN-Queue 堆积的 request_sock 就会越来越多最终溢出。

当半连接队列溢出之后,再有正常的客户端连接进来之后,内核协议栈默认情况下就会直接丢弃 SYN 包,导致服务端无法处理正常客户端的请求,这就叫做 SYN Flood 攻击。

有一个内核参数 net.ipv4.tcp_syncookies 可以影响内核处理半连接队列溢出时的行为:

  • net.ipv4.tcp_syncookies = 0 : 服务端直接丢弃客户端发来的 SYN 包。

  • net.ipv4.tcp_syncookies = 1 :如果此时全连接队列 ACEPT-Queue 也满了,并且 qlen_young 的值大于 1 ,那么直接丢弃 SYN 包,否则就生成 syncookie(一个特别的 sequence number )然后作为 SYN + ACK 包中的序列号返回给客户端。并输出 "possible SYN flooding on port . Sending cookies."。

qlen_young 表示目前半连接队列中,没有进行 SYN+ACK 包重传的连接数量。

随后客户端会在 ACK 包中将这个 syncookie 带上回复给服务端,服务端校验 syncookie ,并根据 syncookie 的信息构造 request_sock 结构放入全连接队列中。

从以上过程我们可以看出在开启 tcp_syncookies 的情况下,服务端利用 syncookie 可以绕过半连接队列从而完成建立连接的过程。我们可以利用这种方式来防御 SYN Flood 攻击。

但是 tcp_syncookies 不适合用在服务端负载很高的场景,因为在启用 tcp_syncookies 的时候,服务端在发送 SYN+ACK 包之前,会要求客户端在短时间内回复一个序号,这个序号包含客户端之前发送 SYN 包内的信息,比如 IP 和端口。

如果客户端回复的这个序号是正确的,那么服务端就认为这个客户端是正常的,随后就会发送携带 syncookie 的 SYN+ACK 包给客户端。如果客户端不回复这个序号或者序号不正确,那么服务端就认为这个客户端是不正常的,直接丢弃连接不理会。

从这个过程中,我们可以看出当启用 tcp_syncookies 的时候,这个建立连接的过程并不是一个正常的 TCP 三次握手的过程,因为服务端在发送 SYN+ACK 包之前还需要等待客户端回复一个序号,这就产生了一定的延迟,所以 tcp_syncookies 不适合用在服务端负载很高的场景,但是一般的负载情况还是比较有效防御 SYN Flood 攻击的方式。

除此之外,我们还可以调整以下内核参数来防御 SYN Flood 攻击

  • 增大半连接队列容量 tcp_max_syn_backlog 。设置比默认 256 更大的一个数值。

  • 减少 SYN+ACK 重试次数 tcp_synack_retries 。

3.1.2 全连接队列 ACCEPT-Queue 已满

当服务端的负载比较大并且从全连接队列中 accept 连接处理的比较慢,同时又有大量新的客户端连接上来的时候,就会导致 TCP 全连接队列溢出。

内核参数 net.ipv4.tcp_abort_on_overflow 会影响内核协议栈处理全连接队列溢出的行为。

当客户端发来三次握手最后一个 ACK 包时,但此时服务端全连接队列已满:

  • 当 tcp_abort_on_overflow = 0 时,服务端内核协议栈会将该连接标记为 acked 状态,但仍保留在 SYN-Queue 中,并开启 SYN+ACK 重传机制。当 SYN+ACK 包的重传次数超过 net.ipv4.tcp_synack_retries 设置的值时,再将该连接从 SYN queue 中删除。但是此时在客户端的视角来说,连接已经建立成功了。客户端并不知道此时 ACK 包已经被服务端所忽略,如果此时向服务端发送数据的话,服务端会回复 RST 给客户端。
全连接队列满0.png
  • 当 tcp_abort_on_overflow = 1 时, 服务端TCP 协议栈直接回复 RST 包,并直接从 SYN-Queue 中删除该连接信息。
全连接队列已满1.png

面对全连接队列溢出的情况,我们需要及时增大全连接队列的长度,而全连接队列的长度由两个参数控制:

  • 内核参数 net.core.somaxconn,默认 128 。

  • listen 系统调用方法参数 backlog 。

int listen(int sockfd, int backlog)

在 Netty 中我们可以通过如下配置指定:

    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .option(ChannelOption.SO_BACKLOG, 全连接队列长度)

全连接队列 ACCEPT-Queue 的长度由 min(backlog, somaxconn) 决定,所以当全连接队列满时,我们需要检查如下设置:

  • 调整内核参数 net.core.somaxconn。
  • 检查应用程序中的 backlog 参数。
  • 设置 tcp_abort_on_overflow = 1 。

3.2 连接未被监听的端口

连接未LISTEN端口RST.png

当客户端 Connect 一个未被监听的远端服务端口,则会收到对端发来的一个 RST 包。

客户端要连接的端口未被监听,有两种情况:

  • 该端口在服务端从来没有应用程序监听过。

  • 服务端监听该端口的应用程序崩溃挂掉了。

3.3 服务端程序崩溃

服务端程序崩溃RST.png

TCP 连接正常的状态下,无论是连接时发送的 SYN ,还是连接建立成功后发送的正常数据包,以及最后关闭连接时发送的 FIN ,都会收到对端的 ACK 确认。

当服务端因为某种原因导致崩溃之后,客户端再次向服务端发送数据,就会收到 RST 。

3.4 开启 SO_LINGER 选项设置 l_linger = 0

RST关闭连接.png

在前边《2.1.3 针对 SO_LINGER 选项的处理》小节我们介绍 SO_LINGER 选项的时候提到过,当我们将选项参数设置为 l_onoff = 1,l_linger = 0 时,当客户端调用 close 方法关闭连接的时候,这时内核协议栈会发出 RST 而不是 FIN 。跳过正常的四次挥手关闭流程直接强制关闭,Socket 缓冲区的数据来不及处理直接丢弃。

3.5 主动关闭方在关闭时 Socket 接收缓冲区还有未处理数据

客户端提前关闭Socket RST.png
  • 主动关闭方在调用 close() 系统调用关闭 Socket 时,内核会检查 Socket 接收缓冲区中是否还有数据未被读取处理,如果有,则直接清空 Socket 接收缓冲区中的未处理数据,并向对端发送 RST 。

  • 如果此时 Socket 接收缓冲区中没有未被处理的数据,内核才会走正常的关闭流程,尝试将 Socket 发送缓冲区中的数据发送出去,然后向对端发送 FIN ,走正常的四次挥手关闭流程。

3.6 主动关闭方 close 关闭但在 FIN_WAIT2 状态接收数据

close关闭但FIN_WAIT2接收数据 RST.png

TCP是一个面向连接的、可靠的、基于字节流的全双工传输层通信协议,既然它是全双工的,那就意味着TCP连接同时有一个读通道和写通道。

image.png

而调用 close() 来关闭连接,意味着会将读写通道同时关闭,之后不能再读取数据。

如果客户端调用 close() 方法关闭连接,而服务端在 CLOSE_WAIT 状态下继续向客户端发送数据,客户端在 FIN_WAIT2 状态下直接会丢弃数据,并发送 RST 给服务端,直接强制关闭连接,也是个暴脾气,哈哈。

4. Netty 对 RST 包的处理

同 TCP 正常关闭收到 FIN 包一样,当服务端收到 RST 包后,OP_READ 事件活跃,Reactor 线程再次来到了 AbstractNioByteChannel#read 方法处理 OP_READ 事件。

public abstract class AbstractNioByteChannel extends AbstractNioChannel {

        @Override
        public final void read() {
            final ChannelConfig config = config();

            ..........省略连接半关闭处理........

            ..........省略获取allocHandle过程.......

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    //在读取Channel中的数据时会抛出IOExcetion异常             
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    .........省略.............

                } while (allocHandle.continueReading());

                .........省略.............
    
            } catch (Throwable t) {
                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                 ............省略...............
            }
        }
    }

}

这里和 TCP 正常关闭不同的是,在调用 doReadBytes 方法从 Channel 中读取数据的时候会抛出 IOException 异常。这里会有两种情况抛出异常:

  • 此时Socket接收缓冲区中只有 RST 包,并没有其他正常数据。

  • Socket 接收缓冲区有正常的数据,OP_READ 事件活跃,当调用 doReadBytes 方法从 Channel 中读取数据的过程中,对端发送 RST 强制关闭连接,这时会在读取的过程中抛出 IOException 异常。

当 doReadBytes 方法抛出 IOException 异常后,会被 catch(){...} 语句捕获到,随后在 handleReadException 方法中处理 TCP 异常关闭的情况。

4.1 handleReadException

image.png
        private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
                RecvByteBufAllocator.Handle allocHandle) {

            if (byteBuf != null) {
                if (byteBuf.isReadable()) {
                    readPending = false;
                    //如果发生异常时,已经读取到了部分数据,则触发ChannelRead事件
                    pipeline.fireChannelRead(byteBuf);
                } else {
                    byteBuf.release();
                }
            }
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            pipeline.fireExceptionCaught(cause);

            if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
                closeOnRead(pipeline);
            }
        }

这里可以看出,当服务端接收到 RST 强制关闭连接时,首先会触发 ExceptionCaught 事件在 pipeline 中传播,最终还是会调用到 closeOnRead 方法关闭连接,取消 Channel 注册,并触发 ChannelInactive 事件和 ChannelUnregistered 事件。

当发生异常时,如果已经从 Channel 中读取到了数据,那么也会触发 ChannelRead 事件,随后触发 ChannelReadComplete 事件和 ExceptionCaught 事件。

如果这里大家已经忘记了相关事件的传播处理流程,可以在回顾下这篇文章 一文聊透 Netty IO 事件的编排利器 pipeline

总结

本文我们介绍了 netty 在面对 TCP 连接关闭时的正常关闭和异常关闭两种处理场景时的处理逻辑和过程。

其中我们还穿插介绍了 SO_LINGER 选项对于 TCP 连接关闭行为的影响,以及 netty 针对 SO_LINGER 选项的处理过程。

同时笔者还为大家列举了关于导致 TCP 连接异常关闭的 7 种场景:

  1. 半连接队列 SYN-Queue 已满

  2. 全连接队列 ACCEPT-Queue 已满

  3. 连接未被监听的端口

  4. 服务端程序崩溃

  5. 开启 SO_LINGER 选项设置 l_linger = 0

  6. 主动关闭方在关闭时 Socket 接收缓冲区还有未处理数据

  7. 主动关闭方 close 关闭但在 FIN_WAIT2 状态接收数据

以及 Netty 对 RST 包的处理流程。在下篇文章中,笔者会为大家继续介绍 TCP 半关闭的处理流程实现。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容