IO基本概念
Linux的内核将所有外部设备都可以看做一个文件来操作,而对一个文件的读写都是通过内核提供的系统调用,内核给我们返回一个文件描述符file descriptor,文件描述符是一个数组索引,指向内核维护的文件表格,应用程序对文件的读写就是通过文件描述符的操作完成。
那么我们对与外部设备的操作都可以看做对文件进行操作。而对一个socket的读写也会有相应的描述符,称为socketfd(socket描述符)。
linux将内存分为内核区,用户区。linux内核给我们管理所有的硬件资源,应用程序通过调用系统调用和内核交互,达到使用硬件资源的目的。应用程序通过系统调用read发起一个读操作,这时候内核创建一个文件描述符,并通过驱动程序向硬件发送读指令,并将读的的数据放在这个描述符对应结构体的内核缓存区中,然后再把这个数据读到用户进程空间中,这样完成了一次读操作;但是大家都知道I/O设备相比cpu的速度是极慢的。linux提供的read系统调用,也是一个阻塞函数。这样我们的应用进程在发起read系统调用时,就必须阻塞,就进程被挂起而等待文件描述符的读就绪,那么什么是文件描述符读就绪,什么是写就绪?
读就绪:就是这个文件描述符的接收缓冲区中的数据字节数大于等于套接字接收缓冲区低水位标记的当前大小;
写就绪:该描述符发送缓冲区的可用空间字节数大于等于描述符发送缓冲区低水位标记的当前大小。(如果是socket fd,说明上一个数据已经发送完成)。
接收低水位标记和发送低水位标记:由应用程序指定,比如应用程序指定接收低水位为64个字节。那么接收缓冲区有64个字节,才算fd读就绪;
综上所述,一个基本的IO,它会涉及到两个系统对象,一个是调用这个IO的进程对象,另一个就是系统内核,当一个read操作发生时,它会经历如下阶段:
- 进程对象通过read系统调用向内核发起读请求
- 内核向硬件发送读指令,并等待读就绪。
- 内核把将要读取的数据复制到内核缓冲区
- 数据从内核缓冲区拷贝到用户进程空间中
同步与异步
所谓同步就是发出一个调用后,在没有得到结果之前该调用就不返回,就是调用者主动等待这个调用的结果。 异步则相反,调用发出后,这个调用就直接返回了,调用者不会立刻得到结果,而是在调用发出后,被调用者通过回调函数等方式来告知调用者。
举个通俗的例子:
你打电话问书店老板有没有《分布式系统》这本书,如果是同步通信机制,书店老板会说,你稍等,”我查一下”,然后开始查啊查,等查好了(可能是5秒,也可能是一天)告诉你结果(返回结果)。
而异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过“回电”这种方式来回调。
本质上,访问数据的方式,同步需要当前线程读写数据,在读写数据的过程中数据可能还没ready,可能会阻塞,而异步io则是操作系统等数据ready之后会通知进程数据好了,可以直接读了
阻塞io模型:
在缺省情形下,所有文件操作都是阻塞的,在进程空间中调用recvfrom,其系统调用直到数据报到达且被拷贝到应用进程的缓冲区中或者发生错误才返回,期间一直在等待。我们就说进程在从调用recvfrom开始到它返回的整段时间内是被阻塞的。
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段,准备数据,对于network IO, 很多时候数据在一开始还没有到达,比如还没有收到一个完整的UDP包,这个时候kernel就要等待足够的数据到来。而用户进程这边,整个进程会被阻塞,当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block状态,重新运行起来。
非阻塞IO模型
进程把一个套接口设置为非阻塞是在通知内核:当所请求的IO操作不能满足要求时,不把本进程投入睡眠,而是返回一个错误。也就是说当数据没有到达时并不等待,而是以一个错误返回。
从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作,一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。所以,用户进程其实是需要不断的主动询问kernel数据好了没有。
IO复用模型
linux提供select/poll,进程通过将一个或多个fd传递给select或poll系统调用,select/poll会不断轮询所负责的所有socket,可以侦测许多fd是否就绪,但select和poll是顺序扫描fd是否就绪,并且支持的fd数量有限。linux还提供了epoll系统调用,它是基于事件驱动的方式,而不是顺序扫描,当某个socket有数据到达了,可以直接通知用户进程,而不需要顺序轮询扫描,提高了效率。
当进程调用了select,整个进程会被block,同时,kernel会监视所有select负责的socket,当任何一个socket的数据准备好了,select就会返回,这个图和阻塞IO的图其实并没有多大区别,事实上,还更差一点,因为这里需要使用两个System call,select和recvFrom,而blocking io只调用了一个system call(recvfrom),但是select的好处在与它可以同时处理多个connection,(如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)
信号驱动异步IO模型
首先开启套接口信号驱动I/O功能, 并通过系统调用sigaction安装一个信号处理函数(此系统调用立即返回,进程继续工作,它是非阻塞的)。当数据报准备好被读时,就为该进程生成一个SIGIO信号。随即可以在信号处理程序中调用recvfrom来读数据报,井通知主循环数据已准备好被处理中。也可以通知主循环,让它来读数据报。
异步I/O模型
告知内核启动某个操作,并让内核在整个操作完成后(包括将数据从内核拷贝到用户自己的缓冲区)通知用户进程,这种模型和信号驱动模型的主要区别是:信号驱动IO:由内核通知我们何时可以启动一个IO操作,异步IO模型:由内核通知我们IO操作何时完成
用户进程发起read操作之后,立刻就可以开始去做其他的事了,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,不会对用户进程产生任何block,然后,kernel会等待数据准备完成,然后再将数据拷贝到用户进程内存,当着一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作已经完成。
小结 前面几种都是同步IO,在内核数据copy到用户空间都是阻塞的。最后一种是异步IO,通过API把IO操作交给操作系统处理,当前进程不关心具体IO的实现,通过回调函数或者信号量通知当前进程直接对IO返回结果进行处理。一个IO操作其实分成了两个步骤,发起IO请求和实际的IO操作,同步IO和异步IO的区别就在于第二步是否阻塞,如果实际的IO读写阻塞请求进程,那就是同步IO,因此前四种都是同步IO,如果不阻塞,而是操作系统帮你做完IO操作再将结果返回给你,那就是异步IO。阻塞IO和非阻塞IO的区别在于第一步,发起IO请求是否会被阻塞,如果阻塞直到完成那么就是传统的阻塞IO,如果不阻塞,那就是非阻塞IO.
举个简单例子来说明:
有A,B,C,D四个人在钓鱼:
A用的是最老式的鱼竿,所以呢,得一直守着,等到鱼上钩了再拉杆;
B的鱼竿有个功能,能够显示是否有鱼上钩,所以呢,B没事就睡觉,隔会再看看有没有鱼上钩,有的话就迅速拉杆;
C用的鱼竿和B差不多,但他想了一个好办法,就是同时放好几根鱼竿,然后守在旁边,一旦有显示说鱼上钩了,它就将对应的鱼竿拉起来;
D是个有钱人,干脆雇了一个人帮他钓鱼,一旦那个人把鱼钓上来了,就给D发个短信。
A: 阻塞IO, B: 非阻塞IO: C: NIO D: AIO
AIO BIO NIO
- AIO异步非阻塞IO,AIO方式适用于连接数目多且连接比较长的架构,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。
- NIO同步非阻塞IO,适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。
- BIO同步阻塞IO,适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。
Java对BIO、NIO、AIO的支持:
Java BIO: 同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善
JAVA NIO: 同步非阻塞,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有IO请求才启动一个线程进行处理。
Java AIO: 异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的IO请求都是由OS先完成了再通知服务器应用去启动线程进行处理。
Selector
Nio中的selector具体是一个什么样的东西?想想一个场景:在一个养鸡场,有这么一个人,每天的工作就是不停检查几个特殊的鸡笼,如果有鸡进来,有鸡出去,有鸡生蛋,有鸡生病等等,就把相应的情况记录下来,如果鸡场的负责人想知道情况,只需要询问那个人即可。在这里,这个人就相当Selector,每个鸡笼相当于一个SocketChannel,每个线程通过一个Selector管理多个SocketChannel,
为了实现Selector管理多个SocketChannel,必须将具体得socketChannel对象注册到Selector,并声明需要监听的事件,一共有四种事件:
1、connect:客户端连接服务端事件,对应值为SelectionKey.OPCONNECT(8)
2、accept:服务端接收客户端连接事件,对应值为SelectionKey.OPACCEPT(16)
3、read:读事件,对应值为SelectionKey.OPREAD(1)
4、write:写事件,对应值为SelectionKey.OPWRITE(4)
当SocketChannel有对应的事件发生时,Selector都可以观察到,并进行处理。
服务端代码
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
int n = selector.select();
if (n == 0) continue;
Iterator ite = this.selector.selectedKeys().iterator();
while(ite.hasNext()){
SelectionKey key = (SelectionKey)ite.next();
if (key.isAcceptable()){
SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
clntChan.configureBlocking(false);
//将选择器注册到连接到的客户端信道,
//并指定该信道key值的属性为OP_READ,
//同时为该信道指定关联的附件
clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
}
if (key.isReadable()){
handleRead(key);
}
if (key.isWritable() && key.isValid()){
handleWrite(key);
}
if (key.isConnectable()){
System.out.println("isConnectable = true");
}
ite.remove();
}
}
服务端操作过程:
- 创建ServerSocketChannel实例,并绑定指定端口
- 创建selector实例
- 将serverSocketChannel注册到selector,并指定事件OP_ACCEPT,最底层的socket通过channel和selector建立关联
- 如果没有准备好的socket,select方法会被阻塞一段时间
- 如果底层有socket已经准备好,selector的select方法会返回socket的个数,而且selectedKeys方法会返回socket对应的事件(connect, accept、read、write);
- 根据事件类型,进行不同的逻辑处理
在步骤3中,selector只注册了serverSocketChannel的OP_ACCEPT事件,如果有客户端A连接服务,执行select方法时,可以通过serverSocketChannel获取客户端A的socketChannel,并在selector上注册socketChannel的OP_READ事件。如果客户端A发送数据,会触发read事件,下次轮询调用select方法时,就能通过socketChannel读取数据,同时在selector上注册该socketChannel的OP_WRITE事件,实现服务器往客户端写数据。
Selector实现原理
下载openjdk源码,我们一探究竟,
比较清晰得看到,openjdk中Selector的实现是SelectorImpl,
然后SelectorImpl又将职责委托给了具体的平台,比如图中框出的linux2.6以后才有的EpollSelectorImpl, Windows平台则是WindowsSelectorImpl, MacOSX平台是KQueueSelectorImpl. 从名字也可以猜到,openjdk肯定在底层还是用epoll,kqueue,iocp这些技术来实现的I/O多路复用
获取selector
众所周知,Selector.open()可以得到一个Selector实例,怎么实现的呢?
// Selector.java
public static Selector open() throws IOException {
// 首先找到provider,然后再打开Selector
return SelectorProvider.provider().openSelector();
}
// java.nio.channels.spi.SelectorProvider
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
// 这里就是打开Selector的真正方法
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
在openjdk中,每个操作系统都有一个sun.nio.ch.DefaultSelectorProvider实现,如果系统是Linux的话,真正创建的是sun.nio.ch.EPollSelectorProvider,如果是macos,真正创建的是 KQueueSelectorProvider,我们以linux的EpollSelector为例说明:
public class EPollSelectorProvider
extends SelectorProviderImpl
{
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
}
很直观,这样我们在Linux平台就得到了最终的Selector实现:sun.nio.ch.EPollSelectorImpl
EPollSelector如何进行select
epoll原理
epoll是Linux下的一种IO多路复用技术,可以非常高效的处理数以百万计的socket句柄。
三个epoll相关的系统调用:
int epoll_create(int size)
epoll_create建立一个epoll对象。参数size是内核保证能够正确处理的最大句柄数,多于这个最大数时内核可不保证效果。int epoll_ctl(int epfd, int op, int fd, struct epoll_event event)
epoll_ctl可以操作epoll_create创建的epoll,如将socket句柄加入到epoll中让其监控,或把epoll正在监控的某个socket句柄移出epoll。int epoll_wait(int epfd, struct epoll_event events,int maxevents, int timeout)
epoll_wait在调用时,在给定的timeout时间内,所监控的句柄中有事件发生时,就返回用户态的进程。
epoll内部实现大概如下:
- epoll初始化时,会向内核注册一个文件系统,用于存储被监控的句柄文件,调用epoll_create时,会在这个文件系统中创建一个file节点。同时epoll会开辟自己的内核高速缓存区,以红黑树的结构保存句柄,以支持快速的查找、插入、删除。还会再建立一个list链表,用于存储准备就绪的事件。
- 当执行epoll_ctl时,除了把socket句柄放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后,就把socket插入到就绪链表里。
- 当epoll_wait调用时,仅仅观察就绪链表里有没有数据,如果有数据就返回,否则就sleep,超时时立刻返回。
Epoll fd的创建
EPollSelectorImpl的构造器代码如下:
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
this.epfd = EPoll.create();
this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);
try {
// makePipe返回管道的2个文件描述符,编码在一个long类型的变量中
// 高32位代表读 低32位代表写
// 使用pipe为了实现Selector的wakeup逻辑
long fds = IOUtil.makePipe(false);
this.fd0 = (int) (fds >>> 32);
this.fd1 = (int) fds;
} catch (IOException ioe) {
EPoll.freePollArray(pollArrayAddress);
FileDispatcherImpl.closeIntFD(epfd);
throw ioe;
}
// register one end of the socket pair for wakeups
EPoll.ctl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
}
static native int create() throws IOException; // epollCreate方法,这是个native方法。
在Epoll.c中可以看到:
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPoll_create(JNIEnv *env, jclass clazz) {
/* size hint not used in modern kernels */
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}
可以看到最后还是使用了操作系统的api: epoll_create函数
Epoll wait等待内核IO事件
调用Selector.select(),
@Override
public final int select(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
synchronized (this) {
ensureOpen();
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
try {
synchronized (publicSelectedKeys) {
return doSelect(action, timeout);
}
} finally {
inSelect = false;
}
}
}
protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
assert Thread.holdsLock(this);
// epoll_wait timeout is int
int to = (int) Math.min(timeout, Integer.MAX_VALUE);
boolean blocking = (to != 0);
boolean timedPoll = (to > 0);
int numEntries;
processUpdateQueue();
processDeregisterQueue();
try {
begin(blocking);
do {
long startTime = timedPoll ? System.nanoTime() : 0;
//
numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
// timed poll interrupted so need to adjust timeout
long adjust = System.nanoTime() - startTime;
to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
if (to <= 0) {
// timeout expired so no retry
numEntries = 0;
}
}
} while (numEntries == IOStatus.INTERRUPTED);
assert IOStatus.check(numEntries);
} finally {
end(blocking);
}
processDeregisterQueue();
return processEvents(numEntries, action);
}
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPoll_wait(JNIEnv *env, jclass clazz, jint epfd,
jlong address, jint numfds, jint timeout)
{
struct epoll_event *events = jlong_to_ptr(address);
// 发起epoll_wait系统调用等待内核事件
int res = epoll_wait(epfd, events, numfds, timeout);
if (res < 0) {
if (errno == EINTR) {
return IOS_INTERRUPTED;
} else {
JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
return IOS_THROWN;
}
}
return res;
}
可以看到,最后还是发起的epoll_wait系统调用.
epoll control以及openjdk对事件管理的封装
JDK中对于注册到Selector上的IO事件关系是使用SelectionKey来表示,代表了Channel感兴趣的事件,如Read,Write,Connect,Accept.
调用Selector.register()时均会将事件存储到EpollArrayWrapper的成员变量eventsLow和eventsHigh中
// events for file descriptors with registration changes pending, indexed
// by file descriptor and stored as bytes for efficiency reasons. For
// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
// least) then the update is stored in a map.
// 使用数组保存事件变更, 数组的最大长度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
// 超过数组长度的事件会缓存到这个map中,等待下次处理
private Map<Integer,Byte> eventsHigh;
/**
* Sets the pending update events for the given file descriptor. This
* method has no effect if the update events is already set to KILLED,
* unless {@code force} is {@code true}.
*/
private void setUpdateEvents(int fd, byte events, boolean force) {
// 判断fd和数组长度
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if (!isEventsHighKilled(key) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}
在EpollArrayWrapper.poll()的时候, 首先会调用updateRegistrations,
/**
* Returns the pending update events for the given file descriptor.
*/
private byte getUpdateEvents(int fd) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
return eventsLow[fd];
} else {
Byte result = eventsHigh.get(Integer.valueOf(fd));
// result should never be null
return result.byteValue();
}
}
/**
* Update the pending registrations.
*/
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
int fd = updateDescriptors[j];
// 从保存的eventsLow和eventsHigh里取出事件
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;
if (events != KILLED) {
// 判断操作类型以传给epoll_ctl
// 没有指定EPOLLET事件类型
if (isRegistered) {
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
// 熟悉的epoll_ctl
epollCtl(epfd, opcode, fd, events);
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
}
private native void epollCtl(int epfd, int opcode, int fd, int events);
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
jint opcode, jint fd, jint events)
{
struct epoll_event event;
int res;
event.events = events;
event.data.fd = fd;
// 发起epoll_ctl调用来进行IO事件的管理
RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
/*
* A channel may be registered with several Selectors. When each Selector
* is polled a EPOLL_CTL_DEL op will be inserted into its pending update
* list to remove the file descriptor from epoll. The "last" Selector will
* close the file descriptor which automatically unregisters it from each
* epoll descriptor. To avoid costly synchronization between Selectors we
* allow pending updates to be processed, ignoring errors. The errors are
* harmless as the last update for the file descriptor is guaranteed to
* be EPOLL_CTL_DEL.
*/
if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
}
}
简而言之,register最后还是使用的epoll_ctl
有个小细节是jdk没有指定ET(边缘触发)还是LT(水平触发),所以默认会用LT.
水平触发(level-trggered)
只要文件描述符关联的读内核缓冲区非空,有数据可以读取,就一直发出可读信号进行通知,
当文件描述符关联的内核写缓冲区不满,有空间可以写入,就一直发出可写信号进行通知
LT模式支持阻塞和非阻塞两种方式。epoll默认的模式是LT。
边缘触发(edge-triggered)
当文件描述符关联的读内核缓冲区由空转化为非空的时候,则发出可读信号进行通知,
当文件描述符关联的内核写缓冲区由满转化为不满的时候,则发出可写信号进行通知
两者的区别在哪里呢?水平触发是只要读缓冲区有数据,就会一直触发可读信号,而边缘触发仅仅在空变为非空的时候通知一次.
LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表.
水平触发和边缘触发模式区别
读缓冲区刚开始是空的
读缓冲区写入2KB数据
水平触发和边缘触发模式此时都会发出可读信号
收到信号通知后,读取了1kb的数据,读缓冲区还剩余1KB数据
水平触发会再次进行通知,而边缘触发不会再进行通知
所以,边缘触发需要一次性的把缓冲区的数据读完为止,也就是一直读,直到读到EGAIN为止,EGAIN说明缓冲区已经空了,因为这一点,边缘触发需要设置文件句柄为非阻塞
//水平触发
ret = read(fd, buf, sizeof(buf));
//边缘触发
while(true) {
ret = read(fd, buf, sizeof(buf);
if (ret == EAGAIN) break;
}
在AbstractSelectorImpl中有3个set保存事件
// Public views of the key sets
// 注册的所有事件
private Set<SelectionKey> publicKeys; // Immutable
// 内核返回的IO事件封装,表示哪些fd有数据可读可写
private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
// 取消的事件
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
在EpollArrayWrapper.poll调用完成之后, 会调用updateSelectedKeys来更新上面的三set
private int updateSelectedKeys() {
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
int nextFD = pollWrapper.getDescriptor(i);
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
// ski is null in the case of an interrupt
if (ski != null) {
int rOps = pollWrapper.getEventOps(i);
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}
jdk中Selector是对操作系统的IO多路复用调用的一个封装,在Linux中就是对epoll的封装。epoll实质上是将event loop交给了内核,因为网络数据都是首先到内核的,直接内核处理可以避免无谓的系统调用和数据拷贝, 性能是最好的。jdk中对IO事件的封装是SelectionKey, 保存Channel关心的事件。
简单说,nio是依赖操作系统的实现,java并不能一个线程同时监听多个socket,
,在早期的JDK1.4和1.5 update10版本之前,Selector基于select/poll模型实现,是基于IO复用技术的非阻塞IO。在JDK1.5 update10和linux core2.6以上版本,sun优化了Selctor的实现,底层使用epoll替换了select/poll。