Node.js中的Stream

流并不是Node.js特有的概念,几十年前在Unix操作系统中引入,程序可以通过管道运算符(|)让流之间进行一些交互处理。

以读取一个文本内容为例,如果不使用流操作的话,首先会将文本内容读取到内存中,在文本全部读取完毕后,将其结果输出。假如这个文本非常大的话,就会占用大量电脑内存,还有可能引起电脑内存不足,导致读取失败。

如果使用流的话,它则不会等内容完全读完再返回,而是读取一点返回一点,直到内容读取结束,这样就解决了读取大文件占用大量内存的问题。

如下,使用Node.js fs模块读取文件,并在与http服务器建立新连接时通过HTTP返回文件内容:

const server = http.createServer(function(req, res) {
  fs.readFile(__dirname + "/data.txt", (err, data) => {
    res.end(data);
  });
});
server.listen(3000);

使用readFile方法将会在文件全部读完后调用回调函数。回调中的res.end(data)会把读取文件的结果返回到http客户端。

如果data.txt这个文件很大,将会花费很长一段时间等待才能看到输出。

如果使用流的方式

const server = http.createServer(function(req, res) {
  const rs = fs.createReadStream(__dirname + "/data.txt");
  rs.pipe(res.end);
});
server.listen(3000);

它并没有等待直到文件被完全读取,而是在准备好要发送的大量数据后立即开始将其以流的方式传输到HTTP客户端。

pipe

上面的示例使用的pipe方法,它是是个管道操作,表示流数据从一个位置流向另一个位置。

同样的,pipe支持链式调用,允许多个流之间的不同操作。

src.pipe(dest1).pipe(dest2)

Node中流驱动的API

由于流操作的高效性,Node中有很多流驱动型的API

  • process.stdin: 返回连接到stdin的流
  • process.stdout: 返回连接到stdout的流
  • process.stderr: 返回连接到stderr的流
  • createReadStream: 创建文件的可读流
  • createWriteStream: 创建文件的可写流
  • net.connect: 初始化一个基于流的连接
  • http.request: 返回http.ClientRequest类的实例,该实例是可写流
  • zlib.createGzip: 使用gzip将数据压缩到流中
  • zlib.createGunzip: 解压缩gzip流
  • zlib.createDeflate: 使用deflate(压缩算法)将数据压缩到流中
  • zlib.createInflate: 解压缩deflate流

不同类型的流

流分为四类:

  • Readable: 可读的流(如fs.createReadStream()
  • Writable: 可写的流(如fs.createWriteStream()
  • Duplex: 可读写的流(如net.Socket
  • Transform: 在读写过程中可以修改和变换数据的 Duplex 流(如zlib.createDeflate()

实现一个可读流

stream获得Readable流,并对其进行初始化,实现其_read方法即可。

const Stream = require('stream');
const readableStream = new Stream.Readable({
  _read() {}
});

_read方法是从底层系统读取具体数据的逻辑,即生产数据的逻辑,在_read方法中,通过调用push(data)将数据放入可读流中供下游消耗,当全部数据都生产出来后,必须调用push(null)来结束可读流,流一旦结束,便不能再调用push(data)添加数据

通过监听data事件的方式可以消耗可读流。在首次监听其data事件后,readable便会持续不断的调用_read(),通过触发data事件将数据输出,第一次data事件会在下一个tick中触发,所以,可以安全地将数据输出前的逻辑放在事件监听后(同一个tick中),当数据全部被消耗时,会触发end事件。

readableStream.push('hello world');
readableStream.push('leevare');

实现一个可写流

和实现可读流类似,需要从stream获得Writable流,并对其进行初始化,实现其_write方法即可。

const writableStream = new Stream.Writable({
  _write(chunk, encoding, next) {
    console.log(chunk.toString());
    // 写入完成时调用next通知流传入下一个数据
    // next调用可以同步,也可以是异步
    next();
  }
});

将数据发送到可写流

writableStream.write('leevare')
// 当写入完成时,必须调用end来结束可写流,在调用之后,无法再次调用write写入数据。
// 调用end后,当写入完全结束,其会触发finish事件
writableStream.end();

把上述两个接口结合起来,将可读流的读取内容输出到可写流中

readableStream.pipe(writableStream);
如果您觉得本文对您有用,欢迎捐赠或留言~
微信支付
支付宝

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注