快活林资源网 Design By www.csstdc.com

什么是流?

说到流,就涉及到一个*nix的概念:管道——在*nix中,流在Shell中被实现为可以通过 |(管道符) 进行桥接的数据,一个进程的输出(stdout)可被直接作为下一个进程的输入(stdin)。

在Node中,流(Stream)的概念与之类似,代表一种数据流可供桥接的能力。

pipe

流化的精髓在于 .pipe()方法。可供桥接的能力,在于数据流的两端(上游/下游 或称为 读/写流)以一个 .pipe()方法进行桥接。

Node.js中的流(Stream)介绍

伪代码的表现形式为:
复制代码 代码如下:
//上游.pipe(下游)
Readable.pipe(Writable);

流的分类

这里并不打算讨论所谓的Node  v0.4 之前的“经典”流。那么,流分为这么几类(皆为抽象接口:

1.stream.Readable    可读流(需要实现_read方法,关注点在于对数据流读取的细节
2.stream.Writable     可写流(需要实现_write方法,关注点在于对数据流写入的细节
3.stream.Duplex        可读/写流(需要实现以上两接口,关注点为以上两接口的细节
4.stream.Transform  继承自Duplex(需要实现_transform方法,关注点在于对数据块的处理

简单来说:

1).pipe() 的拥有者一定具备 Readable 流(并不局限于)能力,它拥有 'readable'/'data'/'end'/'close'/'error' 一系列事件可供订阅,也提供 .read()/.pause()/.resume()等一系列方法供调用;
2).pipe() 的参数一定具备Writable 流(并不局限于 )能力,它拥有 'drain'/'pipe'/'unpipe'/'error'/'finish' 事件可供访问,也提供 .write()/.end() 等一系列方法供调用

什么鬼

有没有一丝丝焦虑?别急,做为一个说人话的低级码工,我会把Stream掰开了和您扯一扯的。

Stream类,在 Node.js的源码 里,是这么定义的:
复制代码 代码如下:
var EE = require('events').EventEmitter;
var util = require('util');
util.inherits(Stream, EE);
 
function Stream() {
  EE.call(this);
}

可以看出,本质上,Stream是一个EventEmitter,那意味着它具备事件驱动的功能(.emit/.on...)。众所周知,“Node.js 就是基于V8的事件驱动平台”,实现了事件驱动的流式编程,具备了和Node一样的异步回调的特征。

比如在 Readable 流中,有一个 readable 事件,在一个暂停的只读流中,只要有数据块准备好可读时,它就会被发送给订阅者(Readable 流有哪些呢?express中的 req,ftp或者mutli-form上传组件的req.part,系统中的标准输入 process.stdin等)。有了readable 事件,我们可以做个处理shell 命令输出的分析器之类的工具:
复制代码 代码如下:
process.stdin.on('readable', function(){
   var buf = process.stdin.read();
   if(buf){
      var data = buf.toString();
      // parsing data ...                                               
   }
});

这样调用:

复制代码 代码如下:
head -10 some.txt | node parser.js

对于 Readable 流,我们还可以订阅它的 data 和 end 事件,以获取数据块并在流枯竭时获得通知,如 经典socket示例 中那样:
复制代码 代码如下:
req.on('connect', function(res, socket, head) {
    socket.on('data', function(chunk) {
      console.log(chunk.toString());
    });
    socket.on('end', function() {
      proxy.close();
    });
  });

Readable流状态的切换
需要注意的是,Readable 流有两种状态:flowing mode(激流) 和 pause  mode(暂停)。前者根本停不下来,谁被pipe上了就马上不停的给;后者会暂停,直到下游显式的调用 Stream.read() 请求才读取数据块。Readable 流初始化时是 pause mode的。

这两种状态可以互为切换的,其中,

有以下任一行为,pause 转 flowing:

1.对 Readable 流添加一个data事件订阅
2.对 Readable 调用 .resume() 显式开启flowing
3.调用 Readable 流的 .pipe(writable) ,桥接到一个 Writable 流上

有以下任一行为,flowing 转回 pause:

1.Readable 流还没有 pipe 到任何流上,可调 .pause() 暂停
2.Readable 流已经 pipe 到了流上,需 remove 掉所有 data 事件订阅,并且调用 .unpipe()方法逐一解除与下游流的关系

妙用

结合流的异步特性,我可以写出这样的应用:直接将 用户A 的输出桥接到 用户B 的页面上输出:
复制代码 代码如下:
router.post('/post', function(req, res) {
    var destination = req.headers['destination']; //发给谁
    cache[destionation] = req;
    //是的,并不返回,所以最好是个ajax请求
});

用户B请求的时候:

复制代码 代码如下:
router.get('/inbox', function(req, res){
    var user = req.headers['user'];
    cache.find(user, function(err, previousReq){ //找到之前存的req
       var form = new multiparty.Form();
       form.parse(previousReq);  // 有文件给我
       form.on('part', function (part) {
            part.pipe(res); //流式大法好:)
 
            part.on('error', function (err) {
                console.log(err);
                messaging.setRequestDone(uniqueID);
                return res.end(err);
            });
        });
    });
});

参考

how to write node programs with streams: stream-handbook

快活林资源网 Design By www.csstdc.com
广告合作:本站广告合作请联系QQ:858582 申请时备注:广告合作(否则不回)
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
快活林资源网 Design By www.csstdc.com

《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线

暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。

艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。

《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。