Node.js中的Stream
流并不是Node.js特有的概念,几十年前在Unix操作系统中引入,程序可以通过管道运算符(|
)让流之间进行一些交互处理。
以读取一个文本内容为例,如果不使用流操作的话,首先会将文本内容读取到内存中,在文本全部读取完毕后,将其结果输出。假如这个文本非常大的话,就会占用大量电脑内存,还有可能引起电脑内存不足,导致读取失败。
如果使用流的话,它则不会等内容完全读完再返回,而是读取一点返回一点,直到内容读取结束,这样就解决了读取大文件占用大量内存的问题。
如下,使用Node.js fs
模块读取文件,并在与http服务器建立新连接时通过HTTP返回文件内容:
const server = http.createServer(function(req, res) {
fs.readFile(__dirname + "/data.txt", (err, data) => {
res.end(data);
});
});
server.listen(3000);
使用readFile
方法将会在文件全部读完后调用回调函数。回调中的res.end(data)
会把读取文件的结果返回到http客户端。
如果data.txt
这个文件很大,将会花费很长一段时间等待才能看到输出。
如果使用流的方式
const server = http.createServer(function(req, res) {
const rs = fs.createReadStream(__dirname + "/data.txt");
rs.pipe(res.end);
});
server.listen(3000);
它并没有等待直到文件被完全读取,而是在准备好要发送的大量数据后立即开始将其以流的方式传输到HTTP客户端。
pipe
上面的示例使用的pipe
方法,它是是个管道操作,表示流数据从一个位置流向另一个位置。
同样的,pipe
支持链式调用,允许多个流之间的不同操作。
src.pipe(dest1).pipe(dest2)
Node中流驱动的API
由于流操作的高效性,Node中有很多流驱动型的API
process.stdin
: 返回连接到stdin
的流process.stdout
: 返回连接到stdout
的流process.stderr
: 返回连接到stderr
的流createReadStream
: 创建文件的可读流createWriteStream
: 创建文件的可写流net.connect
: 初始化一个基于流的连接http.request
: 返回http.ClientRequest
类的实例,该实例是可写流zlib.createGzip
: 使用gzip将数据压缩到流中zlib.createGunzip
: 解压缩gzip流zlib.createDeflate
: 使用deflate(压缩算法)将数据压缩到流中zlib.createInflate
: 解压缩deflate流
不同类型的流
流分为四类:
Readable
: 可读的流(如fs.createReadStream()
)Writable
: 可写的流(如fs.createWriteStream()
)Duplex
: 可读写的流(如net.Socket
)Transform
: 在读写过程中可以修改和变换数据的 Duplex 流(如zlib.createDeflate()
)
实现一个可读流
从stream
获得Readable
流,并对其进行初始化,实现其_read
方法即可。
const Stream = require('stream');
const readableStream = new Stream.Readable({
_read() {}
});
_read
方法是从底层系统读取具体数据的逻辑,即生产数据的逻辑,在_read
方法中,通过调用push(data)
将数据放入可读流中供下游消耗,当全部数据都生产出来后,必须调用push(null)
来结束可读流,流一旦结束,便不能再调用push(data)
添加数据
通过监听data
事件的方式可以消耗可读流。在首次监听其data
事件后,readable
便会持续不断的调用_read()
,通过触发data
事件将数据输出,第一次data
事件会在下一个tick中触发,所以,可以安全地将数据输出前的逻辑放在事件监听后(同一个tick中),当数据全部被消耗时,会触发end
事件。
readableStream.push('hello world');
readableStream.push('leevare');
实现一个可写流
和实现可读流类似,需要从stream
获得Writable
流,并对其进行初始化,实现其_write
方法即可。
const writableStream = new Stream.Writable({
_write(chunk, encoding, next) {
console.log(chunk.toString());
// 写入完成时调用next通知流传入下一个数据
// next调用可以同步,也可以是异步
next();
}
});
将数据发送到可写流
writableStream.write('leevare')
// 当写入完成时,必须调用end来结束可写流,在调用之后,无法再次调用write写入数据。
// 调用end后,当写入完全结束,其会触发finish事件
writableStream.end();
把上述两个接口结合起来,将可读流的读取内容输出到可写流中
readableStream.pipe(writableStream);
- 本博客所有文章除特别声明外,均可转载和分享,转载请注明出处!
- 本文地址:https://www.leevii.com/?p=2623