作为一个异步事件驱动的 JavaScript 运行时,Node.js对流的操作做了很好的内置支持。流(stream)是 Node.js 中的一个基本概念,可实现高效的数据处理,尤其是在处理大量信息或实时处理数据时非常有用。在本文中,我们主要探讨 Node.js 中流的概念,了解可用的不同类型的流(可读、可写、双工和转换),并讨论有效使用流的最佳实践。
什么是流(Stream)
流是 Node.js 应用程序中的一个基本概念,通过顺序读取或写入输入和输出来实现高效的数据处理。它们对于文件操作、网络通信和其他形式的端到端数据交换很方便。
流的独特之处在于它们以小的顺序块处理数据,而不是立即将整个数据集加载到内存中。当处理大量数据时,文件大小可能超过可用内存,这种方法非常有用。流使得以较小的片段处理数据成为可能,从而可以处理较大的文件。
如上图所示,从流中读取数据时,数据通常以块的形式读取或作为连续流读取。从流中读取的数据块可以存储在缓冲区中。缓冲区提供临时存储空间来保存数据块,直到可以进一步处理它们。
为了进一步说明这个概念,我们以看线上电影为例,我们知道,一部电影视频的大小,比较高清的大都好几个G了,当我们在浏览器上看电影时,浏览器是不可能把整部电影都下载到本地时才播放的,至少在目前的4G时代,没有一个用户会接收这样的等待时间的。流使应用程序能够以更小的连续块来处理数据,而不是在内存中获取和存储整个数据源(这可能是大量且不切实际的)。当电影数据流到达浏览器时,浏览器会立即播放出来而不是累计存储完整部电影才播放。
为什么要使用流(Stream)
- 使用流,无需在处理数据前将大量数据加载到内存中。相反,数据以更小的、可管理的块的形式进行处理,从而减少了内存需求并有效地利用了系统资源。
- 流可以在数据可用时立即进行数据处理,而无需等待整个有效负载的传输。这会加快响应时间并提高整体性能。
了解并有效利用流使开发人员能够实现最佳的内存使用、更快的数据处理和增强的代码模块化,使其成为 Node.js 应用程序中的强大功能。然而,不同类型的 Node.js 流可用于特定目的,并提供数据处理方面的多功能性。为了在 Node.js 应用程序中有效地使用流,清楚地了解每种流类型非常重要。因此,让我们深入研究 Node.js 中可用的不同流类型。
Node.js 流的类型
可读流(Readable Streams)
可读流允许从源读取数据,例如文件或网络套接字。它们按顺序发出数据块,并且可以通过将侦听器附加到“数据”事件来使用。可读流可以处于流动或暂停状态,具体取决于数据的消费方式。
const fs = require('fs');
// Create a Readable stream from a file
const readStream = fs.createReadStream('the_princess_bride_input.txt', 'utf8');
// Readable stream 'data' event handler
readStream.on('data', (chunk) => {
console.log(`Received chunk: ${chunk}`);
});
// Readable stream 'end' event handler
readStream.on('end', () => {
console.log('Data reading complete.');
});
// Readable stream 'error' event handler
readStream.on('error', (err) => {
console.error(`Error occurred: ${err}`);
});
如上面的代码片段所示,我们使用 fs 模块通过 createReadStream() 方法创建一个可读流。我们传递文件路径 the_princess_bride_input.txt 和编码 utf8 作为参数。可读流以小块的形式从小文件中读取数据。
我们将事件处理程序附加到可读流来处理不同的事件。当有数据块可供读取时,将发出 data 事件。当可读流完成从文件中读取所有数据时,将发出结束事件。如果读取过程中发生错误,则会发出 error 事件。
通过使用 Readable 流并监听相应的事件,可以高效地从文件等源读取数据,并对接收到的块执行进一步的操作。
可写流(Writable Streams)
可写流处理将数据写入目的地,例如文件或网络套接字(sockets)。它们提供 write() 和 end() 等方法将数据发送到流。可写流可用于以分块方式写入大量数据,防止内存溢出。
const fs = require('fs');
// Create a Writable stream to a file
const writeStream = fs.createWriteStream('the_princess_bride_output.txt');
// Writable stream 'finish' event handler
writeStream.on('finish', () => {
console.log('Data writing complete.');
});
// Writable stream 'error' event handler
writeStream.on('error', (err) => {
console.error(`Error occurred: ${err}`);
});
// Write a quote from "The to the Writable stream
writeStream.write('As ');
writeStream.write('You ');
writeStream.write('Wish');
writeStream.end();
在上面的代码示例中,我们使用 fs 模块通过 createWriteStream() 方法创建一个可写流。我们指定将写入数据的文件路径 (the_princess_bride_output.txt)。
我们将事件处理程序附加到可写流来处理不同的事件。当可写流完成写入所有数据时,将发出 finish 事件。如果写入过程中发生错误,则会发出 error 事件。 write() 方法用于将各个数据块写入可写流。在此示例中,我们将字符串“As”、“You”和“Wish”写入流。最后,我们调用 end() 来表示数据写入结束。
通过使用可写流并侦听相应的事件,您可以有效地将数据写入目标,并在写入过程完成后执行任何必要的清理或后续操作。
双工流(Duple Streams)
双工流表示可读流和可写流的组合。它们允许同时从源读取数据和将数据写入源。双工流是双向的,在读写同时发生的情况下提供灵活性。
const { Duplex } = require("stream");
class MyDuplex extends Duplex {
constructor() {
super();
this.data = "";
this.index = 0;
this.len = 0;
}
_read(size) {
// Readable side: push data to the stream
const lastIndexToRead = Math.min(this.index + size, this.len);
this.push(this.data.slice(this.index, lastIndexToRead));
this.index = lastIndexToRead;
if (size === 0) {
// Signal the end of reading
this.push(null);
}
}
_write(chunk, encoding, next) {
const stringVal = chunk.toString();
console.log(`Writing chunk: ${stringVal}`);
this.data += stringVal;
this.len += stringVal.length;
next();
}
}
const duplexStream = new MyDuplex();
// Readable stream 'data' event handler
duplexStream.on("data", (chunk) => {
console.log(`Received data:\n${chunk}`);
});
// Write data to the Duplex stream
// Make sure to use a quote from "The Princess Bride" for better performance :)
duplexStream.write("Hello.\n");
duplexStream.write("My name is Inigo Montoya.\n");
duplexStream.write("You killed my father.\n");
duplexStream.write("Prepare to die.\n");
// Signal writing ended
duplexStream.end();
在上面的示例中,我们从流模块扩展 Duplex 类来创建 Duplex 流。 Duplex流代表可读流和可写流(可以相互独立)。
我们定义 Duplex 流的 _read() 和 _write() 方法来处理各自的操作。在本例中,我们将写入流和读取流绑定在一起,但这只是为了这个示例 - 双工流支持独立的读取和写入流。
在 _read() 方法中,我们实现了 Duplex 流的可读部分。我们使用 this.push() 将数据推送到流中,当大小变为 0 时,我们通过将 null 推送到流来表示读取结束。
在 _write() 方法中,我们实现了 Duplex 流的可写端。我们处理接收到的数据块并将其添加到内部缓冲区。调用next()方法表明写操作完成。
事件处理程序附加到双工流的数据事件以处理流的可读端。要将数据写入 Duplex 流,我们可以使用 write() 方法。最后,我们调用 end() 来表示写入结束。
双工流允许您创建允许读取和写入操作的双向流,从而实现灵活的数据处理场景。
转换流(Transform streams)
转换流是一种特殊类型的双工流,它在数据通过流时修改或转换数据。它们通常用于数据操作任务,例如压缩、加密或解析。转换流接收输入、处理它并发出修改后的输出。
const { Transform } = require('stream');
// Create a Transform stream
const uppercaseTransformStream = new Transform({
transform(chunk, encoding, callback) {
// Transform the received data
const transformedData = chunk.toString().toUpperCase();
// Push the transformed data to the stream
this.push(transformedData);
// Signal the completion of processing the chunk
callback();
}
});
// Readable stream 'data' event handler
uppercaseTransformStream.on('data', (chunk) => {
console.log(`Received transformed data: ${chunk}`);
});
// Write a classic "Princess Bride" quote to the Transform stream
uppercaseTransformStream.write('Have fun storming the castle!');
uppercaseTransformStream.end();
如上面的代码片段所示,我们使用流模块中的 Transform 类来创建 Transform 流。我们在变换流选项对象中定义transform()方法来处理变换操作。在transform()方法中,我们实现了转换逻辑。在这种情况下 - 我们使用 chunk.toString().toUpperCase() 将接收到的数据块转换为大写。我们使用 this.push() 将转换后的数据推送到流中。最后,我们调用callback()来指示块的处理完成。
我们将事件处理程序附加到转换流的数据事件以处理流的可读部分。要将数据写入 Transform 流,我们使用 write() 方法。我们调用 end() 来表示写入结束。
转换流允许您在数据流经流时即时执行数据转换,从而实现灵活且可定制的数据处理。
了解这些不同类型的流可以让开发人员根据自己的具体需求选择合适的流类型。
Node.js Streams的使用
为了更好地掌握 Node.js Streams 的实际实现,让我们考虑一个从文件读取数据并在转换和压缩后使用流将其写入另一个文件的示例。
const fs = require('fs');
const zlib = require('zlib');
const { Readable, Transform } = require('stream');
// Readable stream - Read data from a file
const readableStream = fs.createReadStream('classic_tale_of_true_love_and_high_adventure.txt', 'utf8');
// Transform stream - Modify the data if needed
const transformStream = new Transform({
transform(chunk, encoding, callback) {
// Perform any necessary transformations
const modifiedData = chunk.toString().toUpperCase(); // Placeholder for transformation logic
this.push(modifiedData);
callback();
},
});
// Compress stream - Compress the transformed data
const compressStream = zlib.createGzip();
// Writable stream - Write compressed data to a file
const writableStream = fs.createWriteStream('compressed-tale.gz');
// Pipe streams together
readableStream.pipe(transformStream).pipe(compressStream).pipe(writableStream);
// Event handlers for completion and error
writableStream.on('finish', () => {
console.log('Compression complete.');
});
writableStream.on('error', (err) => {
console.error('An error occurred during compression:', err);
});
在此代码片段中,我们使用可读流读取文件,将数据转换为大写并使用两个转换流(一个是我们的,一个是内置的 zlib 转换流)对其进行压缩,最后将数据写入文件使用可写流。
我们使用 fs.createReadStream() 创建一个可读流来从输入文件读取数据。使用 Transform 类创建转换流。在这里,您可以对数据实施任何必要的转换(在本示例中,我们再次使用了 toUpperCase())。然后我们使用 zlib.createGzip() 创建另一个转换流,以使用 Gzip 压缩算法压缩转换后的数据。最后,使用 fs.createWriteStream() 创建可写流,将压缩数据写入compressed-tale.gz 文件。
.pipe() 方法用于按顺序将流连接在一起。我们从可读流开始,将其通过管道传输到转换流,然后将转换流通过管道传输到压缩流,最后,将压缩流通过管道传输到可写流。它允许您建立从可读流通过转换和压缩流到可写流的简化数据流。最后,事件处理程序附加到可写流以处理完成和错误事件。
使用pipe()可以简化连接流的过程,自动处理数据流,并确保从可读流到可写流的高效且无错误的传输。它负责管理底层流事件和错误传播。
使用 Node.js Streams 时,遵循最佳实践以确保最佳性能和可维护代码非常重要。
实现流控制机制:当可写流无法跟上从可读流读取数据的速率时,当可读流完成读取时,缓冲区中可能会残留大量数据。在某些情况下,这甚至可能超出可用内存量。这称为背压。为了有效地处理背压,请考虑实施流量控制机制,例如使用pause()和resume()方法或利用第三方模块,例如pump或through2。
通过遵循这些最佳实践,开发人员可以确保高效的流处理、最大限度地减少资源使用并构建强大且可扩展的应用程序
总结
Node.js Streams 是一项强大的功能,可以以非阻塞的方式高效处理数据流。通过利用流,开发人员可以处理大型数据集、处理实时数据并以节省内存的方式执行操作。了解不同类型的流,例如可读、可写、双工和转换,并遵循最佳实践可确保最佳的流处理、错误管理和资源利用率。通过利用流的力量,开发人员可以使用 Node.js 构建高性能且可扩展的应用程序。