Node.js Streams and Backpressure: The Right Way to Handle Large Files


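Introduction#

Handling large files is a classic Node.js scenario: analyzing a 10 GB log file, converting a CSV export from a database, processing a video stream in real time. If you load such a file with fs.readFile() in one go, the call either fails outright (the file exceeds the maximum Buffer size or the available memory) or leaves the whole file sitting in memory and puts heavy pressure on the garbage collector.

Streams are Node.js's core abstraction for handling large amounts of data. A stream splits the data into small chunks and processes them incrementally, so memory usage stays bounded and predictable. Backpressure is the most important, and most easily overlooked, mechanism in the stream system: it keeps a fast producer from overwhelming a slow consumer.

This article walks through the four stream types, the pipeline API, and how backpressure works, then uses practical examples to show how to handle large files correctly.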

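The Four Stream Types#

Node.js provides four fundamental stream types:

| Type | Description | Examples |
| --- | --- | --- |
| Readable | A source of data | fs.createReadStream, http.IncomingMessage |
| Writable | A destination for data | fs.createWriteStream, http.ServerResponse |
| Duplex | Both readable and writable | net.Socket, tls.TLSSocket |
| Transform | A Duplex whose output is computed from its input | zlib.createGzip, crypto.createCipheriv |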

Readable Stream#

import { createReadStream } from 'fs';
import { Readable } from 'stream';
// Create a readable stream from a file
const fileStream = createReadStream('/var/log/app.log', {
  encoding: 'utf-8',
  highWaterMark: 64 * 1024 // read up to 64 KiB at a time
});
// Event-based consumption (flowing mode)
fileStream.on('data', (chunk) => {
  // With an encoding set, chunk is a string, so length counts characters
  console.log(`Received ${chunk.length} characters`);
});
fileStream.on('end', () => {
  console.log('File reading complete');
});
fileStream.on('error', (err) => {
  console.error('Read error:', err);
});

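Two Consumption Modes#

A readable stream operates in one of two modes: flowing and paused.

// Paused mode: pull data manually with read()
fileStream.on('readable', () => {
  let chunk;
  while ((chunk = fileStream.read()) !== null) {
    console.log(`Read ${chunk.length} characters`);
  }
});
// Flowing mode: data is pushed to you automatically.
// Use one style per stream; do not combine it with a 'readable' listener.
fileStream.on('data', (chunk) => {
  // Attaching a 'data' listener switches the stream into flowing mode
  process.stdout.write(chunk);
});
// Controlling the flow manually
fileStream.pause(); // stop emitting 'data'
fileStream.resume(); // start emitting again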
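
Besides the two event-based styles above, a readable stream is also an async iterable, which is often the simplest way to consume one. The sketch below is illustrative only: countLines is a hypothetical helper, and Readable.from() stands in for a real file stream.

```javascript
import { Readable } from 'stream';

// Hypothetical helper: counts newline characters in a readable of strings or Buffers.
async function countLines(readable) {
  let lines = 0;
  // for await...of pulls one chunk at a time; the next chunk is not
  // requested until the loop body finishes, so a slow consumer
  // automatically slows the source down.
  for await (const chunk of readable) {
    for (const ch of chunk.toString()) {
      if (ch === '\n') lines++;
    }
  }
  return lines;
}

// Readable.from() stands in for createReadStream() in this demo.
const demo = Readable.from(['a\nb\n', 'c\n']);
console.log(await countLines(demo)); // 3
```

If the loop exits early (break or a thrown error), the stream is destroyed for you, which makes this style harder to misuse than manual event wiring.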

Custom Readable#

class CounterStream extends Readable {
  #current;
  #max;
  constructor(max, options = {}) {
    super(options);
    this.#current = 1;
    this.#max = max;
  }
  _read(size) {
    if (this.#current > this.#max) {
      this.push(null); // pushing null signals the end of the stream
      return;
    }
    const str = String(this.#current++) + '\n';
    this.push(str);
  }
}
const counter = new CounterStream(100);
counter.pipe(process.stdout);
// Prints 1 through 100, one number per line

Creating a Readable from an Async Source#

import { Readable } from 'stream';
// Build a readable from an async generator
async function* fetchAllPages(url) {
  let page = 1;
  while (true) {
    const res = await fetch(`${url}?page=${page}`);
    const data = await res.json();
    if (data.length === 0) break; // an empty page means there is nothing left
    yield JSON.stringify(data) + '\n';
    page++;
  }
}
const apiStream = Readable.from(fetchAllPages('https://api.example.com/items'));

Writable Stream#

import { createWriteStream } from 'fs';
import { Writable } from 'stream';
// Writable stream backed by a file
const output = createWriteStream('./output.txt');
output.write('Hello ');
output.write('World\n');
output.end('Done!'); // end() writes the final chunk and closes the stream
output.on('finish', () => {
  console.log('All data written');
});

Custom Writable#

class LogWriter extends Writable {
#lineCount = 0;
_write(chunk, encoding, callback) {
const lines = chunk.toString().split('\n').filter(Boolean);
this.#lineCount += lines.length;
for (const line of lines) {
const timestamp = new Date().toISOString();
process.stdout.write(`[${timestamp}] ${line}\n`);
}
// callback must be called to signal that this chunk has been handled
// To report a failure, pass an error: callback(new Error('...'))
callback();
}
_final(callback) {
console.log(`\nTotal lines processed: ${this.#lineCount}`);
callback();
}
}
const logger = new LogWriter();
logger.write('First message\n');
logger.write('Second message\n');
logger.end('Final message\n');

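Transform Stream#

Transform is the most useful stream type in practice: it takes input data, processes it, and emits new data.

import { Transform } from 'stream';
// Transform stream that converts CSV to JSON objects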
class CSVToJSON extends Transform {
#headers = null;
#buffer = '';
constructor(options = {}) {
super({ ...options, readableObjectMode: true });
}
_transform(chunk, encoding, callback) {
this.#buffer += chunk.toString();
const lines = this.#buffer.split('\n');
this.#buffer = lines.pop() || ''; // keep the trailing incomplete line for the next chunk
for (const line of lines) {
if (!line.trim()) continue;
const values = line.split(',').map(v => v.trim());
if (!this.#headers) {
this.#headers = values;
continue;
}
const obj = {};
this.#headers.forEach((header, i) => {
obj[header] = values[i];
});
this.push(obj);
}
callback();
}
_flush(callback) {
// Emit whatever is left in the buffer as the final record
if (this.#buffer.trim() && this.#headers) {
const values = this.#buffer.split(',').map(v => v.trim());
const obj = {};
this.#headers.forEach((header, i) => {
obj[header] = values[i];
});
this.push(obj);
}
callback();
}
}
// Usage
import { createReadStream } from 'fs';
const csvStream = createReadStream('./data.csv');
const parser = new CSVToJSON();
csvStream.pipe(parser).on('data', (record) => {
console.log(record);
// { name: 'Alice', age: '30', email: 'alice@test.com' }
});

Practical Transform Examples#

// Transform that splits input into lines
class LineSplitter extends Transform {
#buffer = '';
constructor() {
super({ readableObjectMode: true });
}
_transform(chunk, encoding, callback) {
this.#buffer += chunk.toString();
const lines = this.#buffer.split('\n');
this.#buffer = lines.pop() || '';
for (const line of lines) {
if (line) this.push(line);
}
callback();
}
_flush(callback) {
if (this.#buffer) this.push(this.#buffer);
callback();
}
}
// Transform that reports progress
class ProgressReporter extends Transform {
#processed = 0;
#total;
#lastReport = 0;
constructor(total) {
super();
this.#total = total;
}
_transform(chunk, encoding, callback) {
this.#processed += chunk.length;
const percent = Math.round((this.#processed / this.#total) * 100);
if (percent - this.#lastReport >= 5) { // report every 5%
this.#lastReport = percent;
process.stderr.write(`\rProgress: ${percent}% (${this.#processed} / ${this.#total} bytes)`);
}
this.push(chunk); // pass the data through unchanged
callback();
}
_flush(callback) {
process.stderr.write('\rProgress: 100% — Complete!\n');
callback();
}
}
// Transform that filters objects
class JSONFilter extends Transform {
#predicate;
constructor(predicate) {
super({ objectMode: true });
this.#predicate = predicate;
}
_transform(obj, encoding, callback) {
if (this.#predicate(obj)) {
this.push(obj);
}
callback();
}
}
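
One caveat with the line splitters above: calling chunk.toString() on each chunk can corrupt a multi-byte UTF-8 character that happens to straddle two chunks. A possible variation, sketched here with a hypothetical SafeLineSplitter class, decodes through StringDecoder from the built-in string_decoder module, which holds back incomplete byte sequences until the rest arrives.

```javascript
import { Transform } from 'stream';
import { StringDecoder } from 'string_decoder';

// Sketch of a line splitter that decodes UTF-8 safely across chunk boundaries.
class SafeLineSplitter extends Transform {
  #decoder = new StringDecoder('utf8');
  #buffer = '';
  constructor() {
    super({ readableObjectMode: true });
  }
  _transform(chunk, encoding, callback) {
    // write() returns only complete characters and keeps a partial
    // multi-byte sequence internally until the next call.
    this.#buffer += this.#decoder.write(chunk);
    const lines = this.#buffer.split('\n');
    this.#buffer = lines.pop() ?? '';
    for (const line of lines) {
      if (line) this.push(line);
    }
    callback();
  }
  _flush(callback) {
    // end() returns any bytes still held by the decoder.
    this.#buffer += this.#decoder.end();
    if (this.#buffer) this.push(this.#buffer);
    callback();
  }
}

// Demo: split the input in the middle of the two-byte character 'é'.
const bytes = Buffer.from('héllo\nwörld', 'utf8');
const splitter = new SafeLineSplitter();
const lines = [];
splitter.on('data', (line) => lines.push(line));
const finished = new Promise((resolve) => splitter.on('end', resolve));
splitter.write(bytes.subarray(0, 2)); // 'h' plus the first byte of 'é'
splitter.end(bytes.subarray(2));
await finished;
console.log(lines); // [ 'héllo', 'wörld' ]
```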

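Duplex Stream#

A duplex stream is both readable and writable, and the two sides can be independent of each other:

import { Duplex } from 'stream';
class EchoStream extends Duplex {
  _write(chunk, encoding, callback) {
    // Uppercase what the writable side receives and hand it to the readable side
    this.push(chunk.toString().toUpperCase());
    callback();
  }
  _read(size) {
    // Nothing to do here: data is pushed from _write() as it arrives,
    // so there is no need to poll with a timer
  }
  _final(callback) {
    this.push(null); // end the readable side once the writable side has finished
    callback();
  }
}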

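Backpressure#

Backpressure is the most critical concept in the stream system. When data is produced faster than it is consumed and nothing applies backpressure, the unconsumed data piles up in memory without bound.

Demonstrating the Problem#

import { createReadStream, createWriteStream } from 'fs';
const readable = createReadStream('./huge-file.dat'); // read from an SSD: very fast
const writable = createWriteStream('./output.dat'); // write to a network-mounted disk: slow
// ❌ Wrong: backpressure is ignored
readable.on('data', (chunk) => {
  writable.write(chunk);
  // write() returns false once the internal buffer is full,
  // but the return value is ignored and we keep writing
  // → memory keeps growing → OOM
});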

Handling Backpressure Manually#

// ✅ Correct: handle backpressure by hand
readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    // The internal buffer is full: stop reading
    readable.pause();
    // Resume once the buffer has drained
    writable.once('drain', () => {
      readable.resume();
    });
  }
});
readable.on('end', () => {
  writable.end();
});
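
The same pause-and-resume idea can also be written with async/await, which some people find easier to follow. The sketch below is one possible shape, not the only one: writeAll and slowSink are hypothetical names, and once() comes from the built-in events module.

```javascript
import { once } from 'events';
import { Writable } from 'stream';

// Hypothetical helper: writes every chunk from an iterable, waiting for
// 'drain' whenever write() reports that the buffer is full.
async function writeAll(writable, chunks) {
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      await once(writable, 'drain'); // also rejects if the stream emits 'error'
    }
  }
  writable.end();
  await once(writable, 'finish');
}

// A deliberately slow sink with a tiny buffer, to make backpressure visible.
const received = [];
const slowSink = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setTimeout(callback, 5); // pretend each write takes a while
  }
});

await writeAll(slowSink, ['aaaa', 'bbbb', 'cccc']);
console.log(received); // [ 'aaaa', 'bbbb', 'cccc' ]
```

Each 4-byte chunk fills the 4-byte buffer, so every write() returns false and the loop waits for 'drain' before sending the next chunk.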

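How Backpressure Works#

Every stream has an internal buffer whose threshold is set by highWaterMark. For byte streams the default is 16 KiB (64 KiB since Node.js 22); in object mode it is 16 objects.

Producer (Readable)               Consumer (Writable)
   │                                   │
   │ chunk ──────────────► internal buffer
   │ chunk ──────────────► [####............] ← highWaterMark
   │ chunk ──────────────► [########........]
   │ chunk ──────────────► [############....]
   │ chunk ──────────────► [################] ← buffer full!
   │                                   │
   │ ◄── write() returns false         │
   │     pause producing               │
   │                                   │ → consume slowly...
   │                                   │   [########........]
   │                                   │   [................] ← buffer drained
   │                                   │
   │ ◄───────── 'drain' event ─────────│
   │     resume producing              │

highWaterMark is not a hard limit: even after it is exceeded, write() still accepts the data, which simply accumulates in memory. It is a watermark that tells you when to pause.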
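
A small experiment makes the "watermark, not limit" point concrete. This is a minimal sketch with a hypothetical sink named stuck that never completes a write, so every chunk stays in the buffer.

```javascript
import { Writable } from 'stream';

// A sink that never finishes its first write, so everything queues up behind it.
const stuck = new Writable({
  highWaterMark: 8,
  write(chunk, encoding, callback) {
    // Intentionally never call callback(): this simulates a stalled consumer.
  }
});

const first = stuck.write('12345');  // 5 bytes pending, below the mark
const second = stuck.write('67890'); // 10 bytes pending, above the mark
const third = stuck.write('abcde');  // still accepted: 15 bytes pending

console.log(first, second, third);    // true false false
console.log(stuck.writableLength);    // 15
console.log(stuck.writableNeedDrain); // true
```

Nothing stops the third write; the false return value is the only signal, which is why ignoring it leads to unbounded memory growth.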

pipeline(): The Right Way to Connect Streams#

Handling backpressure by hand is error-prone, so Node.js provides pipeline() to do it for you:

import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
// pipeline takes care of:
// 1. backpressure
// 2. error propagation
// 3. stream cleanup (closing/destroying)
async function compressFile(input, output) {
await pipeline(
createReadStream(input),
createGzip(),
createWriteStream(output)
);
console.log('Compression complete');
}
compressFile('./huge-file.log', './huge-file.log.gz')
.catch(console.error);
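
pipeline() also accepts async generator functions as intermediate stages, which can save writing a Transform class for simple per-chunk work. The following sketch uses in-memory streams so it runs on its own; upperCase and collected are illustrative names.

```javascript
import { pipeline } from 'stream/promises';
import { Readable, Writable } from 'stream';

// An async generator used as a transform stage: uppercases every chunk.
async function* upperCase(source) {
  for await (const chunk of source) {
    yield chunk.toString().toUpperCase();
  }
}

const collected = [];
await pipeline(
  Readable.from(['alpha ', 'beta ', 'gamma']),
  upperCase,
  new Writable({
    write(chunk, encoding, callback) {
      collected.push(chunk.toString());
      callback();
    }
  })
);
console.log(collected.join('')); // ALPHA BETA GAMMA
```

Backpressure still applies: the generator is only asked for the next value when the destination is ready for more data.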

pipeline vs pipe#

// ❌ Problems with pipe():
// 1. Errors are not propagated along the chain
// 2. Streams are not cleaned up when one of them fails
// 3. Every stream needs its own 'error' handler
readable
  .pipe(transform1)
  .pipe(transform2)
  .pipe(writable);
// If transform1 fails, readable and writable are not destroyed
// → file descriptors leak
// ✅ pipeline() addresses all three
import { pipeline } from 'stream';
pipeline(
  readable,
  transform1,
  transform2,
  writable,
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
    // Every stream has been cleaned up by this point
  }
);
// Or the Promise-based version
import { pipeline as pipelineAsync } from 'stream/promises';
try {
  await pipelineAsync(readable, transform1, transform2, writable);
} catch (err) {
  console.error('Pipeline failed:', err);
}
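
To see the cleanup behavior for yourself, you can make one stage fail on purpose and inspect the other streams afterwards. This is a small self-contained sketch; source, failing, and sink are illustrative in-memory streams rather than files.

```javascript
import { pipeline } from 'stream/promises';
import { Readable, Transform, Writable } from 'stream';

const source = Readable.from(['ok', 'boom', 'never reached']);
const failing = new Transform({
  transform(chunk, encoding, callback) {
    if (chunk.toString() === 'boom') {
      callback(new Error('transform failed')); // simulate a mid-stream failure
    } else {
      callback(null, chunk);
    }
  }
});
const sink = new Writable({
  write(chunk, encoding, callback) {
    callback();
  }
});

let caught = null;
try {
  await pipeline(source, failing, sink);
} catch (err) {
  caught = err;
}
console.log(caught.message);                   // transform failed
console.log(source.destroyed, sink.destroyed); // true true
```

With file streams, that destroyed state is what closes the underlying file descriptors.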

In Practice: Processing Large Files#

Scenario 1: Statistics over a Large CSV File#

import { createReadStream } from 'fs';
import { pipeline } from 'stream/promises';
import { Transform, Writable } from 'stream';
// Line splitting
class LineParser extends Transform {
#buffer = '';
constructor() {
super({ readableObjectMode: true });
}
_transform(chunk, encoding, callback) {
this.#buffer += chunk.toString();
const lines = this.#buffer.split('\n');
this.#buffer = lines.pop() || '';
for (const line of lines) {
if (line.trim()) this.push(line);
}
callback();
}
_flush(callback) {
if (this.#buffer.trim()) this.push(this.#buffer);
callback();
}
}
// CSV parsing (simplified)
class CSVParser extends Transform {
#headers = null;
constructor() {
super({ objectMode: true });
}
_transform(line, encoding, callback) {
const values = line.split(',').map(v => v.trim().replace(/^"|"$/g, ''));
if (!this.#headers) {
this.#headers = values;
} else {
const record = {};
this.#headers.forEach((h, i) => { record[h] = values[i]; });
this.push(record);
}
callback();
}
}
// Aggregate statistics
class Aggregator extends Writable {
  stats = {
    total: 0,
    ageCount: 0, // records that carry a numeric age
    sumAge: 0,
    cities: new Map(),
    minAge: Infinity,
    maxAge: -Infinity
  };
  constructor() {
    super({ objectMode: true });
  }
  _write(record, encoding, callback) {
    this.stats.total++;
    // A missing or empty age must not be counted as 0
    const age = record.age ? Number(record.age) : NaN;
    if (!Number.isNaN(age)) {
      this.stats.ageCount++;
      this.stats.sumAge += age;
      this.stats.minAge = Math.min(this.stats.minAge, age);
      this.stats.maxAge = Math.max(this.stats.maxAge, age);
    }
    const city = record.city || 'Unknown';
    this.stats.cities.set(city, (this.stats.cities.get(city) || 0) + 1);
    callback();
  }
  _final(callback) {
    const { total, ageCount, sumAge, minAge, maxAge, cities } = this.stats;
    console.log('\n=== Statistics ===');
    console.log(`Total records: ${total}`);
    if (ageCount > 0) {
      // Average over the records that actually have an age
      console.log(`Average age: ${(sumAge / ageCount).toFixed(1)}`);
      console.log(`Age range: ${minAge} - ${maxAge}`);
    }
    console.log('\nTop cities:');
    const sorted = [...cities.entries()]
      .sort((a, b) => b[1] - a[1])
      .slice(0, 10);
    for (const [city, count] of sorted) {
      console.log(` ${city}: ${count}`);
    }
    callback();
  }
}
// Run
await pipeline(
createReadStream('./users.csv', { highWaterMark: 256 * 1024 }),
new LineParser(),
new CSVParser(),
new Aggregator()
);

Scenario 2: Streaming Log File Processing#

import { createReadStream, createWriteStream, statSync } from 'fs';
import { pipeline } from 'stream/promises';
import { Transform } from 'stream';
// JSON log parsing
class JSONLogParser extends Transform {
#buffer = '';
constructor() {
super({ readableObjectMode: true });
}
_transform(chunk, encoding, callback) {
this.#buffer += chunk.toString();
const lines = this.#buffer.split('\n');
this.#buffer = lines.pop() || '';
for (const line of lines) {
try {
const log = JSON.parse(line);
this.push(log);
} catch {
// Skip lines that are not valid JSON
}
}
callback();
}
_flush(callback) {
if (this.#buffer.trim()) {
try {
this.push(JSON.parse(this.#buffer));
} catch { /* skip */ }
}
callback();
}
}
// Keep only error-level log entries
class ErrorFilter extends Transform {
constructor() {
super({ objectMode: true });
}
_transform(log, encoding, callback) {
if (log.level === 'error' || log.level === 'fatal') {
this.push(log);
}
callback();
}
}
// Format entries for output
class ErrorFormatter extends Transform {
constructor() {
// Objects in, strings out: only the writable side is in object mode
super({ writableObjectMode: true });
}
_transform(log, encoding, callback) {
const formatted = [
`[${log.timestamp || new Date().toISOString()}]`,
`[${(log.level || 'UNKNOWN').toUpperCase()}]`,
log.message || 'No message',
log.stack ? `\n Stack: ${log.stack}` : '',
log.context ? `\n Context: ${JSON.stringify(log.context)}` : '',
'\n---\n'
].join(' ');
this.push(formatted);
callback();
}
}
// Processing with progress reporting
const fileSize = statSync('./app.log').size;
class Progress extends Transform {
#processed = 0;
constructor(total) {
super();
this.total = total;
}
_transform(chunk, encoding, callback) {
this.#processed += chunk.length;
const pct = ((this.#processed / this.total) * 100).toFixed(1);
process.stderr.write(`\rProcessing: ${pct}%`);
this.push(chunk);
callback();
}
_flush(callback) {
process.stderr.write('\rProcessing: 100% — Done!\n');
callback();
}
}
await pipeline(
createReadStream('./app.log'),
new Progress(fileSize),
new JSONLogParser(),
new ErrorFilter(),
new ErrorFormatter(),
createWriteStream('./errors.log')
);
console.log('Error extraction complete');

Scenario 3: Streaming HTTP File Uploads#

import { createServer } from 'http';
import { createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';
import { createGunzip } from 'zlib';
import { Transform } from 'stream';
import { randomUUID } from 'crypto';
// Rate-limiting transform
class RateLimiter extends Transform {
#bytesPerSecond;
#bytesThisSecond = 0;
#lastReset = Date.now();
constructor(bytesPerSecond) {
super();
this.#bytesPerSecond = bytesPerSecond;
}
_transform(chunk, encoding, callback) {
const now = Date.now();
if (now - this.#lastReset >= 1000) {
this.#bytesThisSecond = 0;
this.#lastReset = now;
}
this.#bytesThisSecond += chunk.length;
if (this.#bytesThisSecond > this.#bytesPerSecond) {
const delay = 1000 - (now - this.#lastReset);
setTimeout(() => {
this.push(chunk);
callback();
}, delay);
} else {
this.push(chunk);
callback();
}
}
}
// Transform that enforces a size limit
class SizeLimiter extends Transform {
  #maxBytes;
  #received = 0;
  constructor(maxBytes) {
    super();
    this.#maxBytes = maxBytes;
  }
  _transform(chunk, encoding, callback) {
    this.#received += chunk.length;
    if (this.#received > this.#maxBytes) {
      const err = new Error(`Upload exceeds maximum size of ${this.#maxBytes} bytes`);
      err.code = 'UPLOAD_TOO_LARGE'; // lets the handler tell this apart from other failures
      callback(err);
      return;
    }
    this.push(chunk);
    callback();
  }
}
const server = createServer(async (req, res) => {
  if (req.method !== 'POST' || req.url !== '/upload') {
    res.writeHead(404);
    res.end('Not Found');
    return;
  }
  const fileId = randomUUID();
  const filePath = `./uploads/${fileId}.dat`; // the ./uploads directory must already exist
  const isGzipped = req.headers['content-encoding'] === 'gzip';
  const streams = [
    req,
    new SizeLimiter(100 * 1024 * 1024), // 100 MB limit on the bytes received (before decompression)
  ];
  if (isGzipped) {
    streams.push(createGunzip());
  }
  streams.push(createWriteStream(filePath));
  try {
    await pipeline(...streams);
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ fileId, path: filePath }));
  } catch (err) {
    // Only the size limit maps to 413; everything else is reported as 500.
    // Note: pipeline() destroys req on failure, so the client may see the
    // connection close before this response is delivered.
    const status = err.code === 'UPLOAD_TOO_LARGE' ? 413 : 500;
    res.writeHead(status, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ error: err.message }));
  }
});
server.listen(3000, () => {
  console.log('Upload server running on :3000');
});

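Composing and Reusing Streams#

The compose() Function#

stream.compose() (available since Node.js 16.9 and still marked experimental) combines several Transforms into one:

import { compose } from 'stream';
// Combine several Transforms into a single reusable stream.
// JSONLogParser splits lines itself, so no LineParser is needed in front of it.
const logProcessor = compose(
  new JSONLogParser(),
  new ErrorFilter(),
  new ErrorFormatter()
);
// Use it like any single Transform
await pipeline(
  createReadStream('./app.log'),
  logProcessor,
  createWriteStream('./errors.log')
);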

A Custom Stream Toolkit#

import { Transform } from 'stream';
// Factory function that creates a Transform
function createTransform(transformFn, flushFn = null) {
return new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
try {
const result = transformFn(chunk);
if (result !== undefined && result !== null) {
this.push(result);
}
callback();
} catch (err) {
callback(err);
}
},
flush(callback) {
if (flushFn) {
try {
const result = flushFn();
if (result !== undefined) this.push(result);
} catch (err) {
return callback(err);
}
}
callback();
}
});
}
// Usage
const toUpperCase = () => createTransform(s => s.toString().toUpperCase());
const addLineNumbers = () => {
let n = 0;
return createTransform(line => `${++n}: ${line}`);
};
const filterEmpty = () => createTransform(line => line.trim() ? line : undefined);
await pipeline(
createReadStream('./input.txt'),
new LineParser(),
filterEmpty(),
toUpperCase(),
addLineNumbers(),
createTransform(line => line + '\n'),
createWriteStream('./output.txt')
);

Performance Tuning#

Tuning highWaterMark#

// fs.createReadStream defaults to 64 KiB; a larger value can help with big files
const readable = createReadStream('./huge.dat', {
  highWaterMark: 1024 * 1024 // 1 MiB: fewer read system calls
});
// In object mode, highWaterMark counts objects
const objectStream = new Transform({
  objectMode: true,
  highWaterMark: 1000 // buffer up to 1000 objects
});
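
One rough way to see what highWaterMark changes is to count how many chunks the same file arrives in at different settings. The sketch below writes a temporary 1 MiB file for the purpose; countChunks and the temp-file names are made up for this example, and exact chunk counts may vary by platform, so none are shown.

```javascript
import { createReadStream } from 'fs';
import { mkdtemp, rm, writeFile } from 'fs/promises';
import { tmpdir } from 'os';
import { join } from 'path';

// Hypothetical helper: counts how many chunks a file is delivered in.
async function countChunks(path, highWaterMark) {
  let chunks = 0;
  for await (const chunk of createReadStream(path, { highWaterMark })) {
    chunks++;
  }
  return chunks;
}

// Create a throwaway 1 MiB file to read from.
const dir = await mkdtemp(join(tmpdir(), 'hwm-demo-'));
const file = join(dir, 'sample.dat');
await writeFile(file, Buffer.alloc(1024 * 1024));

const small = await countChunks(file, 64 * 1024);
const large = await countChunks(file, 256 * 1024);
console.log({ small, large }); // the larger buffer should need fewer chunks

await rm(dir, { recursive: true, force: true });
```

Fewer chunks means fewer read calls and fewer trips through your transform code, at the cost of more memory held per stream; measure with your own workload before settling on a value.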

Avoid Blocking Synchronous Work in a Transform#

// ❌ Blocks the event loop
class BadTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const result = heavyCPUWork(chunk); // synchronous and blocking!
    this.push(result);
    callback();
  }
}
// ⚠️ setImmediate lets pending I/O run between chunks,
// but heavyCPUWork still blocks the event loop while it executes
class DeferredTransform extends Transform {
  _transform(chunk, encoding, callback) {
    setImmediate(() => {
      const result = heavyCPUWork(chunk);
      this.push(result);
      callback();
    });
  }
}
// ✅ Better: move the work off the main thread with Worker Threads
import { Worker } from 'worker_threads';
class WorkerTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // One worker per chunk keeps the example short;
    // real code would reuse a worker or a pool
    const worker = new Worker('./processor-worker.js', {
      workerData: chunk.toString()
    });
    worker.once('message', (result) => {
      this.push(result);
      callback();
    });
    worker.once('error', callback);
  }
}

Memory Monitoring#

import { Transform } from 'stream';
class MemoryMonitor extends Transform {
  #interval;
  constructor() {
    super();
    this.#interval = setInterval(() => {
      const usage = process.memoryUsage();
      console.log(`Memory: RSS ${(usage.rss / 1024 / 1024).toFixed(1)}MB, ` +
        `Heap ${(usage.heapUsed / 1024 / 1024).toFixed(1)}MB / ` +
        `${(usage.heapTotal / 1024 / 1024).toFixed(1)}MB`);
    }, 2000);
  }
  _transform(chunk, encoding, callback) {
    this.push(chunk);
    callback();
  }
  _flush(callback) {
    clearInterval(this.#interval);
    callback();
  }
  _destroy(err, callback) {
    // Also stop the timer when the stream is destroyed by an error,
    // otherwise it would keep the process alive
    clearInterval(this.#interval);
    callback(err);
  }
}

Web Streams API#

Node.js 18+ also supports the browser-compatible Web Streams API:

// Web Streams API
const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello');
    controller.enqueue('World');
    controller.close();
  }
});
const transformStream = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  }
});
const writableStream = new WritableStream({
  write(chunk) {
    console.log(chunk);
  }
});
// Connect them into a pipe
await readableStream
  .pipeThrough(transformStream)
  .pipeTo(writableStream);
// Converting between Node.js streams and Web Streams.
// A web stream that has already been piped is locked, so convert fresh ones.
import { Readable } from 'stream';
const nodeStream = Readable.fromWeb(new ReadableStream({
  start(controller) {
    controller.enqueue('from the web');
    controller.close();
  }
}));
const webStream = Readable.toWeb(Readable.from(['from', 'node']));

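Summary#

Node.js Streams are the core infrastructure for processing large amounts of data:

  1. The four stream types each have a role: Readable produces, Writable consumes, Transform converts, Duplex works in both directions
  2. Backpressure is the lifeline of the stream system: never ignore the return value of write()
  3. pipeline() is the best practice for connecting streams: it handles backpressure, error propagation, and resource cleanup
  4. highWaterMark is the key knob for performance tuning
  5. Composition and reuse let you assemble stream pipelines like Lego bricks

Master Streams, and you hold the key to processing massive amounts of data in Node.js.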

https://boke.hackerdream.xyz/posts/node-streams-backpressure/
Author: 晴天
Published: 2026-02-12
License: CC BY-NC-SA 4.0