架构设计:系统间通信(3)——IO通信模型和JAVA实践 上篇

1、全文摘要

系统间通信本来是一个很大的概念,我们首先从通信模型开始讲解。在理解了四种通信模型的工作特点和区别后,对于我们后文介绍搭建在其上的各种通信框架,集成思想都是有益的。

目前常用的IO通信模型包括四种(这里说的是网络IO):阻塞式同步IO、非阻塞式同步IO、多路复用IO、和真正的异步IO。这些IO模式都是要靠操作系统进行支持,应用程序只是提供相应的实现,对操作系统进行调用。

上篇中,首先介绍传统的阻塞式同步IO和非阻塞式同步IO两种工作模式,然后使用JAVA进行实现;下篇,对多路复用IO工作模式和异步IO工作模式进行介绍,并介绍JAVA对这两种工作模式的支持。

2、传统阻塞模式(BIO)

这个小节的介绍,在《架构设计:系统间通信(1)——概述从“聊天”开始上篇》这篇文章中已经说明了,这里只是“接着讲”,您可以理解成“在概述的基础上继续深入写”。BIO就是:blocking IO。最容易理解、最容易实现的IO工作方式,应用程序向操作系统请求网络IO操作,这时应用程序会一直等待;另一方面,操作系统收到请求后,也会等待,直到网络上有数据传到监听端口;操作系统在收集数据后,会把数据发送给应用程序;最后应用程序收到数据,并解除等待状态。如下图所示:

image

(请您注意,上面中交互的两个元素是应用程序和它所使用的操作系统)就TCP协议来说,整个过程实际上分为三个步骤:三次握手建立连接、传输数据(包括验证和重发)、断开连接。当然,断开连接的过程并不在我们讨论的IO的主要过程中。但是我们讨论IO模型,应该把建立连接和传输数据的这两个过程分开讨论。

2-1、JAVA对阻塞模式的支持

JAVA对阻塞模式的支持,就是java.net包中的Socket套接字实现。这里要说明一下,Socket套接字是TCP/UDP等传输协议的实现。例如客户端使用TCP协议连接这台服务器的时候,当TCP三次握手后,应用程序就会创建一个socket套接字对象(注意,这里还没有进行数据内容的传输),当这个TCP连接出现数据传输时,socket套接字就会把数据传输的表现告诉程序员(例如read方法接触阻塞状态)

下面这段代码时java对阻塞模式的支持:

package testBSocket;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class SocketServer1 {

    static {
        BasicConfigurator.configure();
    }

    /**
     * 日志
     */
    private static final Log LOGGER = LogFactory.getLog(SocketServer1.class);

    public static void main(String[] args) throws Exception{
        ServerSocket serverSocket = new ServerSocket(83);

        try {
            while(true) {
                //这里JAVA通过JNI请求操作系统,并一直等待操作系统返回结果(或者出错)
                Socket socket = serverSocket.accept();

                //下面我们收取信息(这里还是阻塞式的,一直等待,直到有数据可以接受)
                InputStream in = socket.getInputStream();
                OutputStream out = socket.getOutputStream();
                Integer sourcePort = socket.getPort();
                int maxLen = 2048;
                byte[] contextBytes = new byte[maxLen];
                int realLen;
                StringBuffer message = new StringBuffer();
                //read的时候,程序也会被阻塞,直到操作系统把网络传来的数据准备好。
                while((realLen = in.read(contextBytes, 0, maxLen)) != -1) {
                    message.append(new String(contextBytes , 0 , realLen));
                    /*
                     * 我们假设读取到“over”关键字,
                     * 表示客户端的所有信息在经过若干次传送后,完成
                     * */
                    
                }
                //下面打印信息
                SocketServer1.LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message);

                //下面开始发送信息
                out.write("回发响应信息!".getBytes());

                //关闭
                out.close();
                in.close();
                socket.close();
            }
        } catch(Exception e) {
            SocketServer1.LOGGER.error(e.getMessage(), e);
        } finally {
            if(serverSocket != null) {
                serverSocket.close();
            }
        }
    }
}

上面的服务器端代码可以直接运行。代码执行到serverSocket.accept()的位置就会等待,这个调用的含义是应用程序向操作系统请求客户端连接的接收,这里代码会阻塞,而底层调用的位置在DualStackPlainSocketImpl这个类里面(注意我使用的测试环境时window8,所以是由这个类处理;如果您是在windows7环境下进行测试,那么处理类TwoStacksPlainSocketImpl,这是Windows环境;如果您使用的测试环境时Linux,那么视Linux内核版本而已,具体的处理类又是不一样的)。

image
image

2-2、存在的问题

很明显,我们在代码里面并没有设置timeout属性,所以运行的是“if”这段的代码,很明显在调用JNI后,下层也在等待有客户端连接上来。这种调用方式当然有问题:

  • 同一时间,服务器只能接受来自于客户端A的请求信息;虽然客户端A和客户端B的请求是同时进行的,但客户端B发送的请求信息只能等到服务器接受完A的请求数据后,才能被接受。
  • 由于服务器一次只能处理一个客户端请求,当处理完成并返回后(或者异常时),才能进行第二次请求的处理。很显然,这样的处理方式在高并发的情况下,是不能采用的。
  • 实际上以上的问题是可以通过多线程来解决的,实际上就是当accept接收到一个客户端的连接后,服务器端启动一个显得线程,来读写客户端的数据,并完成相应的业务处理。但是你无法影响操作系统底层的“同步IO”机制。

3、非阻塞模式

一定注意:阻塞/非阻塞的描述是针对应用程序中的线程进行的,对于阻塞方式的一种改进是应用程序将其“一直等待”的状态主动打开,如下图所示:

image

这种模式下,应用程序的线程不再一直等待操作系统的IO状态,而是在等待一段时间后,就解除阻塞。如果没有得到想要的结果,则再次进行相同的操作。这样的工作方式,暴增了应用程序的线程可以不会一直阻塞,而是可以进行一些其他工作。

3-1、JAVA对非阻塞模式的支持

那么JAVA中是否支持这种非阻塞IO的工作模式呢?我们继续分析DualStackPlainSocketImpl中的accept0实现:

image

那么timeout实在哪里设置的呢?在ServerSocket中,调用了DualStackPlainSocketImpl的父类SocketImpl进行timeout的设置:

image

ServerSocket中的setSoTimeout方法也有相应的注释说明:

Enable/disable SO_TIMEOUT with the specified timeout,in milliseconds.With this option set to a non-zero timeout,a call to accept() for this ServerSocket will block for only this amount of time.If the timeout expires,a java.net.SocketTimeoutException is raised,though the ServerSocket is still vaild.The option must be enabled prior to entering the blocking operation to have effect.The timeout must be >0.A timeout of zero is interpreted as an infinite timeout.

那么java中对非阻塞IO的支持如下:

package testBSocket;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class SocketServer2 {

    static {
        BasicConfigurator.configure();
    }

    private static Object xWait = new Object();

    /**
     * 日志
     */
    private static final Log LOGGER = LogFactory.getLog(SocketServer2.class);

    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = null;

        try {
            serverSocket = new ServerSocket(83);
            serverSocket.setSoTimeout(100);
            while(true) {
                Socket socket = null;
                try {
                    socket = serverSocket.accept();
                } catch(SocketTimeoutException e1) {
                    //===========================================================
                    //      执行到这里,说明本次accept没有接收到任何数据报文
                    //      主线程在这里就可以做一些事情,记为X
                    //===========================================================
                    synchronized (SocketServer2.xWait) {
                        SocketServer2.LOGGER.info("这次没有从底层接收到任务数据报文,等待10毫秒,模拟事件X的处理时间");
                        SocketServer2.xWait.wait(10);
                    }
                    continue;
                }

                InputStream in = socket.getInputStream();
                OutputStream out = socket.getOutputStream();
                Integer sourcePort = socket.getPort();
                int maxLen = 2048;
                byte[] contextBytes = new byte[maxLen];
                int realLen;
                StringBuffer message = new StringBuffer();
                //下面我们收取信息(这里还是阻塞式的,一直等待,直到有数据可以接受)
                while((realLen = in.read(contextBytes, 0, maxLen)) != -1) {
                    message.append(new String(contextBytes , 0 , realLen));
                    /*
                     * 我们假设读取到“over”关键字,
                     * 表示客户端的所有信息在经过若干次传送后,完成
                     * */
                    if(message.indexOf("over") != -1) {
                        break;
                    }
                }
                //下面打印信息
                SocketServer2.LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message);

                //下面开始发送信息
                out.write("回发响应信息!".getBytes());

                //关闭
                out.close();
                in.close();
                socket.close();
            } 
        } catch(Exception e) {
            SocketServer2.LOGGER.error(e.getMessage(), e);
        } finally {
            if(serverSocket != null) {
                serverSocket.close();
            }
        }
    }
}

执行效果如下:

image

这里我们针对了SocketServer增加了阻塞等待时间,实际上只实现了非阻塞IO模型中的第一步:监听连接状态的非阻塞。通过运行代码,我们可以发现read()方法还是被阻塞的,说明socket套接字等待数据读取的过程,还是被阻塞的。

3-2、继续改进

那么,我们能不能改进read()方式,让它变成非阻塞模式呢?当然是可以,socket套接字同样支持等待超时时间设置。代码如下:

package testBSocket;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class SocketServer3 {

    static {
        BasicConfigurator.configure();
    }

    private static Object xWait = new Object();

    /**
     * 日志
     */
    private static final Log LOGGER = LogFactory.getLog(SocketServer3.class);

    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = null;

        try {
            serverSocket = new ServerSocket(83);
            serverSocket.setSoTimeout(100);
            while(true) {
                Socket socket = null;
                try {
                    socket = serverSocket.accept();
                } catch(SocketTimeoutException e1) {
                    //===========================================================
                    //      执行到这里,说明本次accept没有接收到任何TCP连接
                    //      主线程在这里就可以做一些事情,记为X
                    //===========================================================
                    synchronized (SocketServer3.xWait) {
                        SocketServer3.LOGGER.info("这次没有从底层接收到任何TCP连接,等待10毫秒,模拟事件X的处理时间");
                        SocketServer3.xWait.wait(10);
                    }
                    continue;
                }

                InputStream in = socket.getInputStream();
                OutputStream out = socket.getOutputStream();
                Integer sourcePort = socket.getPort();
                int maxLen = 2048;
                byte[] contextBytes = new byte[maxLen];
                int realLen;
                StringBuffer message = new StringBuffer();
                //下面我们收取信息(设置成非阻塞方式,这样read信息的时候,又可以做一些其他事情)
                socket.setSoTimeout(10);
                BIORead:while(true) {
                    try {
                        while((realLen = in.read(contextBytes, 0, maxLen)) != -1) {
                            message.append(new String(contextBytes , 0 , realLen));
                            /*
                             * 我们假设读取到“over”关键字,
                             * 表示客户端的所有信息在经过若干次传送后,完成
                             * */
                            if(message.indexOf("over") != -1) {
                                break BIORead;
                            }
                        }
                    } catch(SocketTimeoutException e2) {
                        //===========================================================
                        //      执行到这里,说明本次read没有接收到任何数据流
                        //      主线程在这里又可以做一些事情,记为Y
                        //===========================================================
                        SocketServer3.LOGGER.info("这次没有从底层接收到任务数据报文,等待10毫秒,模拟事件Y的处理时间");
                        continue;
                    }
                }
                //下面打印信息
                SocketServer3.LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message);

                //下面开始发送信息
                out.write("回发响应信息!".getBytes());

                //关闭
                out.close();
                in.close();
                socket.close();
            } 
        } catch(Exception e) {
            SocketServer3.LOGGER.error(e.getMessage(), e);
        } finally {
            if(serverSocket != null) {
                serverSocket.close();
            }
        }
    }
}

这样一来,我们利用JAVA实现了完整的“非阻塞IO”模型:让TCP连接和数据读取这两个过程,都变成了“非阻塞”方式了。

然并卵,这种处理方式实际上并没有解决accept方法、read方法阻塞的根本问题。根据上文的叙述,accept方法、read方法阻塞的根本问题是底层接受数据报文时的“同步IO”工作方式。这两次改进过程,只是解决了IO操作的两步中的第一步:将程序里面的阻塞方式变成了非阻塞方式。

3-3、利用线程再改进

另一个方面,由于应用程序级别,我们并没有使用多线程技术,这就导致了应用程序只能一个socket套接字一个socket套接字的处理。这个socket套接字没有处理完,就没有处理下一个socket套接字。针对这个问题我们还是可以进行改进的:让应用程序面上,各个socket套接字的处理不相互影响:

package testBSocket;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

/**
 * 通过加入线程的概念,让socket server能够在应用层面,
 * 通过非阻塞的方式同时处理多个socket套接字
 * @author yinwenjie
 */
public class SocketServer4 {

    static {
        BasicConfigurator.configure();
    }

    private static Object xWait = new Object();

    private static final Log LOGGER = LogFactory.getLog(SocketServer4.class);

    public static void main(String[] args) throws Exception{
        ServerSocket serverSocket = new ServerSocket(83);
        serverSocket.setSoTimeout(100);
        try {
            while(true) {
                Socket socket = null;
                try {
                    socket = serverSocket.accept();
                } catch(SocketTimeoutException e1) {
                    //===========================================================
                    //      执行到这里,说明本次accept没有接收到任何TCP连接
                    //      主线程在这里就可以做一些事情,记为X
                    //===========================================================
                    synchronized (SocketServer4.xWait) {
                        SocketServer4.LOGGER.info("这次没有从底层接收到任何TCP连接,等待10毫秒,模拟事件X的处理时间");
                        SocketServer4.xWait.wait(10);
                    }
                    continue;
                }
                //当然业务处理过程可以交给一个线程(这里可以使用线程池),并且线程的创建是很耗资源的。
                //最终改变不了.accept()只能一个一个接受socket连接的情况
                SocketServerThread socketServerThread = new SocketServerThread(socket);
                new Thread(socketServerThread).start();
            }
        } catch(Exception e) {
            SocketServer4.LOGGER.error(e.getMessage(), e);
        } finally {
            if(serverSocket != null) {
                serverSocket.close();
            }
        }
    }
}

/**
 * 当然,接收到客户端的socket后,业务的处理过程可以交给一个线程来做。
 * 但还是改变不了socket被一个一个的做accept()的情况。
 * @author yinwenjie
 */
class SocketServerThread implements Runnable {

    /**
     * 日志
     */
    private static final Log LOGGER = LogFactory.getLog(SocketServerThread.class);

    private Socket socket;

    public SocketServerThread (Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        InputStream in = null;
        OutputStream out = null;
        try {
            in = socket.getInputStream();
            out = socket.getOutputStream();
            Integer sourcePort = socket.getPort();
            int maxLen = 2048;
            byte[] contextBytes = new byte[maxLen];
            int realLen;
            StringBuffer message = new StringBuffer();
            //下面我们收取信息(设置成非阻塞方式,这样read信息的时候,又可以做一些其他事情)
            this.socket.setSoTimeout(10);
            BIORead:while(true) {
                try {
                    while((realLen = in.read(contextBytes, 0, maxLen)) != -1) {
                        message.append(new String(contextBytes , 0 , realLen));
                        /*
                         * 我们假设读取到“over”关键字,
                         * 表示客户端的所有信息在经过若干次传送后,完成
                         * */
                        if(message.indexOf("over") != -1) {
                            break BIORead;
                        }
                    }
                } catch(SocketTimeoutException e2) {
                    //===========================================================
                    //      执行到这里,说明本次read没有接收到任何数据流
                    //      主线程在这里又可以做一些事情,记为Y
                    //===========================================================
                    SocketServerThread.LOGGER.info("这次没有从底层接收到任务数据报文,等待10毫秒,模拟事件Y的处理时间");
                    continue;
                }
            }
            //下面打印信息
            Long threadId = Thread.currentThread().getId();
            SocketServerThread.LOGGER.info("服务器(线程:" + threadId + ")收到来自于端口:" + sourcePort + "的信息:" + message);

            //下面开始发送信息
            out.write("回发响应信息!".getBytes());

            //关闭
            out.close();
            in.close();
            this.socket.close();
        } catch(Exception e) {
            SocketServerThread.LOGGER.error(e.getMessage(), e);
        }
    }
}

3-4、依然存在的问题

引入了多线程技术后,IO的处理吞吐量大大提高了,但是这样做就真的没有问题了吗,您要知道操作系统可是有“最大线程”限制的:

  • 虽然在服务器端,请求的处理交给了一个独立线程进行,但是操作系统通知accept()的方式还是单个处理的(甚至都不是非阻塞模式)。也就是说,实际上是服务器接收到数据报文后的“业务处理过程”可以多线程(包括可以是非阻塞模式),但是数据报文的接受还是需要一个一个的来。
  • 在linux系统中,可以创建的线程是有限的。我们可以通过cat /proc/sys/kernel/threads-max命令查看可以创建的最大线程数。当然这个值是可以更改的,但是线程越多,CPU切换所需的时间也越长,用来处理真正业务的需求也就越少。
  • 创建一个线程是有较大的资源消耗的。JVM创建一个线程的时候,即使这个线程不做任何工作,JVM都会分配一个堆栈空间。这个空间的大小默认为128K,您可以通过-Xss参数进行调整。
  • 当然您还可以使用ThreadPoolExecutor线程池来缓解线程的创建问题,但是又会造成BlockingQueue积压任务的持续增加,同样消耗了大量资源。另外,如果您的应用程序大量使用长连接的话,线程是不会关闭的。这样系统资源的消耗更容易失控。
  • 最后,无论您是使用的多线程、还是加入了非阻塞模式,这都是在应用程序层面的处理,而底层socketServer所匹配的操作系统的IO模型始终是“同步IO”,最根本的问题并没有解决。
  • 那么,如果你真想单纯使用线程来解决问题,那么您自己都可以计算出来您一个服务器节点可以一次接受多大的并发了。看来,单纯使用线程解决这个问题不是最好的方法。

4、多路复用IO(IO Multiplex)

我们将详细讲解操作系统支持的多路复用IO的工作方式,并介绍JAVA1.4版本中加入的JAVA NIO对多路复用IO的实现。(东西太多,我们放下下篇中)

5、异步IO(真正的NIO)

我们将细讲解操作系统支持的异步IO方式,并介绍JAVA1.7版本中加入的NIO2.0(AIO)对异步IO的实现。(东西太多,我们放下下篇中)

摘自:https://blog.csdn.net/yinwenjie

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容