BIO方式使得整个处理过程和连接时绑定,只要连接建立,无论客户端是否有消息,都要进行等待处理,一定程度上浪费了服务器端的硬件资源。
Java对于NIO的支持是通过channel和selector方式来实现,采用的方法为向channel注册感兴趣的事件,然后通过selector来获取发生了事件的Key。为实现TCP/IP+NIO方式的系统间通讯,Java提供了SocketChannel和ServerSocketChannel两个关键的类,网络IO的操作则改为ByteBuffer来实现。
Java NIO通道及通道模型:由一个专门的线程来处理所有的IO事件,并负责分发。事件驱动机制:事件到的时候触发,而不是同步的去监视事件。
线程通讯:线程之间通过wait,notify等方式通讯。保证每次上下文切换都是有意义的。减少无谓的线程切换。
Java NIO采用了双向通道channel进行数据传输,而不是单向的流stream,在通道上可以注册我们感兴趣的事件。一共有以下四种事件:
事件名对应值
服务端接收客户端连接事件
SelectionKey.OP_ACCEPT(16)
客户端连接服务端事件
SelectionKey.OP_CONNECT(8)
读事件
SelectionKey.OP_READ(1)
写事件
SelectionKey.OP_WRITE(4)
服务端和客户端各自维护一个管理通道的对象,我们称之为Selector,该对象能检测一个或多个通道channel上的事件。我们以服务端为例,如果服务端的selector注册了事件,某时刻客户端给服务端发送了一些数据,阻塞I/O这时会调用read()方法阻塞地读取数据,而NIO服务端会在selector中添加一个读事件。服务端的处理线程会轮询地访问selector,如果访问selector时发现有感兴趣的时间到达,则处理这些事件,如果没有感兴趣的事件到达,则处理线程会一直阻塞直到感兴趣的事件到达为止。
java.nio包中主要包括以下几个类或接口:
- Buffer 缓冲区,用来临时存放输入或输出数据
- Charset 用来把Unicode字符编码和其它字符编码互转
- Channel 数据传输通道,用来把Buffer中的数据写入到数据源,或者把数据源中的数据读入到Buffer。
- Selector 用来支持异步I/O操作,也叫非阻塞I/O操作
nio包中主要通过下面两个方面来提高I/O操作效率:
- 通过Buffer和Channel来提高I/O操作的速度
- 通过Selector来支持非阻塞I/O操作
与Socket和ServerSocket对应,NIO提供了SocketChannel和ServerSocketChannel对应,这两种通道同时支持一般的阻塞模式和更高效的非阻塞模式。
客户端通过SocketChannel.open()方法打开一个Socket通道,如果此时提供了SocketAddress参数,则会自动开始连接,否则需要主动调用connect()方法连接,创建连接后,可以像一般的Channel一样的用Buffer进行读写,这都是阻塞模式的。
服务器端通过ServerSocketChannel.open()创建,并使用bind()方法绑定到一个监听地址上,最后调用accept()方法阻塞等待客户端连接。当客户端连接后会返回一个SocketChannel以实现与客户端的读写交互。
多路复用套接字通道(Selector实现的非阻塞式IO)
套接字通道多路复用的思想是创建一个Selector,将多个通道对它进行注册,当套接字有关注的事件发生时,可以选出这个通道进行操作。
//创建一个选择器,可用close()关闭,isOpen()表示是否处于打开状态,他不隶属于当前线程
Selector selector = Selector.open();
//创建ServerSocketChannel,并把它绑定到指定端口上
ServerSocketChannel server = ServerSocketChannle.open();
server.bind(new InetSocketAddress("127.0.0.1",7777));
//设置为非阻塞模式,这个非常重要
server.configureBlocking(false);
//在选择器里面注册关注这个服务器套接字通道的accept事件
//ServerSocketChannel只有OP_ACCEPT可用,OP_CONNECT,OP_READ, OP_WRITE用于SocketChannel
server.register(selector,SelectionKey.OP_ACCEPT);
while(true){
//测试等待事件发生,分为直接返回的selectNow()和阻塞等待的select(),另外也可加一个参数表示阻塞超时
//停止阻塞的方法有两种:中断线程和selector.wakeup(),有事件发生时,会自动的wakeup()
//方法返回为select出的事件数
//另外务必注意一个问题是,当selector被select()阻塞时,其他的线程调用同一个selector的register也会被阻塞到select返回为止
//select操作会把发生关注事件的Key加入selectionKeys中(只管加不管减)
if(selector.select()==0){//
continue;
}
//获取发生了关注事件的Key集合,每个SelectionKey对应了一个注册的通道
Set<SelectionKey>keys = selector.selectedKeys();
//多说一句selector.keys()返回所有的SelectionKey(包括没有发生事件的)
for(SelectionKey key :keys){
//OP_ACCEPT 这个只有ServerSocketChannel才有可能触发
if(key.isAcceptable()){
//得到与客户端的套接字通信
SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
//同样设置为非阻塞模式
channel.configureBlocking(false);
//同样将与客户端的通道在selector上注册,OP_READ对应可读事件(对方有数据写入),可以通过key获取关联的选择器
channel.register(key.selector(),SelectionKey.OP_READ,ByteBuffer.allocate(1024));
}
//OP_READ 有数据可读
if(key.isReadable()){
SocketChannel channel = (SocketChannel) key.channel();
//得到附件,就是上面SocketChannel进行register的时候的第三个参数,可为随意Object
ByteBuffer buffer = (ByteBuffer)key.attachment();
//读数据,这里就简单写一下,实际上应该还是循环读取直到读不出来为止的
channel.read(buffer);
//改变自身关注事件,可以用位或操作|组合事件
key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
//OP_WRITE 可写状态,这个状态通常总是触发的,所以只在需要写操作时才进行关注
if(key.isWritable()){
//写数据掠过,可以自建buffer,也可用附件对象(看情况),主义buffer写入后需要flip
//...
//写完就把写状态关注去掉,否则会一直触发写事件
key.interestOps(SelectionKey.OP_READ);
}
//由于select操作只管对selectedKeys进行添加,所以key处理后我们需要从里面把key去掉
keys.remove(key);
}
服务器端程序
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.nio.channels.SocketChannel;
public class NIOServer {
//标志性数字
private static int flag = 0;
//定义缓冲区大小
private static int block = 4096;
//接收缓冲区
private static ByteBuffer receiveBuffer = ByteBuffer.allocate(block);
//发送缓冲区
private static ByteBuffer sendBuffer = ByteBuffer.allocate(block);
//定义Selector
private Selector selector;
public NIOServer(int port)throws IOException{
//打开服务器套接字通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//服务器配置为非阻塞
serverSocketChannel.configureBlocking(false);
//检索与此服务器套接字通道关联的套接字
ServerSocket serverSocket = serverSocketChannel.socket();
//进行服务的绑定
serverSocket.bind(new InetSocketAddress(port));
//通过open()方法找到Selector
selector = Selector.open();
//注册到selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server Start ------- 8888:");
}
//监听
public void listen() throws IOException{
while(true){
//监控所有注册的channel,当其中注册的IO操作可以进行时,并将对应的SelectionKey加入selected-key set
selector.select();
//Selected-key set 代表了所有通过select()方法监测到可以进行IO操作发的channel,这个集合可以通过selectedKeys()拿到
Set<SelectionKey>selectionKeys=selector.selectedKeys();
Iterator<SelectionKey>iterator = selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
handleKey(selectionKey);
iterator.remove();
}
}
}
//处理请求
public void handleKey(SelectionKey selectionKey)throws IOException{
//接受请求
ServerSocketChannel serverSocketChannel = null;
SocketChannel socketChannel = null;
String receiveText;
String sendText;
int count;
//测试此键的通道是否准备好接受新的套接字连接
if(selectionKey.isAcceptable()){
//返回创建此键的通道
serverSocketChannel = (ServerSocketChannel)selectionKey.channel();
//接受客户端建立连接的请求,并返回SocketChannel对象
socketChannel = serverSocketChannel.accept();
//配置为非阻塞
socketChannel.configureBlocking(false);
//注册到selector
socketChannel.register(selector, SelectionKey.OP_READ);
}else if (selectionKey.isReadable()){
//返回为之创建此键的通道
socketChannel = (SocketChannel)selectionKey.channel();
//将缓冲区清空,以备下次读取
receiveBuffer.clear();
//将发送来的数据读取到缓冲区
count = socketChannel.read(receiveBuffer);
if(count>0){
receiveText = new String(receiveBuffer.array(),0,count);
System.out.println("服务器端接受到的数据————"+receiveText);
socketChannel.register(selector, SelectionKey.OP_WRITE);
}
}else if (selectionKey.isWritable()){
//将缓冲区清空以备下次写入
sendBuffer.clear();
//返回为之创建此键的通道
socketChannel = (SocketChannel)selectionKey.channel();
sendText = "message from server--"+flag++;
//向缓冲区中输入数据
sendBuffer.put(sendText.getBytes());
//将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位
sendBuffer.flip();
//输出到通道
socketChannel.write(sendBuffer);
System.out.println("服务器端向客户端发送数据--:"+sendText);
socketChannel.register(selector,SelectionKey.OP_READ);
}
}
public static void main(String[] args)throws IOException {
// TODO Auto-generated method stub
int port = 8888;
NIOServer server = new NIOServer(port);
server.listen();
}
}