- Published on
Stream of Node.js
- Stream types: The already-mentioned four foundational stream types (Writeable, Readable, Duplex, and Transform).
- Buffering: Temporary data storage within streams, controlled by
highWaterMark
to manage memory usage. - Backpressure: Mechanism to balance data flow between fast producers and slow consumers, preventing memory issues.
- Modes of Operation: Flowing (automatic data reading) vs Paused (manual data reading) modes for controlling data consumption.
- Piping: Connecting streams to automatically manage data flow and backpressure between them.
- Events: Asynchronous notifications for stream states and data availability, crucial for stream interaction.
- Object Mode: An option with streams that enable the capability to process JavaScript objects instead of just buffers or strings.
我认为 Stream 是 Node 中最好的特性。用事件的方式(Stream的实现采用了EventEmitter)来解决CPU 和 IO 之间处理速度的矛盾(CPU不必等待I/O设备的操作结果)。
流的事件

在可读流上最重要的事件有:
data 事件,当流传递给消费者一个数据块的时候会触发。
end 事件,当在流中没有可以消费的数据的时候会触发。
在可写流上面最重要的事件有:
drain 事件,当可写流可以接受更多的数据时的一个标志。
finish 事件,当所有的数据都写入到底层系统中时会触发。
可读流的暂停和流动模式
- 默认一开始是暂停模式,添加 data 事件监听会切换到流动模式。
- 使用 read() 方法从暂停模式的流中按需读取数据,然而,对于一个在流动模式的可读流,数据持续流动,我们不得不监听事件去消费它。
- 使用 resume() 方法和 pause() 方法手动切换两个流模式
Backpressure
ReadableStream
和 WriteableStream
都可以设置 highWaterMark
,表示 Stream 内部缓冲区的大小。 当调用readable.push
推入数据的时候,如果返回 false, 则表示内部缓冲区已满,不应该继续推入数据。
实现业务需要的流
实现可写流
实现 Writable 的 write 方法,描述如何处理数据,当处理完成当前 chunk 后,调用 callback. callback 接收 error 参数。
const { Writable } = require('stream')
const outStream = new Writable({
write(chunk, encoding, callback) {
// 模拟数据处理
console.log(chunk.toString())
callback()
},
})
process.stdin.pipe(outStream)
实现可读流
const { Readable } = require('node:stream')
const source = ['hello world\n', 'hello world again\n']
const r1 = new Readable({
highWaterMark: 2, // 设置高水位标记
read(size) {
const chunk = source.shift()
if (chunk) {
// backpressure
let available = this.push(chunk)
while (available) {
const nextChunk = source.shift()
if (nextChunk) {
available = this.push(nextChunk)
} else {
this.push(null)
break // 没有更多数据时结束流
}
}
} else {
// 当没有更多数据时,推送 null 以结束流
this.push(null)
}
},
})
r1.on('data', (chunk) => {
console.log('Received chunk:', chunk.toString())
})
实现双向流
需要同时实现write 和 read 方法。
const { Duplex } = require('stream')
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString())
callback()
},
read(size) {
this.push(String.fromCharCode(this.currentCharCode++))
if (this.currentCharCode > 90) {
this.push(null)
}
},
})
inoutStream.currentCharCode = 65
process.stdin.pipe(inoutStream).pipe(process.stdout)
实现转换流
转换流的输出是从输入里面计算的
const { Transform } = require('stream')
const upperCaseTr = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase())
callback()
},
})
process.stdin.pipe(upperCaseTr).pipe(process.stdout)
流对象模式
除了Buffer/String值之外,Stream还可以通过流对象模式(ObjectMode)接收任何的javascript对象。
const arrayToObject = new Transform({
readableObjectMode: true, // 调用实例的read方法将会返回一个js对象
writableObjectMode: true, // 调用实例的write方法可以写入一个js对象
transform(obj, encoding, callback) {
// 现在第一个参数是一个对象
callback(obj)
},
})