流是传输数据时常见的思想,就是一部分一部分的传输内容,是文件读写、网络通信的基础概念。
Node.js 也提供了 stream 的 api,包括 Readable
可读流、Writable
可写流、Duplex
双工流、Transform
转换流。它们分别实现 _read
、_write
、_read + _write
、_transform
方法,来做数据的返回和处理。
创建 Readable 对象既可以直接调用 Readable api 创建,然后重写_read
方法,也可以继承 Readable 实现一个子类,之后实例化。其他流同理。(Readable 可以很容易的和 generator 结合)
当读入的速率大于写入速率的时候就会出现“背压”现象,会爆缓冲区导致数据丢失,解决的方式是根据 write 的速率来动态 pause
和 resume
可读流的速率。
当调用 writable stream 的 write 方法的时候会返回一个 boolean
值代表是写入了目标还是放在了缓冲区:
-
true
: 数据已经写入目标 -
false
:目标不可写入,暂时放在缓冲区
我们可以判断返回 false 的时候就 pause,然后等缓冲区清空了就 resume:
const rs = fs.createReadStream(src, {
flags: "r",
encoding: null,
fd: null,
mode: 438,
autoClose: true,
start: 0,
// end:3,
highWaterMark: 4,
});
const ws = fs.createWriteStream(dst, {
flags: "w",
encoding: "utf-8",
fd: null,
mode: 438,
autoClose: true,
start: 0,
// end:3,
highWaterMark: 4,
});
rs.on('data', function (chunk) {
// 判断返回 false 的时候就 pause
if (ws.write(chunk) === false) {
rs.pause();
}
});
rs.on('end', function () {
ws.end();
});
// 当缓冲区可以继续写入数据时通过drain时间让生产者知到
ws.on('drain', function () {
// 缓冲区清空了就 resume
rs.resume();
});
这样就能达到根据写入速率暂停和恢复读入速率的功能,解决了背压问题
流是掌握 IO 绕不过去的一个概念,而背压问题也是流很常见的问题,遇到了数据丢失可以考虑是否发生了背压。希望这篇文章能够帮大家,真正掌握 stream!