Node.js流模式编程详解

本文是Node.js设计模式的笔记, 代码都是来自 <Node.js Design Patterns> by Mario Casciaro.

流的重要性

一般我们处理数据有两种模式, 一种是buffer模式, 一种是stream模式, buffer模式就是取完数据一次性操作, stream模式就是边取数据边操作.
举个例子, 如果打开一个2G的文件, 用buffer模式就是先分配2G的内存, 把文件全部读出来, 然后开始操作内存, 而用流模式的方法就是边读数据, 边开始处理.

从这里看出stream模式无论是在空间和时间上都优于buffer模式:
在空间上, 内存只会占用当前需要处理的一块数据区域的大小, 而不是整个文件.
在时间上, 因为不需要全部的数据就可以开始处理, 时间就相当于节约了, 从串行变成了并行操作(这里的并行不是多线程的并行, 而是生产者和消费者并行).

还有一个好处就是链式调用, 也就是可组合操作, 大大增加了代码的可重用性.
比如下面这个代码(中间的pipe可以很方便的增删):

fs.createReadStream(file)
     .pipe(zlib.createGzip())
     //.pipe(crypto.createCipher('aes192', 'secret'))
     .pipe(req)
     .on('finish', function() {
       console.log('File succesfully sent');
     });

开始编码

nodejs里面的stream一般分四种, 其中转换流是一种特殊的读写流.

  • 输入流(stream.Readable)
  • 输出流(stream.Writable)
  • 读写流(stream.Duplex)
  • 转换流(stream.Transform)

另外, nodejs里面的流有两种模式, 二进制模式和对象模式.

  • 二进制模式, 每个分块都是buffer或者string对象.
  • 对象模式, 流内部处理的是一系列普通对象.

输入流(stream.Readable)

先看一下怎么使用输入流, 这里一般有两种方法, 一个是非流动模式, 一个是流动模式.
非流动模式就是直接调用read()方法, 被动模式就是监听data事件.
下面直接看代码:

// 非流动模式
process.stdin
    .on('readable', function() {
        // 有数据到了, 拼命读, 直到读完.
        var chunk;
        console.log('New data available');
        while((chunk = process.stdin.read()) !== null) {
            console.log(
            'Chunk read: (' + chunk.length + ') "' + chunk.toString() + '"'
            );
        }})
    .on('end', function() {
       process.stdout.write('End of stream');
    });

接下来看流动模式怎么玩

// 流动模式
process.stdin
     .on('data', function(chunk) {
       console.log('New data available');
       console.log(
         'Chunk read: (' + chunk.length + ')" ' +
         chunk.toString() + '"'
       );
     })
     .on('end', function() {
       process.stdout.write('End of stream');
     });

同样实现一个输入流也很简单, 主要是

  1. 继承Readable类
  2. 实现_read(size)接口(一般带下划线的表示内部函数, 调用者不要直接调用, 相当于C++里面的protect方法, 只是javascript里面没有对方法做区分, 只能是命名上面区分一下了).
    下面看示例代码:
// randomStream.js
var stream = require('stream');
var util = require('util');
var chance = require('chance').Chance();

function RandomStream(options) {
    // option支持3个参数
    // encoding String 用于转换Buffer到String的编码类型(默认null)
    // objMode Boolean 用户指定是否是对象模式(默认false)
    // highWaterMark Number 最高水位(可读的最大数据量), 默认是16K
    stream.Readable.call(this, options);
}
util.inherits(RandomStream, stream.Readable);
RandomStream.prototype._read = function(size) {
    // 这是一个随机产生数据的流, 5%的概率输出null, 也就是流停止.
    var chunk = chance.string();
    console.log('Pushing chunk of size:' + chunk.length);
    this.push(chunk, 'utf8');
    if(chance.bool({likelihood: 5})) {
       this.push(null);
    }
}
module.exports = RandomStream;

好, 接下来是如何使用:

// generateRandom.js
var RandomStream = require('./randomStream');
var randomStream = new RandomStream();
randomStream.on('readable', function() {
    var chunk;
    while((chunk = randomStream.read()) !== null) {
        console.log("Chunk received: " + chunk.toString());
    }
});

输出流(stream.Writable)

先是怎么用:

// 写数据
writable.write(chunk, [encoding], [callback]);
// 结束流
writable.end([chunk], [encoding], [callback])

回压(back-pressure)

这里涉及一个概念, 回压(back-pressure), 意思就是当生产者速度大于消费者的时候, 输出流的水位会不断上升, 当到达设定的最高水位时候, 就会写入失败, 这时候也就是产生了back-pressure, 那如何处理呢, 此时输入流在水位降低到零点的时候会有一个drain事件发送, 只要监听这个事件, 在事件发生的时候就可以继续向流写入数据了.
直接看代码:

var chance = require('chance').Chance();
require('http').createServer(function (req, res) {
    res.writeHead(200, {'Content-Type': 'text/plain'});
    function generateMore() {             //[1]
        while(chance.bool({likelihood: 95})) {
            var shouldContinue = res.write(
                chance.string({length: (16 * 1024) – 1})
            );
            if(!shouldContinue) {             //[3]
                console.log('Backpressure');
                return res.once('drain', generateMore);
            }
        }
        res.end('\nThe end...\n', function() {
            console.log('All data was sent');
        });
    }
    generateMore();
}).listen(8080, function () {
  console.log('Listening');
});

同样, 实现一个输出流也很简单, 只要继承Writable类, 实现_write()接口即可.
示例代码:

// toFileStream.js
var stream = require('stream');
var fs = require('fs');
var util = require('util');
var path = require('path');
var mkdirp = require('mkdirp');

function ToFileStream() {
    // 这次我们用对象模式
    stream.Writable.call(this, {objectMode: true});
};
util.inherits(ToFileStream, stream.Writable);
ToFileStream.prototype._write = function(chunk, encoding, callback) {
    var self = this;
    mkdirp(path.dirname(chunk.path), function(err) {
        if(err) {
            return callback(err);
        }
        fs.writeFile(chunk.path, chunk.content, callback);
    });
}
module.exports = ToFileStream;

下面是调用的代码

var ToFileStream = require('./toFileStream');
var tfs = new ToFileStream();
tfs.write({path: "file1.txt", content: "Hello"});
tfs.write({path: "file2.txt", content: "Node.js"});
tfs.write({path: "file3.txt", content: "Streams"});
tfs.end(function() {
    console.log("All files created");
});

读写流(stream.Duplex)

就是把输入流和输出流的接口都实现了.
注意:
此时option参数是同时传给了内部的Readable和Writeable, 如果要使用不同的选项, 就要分开配置,
像这样:
this._writableState.objectMode
this._readableState.objectMode
同时, Duplex又多了一个选项allowHalfOpen, 这个选项的意思是, 当其中一个流关闭的时候, 另外一条流是否也同时关闭, 默认是true, 也就是不同时关闭.

转换流(stream.Transform)

对于读写流来说, 要实现的是 _read() 和 _write() 接口, 而转换流要实现的是 _transform() 和 _flush()接口.
区别是什么, 转换流一般在transform的过程中把读写都做了, 也就是在处理输入的时候, 直接就输出了. 最后在输入结束的时候_flush() 会被调用, 就可以把剩余的内部数据一并输出了.

示例代码:

// 这代码写的很漂亮, 解决的问题是在流中操作替换操作
// 其中替换的部分可以仔细看一下, stream和buffer一个很大的区别就是stream会被切割
// 导致要替换的数据也有可能被切割, 这个例子就提供了一种解决方法, 
// 这个在后续实践中肯定也会遇到的.
var stream = require('stream');
var util = require('util');
function ReplaceStream(searchString, replaceString) {
    stream.Transform.call(this, {decodeStrings: false});
    this.searchString = searchString;
    this.replaceString = replaceString;
    this.tailPiece = '';
}
util.inherits(ReplaceStream, stream.Transform);
ReplaceStream.prototype._transform = function(chunk, encoding, callback) {
    var pieces = (this.tailPiece + chunk).split(this.searchString);
    var lastPiece = pieces[pieces.length - 1];
    var tailPieceLen = this.searchString.length - 1;
    this.tailPiece = lastPiece.slice(-tailPieceLen);
    pieces[pieces.length - 1] = lastPiece.slice(0, -tailPieceLen);
    this.push(pieces.join(this.replaceString));       //[3]
    callback();
}
ReplaceStream.prototype._flush = function(callback) {
    this.push(this.tailPiece);
    callback();
}
module.exports = ReplaceStream;

几个流操作相关的有用包

程序员就是懒, 有这几个包就可以少写一些代码了.

  • readable-stream, 统一了nodejs 实现的不同版本stream接口
  • [through2](https://npmjs.org/ package/through2), 用于快速创建转化流
  • from2, 用于快速创建输入流
  • writable2, 用于快速创建输出流
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容

  • stream 流是一个抽象接口,在 Node 里被不同的对象实现。例如 request to an HTTP se...
    明明三省阅读 3,384评论 1 10
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,566评论 18 139
  • https://nodejs.org/api/documentation.html 工具模块 Assert 测试 ...
    KeKeMars阅读 6,293评论 0 6
  • 流是Node中最重要的组件和模式之一。在社区里有一句格言说:让一切事务流动起来。这已经足够来描述在Node中流...
    宫若石阅读 533评论 0 0
  • 周一早晨似乎总是昏昏沉沉,做事无法集中注意力,喝两杯咖啡都不一定能提神。大概是因为周末的生物钟还没有倒过来时差吧。...
    vivi要学习阅读 188评论 0 0