- assert - 断言
- Buffer - 缓冲器
- child_process - 子进程
- cluster - 集群
- console - 控制台
- crypto - 加密
- dgram - 数据报
- dns - 域名服务器
- Error - 异常
- events - 事件
- fs - 文件系统
- global - 全局变量
- http - HTTP
- https - HTTPS
- module - 模块
- net - 网络
- os - 操作系统
- path - 路径
- process - 进程
- querystring - 查询字符串
- readline - 逐行读取
- repl - 交互式解释器
- stream - 流
- string_decoder - 字符串解码器
- timer - 定时器
- tls - 安全传输层
- tty - 终端
- url - 网址
- util - 实用工具
- v8 - V8引擎
- vm - 虚拟机
- zlib - 压缩
Node.js v10.8.0 文档
目录
-
- 本文档的组织结构
-
-
-
- 'close' 事件
- 'drain' 事件
- 'error' 事件
- 'finish' 事件
- 'pipe' 事件
- 'unpipe' 事件
- writable.cork()
- writable.destroy([error])
- writable.end([chunk][, encoding][, callback])
- writable.setDefaultEncoding(encoding)
- writable.uncork()
- writable.writableHighWaterMark
- writable.writableLength
- writable.write(chunk[, encoding][, callback])
-
-
- 两种模式
- 三种状态
- 选择一种方法
-
- 'close' 事件
- 'data' 事件
- 'end' 事件
- 'error' 事件
- 'readable' 事件
- readable.destroy([error])
- readable.isPaused()
- readable.pause()
- readable.pipe(destination[, options])
- readable.read([size])
- readable.readableHighWaterMark
- readable.readableLength
- readable.resume()
- readable.setEncoding(encoding)
- readable.unpipe([destination])
- readable.unshift(chunk)
- readable.wrap(stream)
- readable[Symbol.asyncIterator]()
- stream.finished(stream, callback)
- stream.pipeline(...streams[, callback])
-
stream - 流#
流(stream)是一种在 Node.js 中处理流式数据的抽象接口。
stream
模块提供了一些基础的 API,用于构建实现了流接口的对象。
Node.js 提供了多种流对象。
例如,发送到 HTTP 服务器的请求和 process.stdout
都是流的实例。
流可以是可读的、可写的、或是可读写的。
所有的流都是 EventEmitter
的实例。
stream
模块可以通过以下方式使用:
const stream = require('stream');
尽管理解流的工作方式很重要,但是 stream
模块本身主要用于开发者创建新类型的流实例。
对于以消费流对象为主的开发者,极少需要直接使用 stream
模块。
本文档的组织结构#
本文档分为两个主要章节,外加其他注意事项作为第三章节。 第一章节阐述了在应用程序中使用流时需要的 API 要素。 第二章节阐述了实现新类型的流时需要的 API 要素。
流的类型#
Node.js 中有四种基本的流类型:
Writable
- 可写入数据的流(例如fs.createWriteStream()
)。Readable
- 可读取数据的流(例如fs.createReadStream()
)。Duplex
- 可读又可写的流(例如net.Socket
)。Transform
- 在读写过程中可以修改或转换数据的Duplex
流(例如zlib.createDeflate()
)。
另外本模块还包含了工具类函数 pipeline 和 finished。
对象模式#
所有 Node.js API 创建的流都是专门运作在字符串和 Buffer
(或 Uint8Array
)对象上。
当然,流的实现也可以使用其它类型的 JavaScript 值(除了 null
,因为它在流中有特殊用途)。
这些流会以“对象模式”进行操作。
当创建流时,可以使用 objectMode
选项把流实例切换到对象模式。
试图将已经存在的流切换到对象模式是不安全的。
缓冲#
可写流和可读流都会在一个内部的缓冲器中存储数据,可以分别使用的 writable.writableBuffer
或 readable.readableBuffer
来获取。
可缓冲的数据的数量取决于传入流构造函数的 highWaterMark
选项。
对于普通的流,highWaterMark
选项指定了字节的总数量。
对于以对象模式运作的流,highWaterMark
指定了对象的总数量。
当调用 stream.push(chunk)
时,数据会被缓冲在可读流中。
如果流的消费程序没有调用 stream.read()
,则这些数据会停留在内部队列中,直到被消费。
一旦内部的可读缓冲的总大小达到 highWaterMark
指定的阈值时,流会暂时停止从底层资源读取数据,直到当前缓冲的数据被消费
(也就是说,流会停止调用内部的用于填充可读缓冲的 readable._read()
方法)。
当反复地调用 writable.write(chunk)
方法时,数据会被缓冲在可写流中。
当内部的可写缓冲的总大小小于 highWaterMark
设置的阈值时,调用 writable.write()
会返回 true
。
一旦内部缓冲的大小达到或超过 highWaterMark
时,则会返回 false
。
stream
API 的主要目标,特别是 stream.pipe()
方法,是为了限制数据的缓冲到可接受的程度,也就是读写速度不一致的源头与目的地不会压垮可用的内存。
因为 Duplex
和 Transform
都是可读又可写的,所以它们各自维护着两个相互独立的内部缓冲器用于读取和写入,
这使得它们在维护数据流时,读取和写入两边可以各自独立地运作。
例如,net.Socket
实例是 Duplex
流,它的可读端可以消费从 socket 接收的数据,而可写端则可以将数据写入到 socket。
因为数据写入到 socket 的速度可能比接收数据的速度快或者慢,所以在读写两端独立地进行操作(或缓冲)就显得很重要了。
用于消费流的 API#
几乎所有的 Node.js 应用都在某种程度上使用了流。 下面是一个例子,在 Node.js 应用程序中使用流实现了一个 HTTP 服务器:
const http = require('http');
const server = http.createServer((req, res) => {
// req 是一个 http.IncomingMessage 实例,它是可读流。
// res 是一个 http.ServerResponse 实例,它是可写流。
let body = '';
// 接收数据为 utf8 字符串,
// 如果没有设置字符编码,则会接收到 Buffer 对象。
req.setEncoding('utf8');
// 如果添加了监听器,则可读流会触发 'data' 事件。
req.on('data', (chunk) => {
body += chunk;
});
// 'end' 事件表明整个请求体已被接收。
req.on('end', () => {
try {
const data = JSON.parse(body);
// 响应一些信息给用户。
res.write(typeof data);
res.end();
} catch (er) {
// json 解析失败。
res.statusCode = 400;
return res.end(`错误: ${er.message}`);
}
});
});
server.listen(1337);
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// 错误: Unexpected token o in JSON at position 1
可写流(比如例子中的 res
)会暴露了一些方法,比如 write()
和 end()
用于写入数据到流。
当数据可以从流读取时,可读流会使用 EventEmitter
API 来通知应用程序。
从流读取数据的方式有很多种。
可写流和可读流都通过多种方式使用 EventEmitter
API 来通讯流的当前状态。
Duplex
流和 Transform
流都是可写又可读的。
对于只需写入数据到流或从流消费数据的应用程序,并不需要直接实现流的接口,通常也不需要调用 require('stream')
。
需要实现新类型的流的开发者可以参考用于实现流的API章节。
可写流#
可写流是对数据要被写入的目的地的一种抽象。
可写流的例子包括:
- 客户端上的 HTTP 请求
- 服务器上的 HTTP 响应
- fs 写入的流
- zlib 流
- crypto 流
- TCP socket
- 子进程 stdin
process.stdout
、process.stderr
上面的一些例子事实上是实现了可写流接口的 Duplex
流。
所有可写流都实现了 stream.Writable
类定义的接口。
尽管可写流的具体实例可能略有差别,但所有的可写流都遵循同一基本的使用模式,如以下例子所示:
const myStream = getWritableStreamSomehow();
myStream.write('一些数据');
myStream.write('再来一些数据');
myStream.end('完成写入数据');
stream.Writable 类#
'close' 事件#
当流或其底层资源(比如文件描述符)被关闭时,触发 'close'
事件。
该事件表明不会再触发其他事件,且不会再发生运算。
不是所有可写流都会触发 'close'
事件。
'drain' 事件#
如果调用 stream.write(chunk)
方法返回 false
,则在适合恢复写入数据到流时触发 'drain'
事件。
// 向提供的可写流中写入数据一百万次。
// 注意背压(back-pressure)。
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000;
write();
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// 最后一次。
writer.write(data, encoding, callback);
} else {
// 检查是否可以继续写入。
// 不要传入 callback,因为写入还没有结束。
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// 不得不提前停下!
// 当 'drain' 事件触发后继续写入。
writer.once('drain', write);
}
}
}
'error' 事件#
当写入数据出错或使用管道出错时,触发 'error'
事件。
监听器回调函数被调用时会传入一个 Error
参数。
但触发 'error'
事件时,流还未被关闭。
'finish' 事件#
调用 stream.end()
方法且缓冲数据都已经传给底层系统之后,触发 'finish'
事件。
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`你好,#${i}!\n`);
}
writer.end('这是结尾\n');
writer.on('finish', () => {
console.error('所有写入已完成。');
});
'pipe' 事件#
src
<stream.Readable> 通过管道流入到可写流的来源流。
当在可读流上调用 stream.pipe()
方法添加可写流到目标流向时,触发 'pipe'
事件。
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
console.error('有数据正通过管道流入写入器');
assert.equal(src, reader);
});
reader.pipe(writer);
'unpipe' 事件#
src
<stream.Readable> 被移除写入管道的来源流。
当在可读流上调用 stream.unpipe()
方法从目标流向中移除当前可写流时,触发 'unpipe'
事件。
当可读流通过管道流向可写流发生错误时,也会触发 'unpipe'
事件。
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
console.error('已停止写入流的管道。');
assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
writable.cork()#
writable.cork()
方法会强制把所有写入的数据都缓冲到内存中。
当调用 stream.uncork()
或 stream.end()
方法时,被缓冲的数据才会被输出。
当写入大量小块数据到流时,内部缓冲可能失效,从而导致性能下降,writable.cork()
主要用于避免这种情况。
对于这种情况,实现了 writable._writev()
方法的流可以用更优的方式对写入的数据进行缓冲。
也可查看 writable.uncork()
。
writable.destroy([error])#
- 返回: <this>
摧毁流,并触发传入的 'error'
与 'close'
事件。
该方法被调用后,可写流就结束了,且之后调用 write()
或 end()
都会导致 ERR_STREAM_DESTROYED
错误。
实现流时不应该重写这个方法,而是重写 writable._destroy()
。
writable.end([chunk][, encoding][, callback])#
chunk
<string> | <Buffer> | <Uint8Array> | <any> 可选的,需要写入的数据。 对于非对象模式下的流chunk
必须是字符串、Buffer
、或Uint8Array
。 对于对象模式下的流,chunk
可以是任意 JavaScript 值,除了null
。encoding
<string> 如果chunk
是字符串,则指定字符编码。callback
<Function> 可选的,当流结束时的回调函数。- 返回: <this>
调用 writable.end()
方法表明已没有数据要被写入可写流。
可选的 chunk
和 encoding
参数可以在关闭流之前立即再写入一块数据。
如果传入了可选的 callback
函数,则它会做为监听器被添加到 'finish'
事件。
调用 stream.end()
之后再调用 stream.write()
方法会导致错误。
// 写入 'hello, ' ,并用 'world!' 来结束写入。
const fs = require('fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// 后面不允许再写入数据!
writable.setDefaultEncoding(encoding)#
writable.setDefaultEncoding()
方法能为可写流设置默认的 encoding
。
writable.uncork()#
writable.uncork()
方法会输出 stream.cork()
方法被调用后缓冲的全部数据。
当使用 writable.cork()
和 writable.uncork()
来管理流写入缓存,建议使用 process.nextTick()
来延迟调用 writable.uncork()
。
通过这种方式,可以对单个 Node.js 事件循环中调用的所有 writable.write()
方法进行批处理。
stream.cork();
stream.write('一些 ');
stream.write('数据 ');
process.nextTick(() => stream.uncork());
如果一个流上多次调用 writable.cork()
方法,则必须调用同样次数的 writable.uncork()
方法才能输出缓冲的数据。
stream.cork();
stream.write('一些 ');
stream.cork();
stream.write('数据 ');
process.nextTick(() => {
stream.uncork();
// 数据不会被输出,直到第二次调用 uncork()。
stream.uncork();
});
也可查看 writable.cork()
。
writable.writableHighWaterMark#
返回当构造可写流时传入的 highWaterMark
的值。
writable.writableLength#
返回队列中准备被写入的字节数(或对象数)。
这个值提供了关于 highWaterMark
状态的内省数据。
writable.write(chunk[, encoding][, callback])#
chunk
<string> | <Buffer> | <Uint8Array> | <any> 要写入的数据。可选的。 对于非对象模式下的流chunk
必须是字符串、Buffer
或Uint8Array
。 对于对象模式下的流,chunk
可以是除null
外的任意 JavaScript 值。encoding
<string> 如果chunk
是字符串,则指定字符编码。callback
<Function> 当这块数据被输出时的回调函数。- 返回: <boolean> 如果流需要等待
'drain'
事件触发才能继续写入更多数据,则返回false
,否则返回true
。
writable.write()
方法写入一些数据到流中,并在这些数据被完全处理之后调用提供的 callback
。
如果发生错误,则 callback
可能被调用并传入错误作为第一个参数。
为了可靠地检测到写入的错误,可以为 'error'
事件添加监听器。
在确认了 chunk
后,如果内部的缓冲小于创建流时配置的 highWaterMark
,则返回 true
。
如果返回 false
,则应该停止向流中写入数据,直到 'drain'
事件被触发。
当流还未被排空, 则调用 write()
会缓冲 chunk
,并且返回 false。
一旦所有当前被缓冲的数据块都被排空了(被操作系统接受来进行输出),则触发 'drain'
事件。
建议一旦 write()
返回 false,则不再写入任何数据块,直到 'drain'
事件被触发。
当流还未被排空时,也是可以调用 write()
,Node.js 会缓冲所有被写入的数据块,直到达到最大内存占用,这时它会无条件中止。
甚至在它中止之前, 高内存占用将会导致垃圾回收器的性能变差和 RSS 变高(即使内存不再需要,通常也不会被释放回系统)。
如果远程的另一端没有读取数据,TCP 的 socket 可能永远也不会排空,所以写入到一个不会排空的 socket 可能会导致远程可利用的漏洞。
对于 Transform
, 写入数据到一个不会排空的流尤其成问题,因为 Transform
流默认会被暂停,直到它们被 pipe 或者被添加了 'data'
或 'readable'
事件处理函数。
如果要被写入的数据可以根据需要生成或者取得,建议将逻辑封装为一个可读流并且使用 stream.pipe()
。
如果要优先调用 write()
,则可以使用 'drain'
事件来防止背压与避免内存问题:
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb);
} else {
process.nextTick(cb);
}
}
// 在回调函数被执行后再进行其他的写入。
write('hello', () => {
console.log('完成写入,可以进行更多的写入');
});
对象模式下的写入流会无视 encoding
参数。
可读流#
可读流是对提供数据的来源的一种抽象。
可读流的例子包括:
所有的可读流都实现了 stream.Readable
类上定义的接口。
两种模式#
可读流实质上运作于流动中(flowing)或已暂停(paused)两种模式之一。
在 flowing 模式中,数据自动地从底层的系统被读取,并通过 EventEmitter
接口的事件尽可能快地被提供给应用程序。
在 paused 模式中,必须显式调用 stream.read()
方法来从流中读取数据片段。
所有可读流都开始于 paused 模式,可以通过以下方式切换到 flowing 模式:
- 新增一个
'data'
事件处理函数。 - 调用
stream.resume()
方法。 - 调用
stream.pipe()
方法发送数据到可写流。
可读流可以通过以下方式切换回 paused 模式:
- 如果没有管道目标,调用
stream.pause()
方法。 - 如果有管道目标,移除所有管道目标。调用
stream.unpipe()
方法可以移除多个管道目标。
需要记住的重要概念是,只有提供了消费或忽略数据的机制后,可读流才会产生数据。 如果消费的机制被禁用或移除,则可读流会停止产生数据。
为了向后兼容,移除 'data'
事件处理函数不会自动地暂停流。
如果存在管道目标,一旦目标变为 drain
状态并请求接收数据时,则调用 stream.pause()
也不能保证流会保持暂停状态。
如果可读流切换到 flowing 模式,且没有可用的消费函数处理数据,则这些数据将会丢失。
例如,当调用 readable.resume()
方法时,没有监听 'data'
事件或 'data'
事件的处理函数从流中被移除了。
三种状态#
可读流运作的两种模式是对发生在可读流中更加复杂的内部状态管理的一种简化的抽象。
在任意时刻,任一可读流会处于以下三种状态之一:
readable.readableFlowing = null
readable.readableFlowing = false
readable.readableFlowing = true
当 readable.readableFlowing
为 null
时,没有提供消费流数据的机制,所以流不会产生数据。
在这个状态下,监听 'data'
事件、调用 readable.pipe()
方法、或调用 readable.resume()
方法,
则 readable.readableFlowing
会变成 true
,可读流开始主动地产生数据触发事件。
调用 readable.pause()
、readable.unpipe()
、或接收背压,则 readable.readableFlowing
会被设为 false
,暂时停止事件流动但不会停止数据的生成。
在这个状态下,为 'data'
事件设置监听器不会使 readable.readableFlowing
变成 true
。
const { PassThrough, Writable } = require('stream');
const pass = new PassThrough();
const writable = new Writable();
pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing 现在为 false。
pass.on('data', (chunk) => { console.log(chunk.toString()); });
pass.write('ok'); // 不会触发 'data' 事件。
pass.resume(); // 必须调用它才会触发 'data' 事件。
当 readable.readableFlowing
为 false
时,数据可能会堆积在流的内部缓冲中。
选择一种方法#
可读流的 API 演化贯穿了多个 Node.js 版本,且提供了多种方法来消费流数据。 开发者通常应该选择其中一种方法来消费数据,且不要在单个流使用多种方法来消费数据。
对于大多数用户,建议使用 readable.pipe()
方法来消费流数据,因为它是最简单的一种实现。
开发者如果要精细地控制数据传递和产生的过程,可以使用 EventEmitter
和 readable.pause()
/readable.resume()
。
stream.Readable 类#
'close' 事件#
当流或其底层资源(比如文件描述符)被关闭时,触发 'close'
事件。
该事件表明不会再触发其他事件,且不会再发生运算。
不是所有可读流都会触发 'close'
事件。
'data' 事件#
chunk
<Buffer> | <string> | <any> 数据块。 对于非对象模式的流,数据块可以是字符串或Buffer
。 对于对象模式的流,数据块可是除null
以外的任意 JavaScript 值。
'data'
事件会在流将数据传递给消费者时触发。当流转换到 flowing 模式时会触发该事件。调用 readable.pipe()
, readable.resume()
方法,或为 'data'
事件添加回调可以将流转换到 flowing 模式。 'data'
事件也会在调用 readable.read()
方法并有数据返回时触发。
在没有明确暂停的流上添加 'data'
事件监听会将流转换为 flowing 模式。 数据会在可用时尽快传递给下个流程。
如果调用 readable.setEncoding()
方法明确为流指定了默认编码,回调函数将接收到一个字符串,否则接收到的数据将是一个 Buffer
实例。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`接收到 ${chunk.length} 字节的数据。`);
});
'end' 事件#
'end'
事件将在流中再没有数据可供消费时触发。
注意: 'end'
事件只有在数据被完全消费后 才会触发 。 可以通过将流转换到
flowing 模式, 或反复调用 stream.read()
方法来实现这一点。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
console.log('There will be no more data.');
});
'error' 事件#
'error'
事件可以在任何时候在可读流实现(Readable implementation)上触发。
通常,这会在底层系统内部出错从而不能产生数据,或当流的实现试图传递错误数据时发生。
回调函数将接收到一个 Error
对象。
'readable' 事件#
'readable'
事件将在流中有数据可供读取时触发。在某些情况下,为 'readable'
事件添加回调将会导致一些数据被读取到内部缓存中。
const readable = getReadableStreamSomehow();
readable.on('readable', () => {
// 有一些数据可读了
});
当到达流数据尾部时, 'readable'
事件也会触发。触发顺序在 'end'
事件之前。
事实上, 'readable'
事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。 对于前者, stream.read()
将返回可用的数据。而对于后者, stream.read()
将返回
null
。 例如,下面的例子中的 foo.txt
是一个空文件:
const fs = require('fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
console.log('end');
});
上面脚本的输出如下:
$ node test.js
readable: null
end
注意: 通常情况下,readable.pipe()
方法和 'data'
事件机制比 'readable'
事件更容易理解。然而处理 'readable'
事件可能造成吞吐量升高。
readable.destroy([error])#
销毁流,并且触发error
事件。然后,可读流将释放所有的内部资源。
开发者不应该覆盖这个方法,应该覆盖readable._destroy
方法。
readable.isPaused()#
- 返回: <boolean>
readable.isPaused()
方法返回可读流的当前操作状态。 该方法主要是在
readable.pipe()
方法的底层机制中用到。大多数情况下,没有必要直接使用该方法。
const readable = new stream.Readable();
readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
readable.pause()#
- 返回:
this
readable.pause()
方法将会使 flowing 模式的流停止触发 'data'
事件, 进而切出 flowing 模式。任何可用的数据都将保存在内部缓存中。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
readable.pause();
console.log('There will be no additional data for 1 second.');
setTimeout(() => {
console.log('Now data will start flowing again.');
readable.resume();
}, 1000);
});
readable.pipe(destination[, options])#
destination
<stream.Writable> 数据写入目标-
options
<Object> Pipe 选项end
<boolean> 在 reader 结束时结束 writer 。默认为true
。
readable.pipe()
绑定一个 Writable 到 readable
上,
将可写流自动切换到 flowing 模式并将所有数据传给绑定的 Writable。数据流将被自动管理。这样,即使是可读流较快,目标可写流也不会超负荷(overwhelmed)。
下面例子将 readable
中的所有数据通过管道传递给名为 file.txt
的文件:
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// readable 中的所有数据都传给了 'file.txt'
readable.pipe(writable);
可以在单个可读流上绑定多个可写流。
readable.pipe()
方法返回 目标流 的引用,这样就可以对流进行链式地管道操作:
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
默认情况下,当源可读流(the source Readable stream)触发 'end'
事件时,目标流也会调用 stream.end()
方法从而结束写入。要禁用这一默认行为, end
选项应该指定为 false
, 这将使目标流保持打开,
如下面例子所示:
reader.pipe(writer, { end: false });
reader.on('end', () => {
writer.end('Goodbye\n');
});
这里有一点要警惕,如果可读流在处理时发生错误,目标可写流 不会 自动关闭。 如果发生错误,需要 手动 关闭所有流以避免内存泄漏。
注意:不管对 process.stderr
和 process.stdout
指定什么选项,它们都是直到 Node.js 进程退出才关闭。
readable.read([size])#
readable.read()
方法从内部缓冲区中抽出并返回一些数据。 如果没有可读的数据,返回null。readable.read()
方法默认数据将作为“Buffer”对象返回
,除非已经使用readable.setEncoding()
方法设置编码或流运行在对象模式。
可选的size
参数指定要读取的特定数量的字节。如果size
字节不可读,将返回null
除非流已经结束,在这种情况下所有保留在内部缓冲区的数据将被返回。
如果没有指定size
参数,则内部缓冲区包含的所有数据将返回。
readable.read()
方法只应该在暂停模式下的可读流上运行。在流模式下,readable.read()
自动调用直到内部缓冲区的数据完全耗尽。
const readable = getReadableStreamSomehow();
readable.on('readable', () => {
let chunk;
while (null !== (chunk = readable.read())) {
console.log(`Received ${chunk.length} bytes of data.`);
}
});
一般来说,建议开发人员避免使用'readable'
事件和readable.read()
方法,使用readable.pipe()
或'data'
事件代替。
无论size
参数的值是什么,对象模式中的可读流将始终返回调用readable.read(size)
的单个项目。
注意:如果readable.read()
方法返回一个数据块,那么一个'data'
事件也将被发送。
注意:在已经被发出的'end'
事件后调用stream.read([size])
事件将返回null
。不会抛出运行时错误。
readable.readableHighWaterMark#
返回构造该可读流时传入的 'highWaterMark'
属性。
readable.readableLength#
- Returns: <number>
This property contains the number of bytes (or objects) in the queue
ready to be read. The value provides introspection data regarding
the status of the highWaterMark
.
readable.resume()#
- 返回:
this
readable.resume()
方法会重新触发 'data'
事件, 将暂停模式切换到流动模式。
readable.resume()
方法可以用来充分使用流中的数据,而不用实际处理任何数据,如以下示例所示:
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Reached the end, but did not read anything.');
});
readable.setEncoding(encoding)#
encoding
<string> 要使用的编码- Returns:
this
readble.setEncoding()
方法会为从可读流读入的数据设置字符编码
默认返回Buffer
对象。设置编码会使得该流数据返回指定编码的字符串而不是Buffer
对象。例如,调用readable.setEncoding('utf-8')
会使得输出数据作为UTF-8数据解析,并作为字符串返回。调用readable.setEncoding('hex')
使得数据被编码成16进制字符串格式。
可读流会妥善处理多字节字符,如果仅仅直接从流中取出Buffer
对象,很可能会导致错误解码。
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
assert.equal(typeof chunk, 'string');
console.log('got %d characters of string data', chunk.length);
});
readable.unpipe([destination])#
destination
<stream.Writable> 可选的,指定需要分离的目标流
readable.unpipe()
方法将之前通过stream.pipe()
方法绑定的流分离
如果 destination
没有传入, 则所有绑定的流都会被分离.
如果传入 destination
, 但它没有被pipe()
绑定过,则该方法不作为.
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(() => {
console.log('Stop writing to file.txt');
readable.unpipe(writable);
console.log('Manually close the file stream');
writable.end();
}, 1000);
readable.unshift(chunk)#
chunk
<Buffer> | <Uint8Array> | <string> | <any> 数据块移动到可读队列底部。对于不以对象模式运行的流chunk
必须是字符串,Buffer
或者Uint8Array
。对于对象流,chunk
任何非null
的值。
readable.unshift()
方法会把一块数据压回到Buffer
内部。
这在如下特定情形下有用:
代码正在消费一个数据流,已经"乐观地"拉取了数据。
又需要"反悔-消费"一些数据,以便这些数据可以传给其他人用。
注意: 'end'
事件已经触发或者运行时错误抛出后,stream.unshift(chunk)
方法不能被调用。
使用 stream.unshift()
的开发者一般需要换一下思路,考虑用一个Transform 流替代.
更多信息请查看API for Stream Implementers部分。
// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
const { StringDecoder } = require('string_decoder');
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
const decoder = new StringDecoder('utf8');
let header = '';
function onReadable() {
let chunk;
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk);
if (str.match(/\n\n/)) {
// found the header boundary
const split = str.split(/\n\n/);
header += split.shift();
const remaining = split.join('\n\n');
const buf = Buffer.from(remaining, 'utf8');
stream.removeListener('error', callback);
// remove the readable listener before unshifting
stream.removeListener('readable', onReadable);
if (buf.length)
stream.unshift(buf);
// now the body of the message can be read from the stream.
callback(null, header, stream);
} else {
// still reading the header.
header += str;
}
}
}
}
注意: 不像 stream.push(chunk)
,stream.unshift(chunk)
在重置流的内部读取状态时是不会结束读取过程。 如果在读取过程中调用 readable.unshift()
则会导致异常 (例如:即来自自定义流上的 stream._read()
内部方法上的实现)。 应该在调用 readable.unshift()
方法之后适当调用 stream.push('')
来重置读取状态,执行读取的过程中最好避免调用 readable.unshift()
方法。
readable.wrap(stream)#
stream
<Stream> 一个老版本的readable stream
Node.js在v0.10版本之前的流没有实现当前定义的所有流模块的API.(查看更多兼容性信息 Compatibility )
当使用老版本的Node.js库来触发'data'
事件和stream.pause()
方法仅是建议性的,
readable.wrap()
方法能创建一个把老版本的流作为数据源的Readable stream。
几乎没有必要使用readable.wrap()
,但是这个方法已经为老版本的Node.js应用和一些库提供了方便。
例子:
const { OldReader } = require('./old-api-module.js');
const { Readable } = require('stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);
myReader.on('readable', () => {
myReader.read(); // etc.
});
readable\[Symbol.asyncIterator\]()#
- Returns: <AsyncIterator> to fully consume the stream.
const fs = require('fs');
async function print(readable) {
readable.setEncoding('utf8');
let data = '';
for await (const k of readable) {
data += k;
}
console.log(data);
}
print(fs.createReadStream('file')).catch(console.log);
If the loop terminates with a break
or a throw
, the stream will be
destroyed. In other terms, iterating over a stream will consume the stream
fully. The stream will be read in chunks of size equal to the highWaterMark
option. In the code example above, data will be in a single chunk if the file
has less then 64kb of data because no highWaterMark
option is provided to
fs.createReadStream()
.
Duplex 流与 Transform 流#
stream.Duplex 类#
Duplex 流是同时实现了 Readable 和 Writable 接口的流。
Duplex 流的实例包括了:
stream.Transform 类#
变换流(Transform streams) 是一种 Duplex 流。它的输出与输入是通过某种方式关联的。和所有 Duplex 流一样,变换流同时实现了 Readable 和 Writable 接口。
变换流的实例包括:
transform.destroy([error])#
销毁这个流,发射'error'
事件。
调用这个之后,变换流会释放全部内部资源
实现者不应该重载此方法,而应该实现readable._destroy
。
Transform
的默认_destroy
实现也发射'close'
事件。
stream.finished(stream, callback)#
stream
<Stream> 一个可读或可写的流callback
<Function> 一个回调函数,可以带有一个错误信息参数,也可没有
使用此函数,以在一个流不再可读、可写或发生了错误、提前关闭等事件时获得通知。
const { finished } = require('stream');
const rs = fs.createReadStream('archive.tar');
finished(rs, (err) => {
if (err) {
console.error('流发生错误', err);
} else {
console.log('流已读完');
}
});
rs.resume(); // 将流读完
在处理流的提前销毁(如被抛弃的HTTP请求)等错误事件时特别有用,此时流不会触发 'end'
或 'finish'
事件。
finished
API 也可做成承诺:
const finished = util.promisify(stream.finished);
const rs = fs.createReadStream('archive.tar');
async function run() {
await finished(rs);
console.log('流已读完');
}
run().catch(console.error);
rs.resume(); // 将流读完
stream.pipeline(...streams[, callback])#
...streams
<Stream> 两个或多个要用管道连接的流callback
<Function> 一个回调函数,可以带有一个错误信息参数
该模块方法用于在多个流之间架设管道,可以自动传递错误和完成扫尾工作,并且可在管道架设完成时提供一个回调函数:
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
// 使用 pipeline API 轻松连接多个流
// 并在管道完成时获得通知
// 使用pipeline高效压缩一个可能很大的tar文件:
pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
(err) => {
if (err) {
console.error('管道架设失败', err);
} else {
console.log('管道架设成功');
}
}
);
pipeline
API 也可做成承诺:
const pipeline = util.promisify(stream.pipeline);
async function run() {
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz')
);
console.log('管道架设成功');
}
run().catch(console.error);
用于实现流的 API#
stream
模块API的设计是为了让JavaScript的原型继承模式可以简单的实现流。
首先,一个流开发者可能声明了一个JavaScript类并且继承四个基本流类中的一个(stream.Writeable
,stream.Readable
,stream.Duplex
,或者stream.Transform
),确保他们调用合适的父类构造函数:
const { Writable } = require('stream');
class MyWritable extends Writable {
constructor(options) {
super(options);
// ...
}
}
新的流类必须实现一个或多个特定的方法,根据所创建的流类型,如下图所示:
用例 | 类 | 实现的方法 |
---|---|---|
只读流 | Readable | _read |
只写流 | writable | _write ,_writev,_final |
可读可写流 | Duplex | _read ,_write ,_writev,_final |
操作写数据,然后读结果 | Transform | _transform,_flush,_final |
注意:实现流的代码里面不应该出现调用“public”方法的地方因为这些方法是给使用者使用的(流使用者部分的API所述)。这样做可能会导致使用流的应用程序代码产生不利的副作用。
Simplified Construction#
对于许多简单的案例,它是有可能在不依赖继承的情况下创建流。这可以直接创建流实例,通过流基础类stream.Writable
,stream.Readable
,stream.Duplex
,或者stream.Transform
传入对象完成,对象包含合适的方法作为构造函数选项。
const { Writable } = require('stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
}
});
Implementing a Writable Stream#
这个stream.Writable
类被用于实现可写流。
自定义可写流必须调用new stream.Writable([options])
构造函数并且实现writable._write()
方法。writable._writev()
方法也是可以实现的。
Constructor: new stream.Writable([options])#
-
options
<Object>highWaterMark
<number> 缓冲大小当开始调用stream.write()
返回false
。默认16384
(16kb), 对于objectMode
流为默认为16
。decodeStrings
<boolean> 是否解码字符串在调用stream._write()
传递到缓冲区之前。默认为true
objectMode
<boolean>stream.write(anyObj)
是否是一个有效的操作. 一旦设置,可以写字符串以外的值,例如Buffer
或者Uint8Array
只要流支持。默认为false
。write
<Function> 实现stream._write()
方法。writev
<Function> 实现stream._writev()
方法。destroy
<Function> 实现stream._destroy()
方法。final
<Function> 实现stream._final()
方法。
例如:
const { Writable } = require('stream');
class MyWritable extends Writable {
constructor(options) {
// Calls the stream.Writable() constructor
super(options);
// ...
}
}
或者,使用ES6之前的语法来创建构造函数:
const { Writable } = require('stream');
const util = require('util');
function MyWritable(options) {
if (!(this instanceof MyWritable))
return new MyWritable(options);
Writable.call(this, options);
}
util.inherits(MyWritable, Writable);
或者,使用简化的构造函数方法:
const { Writable } = require('stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
}
});
writable.\_write(chunk, encoding, callback)#
chunk
<Buffer> | <string> | <any> 要写的块。会一直作为缓冲区,除非decodeStrings
选项设置为false
或者流以对象模式运行。encoding
<string> 如果块是字符串,那么encoding
就是该字符串的字符编码。 如果块是Buffer
,或者是流在对象模式下运行,encoding
可能被忽略。callback
<Function> 调用此函数err
参数可选)当块处理完成时。
所有可写流实现必须提供一个 writable._write()
方法将数据发送到底层资源。
注意:Transform 流提供自己实现的writable._write()
。
注意:此函数不得直接由应用程序代码调用。 它应该由子类实现,并由内部的Writable类方法调用。
必须调用callback
方法来表示写完成成功或失败,出现错误。callback
第一个参数必须是Error
对象如果调用失败,成功时为null
。
所有writable._write()
被调用并且callback
被调用将导致要缓冲的写入数据。 一旦调用callback
,流将会执行'drain'
事件。 如果想让流实现一次能够处理多个数据块,writable._writev()
方法应该被实现。
如果在构造函数选项中设置decodeStrings
属性,那么chunk
可能是一个字符串而不是一个缓冲区,encodeing
将会表示字符串的字符编码。 这是为了支持对某些字符串具有优化处理的实现数据编码。 如果decodeStrings
属性显式设置为false
,encoding
参数可以安全地忽略,chunk
将保持不变传递给.write()
的对象。
writable._write()
方法前缀为下划线,因为它是在定义它的类的内部,不应该直接调用用户程序。
writable.\_writev(chunks, callback)#
chunks
<Array> 要写的块 每个块都有以下格式{chunk:...,encoding:...}
。callback
<Function> 一个回调函数(可选地带有一个错误参数)在提供的块的处理完成时被调用。
注:此函数不得直接通过应用程序代码调用。 它应由子类实现,并由内部Writable进行调用类方法。
writable._writev()
方法能够一次处理多个数据块的流除了writable._write()
之外。如果实现,该方法将缓存的所有数据块写入队列。
writable._writev()
方法前缀为下划线,因为它是定义它的类的内部,不应该由用户程序直接调用。
writable.\_destroy(err, callback)#
err
<Error> 错误。callback
<Function> 回调函数err
参数可选。
通过 writable.destroy()
方法调用_destroy()
。它可以被子类覆盖,但不能直接调用。
writable.\_final(callback)#
callback
<Function> 在完成写入所有剩余数据时调用该函数err
参数可选)。
_final()
方法不能直接调用。 应该由子类负责实现,如果是,将仅可以由内部的Writable类方法进行调用。
这个可选的函数将在流关闭之前被调用, 直到callback
回调函数执行完成才触发finish
事件。这对于关闭资源或在流结束之前写入缓冲数据很有用。
Errors While Writing#
建议在处理writable._write()
和writable._writev()
方法期间发生的错误时传给回调函数的第一个参数来处理。这将导致Writable触发error
事件。从writable._write()
中抛出一个错误可能会导致意外和不一致的行为,具体取决于如何使用流。使用回调可确保对错误进行一致且可预测的处理。
const { Writable } = require('stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
}
});
一个可写流的例子#
下面说明了一个相当简单(有点无意义)的可写流实现。虽然这个具体的可写流实例没有任何真正的特殊用途,但该示例说明了一个自定义流实例所需要的元素:
const { Writable } = require('stream');
class MyWritable extends Writable {
constructor(options) {
super(options);
// ...
}
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
}
}
Decoding buffers in a Writable Stream#
解码buffers是一个常见任务,比如用变换流处理字符串输入。
当用多字节字符编码方式(比如UTF-8)时,这是一个微不足道的过程。
下面的例子展示了如何用StringDecoder
和 Writable解码多字节字符串。
const { Writable } = require('stream');
const { StringDecoder } = require('string_decoder');
class StringWritable extends Writable {
constructor(options) {
super(options);
const state = this._writableState;
this._decoder = new StringDecoder(state.defaultEncoding);
this.data = '';
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk);
}
this.data += chunk;
callback();
}
_final(callback) {
this.data += this._decoder.end();
callback();
}
}
const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();
w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]);
console.log(w.data); // currency: €
Implementing a Readable Stream#
stream.Readable
类扩展并实现了Readable。
用户实现的自定义可读流 必须 调用new stream.Readable([options])
构造函数并且实现readable._read()
方法。
new stream.Readable([options])#
options
<Object>highWaterMark
<number> 从底层资源读取数据并存储在内部缓冲区中的最大字节数。默认16384
(16kb), 或者16
对应objectMode
流模式。encoding
<string> 指定解析的字符编码格式. 默认 为null
objectMode
<boolean> 一个对象的流。 这意味着stream.read(n)
返回的是一个单一的对象而不是n个字节的缓冲区。默认false
read
<Function> 对stream._read()
方法的实现。 *destroy
<Function> 对stream._destroy()
方法的实现。
例如:
const { Readable } = require('stream');
class MyReadable extends Readable {
constructor(options) {
// Calls the stream.Readable(options) constructor
super(options);
// ...
}
}
或者使用ES6语法:
const { Readable } = require('stream');
const util = require('util');
function MyReadable(options) {
if (!(this instanceof MyReadable))
return new MyReadable(options);
Readable.call(this, options);
}
util.inherits(MyReadable, Readable);
或者使用简化的构造方法:
const { Readable } = require('stream');
const myReadable = new Readable({
read(size) {
// ...
}
});
readable.\_read(size)#
size
<number> 异步读取的字节数。
注意: 这个函数不能直接被应用程序代码调用。 它应由子类实现,并仅能由Readable对象内部方法调用。
所有实现可读流的实例必须实现readable._read()
方法去获得底层的数据资源。
当 readable._read()
被调用,如果读取的数据是可用的,应该在最开始的实现的时候使用this.push(dataChunk)
方法将该数据推入读取队列。_read()
应该一直读取资源直到推送数据方法readable.push()
返回false
的时候停止。想再次调用_read()
方法,需要再次往可读流里面push数据。
注意:一旦readable._read()
方法被调用,只有在 readable.push()
方法被调用之后,才能再次被调用。
size
可选参数。_read()
方法是一个实现读取数据的单操作,设置size
参数来确定要读取数据的大小。 其他的实现可能会忽略这个参数,只要数据可用就提供数据。 不需要等到stream.push(chunk)
方法推入一定size
的数据后才能调用。
readable._read()
方法的前缀是一个下划线,因为它是在定义它的类的内部,不应该被直接调用用户程序。
readable.\_destroy(err, callback)#
err
<Error> 错误。callback
<Function> 回调函数,第一个参数为err
参数。
_destroy()
需通过readable.destroy()
方法调用。它可以被子类覆盖,但不能直接调用。
readable.push(chunk[, encoding])#
chunk
<Buffer> | <Uint8Array> | <string> | <null> | <any> 压入读队列的数据块。 对于没有处在object mode的流来说chunk
必须是一个字符串,Buffer
或Uint8Array
; 对object mode 的流来说,chunk
可以使任何JavaScript值。encoding
<string> 字符串数据块的编码方式. 必须是可用的Buffer编码方式,例如'utf8'
或'ascii'
。- 返回 <boolean> 如果多余的数据块可能会继续压入,那么返回
true
; 否则返回false
.
当chunk
是一个Buffer
, Uint8Array
或者string
时,
这个数据块(chunk
)会被添加到内部队列供使用者消费。
在没有数据可写入后,给chunk
传了null
发出流结束(EOF)的信号。
当可读流处在传输模式下,'data'
事件触发时,可以通过
调用readable.read()
方法读出来数据,这数据是用readable.push()
添加的。
readable.push()
方法被设计得尽可能的灵活。
比如,当封装一个有'暂停/恢复'机制和带数据回调的底层source的时候,
那么这个底层的source可以被常规的可读流实例封装。就像下面的例子一样。
// source 是一个有readStop()和 readStart()方法的对象。
// 有数据就调`ondata`成员函数;
// 数据结束就调`onend`成员函数。
class SourceWrapper extends Readable {
constructor(options) {
super(options);
this._source = getLowlevelSourceObject();
// Every time there's data, push it into the internal buffer.
this._source.ondata = (chunk) => {
// if push() returns false, then stop reading from source
if (!this.push(chunk))
this._source.readStop();
};
// When the source ends, push the EOF-signaling `null` chunk
this._source.onend = () => {
this.push(null);
};
}
// _read will be called when the stream wants to pull more data in
// the advisory size argument is ignored in this case.
_read(size) {
this._source.readStart();
}
}
注意: readable.push()
方法是为了让可读流实现者调用的,
而且只来自readable._read()
方法内部。
Errors While Reading#
建议在调用
readable._read()
方法时发生的错误应该执行触发 'error'
事件,而不是抛出异常错误。从readable._read()
中抛出错误可能会导致意外的和不一致的行为,具体取决于流是以流还是暂停模式运行。 使用“错误”事件可确保一致且可预测的错误处理。
const { Readable } = require('stream');
const myReadable = new Readable({
read(size) {
if (checkSomeErrorCondition()) {
process.nextTick(() => this.emit('error', err));
return;
}
// do some work
}
});
一个数流的例子#
以下是可读流的一个基本例子,触发数字1到1,000,000升序,然后结束
const { Readable } = require('stream');
class Counter extends Readable {
constructor(opt) {
super(opt);
this._max = 1000000;
this._index = 1;
}
_read() {
const i = this._index++;
if (i > this._max)
this.push(null);
else {
const str = '' + i;
const buf = Buffer.from(str, 'ascii');
this.push(buf);
}
}
}
Implementing a Duplex Stream#
双工流(可读可写流)是可读流和可写流的实现,例如TCP套接字连接。
因为JavaScript不支持多重继承,所以stream.Duplex
类被扩展以实现双工流(而不是扩展stream.Readable
和stream.Writable
类)。
注意。stream.Duplex
类原型继承来自stream.Readable
和寄生的stream.Writable
,但是instanceof
将会在这两个基础类上正确工作,由于stream.Writable
覆盖了 Symbol.hasInstance方法。
自定义双工流必须通过new stream.Duplex([options])
构造函数并实现readable._read()
和writable._write()
方法。
new stream.Duplex(options)#
-
options
<Object> 传给可读和可写流的构造函数,还有如下字段: *allowHalfOpen
<boolean> 默认是true
。如果设置为false
, 那么当读端停止时,写端自动停止。readableObjectMode
<boolean> 默认是false
。会为流的读端设置objectMode
。如果objectMode
是true
,那就没有任何用。writableObjectMode
<boolean> 默认是false
。会为流的写端设置objectMode
。如果objectMode
是true
,那就没有任何用。readableHighWaterMark
<number> 设置highWaterMark
可读流的缓冲区大小。 如果已经设置highWaterMark
则readableHighWaterMark
不起作用。writableHighWaterMark
<number> 设置highWaterMark
可写流缓冲区大小。如果设置了highWaterMark
则writableHighWaterMark
不起作用。
例如:
const { Duplex } = require('stream');
class MyDuplex extends Duplex {
constructor(options) {
super(options);
// ...
}
}
或者, 使用ES6之前的语法来创建构造函数:
const { Duplex } = require('stream');
const util = require('util');
function MyDuplex(options) {
if (!(this instanceof MyDuplex))
return new MyDuplex(options);
Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);
又或者, 用简化的构造函数:
const { Duplex } = require('stream');
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
}
});
An Example Duplex Stream#
下面是一个可读可写流包装了一个假定的可读可写的底层源对象, 尽管用了一个与Node.js流不兼容的API。
下面是一个简单的例子, 在一个可读可写流中,来的buffers通过Writable 接口写入数据,再通过Readable接口读回数据。
const { Duplex } = require('stream');
const kSource = Symbol('source');
class MyDuplex extends Duplex {
constructor(source, options) {
super(options);
this[kSource] = source;
}
_write(chunk, encoding, callback) {
// The underlying source only deals with strings
if (Buffer.isBuffer(chunk))
chunk = chunk.toString();
this[kSource].writeSomeData(chunk);
callback();
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding));
});
}
}
尽管在一个对象实例中共存,读端和写端却是相互独立于彼此,这是可读可写流最为重要的一点。
Object Mode Duplex Streams#
对可读可写流来说,objectMode
可以通过readableObjectMode
和 writableObjectMode
选项
来分别设置读端和写端。
比如下面的例子。 创建了一个变换流(一种可读可写流)。 在写端接收JavaScript数字,在读端转换为16进制字符串。
const { Transform } = require('stream');
// All Transform streams are also Duplex Streams
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// Coerce the chunk to a number if necessary
chunk |= 0;
// Transform the chunk into something else.
const data = chunk.toString(16);
// Push the data onto the readable queue.
callback(null, '0'.repeat(data.length % 2) + data);
}
});
myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));
myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64
Implementing a Transform Stream#
一个Transform流是一种Duplex流,输入经过Transform流,做某种计算然后输出。 比如 zlib流和crypto流会做压缩,加密和解密数据。
注意: 输出流的大小,有多少数据包,到达时间都不一定非要和输入流一样。
比如,一个哈希流再输入结束时永远只会输出单个数据块;
而一个zlib
流的输出,可能比输入大得多或小得多。
stream.Transform
类被扩展了,实现了一个Transform流。
stream.Transform
类最初继承自stream.Duplex
,并且实现了它自己版本的writable._write()
和readable._read()
方法。
一般地,变换流必须实现transform._transform()
方法;
而transform._flush()
方法是非必须的。
注意: 用变换流时要注意,如果读端输出没有被消费,那么往写数据可能会引起写端暂停。
new stream.Transform([options])#
options
<Object> 传给可读和可写构造函数。 还有如下字段:transform
<Function> 对stream._transform()
方法的实现。flush
<Function> 对stream._flush()
方法的实现。
例如:
const { Transform } = require('stream');
class MyTransform extends Transform {
constructor(options) {
super(options);
// ...
}
}
或者,用ES6构造方法:
const { Transform } = require('stream');
const util = require('util');
function MyTransform(options) {
if (!(this instanceof MyTransform))
return new MyTransform(options);
Transform.call(this, options);
}
util.inherits(MyTransform, Transform);
又或者, 用简化构造方法:
const { Transform } = require('stream');
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
}
});
Events: 'finish' and 'end'#
'finish'
事件来自stream.Writable
;'end'
事件来自stream.Readable
类。
在调用了stream.end()
并且stream._transform()
处理了全部数据块之后,
'finish'
事件触发。
transform._flush()
中的回调函数被调用之后,所有数据已经输出,此时,'end'
事件触发
transform.\_flush(callback)#
callback
<Function> 一个当剩余的数据被冲刷后被调用的回调函数(error参数和data可选)。
Note: 应用程序代码禁止直接调用这个函数。它应该由子类来实现,并且只能被内部可读流的类方法调用。
在某些情况下,转换操作需要在流的末尾发射一块额外的数据。例如,zlib
压缩流会存储一种优先用于压缩输出的内部状态。但是在流结束的时候,那段额外的数据需要被冲刷才能完成数据压缩。
自定义的 Transform 实现,可以实现transform._flush()
方法。在没有更多的要写的数据被消耗时,会调用这个方法,但是在发射'end'
事件之前会发出可读流结束的信号。
在 transform._flush()
实现里,readable.push()
方法会在适当的时候调用零次或者多次。callback
在冲刷操作完成的时候一定会被调用。
transform._flush()
方法前缀为下划线,因为它是在定义它的类的内部,绝不应该被用户程序直接调用。
transform.\_transform(chunk, encoding, callback)#
chunk
<Buffer> | <string> | <any> 被转换的数据块。它总是一个buffer除非在option中配置decodeString
为false
或者当前流处在object mode
下。encoding
<string> 如果 chunk 是字符串,那么encoding就是该字符串的字符编码。如果块是Buffer,它是一个特殊的值'buffer',这种情况encoding可以被忽略。callback
<Function> 当块被处理完成时调用此函数(包含error和data参数)。
注意:此函数不得直接由应用程序代码调用。它应该由子类实现,并由内部的Readable类方法调用。
所有的变换流的执行必须提供一个_transform()
方法接收输入并提供输出。transform._transform()
的实现会处理写入的字节,做某种计算并输出,然后使用readable.push()
方法把这个输出传递到可读流。
从一个单个输入数据块可能会调用零次或多次transform.push()
方法,调用次数取决于每次把多少数据做为一个输出结果。
有可能从任意给定输入的数据块中没有产生任何输出。
callback
会在当前数据被完全消费之后调用。在处理过程输入的过程中如果出错了,第一个参数是一个错误对象,没有出错Error参数则为null。如果传递第二个参数,它会被转发到readable.push()
中。就像下面的例子:
transform.prototype._transform = function(data, encoding, callback) {
this.push(data);
callback();
};
transform.prototype._transform = function(data, encoding, callback) {
callback(null, data);
};
transform._transform()
方法前缀为下划线,因为它是定义在它的类的内部,不应该直接被用户程序调用。
transform._transform()
方法永远不能并行调用;流使用了队列机制,不论同步或者异步情况下,都必须先调用callback之后才能接收下一个数据。
Class: stream.PassThrough#
stream.PassThrough
类是一个极简Transform流,只是把输入字节原样不动直接输出。
一开始的目的是用来做例子和测试,但是有时也作为某些新颖的流的基本组成部分起作用。
其他注意事项#
Compatibility with Older Node.js Versions#
在v0.10之前的Node.js版本中,可读流接口更简单,但功能更弱,功能更少。
- 相比需要等待
stream.read()
方法调用之后才触发,'data'
事件自己会立即触发。 需要执行一定量工作来决定如何处理数据的应用程序需要将读取数据存储到缓冲区中,以便数据不会丢失。 - The
stream.pause()
方法是建议性的,而不是保证。 这意味着即使流处于暂停状态,仍然需要准备接收data
事件。
在Node.js v0.10中,添加了Readable类。 为了向后兼容较旧的Node.js程序,当添加data
事件处理程序或调用stream.resume()
方法时,可读流将切换到“流动模式”。 结果是,即使不使用新的stream.read()
方法和'readable'
事件,也不必担心丢失'data'
块。
虽然大多数应用程序将继续正常工作,但在以下情况下会引入极端情况:
- 未添加
'data'
事件监听。 - 未调用
stream.resume()
方法。 - 通过管道没有传送到任何可写的目的地。
例如,请考虑以下代码:
// WARNING! BROKEN!
net.createServer((socket) => {
// we add an 'end' method, but never consume the data
socket.on('end', () => {
// It will never get here.
socket.end('The message was received but was not processed.\n');
});
}).listen(1337);
在v0.10之前的Node.js版本中,传入的消息数据将会是简单地丢弃。 但是,在Node.js v0.10及更高版本中,套接字仍然存在永远停顿。
在这种情况下的解决方法是调用stream.resume()
开始读取数据:
// Workaround
net.createServer((socket) => {
socket.on('end', () => {
socket.end('The message was received but was not processed.\n');
});
// start the flow of data, discarding it.
socket.resume();
}).listen(1337);
除了新的可读流切换到流动模式之外,pre-v0.10风格的流可以使用包装在Readable类中readable.wrap()
方法。
readable.read(0)
#
在某些情况下,需要有机制来触发刷新基础可读流, 而没有实际消费任何数据。在这种情况下,可以调用readable.read(0)
,返回null
。
如果内部读取缓冲区低于highWaterMark
,并且该流目前未读取,则调用stream.read(0)
将触发调用底层 stream._read()
方法。
虽然大多数应用程序几乎都不需要这样做,但Node.js中会出现这种情况,尤其是在可读流类内部。
readable.push('')
#
不推荐使用readable.push('')
。
向一个不处在object mode的流压入一个Buffer
或Uint8Array
0字节字符串,会产生有趣的副作用。
因为调用了readable.push()
,所以会停止读进程。
然而,因为参数是一个空字符串,没有可读的数据添加到可读buffer, 所以没有可以供用户消费的数据。
`highWaterMark` discrepancy after calling `readable.setEncoding()`#
调用 readable.setEncoding()
会改变 highWaterMark
属性在非对象模式中的作用。
一般而言,我们直接将缓冲器存储的 字节数 同 highWaterMark
相比较。然而在调用 setEncoding()
之后,程序会将缓冲器中存储的 字符数 与 highWaterMark
相比较。
在通常情况下,如使用 latin1
或 ascii
时,这不成问题。但在处理可能含有多字节字符的字符串时,此行为需要当心。