首先引用Netty官网的内容对Netty进行一个正式的介绍。
Netty是为了快速开发可维护的高性能、高可扩展、网络服务器和客户端程序而提供的异步事件驱动基础框架和工具。换句话说,Netty是一个Java NIO客户端/服务器框架。基于Netty,可以快速轻松地开发网络服务器和客户端的应用程序。与直接使用Java NIO相比,Netty给大家造出了一个非常优美的轮子,它可以大大简化了网络编程流程。例如,Netty极大地简化TCP、UDP套接字、HTTP Web服务程序的开发。
Netty的目标之一,是要使开发可以做到“快速和轻松”。除了做到“快速和轻松”的开发TCP/UDP等自定义协议的通信程序之外,Netty经过精心设计,还可以做到“快速和轻松”地开发应用层协议的程序,如FTP,SMTP,HTTP以及其他的传统应用层协议。
Netty的目标之二,是要做到高性能、高可扩展性。基于Java的NIO,Netty设计了一套优秀的Reactor反应器模式。后面会详细介绍Netty中反应器模式的实现。在基于Netty的反应器模式实现中的Channel(通道)、Handler(处理器)等基类,能快速扩展以覆盖不同协议、完成不同业务处理的大量应用类。
6.1 第一个Netty的实践案例DiscardServer
在开始实践之前,第一步就是要准备Netty的版本,配置好开发环境。
6.1.1 创建第一个Netty项目
首先我们需要创建项目,项目名称是NettyDemos。这是一个丢弃服务器(DiscardServer),功能很简单:读取客户端的输入数据,直接丢弃,不给客户端任何回复。
在使用Netty前,首先需要考虑一下JDK的版本,官网建议使用JDK1.6以上,本书使用的是JDK1.8。然后是Netty自己的版本,建议使用Netty 4.0以上的版本。虽然Netty在不断升级,但是4.0以上的版本使用比较广泛。Netty曾经升级到5.0,不过出现了一些问题,版本又回退了。本书使用的Netty版本是4.1.6。
使用maven导入Netty以依赖到工程(或项目),Netty的maven依赖如下:
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
那么现在可以正式开始编写第一个项目程序了。
6.1.2 第一个Netty服务器端程序
这里创建一个服务端类:NettyDiscardServer,用以实现消息的Discard“丢弃”功能,它的源代码如下:
//...
public class NettyDiscardServer {
private final int serverPort;
ServerBootstrap b = new ServerBootstrap();
public NettyDiscardServer(int port) {
this.serverPort = port;
}
public void runServer() {
//创建反应器线程组
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
//1 设置反应器线程组
b.group(bossLoopGroup, workerLoopGroup);
//2 设置nio类型的通道
b.channel(NioServerSocketChannel.class);
//3 设置监听端口
b.localAddress(serverPort);
//4 设置通道的参数
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 装配子通道流水线
b.childHandler(new ChannelInitializer<SocketChannel>() {
//有连接到达时会创建一个通道
protected void initChannel(SocketChannelch) throws Exception {
// 流水线管理子通道中的Handler处理器
// 向子通道流水线添加一个handler处理器
ch.pipeline().addLast(new NettyDiscardHandler());
}
});
// 6 开始绑定服务器
// 通过调用sync同步方法阻塞直到绑定成功
ChannelFuturechannelFuture = b.bind().sync();
Logger.info(" 服务器启动成功,监听端口: " +
channelFuture.channel().localAddress());
// 7 等待通道关闭的异步任务结束
// 服务监听通道会一直等待通道关闭的异步任务结束
ChannelFuturecloseFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 8关闭EventLoopGroup,
// 释放掉所有资源包括创建的线程
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = NettyDemoConfig.SOCKET_SERVER_PORT;
new NettyDiscardServer(port).runServer();
}
}
如果是第一次看Netty开发的程序,上面的代码是看不懂的,因为代码里边涉及很多的Netty组件。
Netty是基于反应器模式实现的。还好,大家已经非常深入地了解了反应器模式,现在大家顺藤摸瓜学习Netty的结构就相对简单了。
首先要说的是Reactor反应器。前面讲到,反应器的作用是进行一个IO事件的select查询和dispatch分发。Netty中对应的反应器组件有多种,应用场景不同,用到的反应器也各不相同。一般来说,对应于多线程的Java NIO通信的应用场景,Netty的反应器类型为:NioEventLoopGroup。
在上面的例子中,使用了两个NioEventLoopGroup实例。第一个通常被称为“包工头”,负责服务器通道新连接的IO事件的监听。第二个通常被称为“工人”,主要负责传输通道的IO事件的处理。
其次要说的是Handler处理器(也称为处理程序)。Handler处理器的作用是对应到IO事件,实现IO事件的业务处理。Handler处理器需要专门开发,稍后,将专门对它进行介绍。
再次,在上面的例子中,还用到了Netty的服务启动类ServerBootstrap,它的职责是一个组装和集成器,将不同的Netty组件组装在一起。另外,ServerBootstrap能够按照应用场景的需要,为组件设置好对应的参数,最后实现Netty服务器的监听和启动。服务启动类ServerBootstrap也是本章重点之一,稍候另起一小节进行详细的介绍。
6.1.3 业务处理器NettyDiscardHandler
在反应器(Reactor)模式中,所有的业务处理都在Handler处理器中完成。这里编写一个新类:NettyDiscardHandler。NettyDiscardHandler的业务处理很简单:把收到的任何内容直接丢弃(discard),也不会回复任何消息。代码如下:
//...
public class NettyDiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContextctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
try {
Logger.info("收到消息,丢弃如下:");
while (in.isReadable()) {
System.out.print((char) in.readByte());
}
System.out.println();
} finally {
ReferenceCountUtil.release(msg);
}
}
}
首先说明一下,这里将引入一个新的概念:入站和出站。简单来说,入站指的是输入,出站指的是输出。后面也会有详细介绍。
Netty的Handler处理器需要处理多种IO事件(如可读、可写),对应于不同的IO事件,Netty提供了一些基础的方法。这些方法都已经提前封装好,后面直接继承或者实现即可。比如说,对于处理入站的IO事件的方法,对应的接口为ChannelInboundHandler入站处理接口,而ChannelInboundHandlerAdapter则是Netty提供的入站处理的默认实现。
也就是说,如果要实现自己的入站处理器Handler,只要继承ChannelInboundHandlerAdapter入站处理器,再写入自己的入站处理的业务逻辑。如果要读取入站的数据,只要写在了入站处理方法channelRead中即可。
在上面例子中的channelRead方法,它读取了Netty的输入数据缓冲区ByteBuf。Netty的ByteBuf,可以对应到前面介绍的NIO的数据缓冲区。它们在功能上是类似的,不过相对而言,Netty的版本性能更好,使用也更加方便。后面会另开一节进行详细的介绍。
6.1.4 运行NettyDiscardServer
在上面的例子中,出现了Netty中的各种组件:服务器启动器、缓冲区、反应器、Handler业务处理器、Future异步任务监听、数据传输通道等。这些Netty组件都是需要掌握的,也都是我们在后面要专项学习的。
如果看不懂这个NettyDiscardServer程序,一点儿也没关系。这个程序在本章的目的,仅仅是为大家展示一下Netty开发中会涉及什么内容,给大家留一个初步的印象。
接下来,大家可以启动NettyDiscardServer服务器,体验一下Netty程序的运行。
找到服务器类NettyDiscardServer。启动它的main方法,就启动了这个服务器。
但是,如果要看到最终的丢弃效果,不能仅仅启动服务器,还需要启动客户端,由客户端向服务器发送消息。客户端在哪儿呢?
这里的客户端,只要能发消息到服务器即可,不需要其他特殊的功能。因此,可以直接使用前面示例中的EchoClient程序来作为客户端运行即可,因为端口是一致的。
找到发送消息到服务器的客户端类:EchoClient。启动它的main方法,就启动了这个客户端。然后在客户端的标准化输入窗口,不断输入要发送的消息,发送到服务器即可。
虽然EchoClient客户端是使用Java NIO编写的,而NettyDiscardServer服务端是使用Netty编写的,但是不影响它们之间的相互通信。因为NettyDiscardServer的底层也是使用Java NIO。
client执行结果如下:
[main|EchoClient.start]:客户端启动成功!
[Thread-0|EchoClient$Processer.run]:请输入发送内容:
neetyDiscardServer
服务器执行结果如下:
[main|NettyDiscardServer.runServer] |> 服务器启动成功,监听端口: /0:0:0:0:0:0:0:0:18899
[nioEventLoopGroup-3-1|NettyDiscardHandler.channelRead] |> 收到消息,丢弃如下:
2019-11-21 11:37:55 >>neetyDiscardServer
6.2 解密Netty中的Reactor反应器模式
在前面的章节中,已经反复说明:设计模式是Java代码或者程序的重要组织方式,如果不了解设计模式,学习Java程序往往找不到头绪,上下求索而不得其法。故而,在学习Netty组件之前,我们必须了解Netty中的反应器模式是如何实现的。
现在,先回顾一下Java NIO中IO事件的处理流程和反应器模式的基础内容。
6.2.1 回顾Reactor反应器模式中IO事件的处理流程
一个IO事件从操作系统底层产生后,在Reactor反应器模式中的处理流程如图6-1所示。
整个流程大致分为4步,具体如下:
第1步:通道注册。IO源于通道(Channel)。IO是和通道(对应于底层连接而言)强相关的。一个IO事件,一定属于某个通道。但是,如果要查询通道的事件,首先要将通道注册到选择器。只需通道提前注册到Selector选择器即可,IO事件会被选择器查询到。
第2步:查询选择。在反应器模式中,一个反应器(或者SubReactor子反应器)会负责一个线程;不断地轮询,查询选择器中的IO事件(选择键)。
第3步:事件分发。如果查询到IO事件,则分发给与IO事件有绑定关系的Handler业务处理器。
第4步:完成真正的IO操作和业务处理,这一步由Handler业务处理器负责。
以上4步,就是整个反应器模式的IO处理器流程。其中,第1步和第2步,其实是Java NIO的功能,反应器模式仅仅是利用了Java NIO的优势而已。
题外话:上面的流程比较重要,是学习Netty的基础。如果这里看不懂,作为铺垫,请先回到反应器模式的详细介绍部分,回头再学习一下反应器模式。
6.2.2 Netty中的Channel通道组件
Channel通道组件是Netty中非常重要的组件,为什么首先要说的是Channel通道组件呢?原因是:反应器模式和通道紧密相关,反应器的查询和分发的IO事件都来自于Channel通道组件。
Netty中不直接使用Java NIO的Channel通道组件,对Channel通道组件进行了自己的封装。在Netty中,有一系列的Channel通道组件,为了支持多种通信协议,换句话说,对于每一种通信连接协议,Netty都实现了自己的通道。
另外一点就是,除了Java的NIO,Netty还能处理Java的面向流的OIO(Old-IO,即传统的阻塞式IO)。
总结起来,Netty中的每一种协议的通道,都有NIO(异步IO)和OIO(阻塞式IO)两个版本。
对应于不同的协议,Netty中常见的通道类型如下:
NioSocketChannel:异步非阻塞TCP Socket传输通道。
NioServerSocketChannel:异步非阻塞TCP Socket服务器端监听通道。
NioDatagramChannel:异步非阻塞的UDP传输通道。
NioSctpChannel:异步非阻塞Sctp传输通道。
NioSctpServerChannel:异步非阻塞Sctp服务器端监听通道。
OioSocketChannel:同步阻塞式TCP Socket传输通道。
OioServerSocketChannel:同步阻塞式TCP Socket服务器端监听通道。
OioDatagramChannel:同步阻塞式UDP传输通道。
OioSctpChannel:同步阻塞式Sctp传输通道。
OioSctpServerChannel:同步阻塞式Sctp服务器端监听通道。
一般来说,服务器端编程用到最多的通信协议还是TCP协议。对应的传输通道类型为NioSocketChannel类,服务器监听类为NioServerSocketChannel。在主要使用的方法上,其他的通道类型和这个NioSocketChannel类在原理上基本是相通的,因此,本书的很多案例都以NioSocketChannel通道为主。
在Netty的NioSocketChannel内部封装了一个Java NIO的SelectableChannel成员。通过这个内部的Java NIO通道,Netty的NioSocketChannel通道上的IO操作,最终会落地到Java NIO的SelectableChannel底层通道。NioSocketChannel的继承关系图,如图6-2所示。
<div class="ne-content J_NEContent" style="min-height: 380px;"><h3 class="section j-chapter" data-paragraphid="368be3ce46c14ac89a7e432d7231f839_5">6.2.3 Netty中的Reactor反应器</h3><p class="section" data-paragraphid="63a8dd0324ed4f9cb23ea04a75397994_5">在反应器模式中,一个反应器(或者SubReactor子反应器)会负责一个事件处理线程,不断地轮询,通过Selector选择器不断查询注册过的IO事件(选择键)。如果查询到IO事件,则分发给Handler业务处理器。</p><p class="section" data-paragraphid="d7457d86297b4b1c8fdd86ffeb99a80c_5">Netty中的反应器有多个实现类,与Channel通道类有关系。对应于NioSocketChannel通道,Netty的反应器类为:NioEventLoop。</p><p class="section" data-paragraphid="63ed314cc2d84c1789b0ecee43403251_5">NioEventLoop类绑定了两个重要的Java成员属性:一个是Thread线程类的成员,一个是Java NIO选择器的成员属性。NioEventLoop的继承关系和主要的成员属性,如下图6-3所示。</p><p class="section" data-paragraphid="00f9fb758e7d46a4b3adc401fbe30c17_5" style="clear: both;"><img class="paragraph-img" data-src="https://easyreadfs.nosdn.127.net/image_25da655d9f4f4086a6dceaa1301837d4" data-width="716" data-height="430" src="https://easyreadfs.nosdn.127.net/image_25da655d9f4f4086a6dceaa1301837d4" style="width: auto; height: 380px;"></p><p class="yd-imagetitle section" data-paragraphid="e63a5f41223c46bb8c62e046f4e3cb6c_5">图6-3 NioEventLoop的继承关系和主要的成员</p><p class="section" data-paragraphid="19de58688bad4e44a56e6304c117edbb_5">通过这个关系图,可以看出:NioEventLoop和前面章节讲到反应器,在思路上是一致的:一个NioEventLoop拥有一个Thread线程,负责一个Java NIO Selector选择器的IO事件轮询。</p><p class="section" data-paragraphid="c7e13172192f40508efe2b58850f16d9_5">在Netty中,EventLoop反应器和Netty Channel通道,关系如何呢?理论上来说,一个EventLoopNetty反应器和NettyChannel通道是一对多的关系:一个反应器可以注册成千上万的通道。</p><p class="section" data-paragraphid="c1b11b9cc107483ebea6ad373fa0f128_5" style="clear: both;"><img class="paragraph-img" data-src="https://easyreadfs.nosdn.127.net/image_3d7585d868ba45e9a61c52c92023b72f" data-width="934" data-height="302" src="https://easyreadfs.nosdn.127.net/image_3d7585d868ba45e9a61c52c92023b72f" style="width: 660px;"></p><p class="yd-imagetitle section" data-paragraphid="fa38737ed7c04e1fa34f235773f7fc35_5">图6-4 EventLoop反应器和通道(Channel)的关系</p></div>
<div class="ne-content J_NEContent" style="min-height: 380px;"><h3 class="section j-chapter" data-paragraphid="6171f90705004186873c2b9531916e6b_5">6.2.4 Netty中的Handler处理器</h3><p class="section" data-paragraphid="f3f8264ff01c45d2be4c747d81df6c66_5">在前面的章节,解读Java NIO的IO事件类型时讲到,可供选择器监控的通道IO事件类型包括以下4种:</p><p class="section" data-paragraphid="f889295c837b40679f47ca9ddfe2425b_5"><span class="yd-font-hkkai">• 可读:SelectionKey.OP_READ</span></p><p class="section" data-paragraphid="b9d3ba2c8b7e4a19a184e53aa1278790_5"><span class="yd-font-hkkai">• 可写:SelectionKey.OP_WRITE</span></p><p class="section" data-paragraphid="e7c9230fbd6f47c3829ce1bc690d07e1_5"><span class="yd-font-hkkai">• 连接:SelectionKey.OP_CONNECT</span></p><p class="section" data-paragraphid="ad0a483c2e4448e4809632345f5f166a_5"><span class="yd-font-hkkai">• 接收:SelectionKey.OP_ACCEPT</span></p><p class="section" data-paragraphid="55f14811fb024b3292248ece8f756c5b_5">在Netty中,EventLoop反应器内部有一个Java NIO选择器成员执行以上事件的查询,然后进行对应的事件分发。事件分发(Dispatch)的目标就是Netty自己的Handler处理器。</p><p class="section" data-paragraphid="76fde06e885f4975bb5cf60507b02b69_5">Netty的Handler处理器分为两大类:第一类是ChannelInboundHandler通道入站处理器;第二类是ChannelOutboundHandler通道出站处理器。二者都继承了ChannelHandler处理器接口。Netty中的Handler处理器的接口与继承关系,如图6-5所示。</p><p class="section" data-paragraphid="2b7579581b524f6fa9dc23c9a898e15f_5" style="clear: both;"><img class="paragraph-img" data-src="https://easyreadfs.nosdn.127.net/image_bee94aa9aff843da8fff6d2257acad32" data-width="955" data-height="262" src="https://easyreadfs.nosdn.127.net/image_bee94aa9aff843da8fff6d2257acad32" style="width: 660px;"></p><p class="yd-imagetitle section" data-paragraphid="543f28efcdfe4812932c939ef4697b34_5">图6-5 Netty中的Handler处理器的接口与继承关系</p><p class="section" data-paragraphid="1fec128031a34de59c6fc9ff2bd030bf_5">Netty中的入站处理,不仅仅是OP_READ输入事件的处理,还是从通道底层触发,由Netty通过层层传递,调用ChannelInboundHandler通道入站处理器进行的某个处理。以底层的Java NIO中的OP_READ输入事件为例:在通道中发生了OP_READ事件后,会被EventLoop查询到,然后分发给ChannelInboundHandler通道入站处理器,调用它的入站处理的方法read。在ChannelInboundHandler通道入站处理器内部的read方法可以从通道中读取数据。</p><p class="section" data-paragraphid="fbe05e2668c74971bd1290389728e4da_5">Netty中的入站处理,触发的方向为:从通道到ChannelInboundHandler通道入站处理器。</p><p class="section" data-paragraphid="d8a58b717d4a4220a9c486066ef82d1d_5">Netty中的出站处理,本来就包括Java NIO的OP_WRITE可写事件。注意,OP_WRITE可写事件是Java NIO的底层概念,它和Netty的出站处理的概念不是一个维度,Netty的出站处理是应用层维度的。那么,Netty中的出站处理,具体指的是什么呢?指的是从ChanneOutboundHandler通道出站处理器到通道的某次IO操作,例如,在应用程序完成业务处理后,可以通过ChanneOutboundHandler通道出站处理器将处理的结果写入底层通道。它的最常用的一个方法就是write()方法,把数据写入到通道。</p><p class="section" data-paragraphid="979b1da7e11b457d80c7fe83bece0971_5">这两个业务处理接口都有各自的默认实现:ChannelInboundHandler的默认实现为ChannelInboundHandlerAdapter,叫作通道入站处理适配器。ChanneOutboundHandler的默认实现为ChanneloutBoundHandlerAdapter,叫作通道出站处理适配器。这两个默认的通道处理适配器,分别实现了入站操作和出站操作的基本功能。如果要实现自己的业务处理器,不需要从零开始去实现处理器的接口,只需要继承通道处理适配器即可。</p></div>
<div class="ne-content J_NEContent" style="min-height: 380px;"><h3 class="section j-chapter" data-paragraphid="535265601f6149d5ba0b2255b9587175_5">6.2.5 Netty的流水线(Pipeline)</h3><p class="section" data-paragraphid="8564c54c41794dc7b51be018e930de5c_5">来梳理一下Netty的反应器模式中各个组件之间的关系:</p><p class="section" data-paragraphid="c69b64297fa64842b9d63af09e8a1d7b_5">(1)反应器(或者SubReactor子反应器)和通道之间是一对多的关系:一个反应器可以查询很多个通道的IO事件。</p><p class="section" data-paragraphid="517cb3479c2a4dc6b3f9bf1ae548231b_5">(2)通道和Handler处理器实例之间,是多对多的关系:一个通道的IO事件被多个的Handler实例处理;一个Handler处理器实例也能绑定到很多的通道,处理多个通道的IO事件。</p><p class="section" data-paragraphid="a965a4adcf7e42ccb151780a58f5f599_5">问题是:通道和Handler处理器实例之间的绑定关系,Netty是如何组织的呢?</p><p class="section" data-paragraphid="5cebbdb6e3fe42d1926bb05a951f40f5_5">Netty设计了一个特殊的组件,叫作ChannelPipeline(通道流水线),它像一条管道,将绑定到一个通道的多个Handler处理器实例,串在一起,形成一条流水线。ChannelPipeline(通道流水线)的默认实现,实际上被设计成一个双向链表。所有的Handler处理器实例被包装成了双向链表的节点,被加入到了ChannelPipeline(通道流水线)中。</p><p class="section" data-paragraphid="60e862a0c8024493b6e7f755bcf1dc29_5">重点申明:一个Netty通道拥有一条Handler处理器流水线,成员的名称叫作pipeline。</p><p class="section" data-paragraphid="2f23ab92d0fe45aabe574750b7830dbd_5">问题来了:这里为什么将pipeline翻译成流水线,而不是翻译成为管道呢?这是有原因的。具体来说,与流水线内部的Handler处理器之间处理IO事件的先后次序有关。</p><p class="section" data-paragraphid="1c80508f949e4a80ae6e465843b6835e_5">以入站处理为例。每一个来自通道的IO事件,都会进入一次ChannelPipeline通道流水线。在进入第一个Handler处理器后,这个IO事件将按照既定的从前往后次序,在流水线上不断地向后流动,流向下一个Handler处理器。</p><p class="section" data-paragraphid="25b472a3ee1d43e783204068d53d9014_5">在向后流动的过程中,会出现3种情况:</p><p class="section" data-paragraphid="f7b5e5719bce464487cc057df984a58f_5">(1)如果后面还有其他Handler入站处理器,那么IO事件可以交给下一个Handler处理器,向后流动。</p><p class="section" data-paragraphid="8480060258e543f58f2f38a122c90f5e_5">(2)如果后面没有其他的入站处理器,这就意味着这个IO事件在此次流水线中的处理结束了。</p><p class="section" data-paragraphid="22ed6958ff904db7b2c5cd505f124565_5">(3)如果在流水线中间需要终止流动,可以选择不将IO事件交给下一个Handler处理器,流水线的执行也被终止了。</p><p class="section" data-paragraphid="e61839f7f2a649ec831ee0488cabd527_5">为什么说Handler的处理是按照既定的次序,而不是从前到后的次序呢?Netty是这样规定的:入站处理器Handler的执行次序,是从前到后;出站处理器Handler的执行次序,是从后到前。总之,IO事件在流水线上的执行次序,与IO事件的类型是有关系的,如图6-6所示。</p><p class="section" data-paragraphid="7f3af6e5654a4e428c48c4a8334703f7_5" style="clear: both;"><img class="paragraph-img" data-src="https://easyreadfs.nosdn.127.net/image_7cfd7ede1148462b8ca6d1b58dcd6f52" data-width="1328" data-height="338" src="https://easyreadfs.nosdn.127.net/image_7cfd7ede1148462b8ca6d1b58dcd6f52" style="width: 660px;"></p><p class="yd-imagetitle section" data-paragraphid="2df29a5fef3246d5a263f9df9b897a37_5">图6-6 流水线上入站处理器和出站处理器的执行次序</p><p class="section" data-paragraphid="8ca898c0b5a54d379747714d21d8f5a7_5">除了流动的方向与IO操作的类型有关之外,流动过程中经过的处理器节点的类型,也是与IO操作的类型有关。入站的IO操作只会且只能从Inbound入站处理器类型的Handler流过;出站的IO操作只会且只能从Outbound出站处理器类型的Handler流过。</p><p class="section" data-paragraphid="2e2ad261b29747dcb6c7227df7be0be9_5">总之,流水线是通道的“大管家”,为通道管理好了它的一大堆Handler“小弟”。</p><p class="section" data-paragraphid="280aa39aa2c6409cbd93a0465b45ae2a_5">了解完了流水线之后,大家应该对Netty中的通道、EventLoop反应器、Handler处理器,以及三者之间的协作关系,有了一个清晰的认知和了解。至此,大家基本可以动手开发简单的Netty程序了。不过,为了方便开发者,Netty提供了一个类把上面的三个组件快速组装起来,这个系列的类叫作Bootstrap启动器。严格来说,不止一个类名字为Bootstrap,例如在服务器端的启动类叫作ServerBootstrap类。</p><p class="section" data-paragraphid="0cd95f9b6bef43809825ddf7058c1d2c_5">下面,为大家详细介绍一下这个提升开发效率的Bootstrap启动器类。</p></div>
<div class="ne-content J_NEContent" style="min-height: 380px;"><h2 class="section j-chapter" data-paragraphid="eb6f27a4afc54a6e8c4bba05b389fcb9_5">6.3 详解Bootstrap启动器类</h2><p class="section" data-paragraphid="931751baa13b47918dfca58718620aad_5">Bootstrap类是Netty提供的一个便利的工厂类,可以通过它来完成Netty的客户端或服务器端的Netty组件的组装,以及Netty程序的初始化。当然,Netty的官方解释是,完全可以不用这个Bootstrap启动器。但是,一点点去手动创建通道、完成各种设置和启动、并且注册到EventLoop,这个过程会非常麻烦。通常情况下,还是使用这个便利的Bootstrap工具类会效率更高。</p><p class="section" data-paragraphid="23560e82539d431a8c11f98e9acf6d66_5">在Netty中,有两个启动器类,分别用在服务器和客户端。如图6-7所示。</p><p class="section" data-paragraphid="70d8aefdb71f4727af67313ff6d7fca3_5" style="clear: both;"><img class="paragraph-img" data-src="https://easyreadfs.nosdn.127.net/image_4aa9efed815e470a8f510d36e37b6791" data-width="680" data-height="366" src="https://easyreadfs.nosdn.127.net/image_4aa9efed815e470a8f510d36e37b6791" style="width: 660px;"></p><p class="yd-imagetitle section" data-paragraphid="8c1dcb7e61eb4090b082f00eaced90b7_5">图6-7 Netty中的两个启动器类</p><p class="section" data-paragraphid="8cc7100e5e144bdb97b34ba3877c8b2c_5">这两个启动器仅仅是使用的地方不同,它们大致的配置和使用方法都是相同的。下面以ServerBootstrap服务器启动类作为重点的介绍对象。</p><p class="section" data-paragraphid="bafe6a3e8b744063bc280c9705c198c8_5">在介绍ServerBootstrap的服务器启动流程之前,首先介绍一下涉及到的两个基础概念:父子通道、EventLoopGroup线程组(事件循环线程组)。</p></div>
<div class="ne-content J_NEContent" style="min-height: 380px;"><h3 class="section j-chapter" data-paragraphid="b2aa9d877894421791c49127c13d6f0f_5">6.3.1 父子通道</h3><p class="section" data-paragraphid="05f8a8de632242a1b41c1d9c745f2a6a_5">在Netty中,每一个NioSocketChannel通道所封装的是Java NIO通道,再往下就对应到了操作系统底层的socket描述符。理论上来说,操作系统底层的socket描述符分为两类:</p><p class="section" data-paragraphid="177f0babbbfb40bf966cc05f99cb2d9a_5"><span class="yd-font-hkkai">• 连接监听类型。连接监听类型的socket描述符,放在服务器端,它负责接收客户端的套接字连接;在服务器端,一个“连接监听类型”的socket描述符可以接受(Accept)成千上万的传输类的socket描述符。</span></p><p class="section" data-paragraphid="a64b3bf7e616406f91179ff1eb7d2ccb_5"><span class="yd-font-hkkai">• 传输数据类型。数据传输类的socket描述符负责传输数据。同一条TCP的Socket传输链路,在服务器和客户端,都分别会有一个与之相对应的数据传输类型的socket描述符。</span></p><p class="section" data-paragraphid="ad45cc650b064fdf8ae1965ae8e75a43_5">在Netty中,异步非阻塞的服务器端监听通道NioServerSocketChannel,封装在Linux底层的描述符,是“连接监听类型”socket描述符;而NioSocketChannel异步非阻塞TCP Socket传输通道,封装在底层Linux的描述符,是“数据传输类型”的socket描述符。</p><p class="section" data-paragraphid="6e5c5ce46a434e01b87ae5cc787cd016_5">在Netty中,将有接收关系的NioServerSocketChannel和NioSocketChannel,叫作父子通道。其中,NioServerSocketChannel负责服务器连接监听和接收,也叫父通道(Parent Channel)。对应于每一个接收到的NioSocketChannel传输类通道,也叫子通道(Child Channel)。</p></div>
<div class="ne-content J_NEContent" style="min-height: 380px;"><h3 class="section j-chapter" data-paragraphid="30096e0ec3a34a9aa0380dcd53673d56_5">6.3.2 EventLoopGroup线程组</h3><p class="section" data-paragraphid="4274acbb38c5495bafcbfa217fee4929_5">Netty中的Reactor反应器模式,肯定不是单线程版本的反应器模式,而是多线程版本的反应器模式。Netty的多线程版本的反应器模式是如何实现的呢?</p><p class="section" data-paragraphid="29f7ea94f3894d06b73668ac8fcb2c74_5">在Netty中,一个EventLoop相当于一个子反应器(SubReactor)。大家已经知道,一个NioEventLoop子反应器拥有了一个线程,同时拥有一个Java NIO选择器。Netty如何组织外层的反应器呢?答案是使用EventLoopGroup线程组。多个EventLoop线程组成一个EventLoopGroup线程组。</p><p class="section" data-paragraphid="a74a413d1d7c4487bd1e334c7a355dd3_5">反过来说,Netty的EventLoopGroup线程组就是一个多线程版本的反应器。而其中的单个EventLoop线程对应于一个子反应器(SubReactor)。</p><p class="section" data-paragraphid="f1009fa8199249afa7f7185c21c53fc4_5">Netty的程序开发不会直接使用单个EventLoop线程,而是使用EventLoopGroup线程组。EventLoopGroup的构造函数有一个参数,用于指定内部的线程数。在构造器初始化时,会按照传入的线程数量,在内部构造多个Thread线程和多个EventLoop子反应器(一个线程对应一个EventLoop子反应器),进行多线程的IO事件查询和分发。</p><p class="section" data-paragraphid="3de395dea1424efab0a5896f82093dec_5">如果使用EventLoopGroup的无参数的构造函数,没有传入线程数或者传入的线程数为0,那么EventLoopGroup内部的线程数到底是多少呢?默认的EventLoopGroup内部线程数为最大可用的CPU处理器数量的2倍。假设电脑使用的是4核的CPU,那么在内部会启动8个EventLoop线程,相当8个子反应器(SubReactor)实例。</p><p class="section" data-paragraphid="a884b788f0914ab5870442e450a5dfbc_5">从前文可知,为了及时接受(Accept)到新连接,在服务器端,一般有两个独立的反应器,一个反应器负责新连接的监听和接受,另一个反应器负责IO事件处理。对应到Netty服务器程序中,则是设置两个EventLoopGroup线程组,一个EventLoopGroup负责新连接的监听和接受,一个EventLoopGroup负责IO事件处理。</p><p class="section" data-paragraphid="198ef0ee33b247f1bcd42d48923e8f81_5">那么,两个反应器如何分工呢?负责新连接的监听和接受的EventLoopGroup线程组,查询父通道的IO事件,有点像负责招工的包工头,因此,可以形象地称为“包工头”(Boss)线程组。另一个EventLoopGroup线程组负责查询所有子通道的IO事件,并且执行Handler处理器中的业务处理——例如数据的输入和输出(有点儿像搬砖),这个线程组可以形象地称为“工人”(Worker)线程组。</p><p class="section" data-paragraphid="bfd50f76d77b4ed984ffbb0a1cc203fb_5">至此,已经介绍完了两个基础概念:父子通道、EventLoopGroup线程组。下一节将介绍ServerBootstrap的启动流程。</p></div>
<div class="ne-content J_NEContent" style="min-height: 380px;"><h3 class="section j-chapter" data-paragraphid="e79b35c264374e4caf213b56bff8d10c_5">6.3.3 Bootstrap的启动流程</h3><p class="section" data-paragraphid="1b15da4950484dae88b587088d9fb2fa_5">Bootstrap的启动流程,也就是Netty组件的组装、配置,以及Netty服务器或者客户端的启动流程。在本节中对启动流程进行了梳理,大致分成了8个步骤。本书仅仅演示的是服务器端启动器的使用,用到的启动器类为ServerBootstrap。正式使用前,首先创建一个服务器端的启动器实例。</p><p class="yd-code section" data-paragraphid="8c419657f1054d11b22033cbb797d36d_5">//创建一个服务器端的启动器
ServerBootstrap b = new ServerBootstrap();</p><p class="section" data-paragraphid="7ee67facabce41278dbcc1fc07cfe295_5">接下来,结合前面的NettyDiscardServer服务器的程序代码,给大家详细介绍一下Bootstrap启动流程中精彩的8个步骤。</p><p class="section" data-paragraphid="62b04a4b695d411e91cd231710e8cbe6_5"><span class="bold">第1步:创建反应器线程组,并赋值给ServerBootstrap启动器实例</span></p><p class="yd-code section" data-paragraphid="7a8edc9c3a3d4683afd53f37a402f72c_5">//创建反应器线程组
//boss线程组
EventLoopGroupbossLoopGroup = new NioEventLoopGroup(1);
//worker线程组
EventLoopGroupworkerLoopGroup = new NioEventLoopGroup();
//...
//1 设置反应器线程组
b.group(bossLoopGroup, workerLoopGroup);</p><p class="section" data-paragraphid="08615faf35d245939421127ac03f7b9d_5">在设置反应器线程组之前,创建了两个NioEventLoopGroup线程组,一个负责处理连接监听IO事件,名为bossLoopGroup;另一个负责数据IO事件和Handler业务处理,名为workerLoopGroup。</p><p class="section" data-paragraphid="be60dac69ace45439646d5459ba44559_5">在线程组创建完成后,就可以配置给启动器实例,调用的方法是b.group(bossGroup,workerGroup),它一次性地给启动器配置了两大线程组。</p><p class="section" data-paragraphid="ac0129ea8fac48f9b4d4b3319457fc86_5">不一定非得配置两个线程组,可以仅配置一个EventLoopGroup反应器线程组。具体的配置方法是调用b.group(workerGroup)。在这种模式下,连接监听IO事件和数据传输IO事件可能被挤在了同一个线程中处理。这样会带来一个风险:新连接的接受被更加耗时的数据传输或者业务处理所阻塞。</p><p class="section" data-paragraphid="1c18fd0dea384bb89262a7a8b4d759b4_5">在服务器端,建议设置成两个线程组的工作模式。</p><p class="section" data-paragraphid="c1c0f7e763454faea2b2384d729dd3ee_5"><span class="bold">第2步:设置通道的IO类型</span></p><p class="section" data-paragraphid="7e43e397fe8b4768bde76e7d94eb1d1f_5">Netty不止支持Java NIO,也支持阻塞式的OIO(也叫BIO,Block-IO,即阻塞式IO)。下面配置的是Java NIO类型的通道类型,方法如下:</p><p class="yd-code section" data-paragraphid="42f8d4ed28344676934d8f171ad90bfa_5">//2 设置nio类型的通道
b.channel(NioServerSocketChannel.class);</p><p class="section" data-paragraphid="8bde26b4a29a4c0f99e751dae0288ab9_5">如果确实需要指定Bootstrap的IO模型为BIO,那么这里配置上Netty的OioServerSocketChannel.class类即可。由于NIO的优势巨大,通常不会在Netty中使用BIO。</p><p class="section" data-paragraphid="c3ea478e516748bab543610446f64526_5"><span class="bold">第3步:设置监听端口</span></p><p class="yd-code section" data-paragraphid="900a6cc7f5124bcaa7abcdfb375f2f1b_5">//3 设置监听端口
b.localAddress(new InetSocketAddress(port));</p><p class="section" data-paragraphid="36461ca9a056417c9ab6c263b05cc64b_5">这是最为简单的一步操作,主要是设置服务器的监听地址。</p><p class="section" data-paragraphid="85152b4b1151424caaa267d212ecba41_5"><span class="bold">第4步:设置传输通道的配置选项</span></p><p class="yd-code section" data-paragraphid="6998415d34c14d009decb38d9f6b4771_5">//4 设置通道的参数
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);</p><p class="section" data-paragraphid="2ba44a9c17bc437c9171f978316dc8fd_5">这里用到了Bootstrap的option()选项设置方法。对于服务器的Bootstrap而言,这个方法的作用是:给父通道(Parent Channel)接收连接通道设置一些选项。</p><p class="section" data-paragraphid="1fba09f56c924dcc97f8a25c45a8233a_5">如果要给子通道(Child Channel)设置一些通道选项,则需要用另外一个childOption()设置方法。</p><p class="section" data-paragraphid="7f46c363fd8a4b7486927e1f874b80d4_5">可以设置哪些通道选项(ChannelOption)呢?在上面的代码中,设置了一个底层TCP相关的选项ChannelOption.SO_KEEPALIVE。该选项表示:是否开启TCP底层心跳机制,true为开启,false为关闭。</p><p class="section" data-paragraphid="937525bb278e428f8a661b1db444761d_5">其他的通道设置选项,参见下一小节。</p><p class="section" data-paragraphid="5b1e7a4111614a35937fca3fd47f7584_5"><span class="bold">第5步:装配子通道的Pipeline流水线</span></p><p class="section" data-paragraphid="3e4978171e0f48ea9a3306d579a3eac5_5">上一节介绍到,每一个通道的子通道,都用一条ChannelPipeline流水线。它的内部有一个双向的链表。装配流水线的方式是:将业务处理器ChannelHandler实例加入双向链表中。</p><p class="section" data-paragraphid="956aa489a6384a209c7824399bacf7c1_5">装配子通道的Handler流水线调用childHandler()方法,传递一个ChannelInitializer通道初始化类的实例。在父通道成功接收一个连接,并创建成功一个子通道后,就会初始化子通道,这里配置的ChannelInitializer实例就会被调用。</p><p class="section" data-paragraphid="6f285ebc1dac44c4a61382aaf0cdfe42_5">在ChannelInitializer通道初始化类的实例中,有一个initChannel初始化方法,在子通道创建后会被执行到,向子通道流水线增加业务处理器。</p><p class="yd-code section" data-paragraphid="07006449048d4347b548dee308bc4d87_5">//5 装配子通道流水线
b.childHandler(new ChannelInitializer<SocketChannel>() {
//有连接到达时会创建一个通道的子通道,并初始化
protected void initChannel(SocketChannelch) throws Exception {
// 流水线管理子通道中的Handler业务处理器
// 向子通道流水线添加一个Handler业务处理器
ch.pipeline().addLast(new NettyDiscardHandler());
}
});</p><p class="section" data-paragraphid="09c88744546d4d46b7b5416c664fffff_5">为什么仅装配子通道的流水线,而不需要装配父通道的流水线呢?原因是:父通道也就是NioServerSocketChannel连接接受通道,它的内部业务处理是固定的:接受新连接后,创建子通道,然后初始化子通道,所以不需要特别的配置。如果需要完成特殊的业务处理,可以使用ServerBootstrap的handler(ChannelHandler handler)方法,为父通道设置ChannelInitializer初始化器。</p><p class="section" data-paragraphid="9a67f1f3df254f23af95ceed4238b0f0_5">说明一下,ChannelInitializer处理器器有一个泛型参数SocketChannel,它代表需要初始化的通道类型,这个类型需要和前面的启动器中设置的通道类型,一一对应起来。</p><p class="section" data-paragraphid="61761428bd1c43fc8998713df5a2ec44_5"><span class="bold">第6步:开始绑定服务器新连接的监听端口</span></p><p class="yd-code section" data-paragraphid="350f6c3e4a0f4d1ca8f5438d1fe32ebb_5">// 6 开始绑定端口,通过调用sync同步方法阻塞直到绑定成功
ChannelFuturechannelFuture = b.bind().sync();
Logger.info(" 服务器启动成功,监听端口: " +
channelFuture.channel().localAddress());</p><p class="section" data-paragraphid="1a8d9d6ebdb4454a8b11b2a7c7c9d569_5">这个也很简单。b.bind()方法的功能:返回一个端口绑定Netty的异步任务channelFuture。在这里,并没有给channelFuture异步任务增加回调监听器,而是阻塞channelFuture异步任务,直到端口绑定任务执行完成。</p><p class="section" data-paragraphid="8020f741deb547b099b2555f47fe61ae_5">在Netty中,所有的IO操作都是异步执行的,这就意味着任何一个IO操作会立刻返回,在返回的时候,异步任务还没有真正执行。什么时候执行完成呢?Netty中的IO操作,都会返回异步任务实例(如ChannelFuture实例),通过自我阻塞一直到ChannelFuture异步任务执行完成,或者为ChannelFuture增加事件监听器的两种方式,以获得Netty中的IO操作的真正结果。上面使用了第一种。</p><p class="section" data-paragraphid="f4621a321b6348769697a527267b3c17_5">至此,服务器正式启动。</p><p class="section" data-paragraphid="8626828d1d664753a49d82b346df3bf0_5"><span class="bold">第7步:自我阻塞,直到通道关闭</span></p><p class="yd-code section" data-paragraphid="6ec09c11e7684c00a24a9cbce861eaf9_5">// 7 等待通道关闭
// 自我阻塞,直到通道关闭的异步任务结束
ChannelFuturecloseFuture = channelFuture.channel().closeFuture();
closeFuture.sync();</p><p class="section" data-paragraphid="2202f4d06cfd45de8ba63726f861523d_5">如果要阻塞当前线程直到通道关闭,可以使用通道的closeFuture()方法,以获取通道关闭的异步任务。当通道被关闭时,closeFuture实例的sync()方法会返回。</p><p class="section" data-paragraphid="02c3a98bbdbb48b396375f253bf9f5b2_5"><span class="bold">第8步:关闭EventLoopGroup</span></p><p class="yd-code section" data-paragraphid="9ef4f6dd6d844103b3edf2af1c829955_5">// 8关闭EventLoopGroup,
// 释放掉所有资源,包括创建的反应器线程
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();</p><p class="section" data-paragraphid="f2c9c693086140ba90b850111f73d146_5">关闭Reactor反应器线程组,同时会关闭内部的subReactor子反应器线程,也会关闭内部的Selector选择器、内部的轮询线程以及负责查询的所有的子通道。在子通道关闭后,会释放掉底层的资源,如TCP Socket文件描述符等。</p></div>
<div class="ne-content J_NEContent" style="min-height: 380px;"><h3 class="section j-chapter" data-paragraphid="d6354f8b3dad43848bf7ae07df7c1297_5">6.3.4 ChannelOption通道选项</h3><p class="section" data-paragraphid="3a03798801494205b158e2812da9ab28_5">无论是对于NioServerSocketChannel父通道类型,还是对于NioSocketChannel子通道类型,都可以设置一系列的ChannelOption选项。在ChannelOption类中,定义了一大票通道选项。下面介绍一些常见的选项。</p><p class="section" data-paragraphid="7c9c9d561f504f0ea2bebb38460f6601_5"><span class="bold">1. SO_RCVBUF,SO_SNDBUF</span></p><p class="section" data-paragraphid="fdac18c8243a498b80d261c1c2096eab_5">此为TCP参数。每个TCP socket(套接字)在内核中都有一个发送缓冲区和一个接收缓冲区,这两个选项就是用来设置TCP连接的这两个缓冲区大小的。TCP的全双工的工作模式以及TCP的滑动窗口便是依赖于这两个独立的缓冲区及其填充的状态。</p><p class="section" data-paragraphid="c7c7ef4876cf4eb899322bb739ee75b3_5"><span class="bold">2. TCP_NODELAY</span></p><p class="section" data-paragraphid="33a02f9d1dcf45af8324baf15ea8545a_5">此为TCP参数。表示立即发送数据,默认值为True(Netty默认为True,而操作系统默认为False)。该值用于设置Nagle算法的启用,该算法将小的碎片数据连接成更大的报文(或数据包)来最小化所发送报文的数量,如果需要发送一些较小的报文,则需要禁用该算法。Netty默认禁用该算法,从而最小化报文传输的延时。</p><p class="section" data-paragraphid="26eab14dc16f4aa6a216af8b98c61084_5">说明一下:这个参数的值,与是否开启Nagle算法是相反的,设置为true表示关闭,设置为false表示开启。通俗地讲,如果要求高实时性,有数据发送时就立刻发送,就设置为true,如果需要减少发送次数和减少网络交互次数,就设置为false。</p><p class="section" data-paragraphid="9f3536d4a4e94ac99116c3372d5df0b4_5"><span class="bold">3. SO_KEEPALIVE</span></p><p class="section" data-paragraphid="49aac33ea23b4a7f8f720abcbb8097cd_5">此为TCP参数。表示底层TCP协议的心跳机制。true为连接保持心跳,默认值为false。启用该功能时,TCP会主动探测空闲连接的有效性。可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。</p><p class="section" data-paragraphid="9257bb81e1a84799a05fe276e74047af_5"><span class="bold">4. SO_REUSEADDR</span></p><p class="section" data-paragraphid="a028c67886a243af9a8482d3aa1b6665_5">此为TCP参数。设置为true时表示地址复用,默认值为false。有四种情况需要用到这个参数设置:</p><p class="section" data-paragraphid="72497c600f674371bccb6ff59b12c9b3_5"><span class="yd-font-hkkai">• 当有一个有相同本地地址和端口的socket1处于TIME_WAIT状态时,而我们希望启动的程序的socket2要占用该地址和端口。例如在重启服务且保持先前端口时。</span></p><p class="section" data-paragraphid="ed32091d11f7426eb164b762f6ff2bbb_5"><span class="yd-font-hkkai">• 有多块网卡或用IP Alias技术的机器在同一端口启动多个进程,但每个进程绑定的本地IP地址不能相同。</span></p><p class="section" data-paragraphid="b2f87bdfaa0141b4adb9238a3c32f03a_5"><span class="yd-font-hkkai">• 单个进程绑定相同的端口到多个socket(套接字)上,但每个socket绑定的IP地址不同。</span></p><p class="section" data-paragraphid="cddf235522ba4dc7bebf45e37c955301_5"><span class="yd-font-hkkai">• 完全相同的地址和端口的重复绑定。但这只用于UDP的多播,不用于TCP。</span></p><p class="section" data-paragraphid="8aa104c1cbac4f499c450fc1a5b7e594_5"><span class="bold">5. SO_LINGER</span></p><p class="section" data-paragraphid="58c70f4e4acc43c9a38c373f349ba4ea_5">此为TCP参数。表示关闭socket的延迟时间,默认值为-1,表示禁用该功能。-1表示socket.close()方法立即返回,但操作系统底层会将发送缓冲区全部发送到对端。0表示socket.close()方法立即返回,操作系统放弃发送缓冲区的数据,直接向对端发送RST包,对端收到复位错误。非0整数值表示调用socket.close()方法的线程被阻塞,直到延迟时间到来、发送缓冲区中的数据发送完毕,若超时,则对端会收到复位错误。</p><p class="section" data-paragraphid="c5366055a3cc4c418376fe81b7724c5b_5"><span class="bold">6. SO_BACKLOG</span></p><p class="section" data-paragraphid="825b52025e3b466fa81740dcd7671d83_5">此为TCP参数。表示服务器端接收连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,在Windows中为200,其他操作系统为128。</p><p class="section" data-paragraphid="b17660b3faf54239887343560a51c492_5">如果连接建立频繁,服务器处理新连接较慢,可以适当调大这个参数。</p><p class="section" data-paragraphid="225f6e5600424682b0f5c8dea1597def_5"><span class="bold">7. SO_BROADCAST</span></p><p class="section" data-paragraphid="6f8c057c4eeb40e9bb65bf6dd1a1db99_5">此为TCP参数。表示设置广播模式。</p></div>
<div class="ne-content J_NEContent" style="min-height: 380px;"><h2 class="section j-chapter" data-paragraphid="84a8d7cf99b344abbd531cf48f45b2c7_5">6.4 详解Channel通道</h2><p class="section" data-paragraphid="db5501de48e14527af3764218d4692f3_5">先介绍一下,在使用Channel通道的过程中所涉及的主要成员和方法。然后,为大家介绍一下Netty所提供了一个专门的单元测试通道——EmbeddedChannel(嵌入式通道)。</p></div>
<div class="ne-content J_NEContent" style="min-height: 380px;"><h3 class="section j-chapter" data-paragraphid="530dcff67cd34ef2a95eb0de6729a96a_5">6.4.1 Channel通道的主要成员和方法</h3><p class="section" data-paragraphid="ce86663a62d14e69880fbbcde3c90145_5">在Netty中,通道是其中的核心概念之一,代表着网络连接。通道是通信的主题,由它负责同对端进行网络通信,可以写入数据到对端,也可以从对端读取数据。</p><p class="section" data-paragraphid="0bc33fa62df04061aa93bf2d2ae543d7_5">通道的抽象类AbstractChannel的构造函数如下:</p><p class="yd-code section" data-paragraphid="053317f6a32649299586b217a6392e1b_5">protected AbstractChannel(Channel parent) {
this.parent = parent; //父通道
id = newId();
unsafe = newUnsafe(); //底层的NIO 通道,完成实际的IO操作
pipeline = newChannelPipeline(); //一条通道,拥有一条流水线
}</p><p class="section" data-paragraphid="915e5ed3ea544e54a517e35dc27920e6_5">AbstractChannel内部有一个pipeline属性,表示处理器的流水线。Netty在对通道进行初始化的时候,将pipeline属性初始化为DefaultChannelPipeline的实例。这段代码也表明,每个通道拥有一条ChannelPipeline处理器流水线。</p><p class="section" data-paragraphid="5d1599d1cf424383a93684426cbe7867_5">AbstractChannel内部有一个parent属性,表示通道的父通道。对于连接监听通道(如NioServerSocketChannel实例)来说,其父亲通道为null;而对于每一条传输通道(如NioSocketChannel实例),其parent属性的值为接收到该连接的服务器连接监听通道。</p><p class="section" data-paragraphid="7b82897fec204ba6a883da5559063a9a_5">几乎所有的通道实现类都继承了AbstractChannel抽象类,都拥有上面的parent和pipeline两个属性成员。</p><p class="section" data-paragraphid="46bd4fc11a79476591e3d4e8fba0c6f8_5">再来看一下,在通道接口中所定义的几个重要方法:</p><p class="section" data-paragraphid="7c045e9595cd4387aec3ab3d06a414ec_5"><span class="bold">方法1.ChannelFuture connect(SocketAddress address)</span></p><p class="section" data-paragraphid="9c10b743df19498ea67896838616a19e_5">此方法的作用为:连接远程服务器。方法的参数为远程服务器的地址,调用后会立即返回,返回值为负责连接操作的异步任务ChannelFuture。此方法在客户端的传输通道使用。</p><p class="section" data-paragraphid="863376875004423aa48bd99681d4662b_5"><span class="bold">方法2.ChannelFuture bind(SocketAddress address)</span></p><p class="section" data-paragraphid="dc197032016a4a7798f6be8dcc2f63b5_5">此方法的作用为:绑定监听地址,开始监听新的客户端连接。此方法在服务器的新连接监听和接收通道使用。</p><p class="section" data-paragraphid="739e4806135d4fc1ac26b466b593efe1_5"><span class="bold">方法3.ChannelFuture close()</span></p><p class="section" data-paragraphid="5bcecca0d8f943c28340dbc45e4358ec_5">此方法的作用为:关闭通道连接,返回连接关闭的ChannelFuture异步任务。如果需要在连接正式关闭后执行其他操作,则需要为异步任务设置回调方法;或者调用ChannelFuture异步任务的sync( )方法来阻塞当前线程,一直等到通道关闭的异步任务执行完毕。</p><p class="section" data-paragraphid="fb9558f129fb415daa204d6550a5ee8b_5"><span class="bold">方法4.Channel read()</span></p><p class="section" data-paragraphid="6fb72f508d844cb5a79cd1f068ecc6c8_5">此方法的作用为:读取通道数据,并且启动入站处理。具体来说,从内部的Java NIO Channel通道读取数据,然后启动内部的Pipeline流水线,开启数据读取的入站处理。此方法的返回通道自身用于链式调用。</p><p class="section" data-paragraphid="7b39e0d4e5ed44efab526a2dde505aef_5"><span class="bold">方法5.ChannelFuture write(Object o)</span></p><p class="section" data-paragraphid="2802225de27f428ebdd898f30d623312_5">此方法的作用为:启程出站流水处理,把处理后的最终数据写到底层Java NIO通道。此方法的返回值为出站处理的异步处理任务。</p><p class="section" data-paragraphid="3860cd9fbaca471799e1e38c27f575cb_5"><span class="bold">方法6.Channel flush()</span></p><p class="section" data-paragraphid="872f16159e8f4bc897bb5341c0264e14_5">此方法的作用为:将缓冲区中的数据立即写出到对端。并不是每一次write操作都是将数据直接写出到对端,write操作的作用在大部分情况下仅仅是写入到操作系统的缓冲区,操作系统会将根据缓冲区的情况,决定什么时候把数据写到对端。而执行flush()方法立即将缓冲区的数据写到对端。</p><p class="section" data-paragraphid="3b7aff530a4444de9731e64df5044326_5">上面的6种方法,仅仅是比较常见的方法。在Channel接口中以及各种通道的实现类中,还定义了大量的通道操作方法。在一般的日常的开发中,如果需要用到,请直接查阅Netty API文档或者Netty源代码。</p></div>
<div class="ne-content J_NEContent" style="min-height: 380px;"><h3 class="section j-chapter" data-paragraphid="d3c4a017655e4680aeb2ef05e16e7a36_5">6.4.2 EmbeddedChannel嵌入式通道</h3><p class="section" data-paragraphid="5c146459a4104a5ab2241d898c246e6e_5">在Netty的实际开发中,通信的基础工作,Netty已经替大家完成。实际上,大量的工作是设计和开发ChannelHandler通道业务处理器,而不是开发Outbound出站处理器,换句话说就是开发Inbound入站处理器。开发完成后,需要投入单元测试。单元测试的大致流程是:需要将Handler业务处理器加入到通道的Pipeline流水线中,接下来先后启动Netty服务器、客户端程序,相互发送消息,测试业务处理器的效果。如果每开发一个业务处理器,都进行服务器和客户端的重复启动,这整个的过程是非常的烦琐和浪费时间的。如何解决这种徒劳的、低效的重复工作呢?</p><p class="section" data-paragraphid="4cf63943eed54297b5414154051aa558_5">Netty提供了一个专用通道——名字叫EmbeddedChannel(嵌入式通道)。</p><p class="section" data-paragraphid="803ca89a5a4c48418ea6cc8285531388_5">EmbeddedChannel仅仅是模拟入站与出站的操作,底层不进行实际的传输,不需要启动Netty服务器和客户端。除了不进行传输之外,EmbeddedChannel的其他的事件机制和处理流程和真正的传输通道是一模一样的。因此,使用它,开发人员可以在开发的过程中方便、快速地进行ChannelHandler业务处理器的单元测试。</p><p class="section" data-paragraphid="4229503d3f6b4072ad4de3857fd0a762_5">为了模拟数据的发送和接收,EmbeddedChannel提供了一组专门的方法,具体如表6-1所示。</p><p class="yd-paragraph-c section" data-paragraphid="1b6647ccba51446db4bdc77cbd6a4fd2_5"><span class="bold">表6-1 EmbeddedChannel单元测试的辅助方法</span></p><p class="section" data-paragraphid="d0ab161562a84283af15e2f777f39551_5" style="clear: both;"><img class="paragraph-img" data-src="https://easyreadfs.nosdn.127.net/image_fb77b280cdcf4198aa930db69473d867" data-width="1436" data-height="493" src="https://easyreadfs.nosdn.127.net/image_fb77b280cdcf4198aa930db69473d867" style="width: 660px;"></p><p class="section" data-paragraphid="c25102a0978b44f4baba2166bbadbc2e_5">最为重要的两个方法为:writeInbound和readOutbound方法。</p><p class="section" data-paragraphid="9d3ef7da75c44099a3709a3675a67f6c_5"><span class="bold">方法1.writeInbound入站数据写到通道</span></p><p class="section" data-paragraphid="f313b1fc84c7408c863edad0627ab0d8_5">它的使用场景是:测试入站处理器。在测试入站处理器时(例如测试一个解码器),需要读取Inbound(入站)数据。可以调用writeInbound方法,向EmbeddedChannel写入一个入站二进制ByteBuf数据包,模拟底层的入站包。</p><p class="section" data-paragraphid="4bb202ea5815491dbf19df8abd25ed39_5"><span class="bold">方法2.readOutbound读取通道的出站数据</span></p><p class="section" data-paragraphid="45c78b07aab7440cb8769694b2be42c9_5">它的使用场景是:测试出站处理器。在测试出站处理器时(例如测试一个编码器),需要查看处理过的结果数据。可以调用readOutbound方法,读取通道的最终出站结果,它是经过流水线一系列的出站处理后,最终的出站数据包。比较绕口,重复一遍,通过readOutbound,可以读取完成EmbeddedChannel最后一个出站处理器,处理后的ByteBuf二进制出站包。</p><p class="section" data-paragraphid="8a12b9e851d142f49c375eed5cf05ded_5">总之,这个EmbeddedChannel类,既具备通道的通用接口和方法,又增加了一些单元测试的辅助方法,在开发时是非常有用的。它的具体用法,后面还会结合其他的Netty组件的实例反复提到。</p></div>
<div class="ne-content J_NEContent" style="min-height: 380px;"><h2 class="section j-chapter" data-paragraphid="d4e07f99394c42539c7f4effaffd1f44_5">6.5 详解Handler业务处理器</h2><p class="section" data-paragraphid="3fc9ae12e70b45d48652b35a95cd6d95_5">在Reactor反应器经典模型中,反应器查询到IO事件后,分发到Handler业务处理器,由Handler完成IO操作和业务处理。</p><p class="section" data-paragraphid="a6a533508395455a9cf2493fa62b3556_5">整个的IO处理操作环节包括:从通道读数据包、数据包解码、业务处理、目标数据编码、把数据包写到通道,然后由通道发送到对端,如图6-8所示。</p><p class="section" data-paragraphid="bfacb21ae32c44d3aa0a107420856992_5">前后两个环节,从通道读数据包和由通道发送到对端,由Netty的底层负责完成,不需要用户程序负责。</p><p class="section" data-paragraphid="381f3ba046a94b308ad06c4cbe00230d_5" style="clear: both;"><img class="paragraph-img" data-src="https://easyreadfs.nosdn.127.net/image_c7cd89a1a56549aa8f58df56614bf2dc" data-width="951" data-height="383" src="https://easyreadfs.nosdn.127.net/image_c7cd89a1a56549aa8f58df56614bf2dc" style="width: 660px;"></p><p class="yd-imagetitle section" data-paragraphid="fad15b28dc0f46d6b6dfa6050e39a64d_5">图6-8 整个的IO处理操作环节</p><p class="section" data-paragraphid="6478f1656bbc4fe586bf61d6e77ba3c8_5">用户程序主要在Handler业务处理器中,Handler涉及的环节为:数据包解码、业务处理、目标数据编码、把数据包写到通道中。</p><p class="section" data-paragraphid="49ca94b6e99542e5b739d24ddc02a318_5">前面已经介绍过,从应用程序开发人员的角度来看,有入站和出站两种类型操作。</p><p class="section" data-paragraphid="18a172efba394b6687f0fe2c063b882d_5"><span class="yd-font-hkkai">• 入站处理,触发的方向为:自底向上,Netty的内部(如通道)到ChannelInboundHandler入站处理器。</span></p><p class="section" data-paragraphid="48934d02ba0047a88ecdb413db3fce77_5"><span class="yd-font-hkkai">• 出站处理,触发的方向为:自顶向下,从ChannelOutboundHandler出站处理器到Netty的内部(如通道)。</span></p><p class="section" data-paragraphid="55b5487d9470459ba3b1c741297a6ac8_5">按照这种方向来分,前面数据包解码、业务处理两个环节——属于入站处理器的工作;后面目标数据编码、把数据包写到通道中两个环节——属于出站处理器的工作。</p></div>
<div class="ne-content J_NEContent" style="min-height: 380px;"><h3 class="section j-chapter" data-paragraphid="b4cb5e07bd5543639b19c7c1341ca259_5">6.5.1 ChannelInboundHandler通道入站处理器</h3><p class="section" data-paragraphid="e1d29736afca428385804b894e088bb5_5">当数据或者信息入站到Netty通道时,Netty将触发入站处理器ChannelInboundHandler所对应的入站API,进行入站操作处理。</p><p class="section" data-paragraphid="585e9e0f79544034b1491e9bd088d7d5_5">ChannelInboundHandler的主要操作,如图6-9所示,具体的介绍如下:</p><p class="section" data-paragraphid="654dfec3442541dca6da948483b5abf4_5" style="clear: both;"><img class="paragraph-img" data-src="https://easyreadfs.nosdn.127.net/image_e4623cec888c4f44b9f05f7657b11b68" data-width="819" data-height="616" src="https://easyreadfs.nosdn.127.net/image_e4623cec888c4f44b9f05f7657b11b68" style="width: auto; height: 380px;"></p><p class="yd-imagetitle section" data-paragraphid="ee06daef17344c3cac40c472823be2e7_5">图6-9 ChannelInboundHandler的主要操作</p><p class="section" data-paragraphid="dab3e0f588fc4c9cbacbd9445fda44ca_5"><span class="bold">1. channelRegistered</span></p><p class="section" data-paragraphid="b866e003a3d04a7bbf11ef0b00a845f0_5">当通道注册完成后,Netty会调用fireChannelRegistered,触发通道注册事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelRegistered方法,会被调用到。</p><p class="section" data-paragraphid="2427ca1ba2e04b62b0038c9e53f9809b_5"><span class="bold">2. channelActive</span></p><p class="section" data-paragraphid="25c1c9b0c17e4b54b843083b5d1d5f28_5">当通道激活完成后,Netty会调用fireChannelActive,触发通道激活事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelActive方法,会被调用到。</p><p class="section" data-paragraphid="d855cdca15de4b08b3b91f1dfaa29b06_5"><span class="bold">3. channelRead</span></p><p class="section" data-paragraphid="0820da23d35c4cb19687cad7e9d1f539_5">当通道缓冲区可读,Netty会调用fireChannelRead,触发通道可读事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelRead方法,会被调用到。</p><p class="section" data-paragraphid="374e253b57d04b7a80956fafbfde8721_5"><span class="bold">4. channelReadComplete</span></p><p class="section" data-paragraphid="5337245b4d324fcf9f38c0d1e7dfc0f1_5">当通道缓冲区读完,Netty会调用fireChannelReadComplete,触发通道读完事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelReadComplete方法,会被调用到。</p><p class="section" data-paragraphid="bca716c2e86b4e2f98b5b0d6ea35a594_5"><span class="bold">5. channelInactive</span></p><p class="section" data-paragraphid="2f59133d23fb4faeb5ae3465182d08c3_5">当连接被断开或者不可用,Netty会调用fireChannelInactive,触发连接不可用事件。通道会启动对应的流水线处理,在通道注册过的入站处理器Handler的channelInactive方法,会被调用到。</p><p class="section" data-paragraphid="1db6e36fdb2247ada973844404a24f75_5"><span class="bold">6. exceptionCaught</span></p><p class="section" data-paragraphid="0c9216827d7c4a79838000170f6341be_5">当通道处理过程发生异常时,Netty会调用fireExceptionCaught,触发异常捕获事件。通道会启动异常捕获的流水线处理,在通道注册过的处理器Handler的exceptionCaught方法,会被调用到。注意,这个方法是在通道处理器中ChannelHandler定义的方法,入站处理器、出站处理器接口都继承到了该方法。</p><p class="section" data-paragraphid="6b224a6c56b14e14918e448e88979cc8_5">上面介绍的并不是ChanneInboundHandler的全部方法,仅仅介绍了其中几种比较重要的方法。在Netty中,它的默认实现为ChannelInboundHandlerAdapter,在实际开发中,只需要继承这个ChannelInboundHandlerAdapter默认实现,重写自己需要的方法即可。</p></div>
<div class="ne-content J_NEContent" style="min-height: 380px;"><h3 class="section j-chapter" data-paragraphid="1b071317fdc341fb8bdf4fe20e8b8623_5">6.5.2 ChannelOutboundHandler通道出站处理器</h3><p class="section" data-paragraphid="571f24d914f04eba82e352bed4129ae2_5">当业务处理完成后,需要操作Java NIO底层通道时,通过一系列的ChannelOutboundHandler通道出站处理器,完成Netty通道到底层通道的操作。比方说建立底层连接、断开底层连接、写入底层Java NIO通道等。ChannelOutboundHandler接口定义了大部分的出站操作,如图6-10所示,具体的介绍如下:</p><p class="section" data-paragraphid="32872eda841f497b86d6076071881a6e_5" style="clear: both;"><img class="paragraph-img" data-src="https://easyreadfs.nosdn.127.net/image_08d2d0d7b18d496aa84bcdf37ae69c04" data-width="853" data-height="654" src="https://easyreadfs.nosdn.127.net/image_08d2d0d7b18d496aa84bcdf37ae69c04" style="width: auto; height: 380px;"></p><p class="yd-imagetitle section" data-paragraphid="1eec92a037f74e08ba9d0315d098dc21_5">如图6-10 ChannelOutboundHandler的主要操作</p><p class="section" data-paragraphid="557ced287e694286aa92787490121674_5">再强调一下,出站处理的方向:是通过上层Netty通道,去操作底层Java IO通道。主要出站(Outbound)的操作如下:</p><p class="section" data-paragraphid="64f2eae7008749429b3ef374f3e48a3a_5"><span class="bold">1. bind</span></p><p class="section" data-paragraphid="7db9169c1cb0412eb7b0a8716d10acdf_5">监听地址(IP+端口)绑定:完成底层Java IO通道的IP地址绑定。如果使用TCP传输协议,这个方法用于服务器端。</p><p class="section" data-paragraphid="e2c4301e8a07464c80f786cd13253b82_5"><span class="bold">2. connect</span></p><p class="section" data-paragraphid="c16c175b5e414cd0bce5eff20bc270f5_5">连接服务端:完成底层Java IO通道的服务器端的连接操作。如果使用TCP传输协议,这个方法用于客户端。</p><p class="section" data-paragraphid="72781b9fc2fd40a58e7d81fc5434c120_5"><span class="bold">3. write</span></p><p class="section" data-paragraphid="a50ea1aca84a4101be637650fd027ba6_5">写数据到底层:完成Netty通道向底层Java IO通道的数据写入操作。此方法仅仅是触发一下操作而已,并不是完成实际的数据写入操作。</p><p class="section" data-paragraphid="76ffea7fc45c4d34a3e222410b721402_5"><span class="bold">4. flush</span></p><p class="section" data-paragraphid="50d5ae8ec8784e9e96fdddd0b00fd60b_5">腾空缓冲区中的数据,把这些数据写到对端:将底层缓存区的数据腾空,立即写出到对端。</p><p class="section" data-paragraphid="b62ca1c3951f4dd38830f5829518fee5_5"><span class="bold">5. read</span></p><p class="section" data-paragraphid="0393b6c40f544da4a5e4bfb626beb539_5">从底层读数据:完成Netty通道从Java IO通道的数据读取。</p><p class="section" data-paragraphid="e6446a0116344658829b37f0e7b09edc_5"><span class="bold">6. disConnect</span></p><p class="section" data-paragraphid="95d803b007064d1c87713dafff737f3b_5">断开服务器连接:断开底层Java IO通道的服务器端连接。如果使用TCP传输协议,此方法主要用于客户端。</p><p class="section" data-paragraphid="cb4c7c29cc5f4edf8e43874a14afe68f_5"><span class="bold">7. close</span></p><p class="section" data-paragraphid="1a8d4e49e6c54c398579ce1f5b776f30_5">主动关闭通道:关闭底层的通道,例如服务器端的新连接监听通道。</p><p class="section" data-paragraphid="aac7f3639e15480aa1ceac833a5dcd31_5">上面介绍的并不是ChannelOutboundHandler的全部方法,仅仅介绍了其中几个比较重要的方法。在Netty中,它的默认实现为ChannelOutboundHandlerAdapter,在实际开发中,只需要继承这个ChannelOutboundHandlerAdapter默认实现,重写自己需要的方法即可。</p></div>
<div class="ne-content J_NEContent" style="min-height: 380px;"><h3 class="section j-chapter" data-paragraphid="748071b29d4b474fb2e3864f707855fc_5">6.5.3 ChannelInitializer通道初始化处理器</h3><p class="section" data-paragraphid="69c2d9d874a442bda54cdb3c322b4518_5">在前面已经讲到,通道和Handler业务处理器的关系是:一条Netty的通道拥有一条Handler业务处理器流水线,负责装配自己的Handler业务处理器。装配Handler的工作,发生在通道开始工作之前。现在的问题是:如果向流水线中装配业务处理器呢?这就得借助通道的初始化类——ChannelInitializer。</p><p class="section" data-paragraphid="c6c8cdf134334a8d83f43f2f7cea5065_5">首先回顾一下NettyDiscardServer丢弃服务端的代码,在给接收到的新连接装配Handler业务处理器时,使用childHandler()方法设置了一个ChannelInitializer实例:</p><p class="yd-code section" data-paragraphid="90c958807daf4130b661fb139400b074_5">//5 装配子通道流水线
b.childHandler(new ChannelInitializer<SocketChannel>() {
//有连接到达时会创建一个通道
protected void initChannel(SocketChannelch) throws Exception {
// 流水线管理子通道中的Handler业务处理器
// 向子通道流水线添加一个Handler业务处理器
ch.pipeline().addLast(new NettyDiscardHandler());
}
});</p><p class="section" data-paragraphid="2c1a2211c5ee48e58b95a848a63b1103_5">上面的ChannelInitializer也是通道初始化器,属于入站处理器的类型。在示例代码中,使用了ChannelInitializer的initChannel()方法。它是何方神圣呢?</p><p class="section" data-paragraphid="f0c4895700054394aec007d6cedc99fb_5">initChannel()方法是ChannelInitializer定义的一个抽象方法,这个抽象方法需要开发人员自己实现。在父通道调用initChannel()方法时,会将新接收的通道作为参数,传递给initChannel()方法。initChannel()方法内部大致的业务代码是:拿到新连接通道作为实际参数,往它的流水线中装配Handler业务处理器。</p></div>