node.js中流(Stream)的深度剖析

流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。Node.js 提供了多种流对象。流可以是可读的、可写的,或是可读写的。所有的流都是 EventEmitter 的实例。

Node.js 中有四种基本的流类型:

  • Readable - 可读的流 (例如 fs.createReadStream()).
  • Writable - 可写的流 (例如 fs.createWriteStream()).
  • Duplex - 可读写的流 (例如 net.Socket).
  • Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).

接下来让我们一起去看看stream中的流是怎么样来工作的。

可写流 Writable

fs.createWriteStream(path[, options])创建一个可写流,对这个不太了解的可以查看fs.createWriteStream(path[, options])
 let fs=require('fs');
 let ws=fs.createWriteStream('2.txt',{
   highWaterMark:3
 })
 ws.write('我们都是好孩子,哈哈、、、','utf8',(err)=>{
   if(err){
     console.log(err);
   }
 })

那么这样一个可写流究竟是如何实现的呢?我们将通过手写代码来模拟fs.createWriteStream的功能来解析node中可写流的工作原理,下面们将通过一张图解来大概看看我们手写代码有哪些功能点,图片如下:

image

通过上面的图解代码的功能也就很明显了,下面我们就一一来实现,首先是创建一个类,构建好一个类的大体骨架:

  let fs=require('fs');
  let EventEmiter=require('events');
  class MyWriteStream extends EventEmiter{
    constructor(path,options){
      super();
      this.path=path;//路径
      this.flags=options.flags||'w';//模式
      this.encoding=options.encoding||null;//编码格式
      this.fd=options.fd||null;//打开文件的标识位
      this.mode=options.mode||0o666;//写入的mode
      this.autoClose=options.autoClose||true;//是否自动关闭
      this.start=options.start||0;//写入的开始位置
      this.pos=this.start;//写入的标示位置
      this.writing=false;//是否正在写入的标识
      this.highWaterMark=options.highWaterMark||1024*16;//每次写入的最大值
      this.buffers = [];//缓存区
      this.length = 0;//表示缓存区字节的长度
  
      this.open();
    }
    open(){
      
    }
    write(){
      
    }
    _write(chunk,encoding,callback){
      
    }
    clearBuffer(){
      
    }
    destroy(){
      
    }
  }
  
  module.exports=MyWriteStream;
  • open方法

如思维导图所示,open方法的功能主要是打开对应路径的文件与触发open事件,所以对应的代码片段如下:

 open(){
     fs.open(this.path,this.flags,this.mode,(err,fd)=>{
       if(err){
         if(this.autoClose){
           this.destroy();
         }
         this.emit('error',err);
         return;
       }
       this.fd=fd;
       this.emit('open');
     })
   }
  • write方法代码段如下:
 write(data,encoding,callback){
     let chunk = Buffer.isBuffer(data)?data:Buffer.from(data,this.encoding);
     let len=chunk.length;
     this.length+=len;
     //判断当前最新的缓存区是否小于每次写入的最大值
     let ret = this.length < this.highWaterMark;
     if (this.writing) {//表示正在向文件写数据,则当前数据必须放在缓存区里
       this.buffers.push({
         chunk,
         encoding,
         callback
       });
     } else {//直接调用底层的写入方法进行写入
       //在底层写完当前数据后要清空缓存区
       this.writing = true;
       this._write(chunk, encoding, () => {this.clearBuffer();callback&&callback()});
     }
     return ret;
   }
  • _write方法如下:
 _write(chunk,encoding,callback){
     if(typeof this.fd != 'number'){
       return this.once('open',()=>this._write(chunk, encoding, callback));
     }
     fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,bytesWrite)=>{
       if(err){
         if(this.autoClose){
           this.destroy();
           this.emit('error',err);
         }
       }else{
           this.pos += bytesWrite;
           //写入多少数据,缓存区减少多少字节
           this.length -= bytesWrite;
           callback && callback();
       }
     })
   }
  • destroy方法,代码如下:
   destroy(){
       fs.close(this.fd,()=>{
         this.emit('end');
         this.emit('close');
       })
     }
  • clearBuffer方法,代码如下:
 clearBuffer(){
     let data = this.buffers.shift();
     if(data){
       this._write(data.chunk,data.encoding,()=>{this.clearBuffer();data.callback()})
     }else{
       this.writing = false;
       //缓存区清空了
       this.emit('drain');
     }
   }
  • 最后完整的代码如下:
   let fs=require('fs');
   let EventEmiter=require('events');
   
   class MyWriteStream extends EventEmiter{
     constructor(path,options){
       super();
       this.path=path;//路径
       this.flags=options.flags||'w';//模式
       this.encoding=options.encoding||null;//编码格式
       this.fd=options.fd||null;//打开文件的标识位
       this.mode=options.mode||0o666;//写入的mode
       this.autoClose=options.autoClose||true;//是否自动关闭
       this.start=options.start||0;//写入的开始位置
       this.pos=this.start;//写入的标示位置
       this.writing=false;//是否正在写入的标识
       this.highWaterMark=options.highWaterMark||1024*16;//每次写入的最大值
       this.buffers = [];//缓存区
       this.length = 0;//表示缓存区字节的长度
   
       this.open();
     }
     open(){
       fs.open(this.path,this.flags,this.mode,(err,fd)=>{
         if(err){
           if(this.autoClose){
             this.destroy();
           }
           this.emit('error',err);
           return;
         }
         this.fd=fd;
         this.emit('open');
       })
     }
     write(data,encoding,callback){
       let chunk = Buffer.isBuffer(data)?data:Buffer.from(data,this.encoding);
       let len=chunk.length;
       this.length+=len;
       //判断当前最新的缓存区是否小于每次写入的最大值
       let ret = this.length < this.highWaterMark;
       if (this.writing) {//表示正在向文件写数据,则当前数据必须放在缓存区里
         this.buffers.push({
           chunk,
           encoding,
           callback
         });
       } else {//直接调用底层的写入方法进行写入
         //在底层写完当前数据后要清空缓存区
         this.writing = true;
         this._write(chunk, encoding, () => {this.clearBuffer();callback&&callback()});
       }
       return ret;
     }
     _write(chunk,encoding,callback){
       if(typeof this.fd != 'number'){
         return this.once('open',()=>this._write(chunk, encoding, callback));
       }
       fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,bytesWrite)=>{
         if(err){
           if(this.autoClose){
             this.destroy();
             this.emit('error',err);
           }
         }else{
             this.pos += bytesWrite;
             //写入多少数据,缓存区减少多少字节
             this.length -= bytesWrite;
             callback && callback();
         }
       })
     }
     clearBuffer(){
       let data = this.buffers.shift();
       if(data){
         this._write(data.chunk,data.encoding,()=>{this.clearBuffer();data.callback()})
       }else{
         this.writing = false;
         //缓存区清空了
         this.emit('drain');
       }
     }
     destroy(){
       fs.close(this.fd,()=>{
         this.emit('end');
         this.emit('close');
       })
     }
   }
   
   module.exports=MyWriteStream;

可读流 Readable - 可读的流 (例如 fs.createReadStream()).

fs.createReadStream()创建一个可读流(例如 fs.createReadStream()),可读流其实与可写流很相似,但是可读流事实上工作在下面两种模式之一:flowing 和 paused 。

  • 在 flowing 模式下, 可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用。
  • 在 paused 模式下,必须显式调用 stream.read() 方法来从流中读取数据片段。

可读流可以通过下面途径切换到 paused 模式:

  • 如果不存在管道目标(pipe destination),可以通过调用 stream.pause() 方法实现。
  • 如果存在管道目标,可以通过取消 'data' 事件监听,并调用 stream.unpipe() 方法移除所有管道目标来实现。

这里需要记住的重要概念就是,可读流需要先为其提供消费或忽略数据的机制,才能开始提供数据。如果消费机制被禁用或取消,可读流将 尝试 停止生成数据。

注意: 为了向后兼容,取消 'data' 事件监听并 不会 自动将流暂停。同时,如果存在管道目标(pipe destination),且目标状态变为可以接收数据(drain and ask for more data),调用了 stream.pause() 方法也并不保证流会一直 保持 暂停状态。

注意: 如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。 比如, 调用了 readable.resume() 方法却没有监听 'data' 事件,或是取消了 'data' 事件监听,就有可能出现这种情况。

flowing模式
//flowing 模式下createReadStream的工作代码如下:
let fs=require('fs');
let rs=fs.createReadStream('2.txt',{
  highWaterMark:3,
  encoding:'utf8'
})
rs.on('data',(data)=>{
  console.log(data);
})
 

其实,flowing模式下的可读流的流程与可读流差异不大,所以,这里就不再画原理分析图了,可以参考上述可写流的原理分析图;手写原理分析完整代码如下:

 let EventEmitter = require('events');
 let fs = require('fs');
 class ReadStream extends EventEmitter {
   constructor(path, options) {
     super(path, options);
     this.path = path;
     this.flags = options.flags || 'r';
     this.mode = options.mode || 0o666;
     this.highWaterMark = options.highWaterMark || 64 * 1024;
     this.pos = this.start = options.start || 0;
     this.end = options.end;
     this.encoding = options.encoding;
     this.flowing = null;
     this.buffer = Buffer.alloc(this.highWaterMark);
     this.open();
     this.on('newListener',(type,listener)=>{
       if(type == 'data'){
         this.flowing = true;
         this.read();
       }
     });
   }
   read(){
     if(typeof this.fd != 'number'){
       return this.once('open',()=>this.read());
     }
     let howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;
     fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytes)=>{
       if(err){
         if(this.autoClose)
           this.destroy();
         return this.emit('error',err);
       }
       if(bytes){
         let data = this.buffer.slice(0,bytes);
         this.pos += bytes;
         data = this.encoding?data.toString(this.encoding):data;
         this.emit('data',data);
         if(this.end && this.pos > this.end){
           return this.endFn();
         }else{
           if(this.flowing)
             this.read();
         }
       }else{
         return this.endFn();
       }
 
     })
   }
   endFn(){
     this.emit('end');
     this.destroy();
   }
   open() {
     fs.open(this.path,this.flags,this.mode,(err,fd)=>{
       if(err){
         if(this.autoClose){
           this.destroy();
           return this.emit('error',err);
         }
       }
       this.fd = fd;
       this.emit('open');
     })
   }
   destroy(){
     fs.close(this.fd,()=>{
       this.emit('close');
     });
   }
   pipe(dest){
     this.on('data',data=>{
       let flag = dest.write(data);
       if(!flag){
         this.pause();
       }
     });
     dest.on('drain',()=>{
       this.resume();
     });
   }
   pause(){
     this.flowing = false;
   }
   resume(){
     this.flowing = true;
     this.read();
   }
 }
 module.exports = ReadStream;

paused 模式
//fs.createReadStream原生api的代码如下:
let fs=require('fs');
let rs=fs.createReadStream('2.txt',{
  highWaterMark:3,
  encoding:'utf8'
})
rs.on('readable',()=>{
  console.log(rs.read());
})
  

这里主要和flowing模式大同小异,只是这种模式下,读取到的数据会放到数据片段里面先缓存起来,并触发readable事件,再通过read方法来读取已读取到的数据片段。原理解析代码如下:

 let fs = require('fs');
 let EventEmitter = require('events');
 class ReadStream extends EventEmitter {
   constructor(path, options) {
     super(path, options);
     this.path = path;
     this.highWaterMark = options.highWaterMark || 64 * 1024;
     this.buffer = Buffer.alloc(this.highWaterMark);
     this.flags = options.flags || 'r';
     this.encoding = options.encoding;
     this.mode = options.mode || 0o666;
     this.start = options.start || 0;
     this.end = options.end;
     this.pos = this.start;
     this.autoClose = options.autoClose || true;
     this.bytesRead = 0;
     this.closed = false;
     this.flowing;
     this.needReadable = false;
     this.length = 0;
     this.buffers = [];
     this.on('end', function () {
       if (this.autoClose) {
         this.destroy();
       }
     });
     this.on('newListener', (type) => {
       if (type == 'data') {
         this.flowing = true;
         this.read();
       }
       if (type == 'readable') {
         this.read(0);
       }
     });
     this.open();
   }
   open() {
     fs.open(this.path, this.flags, this.mode, (err, fd) => {
       if (err) {
         if (this.autoClose) {
           this.destroy();
           return this.emit('error', err);
         }
       }
       this.fd = fd;
       this.emit('open');
     });
   }
 
   read(n) {
     if (typeof this.fd != 'number') {
       return this.once('open', () => this.read());
     }
     n = parseInt(n, 10);
     if (n != n) {
       n = this.length;
     }
     if (this.length == 0)
       this.needReadable = true;
     let ret;
     if (0 < n < this.length) {
       ret = Buffer.alloc(n);
       let b;
       let index = 0;
       while (null != (b = this.buffers.shift())) {
         for (let i = 0; i < b.length; i++) {
           ret[index++] = b[i];
           if (index == ret.length) {
             this.length -= n;
             b = b.slice(i + 1);
             this.buffers.unshift(b);
             break;
           }
         }
       }
       if (this.encoding) ret = ret.toString(this.encoding);
     }
     let _read = () => {
       let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
       fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
         if (err) {
           return
         }
         let data;
         if (bytesRead > 0) {
           data = this.buffer.slice(0, bytesRead);
           this.pos += bytesRead;
           this.length += bytesRead;
           if (this.end && this.pos > this.end) {
             if (this.needReadable) {
               this.emit('readable');
             }
             this.emit('end');
           } else {
             this.buffers.push(data);
             if (this.needReadable) {
               this.emit('readable');
               this.needReadable = false;
             }
           }
         } else {
           if (this.needReadable) {
             this.emit('readable');
           }
           return this.emit('end');
         }
       })
     }
     if (this.length == 0 || (this.length < this.highWaterMark)) {
       _read(0);
     }
     return ret;
   }
   destroy() {
     fs.close(this.fd, (err) => {
       this.emit('close');
     });
   }
   pause() {
     this.flowing = false;
   }
   resume() {
     this.flowing = true;
     this.read();
   }
   pipe(dest) {
     this.on('data', (data) => {
       let flag = dest.write(data);
       if (!flag) this.pause();
     });
     dest.on('drain', () => {
       this.resume();
     });
     this.on('end', () => {
       dest.end();
     });
   }
 }
 module.exports = ReadStream;

以上就是个人大致对node中的stream的工作原理理解,欢迎大家多多指正,谢谢!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,529评论 5 475
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,015评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,409评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,385评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,387评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,466评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,880评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,528评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,727评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,528评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,602评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,302评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,873评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,890评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,132评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,777评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,310评论 2 342

推荐阅读更多精彩内容

  • stream 流是一个抽象接口,在 Node 里被不同的对象实现。例如 request to an HTTP se...
    明明三省阅读 3,392评论 1 10
  • 简介 主要对stream这个概念做一个形象的描述和理解,同时介绍一下比较常用的API。主要参考了Node.js的官...
    cooody阅读 1,200评论 0 0
  • 流是Node中最重要的组件和模式之一。在社区里有一句格言说:让一切事务流动起来。这已经足够来描述在Node中流...
    宫若石阅读 539评论 0 0
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,579评论 18 139
  • Stream在node中是一个无处不在的概念,但凡和IO沾边的程序都离不开stream,所以不弄懂stream是无...
    ifcode阅读 4,658评论 1 7