在学习netty源码之前,应该对netty的基本用法有所了解,由于netty大多数时候用于开发服务器端程序,因此下面以一个时间服务器为例,演示Netty的基本使用,并对主要概念进行介绍。
2.1 服务器启动程序
时间服务器很简单,每次收到QUERY TIME ORDER
请求后返回当前时间。
- main方法中通过ServerBootstrap启动netty服务器
//创建两个线程组,专门用于网络事件的处理,Reactor线程组
//一个用来接收客户端的连接,
//一个用来进行SocketChannel的网络读写
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try{
//辅助启动类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workGroup) // 注册两个线程组
.channel(NioServerSocketChannel.class)//创建的channel为NioServerSocketChannel【nio-ServerSocketChannel】
.option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP属性
.childOption(ChannelOption.SO_KEEPALIVE, true) //配置与cient连接的channel属性
.childHandler(new ChildChannelHandler());//处理IO事件的处理类,处理网络事件
ChannelFuture f = b.bind(port).sync();//绑定端口后同步等待
f.channel().closeFuture().sync();//阻塞等待
}catch(Exception e){
e.printStackTrace();
}finally{
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
- 定义ChannelInitializer,会在ServerChannel注册到事件循环后触发initChannel事件。
// ChannelHandler初始化处理器
class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
}
- TimeServerHandler 里面负责处理业务逻辑,发送当前时间
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf=(ByteBuf) msg;//将msg转换成Netty的ByteBuf对象
byte[] req=new byte[buf.readableBytes()];
buf.readBytes(req);
String body=new String(req,"GBK");
System.out.println("The time server receive order : "+body);
String currentTime="QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
ByteBuf resp=Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);//只是写入缓冲区
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();//通过网络发送
}
}
2.2 过程解析
- 创建辅助启动类ServerBootstrap,并设置相关配置:
- group() 设置处理Accept事件和读写操作的事件循环组
- channel() 设置通道类型为NioServerSocketChannel,这是netty自己定义的Channel,指的是服务器通道,内部包含java原生的服务器端通道
ServerSocketChannel
,相应的还有客户端通道NioSocketChannel
。 - option()/childOption() 设置服务器通道的选项和建立连接后的客户端通道的选项
- childHandler() 设置子处理器,内部需要将用户自定义的处理器加入到netty的pipeline中,这涉及到Channel,ChannelHandler和Pipeline,后续会有讲解 。
2.调用bind()方法绑定端口,sync()会阻塞等待处理请求。这是因为bind()方法是一个异步过程,会立即返回一个ChannelFuture对象,调用sync()会等待执行完成。
3.获得Channel的closeFuture阻塞等待关闭,服务器Channel关闭时closeFuture会完成并释放。
Future的使用参见 Netty源码分析-04 Future和Promise
2.3 相关概念
在学习Netty的源码之前,需要对Netty的主要概念进行了解,主要是初步明白每个类负责的任务是什么,能够完成哪些工作。当然,每个概念的具体实现会在后续章节中进行介绍。
Channel
这里的Channel与Java的Channel不是同一个,是netty自己定义的通道;Netty的Channel是对网络连接处理的抽象,负责与网络进行通讯,支持NIO和OIO两种方式;内部与网络socket连接,通过channel能够进行I/O操作,如读、写、连接和绑定。
通过Channel可以执行具体的I/O操作,如read, write, connect, 和bind。在Netty中,所有I/O操作都是异步的;Netty的服务器端处理客户端连接的Channel创建时可以设置父Channel。例如:ServerSocketChannel接收到请求创建SocketChannel,SocketChannel的父为ServerSocketChannel。
ChannelHandler与ChannelPipeline
ChannelHandler是通道处理器,用来处理I/O事件或拦截I/O操作,ChannelPipeline字如其名,是一个双向流水线,内部维护了多个ChannelHandler,服务器端收到I/O事件后,每次顺着ChannelPipeline依次调用ChannelHandler的相关方法。
ChannelHandler是个接口,通常我们在Netty中需要使用下面的子类:
- ChannelInboundHandler 用来处理输入的I/O事件
- ChannelOutboundHandler 用来处理输出的I/O事件
另外,下面的adapter类提供了
- ChannelInboundHandlerAdapter 用来处理输入的I/O事件
- ChannelOutboundHandlerAdapter 用来处理输出的I/O事件
- ChannelDuplexHandler 可以用来处理输入和输出的I/O事件
Netty的ChannelPipeline和ChannelHandler机制类似于Servlet和Filter过滤器/拦截器,每次收到请求会依次调用配置好的拦截器链。Netty服务器收到消息后,将消息在ChannelPipeline中流动和传递,途经的ChannelHandler会对消息进行处理,ChannelHandler分为两种inbound和outbound,服务器read过程中只会调用inbound的方法,write时只寻找链中的outbound的Handler。
ChannelPipeline内部维护了一个双向链表,Head和Tail分别代表表头和表尾,Head作为总入口和总出口,负责底层的网络读写操作;用户自己定义的ChannelHandler会被添加到链表中,这样就可以对I/O事件进行拦截和处理;这样的好处在于用户可以方便的通过新增和删除链表中的ChannelHandler来实现不同的业务逻辑,不需要对已有的ChannelHandler进行修改。
如图所示,在服务器初始化后,ServerSocketChannel的会创建一个Pipeline,内部维护了ChannelHanlder的双向链表,读取数据时,会依次调用ChannelInboundHandler子类的channelRead()方法,例如:读取到客户端数据后,依次调用解码-业务逻辑-直到Tail。
而写入数据时,会从用户自定义的ChannelHandler出发查找ChannelOutboundHandler的子类,调用channelWrite(),最终由Head的write()向socket写入数据。例如:写入数据会通过业务逻辑的组装--编码--写入socket(Head的write)。
EventLoop与EventLoopGroup
EventLoop是事件循环,EventLoopGroup是运行在线程池中的事件循环组,Netty使用了Reactor模型,服务器的连接和读写放在线程池之上的事件循环中执行,这是Netty获得高性能的原因之一。事件循环内部会打开selector,并将Channel注册到事件循环中,事件循环不断的进行select()查找准备就绪的描述符;此外,某些系统任务也会被提交到事件循环组中运行。
ServerBootstrap
ServerBootstrap是辅助启动类,用于服务端的启动,内部维护了很多用于启动和建立连接的属性。包括:
- EventLoopGroup group 线程池组
- channel是通道
- channelFactory 通道工厂,用于创建channel
- localAddress 本地地址
- options 通道的选项,主要是TCP连接的属性
- attrs 用来设置channel的属性,
- handler 通道处理器
2.4 启动过程
了解了上面的概念后,我们再来根据程序说明一下服务器的启动过程,主要分为四个阶段:
- 配置config:设置启动器/服务器通道/客户端通道等相关配置;
- 初始化init:主要功能是打开java的serversocketchannel,内部会初始化Netty的Channel及其ChannelPipeline;
- 注册register:将初始化后的Netty-Channel注册到事件循环的selector上面。具体过程:将打开netty的Channel注册到线程池组的selector上;触发Pipeline上面ChannelHandler的channelRegistered,至此注册完毕;
- 绑定bind:将java的ServerSocketChannel绑定到本地的端口上面,结束后使用fireChannelActive通知Pipeline里的ChannelHandle,执行其channelActive方法;
由于注册阶段是异步的,绑定阶段会与之同时进行,因此注册阶段完毕后会判断绑定阶段是否结束从而触发channelActive。在启动完毕后,会建立下图中的连接结构:
Netty的Channel一端与java的Channel相连接,可以进行网络I/O操作;另一端与Pipeline连接,用来执行业务逻辑。一旦事件循环组中的EventLoop在循环中select()到准备就绪的I/O描述符后,就会交给NettyChannel处理,NettyChannel交给Pipeline的链表进行业务逻辑处理。