Node.js Streams 与背压机制:处理大文件的正确姿势
前言
在 Node.js 中处理大文件是一个经典场景——读取一个 10GB 的日志文件进行分析、将数据库导出的 CSV 转换格式、实时处理视频流……如果你用 fs.readFile() 一次性读入内存,要么 OOM 崩溃,要么严重阻塞事件循环。
Streams(流) 是 Node.js 处理大量数据的核心抽象。它将数据分成小块(chunk)逐步处理,内存占用恒定且可控。而背压(Backpressure) 是流系统中最重要也最容易被忽略的机制——它确保生产者不会淹没消费者。
本文将深入讲解 Node.js 四种流类型、pipeline API、背压原理,并通过实战案例展示正确处理大文件的姿势。
流的四种类型
Node.js 提供四种基本流类型:
| 类型 | 描述 | 示例 |
|---|---|---|
| Readable | 可读流,数据源 | fs.createReadStream, http.IncomingMessage |
| Writable | 可写流,数据目标 | fs.createWriteStream, http.ServerResponse |
| Duplex | 双工流,可读可写 | net.Socket, zlib 流 |
| Transform | 转换流,读入-处理-写出 | zlib.createGzip, crypto.createCipheriv |
Readable Stream(可读流)
import { createReadStream } from 'fs';import { Readable } from 'stream';
// 从文件创建可读流const fileStream = createReadStream('/var/log/app.log', { encoding: 'utf-8', highWaterMark: 64 * 1024 // 每次读取 64KB});
// 事件模式(flowing mode)fileStream.on('data', (chunk) => { console.log(`Received ${chunk.length} bytes`);});
fileStream.on('end', () => { console.log('File reading complete');});
fileStream.on('error', (err) => { console.error('Read error:', err);});两种消费模式
可读流有两种模式:flowing(流动) 和 paused(暂停)。
// Paused mode(暂停模式)— 手动拉取数据fileStream.on('readable', () => { let chunk; while ((chunk = fileStream.read()) !== null) { console.log(`Read ${chunk.length} bytes`); }});
// Flowing mode(流动模式)— 数据自动推送fileStream.on('data', (chunk) => { // 一旦注册 data 事件,流自动进入 flowing mode process.stdout.write(chunk);});
// 手动控制流动fileStream.pause(); // 暂停fileStream.resume(); // 恢复自定义 Readable
/**
 * Readable stream that emits the integers 1..max, one per line, then ends.
 */
class CounterStream extends Readable {
  #next;
  #limit;

  /**
   * @param {number} max - Last number to emit; nothing is emitted when max < 1.
   * @param {object} [options] - Forwarded unchanged to the Readable constructor.
   */
  constructor(max, options = {}) {
    super(options);
    this.#next = 1;
    this.#limit = max;
  }

  /**
   * Pushes exactly one number per call.
   * @param {number} size - Advisory read size; unused.
   */
  _read(size) {
    if (this.#next > this.#limit) {
      // Pushing null signals end-of-stream.
      this.push(null);
      return;
    }
    this.push(`${this.#next}\n`);
    this.#next += 1;
  }
}
const counter = new CounterStream(100);counter.pipe(process.stdout);// 输出 1 到 100从异步数据源创建 Readable
import { Readable } from 'stream';
// Build a Readable from an async generator.
/**
 * Walks a paginated JSON endpoint and yields every non-empty page as one
 * NDJSON line. Iteration stops at the first page whose `length` is not > 0;
 * that terminating page is not emitted.
 *
 * @param {string} url - Absolute endpoint URL. Existing query parameters are
 *   kept; `page` is set (1-based) on each request.
 * @yields {string} `JSON.stringify(page) + '\n'`
 * @throws {Error} When a response has a non-2xx status.
 */
async function* fetchAllPages(url) {
  for (let page = 1; ; page++) {
    // URL/searchParams keeps the query valid even if `url` already has one.
    const pageUrl = new URL(url);
    pageUrl.searchParams.set('page', String(page));

    const res = await fetch(pageUrl);
    if (!res.ok) {
      throw new Error(`Request for page ${page} failed with HTTP ${res.status}`);
    }

    const data = await res.json();
    if (!(data?.length > 0)) {
      return;
    }
    yield JSON.stringify(data) + '\n';
  }
}
const apiStream = Readable.from(fetchAllPages('https://api.example.com/items'));Writable Stream(可写流)
import { createWriteStream } from 'fs';import { Writable } from 'stream';
// 文件可写流const output = createWriteStream('./output.txt');
output.write('Hello ');output.write('World\n');output.end('Done!'); // end 写入最后的数据并关闭流
output.on('finish', () => { console.log('All data written');});自定义 Writable
/**
 * Writable that prints every non-empty line of each chunk to stdout with an
 * ISO-8601 timestamp prefix, and reports the line total when the stream ends.
 */
class LogWriter extends Writable {
  #lineCount = 0;

  _write(chunk, encoding, callback) {
    const lines = chunk.toString().split('\n').filter(Boolean);
    this.#lineCount += lines.length;

    for (const line of lines) {
      const timestamp = new Date().toISOString();
      process.stdout.write(`[${timestamp}] ${line}\n`);
    }

    // The callback must always be invoked to mark this chunk as handled.
    // To report a failure, pass an error instead: callback(new Error('...')).
    callback();
  }

  _final(callback) {
    console.log(`\nTotal lines processed: ${this.#lineCount}`);
    callback();
  }
}
const logger = new LogWriter();logger.write('First message\n');logger.write('Second message\n');logger.end('Final message\n');Transform Stream(转换流)
Transform 是最实用的流类型——它接收输入数据、处理后输出新的数据。
import { Transform } from 'stream';
// CSV-to-JSON transform stream.
/**
 * Converts CSV bytes into one plain object per data row. The first non-blank
 * line supplies the property names. Fields are split on every comma and
 * trimmed; quoted fields containing commas are NOT supported.
 *
 * Bytes are decoded with a streaming TextDecoder so a multi-byte UTF-8
 * character that straddles two chunks is reassembled instead of corrupted.
 * Note that TextDecoder also drops a leading UTF-8 BOM.
 */
class CSVToJSON extends Transform {
  #headers = null;
  #buffer = '';
  #decoder = new TextDecoder('utf-8');

  /**
   * @param {object} [options] - Transform options; `readableObjectMode` is
   *   always forced to true.
   */
  constructor(options = {}) {
    super({ ...options, readableObjectMode: true });
  }

  /** Handles one complete line: records the header row or pushes a record. */
  #emitLine(line) {
    if (!line.trim()) return;

    const values = line.split(',').map((v) => v.trim());
    if (!this.#headers) {
      this.#headers = values;
      return;
    }

    const record = {};
    this.#headers.forEach((header, i) => {
      record[header] = values[i];
    });
    this.push(record);
  }

  _transform(chunk, encoding, callback) {
    this.#buffer += typeof chunk === 'string'
      ? chunk
      : this.#decoder.decode(chunk, { stream: true });

    const lines = this.#buffer.split('\n');
    // The last piece may be an incomplete line; keep it for the next chunk.
    this.#buffer = lines.pop() ?? '';

    for (const line of lines) this.#emitLine(line);
    callback();
  }

  _flush(callback) {
    // Drain any bytes still held by the decoder, then emit the final line.
    this.#buffer += this.#decoder.decode();
    this.#emitLine(this.#buffer);
    this.#buffer = '';
    callback();
  }
}
// 使用import { createReadStream } from 'fs';
const csvStream = createReadStream('./data.csv');const parser = new CSVToJSON();
csvStream.pipe(parser).on('data', (record) => { console.log(record); // { name: 'Alice', age: '30', email: 'alice@test.com' }});实用 Transform 示例
// Line-splitting transform stream.
/**
 * Splits incoming bytes on '\n' and pushes each non-empty line as a string
 * (readable side is in object mode).
 *
 * Decoding uses a streaming TextDecoder so a multi-byte UTF-8 character cut
 * in half by a chunk boundary is reassembled rather than replaced by U+FFFD.
 * Note that TextDecoder also drops a leading UTF-8 BOM.
 */
class LineSplitter extends Transform {
  #buffer = '';
  #decoder = new TextDecoder('utf-8');

  constructor() {
    super({ readableObjectMode: true });
  }

  _transform(chunk, encoding, callback) {
    this.#buffer += this.#decoder.decode(chunk, { stream: true });
    const lines = this.#buffer.split('\n');
    // Keep the trailing partial line until more data (or flush) arrives.
    this.#buffer = lines.pop() ?? '';

    for (const line of lines) {
      if (line) this.push(line);
    }
    callback();
  }

  _flush(callback) {
    this.#buffer += this.#decoder.decode();
    if (this.#buffer) this.push(this.#buffer);
    this.#buffer = '';
    callback();
  }
}
// Progress-reporting transform stream.
/**
 * Pass-through transform that writes a progress line to stderr each time the
 * rounded percentage has advanced by at least 5 points since the last report.
 */
class ProgressReporter extends Transform {
  #processed = 0;
  #total;
  #lastReport = 0;

  /**
   * @param {number} total - Expected total number of bytes.
   */
  constructor(total) {
    super();
    this.#total = total;
  }

  _transform(chunk, encoding, callback) {
    this.#processed += chunk.length;

    const percent = Math.round((this.#processed / this.#total) * 100);
    const shouldReport = percent - this.#lastReport >= 5; // report every 5%
    if (shouldReport) {
      this.#lastReport = percent;
      process.stderr.write(`\rProgress: ${percent}% (${this.#processed} / ${this.#total} bytes)`);
    }

    // Forward the data unchanged.
    callback(null, chunk);
  }

  _flush(callback) {
    process.stderr.write('\rProgress: 100% — Complete!\n');
    callback();
  }
}
// 数据过滤转换流class JSONFilter extends Transform { #predicate;
constructor(predicate) { super({ objectMode: true }); this.#predicate = predicate; }
_transform(obj, encoding, callback) { if (this.#predicate(obj)) { this.push(obj); } callback(); }}Duplex Stream(双工流)
双工流同时可读可写,但读写之间可以是独立的:
import { Duplex } from 'stream';
class EchoStream extends Duplex { #queue = [];
_write(chunk, encoding, callback) { // 写入端接收数据,转换后放入队列 this.#queue.push(chunk.toString().toUpperCase()); callback(); }
_read(size) { if (this.#queue.length > 0) { this.push(this.#queue.shift()); } else { // 没有数据可读时,延迟重试 setTimeout(() => this._read(size), 10); } }}背压(Backpressure)
背压是流系统中最关键的概念。当数据生产速度超过消费速度时,如果没有背压机制,数据会在内存中无限堆积。
背压问题演示
import { createReadStream, createWriteStream } from 'fs';
const readable = createReadStream('./huge-file.dat'); // 从 SSD 读取,非常快const writable = createWriteStream('./output.dat'); // 写入网络挂载的磁盘,很慢
// ❌ 错误做法:不处理背压readable.on('data', (chunk) => { writable.write(chunk); // write() 返回 false 时说明内部缓冲区满了 // 但我们忽略了返回值,继续狂写 // → 内存不断增长 → OOM});手动处理背压
// ✅ 正确做法:手动处理背压readable.on('data', (chunk) => { const canContinue = writable.write(chunk);
if (!canContinue) { // 内部缓冲区满了,暂停读取 readable.pause();
// 等缓冲区排空后再继续 writable.once('drain', () => { readable.resume(); }); }});
readable.on('end', () => { writable.end();});背压原理深入
每个流都有一个内部缓冲区,大小由 highWaterMark 控制(字节流默认 16KB,Node.js 22 起提高到 64KB;fs.createReadStream 默认 64KB;objectMode 下默认 16 个对象)。
生产者 (Readable)              消费者 (Writable)
    │                              │
    │  chunk ──────────────►  内部缓冲区
    │  chunk ──────────────►  [####............] ← highWaterMark
    │  chunk ──────────────►  [########........]
    │  chunk ──────────────►  [############....]
    │  chunk ──────────────►  [################] ← 缓冲区满!
    │                              │
    │  write() returns false       │
    │  ← 暂停生产                  │
    │                              │  → 慢慢消费...
    │                              │  [########....]
    │                              │  [............] ← 缓冲区清空
    │                              │
    │  'drain' event ─────────►    │
    │  → 恢复生产                  │

highWaterMark 不是硬限制——即使超过了,write() 仍然能成功,数据只是堆在内存里。但它是一个“水位标记”,告诉你何时该暂停。
pipeline():正确连接流的方式
手动处理背压容易出错,Node.js 提供了 pipeline() 函数来自动处理:
import { pipeline } from 'stream/promises';import { createReadStream, createWriteStream } from 'fs';import { createGzip } from 'zlib';
// pipeline() takes care of:
// 1. backpressure
// 2. error propagation
// 3. stream cleanup (close/destroy)
/**
 * Gzip-compresses a file by streaming it through zlib.
 *
 * @param {string} input - Path of the file to read.
 * @param {string} output - Path of the compressed file to write.
 * @returns {Promise<void>} Resolves once the pipeline has finished; rejects
 *   if any stage fails.
 */
async function compressFile(input, output) {
  const source = createReadStream(input);
  const gzip = createGzip();
  const destination = createWriteStream(output);

  await pipeline(source, gzip, destination);
  console.log('Compression complete');
}
compressFile('./huge-file.log', './huge-file.log.gz') .catch(console.error);pipeline vs pipe
// ❌ pipe 的问题:// 1. 不会自动传播错误// 2. 错误时不会自动清理流// 3. 需要手动给每个流添加 error handlerreadable .pipe(transform1) .pipe(transform2) .pipe(writable);// 如果 transform1 出错,readable 和 writable 不会被清理// → 文件描述符泄漏
// ✅ pipeline 解决所有问题import { pipeline } from 'stream';
pipeline( readable, transform1, transform2, writable, (err) => { if (err) { console.error('Pipeline failed:', err); } else { console.log('Pipeline succeeded'); } // 所有流都已正确清理 });
// 或者 Promise 版本import { pipeline as pipelineAsync } from 'stream/promises';
try { await pipelineAsync(readable, transform1, transform2, writable);} catch (err) { console.error('Pipeline failed:', err);}实战:大文件处理
场景一:大 CSV 文件统计分析
import { createReadStream } from 'fs';import { pipeline } from 'stream/promises';import { Transform, Writable } from 'stream';
// Line splitting.
/**
 * Splits incoming bytes on '\n' and pushes every line that is not
 * whitespace-only as a string (readable side is in object mode).
 *
 * Decoding uses a streaming TextDecoder so a multi-byte UTF-8 character cut
 * in half by a chunk boundary is reassembled rather than replaced by U+FFFD.
 * Note that TextDecoder also drops a leading UTF-8 BOM.
 */
class LineParser extends Transform {
  #buffer = '';
  #decoder = new TextDecoder('utf-8');

  constructor() {
    super({ readableObjectMode: true });
  }

  _transform(chunk, encoding, callback) {
    this.#buffer += this.#decoder.decode(chunk, { stream: true });
    const lines = this.#buffer.split('\n');
    // Keep the trailing partial line until more data (or flush) arrives.
    this.#buffer = lines.pop() ?? '';

    for (const line of lines) {
      if (line.trim()) this.push(line);
    }
    callback();
  }

  _flush(callback) {
    this.#buffer += this.#decoder.decode();
    if (this.#buffer.trim()) this.push(this.#buffer);
    this.#buffer = '';
    callback();
  }
}
// CSV parsing (simplified).
/**
 * Object-mode transform: receives one CSV line per write and emits one record
 * object per data line. The first line received becomes the column names.
 * Fields are split on every comma, trimmed, and stripped of one leading and
 * one trailing double quote; commas inside quoted fields are not supported.
 */
class CSVParser extends Transform {
  #columns = null;

  constructor() {
    super({ objectMode: true });
  }

  _transform(line, encoding, callback) {
    const cells = line.split(',').map((cell) => cell.trim().replace(/^"|"$/g, ''));

    // The header row only configures the parser; nothing is emitted for it.
    if (!this.#columns) {
      this.#columns = cells;
      callback();
      return;
    }

    const record = {};
    for (let i = 0; i < this.#columns.length; i += 1) {
      record[this.#columns[i]] = cells[i];
    }
    callback(null, record);
  }
}
// Statistics aggregation.
/**
 * Object-mode Writable that accumulates record counts, age statistics and a
 * per-city tally, then prints a summary when the stream finishes.
 *
 * Only records with a non-blank, finite numeric `age` contribute to the age
 * statistics, and the average is taken over those records (`stats.ageCount`),
 * not over all records. A missing or blank age is ignored, not counted as 0.
 */
class Aggregator extends Writable {
  stats = {
    total: 0,
    sumAge: 0,
    ageCount: 0,
    cities: new Map(),
    minAge: Infinity,
    maxAge: -Infinity,
  };

  constructor() {
    super({ objectMode: true });
  }

  _write(record, encoding, callback) {
    const { stats } = this;
    stats.total++;

    // Number('') is 0, so blank values must be excluded before converting.
    const rawAge = record.age;
    const hasAge = rawAge != null && String(rawAge).trim() !== '';
    const age = hasAge ? Number(rawAge) : NaN;
    if (Number.isFinite(age)) {
      stats.ageCount++;
      stats.sumAge += age;
      stats.minAge = Math.min(stats.minAge, age);
      stats.maxAge = Math.max(stats.maxAge, age);
    }

    const city = record.city || 'Unknown';
    stats.cities.set(city, (stats.cities.get(city) || 0) + 1);

    callback();
  }

  _final(callback) {
    const { total, sumAge, ageCount, minAge, maxAge, cities } = this.stats;
    const hasAges = ageCount > 0;

    console.log('\n=== Statistics ===');
    console.log(`Total records: ${total}`);
    console.log(`Average age: ${hasAges ? (sumAge / ageCount).toFixed(1) : 'N/A'}`);
    console.log(`Age range: ${hasAges ? `${minAge} - ${maxAge}` : 'N/A'}`);
    console.log('\nTop cities:');

    const topCities = [...cities.entries()]
      .sort((a, b) => b[1] - a[1])
      .slice(0, 10);

    for (const [city, count] of topCities) {
      console.log(`  ${city}: ${count}`);
    }

    callback();
  }
}
// 运行await pipeline( createReadStream('./users.csv', { highWaterMark: 256 * 1024 }), new LineParser(), new CSVParser(), new Aggregator());场景二:日志文件实时处理
import { createReadStream, createWriteStream, statSync } from 'fs';import { pipeline } from 'stream/promises';import { Transform } from 'stream';
// JSON log parsing.
/**
 * Parses newline-delimited JSON bytes and pushes one parsed value per line
 * (readable side is in object mode). Lines that are blank or not valid JSON
 * are skipped on purpose so one bad line does not abort the whole run.
 *
 * Two safeguards:
 * - bytes are decoded with a streaming TextDecoder, so a multi-byte UTF-8
 *   character split across chunks is not corrupted;
 * - a line whose JSON value is `null` is skipped, because pushing null would
 *   signal end-of-stream and drop everything after it.
 */
class JSONLogParser extends Transform {
  #buffer = '';
  #decoder = new TextDecoder('utf-8');

  constructor() {
    super({ readableObjectMode: true });
  }

  /** Parses a single line and pushes the result when it is usable. */
  #emitLine(line) {
    if (!line.trim()) return;

    let entry;
    try {
      entry = JSON.parse(line);
    } catch {
      // Best-effort: skip lines that are not valid JSON.
      return;
    }
    if (entry != null) this.push(entry);
  }

  _transform(chunk, encoding, callback) {
    this.#buffer += this.#decoder.decode(chunk, { stream: true });
    const lines = this.#buffer.split('\n');
    // Keep the trailing partial line until more data (or flush) arrives.
    this.#buffer = lines.pop() ?? '';

    for (const line of lines) this.#emitLine(line);
    callback();
  }

  _flush(callback) {
    this.#buffer += this.#decoder.decode();
    this.#emitLine(this.#buffer);
    this.#buffer = '';
    callback();
  }
}
// Error-log filter.
/**
 * Object-mode transform that forwards only log entries whose `level` is
 * exactly 'error' or 'fatal' and drops everything else.
 */
class ErrorFilter extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(log, encoding, callback) {
    const isFailure = log.level === 'error' || log.level === 'fatal';
    if (isFailure) {
      this.push(log);
    }
    callback();
  }
}
// Output formatting.
/**
 * Object-mode transform that renders a log entry as a text block:
 *
 *   [timestamp] [LEVEL] message
 *     Stack: ...            (only when log.stack is set)
 *     Context: {...}        (only when log.context is set)
 *   ---
 *
 * Optional sections are appended only when present, so entries without a
 * stack or context no longer carry stray separator spaces.
 */
class ErrorFormatter extends Transform {
  constructor() {
    // objectMode already covers both sides of the stream.
    super({ objectMode: true });
  }

  _transform(log, encoding, callback) {
    const timestamp = log.timestamp || new Date().toISOString();
    // String() guards against a non-string level such as a numeric severity.
    const level = String(log.level || 'UNKNOWN').toUpperCase();
    const message = log.message || 'No message';

    let formatted = `[${timestamp}] [${level}] ${message}`;
    if (log.stack) {
      formatted += `\n  Stack: ${log.stack}`;
    }
    if (log.context) {
      formatted += `\n  Context: ${JSON.stringify(log.context)}`;
    }
    formatted += '\n---\n';

    callback(null, formatted);
  }
}
// 带进度的处理const fileSize = statSync('./app.log').size;
/**
 * Pass-through transform that rewrites a one-line percentage on stderr after
 * every chunk and prints a completion line when the input ends.
 */
class Progress extends Transform {
  #seenBytes = 0;

  /**
   * @param {number} total - Expected total number of bytes; exposed as `this.total`.
   */
  constructor(total) {
    super();
    this.total = total;
  }

  _transform(chunk, encoding, callback) {
    this.#seenBytes += chunk.length;
    const ratio = this.#seenBytes / this.total;
    const label = (ratio * 100).toFixed(1);
    process.stderr.write(`\rProcessing: ${label}%`);
    // Hand the chunk on unchanged.
    callback(null, chunk);
  }

  _flush(callback) {
    process.stderr.write('\rProcessing: 100% — Done!\n');
    callback();
  }
}
await pipeline( createReadStream('./app.log'), new Progress(fileSize), new JSONLogParser(), new ErrorFilter(), new ErrorFormatter(), createWriteStream('./errors.log'));
console.log('Error extraction complete');场景三:HTTP 文件上传流式处理
import { createServer } from 'http';import { createWriteStream } from 'fs';import { pipeline } from 'stream/promises';import { createGunzip } from 'zlib';import { Transform } from 'stream';import { randomUUID } from 'crypto';
// Rate-limiting transform stream.
/**
 * Pass-through transform that throttles throughput using one-second windows.
 * Once the bytes seen in the current window exceed the budget, the chunk that
 * crossed the line is held back until that window is over.
 */
class RateLimiter extends Transform {
  #bytesPerSecond;
  #bytesThisSecond = 0;
  #lastReset = Date.now();

  /**
   * @param {number} bytesPerSecond - Byte budget per one-second window.
   */
  constructor(bytesPerSecond) {
    super();
    this.#bytesPerSecond = bytesPerSecond;
  }

  _transform(chunk, encoding, callback) {
    const now = Date.now();
    const elapsed = now - this.#lastReset;
    if (elapsed >= 1000) {
      // A new window starts with this chunk.
      this.#bytesThisSecond = 0;
      this.#lastReset = now;
    }

    this.#bytesThisSecond += chunk.length;

    const forward = () => callback(null, chunk);
    if (this.#bytesThisSecond <= this.#bytesPerSecond) {
      forward();
      return;
    }

    // Over budget: release the chunk when the current window ends. Not
    // invoking the callback until then is what pauses the upstream.
    const delay = 1000 - (now - this.#lastReset);
    setTimeout(forward, delay);
  }
}
// Size-limiting transform stream.
/**
 * Pass-through transform that fails the stream as soon as the cumulative
 * number of bytes received exceeds the configured maximum. The chunk that
 * crosses the limit is not forwarded.
 */
class SizeLimiter extends Transform {
  #limit;
  #seen = 0;

  /**
   * @param {number} maxBytes - Largest total size that is still accepted.
   */
  constructor(maxBytes) {
    super();
    this.#limit = maxBytes;
  }

  _transform(chunk, encoding, callback) {
    this.#seen += chunk.length;

    const isOverLimit = this.#seen > this.#limit;
    if (isOverLimit) {
      callback(new Error(`Upload exceeds maximum size of ${this.#limit} bytes`));
      return;
    }

    callback(null, chunk);
  }
}
/**
 * Upload endpoint: POST /upload streams the request body to
 * ./uploads/<uuid>.dat, transparently gunzipping it when the request declares
 * `Content-Encoding: gzip`.
 *
 * The size limit is enforced on the raw body and again on the decompressed
 * output, so a small gzip payload cannot expand into an unbounded file.
 *
 * Responses: 200 with { fileId, path } on success; 413 when the size limit is
 * hit; 400 when the gzip data is invalid; 500 for any other failure.
 */
const server = createServer(async (req, res) => {
  if (req.method !== 'POST' || req.url !== '/upload') {
    res.writeHead(404);
    res.end('Not Found');
    return;
  }

  const maxUploadBytes = 100 * 1024 * 1024; // 100 MB limit
  const fileId = randomUUID();
  const filePath = `./uploads/${fileId}.dat`;
  const isGzipped = req.headers['content-encoding'] === 'gzip';

  const streams = [req, new SizeLimiter(maxUploadBytes)];

  if (isGzipped) {
    // Limit the decompressed size as well: the first limiter only sees the
    // compressed bytes.
    streams.push(createGunzip(), new SizeLimiter(maxUploadBytes));
  }

  streams.push(createWriteStream(filePath));

  try {
    await pipeline(...streams);
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ fileId, path: filePath }));
  } catch (err) {
    const message = String(err?.message ?? '');
    // SizeLimiter signals an oversized upload with this message prefix.
    const isTooLarge = message.startsWith('Upload exceeds maximum size');
    // zlib reports corrupt input with codes such as Z_DATA_ERROR.
    const isBadGzip = typeof err?.code === 'string' && err.code.startsWith('Z_');

    let status = 500;
    let error = 'Upload failed';
    if (isTooLarge) {
      status = 413;
      error = message;
    } else if (isBadGzip) {
      status = 400;
      error = 'Invalid gzip data';
    } else {
      console.error('Upload failed:', err);
    }

    // NOTE(review): pipeline() destroys `req` on failure, which may already
    // have closed the socket so this response never reaches the client —
    // confirm against the Node.js version in use. A failed upload also leaves
    // a partial file at filePath.
    res.writeHead(status, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ error }));
  }
});
server.listen(3000, () => { console.log('Upload server running on :3000');});流的组合与复用
compose() 函数
Node.js 16.9+ 提供了 stream.compose()(目前仍标记为实验性 API)来将多个流组合成一个:
import { compose } from 'stream';
// Combine several Transforms into one reusable stream.
// JSONLogParser does its own line splitting and needs the '\n' separators,
// so it must receive the raw bytes directly. Putting LineParser in front of
// it would strip the newlines and leave it unable to tell records apart.
const logProcessor = compose(
  new JSONLogParser(),
  new ErrorFilter(),
  new ErrorFormatter(),
);
// 可以像单个 Transform 一样使用await pipeline( createReadStream('./app.log'), logProcessor, createWriteStream('./errors.log'));自定义流工具库
import { Transform } from 'stream';
// Factory for simple Transforms.
/**
 * Creates an object-mode Transform from plain functions.
 *
 * @param {(chunk: any) => any} transformFn - Called once per chunk. Its
 *   return value is pushed downstream; returning null or undefined drops the
 *   chunk. A thrown exception fails the stream.
 * @param {(() => any) | null} [flushFn] - Optional. Called once when the
 *   input ends; a non-null, non-undefined return value is pushed as the final
 *   item. A thrown exception fails the stream.
 * @returns {Transform}
 */
function createTransform(transformFn, flushFn = null) {
  return new Transform({
    objectMode: true,

    transform(chunk, encoding, callback) {
      let result;
      try {
        result = transformFn(chunk);
      } catch (err) {
        callback(err);
        return;
      }
      // `!= null` filters both null and undefined; pushing null would end the stream.
      if (result != null) {
        this.push(result);
      }
      callback();
    },

    flush(callback) {
      if (!flushFn) {
        callback();
        return;
      }
      let result;
      try {
        result = flushFn();
      } catch (err) {
        callback(err);
        return;
      }
      // Same rule as transform(): null and undefined mean "nothing to emit".
      if (result != null) {
        this.push(result);
      }
      callback();
    },
  });
}
// 使用const toUpperCase = () => createTransform(s => s.toString().toUpperCase());const addLineNumbers = () => { let n = 0; return createTransform(line => `${++n}: ${line}`);};const filterEmpty = () => createTransform(line => line.trim() ? line : undefined);
await pipeline( createReadStream('./input.txt'), new LineParser(), filterEmpty(), toUpperCase(), addLineNumbers(), createTransform(line => line + '\n'), createWriteStream('./output.txt'));性能优化
highWaterMark 调优
// fs.createReadStream() defaults to a 64 KiB highWaterMark; a larger value
// can help when processing big files.
const readable = createReadStream('./huge.dat', {
  highWaterMark: 1024 * 1024, // 1 MiB: fewer read system calls
});
// 对于对象模式,highWaterMark 是对象数量const objectStream = new Transform({ objectMode: true, highWaterMark: 1000 // 缓冲 1000 个对象});避免在 Transform 中进行同步阻塞
// ❌ Blocks the event loop.
/**
 * Counter-example: runs the CPU-heavy work synchronously inside _transform,
 * so nothing else can run on the event loop until each chunk is done.
 */
class BadTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const result = heavyCPUWork(chunk); // synchronous and blocking!
    this.push(result);
    callback();
  }
}
// ✅ Yield to the event loop with setImmediate.
/**
 * Defers the work for each chunk to a later event-loop iteration so pending
 * I/O callbacks can run between chunks. The work itself still executes on the
 * main thread.
 *
 * An exception thrown by heavyCPUWork is passed to the stream callback. If it
 * were allowed to escape the setImmediate callback it would be an uncaught
 * exception rather than a stream error.
 */
class GoodTransform extends Transform {
  _transform(chunk, encoding, callback) {
    setImmediate(() => {
      let result;
      try {
        result = heavyCPUWork(chunk);
      } catch (err) {
        callback(err);
        return;
      }
      this.push(result);
      callback();
    });
  }
}
// ✅✅ 更好:使用 Worker Threadsimport { Worker, isMainThread, parentPort, workerData } from 'worker_threads';
class WorkerTransform extends Transform { _transform(chunk, encoding, callback) { const worker = new Worker('./processor-worker.js', { workerData: chunk.toString() });
worker.on('message', (result) => { this.push(result); callback(); });
worker.on('error', callback); }}内存监控
import { Transform } from 'stream';
class MemoryMonitor extends Transform { #interval;
constructor() { super(); this.#interval = setInterval(() => { const usage = process.memoryUsage(); console.log(`Memory — RSS: ${(usage.rss / 1024 / 1024).toFixed(1)}MB, ` + `Heap: ${(usage.heapUsed / 1024 / 1024).toFixed(1)}MB / ` + `${(usage.heapTotal / 1024 / 1024).toFixed(1)}MB`); }, 2000); }
_transform(chunk, encoding, callback) { this.push(chunk); callback(); }
_flush(callback) { clearInterval(this.#interval); callback(); }}Web Streams API
Node.js 18+ 也支持 Web Streams API(与浏览器兼容):
// Web Streams APIconst readableStream = new ReadableStream({ start(controller) { controller.enqueue('Hello'); controller.enqueue('World'); controller.close(); }});
const transformStream = new TransformStream({ transform(chunk, controller) { controller.enqueue(chunk.toUpperCase()); }});
const writableStream = new WritableStream({ write(chunk) { console.log(chunk); }});
// 管道连接await readableStream .pipeThrough(transformStream) .pipeTo(writableStream);
// Node.js 流与 Web Streams 互转import { Readable } from 'stream';
const nodeStream = Readable.fromWeb(readableStream);const webStream = Readable.toWeb(nodeReadableStream);总结
Node.js Streams 是处理大量数据的核心基础设施:
- 四种流类型各有分工:Readable 产出、Writable 消费、Transform 变换、Duplex 双向
- 背压机制是流系统的生命线——永远不要忽略 write() 的返回值
- pipeline() 是连接流的最佳实践——自动处理背压、错误传播和资源清理
- highWaterMark 是性能调优的关键旋钮
- 组合与复用让流处理管道可以像乐高一样拼装
掌握 Streams,你就掌握了 Node.js 处理海量数据的钥匙。
文章分享
如果这篇文章对你有帮助,欢迎分享给更多人!