Node.js 流与 Buffer:处理大数据与文件
Node.js 中的 Buffer:二进制数据的容器
在深入理解流之前,你必须先掌握 Buffer。在 Node.js 中,Buffer 类用于直接处理二进制数据。JavaScript 原生对二进制数据支持较弱,主要处理字符串,而 Node.js 需要应对网络协议、文件系统、加密等大量二进制场景,因此内置了 Buffer。
Buffer 本质上是堆外固定大小的内存块,你可以把它想象成一个整数数组,每个元素是一个字节(0-255)。但与普通数组不同,Buffer 的大小一旦创建就不可更改。
创建 Buffer
// 1. 从字符串创建(默认 utf-8 编码)
const buf1 = Buffer.from('Hello 世界');
// 2. 分配指定大小的 Buffer(内容未初始化,可能包含旧数据,性能较好)
const buf2 = Buffer.alloc(1024); // 填充0的安全缓冲区
// 3. 不安全的快速分配(内容随机,需立即覆盖)
const buf3 = Buffer.allocUnsafe(1024);
// 4. 从十六进制字符串或数组创建
const buf4 = Buffer.from([0x48, 0x65, 0x6c, 0x6c, 0x6f]);
Buffer 的读写与编码
通过下标直接读写单个字节,或使用方法批量转换:
const buf = Buffer.from('Node.js');
// 读取第一个字节的十进制值
console.log(buf[0]); // 78 (即 'N' 的 ASCII 码)
// 遍历每一个字节
for (const b of buf) {
console.log(b);
}
// 转为字符串,支持多种编码
console.log(buf.toString('utf-8')); // Node.js
console.log(buf.toString('hex')); // 4e6f64652e6a73
console.log(buf.toString('base64')); // Tm9kZS5qcw==
为什么需要关注 Buffer?
Buffer 是流操作的基础单位。当读取文件或网络数据时,数据以 Buffer 块的形式到达。理解 Buffer 的不可变特性和编码转换,能帮你避免常见的乱码与性能陷阱。
Node.js 流的核心理念
流(Stream)是处理大规模数据的基石。试想你需要读取一个 10GB 的日志文件并将其压缩后写入另一个文件。如果一次性将整个文件读入内存,进程会立刻崩溃。流提供了分块处理的能力:数据被分割成一个个小的 chunk(通常是 Buffer),逐个处理并释放,从而将内存占用控制在极低水平。
Node.js 的流基于 EventEmitter,它发出 data、end、error 等事件,同时提供了更优雅的管道(pipe)机制。流并非 Node.js 独创,但其设计在 I/O 密集型应用中展现出了极高的价值。
流的四大类型
- Readable Stream(可读流):产生数据,例如
fs.createReadStream()、HTTP 请求的req对象。 - Writable Stream(可写流):消费数据,例如
fs.createWriteStream()、HTTP 响应的res对象。 - Duplex Stream(双工流):同时可读可写,例如 TCP socket。
- Transform Stream(转换流):双工流的一种,在读写过程中可对数据进行修改或转换,例如压缩解压流
zlib.createGzip()。
可读流:如何优雅地消耗数据
可读流有两种工作模式:流动模式(flowing) 和 暂停模式(paused)。默认情况下,可读流处于暂停模式,你必须显式调用 read() 或将其切换到流动模式来获取数据。
流动模式
通过监听 data 事件自动进入流动模式,数据会源源不断地被推送:
const fs = require('fs');
const rs = fs.createReadStream('./large-file.txt', { highWaterMark: 64 * 1024 }); // 每块 64KB
rs.on('data', (chunk) => {
console.log(`收到 ${chunk.length} 字节数据`);
// 在此处理 chunk(Buffer)
});
rs.on('end', () => {
console.log('读取完毕');
});
rs.on('error', (err) => {
console.error('读取错误:', err);
});
暂停模式
使用 read() 手动提取数据,适合需要精确控制流速的场景:
rs.on('readable', () => {
let chunk;
while ((chunk = rs.read()) !== null) {
console.log(`读取到 ${chunk.length} 字节`);
}
});
关键参数:highWaterMark
该选项设置在内部缓冲区最多存放多少字节的数据。对于可读流,当缓冲区数据量低于 highWaterMark 时,流会继续从底层资源拉取数据。默认值为 64 KB,你可以根据内存和吞吐需求调整它。
可写流:数据的安全着陆点
可写流通过 write() 方法接收数据,并由 end() 方法标志写入结束。其内部同样有缓冲区,用于应对生产者快于消费者的场景。
const ws = fs.createWriteStream('./output.txt', { highWaterMark: 16 * 1024 });
// write() 返回布尔值:false 表示内部缓冲区已满,你应该暂停上游生成
const canContinue = ws.write('这是一些数据');
if (!canContinue) {
console.log('背压出现,暂停数据源');
}
ws.end('最后一块数据'); // 结束写入,可传入最后一批数据
ws.on('finish', () => {
console.log('所有数据写入完成');
});
背压(Backpressure)机制
如果可写流处理数据的速度跟不上可读流产生的速度,数据会在内存中积压,最终耗尽内存。背压就是解决这一问题的协同机制:
- 可写流的
write()方法在内部缓冲区超过highWaterMark时返回false。 - 生产者应当暂停生成数据,直到可写流发出
drain事件。
rs.on('data', (chunk) => {
const canWrite = ws.write(chunk);
if (!canWrite) {
rs.pause(); // 暂停可读流
}
});
ws.on('drain', () => {
rs.resume(); // 缓冲区排空,恢复可读流
});
管道 pipe():把流连接起来
手动处理背压繁琐且易出错,Node.js 提供了 pipe() 方法自动完成连接与背压管理。它是流的黄金搭档。
const rs = fs.createReadStream('./input.dat');
const ws = fs.createWriteStream('./output.dat');
rs.pipe(ws);
就这么简单。pipe() 会自动控制流速,将数据从可读流输送到可写流,并在必要时暂停/恢复,同时传递错误。
管道链与错误处理
你可以通过 pipeline() 工具函数连接多个流并获取错误回调,这比直接链式 pipe 更安全。
const { pipeline } = require('stream');
const zlib = require('zlib');
pipeline(
fs.createReadStream('./input.txt'),
zlib.createGzip(), // 转换流:压缩
fs.createWriteStream('./input.txt.gz'),
(err) => {
if (err) {
console.error('管道处理失败', err);
} else {
console.log('压缩成功');
}
}
);
转换流实战:数据在途中变形
转换流既是可读又是可写流,它继承自 Duplex,专门用于在管道中对数据块进行转换。最常见的转换流是 zlib 压缩和解压流,以及自定义的数据处理流。
自定义一个过滤转换流
你可以继承 Transform 类来实现一个行过滤器,只保留包含关键词的行:
const { Transform } = require('stream');
class GrepStream extends Transform {
constructor(keyword) {
super();
this.keyword = keyword;
this.tail = ''; // 处理跨 chunk 的不完整行
}
_transform(chunk, encoding, callback) {
const data = this.tail + chunk.toString();
const lines = data.split('\n');
this.tail = lines.pop(); // 最后一段可能不完整,留到下次
for (const line of lines) {
if (line.includes(this.keyword)) {
this.push(line + '\n');
}
}
callback();
}
_flush(callback) {
if (this.tail.includes(this.keyword)) {
this.push(this.tail);
}
callback();
}
}
// 使用
fs.createReadStream('./app.log')
.pipe(new GrepStream('ERROR'))
.pipe(process.stdout);
对象模式与流的高级配置
默认情况下,流处理的是 Buffer 或字符串。但通过设置 objectMode: true,流可以传递任意 JavaScript 对象,非常适合处理结构化数据流,如从数据库返回的记录行。
const { Readable } = require('stream');
const objectStream = new Readable({
objectMode: true,
read() {
this.push({ id: 1, name: 'task1' });
this.push({ id: 2, name: 'task2' });
this.push(null); // 结束
}
});
objectStream.on('data', (obj) => {
console.log('获取对象:', obj);
});
在对象模式下,highWaterMark 单位变成了对象个数而非字节数。
性能优化与最佳实践
- 合理设置 highWaterMark:根据数据规模和可用内存调整,默认 64KB 对大多数场景足够,处理高速网络流时可适当增加到 256KB 或更高。
- 避免手动消费流混用:不要在同一个可读流上既监听
data又调用read(),或既监听data又使用pipe(),这会导致不可预期的行为。 - 总是处理错误:未监听的
error事件会使进程崩溃。对每个流都要附加error监听器,或使用pipeline集中捕获。 - 利用
stream.finished()优雅清理:它返回一个 Promise,当流结束时(无论成功、失败或提前关闭)进行资源回收。
const { finished } = require('stream');
finished(rs, (err) => {
if (err) console.error('流异常终止', err);
else console.log('流正常结束');
});
- 使用
stream.promisesAPI:Node.js 15+ 提供了基于 promise 的流操作,如stream.promises.pipeline(),便于在async/await中使用。
总结
Buffer 是你在 Node.js 中操作二进制数据的基础单元,而流则为处理大型数据提供了内存友好、高效的分块策略。掌握四大流类型以及 pipe/pipeline 机制,能够让你轻松应对文件处理、网络通信、数据转换等核心场景。记住:永远以流的方式思考大数据,让数据像水流一样流过你的应用,而非堵在内存里。