Nod.js 中的 stream

什么是 stream

Stream 借鉴自 Unix 编程哲学中的 pipe。

Unix shell 命令中,管道式的操作 | 将上一个命令的输出作为下一个命令的输入。Node.js stream 中则是通过 .pip() 方法来进行的。

来看一个 stream 的运用场景:从服务器读取文件并返回给页面。

  • 朴素的实现:
var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(8000);
  • stream 实现:
var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);

好处:

  • 代码更加简洁。
  • 可自由组合各种模块来处理数据。

stream 的种类

分五种:

  • readable
  • writable
  • duplex
  • transform
  • classic

readable

readable 类型的流产生的数据,可通过 .pip() 输送到能够消费(consume)流数据的地方,比如 writable,transform,duplex 类型的对象。

一个 readable stream 示例:

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);

rs.pipe(process.stdout);

运行结果:

$ node read0.js
beep boop

_read 方法与按需输出

上面 rs.push(null) 表示没有更多数据了。直接将数据塞入到 readable 流中,然后被缓冲起来,直到被消费(示例中消费方为 process.stdout,即输出到命令行。)。因为消费者有可能并不能立即消费这些内容,直接 push 数据后会消耗不必要的资源。

更好的做法是,让 readable 流只在消费者需要数据的时候再 push。这是通过定义能 raedable 对象定义 ._read 方法来完成的。

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    if (c > 'z'.charCodeAt(0)) rs.push(null);
};

rs.pipe(process.stdout);

运行结果:

$ node read1.js
abcdefghijklmnopqrstuvwxyz

这种方式下,定义了 readable 流产生数据的方法 ._read,但并没有马上执行并输出数据,而是在 process.stdout 读取时,才调用输出的。

_read 方法可动态接收一个可选的 size 参数,由消费方指定一次读取想要多少字节的数据,当然,_read 方法的实现中是可以忽略这个入参的。

下面的示例可证明 _read 方法是消费方调用的时候才执行的,而不是主动执行。

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97 - 1;

rs._read = function () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null);
    
    setTimeout(function () {
        rs.push(String.fromCharCode(++c));
    }, 100);
};

rs.pipe(process.stdout);

process.on('exit', function () {
    console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);

输出任意数据

上面展示的是输出简单字符串,如果需要输出其他复杂数据,初始化时设置上正确的 objectMode 参数,Readable({ objectMode: true })

消费 readable 流产生的数据

一般情况下, 我们会将 readable 产生的数据直接传递给其他的消费方,比如 throughconcat-stream 创建的流对象。但直接操作来自 readable 的数据也是可以的。

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    console.dir(buf);
});

上面的示例代码中,监听了来自命令行的输入,有数据时 stdin readable 事件便会触发,然后可通过调用 read() 方法获取到数据。数据结束时 read() 返回 null 表示没有更多数据了。

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js 
<Buffer 61 62 63 0a>
<Buffer 64 65 66 0a>
<Buffer 67 68 69 0a>
null

同时 read() 方法支持传递一个 size 指定一次获取多少的字节(bytes)。比如一次只获取 3 字节的数据:

process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
});

这将导致我们获取到的数据是不完整的,因为指定了尺寸后,超出的部分是拿不到的。

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js 
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>

此时可通过调用 read(0) 告诉 Node.js 我们不希望丢弃超出的部分。这样超出的部分会在后面的读取中陆续输出。

process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
+    process.stdin.read(0);
});
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 
<Buffer 61 62 63>
<Buffer **0a** 64 65>
<Buffer 66 **0a** 67>
<Buffer 68 69 **0a**>

除了调用 read(0),还可调用 unshift() 方法将多余数据放回去。这样下次 read() 的时候数据都还在。比如一个解析换行的示例,遇到换行时将本行输出,剩下的数据塞回,以在下一行处理时使用。

var offset = 0;

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    if (!buf) return;
    for (; offset < buf.length; offset++) {
        if (buf[offset] === 0x0a) {
            console.dir(buf.slice(0, offset).toString());
            buf = buf.slice(offset + 1);
            offset = 0;
            process.stdin.unshift(buf);
            return;
        }
    }
    process.stdin.unshift(buf);
});
$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'

writable 流

writable 流可作为 .pip() 的对象。

src.pipe(writableStream)

创建 writable 流

需要实现 ._write(chunk, enc, next) 方法,其中:

  • chunk 为接收到的数据
  • encopts.decodeStringfalse 且收到的数据这字符串时,它表示字符串的编码
  • next(err) 数据处理后的回调,可传递一个错误信息以表示数据处理失败

默认情况下,获取到的字符串数据会转为 Buffer,可设置 Writable({ decodeStrings: false }) 来获取字符串数据。

一个 writable 示例:

var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
    console.dir(chunk);
    next();
};

process.stdin.pipe(ws);

向 writable 流写入数据

通过调用 writable 流的 write 方法来写入。

process.stdout.write('beep boop\n');

通过调用 end() 来结束数据的写入。

var fs = require('fs');
var ws = fs.createWriteStream('message.txt');

ws.write('beep ');

setTimeout(function () {
    ws.end('boop\n');
}, 1000);

duplex

双工类型的流,同时具有 writable 和 readable 流的功能。Node.js 内建的 zlib,TCP sockets 以及 crypto 都是双工类型的。

所以可对双工类型的流进行如下操作:

a.pip(b).pip(a)

transform

一种特殊类型的双工流,区别在于 transform 类型其输出是输入的转换。跟它的名字一样,这里面对数据进行一些转换后输出。比如,通过 zlib.createGzip 来对数据进行 gzip 的压缩。有时候也将这种类型的流称为 through steam

classic stream

这里指使用旧版 api 的流。当一个流身上绑定了 data 事件的监听时,便会回退为经典旧版的流。

classic readable stream

当有数据时它会派发 data 事件,数据输出结束时派发 end 事件给消费者。

.pipe() 通过检查 stream.readable 以判断该流是否是 readable 类型。

classic readable 流的创建

一个 classic readable 流的创建示例:

var Stream = require('stream');
var stream = new Stream;
stream.readable = true;

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit('end');
    }
    else stream.emit('data', String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);

从 classic readable 流读取数据

数据读取是通过监听流上的 dataend 事件。

一个从 classic readable 流读取数据的示例:

process.stdin.on('data', function (buf) {
    console.log(buf);
});
process.stdin.on('end', function () {
    console.log('__END__');
});

一般不建议通过这种方式来操作,一旦给流绑定 data 事件处理器,即回退到旧的 api 来使用流。如果真的有兼容操作旧版流的需求,应该通过 throughconcat-stream 来进行。

classic writable stream

只需要实现 .write(buf).end(buf) 及 .destroy() 方法即可,比较简单。

内建的流对象

image

总结

本质上,所有流都是 EventEmitter,通过事件可写入和读取数据。但通过新的 stream api,可方便地通过 .pipe() 方法来使用流而不是事件的方式。

使用 stream 可显著提高程序性能,特别是处理数据量大的情况下。它可以将数据分片处理,而不是粗暴地将数据看作一个整体。但掌握并熟练是需要一定时间练习的。

参考