BIO(同步阻塞I/O模式)
数据的读取比较阻塞在一个线程中进行,内核调用read()、write()、accept()函数均阻塞
缺点
- IO代码里read操作是阻塞操作,如果连接不做数据读写操作会导致线程阻塞,浪费资源
- 线程很多,会导致服务器线程太多,压力太大
BIO模式实现一个聊天室程序
// 聊天室程序服务端
import static java.util.concurrent.Executors.newFixedThreadPool;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
public class Server {
private static final ExecutorService EXECUTOR_SERVICE = newFixedThreadPool(10);
private List<Socket> sockets;
private int port;
private ServerSocket serverSocket;
private boolean isClosed = false;
public Server(int port) {
this.port = port;
sockets = new ArrayList<>();
}
public void start() {
try {
serverSocket = new ServerSocket(port);
System.out.println("服务启动成功,监听端口:" + port);
while (!isClosed) {
Socket socket = serverSocket.accept();
System.out.println(socket.getRemoteSocketAddress() + "连接上线...");
sockets.add(socket);
EXECUTOR_SERVICE.submit(new SocketThread(socket));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
for (Socket socket : sockets) {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
private class SocketThread implements Runnable {
private Socket socket;
public SocketThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
String msg = "欢迎【"
+ socket.getRemoteSocketAddress() + "】进入聊天室!当前聊天室有【"
+ sockets.size() + "】人";
sendMsg(msg);
BufferedReader reader = null;
try {
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String line;
while ((line = reader.readLine()) != null) {
msg = "【" + socket.getRemoteSocketAddress() + "】:" + line;
sendMsg(msg);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private void sendMsg(String msg) {
for (Socket socket : sockets) {
PrintWriter pw = null;
try {
pw = new PrintWriter(socket.getOutputStream(), true);
pw.println(msg);
pw.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public boolean isClosed() {
return isClosed;
}
public void setClosed(boolean closed) {
isClosed = closed;
}
}
import static java.util.concurrent.Executors.newFixedThreadPool;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
// 聊天室程序客户端
public class Client {
private static final ExecutorService EXECUTOR_SERVICE = newFixedThreadPool(1);
private String serverIp;
private int serverPort;
private Socket socket;
private boolean isClosed;
public Client(String serverIp, int serverPort) {
this.serverIp = serverIp;
this.serverPort = serverPort;
}
public void start() {
try {
socket = new Socket(serverIp, serverPort);
EXECUTOR_SERVICE.submit(new ClientThread(socket));
BufferedReader reader = null;
try {
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private class ClientThread implements Runnable {
private Socket socket;
public ClientThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
Scanner scanner = new Scanner(System.in);
System.out.println("请留言:");
while (!isClosed) {
String line = scanner.nextLine();
PrintWriter pw = null;
try {
pw = new PrintWriter(socket.getOutputStream(), true);
pw.println(line);
pw.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public boolean isClosed() {
return isClosed;
}
public void setClosed(boolean closed) {
isClosed = closed;
}
}
NIO(同步非阻塞IO模式)
服务器实现模式为一个线程可以处理多个请求(连接),客户端发送的连接请求都会注册到多路复用器selector上,多路复用器轮询到连接有IO请求就进行处理。
NIO实现一个聊天室
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
public class NIOServer {
private int port;
private ServerSocketChannel serverSocketChannel;
private Selector selector;
private volatile boolean isClosed;
private List<SocketChannel> socketChannels = new ArrayList<>();
public NIOServer(int port) {
this.port = port;
}
public void start() {
try {
serverSocketChannel = ServerSocketChannel.open();
// 绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(port));
// 设置非阻塞
serverSocketChannel.configureBlocking(false);
selector = Selector.open();
// 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (!isClosed) {
System.out.println("等待客户端链接...");
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey selectionKey = keyIterator.next();
handler(selectionKey);
keyIterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void handler(SelectionKey selectionKey) throws IOException {
if (selectionKey.isAcceptable()) {
acceptHandler(selectionKey);
}
if (selectionKey.isReadable()) {
readHandler(selectionKey);
}
}
private void acceptHandler(SelectionKey selectionKey) throws IOException {
System.out.println("有新客户端链接...");
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
socketChannels.add(socketChannel);
String msg = "欢迎【"
+ socketChannel.getRemoteAddress() + "】进入聊天室!当前聊天室有【"
+ socketChannels.size() + "】人";
print(msg);
}
private void readHandler(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int len = socketChannel.read(byteBuffer);
if (len != -1) {
String line = "【" + socketChannel.getRemoteAddress() + "】:" + new String(byteBuffer.array(), 0, len);
System.out.println(line);
print(line);
}
selectionKey.interestOps(SelectionKey.OP_READ);
}
private void print(String line) throws IOException {
for (SocketChannel channel : socketChannels) {
ByteBuffer buffer = ByteBuffer.wrap(line.getBytes());
channel.write(buffer);
}
}
public static void main(String[] args) {
NIOServer server = new NIOServer(9023);
server.start();
}
}
import static java.util.concurrent.Executors.newFixedThreadPool;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.ExecutorService;
public class NIOClient {
private static final ExecutorService EXECUTOR_SERVICE = newFixedThreadPool(1);
private String ip;
private int port;
private volatile boolean isClosed;
private SocketChannel socketChannel;
public NIOClient(String ip, int port) {
this.ip = ip;
this.port = port;
}
public void start() {
try {
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(ip, port));
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_CONNECT);
EXECUTOR_SERVICE.submit(new ChatThread());
while (!isClosed) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
handler(selectionKey);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void handler(SelectionKey selectionKey) throws IOException {
if (selectionKey.isConnectable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
if (socketChannel.isConnectionPending()) {
socketChannel.finishConnect();
}
socketChannel.configureBlocking(false);
socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
}
if (selectionKey.isReadable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int len = socketChannel.read(byteBuffer);
if (len != -1) {
String line = new String(byteBuffer.array(), 0, len);
System.out.println(line);
}
selectionKey.interestOps(SelectionKey.OP_READ);
}
}
private class ChatThread implements Runnable {
@Override
public void run() {
Scanner scanner = new Scanner(System.in);
System.out.println("请留言:");
while (!isClosed) {
String line = scanner.nextLine();
ByteBuffer byteBuffer = ByteBuffer.wrap(line.getBytes());
try {
socketChannel.write(byteBuffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
NIOClient client = new NIOClient("127.0.0.1", 9023);
client.start();
}
}
AIO(异步非阻塞IO)
异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class AIOServer {
private Charset charset = Charset.forName("utf-8");
private int port;
private AsynchronousChannelGroup channelGroup;
private AsynchronousServerSocketChannel serverSocketChannel;
private List<AsynchronousSocketChannel> socketChannels;
private boolean isClosed;
public AIOServer(int port) {
this.port = port;
socketChannels = new ArrayList<>();
}
private void start() throws IOException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
serverSocketChannel.bind(new InetSocketAddress(port));
System.out.println("启动服务器,监听端口:"+port);
serverSocketChannel.accept(null, new AcceptHandler());
//阻塞式调用,防止占用系统资源
System.in.read();
}
private void print(String line) throws IOException {
for (AsynchronousSocketChannel channel : socketChannels) {
ByteBuffer buffer = send(line);
channel.write(buffer);
}
}
private class ClientHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel socketChannel;
public ClientHandler(AsynchronousSocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
try {
String receive = receive(buffer);
SocketAddress remoteAddress = socketChannel.getRemoteAddress();
if ("quit".equalsIgnoreCase(receive)) {
System.out.println(remoteAddress + "已下线...");
socketChannels.remove(socketChannel);
String msg = "【"
+ remoteAddress + "】退出聊天室!当前聊天室有【"
+ socketChannels.size() + "】人";
print(msg);
socketChannel.close();
return;
}
String line = "【" + remoteAddress + "】:" + receive;
System.out.println(line);
print(line);
buffer.clear();
socketChannel.read(buffer, buffer,this);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
}
}
private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
if (serverSocketChannel.isOpen()){
serverSocketChannel.accept(null,this);
}
if (socketChannel.isOpen()) {
socketChannels.add(socketChannel);
try {
String msg = "欢迎【"
+ socketChannel.getRemoteAddress() + "】进入聊天室!当前聊天室有【"
+ socketChannels.size() + "】人";
print(msg);
ByteBuffer buffer = ByteBuffer.allocate(1024);
ClientHandler clientHandler = new ClientHandler(socketChannel);
socketChannel.read(buffer, buffer, clientHandler);
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("error");
}
}
private ByteBuffer send(String msg) {
return charset.encode(msg);
}
private String receive(ByteBuffer buffer) {
CharBuffer charBuffer = charset.decode(buffer);
return String.valueOf(charBuffer);
}
public static void main(String[] args) {
AIOServer server = new AIOServer(9090);
try {
server.start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
import static java.util.concurrent.Executors.newFixedThreadPool;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
public class AIOClient {
private Charset charset = Charset.forName("utf-8");
private static final ExecutorService EXECUTOR_SERVICE = newFixedThreadPool(1);
private String ip;
private int port;
private volatile boolean isClosed;
private AsynchronousSocketChannel socketChannel;
public AIOClient(String ip, int port) {
this.ip = ip;
this.port = port;
}
public void start() {
try {
socketChannel = AsynchronousSocketChannel.open();
socketChannel.connect(new InetSocketAddress(ip, port)).get();
EXECUTOR_SERVICE.submit(new ChatThread(Thread.currentThread()));
while (!isClosed) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int length = socketChannel.read(buffer).get();
if (length > 0) {
buffer.flip();
System.out.println(receive(buffer));
buffer.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
System.out.println("线程中断退出...");
try {
socketChannel.close();
} catch (IOException e1) {
e1.printStackTrace();
}
EXECUTOR_SERVICE.shutdown();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
private class ChatThread implements Runnable {
private Thread mainThread;
public ChatThread(Thread mainThread) {
this.mainThread = mainThread;
}
@Override
public void run() {
Scanner scanner = new Scanner(System.in);
System.out.println("请留言:");
while (!isClosed) {
String line = scanner.nextLine();
socketChannel.write(send(line));
if ("quit".equalsIgnoreCase(line)) {
isClosed = true;
mainThread.interrupt();
}
}
}
}
private ByteBuffer send(String msg) {
return charset.encode(msg);
}
private String receive(ByteBuffer buffer) {
CharBuffer charBuffer = charset.decode(buffer);
return String.valueOf(charBuffer);
}
public static void main(String[] args) {
AIOClient client = new AIOClient("127.0.0.1", 9090);
client.start();
}
}