一. 为什么需要
解决多请求问题,但是这些请求不需要一直占有整个线程资源(比如IO操作时不必一直等待),所以不适合使用一个请求分配一个线程的多线程方案;类似于消息队列模型,但是是事件驱动,没有Queue来做缓冲;优点:解耦、高效、提高复用,缺点:需要操作系统底层支持、内部回调复杂。
二. 预备知识
IO操作主要分成两部分:
- 数据准备,将数据从磁盘加载到内核缓存
- 将数据从内核缓存加载到用户缓存
2.1 IO的4种模型
- 阻塞、非阻塞(等待数据全部读取成功再返回,还是读取为空马上返回然后下次再读)
- 同步、异步(用户缓存主动去读取内核缓存,还是内核缓存读取磁盘成功后通知用户缓存)
- NIO是同步非阻塞模型,也是IO多路复用基础
- Reactor模式基于同步I/O,Proactor模式基于异步I/O
2.2 IO多路复用
区别于传统的多进程并发模型 (每有新的IO流就分配一个新的进程管理),IO多路复用仅使用单个线程,通过记录跟踪每个I/O流的状态来同时管理多个I/O流(哪个IO流ready线程就处理哪个)
select, poll, epoll 都是I/O多路复用的具体的实现:
select:仅返回有无事件不返回具体事件Id,只能监控1024个连接,线程不安全
poll:连接数无限制
epoll:返回具体事件Id,线程安全
三. 反应器模式
处理一个或多个客户端并发请求服务的事件设计模式。当请求抵达后,服务处理程序使用I/O多路复用策略,然后同步地派发这些请求至相关的请求处理程序。
3.1 模块组成
包括5个模块:
- Handle:事件(网络编程中就是一个Socket,数据库操作中就是一个DBConnection,Java NIO中的Channel)
- EventHandler:事件处理器,用于处理不同状态的事件
- Concrete Event Handler:事件处理器的具体实现,实现了事件处理器所提供的各种回调方法,从而实现特定于业务的逻辑
- Synchronous Event Demultiplexer:用于等待事件的发生,调用方在调用它的时候会被阻塞,一直阻塞到同步事件分离器上有事件产生为止(NIO中对应Selector,当Selector.select()返回时说明有事件发生,然后调用Selector的selectedKeys()方法获取Set<SelectionKey>,一个SelectionKey表示一个有事件发生的Channel以及该Channel上的事件类型)
- Initiation Dispatcher:用于管理EventHandler、分发event。通过Synchronous Event Demultiplexer来等待事件的发生,一旦事件发生,Initiation Dispatcher首先会分离出每一个事件,然后调用事件处理器,最后调用相关的回调方法来处理这些事件
3.2 运行流程
- 初始化dispatcher,注册具体事件处理器到分发器(即指定什么事件触发什么事件处理器)
- 注册完毕后,分发器调用handle_events方法启动事件循环,并启动Synchronous Event Demultiplexer等待事件发生(阻塞等待)
- 当有事件发生,即某个Handle变为ready状态(如TCP socket变为等待读状态),Synchronous Event Demultiplexer就会通知Initiation Dispatcher
- Initiation Dispatcher根据发生的事件,将被事件源激活的Handle作为『key』来寻找并分发恰当的事件处理器回调方法
3.3 具体模型分类
- 单线程模型(I/O、非I/O业务操作都在一个线程上处理,可能会大大延迟I/O请求的响应)
- 工作站线程池模型(非I/O操作从Reactor线程中移出转交给工作者线程池执行)
- 多线程模型(mainReactor线程主要负责接收客户端的连接请求,然后将接收到的SocketChannel传递给subReactor,由subReactor来完成和客户端的通信),但是注意subReactor线程只负责完成I/O的read()或者write()操作,在读取到数据后业务逻辑的处理仍然放入到工作者线程池中完成,可避免因为read()数据量太大而导致后面的客户端连接请求得不到即时处理的情况
四. 源码分析
高性能NIO框架netty、腾讯开源RPC框架Tars的NIO模型都是很典型的Reactor设计模式,下面以Tars源码来分析Reactor模式的java NIO实现(仅展示关键实现)。
package com.qq.tars.net.core.nio;
4.1 Reactor
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SelectableChannel;
public final class Reactor extends Thread {
protected volatile Selector selector = null;
private Acceptor acceptor = null;
//启动
public Reactor(SelectorManager selectorManager) throws IOException {
this.acceptor = new TCPAcceptor(selectorManager);
this.selector = Selector.open();
}
//注册
public void registerChannel(SelectableChannel channel, int ops, Object attachment) throws IOException {
SelectionKey key = channel.register(this.selector, ops, attachment);
}
//循环事件
public void run() {
for (;;) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
dispatchEvent(key);
}
}
}
//处理事件
private void dispatchEvent(final SelectionKey key) throws IOException {
if (key.isConnectable()) {
acceptor.handleConnectEvent(key);
} else if (key.isAcceptable()) {
acceptor.handleAcceptEvent(key);
} else if (key.isReadable()) {
acceptor.handleReadEvent(key);
} else if (key.isValid() && key.isWritable()) {
acceptor.handleWriteEvent(key);
}
}
}
4.2 TCPAcceptor
处理不同事件,以处理connect、read事件为例:
public void handleConnectEvent(SelectionKey key) throws IOException {
//1. Get the client channel
SocketChannel client = (SocketChannel) key.channel();
//2. Set the session status
TCPSession session = (TCPSession) key.attachment();
if (session == null) throw new RuntimeException("The session is null when connecting to ...");
//3. Connect to server
try {
client.finishConnect();
key.interestOps(SelectionKey.OP_READ);
session.setStatus(SessionStatus.CLIENT_CONNECTED);
} finally {
session.finishConnect();
}
}
public void handleReadEvent(SelectionKey key) throws IOException {
TCPSession session = (TCPSession) key.attachment();
if (session == null) throw new RuntimeException("The session is null when reading data...");
session.read();
}
4.3 TCPSession
以read事件的readResponse方法为例:
//放入工作线程池
response = selectorManager.getProtocolFactory().getDecoder().decodeResponse(tempBuffer, this);
selectorManager.getThreadPool().execute(new WorkThread(response, selectorManager));
4.4 工作线程池
SelectorManager提供线程池,WorkThread具体进行业务处理