首先给大家分享一个Linux下的OpenJDK1.8源码,这个源码里包含了sun包源码 ,自己要去找挺难找的,下面的源码分析就用到了。
OpenJDK1.8 提取码:xbae
一、NIO简介
Java NIO主要由三个部分组成,Channel、Buffer和Selector,Channel。
借用赛哥的一句话:NIO的本质模型就是等待消息的到来,处理到来的消息。
等待消息的到来这一部分由Selector去监听。
处理到来的消息由Channel和Buffer处理,通道(Channel)类似于Java中的流(IO Stream),但是通道是双向的(流是单向),通道中的数据必须先读入到Buffer中,在从Buffer中进行读取,或者先把数据写入到Buffer中,在把Buffer中的 数据写入到通道。
二、Channel
Java NIO中提供以下4种Channel:
FileChannel:从文件中读写数据
DatagramChannel:通过UDP协议读写网络中的数据
SocketChannel:通过TCP协议读写网络中的数据
ServerSocketChannel:在服务器端可以监听新进来的TCP连接,像WEB服务器那样,对每一个新进来的请求创建一个SocketChannel
三、Buffer
Java NIO 有以下Buffer类型
ByteBuffer
MappedByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer
我们主要是通过对Buffer进行读写操作,将数据写入Channel中。通过研究Buffer源码可以发现,Buffer其实是数组,有以下几个属性。
索引 | 说明 |
---|---|
capacity | 缓冲区数组的总长度 |
position | 下一个要操作的数据元素的位置 |
limit | 缓冲区数组中不可操作的下一个元素的位置:limit<=capacity |
mark | 用于记录当前position的前一个位置或者默认是-1 |
Buffer的设计非常简单,通过以上几个简单的属性就可以完成读写操作,当然,简单带来坏处就是使用起来有点麻烦。不过熟练后还是使用起来还是很简单的。
我们看一个非常简单的例子:
初始状态的一个ByteBuffer(总长度为10):
向Buffer中写入5个字节:
读取的时候 ,调用Buffer.filp(),此时postion变为0,limit变为 5,也就是能从0读取到4:
Buffer.compact()方法将所有未读的数据拷贝到Buffer起始处。然后将position设到最后一个未读元素正后面。
Buffer.rewind()方法将position设回0
Buffer.mark()方法,可以标记Buffer中的一个特定的position,之后可以通过调用Buffer.reset()方法恢复到这个position
Buffer.rewind()方法将position设回0,所以你可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少个元素。
四、NIO的一个小Demo
首先是服务端的代码:
public class ServerConnect {
private static final int BUF_SIZE = 1024;
private static final int PORT = 8080;
private static final int TIMEOUT = 3000;
public static void selector() {
Selector selector = null;
ServerSocketChannel ssc = null;
try {
// 打开一个Slectore
selector = Selector.open();
// 打开一个Channel
ssc = ServerSocketChannel.open();
// 将Channel绑定端口
ssc.socket().bind(new InetSocketAddress(PORT));
// 设置Channel为非阻塞,如果设置为阻塞,其实和BIO差不多了。
ssc.configureBlocking(false);
// 向selector中注册Channel和感兴趣的事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// selector监听事件,select会被阻塞,直到selector监听的channel中有事件发生或者超时,会返回一个事件数量
//TIMEOUT就是超时时间,selector初始化的时候会添加一个用于主动唤醒的pipe,待会源码分析会说
if (selector.select(TIMEOUT) == 0) {
System.out.println("==");
continue;
}
/**
* SelectionKey的组成是selector和Channel
* 有事件发生的channel会被包装成selectionKey添加到selector的publicSelectedKeys属性中
* publicSelectedKeys是SelectionKey的Set集合
*下面这一部分遍历,就是遍历有事件的channel
*/
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
handleAccept(key);
}
if (key.isReadable()) {
handleRead(key);
}
if (key.isWritable() && key.isValid()) {
handleWrite(key);
}
if (key.isConnectable()) {
System.out.println("isConnectable = true");
}
//每次使用完,必须将该SelectionKey移除,否则会一直存储在publicSelectedKeys中
//下一次遍历又会重复处理
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (selector != null) {
selector.close();
}
if (ssc != null) {
ssc.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void handleAccept(SelectionKey key) throws IOException {
ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
SocketChannel sc = ssChannel.accept();
sc.configureBlocking(false);
sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(BUF_SIZE));
}
public static void handleRead(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buf = (ByteBuffer) key.attachment();
long bytesRead = sc.read(buf);
while (bytesRead > 0) {
buf.flip();
while (buf.hasRemaining()) {
System.out.print((char) buf.get());
}
System.out.println();
buf.clear();
bytesRead = sc.read(buf);
}
if (bytesRead == -1) {
sc.close();
}
}
public static void handleWrite(SelectionKey key) throws IOException {
ByteBuffer buf = (ByteBuffer) key.attachment();
buf.flip();
SocketChannel sc = (SocketChannel) key.channel();
while (buf.hasRemaining()) {
sc.write(buf);
}
buf.compact();
}
}
客户端代码:
public class Client {
public static void client() {
// 申请一块空间
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = null;
Thread.currentThread().setName("client");
try {
// 打开一个Channel
socketChannel = SocketChannel.open();
//设置为非阻塞
socketChannel.configureBlocking(false);
//连接IP和端口号
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
if (socketChannel.finishConnect()) {
int i = 0;
while (true) {
// 为了不让消息发送太快,每发一条睡1s
TimeUnit.SECONDS.sleep(1);
String info = Thread.currentThread().getName()+":I'm " + i++ + "-th information from client";
//清空Buffer
buffer.clear();
//写入到Buffer中
buffer.put(info.getBytes());
//进行flip操作,为了下面可以将buffer中数据读取到channel中。
buffer.flip();
// 将buffer中的数据写入到channel中
while (buffer.hasRemaining()) {
System.out.println(Thread.currentThread().getName()+":"+buffer);
int write = socketChannel.write(buffer);
System.out.println(Thread.currentThread().getName()+":"+write);
}
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
if (socketChannel != null) {
socketChannel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
五、NIO源码分析
1.NIO中selector.open()是调用了SelectorProvider.provider().openSelector(),ServerSocketChannel.open()是调用SelectorProvider.provider().openServerSocketChannel(),两个主要组件的开启都是SelectorProvider.provider()的提供,我们先看一下这个源码。
public static SelectorProvider provider() {
//很明显provider是单例的
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
//会根据不同的操作系统创建不同的provider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
/**
* 根据不同系统返回不同SelectorProvider.
*/
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}
这一部分设计的非常好,因为NIO需要操作系统底层提供支持,这一部分代码可以根据不同的操作系统提供不同的实现,可以让我们不用关系底层如何实现的。
2.ServerSocketChannel.open()最终是new ServerSocketChannelImpl(this);this是SelectorProvider的实现类的实例化对象
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
//获取文件描述符
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
}
3.selector.open()会根据操作系统的不同,得到的selector也会不同,如果是windows系统获取的是WindowsSelectorImpl,Linux系统是EPollSelectorImpl,还有一些操作系统会提供PollSelectorImpl。这里的EPollSelectorImpl和PollSelectorImpl对应这上次NIO基础原理中的EPoll模型和Poll模型。
首先我们来分析一下PollSelectorImpl。
PollSelectorImpl(SelectorProvider sp) {
super(sp, 1, 1);
// 本地方法,新建一个pipe,返回以long编码的管道的两个文件描述符。
//管道的读端以高32位返回,
//而写入结束以低32位返回。
//这个pipe主要用途是用来唤醒selector的
long pipeFds = IOUtil.makePipe(false);
//读文件描述符
fd0 = (int) (pipeFds >>> 32);
//写文件描述符
fd1 = (int) pipeFds;
try {
// 新建一个存fd的数组
pollWrapper = new PollArrayWrapper(INIT_CAP);
// 初始化,将pipe的fd放入数组中
pollWrapper.initInterrupt(fd0, fd1);
// 新建一个存放SelectionKey的数组
channelArray = new SelectionKeyImpl[INIT_CAP];
} catch (Throwable t) {
......
}
}
再来看看EPollSelectorImpl有什么不一样:
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
try {
//这里不再是一个数组了
pollWrapper = new EPollArrayWrapper();
//初始化,添加用于中断的pipe
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
} catch (Throwable t) {
......
}
}
EPollArrayWrapper() throws IOException {
// 创建epoll的文件描述符
epfd = epollCreate();
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
//开辟一个数组,存储来自epoll_wait的结果的epoll_event数组
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
//文件描述符> 64k时需要使用eventHigh
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}
4.我们再来分析一下如何注册事件的。注册的时候,首先把selector和channel封装成一个SelectionKeyImpl,最终调用implRegister(),把fd添加到pollWrapper,把key添加到keys中。因为pollWrapper数据结构的不同,所以添加方式也有点区别。
poll:
protected void implRegister(SelectionKeyImpl ski) {
synchronized (closeLock) {
if (closed)
throw new ClosedSelectorException();
//检测容量是否够用
if (channelArray.length == totalChannels) {
// 新建一个更大的数组
int newSize = pollWrapper.totalChannels * 2;
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
//拷贝
for (int i=channelOffset; i<totalChannels; i++)
temp[i] = channelArray[i];
channelArray = temp;
//扩容存储fd的数组
pollWrapper.grow(newSize);
}
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
pollWrapper.addEntry(ski.channel);
totalChannels++;
keys.add(ski);
}
}
epoll
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
pollWrapper.add(fd);
keys.add(ski);
}
5.初始化准备好了,我们再来分析一下select()是如何监听channel的,select()最终是调用doSelect(long timeout)方法,里面调用本地方法,本地方法调用的系统提供的操作,这些操作对应NIO基础原理中的三个模型。
首先来PollSelectorImpl的:
protected int doSelect(long timeout)
throws IOException
{
if (channelArray == null)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
pollWrapper.poll(totalChannels, 0, timeout);
} finally {
end();
}
//清理那些已经cancelled的SelectionKey
processDeregisterQueue();
//统计有事件发生的SelectionKey数量,并把符合条件发生事件的SelectionKey添加到selectedKeys哈希表中,提供给后续使用
int numKeysUpdated = updateSelectedKeys();
// 第零个位置使用来中断的,如果不为0,则pipe中写入了数据,用于中断,这里进行重置
if (pollWrapper.getReventOps(0) != 0) {
// Clear the wakeup pipe
pollWrapper.putReventOps(0, 0);
synchronized (interruptLock) {
//将fd0的数据全部读完
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
/**
* poll0是一个本地方法,调用系统底层的实现了
* 对应poll模型
*/
int poll(int numfds, int offset, long timeout) {
return poll0(pollArrayAddress + (offset * SIZE_POLLFD),
numfds, timeout);
}
EPollSelectorImpl:
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
//清理那些已经cancelled的SelectionKey,底层会调用epoll_ctl方法移除被epoll所监听的文件描述符
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
//清理那些已经cancelled的SelectionKey,底层会调用epoll_ctl方法移除被epoll所监听的文件描述符
processDeregisterQueue();
//更新epoll已选择fd的密钥。 将就绪密钥添加到就绪队列。
int numKeysUpdated = updateSelectedKeys();
//判断是否为中断,如果中断了,则清除记录的中断位置的内容
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
/**
* epollWait也是一个本地方法
* 对应epoll模型
*/
int poll(long timeout) throws IOException {
updateRegistrations();
// 调用系统底层的实现,会将有事件的fd放在pollArray中
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
// 查询是否存在中断,并且记录中断事件的位置
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
6.NIO还有一个小细节,就是我们最开始建立selelctor的时候,会创建一个pipe,我之前也提到了这个pipe是用来唤醒selector,selector调用select()方法后,会进入阻塞状态,如果没有事件他会一直阻塞,那么我们如何主动唤醒呢,于是就用到了这个pipe。
PollSelectorImpl和EPollSelectorImpl实现都是如下方式:
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
pollWrapper.interrupt();
interruptTriggered = true;
}
}
return this;
}
我对Linux下的pipe不太了解,我猜应该是使用了中断指令。
在Windows下,WindowsSelectorImpl的实现方式:
public Selector wakeup() {
synchronized(this.interruptLock) {
if (!this.interruptTriggered) {
this.setWakeupSocket();
this.interruptTriggered = true;
}
return this;
}
}
其中setWakeupSocket()方法会调用一个本地方法setWakeupSocket0(),这个本地方法会想pipe中发送一个字节,selector就能够监听到这个pipe中有读事件,然后selector就被唤醒了。
贴一张NIO各个组件之间的关系图,看完源码后可以仔细看一下这幅图,再自己跟着源码走一遍。
6、总结和反思
NIO源码分析到此结束了,此次阅读源码过程还是有点困难,但从中获取到了很多的新知识。
1.最开始,我以为大学学的操作系统没太大用处,现在发现,涉及到底层原理时,离不开操作系统的知识。以后得抽个时间把操作系统在好好看一遍。
2.NIO中用到了Reactor设计模式,有效的解决基于轮询方式的效率低的问题
3.select、poll和epoll底层数据各不相同,poll采用链表,解决了fd数量的限制,epoll底层使用的是红黑树,能够有效的提升效率。
4.NIO并不一定是非常高效的,在连接数量大,且连接比较短的情况下,NIO效率非常高,但是在连接数量小,且一次性发送大量数据的情况下,可以选择BIO加多线程的方式处理。
5.除了NIO,还有一个AIO,以后有空可以研究研究。