文章内容来自李林峰的《Netty权威指南》
JDK的NIO类库
-
缓冲区Buffer
- ByteBuffer:字节缓冲区
- CharBuffer:字符缓冲区
- ShortBuffer:短整型缓冲区
- IntBuffer:整型缓冲区
- LongBuffer:长整型缓冲区
- FloatBuffer:浮点型缓冲区
- DoubleBuffer:双精度浮点型缓冲区
通道Channel
网络数据通过channel读取和写入,全双工模式多路复用器Selector
提供选择已就绪任务的能力,selector会不断轮询注册在上面的channel,某个channel发生读或写事件,就处于就绪状态,会被selector轮询出来,然后通过SelectionKey获取就绪Channel的集合,进行后续IO操作
NIO服务端
-
序列图
-
步骤
- 打开ServerSocketChannel,用于监听客户端的连接,是所有客户端连接的父管道
ServerSocketChannel serverChannel=ServerSocketChannel.open();
- 绑定监听端口,设置为非阻塞模式
serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port),1024); (注:1024指serverChannel的backlog为1024)
- 创建Reactor线程,创建多路复用器Selector并启动线程
selector=Selector.open(); new Thread(new ReactorTask()).start();
- 将ServerSocketChannel 注册到Reactor线程的多路复用器Selector,监听accept事件
serverChannel.register(selector,SelectionKey.OP_ACCEPT);
- 多路复用器在线程run方法的无限循环体内轮询准备就绪的key
selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); SelectionKey key = null; while (iterator.hasNext()){ key = iterator.next(); iterator.remove(); try { handleInput(key); } catch (Exception e) { if(key!=null){ key.cancel(); if(key.channel()!=null){ key.channel().close(); } } } }
- 多路复用器监听到新客户端的接入,处理新的接入请求,完成TCP三次握手,建立物理链路
SocketChannel sc=serverChannel.accept();
- 设置客户端链路为非阻塞模式
sc.configureBlocking(false);
- 将新接入的客户端连接注册到Reactor线程的多路复用器,监听读操作,读取客户端发送的网络消息
sc.register(selector, SelectionKey.OP_READ);
- 异步读取客户端请求消息到缓冲区
ByteBuffer readBuffer=ByteBuffer.allocate(1024); int readBytes=sc.read(readBuffer);
- 对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成task,放入线程池中,进行业务操作
List<Object> messageList=null; while(buffer.hasRemain()){ byteBuffer.mark(); Object message=decode(byteBuffer); if(message==null){ byteBuffer.reset(); break; } messageList.add(message); } if(!byteBuffer.hasRemain()){ byteBuffer.clear(); }else{ byteBuffer.compact(); } if(!messageList!=null & !messageList.isEmpty()){ for(Object messageE : messageList){ handlerTask(messageE); } }
- 将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端
channel.write(buffer);
NIO客户端
-
序列图
-
步骤
- 打开SocketChannel,绑定客户端本地地址
SocketChannel socketChannel = SocketChannel.open();
- 设置SocketChannel 为非阻塞模式,同时设置客户端连接TCP参数
socketChannel.configureBlocking(false); socket.setSendBufferSize(BUFFER_SIZE); ......
- 异步连接服务端
socketChannel.connect(new InetSocketAddress(host, port);
- 判断是否连接成功,如果连接成功,注册SelectionKey.OP_READ到多路复用器,如果没有说明服务端没返回TCP握手应答,链路没有建立,需要将socketChannel注册到selector,注册SelectionKey.OP_CONNECT
if (socketChannel.connect(new InetSocketAddress(host, port))) { socketChannel.register(selector, SelectionKey.OP_READ); } else { socketChannel.register(selector, SelectionKey.OP_CONNECT); }
- 向Reactor线程的多路复用器Selector注册OP_CONNECT状态位,监听服务端ACK应答
socketChannel.register(selector, SelectionKey.OP_CONNECT);
- 创建selector,创建多路复用器并启动线程
selector = Selector.open();
new Thread(new ReactorTask()).start();
- selector在无限循环体轮询就绪的key
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
SelectionKey key = null;
while (iterator.hasNext()){
key = iterator.next();
iterator.remove();
try {
handleInput(key);
} catch (Exception e) {
if(key!=null){
key.cancel();
if(key.channel()!=null){
key.channel().close();
}
}
}
}
- 接受connect事件并处理
if (key.isConnectable()) {
handlerConnect();
}
- 判断连接是否完成,完成则注册读操作到selector
if (key.isReadable()) {
registerRead();
}
- 向多路复用器selector注册OP_READ事件
socketChannel.register(selector, SelectionKey.OP_READ);
- 异步读请求消息到ByteBuffer
int readBytes = sc.read(byteBuffer);
- 对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成task,放入线程池中,进行业务操作
List<Object> messageList=null;
while(buffer.hasRemain()){
byteBuffer.mark();
Object message=decode(byteBuffer);
if(message==null){
byteBuffer.reset();
break;
}
messageList.add(message);
}
if(!byteBuffer.hasRemain()){
byteBuffer.clear();
}else{
byteBuffer.compact();
}
if(!messageList!=null & !messageList.isEmpty()){
for(Object messageE : messageList){
handlerTask(messageE);
}
}
- 将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端
channel.write(buffer);