简介
Netty是由Jboss提供的一个异步的、基于事件驱动的Java网络应用框架,用来快速开发高性能、高可靠性的网络IO程序。
本质上是一个NIO框架,所以想要彻底理解Netty,需要先搞明白什么是NIO
应用场景
用于开发RPC框架
分布式系统中,各个节点之间需要远程服务调用,高性能的RPC框架是必不可少的,Netty作为异步高性能呢过的通信框架,往往作为基础通信组件整合到RPC框架中使用。例如阿里的Dubbo就是使用Netty作为基础通信组件。
游戏行业
Netty作为高性能的基础通信组件,提供了TCP/UDP和HTTP协议栈,方便定制和开发私有协议栈,可以用来开发账号登陆服务器、地图服务器之间的高性能通信。
大数据
Hadoop的高性能通信和序列化组件Avro的RPC框架,默认采用的就是Netty进行跨界点通信,它的Netty Service就是基于Netty框架二次封装实现。
原生NIO存在的问题
NIO的类库和API繁杂,使用麻烦,需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
需要熟悉Java多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网络编程非常熟悉,才能编写高质量的NIO程序。
开发工作量和难度都非常大,例如客户端断开重练、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等问题。
JDK NIO存在bug,例如Epoll Bug,会导致Selector空轮询,最终导致CPU100%,知道JDK1.7版本该问题仍旧存在,没有根本解决。
Nettty的优点
Netty对JDK自带的NIO的API进行封装,解决了上述问题。
设计优雅,适用于各种传输类型的统一API阻塞和非阻塞Socket,基于灵活且可扩展的事件模型,可以清晰地分离关注点,高度可定制的线程模型,单线程或一个或多个线程池。
高性能、高吞吐量、低延迟、减少资源消耗、最小化不必要的内存复制(零拷贝)
安全,完整SSL/TLS和StartTLS支持
社区活跃、发现Bug能及时修复,同时更多新功能会被加入
Netty版本
Netty版本分别是netty3.x、netty4.x、netty5.x
Netty5出现重大bug,已经被官网废弃,目前推荐使用Netty4.x稳定版本
Netty架构设计
不同的线程模型,对程序性能有很大影响,目前存在的线程模型有:传统的阻塞I/O服务模型和Reactor模式。Netty线程模型主要基于Reactor多线程模型做了一定的改进。
传统阻塞I/O模型
采用阻塞IO模式获取输入的数据,每个连接都需要独立的线程完成数据的输入、业务处理和数据返回。
问题:
并发量很大,会创建大量的线程,占用很大系统资源。
连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费。
Reactor模式
Reactor模式基本设计思想就是I/O复用结合线程池
基于 I/O 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。
基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
根据Reactor的数量和处理资源线程的数量不同,Reactor模式可以分为3种不同的实现
单Reactor单线程
单Reactor多线程
主从Reactor多线程
- 单Reactor单线程
1.Reactor对象通过Select监控客户端请求,收到事件后通过Dispatch进行分发
2.如果是连接请求事件,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理连接后的后续业务
3.如果是处理请求事件,则Reactor分发给对应的Handler来处理
4.Handler会完成read、业务处理、send等业务流程
总结:这种模式,服务端用一个线程通过多路复用搞定了所有的IO操作,代码简单,清晰明了,但是所有的事情都在一个线程上处理,无法发挥多核CPU的性能,Handler在处理某个连接上的业务时,整个进程就无法处理其他连接事件,很容易导致性能瓶颈,还有就是如果线程意外终止,就会进入死循环,会导致整个系统通信不可用。
- 单Reactor多线程
1.Reactor对象通过select监控客户端请求事件,收到事件后,通过dispatch进行转发
2.如果是连接请求事件,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理连接后的后续业务
3.如果是处理请求事件,则Reactor分发给对应的Handler来处理
4.handler只负责响应事件,不做具体的业务处理,通过read读取数据,然后分发给worker线程池的某个线程处理业务
5.worker线程池会分配独立的线程完成业务处理,并将结果返回给handler
6.handler收到响应后,通过send将结果返回给客户端
总结:可以充分利用多核CPU的处理能力,但是多线程之间数据共享和访问比较复杂,Reactor处理所有的事件的监听和响应,在高并发场景下容易出现性能瓶颈。
- 主从Reactor多线程
1.Reactor主线程对象通过select监听连接事件,收到事件后,通过Acceptor处理连接事件
2.当Acceptor处理连接事件后,主Reactor将连接分配给子Reactor
3.子Reactor将连接加入到连接队列进行监听,并创建handler进行各种事件的处理
4.当有新事件,子Reactor会调用对应的handler进行处理
5.handler通过read读取数据,分发给后面的worker线程处理
6.worker线程池分配独立的worker线程进行业务处理,并返回结果
7.handler收到响应结果后,再通过send将结果返回给client
注:一个Reactor主线程可以对应多个Reactor子线程(这个在图上没有展示)。
总结:
优点,主线程和子线程职责明确,主线程只接收连接,子线程完成后续业务操作,主线程和子线程数据交互简单,主线程只需要把新连接传给子线程,子线程无需返回数据。
缺点,编程复杂度有点高。
这种模式在很多项目中广泛使用,例如Nginx主从Reactor多进程模型,Memcached主从多线程,Netty主从多线程模型。
Netty模型
1.Netty抽象出两个线程池BossGroup和WorkerGroup,BossGroup负责接收客户端的连接,WorkerGroup负责网络的读写,它们的类型都是NioEventLoopGroup。
2.NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环都是NioEventLoop
3.NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket的网络通讯
4.NioEventLoopGroup可以有多个线程,即可以有多个NioEventLoop
5.每个Boss NioEventLoop循环执行3步
①轮询accept事件
②处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个worker NioEventLoop上的selector
③处理任务队列中的任务,即runAllTasks
6.每个Worker NioEventLoop执行步骤
①轮询read、write事件
②处理I/O事件,即read、write事件,在对应的NioSocketChannel上处理
③处理任务队列的任务,即runAllTasks
7.每个Worker NioEventLoop处理业务时,会使用pipeline,pipeline中包含了Channel,通过pipeline可以获取到对应的通道,管道中维护了很多的处理器
理论知识就整理到这,下面来个入门案例。
入门案例
- 依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.37.Final</version>
</dependency>
- 服务端
public class NettyServer {
public static void main(String[] args) throws Exception {
/**
* 创建两个线程组bossGroup和workerGroup
* bossGroup只负责连接请求, workerGroup负责处理客户端业务
* 两个线程组包含的子线程个数: 默认是cpu核数 * 2
*/
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//服务器启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128) //设置线程队列得到的两个个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
//给pipeline设置处理器
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
//绑定一个端口并且同步, 生成一个ChannelFuture对象
ChannelFuture cf = bootstrap.bind(10000).sync();
cf.channel().closeFuture().sync();
}finally {
//优雅的关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* @param ctx 上下文对象
* @param msg 客户端发送的数据, 默认为Object
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器线程名: "+Thread.currentThread().getName());
System.out.println("ctx: "+ctx);
ByteBuf buf = (ByteBuf)msg;
System.out.println("客户端消息: "+buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址: "+ctx.channel().remoteAddress());
}
/**
* 数据读取完毕
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//将数据写入缓存并刷新
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!!!", CharsetUtil.UTF_8));
}
/**
* 处理异常, 一般是需要关闭通道
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
- 客户端
public class NettyClient {
public static void main(String[] args) throws Exception{
//客户端一个事件循环组就可以了
EventLoopGroup group = new NioEventLoopGroup();
try {
//启动类
Bootstrap bootstrap = new Bootstrap();
//设置启动参数
bootstrap.group(group)
.channel(NioSocketChannel.class) //设置线程组
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());//加入自己的处理器
}
});
//启动客户端连接服务器端
ChannelFuture cf = bootstrap.connect("127.0.0.1", 10000).sync();
cf.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 通道就绪触发该方法
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello server", CharsetUtil.UTF_8));
}
/**
* 当通道有读取事件时, 会触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
System.out.println("服务器:" + ctx.channel().remoteAddress() + " "+ buf.toString(CharsetUtil.UTF_8));
}
/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- 测试
分别启动服务端和客户端
# 服务端输出
服务器线程名: nioEventLoopGroup-3-1
ctx: ChannelHandlerContext(NettyServerHandler#0, [id: 0x2f14e16a, L:/127.0.0.1:10000 - R:/127.0.0.1:55613])
客户端消息: hello server
客户端地址: /127.0.0.1:55613
# 客户端输出
服务器:/127.0.0.1:10000 hello, 客户端!!!