场景
假设我们需要上传一组动态增加的数据, 输入端可以看作inputSteam, 输入端是outputSteam, 但是输入和输出端不能直接对接, 那么我们要怎样实现呢?
我希望的解决方案时, 输入和输出通过一个"数据池"间接连接, 输入端把数据写到数据池中, 输出端从数据池中读数据, 这里要求数据池有"阻塞"功能, 即数据池满了阻塞输入端, 数据池空了, 阻塞输出端.
以上效果可以使用PipedInputStream
和PipedOutputStream
实现.
前言
- 这两个类需要配套使用, 可以实现管道(pipe)传输数据
- 默认的使用方式是, 通过管道连接线程A和B, 在A线程使用
PipedOutputStream
写数据, 数据缓存到"管道"后, B线程使用PipedInputStream
读取数据, 以此完成数据传输, 如果在同一个线程使用这两个类, 可能导致死锁
PipedOutputStream
PipedOutputStream
是管道的发送端. 写线程通过它来往"管道"填充数据.
我们先看看它有哪几个方法, 从命名和注释基本就能知道每个方法的作用
// 关联PipedInputStream
public void connect(PipedInputStream snk)
// 写一个数据
public void write(int b)
// 写一段数据
public void write(byte b[], int off, int len)
// 通知读线程, 管道中有数据等待读取
public void flush()
// 关闭发送端, 不再发送数据
public void close()
以上注释已经大致说明了这个类的功能了, 接着我们逐个方法分析
connect
public synchronized void connect(PipedInputStream snk) throws IOException {
// 先确保
// 1. 连接对象(输入的snk)不能为空
// 2. 不能重复连接
sink = snk;
snk.in = -1;
snk.out = 0;
snk.connected = true;
}
从上可以看出, connect
方法就是修改连接的PipedInputStream
的成员变量, 使其处于已连接状态.
write
public void write(int b) throws IOException {
// 确保sink不为空, 即确保已经连接
sink.receive(b);
}
public void write(byte b[], int off, int len) throws IOException {
// 先确保
// 1. 已经连接
// 2. 输出数组b不为空
// 3. off和len不会导致数组越界
if (sink == null) {
// ...
} else if (len == 0) {
// 如果len == 0, 表示不读取数据, 所以可以直接返回
return;
}
sink.receive(b, off, len);
}
从上可以看出, 两个write
方法, 最后都调用了响应的PipedInputStream#receive
方法, 这表明
数据存储的地方和写数据的具体逻辑都在
PipedInputStream
中
后面我们再详细分析.
flush
public synchronized void flush() throws IOException {
if (sink != null) {
synchronized (sink) {
sink.notifyAll();
}
}
}
这个方法先尝试获取sink的锁, 然后通过notifyAll()
来调度线程, 在这里, 具体就是使读线程开始读取数据, 这里涉及读写线程间的沟通调度问题, 在了解完PipedInputStream
之后我们再重新看这个问题.
close
public void close() throws IOException {
if (sink != null) {
sink.receivedLast();
}
}
这个方法就是简单的调用了PipedInputStream#receivedLast()
方法, 从方法名可以判断出, 这个方法就是通知PipedInputStream
, 数据已经填充完毕.
总结
从上面的分析可以看出, PipedOutputStream
基本就是一个"接口"类, 不会对数据进行实际的操作, 也不承担具体的职责, 只负责把数据交给PipedInputStream
处理.
下面我们接着分析最关键的PipedInputStream
的源码
PipedInputStream
成员变量
我们先看下关键的几个变量
// 缓存数组, "管道"数据的存储的地方
protected byte buffer[];
// 写下一个数据时, 保存到缓存数组的位置
// 小于0表示无可读数据, 缓存数组为空
// in == out时表示缓存数组已满
protected int in = -1;
// 下一个被读数据在缓存数组的位置
protected int out = 0;
看上面3个成员变量我们基本可以知道
"管道"内部使用了数组来缓存写入的数据, 等待读取. 通过
in
和out
两个值来记录数组的写位置和读位置
其余变量都是一些状态标识
// 写数据端(输入端)是否已经关闭
boolean closedByWriter = false;
// 读数据端(输出端)是否已经关闭
volatile boolean closedByReader = false;
// 是否处于已连接状态
boolean connected = false;
// 记录读线程
Thread readSide;
// 记录写线程
Thread writeSide;
这些变量都是用于判断当前"管道"的状态
其中readSide
和writeSide
是一种简单的标记读写线程的方式, 源码注释中也有说明这种方式并不可靠, 这种方式针对的应该是两条线程的情况, 所以我们使用的时候应该尽量按照设计意图来使用
在两条线程中建立"管道"传递数据, 写线程写数据, 读线程读数据.
构造函数
它包含了好几个构造函数, 我们只看参数最多的那个
public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
initPipe(pipeSize);
connect(src);
}
最终都会要求我们调用上面的两个方法, 都比较简单就不贴代码了
-
initPipe()
里面对byte数组buffer
变量进行赋值, 也就是初始化缓冲区域 -
connect()
方法直接调用了PipedOutputStream#connect
, 上面已经分析过了, 最终效果就是指明PipedOutputStream
的连接对象, 改变connected
变量的值, 使得PipedInputStream
处于连接状态.
receive
通过上面PipedOutputStream
的分析可以知道, 写数据的方法会调用PipedInputStream
的reveive
方法, 所以我们首先分析这个方法, 了解写数据的逻辑. 注意阅读注释!
// 写单个数据
protected synchronized void receive(int b) throws IOException {
// 检查当前"管道"状态, 确保能够读写数据
checkStateForReceive();
// 本方法由PipedOutputStream调用, 所以线程是写线程, 记录该线程
writeSide = Thread.currentThread();
if (in == out)
// in == out表示缓存数组已经满了, 阻塞线程等待
// 这里确保了未读的缓存数据不会丢失
awaitSpace();
// 当检测到缓存数组有空间, 等待结束后, 会继续执行以下代码
if (in < 0) {
// in小于0表示当前无数据, 设置读, 写位置都是0
in = 0;
out = 0;
}
// 写操作
// 1. 把数据写到目标位置(in)
// 2. 后移in, 指明下一个写数据的位置
buffer[in++] = (byte)(b & 0xFF);
// 如果in超出缓存长度, 回到0, 循环利用缓存数组
if (in >= buffer.length) {
in = 0;
}
}
注意该方法带有synchronized
关键字, 表明在该方法内, 会持有对象锁, 我们留到最后再分析各个环节中, 对象锁的归属问题.
在写数据前会先通过checkStateForReceive
检查"管道"状态, 确保
- 当前处于连接状态
- 管道读写两端都没有被关闭
- 读线程状态正常
接着用writeSide
记录当前线程为写线程, 用来后续判断线程状态;
然后判断目标位置(in
), 如果in == out
表明当前缓存数组已经满了, 不能再写数据了, 所以会通过awaitSpace()
方法阻塞写线程;
// 此时写线程持有锁
private void awaitSpace() throws IOException {
// 只有缓存数组已满才需要等待
while (in == out) {
// 检查管道状态, 防止在等待的过程中状态发生变化
checkStateForReceive();
// 标准用法中仅涉及两条线程, 所以这里可以认为是通知读线程读数据
notifyAll();
try {
// 释放对象锁, 等待读线程读数据, 调用后就会阻塞写线程
// 1s后取消等待是为了再次检查管道状态
// 注意等待结束后, 锁仍然在写线程
wait(1000);
} catch (InterruptedException ex) {
// 直接抛出异常
IoUtils.throwInterruptedIoException();
}
}
}
以上基本可以概括为
缓存数组有空间时直接写数据, 无空间时阻塞写线程, 直至有空间可以写数据
接着分析写一段数据的receive(byte[], int, int)
方法, 注意阅读注释!
synchronized void receive(byte b[], int off, int len) throws IOException {
checkStateForReceive();
writeSide = Thread.currentThread();
// len是需要写进缓存数据的总长度
// bytesToTransfer用来记录剩余个数
int bytesToTransfer = len;
// 循环写数据过程, 直至需要写的数据全部处理完毕
while (bytesToTransfer > 0) {
if (in == out)
// in == out表示缓存区域已经满了, 阻塞线程等待
awaitSpace();
// nextTransferAmount用来记录本次过程写进缓存的个数
int nextTransferAmount = 0;
if (out < in) {
// 因为out必然大于等于0, 所以这里 0 <= out < int
// out < in 表示[in, buffer.length)和[0, out)两个区间可以写数据
// 先写数据进[in, buffer.length)区间, 避免处理头尾连接的逻辑, 如果还有数据剩余, 留到下一个循环处理
nextTransferAmount = buffer.length - in;
} else if (in < out) {
// 注意in有可能为-1, 所以特殊判断下
if (in == -1) {
// in == -1表示缓存数组为空, 整个数组都可以写数据
// 从这里可知, 单次写数据最大长度就是缓存数组的长度
in = out = 0;
nextTransferAmount = buffer.length - in;
} else {
// in < out 表示[in, out)区间可以写数据
nextTransferAmount = out - in;
}
}
// 到这里nextTransferAmount表示本次过程**最多**可以写的数据
if (nextTransferAmount > bytesToTransfer)
// 位置比需要的多, 所以修改nextTransferAmount
nextTransferAmount = bytesToTransfer;
// 经过上面的判断, nextTransferAmount表示本次过程可以写进缓存的个数
assert(nextTransferAmount > 0);
// 把数据写进缓存
System.arraycopy(b, off, buffer, in, nextTransferAmount);
// 计算剩余个数
bytesToTransfer -= nextTransferAmount;
// 移动数据起点
off += nextTransferAmount;
// 后移in
in += nextTransferAmount;
// 如果in超出缓存长度, 回到0
if (in >= buffer.length) {
in = 0;
}
}
}
代码逻辑注释已经说明得很清楚了, 当你需要处理头尾相连的数组时, 可以学习上面循环处理数据的方法, 逻辑清晰, 不需要太多的边界判断.
receiveLast
当输入端关闭时(调用PipedOutputStream#close()
), 会调用receivedLast()
synchronized void receivedLast() {
// 标记输入端关闭
closedByWriter = true;
// 通知读线程读数据
notifyAll();
}
该方法使用变量标记输入端已经关闭, 表示不会有新数据写入了.
read
分析完写数据, 接下来该分析读数据了.
public synchronized int read() throws IOException {
// synchronized关键字, 读线程需要持有锁才能读数据
// 先检查管道状态
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < 0)) {
// 只要in >= 0, 表示还有数据没有读, 所以不抛出异常
// 这个判断表明了, 即使输入端已经调用了close, 也能继续读已经写入的数据
throw new IOException("Write end dead");
}
// 记录读线程
readSide = Thread.currentThread();
int trials = 2;
while (in < 0) {
// in<0表示缓存区域为空, 只要输入端没有被关闭, 阻塞线程等待数据写入, 即等待in >= 0
if (closedByWriter) {
// 输入端关闭了, 同时in < 0, 表示数据传输完毕了, 返回-1
return -1;
}
// 检查写线程的状态, 线程状态异常则认为管道异常, 检查2次
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
throw new IOException("Pipe broken");
}
// 这里可以认为是通知写线程写数据
notifyAll();
try {
// 阻塞线程, 等待1s, 这里会释放锁, 给机会写线程获取锁, 写数据
wait(1000);
} catch (InterruptedException ex) {
IoUtils.throwInterruptedIoException();
}
}
// 执行到这里证明in >= 0, 即缓存数组中有数据
// 关键的读操作
// 1. 读取out指向的byte数据
// 2. 后移out
// 3. 把byte转成int, 高位补0
int ret = buffer[out++] & 0xFF;
// out超出长度则回到位置0
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
// 读取的数据追上了输入的数据, 则当前缓存区域为空, 所以设置in = -1
in = -1;
}
return ret;
}
从上面的注释分析可以知道
- 即使调用了
PipedOutputStream#close()
, 只要管道中还有数据, 仍可以读数据, 所以实际使用时, 输入端输入完毕后可以直接close
输入端. - 当管道中没有数据时, 会阻塞读线程, 直至管道被关闭, 线程异常或者数据被写入到管道中.
接着看看读取一段数据的方法
public synchronized int read(byte b[], int off, int len) throws IOException {
// 参数byte[](下面称输出数组)是数据读取后存放的地方, 所以要先检查该数组
if (b == null) {
// 确保输出数组不为null, 否则读出的数据不能写入
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
// 确保下标不会越界
throw new IndexOutOfBoundsException();
} else if (len == 0) {
// len参数表示需要读取的长度, 等于0时相当于不读数据, 所以直接返回
return 0;
}
// 先单独读一个数据是为了确保已经有数据写入, 因为如果当前无数据, 则会阻塞当前的读线程
int c = read();
// 返回值小于0(实际上只能是-1), 表示管道已经没有数据了, 所以这里也直接返回-1
if (c < 0) {
return -1;
}
// 把读取到的第一个数据放到输出数组, 看后面的代码时紧记这里已经读了1个数据
b[off] = (byte) c;
// 记录读取到的数据长度
int rlen = 1;
// 循环条件:
// in >= 0确保还有数据可以读
// len > 1确保只读取外部请求的数据长度, 因为上面已经读了1个数据, 所以是大于1, 而不是大于0
while ((in >= 0) && (len > 1)) {
// available用来记录当前可以读取的数据
int available;
if (in > out) {
// in > out表示[out, in)区间数据可读
// in的值正常情况下是不会大于buffer.length的, 因为当 in == buffer.length时, in就会赋值0
// 这里的Math.min显得有点多余, 可能是为了以防万一吧
available = Math.min((buffer.length - out), (in - out));
} else {
// 首先in是不会等于out的, 因为如果相等, 在上面读第一个数据的时候就会把in赋值-1, 也就不会进入这个循环
// 当in < out表示[out, buffer.length)和[0, in)两个区间的数据可读
// 和receive方法类似, 为了不处理跨边界的情况, 先读[out, buffer.length)区间数据
available = buffer.length - out;
}
// 外部已经读了一个数据, 所以只需要读(len - 1)个数据了
if (available > (len - 1)) {
available = len - 1;
}
// 经过上面的判断, available表示本次需要读的数据长度
// 复制数据到输出数组
System.arraycopy(buffer, out, b, off + rlen, available);
// 后移out变量
out += available;
// 记录已经读到的数据量
rlen += available;
// 计算剩余需要读的数据
len -= available;
// 如果已经读到缓存数组的尾部, 回到开头
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
// in == out表示已经没有数据可以读了, 所以in赋值-1
in = -1;
}
}
return rlen;
}
上面的方法我们需要注意:
while
方法体内是不会阻塞读线程的!while
方法体内是不会阻塞读线程的!while
方法体内是不会阻塞读线程的! 重要的事情说3遍~ 所以如果管道内只有1个数据, 那么读取到输出数组的就只有这1个数据,read
方法返回值会是1, 在读取数据后处理输出数组时需要特别注意这点.
available
我们在读数据前可以利用available()
先看看管道中的数据个数.
public synchronized int available() throws IOException {
if(in < 0)
// 管道中无数据
return 0;
else if(in == out)
// 缓存数组已满
return buffer.length;
else if (in > out)
// [out, in)区间内为有效数据
return in - out;
else
// in < out
// [in, out)区间为无效数据, 其余为有效数据, 所以长度为 buffer.length - (out - in)
return in + buffer.length - out;
}
到这里我们已经把所有PipedOutputStream
和PipedInputStream
的所有方法分析完毕了~ 接着我们再分析下读写过程中对象锁的归属问题.
锁
分析这部分我们先要了解下wait
和notifyAll
的作用, 可以参考知乎上这个回答java中的notify和notifyAll有什么区别? - 文龙的回答 - 知乎, 本文不再说明了, 重点理解锁池和等待池概念
锁池:假设线程A已经拥有了某个对象(注意:不是类)的锁,而其它的线程想要调用这个对象的某个synchronized方法(或者synchronized块),由于这些线程在进入对象的synchronized方法之前必须先获得该对象的锁的拥有权,但是该对象的锁目前正被线程A拥有,所以这些线程就进入了该对象的锁池中。
等待池:假设一个线程A调用了某个对象的wait()方法,线程A就会释放该对象的锁后,进入到了该对象的等待池中
首先, 需要注意, PipedOutputStream
中, 两个write
方法都没有synchronized
关键字, 所以我们不需要关心PipedOutputStream
的对象锁.
我们重点分析PipedInputStream
里面, read
和receive
方法.
假设我们先调用receive
写数据, 后调用read
读数据
当我们写数据时, 进入了receive
方法, 因为synchronized
关键字, 此时写线程会获取到了对象锁, 然后写数据到管道中, 注意, 在这个过程中, 读线程是不能通过read
方法读取数据的, 因为读线程获取不了对象锁, 如果这次写操作中, 管道中的缓存数组满了, 此时写线程会进入awaitSpace()
方法, 在该方法内, 写线程先调用了notifyAll
方法, 使读线程进入锁池准备竞争对象锁, 然后调用wait(1000)
方法, 在这1s内, 写线程释放了对象锁, 然后进入等待池.
写线程释放对象锁后, 读线程就能够获取对象锁, 进入read
方法内了, 然后读数据, 只要管道中存在至少一个数据, 就不会阻塞线程, 读取数据后直接退出方法, 释放对象锁, 如果这次读操作中, 管道中的缓存数组没有任何数据, 此时读线程就会调用notifyAll
方法, 使写线程从等待池移到锁池, 准备竞争对象锁, 然后再调用wait(1000)
方法, 在这1s内, 读线程释放对象锁, 自己进入等待池.
以上就是一次读写中, 对象锁的转移过程, 但是在实际过程中, 我们都是两个线程在各自的循环体内一直读数据和一直写数据的, 所以每一次循环的时候都会竞争锁, 可能先读后写, 或者先写后读.
总结
分析这两个类的源码我们应该可以学习到
-
InputSteam
和OutputSteam
的接口含义 - 使用数组缓存数据的方法, 使用
while
循环避免处理边界问题 -
wait
和notifyAll
协调读写线程的逻辑 - 使用这两个类实现传输数据流