Node.js 流与 Buffer:处理大数据与文件

FreeGuideOnline 最新 2026-06-15

Node.js 中的 Buffer:二进制数据的容器

在深入理解流之前,你必须先掌握 Buffer。在 Node.js 中,Buffer 类用于直接处理二进制数据。JavaScript 原生对二进制数据支持较弱,主要处理字符串,而 Node.js 需要应对网络协议、文件系统、加密等大量二进制场景,因此内置了 Buffer。

Buffer 本质上是堆外固定大小的内存块,你可以把它想象成一个整数数组,每个元素是一个字节(0-255)。但与普通数组不同,Buffer 的大小一旦创建就不可更改。

创建 Buffer

// 1. 从字符串创建(默认 utf-8 编码)
const buf1 = Buffer.from('Hello 世界');

// 2. 分配指定大小的 Buffer(内容未初始化,可能包含旧数据,性能较好)
const buf2 = Buffer.alloc(1024); // 填充0的安全缓冲区

// 3. 不安全的快速分配(内容随机,需立即覆盖)
const buf3 = Buffer.allocUnsafe(1024);

// 4. 从十六进制字符串或数组创建
const buf4 = Buffer.from([0x48, 0x65, 0x6c, 0x6c, 0x6f]);

Buffer 的读写与编码

通过下标直接读写单个字节,或使用方法批量转换:

const buf = Buffer.from('Node.js');

// 读取第一个字节的十进制值
console.log(buf[0]); // 78 (即 'N' 的 ASCII 码)

// 遍历每一个字节
for (const b of buf) {
  console.log(b);
}

// 转为字符串,支持多种编码
console.log(buf.toString('utf-8'));  // Node.js
console.log(buf.toString('hex'));    // 4e6f64652e6a73
console.log(buf.toString('base64')); // Tm9kZS5qcw==

为什么需要关注 Buffer?

Buffer 是流操作的基础单位。当读取文件或网络数据时,数据以 Buffer 块的形式到达。理解 Buffer 的不可变特性和编码转换,能帮你避免常见的乱码与性能陷阱。


Node.js 流的核心理念

流(Stream)是处理大规模数据的基石。试想你需要读取一个 10GB 的日志文件并将其压缩后写入另一个文件。如果一次性将整个文件读入内存,进程会立刻崩溃。流提供了分块处理的能力:数据被分割成一个个小的 chunk(通常是 Buffer),逐个处理并释放,从而将内存占用控制在极低水平。

Node.js 的流基于 EventEmitter,它发出 dataenderror 等事件,同时提供了更优雅的管道(pipe)机制。流并非 Node.js 独创,但其设计在 I/O 密集型应用中展现出了极高的价值。

流的四大类型

  1. Readable Stream(可读流):产生数据,例如 fs.createReadStream()、HTTP 请求的 req 对象。
  2. Writable Stream(可写流):消费数据,例如 fs.createWriteStream()、HTTP 响应的 res 对象。
  3. Duplex Stream(双工流):同时可读可写,例如 TCP socket。
  4. Transform Stream(转换流):双工流的一种,在读写过程中可对数据进行修改或转换,例如压缩解压流 zlib.createGzip()

可读流:如何优雅地消耗数据

可读流有两种工作模式:流动模式(flowing)暂停模式(paused)。默认情况下,可读流处于暂停模式,你必须显式调用 read() 或将其切换到流动模式来获取数据。

流动模式

通过监听 data 事件自动进入流动模式,数据会源源不断地被推送:

const fs = require('fs');

const rs = fs.createReadStream('./large-file.txt', { highWaterMark: 64 * 1024 }); // 每块 64KB
rs.on('data', (chunk) => {
  console.log(`收到 ${chunk.length} 字节数据`);
  // 在此处理 chunk(Buffer)
});
rs.on('end', () => {
  console.log('读取完毕');
});
rs.on('error', (err) => {
  console.error('读取错误:', err);
});

暂停模式

使用 read() 手动提取数据,适合需要精确控制流速的场景:

rs.on('readable', () => {
  let chunk;
  while ((chunk = rs.read()) !== null) {
    console.log(`读取到 ${chunk.length} 字节`);
  }
});

关键参数:highWaterMark

该选项设置在内部缓冲区最多存放多少字节的数据。对于可读流,当缓冲区数据量低于 highWaterMark 时,流会继续从底层资源拉取数据。默认值为 64 KB,你可以根据内存和吞吐需求调整它。


可写流:数据的安全着陆点

可写流通过 write() 方法接收数据,并由 end() 方法标志写入结束。其内部同样有缓冲区,用于应对生产者快于消费者的场景。

const ws = fs.createWriteStream('./output.txt', { highWaterMark: 16 * 1024 });

// write() 返回布尔值:false 表示内部缓冲区已满,你应该暂停上游生成
const canContinue = ws.write('这是一些数据');
if (!canContinue) {
  console.log('背压出现,暂停数据源');
}

ws.end('最后一块数据'); // 结束写入,可传入最后一批数据
ws.on('finish', () => {
  console.log('所有数据写入完成');
});

背压(Backpressure)机制

如果可写流处理数据的速度跟不上可读流产生的速度,数据会在内存中积压,最终耗尽内存。背压就是解决这一问题的协同机制:

  • 可写流的 write() 方法在内部缓冲区超过 highWaterMark 时返回 false
  • 生产者应当暂停生成数据,直到可写流发出 drain 事件。
rs.on('data', (chunk) => {
  const canWrite = ws.write(chunk);
  if (!canWrite) {
    rs.pause(); // 暂停可读流
  }
});
ws.on('drain', () => {
  rs.resume(); // 缓冲区排空,恢复可读流
});

管道 pipe():把流连接起来

手动处理背压繁琐且易出错,Node.js 提供了 pipe() 方法自动完成连接与背压管理。它是流的黄金搭档。

const rs = fs.createReadStream('./input.dat');
const ws = fs.createWriteStream('./output.dat');
rs.pipe(ws);

就这么简单。pipe() 会自动控制流速,将数据从可读流输送到可写流,并在必要时暂停/恢复,同时传递错误。

管道链与错误处理

你可以通过 pipeline() 工具函数连接多个流并获取错误回调,这比直接链式 pipe 更安全。

const { pipeline } = require('stream');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('./input.txt'),
  zlib.createGzip(),               // 转换流:压缩
  fs.createWriteStream('./input.txt.gz'),
  (err) => {
    if (err) {
      console.error('管道处理失败', err);
    } else {
      console.log('压缩成功');
    }
  }
);

转换流实战:数据在途中变形

转换流既是可读又是可写流,它继承自 Duplex,专门用于在管道中对数据块进行转换。最常见的转换流是 zlib 压缩和解压流,以及自定义的数据处理流。

自定义一个过滤转换流

你可以继承 Transform 类来实现一个行过滤器,只保留包含关键词的行:

const { Transform } = require('stream');

class GrepStream extends Transform {
  constructor(keyword) {
    super();
    this.keyword = keyword;
    this.tail = ''; // 处理跨 chunk 的不完整行
  }

  _transform(chunk, encoding, callback) {
    const data = this.tail + chunk.toString();
    const lines = data.split('\n');
    this.tail = lines.pop(); // 最后一段可能不完整,留到下次
    for (const line of lines) {
      if (line.includes(this.keyword)) {
        this.push(line + '\n');
      }
    }
    callback();
  }

  _flush(callback) {
    if (this.tail.includes(this.keyword)) {
      this.push(this.tail);
    }
    callback();
  }
}

// 使用
fs.createReadStream('./app.log')
  .pipe(new GrepStream('ERROR'))
  .pipe(process.stdout);

对象模式与流的高级配置

默认情况下,流处理的是 Buffer 或字符串。但通过设置 objectMode: true,流可以传递任意 JavaScript 对象,非常适合处理结构化数据流,如从数据库返回的记录行。

const { Readable } = require('stream');

const objectStream = new Readable({
  objectMode: true,
  read() {
    this.push({ id: 1, name: 'task1' });
    this.push({ id: 2, name: 'task2' });
    this.push(null); // 结束
  }
});

objectStream.on('data', (obj) => {
  console.log('获取对象:', obj);
});

在对象模式下,highWaterMark 单位变成了对象个数而非字节数。


性能优化与最佳实践

  1. 合理设置 highWaterMark:根据数据规模和可用内存调整,默认 64KB 对大多数场景足够,处理高速网络流时可适当增加到 256KB 或更高。
  2. 避免手动消费流混用:不要在同一个可读流上既监听 data 又调用 read(),或既监听 data 又使用 pipe(),这会导致不可预期的行为。
  3. 总是处理错误:未监听的 error 事件会使进程崩溃。对每个流都要附加 error 监听器,或使用 pipeline 集中捕获。
  4. 利用 stream.finished() 优雅清理:它返回一个 Promise,当流结束时(无论成功、失败或提前关闭)进行资源回收。
const { finished } = require('stream');
finished(rs, (err) => {
  if (err) console.error('流异常终止', err);
  else console.log('流正常结束');
});
  1. 使用 stream.promises API:Node.js 15+ 提供了基于 promise 的流操作,如 stream.promises.pipeline(),便于在 async/await 中使用。

总结

Buffer 是你在 Node.js 中操作二进制数据的基础单元,而流则为处理大型数据提供了内存友好、高效的分块策略。掌握四大流类型以及 pipe/pipeline 机制,能够让你轻松应对文件处理、网络通信、数据转换等核心场景。记住:永远以流的方式思考大数据,让数据像水流一样流过你的应用,而非堵在内存里。