很早之前看到过写drain的文章,最近我在工作中也遇到了个需要自己控制流的场景,整理了一下相关资料,在这里记录一下吧。

这个是之前看的关于drain事件的文章 探究 Node.js 中的 drain 事件

然后是一些讲Stream讲的不错的文章:

官网Stream模块文档

Nodejs Stream系列文章

深入理解 Node Stream 内部机制

我的场景

业务场景如下:

image.png

因为开放API服务和真正的cos之间隔了一层rpc的代理,这里在下载大文件时有两点问题:

  1. 单次rpc请求包体不宜过大
  2. cos代理服务和开放API服务的内存都有限,不能把大文件整个读入内存,需要做分块控制

那么最自然的思路是,把大文件的分片rpc调用读取,基于nodejs的Stream封装成一个可读流,这样就不需要自己做流程控制了。

伪代码:

import {Readable, ReadableOptions} from "stream"
class RpcReadStream extends Readable{
    public constructor(options?: ReadableOptions){
        super(options)
    }
    public async _read() {
        let buf =  await this._readFromRpc()
        this.push(buf)
    }
    private _end(){
    this.push(null)
    }
    private async _readFromRpc():Promise<Buffer>{
        //TODO
        return Buffer.from("todo")
    }
}

let rpcReadStream = new RpcReadStream()
rpcReadStream.pipe(ctx.response)

做的时候顺便做了一些测试,记一些很弱智但是容易搞错的点,另外还有和探究 Node.js 中的 drain 事件文章中结论不一致的地方,也记录一下吧。

一些问题

记住调用pipe后就交出了stream的控制权

我很弱智的在调用 rpcReadStream.pipe(ctx.response) 后,还尝试在别的地方通过 rpcReadStream.pause() 来停止数据读入。 结果发现并没有作用, _read函数还是在不断调用,当时有点懵,还想了会儿。

后来在

writeStream.addListener("drain",()=>{})

里边加上一些log,果然drain在不断触发,而pipe的逻辑里边drain触发的时候是会将可读流resume的。。。
这里还是记录一下吧,一旦将可读流pipe出去后,就不要再尝试操作它了,控制权已经交出去了,除非执行unpipe。

若不限流,数据会在写缓冲区无限积压

探究 Node.js 中的 drain 事件文章中描述的,限流和不限流时内存表现差不多。

而我这边写了个生产快、消费慢的demo代码,在不限流时,数据会在写缓冲区无限积压,打印writeableLength不断增大。

反之,在加上用drain事件限流的逻辑后,readableLength 和 writeableLength 的长度,都相对稳定。

唯一不同的是我的demo是从内存中随机生成内容,而文章中用的是fs.createReadStream,或许其内部实现比较奇怪,在pause后还会读取,或者水位线开的特别高?后续有机会求证下~

可读流水位线的作用?

目前看到的文档和文章里边,大多是讲可写流的水位线满了之后,背压触发可写流停止读取数据,但是对于可读流的水位线作用并没有深入讨论。

可写流:

  • 水位线满,writeStrea.write返回false,一般用来pause可读流
  • 数据发送成功 ,缓冲区数据减少,水位线未满,触发 “drain” 事件,一般用来恢复可读流
  • 产生场景是可读流在流动模式下,生产速度大于消费速度

可读流:

  • 水位线满调用readStream.push返回false
  • 恢复的标志是数据被读取?
  • 产生的场景是在暂停模式下,因数据不消费导致积压满?

经验证,确实是这样。给readStream绑定”readable”事件后,让readStream进入暂停模式,然后在_read函数里打出push的返回值和readableLength,默认增加到16k后,push返回fasle,不再读取数据。

而一旦调用readStream.read()读取缓冲区中的全部数据,水位线即恢复到不满状态,重新开始读取数据。

可以理解为,可读流的水位线,一般在读流的暂停模式下起作用;而可写流的水位线,一般用来控制流动模式下可读流的状态。

☞ 参与评论