一.流的概念
stream是数据集合,与数组、字符串差不多。但stream不一次性访问全部数据,而是一部分一部分发送/接收(chunk式的),所以不必占用那么大块内存,尤其适用于处理大量(外部)数据的场景
stream具有管道(pipeline)特性,例如:
const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)
很多原生模块都是基于stream的,包括进程的stdin/stdout/stderr
:
例如常见的场景:
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const src = fs.createReadStream('./big.file');
src.pipe(res);
});
server.listen(8000);
其中pipe
方法把可读流的输出(数据源)作为可写流的输入(目标),直接把读文件的输出流作为输入连接到HTTP响应的输出流,从而避免把整个文件读入内存
P.S.甚至日常使用的console.log()
内部实现也是stream
二.流的类型
Node中有4种基础流:
Readable
可读流是对源的抽象, 从中可以消耗数据,如
fs.createReadStream
Writable
可写流是对可写入数据的目标的抽象,如
fs.createWriteStream
Duplex(双工)
双工流既可读又可写,如TCP socket
Transform(转换)
转换流本质上是双工流,用于在写入和读取数据时对其进行修改或转换,如
zlib.createGzip
用gzip压缩数据转换流看一看做一个输入可写流,输出可读流的函数
P.S.有一种转换流叫(Pass)Through Stream(通过流),类似于FP中的
identity = x => x
三.管道
src.pipe(res)
要求源必须可读,目标必须可写,所以,如果是对双工流进行管道传输,就可以像Linux的管道一样链式调用:
readableSrc
.pipe(transformStream1)
.pipe(transformStream2)
.pipe(finalWrtitableDest)
pipe()
方法返回目标流,所以:
// a (readable), b and c (duplex), and d (writable)
a.pipe(b).pipe(c).pipe(d)
// 等价于
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Linux下,等价于
$ a | b | c | d
四.流与事件
事件驱动是Node在设计上的一个重要特点,很多Node原生对象都是基于事件机制(EventEmitter
模块)实现的,包括流(stream
模块):
Most of Node’s objects — like HTTP requests, responses, and streams — implement the EventEmitter module so they can provide a way to emit and listen to events.
所有stream都是EventEmitter
实例,通过事件机制来读写数据,例如上面提到的pipe()
方法相当于:
// readable.pipe(writable)
readable.on('data', (chunk) => {
writable.write(chunk);
});
readable.on('end', () => {
writable.end();
});
P.S.pipe
还处理了一些别的事情,比如错误处理,EoF以及某个流的速度较快/较慢的情况
Readable与Writable stream的主要事件和方法如下:
Readable的主要事件有:
data
事件:stream把一个chunk传递给使用者时触发end
事件:再没有要从stream中获取(consume)的数据时触发
Writable的主要事件有:
drain
事件,断流了,这是Writable stream可以接收更多数据的信号finish
事件,当所有数据都已flush到下层系统时触发
五.Readable stream的两种模式:Paused与Flowing
一个Readable stream要么流动(Flowing)要么暂停(Paused),也被称为拉(pull)和推(push)两种模式
创建出来后默认处于Paused状态,可以通过read()
方法读取数据。如果处于Flowing状态,数据会持续地流出来,此时只需要通过监听事件来使用这些数据,如果没有使用者的话,数据会丢失,所以都会监听Readable stream的data
事件,实际上监听data
事件会把Readable stream从Paused状态切换到Flowing,移除data
事件监听会再切回来。需要手动切换的话,可以通过resume()
和pause()
来做
使用pipe()
方式时不用关心这些,都会自动处理妥当:
Readable触发
data
事件,直到Writable忙不过来了pipe
收到信号后调用Readable.pause()
,进入Paused模式Writable再干一会儿压力不大了的时候,会触发
drain
事件,此时pipe
调用Readable.resume()
进入Flowing模式,让Readable接着触发data
事件
highWaterMark与backpressure
其实drain
事件就是用来应对Backpressure现象的,简单的说,Backpressure就是下游的消费速度限制了传输,造成下游向上游的反向压力
如果消费速度慢于生产速度,会在下游产生堆积,来不及处理的数据会存放到Writable的buffer里,如果不加(限流)处理,这个buffer会持续增长,可能溢出进而造成错误或数据丢失
Backpressure现象发生的标志是Writable.write()
返回了false
,说明来自上游的待处理数据量已经触及highWaterMark(高水位线,默认16kb):
Buffer level when stream.write() starts returning false. Defaults to 16384 (16kb), or 16 for objectMode streams.
这是下游开始有点紧张了(todo项足够忙一阵子了)的信号。建议在此时对上游限流,即调用Readable.pause()
先给停了,给下游多点时间处理堆积的数据,下游觉得轻松了会触发darin
事件,表示此时有能力处理更多数据了,所以这时候应该开闸放水(Readable.resume()
)
注意,Readable的数据会存放在缓存中,直到有个Writable来消耗这些数据。所以Paused状态只是说不往下流了,已经缓存的数据还在Readable的buffer里。所以如果不限流,来不及处理的数据就缓存在下游,并持续堆积,限流的话,这部分数据被缓存在上游,因为限流了而不再持续堆积
另外,Readable也有highWaterMark的概念:
The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Defaults to 16384 (16kb), or 16 for objectMode streams
是对从实际数据源读取速度的限制(比如从磁盘读文件),防止生产速度太快引发缓存堆积(比如一顿猛push()
)。所以Flowing Readable的正常工作方式是被push()
–push()
–push()
…诶,发现buffer里的量已经攒够一个chunk了,吐给下游。同样,Readable触及highWaterMark的标志是push()
返回false
,说明Readable的buffer不那么十分空了,此时如果还持续push()
,没错,也会出现BackPressure(Readable消费能力限制了从数据源到Readable的传输速度):
快-------------慢
数据源-------->Readable------->Writable
快--------------慢
只要上游(生产)快,下游(消费)慢就会出现BackPressure,所以在readable.pipe(writable)
的简单场景,可能会出现上面两段BackPressure
六.示例
Writable stream
常见的造大文件:
const fs = require('fs');
const file = fs.createWriteStream('./big.file');
for(let i=0; i<= 1e6; i++) {
file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}
file.end();
通过fs.createWriteStream()
创建指向文件的Writable stream,通过write()
填充数据,写完后end()
或者更一般的,直接new
一个Writable
:
const { Writable } = require('stream');
const outStream = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
// nowrap version
// process.stdout.write(chunk.toString());
callback();
}
});
process.stdin.pipe(outStream);
一个最简单的echo
实现,把当前进程的标准输入接到自定义输出流outStream
,像日志中间件一样(标准输入流经outStream
,再该干嘛干嘛去callback
):
cc
oo
nn
ss
oo
ll
ee
Console {
log: [Function: bound consoleCall],
...
}
write()
方法的3个参数中,chunk
是个Buffer,encoding
在某些场景下需要,大多数时候可以忽略,callback
是应该在chunk
处理完毕后调用的通知函数,表明写入成功与否(失败的话,传Error对象进去),类似于尾触发机制中的next()
或者更简单的echo
实现:
process.stdin.pipe(process.stdout);
直接把标准输入流连接到标准输出流
Readable stream
const { Readable } = require('stream');
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);
通过push
向Readable stream里填充数据,push(null)
表示结束。上例中把所有数据都读进来,然后才交给标准输出,实际上有更高效的方式(按需推数据给使用者):
const { Readable } = require('stream');
const inStream = new Readable({
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);
read()
方法每次吐一个字符,使用者从Readable stream取数据的时候,read()
会持续触发
Duplex/Transform stream
Duplex stream兼具Readable和Writable的特点:既可以作为数据源(生产者),也可以作为目标(消费者)。例如:
const { Duplex } = require('stream');
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
},
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);
上例把前2个例子结合起来了,inoutStream
被连接到标准输出流了,A-Z
会作为数据源传递给标准输出(打印出来),同时标准输入流被接到inoutStream
,来自标准输入的所有数据会被log
出来,效果如下:
ABCDEFGHIJKLMNOPQRSTUVWXYZcc
oo
nn
ss
oo
ll
ee
Console {
log: [Function: bound consoleCall],
...
}
P.S.先输出A-Z
是因为pipe()
会把Readable stream切换到Flowing模式,所以一开始就把A-Z
“流”出来了
注意,Duplex stream的Readable与Writable部分是完全独立的,读写互不影响,Duplex只是把两个特性组合成一个对象了,就像两根筷子一样绑在一起的单向管道
Transform stream是一种有意思的Duplex stream:其输出是根据输入计算得来的。所以不用分别实现read/write()
方法,只实现一个transform()
方法就够了:
const { Transform } = require('stream');
const upperCaseTr = new Transform({
// 函数签名与write一致
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCaseTr).pipe(process.stdout);
同样,Transform stream的Readable与Writable部分也是独立的(不手动push
就不会自动传递到Readable部分),只是形式上结合起来了
P.S.另外,stream之间除了可以传递Buffer/String,还可以传递Object(包括Array),具体见Streams Object Mode
Node提供了一些原生Transform stream,例如zlib
和crypto
stream:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'));
简单的命令行工具,gzip
压缩。更多示例见Node’s built-in transform streams