一 概念
一般基于消息形式实现的系统间的通信方式有四种:TCP/IP+BIO、TCP/IP+NIO、UDP/IP+BIO、UDP/IP+BIO。其中BIO代表阻塞IO,NIO代表非阻塞IO。
这里先介绍几种IO模型
二 几种IO模型
在介绍几种IO模型之前需要先有一个概念,假设存在系统A、B。当A向B发送信息时,B接收这样一个输入操作通常有两个阶段:
- B系统等待数据从网络中到达,并将其复制到操作系统内核缓冲区。
- 将数据从操作系统内核复制到应用程序缓冲区。
有了以上概念之后可以开始了解下面的几种IO模型。
1. 阻塞式IO模型
可以从其IO流程中看出,阻塞式IO会在接收数据的时候一直阻塞直到数据被复制到应用程序缓冲区为止。同样的,发送时的阻塞也如此。
下面给出基于java实现的TCP/IP+阻塞式IO。
- 服务器端代码:
public class BIOServer {
public static void main(String[] args) throws IOException {
//打开通道并绑定ip与端口
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8888));
while (true) {
//会阻塞到接收到客户端连接请求为止
SocketChannel channel = serverSocketChannel.accept();
String msg = "Hello Client";
//发送消息,也会阻塞
channel.write(ByteBuffer.wrap(msg.getBytes()));
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取消息,也是阻塞的
channel.read(buffer);
byte[] buf = buffer.array();
String receiveContent = new String(buf).trim();
System.out.println(receiveContent);
}
}
}
- 客户端代码:
public class BIOClient {
public static void main(String[] args) throws IOException {
SocketChannel channel = SocketChannel.open();
channel.connect(new InetSocketAddress("localhost", 8888));
ByteBuffer buffer = ByteBuffer.allocate(128);
channel.read(buffer);
byte[] buf = buffer.array();
String receiveContent = new String(buf).trim();
System.out.println(receiveContent);
String msg = "Hello Server";
channel.write(ByteBuffer.wrap(msg.getBytes()));
}
}
这种方式的优点很明显,就是代码简单,容易上手。缺点在于服务器端在面临多个客户端连接时显得处理太慢,但该问题可以通过开启多个线程的方法解决,但大量的线程会导致内存消耗严重。
2. 非阻塞式IO模型
可以看出,非阻塞式IO模型与阻塞式IO模型的区别是当操作系统内核没有将数据准备好的时候其会直接返回而不是阻塞在当前。但实际上这样也引出了一个问题,那就是需要通过轮询的方式去不断的判断是否操作系统内核已经准备好了对应的数据。这种方式有个很大的缺点就是消耗大量CPU的时间。
在java中设置非阻塞式IO只需要进行如下配置:
serverSocketChannel.configureBlocking(false);
2. IO复用模型
IO复用模型一般是通过调用select或poll去阻塞在系统调用上而不是全部的IO过程。
可以看出,进程阻塞在select函数直到操作系统将数据准备好之后再调用read从内核取到数据。初步看起来似乎IO复用与IO阻塞比较起来没有什么优势,但试想有这样一个场景:
A与B通信时,双方都有发信息与收信息的需求。如果采用IO阻塞模型,那么一方想要做一个IO动作(例如发信息)之前必须先收到对方的信息,例如在上面IO阻塞模型中Client代码中,如果要向Server发送信息必须先收到来自Server的信息。当然解决这个问题可以采用多线程的方式为发送与接收都单独开一个线程。
而在IO复用模式中,select指示内核等待多个事件中的任意一个发生都可以返回,这避免了多线程的开销。
- 服务器代码
public class NIOServer {
private static Selector selector;
private static ServerSocketChannel serverSocketChannel;
static {
try {
//获取selector,是用来判定要取哪些操作的
selector = Selector.open();
//获取通道
serverSocketChannel = ServerSocketChannel.open();
//设置非阻塞模式
serverSocketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(4567));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
//阻塞方法,注意这里的阻塞是阻塞到select方法而不是io。
selector.select();
Set<SelectionKey> sk = selector.selectedKeys();
Iterator<SelectionKey> iterator = sk.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
if (selectionKey.isAcceptable()) {
handleAccept(selectionKey);
} else if (selectionKey.isReadable()) {
handleRead(selectionKey);
}
}
}
}
private static void handleAccept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
String msg = "Hello Client";
socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
socketChannel.register(selector, SelectionKey.OP_READ);
}
private static void handleRead(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
socketChannel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("server received msg form client" + msg);
}
}
- 客户端代码
public class NIOClient {
private static Selector selector;
public static void main(String[] args) throws IOException {
selector = Selector.open();
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(new InetSocketAddress("localhost", 4567));
channel.register(selector, SelectionKey.OP_CONNECT);
while (true) {
selector.select();
Set<SelectionKey> sk = selector.selectedKeys();
Iterator<SelectionKey> iterator = sk.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isConnectable()) {
handleConnect(key);
} else if (key.isReadable()) {
handleRead(key);
}
}
}
}
private static void handleConnect(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
if (channel.isConnectionPending()) {
channel.finishConnect();
}
channel.configureBlocking(false);
String msg = "Hello Server";
channel.write(ByteBuffer.wrap(msg.getBytes()));
channel.register(selector, SelectionKey.OP_READ);
}
private static void handleRead(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("client received form server:"+msg);
}
}