一、Channel概述
在Java NIO中Channel是提供与I/O服务的直接连接。Channel用在字节缓冲区和位于通道另一侧的实体(通常是一个文件或套接字)之间有效地传输数据。
Channel可以形象地比喻为银行出纳窗口使用的导管。您的薪水支票就是您要传送的信息,载体(Carrier)就好比一个缓冲区。您先填充缓冲区(将您的支票放到载体上),接着将缓冲“写”到通道中(将载体丢进导管中),然后信息负载就被传递到通道另一侧的 I/O 服务(银行出纳员)。
该过程的回应是:出纳员填充缓冲区(将您的收据放到载体上),接着开始一个反方向的通道传输(将载体丢回到导管中)。载体就到了通道的您这一侧(一个填满了的缓冲区正等待您的查验),然后您就会 flip 缓冲区(打开盖子)并将它清空(移除您的收据)。现在您可以开车走了,下一个对象(银行客户)将使用同样的载体(Buffer)和导管(Channel)对象来重复上述过程。
多数情况下,通道与操作系统的文件描述符(File Descriptor)和文件句柄(File Handle)有着一对一的关系。虽然通道比文件描述符更广义,但您将经常使用到的多数通道都是连接到开放的文件描述符的。 Channel 类提供维持平台独立性所需的抽象过程,不过仍然会模拟现代操作系统本身的 I/O 性能。
通道是一种途径,借助该途径,可以用最小的总开销来访问操作系统本身的 I/O 服务。缓冲区则是通道内部用来发送和接收数据的端点。
与在NIO中类似,在netty中,Channel就是连接网络Socket和具有read, write, connect, and bind能力组件的一条通道,并且大大降低了直接使用Socket进行通信的复杂度;Channel是网络操作抽象类,聚合了一组功能,包括但不限于网络读写、客户端发起连接、主动关闭连接,同时也包含了 Netty 框架相关的一些功能,包括获取 Channel 的 EventLoop,获取缓冲区分配器 ByteBufAllocator 和 Pipeline 等。
二、 原理
如图所示:
- 一旦客户端连接成功,将新建一个Channel同该客户端进行绑定;
- Channel从EventLoopGroup获取一个EventLoop,并注册到该EventLoop,Channel生命周期内都和该EventLoop在一起(注册时获得SelectionKey);
- Channel同客户端进行网络连接、关闭和读写,生成相对应的Event(改变SelectionKey信息),触发EventLoop调度线程进行执行;
- 如果是读事件,执行线程调度Pipeline来处理用户业务逻辑;
Channel是Netty抽象出来的网络IO读写相关的接口,为什么不直接使用JDK NIO原生的Channel而要另起灶炉呢,主要原因如下:
- JDK的SocketChannel和ServerSocketChannel没有提供统一的Channel接口供业务开发者使用,对于用户而言,没有统一的操作视图,使用起来并不方便;
- JDK的SocketChannel和ServerSocketChannel的主要职责就是网络操作,由于它们是SPI类接口,由具体的虚拟机厂家来提供,所以通过继承SPI功能类来扩展其功能的难度很大。直接实现ServerSocketChannel和SocketChannel抽象类,其工作量和重新开发一个新的Channel的功能类差不多;
- Netty的Channel需要能跟Netty整体框架融合在一起,例如IO模型、基于ChannelPipeline的定制模型,以及基于元数据描述配置化的TCP参数等,这些JDK的SocketChannel和ServerSocketChannel都没有提供,需要重新封装。
- 自定义的Channel,功能实现更加灵活。
基于以上的4个原因,Netty重新设计了Channel接口,并且给予了很多不同的实现,它的设计原理很简单,但是功能却比较复杂,主要的设计理念如下:
- 在Channel接口层,采用Facade模式进行统一封装,将网络IO操作,及相关联的其他操作封装起来,统一对外提供。
- Channel接口的定义尽量大而全,为SocketChannel和ServerSocketChannel提供统一的视图,由不同的子类实现不同的功能,公共功能在抽象父类实现,最大限度上实现功能和接口的重用。
-
具体采用聚合而非包含的方式,将相关的功能类聚合在Channel中,由Channel统一负责分配和调度,功能实现更加灵活
。
三、 Channel功能介绍
Channel的功能比较繁杂,通过分类的方式对它主要的功能进行介绍。
3.1 网络IO操作
Channel网络IO相关的方法定义如下:
下面对这些API的功能进行分类说明,读写相关的API列表。
- Chanenl read():从当前Channel读取数据到第一个inbound缓冲区中,如果数据被成功读取,触发ChannelHandler.channelRead(ChannelHandlerContext,Object)事件,读取操作调用完之后,紧接着会触发ChannelHandler.channelReadComplete(ChannelHandlerContext)事件,这样业务的ChannelHandler可以决定是否需要继续读取数据。如果已经有读操作请求被挂起,则后续的读操作会被忽略。
- ChannelFuture write(Object msg):请求将当前的msg通过ChannelPipeline写入到目标Channel中。注意,write操作只是将消息存入到消息发送环形数组中,并没有真正被发送,只有调用flush操作才会被写入到Channel中,发送给对方。
- ChannelFuture write(Object msg, ChannelPromise promise):功能与write(Object msg)相同,但是携带了ChannelPromise参数负责设置写入操作的结果。
- ChannelFuture writeAndFlush(Object msg,ChannelPromise promise):与方法3功能类似,不同之处在于它会将消息写入Channel中发送,等价于单独调用write和flush操作的组合。
- ChannelFuture writeAndFlush(Object msg):功能等于方法4,但是没有ChannelPromise promise参数。
- Channel flush():将之前写入到发送环形数组中的消息全部写入到目标Channel中,发送给通信对方。
- ChannelFuture close(ChannelPromise promise):主动关闭当前连接,通过ChannelPromise设置操作结果并进行结果通知,无论操作是否成功,都可以通过ChannelPromise获取操作结果。改操作会级联触发ChannelPipe中所有ChannelHandler的ChannelHandler.close(ChannelHandlerContext,ChannelPromise)事件。
- ChannelFuture disconnect(ChannelPromise promise):请求断开与远程通信对端的连接并使用ChannelPromise获取操作结果的通知信息。该方法会级联触发ChannelHandler.disconnect(ChannelHandlerContext,ChannelPromise)事件。
- ChannelFuture connect(SocketAddress remoteAddress):客户端使用指定的服务端地址发起连接请求,如果连接因为应答超时而失败,ChannelFuture中的操作结果ConnectTimeoutException异常,如果连接被拒绝,操作结果为ConnectException。该方法会级联触发ChannelHandler.connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise)事件。
- ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress):功能与方法9类似,唯一不同的是先绑定本地地址localAddress,然后在连接服务端。
- ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise):与方式9功能类似,唯一不同的是多了一个ChannelPromise参数用于写入操作结果。
- ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise):与方法11功能类似,唯一不同的就是绑定了本地地址。
- ChannelFuture bind(SocketAddress localAddress):绑定指定的本地Socket地址,该方法会级联触发ChannelHandler.bind(ChannelHandlerContext,SocketAddress,ChannelPromise)事件。
- ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise):与13方法功能类似,多了一个ChannelPromise参数用于写入操作结果。
- ChannelConfig config():获取当前Channel的配置信息,例如CONNECT_TIMEOUT_MILLIS。
- blooean is opoen():判断当前Channel是否已经打开
- boolean isRegistered():判断当前Channel是否已经注册到EventLoop上。
- boolean isActive():判断当前Channel是否已经处于激活状态
- ChannelMetadata metadata():获取当前Channel的元数据描述信息,包括TCP参数配等。
- SocketAddress localAddress():获取当前Channel的本地绑定地址
- SocketAddress remoteAddress():获取当前Channel通信的远程Socket地址。
3.2 其他常用的API功能说明
第一个比较重要的方法是eventLoop,Channel需要注册到EventLoop的多路复用器上,用于处理IO事件,通过eventLoop方法可以获取到Channel注册的EventLoop。EventLoop本质上就是处理网络读写事件的Reactor线程。在Netty中,它不仅仅用来处理网络事件,也可以用来执行定时任务和用户自定义NIO Task等任务。
第二个比较常用的方法时metadata方法,熟悉TCP协议的同学都可能知道,当创建Socket的时候需要指定TCP参数,例如接收和发送的TCP缓冲区大小,TCP的超时事件,是否重用地址等等。在Netty中,每个Channel对应一个物理连接,每个链接都有自己的TCP参数配置。所以Channel会聚合一个ChannelMetadata用来对TCP参数提供元数据描述信息,通过metadata方法就可以获取当前Channel的TCP参数配置。
第三个方法时parent,对于服务端Channel而言,它的父Channel为空,对于客户端Channel,它的父Channel就是创建它的ServerSocketChannel。
第四个方法是用户获取Channel标识id,它返回ChannelId对象,ChannelId是Channel的唯一标识,它的可能生成策略如下:
- (1)机器的MAC地址(EUI-48或者EUI-64)等可以代表全局唯一的信息。
- (2)当前进程的ID。
- (3)当前系统时间的毫秒----System.currentTimeMillis
- (4)当前系统时间的纳秒----System.nanoTime
- (5)32位的随机整型数
- (6)32位的自增的序列
四、Channel状态
4.1 状态相关API
boolean isOpen(); // 是否开放
boolean isRegistered(); // 是否注册到一个EventLoop
boolean isActive(); // 是否激活
boolean isWritable(); // 是否可写
open表示Channel的开放状态,True表示Channel可用,False表示Channel已关闭不再可用。registered表示Channel的注册状态,True表示已注册到一个EventLoop,False表示没有注册到EventLoop。active表示Channel的激活状态,对于ServerSocketChannel,True表示Channel已绑定到端口;对于SocketChannel,表示Channel可用(open)且已连接到对端。Writable表示Channel的可写状态,当Channel的写缓冲区outboundBuffer非null且可写时返回True。
一个正常结束的Channel状态转移有以下两种情况:
REGISTERED->CONNECT/BIND->ACTIVE->CLOSE->INACTIVE->UNREGISTERED
REGISTERED->ACTIVE->CLOSE->INACTIVE->UNREGISTERED
其中第一种是服务端用于绑定的Channel或者客户端用于发起连接的Channel,第二种是服务端接受的SocketChannel。一个异常关闭的Channel则不会服从这样的状态转移。
五、 Channel源码分析
Channel的实现类非常多,继承关系复杂,从学习的角度我们抽取最重要的两个NioServerSocketChannel和NioSocketChannel。
服务端NioServerSocketChannel的继承关系类图如下:
客户端NioSocketChannel的继承关系类图如下:
5.1 AbstractChannel源码分析
成员变量定义
在分析Abstract源码之前先了解下它的成员变量定义,首先定义了五个静态全局异常:
- FLUSH0_CLOSED_CHANNEL_EXCEPTION
- ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION
- CLOSE_CLOSED_CHANNEL_EXCEPTION
- WRITE_CLOSED_CHANNEL_EXCEPTION
- FLUSH0_NOT_YET_CONNECTED_EXCEPTION
然后看AbstractChannel的字段:
private final Channel parent; // 父Channel
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline; // 处理通道
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);
private volatile SocketAddress localAddress; // 本地地址
private volatile SocketAddress remoteAddress; // 远端地址
private volatile EventLoop eventLoop; // EventLoop线程
private volatile boolean registered; // 是否注册到EventLoop
private boolean closeInitiated;
/** Cache for the string representation of this channel */
private boolean strValActive;
private String strVal;
然后,我们看其中的构造方法:
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
newUnsafe()和newChannelPipeline()可由子类覆盖实现。在Netty的实现中每一个Channel都有一个对应的Unsafe内部类:AbstractChannel--AbstractUnsafe,AbstractNioChannel--AbstractNioUnsafe等等,newUnsafe()方法正好用来生成这样的对应关系。ChannelPipeline将在之后讲解,这里先了解它的功能:作为用户处理器Handler的容器为用户提供自定义处理I/O事件的能力即为用户提供业务逻辑处理。AbstractChannel中对I/O事件的处理,都委托给ChannelPipeline处理,代码基本一样:
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
通过AbstractChannel源码看下网络IO相关的操作,前面有提到网络IO操作时讲到它会触发Channelpipeline中对应的事件方法,Netty基于事件驱动,我们也可以理解为当Channel进行IO操作时会产生对应的IO事件,然后驱动事件在ChannelPipe中传播,由相对应的ChannelHandler对事件进行拦截和处理,不关心的事件可以直接忽略。采用事件驱动的方式可以非常轻松的通过事件定义来划分事件拦截切面,方便业务的定制和功能扩展,相比AOP,其性能更高,但是功能却基本等价。
网络IO操作直接调用DefaultPipeline的相关方法,由DefaultChannelPipeline中对应的ChannelHandler进行具体的逻辑处理。
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return pipeline.connect(remoteAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
@Override
public ChannelFuture disconnect() {
return pipeline.disconnect();
}
@Override
public ChannelFuture close() {
return pipeline.close();
}
@Override
public ChannelFuture deregister() {
return pipeline.deregister();
}
@Override
public Channel flush() {
pipeline.flush();
return this;
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, localAddress, promise);
}
@Override
public ChannelFuture disconnect(ChannelPromise promise) {
return pipeline.disconnect(promise);
}
@Override
public ChannelFuture close(ChannelPromise promise) {
return pipeline.close(promise);
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
return pipeline.deregister(promise);
}
@Override
public Channel read() {
pipeline.read();
return this;
}
AbstractChannel其他方法都比较简单,主要关注状态判定的方法:
@Override
public boolean isWritable() {
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
return buf != null && buf.isWritable(); // 写缓冲区不为null且可写
}
@Override
public boolean isRegistered() {
return registered;
}
对于Channel的实现来说,其中的内部类Unsafe才是关键,因为其中含有I/O事件处理的细节。AbstractUnsafe作为AbstractChannel的内部类,定义了I/O事件处理的基本框架,其中的细节留给子类实现。我们将依次对各个事件框架进行分析。
- register事件框架
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) { // 已经注册则失败
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) { // EventLoop不兼容当前Channel
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
// 当前线程为EventLoop线程直接执行;否则提交任务给EventLoop线程
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly(); // 异常时关闭Channel
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
类似eventLoop.inEventLoop()及之后的一段代码在Netty中使用了很多次,这是为了保证I/O事件以及用户定义的I/O事件处理逻辑(业务逻辑)在一个线程中处理。我们看提交的任务register0():
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister(); // 模板方法,细节由子类完成
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded(); // 将用户Handler添加到ChannelPipeline
safeSetSuccess(promise);
pipeline.fireChannelRegistered(); // 触发Channel注册事件
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
// ServerSocketChannel接受的Channel此时已被激活
if (isActive()) {
// 首次注册且激活触发Channel激活事件
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead(); // 可视为模板方法
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly(); // 可视为模板方法
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
register0()方法定义了注册到EventLoop的整体框架,整个流程如下:
(1).注册的具体细节由doRegister()方法完成,子类中实现。
(2).注册后将处理业务逻辑的用户Handler添加到ChannelPipeline。
(3).异步结果设置为成功,触发Channel的Registered事件。
(4).对于服务端接受的客户端连接,如果首次注册,触发Channel的Active事件,如果已设置autoRead,则调用beginRead()开始读取数据。
对于(4)的是因为fireChannelActive()中也根据autoRead配置,调用了beginRead()方法。beginRead()方法其实也是一个框架,细节由doBeginRead()方法在子类中实现:
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
异常处理的closeForcibly()方法也是一个框架,细节由doClose()方法在子类中实现:
@Override
public final void closeForcibly() {
assertEventLoop();
try {
doClose();
} catch (Exception e) {
logger.warn("Failed to close a channel.", e);
}
}
register框架中有一对safeSetXXX()方法,将未完成的Promise标记为完成且成功或失败,其实现如下:
/**
* Marks the specified {@code promise} as success. If the {@code promise} is done already, log a message.
*/
protected final void safeSetSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}
至此,register事件框架分析完毕。
- bind事件框架
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
bind事件框架较为简单,主要完成在Channel绑定完成后触发Channel的Active事件。其中的invokeLater()方法向Channel注册到的EventLoop提交一个任务:
private void invokeLater(Runnable task) {
try {
// This method is used by outbound operation implementations to trigger an inbound event later.
// They do not trigger an inbound event immediately because an outbound operation might have been
// triggered by another inbound event handler method. If fired immediately, the call stack
// will look like this for example:
//
// handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
// -> handlerA.ctx.close()
// -> channel.unsafe.close()
// -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
//
// which means the execution of two inbound handler methods of the same handler overlap undesirably.
eventLoop().execute(task);
} catch (RejectedExecutionException e) {
logger.warn("Can't invoke task later as EventLoop rejected it", e);
}
}
closeIfClosed()方法当Channel不再打开时关闭Channel,代码如下:
protected final void closeIfClosed() {
if (isOpen()) {
return;
}
close(voidPromise());
}
close()也是一个框架,后面会单独进行分析;
- disconnect事件框架
@Override
public final void disconnect(final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable()) {
return;
}
boolean wasActive = isActive();
try {
doDisconnect(); // 模板方法,细节由子类实现
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (wasActive && !isActive()) {
invokeLater(new Runnable() { // 触发Inactive事件
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
safeSetSuccess(promise);
closeIfClosed(); // doDisconnect() might have closed the channel
}
- close事件框架
@Override
public final void close(final ChannelPromise promise) {
assertEventLoop();
close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
}
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
if (!promise.setUncancellable()) {
return;
}
if (closeInitiated) {
if (closeFuture.isDone()) {
// Closed already.
safeSetSuccess(promise); // 已经关闭,保证底层close只执行一次
} else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
// This means close() was called before so we just register a listener and return
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess(); // 当Channel关闭时,将此次close异步请求结果也设置为成功
}
});
}
return;
}
closeInitiated = true;
final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. // 设置为空禁止write操作,同时作为标记字段表示正在关闭
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Execute the close.
doClose0(promise); // 由上面的prepareToClose返回的executor执行
} finally {
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new Runnable() { // Channel注册的EventLoop执行
@Override
public void run() {
if (outboundBuffer != null) {
// Fail all the queued messages
// 写缓冲队列中的数据全部设置失败
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
try {
// Close the channel and fail the queued messages in all cases.
doClose0(promise); // 当前调用线程执行
} finally {
if (outboundBuffer != null) {
// Fail all the queued messages.
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
}
if (inFlush0) {
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
fireChannelInactiveAndDeregister(wasActive);
}
}
}
private void doClose0(ChannelPromise promise) {
try {
doClose(); // 模板方法,细节由子类实现
closeFuture.setClosed();
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
close事件框架保证只有一个线程执行了真正关闭的doClose()方法,prepareToClose()做一些关闭前的清除工作并返回一个Executor,如果不为空,需要在该Executor里执行doClose0()方法;为空,则在当前线程执行(为什么这样设计?)。写缓冲区outboundBuffer同时也作为一个标记字段,为空表示Channel正在关闭此时禁止写操作。fireChannelInactiveAndDeregister()方法需要invokeLater()使用EventLoop执行,是因为其中会调用deRegister()方法触发Inactive事件,而事件执行需要在EventLoop中执行。
private void fireChannelInactiveAndDeregister(final boolean wasActive) {
deregister(voidPromise(), wasActive && !isActive());
}
- deregister事件框架
@Override
public final void deregister(final ChannelPromise promise) {
assertEventLoop();
deregister(promise, false);
}
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
if (!promise.setUncancellable()) {
return;
}
if (!registered) { // 已经deregister
safeSetSuccess(promise);
return;
}
// As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
// we need to ensure we do the actual deregister operation later. This is needed as for example,
// we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
// the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
// the deregister operation this could lead to have a handler invoked by different EventLoop and so
// threads.
//
// See:
// https://github.com/netty/netty/issues/4435
invokeLater(new Runnable() {
@Override
public void run() {
try {
doDeregister(); // 模板方法,子类实现具体细节
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
pipeline.fireChannelInactive(); // 根据参数触发Inactive事件
}
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered, so check
// if it was registered.
if (registered) {
registered = false;
pipeline.fireChannelUnregistered(); // 首次调用触发Unregistered事件
}
safeSetSuccess(promise);
}
}
});
}
deregister事件框架的处理流程很清晰,其中,使用invokeLater()方法是因为:用户可能会在ChannlePipeline中将当前Channel注册到新的EventLoop,确保ChannelPipiline事件和doDeregister()在同一个EventLoop完成。
需要注意的是:事件之间可能相互调用,比如:disconnect->close->deregister。
- write事件框架
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
// 联系close操作,outboundBuffer为空表示Channel正在关闭,禁止写数据
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg); // 释放msg 防止泄露
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
事实上,这是Netty定义的write操作的全部代码,完成的功能是将要写的消息Msg加入到写缓冲区。其中的filterOutboundMessage()可对消息进行过滤整理,例如把HeapBuffer转为DirectBuffer,具体实现由子类负责:
protected Object filterOutboundMessage(Object msg) throws Exception {
return msg; // 默认实现
}
- flush事件框架
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) { // Channel正在关闭直接返回
return;
}
outboundBuffer.addFlush(); // 添加一个标记
flush0();
}
@SuppressWarnings("deprecation")
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return; // 正在flush返回防止多次调用
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return; // Channel正在关闭或者已没有需要写的数据
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
// Channel已经非激活,将所有进行中的写请求标记为失败
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer); // 模板方法,细节由子类实现
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
}
} finally {
inFlush0 = false;
}
}
flush事件中执行真正的底层写操作,Netty对于写的处理引入了一个写缓冲区ChannelOutboundBuffer,由该缓冲区控制Channel的可写状态,其具体实现,将会在缓冲区一章中分析。
至此,Unsafe中的事件方法已经分析完7个,但还有connect和read没有引入,下一节将进行分析。(见下一篇文章)