Maurice Wu
Published on

Stream of Node.js

  1. Stream types: The already-mentioned four foundational stream types (Writeable, Readable, Duplex, and Transform).
  2. Buffering: Temporary data storage within streams, controlled by highWaterMark to manage memory usage.
  3. Backpressure: Mechanism to balance data flow between fast producers and slow consumers, preventing memory issues.
  4. Modes of OperationFlowing (automatic data reading) vs Paused (manual data reading) modes for controlling data consumption.
  5. Piping: Connecting streams to automatically manage data flow and backpressure between them.
  6. Events: Asynchronous notifications for stream states and data availability, crucial for stream interaction.
  7. Object Mode: An option with streams that enable the capability to process JavaScript objects instead of just buffers or strings.

我认为 Stream 是 Node 中最好的特性。用事件的方式(Stream的实现采用了EventEmitter)来解决CPU 和 IO 之间处理速度的矛盾(CPU不必等待I/O设备的操作结果)。

流的事件

1698719718466-25d52424-e15a-431d-b1a8-58db72e861a3

在可读流上最重要的事件有:
data 事件,当流传递给消费者一个数据块的时候会触发。
end 事件,当在流中没有可以消费的数据的时候会触发。

在可写流上面最重要的事件有:
drain 事件,当可写流可以接受更多的数据时的一个标志。
finish 事件,当所有的数据都写入到底层系统中时会触发。

可读流的暂停和流动模式

  1. 默认一开始是暂停模式,添加 data 事件监听会切换到流动模式。
  2. 使用 read() 方法从暂停模式的流中按需读取数据,然而,对于一个在流动模式的可读流,数据持续流动,我们不得不监听事件去消费它。
  3. 使用 resume() 方法和 pause() 方法手动切换两个流模式

Backpressure

ReadableStreamWriteableStream 都可以设置 highWaterMark,表示 Stream 内部缓冲区的大小。 当调用readable.push 推入数据的时候,如果返回 false, 则表示内部缓冲区已满,不应该继续推入数据。

实现业务需要的流

实现可写流

实现 Writable 的 write 方法,描述如何处理数据,当处理完成当前 chunk 后,调用 callback. callback 接收 error 参数。

const { Writable } = require('stream')
const outStream = new Writable({
  write(chunk, encoding, callback) {
    // 模拟数据处理
    console.log(chunk.toString())
    callback()
  },
})

process.stdin.pipe(outStream)

实现可读流

const { Readable } = require('node:stream')
const source = ['hello world\n', 'hello world again\n']
const r1 = new Readable({
  highWaterMark: 2, // 设置高水位标记
  read(size) {
    const chunk = source.shift()
    if (chunk) {
      // backpressure
      let available = this.push(chunk)
      while (available) {
        const nextChunk = source.shift()
        if (nextChunk) {
          available = this.push(nextChunk)
        } else {
          this.push(null)
          break // 没有更多数据时结束流
        }
      }
    } else {
      // 当没有更多数据时,推送 null 以结束流
      this.push(null)
    }
  },
})

r1.on('data', (chunk) => {
  console.log('Received chunk:', chunk.toString())
})

实现双向流

需要同时实现write 和 read 方法。

const { Duplex } = require('stream')

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString())
    callback()
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++))
    if (this.currentCharCode > 90) {
      this.push(null)
    }
  },
})

inoutStream.currentCharCode = 65
process.stdin.pipe(inoutStream).pipe(process.stdout)

实现转换流

转换流的输出是从输入里面计算的

const { Transform } = require('stream')
const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase())
    callback()
  },
})
process.stdin.pipe(upperCaseTr).pipe(process.stdout)

流对象模式

除了Buffer/String值之外,Stream还可以通过流对象模式(ObjectMode)接收任何的javascript对象。

const arrayToObject = new Transform({
  readableObjectMode: true, // 调用实例的read方法将会返回一个js对象
  writableObjectMode: true, // 调用实例的write方法可以写入一个js对象
  transform(obj, encoding, callback) {
    // 现在第一个参数是一个对象
    callback(obj)
  },
})

参考

Node Stream Series