源码分析: PipedInputStream和PipedOutputStream

场景

假设我们需要上传一组动态增加的数据, 输入端可以看作inputSteam, 输入端是outputSteam, 但是输入和输出端不能直接对接, 那么我们要怎样实现呢?

我希望的解决方案时, 输入和输出通过一个"数据池"间接连接, 输入端把数据写到数据池中, 输出端从数据池中读数据, 这里要求数据池有"阻塞"功能, 即数据池满了阻塞输入端, 数据池空了, 阻塞输出端.

以上效果可以使用PipedInputStreamPipedOutputStream实现.

前言

  1. 这两个类需要配套使用, 可以实现管道(pipe)传输数据
  2. 默认的使用方式是, 通过管道连接线程A和B, 在A线程使用PipedOutputStream写数据, 数据缓存到"管道"后, B线程使用PipedInputStream读取数据, 以此完成数据传输, 如果在同一个线程使用这两个类, 可能导致死锁

PipedOutputStream

PipedOutputStream是管道的发送端. 写线程通过它来往"管道"填充数据.

我们先看看它有哪几个方法, 从命名和注释基本就能知道每个方法的作用

// 关联PipedInputStream
public void connect(PipedInputStream snk)

// 写一个数据
public void write(int b)

// 写一段数据
public void write(byte b[], int off, int len)

// 通知读线程, 管道中有数据等待读取
public void flush()

// 关闭发送端, 不再发送数据
public void close()

以上注释已经大致说明了这个类的功能了, 接着我们逐个方法分析

connect

public synchronized void connect(PipedInputStream snk) throws IOException {
    // 先确保
    // 1. 连接对象(输入的snk)不能为空
    // 2. 不能重复连接
    sink = snk;
    snk.in = -1;
    snk.out = 0;
    snk.connected = true;
}

从上可以看出, connect方法就是修改连接的PipedInputStream的成员变量, 使其处于已连接状态.

write

public void write(int b)  throws IOException {
    // 确保sink不为空, 即确保已经连接
    sink.receive(b);
}

public void write(byte b[], int off, int len) throws IOException {
    // 先确保
    // 1. 已经连接
    // 2. 输出数组b不为空
    // 3. off和len不会导致数组越界
    if (sink == null) {
        // ...
    } else if (len == 0) {
        // 如果len == 0, 表示不读取数据, 所以可以直接返回
        return;
    }
        
    sink.receive(b, off, len);
}

从上可以看出, 两个write方法, 最后都调用了响应的PipedInputStream#receive方法, 这表明

数据存储的地方写数据的具体逻辑都在PipedInputStream

后面我们再详细分析.

flush

public synchronized void flush() throws IOException {
    if (sink != null) {
        synchronized (sink) {
            sink.notifyAll();
        }
    }
}

这个方法先尝试获取sink的锁, 然后通过notifyAll()来调度线程, 在这里, 具体就是使读线程开始读取数据, 这里涉及读写线程间的沟通调度问题, 在了解完PipedInputStream之后我们再重新看这个问题.

close

public void close() throws IOException {
    if (sink != null) {
        sink.receivedLast();
    }
}

这个方法就是简单的调用了PipedInputStream#receivedLast()方法, 从方法名可以判断出, 这个方法就是通知PipedInputStream, 数据已经填充完毕.

总结

从上面的分析可以看出, PipedOutputStream基本就是一个"接口"类, 不会对数据进行实际的操作, 也不承担具体的职责, 只负责把数据交给PipedInputStream处理.

下面我们接着分析最关键的PipedInputStream的源码

PipedInputStream

成员变量

我们先看下关键的几个变量

// 缓存数组, "管道"数据的存储的地方
protected byte buffer[];

// 写下一个数据时, 保存到缓存数组的位置
// 小于0表示无可读数据, 缓存数组为空
// in == out时表示缓存数组已满
protected int in = -1;

// 下一个被读数据在缓存数组的位置
protected int out = 0;

看上面3个成员变量我们基本可以知道

"管道"内部使用了数组来缓存写入的数据, 等待读取. 通过inout两个值来记录数组的写位置和读位置

其余变量都是一些状态标识

// 写数据端(输入端)是否已经关闭
boolean closedByWriter = false;
// 读数据端(输出端)是否已经关闭
volatile boolean closedByReader = false;
// 是否处于已连接状态
boolean connected = false;
// 记录读线程
Thread readSide;
// 记录写线程
Thread writeSide;

这些变量都是用于判断当前"管道"的状态

其中readSidewriteSide是一种简单的标记读写线程的方式, 源码注释中也有说明这种方式并不可靠, 这种方式针对的应该是两条线程的情况, 所以我们使用的时候应该尽量按照设计意图来使用

在两条线程中建立"管道"传递数据, 写线程写数据, 读线程读数据.

构造函数

它包含了好几个构造函数, 我们只看参数最多的那个

public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
    initPipe(pipeSize);
    connect(src);
}

最终都会要求我们调用上面的两个方法, 都比较简单就不贴代码了

  1. initPipe()里面对byte数组buffer变量进行赋值, 也就是初始化缓冲区域
  2. connect()方法直接调用了PipedOutputStream#connect, 上面已经分析过了, 最终效果就是指明PipedOutputStream的连接对象, 改变connected变量的值, 使得PipedInputStream处于连接状态.

receive

通过上面PipedOutputStream的分析可以知道, 写数据的方法会调用PipedInputStreamreveive方法, 所以我们首先分析这个方法, 了解写数据的逻辑. 注意阅读注释!

// 写单个数据
protected synchronized void receive(int b) throws IOException {
    // 检查当前"管道"状态, 确保能够读写数据
    checkStateForReceive();
    // 本方法由PipedOutputStream调用, 所以线程是写线程, 记录该线程
    writeSide = Thread.currentThread();
    if (in == out)
        // in == out表示缓存数组已经满了, 阻塞线程等待
        // 这里确保了未读的缓存数据不会丢失
        awaitSpace();
    // 当检测到缓存数组有空间, 等待结束后, 会继续执行以下代码
    if (in < 0) {
        // in小于0表示当前无数据, 设置读, 写位置都是0
        in = 0;
        out = 0;
    }
    // 写操作
    // 1. 把数据写到目标位置(in)
    // 2. 后移in, 指明下一个写数据的位置
    buffer[in++] = (byte)(b & 0xFF);
    // 如果in超出缓存长度, 回到0, 循环利用缓存数组
    if (in >= buffer.length) {
        in = 0;
    }
}

注意该方法带有synchronized关键字, 表明在该方法内, 会持有对象锁, 我们留到最后再分析各个环节中, 对象锁的归属问题.

在写数据前会先通过checkStateForReceive检查"管道"状态, 确保

  1. 当前处于连接状态
  2. 管道读写两端都没有被关闭
  3. 读线程状态正常

接着用writeSide记录当前线程为写线程, 用来后续判断线程状态;

然后判断目标位置(in), 如果in == out表明当前缓存数组已经满了, 不能再写数据了, 所以会通过awaitSpace()方法阻塞写线程;

// 此时写线程持有锁
private void awaitSpace() throws IOException {
    // 只有缓存数组已满才需要等待
    while (in == out) {
        // 检查管道状态, 防止在等待的过程中状态发生变化 
        checkStateForReceive();
        // 标准用法中仅涉及两条线程, 所以这里可以认为是通知读线程读数据
        notifyAll();
        try {
            // 释放对象锁, 等待读线程读数据, 调用后就会阻塞写线程
            // 1s后取消等待是为了再次检查管道状态
            // 注意等待结束后, 锁仍然在写线程
            wait(1000);
        } catch (InterruptedException ex) {
            // 直接抛出异常
            IoUtils.throwInterruptedIoException();
        }
    }
}

以上基本可以概括为

缓存数组有空间时直接写数据, 无空间时阻塞写线程, 直至有空间可以写数据

接着分析写一段数据的receive(byte[], int, int)方法, 注意阅读注释!

synchronized void receive(byte b[], int off, int len)  throws IOException {
    checkStateForReceive();
    writeSide = Thread.currentThread();
    // len是需要写进缓存数据的总长度
    // bytesToTransfer用来记录剩余个数
    int bytesToTransfer = len;
    // 循环写数据过程, 直至需要写的数据全部处理完毕
    while (bytesToTransfer > 0) {
        if (in == out)
            // in == out表示缓存区域已经满了, 阻塞线程等待
            awaitSpace();
        // nextTransferAmount用来记录本次过程写进缓存的个数
        int nextTransferAmount = 0;
        if (out < in) {
            // 因为out必然大于等于0, 所以这里 0 <= out < int
            // out < in 表示[in, buffer.length)和[0, out)两个区间可以写数据
            // 先写数据进[in, buffer.length)区间, 避免处理头尾连接的逻辑, 如果还有数据剩余, 留到下一个循环处理
            nextTransferAmount = buffer.length - in;
        } else if (in < out) {
            // 注意in有可能为-1, 所以特殊判断下
            if (in == -1) {
                // in == -1表示缓存数组为空, 整个数组都可以写数据
                // 从这里可知, 单次写数据最大长度就是缓存数组的长度
                in = out = 0;
                nextTransferAmount = buffer.length - in;
            } else {
                // in < out 表示[in, out)区间可以写数据
                nextTransferAmount = out - in;
            }
        }
        // 到这里nextTransferAmount表示本次过程**最多**可以写的数据
        if (nextTransferAmount > bytesToTransfer)
            // 位置比需要的多, 所以修改nextTransferAmount
            nextTransferAmount = bytesToTransfer;
        // 经过上面的判断, nextTransferAmount表示本次过程可以写进缓存的个数
        assert(nextTransferAmount > 0);
        // 把数据写进缓存
        System.arraycopy(b, off, buffer, in, nextTransferAmount);
        // 计算剩余个数
        bytesToTransfer -= nextTransferAmount;
        // 移动数据起点
        off += nextTransferAmount;
        // 后移in
        in += nextTransferAmount;
        // 如果in超出缓存长度, 回到0
        if (in >= buffer.length) {
            in = 0;
        }
    }
}

代码逻辑注释已经说明得很清楚了, 当你需要处理头尾相连的数组时, 可以学习上面循环处理数据的方法, 逻辑清晰, 不需要太多的边界判断.

receiveLast

当输入端关闭时(调用PipedOutputStream#close()), 会调用receivedLast()

synchronized void receivedLast() {
    // 标记输入端关闭
    closedByWriter = true;
    // 通知读线程读数据
    notifyAll();
}

该方法使用变量标记输入端已经关闭, 表示不会有新数据写入了.

read

分析完写数据, 接下来该分析读数据了.

public synchronized int read()  throws IOException {
    // synchronized关键字, 读线程需要持有锁才能读数据
    
    // 先检查管道状态
    if (!connected) {
        throw new IOException("Pipe not connected");
    } else if (closedByReader) {
        throw new IOException("Pipe closed");
    } else if (writeSide != null && !writeSide.isAlive()
        && !closedByWriter && (in < 0)) {
        // 只要in >= 0, 表示还有数据没有读, 所以不抛出异常
        // 这个判断表明了, 即使输入端已经调用了close, 也能继续读已经写入的数据
        throw new IOException("Write end dead");
    }
    
    // 记录读线程
    readSide = Thread.currentThread();
    int trials = 2;
    while (in < 0) {
        // in<0表示缓存区域为空, 只要输入端没有被关闭, 阻塞线程等待数据写入, 即等待in >= 0
        if (closedByWriter) {
            // 输入端关闭了, 同时in < 0, 表示数据传输完毕了, 返回-1 
            return -1;
        }
        // 检查写线程的状态, 线程状态异常则认为管道异常, 检查2次
        if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
            throw new IOException("Pipe broken");
        }
        // 这里可以认为是通知写线程写数据
        notifyAll();
        try {
            // 阻塞线程, 等待1s, 这里会释放锁, 给机会写线程获取锁, 写数据
            wait(1000);
        } catch (InterruptedException ex) {
            IoUtils.throwInterruptedIoException();
        }
    }

    // 执行到这里证明in >= 0, 即缓存数组中有数据
    // 关键的读操作
    // 1. 读取out指向的byte数据
    // 2. 后移out
    // 3. 把byte转成int, 高位补0
    int ret = buffer[out++] & 0xFF;
    // out超出长度则回到位置0
    if (out >= buffer.length) {
        out = 0;
    }
    if (in == out) {
        // 读取的数据追上了输入的数据, 则当前缓存区域为空, 所以设置in = -1
        in = -1;
    }

    return ret;
}

从上面的注释分析可以知道

  1. 即使调用了PipedOutputStream#close(), 只要管道中还有数据, 仍可以读数据, 所以实际使用时, 输入端输入完毕后可以直接close输入端.
  2. 当管道中没有数据时, 会阻塞读线程, 直至管道被关闭, 线程异常或者数据被写入到管道中.

接着看看读取一段数据的方法

public synchronized int read(byte b[], int off, int len)  throws IOException {
    // 参数byte[](下面称输出数组)是数据读取后存放的地方, 所以要先检查该数组
    if (b == null) {
        // 确保输出数组不为null, 否则读出的数据不能写入
        throw new NullPointerException();
    } else if (off < 0 || len < 0 || len > b.length - off) {
        // 确保下标不会越界
        throw new IndexOutOfBoundsException();
    } else if (len == 0) {
        // len参数表示需要读取的长度, 等于0时相当于不读数据, 所以直接返回
        return 0;
    }

    // 先单独读一个数据是为了确保已经有数据写入, 因为如果当前无数据, 则会阻塞当前的读线程
    int c = read();
    // 返回值小于0(实际上只能是-1), 表示管道已经没有数据了, 所以这里也直接返回-1
    if (c < 0) {
        return -1;
    }
    // 把读取到的第一个数据放到输出数组, 看后面的代码时紧记这里已经读了1个数据
    b[off] = (byte) c;
    // 记录读取到的数据长度
    int rlen = 1;
    // 循环条件:
    // in >= 0确保还有数据可以读
    // len > 1确保只读取外部请求的数据长度, 因为上面已经读了1个数据, 所以是大于1, 而不是大于0
    while ((in >= 0) && (len > 1)) {

        // available用来记录当前可以读取的数据
        int available;

        if (in > out) {
            // in > out表示[out, in)区间数据可读
            // in的值正常情况下是不会大于buffer.length的, 因为当 in == buffer.length时, in就会赋值0
            // 这里的Math.min显得有点多余, 可能是为了以防万一吧
            available = Math.min((buffer.length - out), (in - out));
        } else {
            // 首先in是不会等于out的, 因为如果相等, 在上面读第一个数据的时候就会把in赋值-1, 也就不会进入这个循环
            // 当in < out表示[out, buffer.length)和[0, in)两个区间的数据可读
            // 和receive方法类似, 为了不处理跨边界的情况, 先读[out, buffer.length)区间数据
            available = buffer.length - out;
        }

        // 外部已经读了一个数据, 所以只需要读(len - 1)个数据了
        if (available > (len - 1)) {
            available = len - 1;
        }
        // 经过上面的判断, available表示本次需要读的数据长度
        // 复制数据到输出数组
        System.arraycopy(buffer, out, b, off + rlen, available);
        // 后移out变量
        out += available;
        // 记录已经读到的数据量
        rlen += available;
        // 计算剩余需要读的数据
        len -= available;

        // 如果已经读到缓存数组的尾部, 回到开头
        if (out >= buffer.length) {
            out = 0;
        }
        if (in == out) {
            // in == out表示已经没有数据可以读了, 所以in赋值-1
            in = -1;
        }
    }
    return rlen;
}

上面的方法我们需要注意:

while方法体内是不会阻塞读线程的! while方法体内是不会阻塞读线程的! while方法体内是不会阻塞读线程的! 重要的事情说3遍~ 所以如果管道内只有1个数据, 那么读取到输出数组的就只有这1个数据, read方法返回值会是1, 在读取数据后处理输出数组时需要特别注意这点.

available

我们在读数据前可以利用available()先看看管道中的数据个数.

public synchronized int available() throws IOException {
    if(in < 0)
        // 管道中无数据
        return 0;
    else if(in == out)
        // 缓存数组已满
        return buffer.length;
    else if (in > out)
        // [out, in)区间内为有效数据
        return in - out;
    else
        // in < out
        // [in, out)区间为无效数据, 其余为有效数据, 所以长度为 buffer.length - (out - in)
        return in + buffer.length - out;
}

到这里我们已经把所有PipedOutputStreamPipedInputStream的所有方法分析完毕了~ 接着我们再分析下读写过程中对象锁的归属问题.

分析这部分我们先要了解下waitnotifyAll的作用, 可以参考知乎上这个回答java中的notify和notifyAll有什么区别? - 文龙的回答 - 知乎, 本文不再说明了, 重点理解锁池等待池概念

  • 锁池:假设线程A已经拥有了某个对象(注意:不是类)的锁,而其它的线程想要调用这个对象的某个synchronized方法(或者synchronized块),由于这些线程在进入对象的synchronized方法之前必须先获得该对象的锁的拥有权,但是该对象的锁目前正被线程A拥有,所以这些线程就进入了该对象的锁池中。

  • 等待池:假设一个线程A调用了某个对象的wait()方法,线程A就会释放该对象的锁后,进入到了该对象的等待池中

首先, 需要注意, PipedOutputStream中, 两个write方法都没有synchronized关键字, 所以我们不需要关心PipedOutputStream的对象锁.

我们重点分析PipedInputStream里面, readreceive方法.

假设我们先调用receive写数据, 后调用read读数据

当我们写数据时, 进入了receive方法, 因为synchronized关键字, 此时写线程会获取到了对象锁, 然后写数据到管道中, 注意, 在这个过程中, 读线程是不能通过read方法读取数据的, 因为读线程获取不了对象锁, 如果这次写操作中, 管道中的缓存数组满了, 此时写线程会进入awaitSpace()方法, 在该方法内, 写线程先调用了notifyAll方法, 使读线程进入锁池准备竞争对象锁, 然后调用wait(1000)方法, 在这1s内, 写线程释放了对象锁, 然后进入等待池.

写线程释放对象锁后, 读线程就能够获取对象锁, 进入read方法内了, 然后读数据, 只要管道中存在至少一个数据, 就不会阻塞线程, 读取数据后直接退出方法, 释放对象锁, 如果这次读操作中, 管道中的缓存数组没有任何数据, 此时读线程就会调用notifyAll方法, 使写线程从等待池移到锁池, 准备竞争对象锁, 然后再调用wait(1000)方法, 在这1s内, 读线程释放对象锁, 自己进入等待池.

以上就是一次读写中, 对象锁的转移过程, 但是在实际过程中, 我们都是两个线程在各自的循环体内一直读数据和一直写数据的, 所以每一次循环的时候都会竞争锁, 可能先读后写, 或者先写后读.

总结

分析这两个类的源码我们应该可以学习到

  1. InputSteamOutputSteam的接口含义
  2. 使用数组缓存数据的方法, 使用while循环避免处理边界问题
  3. waitnotifyAll协调读写线程的逻辑
  4. 使用这两个类实现传输数据流
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342

推荐阅读更多精彩内容