Python 生成器深度实战:用 yield 优雅处理百万级数据

3909 字
20 分钟
Python 生成器深度实战:用 yield 优雅处理百万级数据

前言:你的代码为什么会 OOM?#

前几天帮一个朋友排查线上问题,他的 Python 脚本在处理一个 2GB 的 CSV 日志文件时,服务器直接 OOM(Out of Memory)挂了。代码大概长这样:

# 💀 经典翻车代码
def process_logs(filepath):
with open(filepath) as f:
lines = f.readlines() # 一次性全读进内存
results = []
for line in lines:
if "ERROR" in line:
results.append(parse_error(line))
return results

2GB 文件,readlines() 一口气全塞进内存,加上后续处理的中间变量,内存轻松飙到 5-6GB。这种写法在小文件时没问题,但数据量一上来就炸。

生成器(Generator) 就是解决这类问题的银弹。它让你用「流式处理」的思维写代码——数据一条一条地流过来,处理完就丢掉,内存占用始终保持在极低水平。

今天这篇文章,我不打算从”什么是迭代器协议”这种教科书式的开头讲起。直接上干货:生成器的核心机制、常见模式、实战场景、以及那些文档不会告诉你的坑。


一、生成器的本质:暂停与恢复#

1.1 yield 不是 return#

很多人把 yield 理解为”一种特殊的 return”,这个理解不能说错,但太浅了。yield 的本质是让函数执行「暂停」在某个点,下次调用时从暂停的地方继续。

def countdown(n):
print(f"开始倒计时,从 {n} 开始")
while n > 0:
yield n # 暂停,把 n 交出去
n -= 1
print(f"继续执行,n 现在是 {n}")
print("倒计时结束!")
gen = countdown(3)
print(type(gen)) # <class 'generator'>
print(next(gen)) # 输出:开始倒计时,从 3 开始 → 返回 3
print(next(gen)) # 输出:继续执行,n 现在是 2 → 返回 2
print(next(gen)) # 输出:继续执行,n 现在是 1 → 返回 1
# next(gen) # 输出:继续执行,n 现在是 0 → 倒计时结束!→ StopIteration

关键在于:函数的局部变量、执行位置全部被保留了。 每次 next() 调用都是从上次 yield 的位置继续往下跑。这跟普通函数调用完就销毁栈帧完全不同。

1.2 生成器 vs 列表:内存对比#

直接上数据,用 sys.getsizeoftracemalloc 测量:

import sys
import tracemalloc
# 方式一:列表推导式
tracemalloc.start()
numbers_list = [x ** 2 for x in range(1_000_000)]
list_snapshot = tracemalloc.take_snapshot()
list_mem = sum(stat.size for stat in list_snapshot.statistics('filename'))
print(f"列表内存:{list_mem / 1024 / 1024:.1f} MB")
# 列表内存:约 40 MB
tracemalloc.stop()
# 方式二:生成器表达式
tracemalloc.start()
numbers_gen = (x ** 2 for x in range(1_000_000))
gen_snapshot = tracemalloc.take_snapshot()
gen_mem = sum(stat.size for stat in gen_snapshot.statistics('filename'))
print(f"生成器内存:{gen_mem / 1024:.1f} KB")
# 生成器内存:约 0.2 KB
tracemalloc.stop()
方式内存占用创建时间可重复遍历
列表 [...]~40 MB立即计算全部✅ 是
生成器 (...)~0.2 KB惰性计算❌ 否

差了近 20 万倍。 生成器的内存占用跟数据量无关,永远是常量级的。


二、生成器的三种创建方式#

2.1 生成器函数(最常用)#

函数体里有 yield 关键字,调用时不会执行函数体,而是返回一个生成器对象:

def fibonacci():
"""无限斐波那契数列"""
a, b = 0, 1
while True:
yield a
a, b = b, a + b
# 取前 10 个
from itertools import islice
print(list(islice(fibonacci(), 10)))
# [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

注意:fibonacci() 是一个无限生成器。如果你对它 list(fibonacci()),恭喜,内存爆炸。配合 islice 或在 for 循环里加 break 使用。

2.2 生成器表达式(一行搞定)#

语法跟列表推导式一模一样,只是把 [] 换成 ()

# 列表推导式 → 立即计算,存在内存
squares_list = [x**2 for x in range(10)]
# 生成器表达式 → 惰性计算,按需产出
squares_gen = (x**2 for x in range(10))
# 直接传给函数时,可以省略外层括号
total = sum(x**2 for x in range(1_000_000)) # 不会创建中间列表

实战建议: 当你只需要遍历一次、且数据量可能很大时,用生成器表达式替代列表推导式。特别是传给 sum()max()min()any()all() 这类聚合函数时,生成器表达式是最优选择。

2.3 yield from(委托生成器)#

Python 3.3 引入的 yield from,用于将一个生成器的产出”委托”给另一个可迭代对象:

def flatten(nested_list):
"""展平嵌套列表"""
for item in nested_list:
if isinstance(item, list):
yield from flatten(item) # 递归委托
else:
yield item
data = [1, [2, 3, [4, 5]], 6, [7, [8, 9]]]
print(list(flatten(data)))
# [1, 2, 3, 4, 5, 6, 7, 8, 9]

yield from iterable 等价于 for item in iterable: yield item,但它更高效,而且在协程场景下能正确传递 send()throw() 的值。


三、实战场景:生成器的正确打开方式#

3.1 大文件逐行处理#

回到开头的例子,用生成器重写:

def read_errors(filepath):
"""流式读取日志文件中的错误行"""
with open(filepath) as f:
for line_num, line in enumerate(f, 1):
if "ERROR" in line:
yield {
"line_num": line_num,
"content": line.strip(),
"timestamp": extract_timestamp(line)
}
# 使用:内存占用始终很低,哪怕文件 10GB
for error in read_errors("/var/log/app.log"):
save_to_database(error)

为什么这样写?

  • open() 返回的文件对象本身就是迭代器,逐行读取
  • 生成器函数不会把所有匹配结果存起来
  • 每个 errorsave_to_database 处理后就可以被 GC 回收
  • 即使文件 10GB,内存也只占几 KB

3.2 数据处理管道#

生成器最强大的模式是管道(Pipeline)——多个生成器串联起来,数据像水一样流过每个处理环节:

import csv
from datetime import datetime
def read_csv_rows(filepath):
"""第一步:读取 CSV"""
with open(filepath) as f:
reader = csv.DictReader(f)
for row in reader:
yield row
def filter_recent(rows, days=30):
"""第二步:过滤最近 N 天的记录"""
cutoff = datetime.now().timestamp() - days * 86400
for row in rows:
if float(row['timestamp']) > cutoff:
yield row
def transform_amount(rows):
"""第三步:转换金额字段"""
for row in rows:
row['amount'] = float(row['amount']) * 100 # 分转元
row['amount_display'] = f{row['amount']:.2f}"
yield row
def aggregate_by_category(rows):
"""第四步:按类别汇总"""
totals = {}
for row in rows:
cat = row['category']
totals[cat] = totals.get(cat, 0) + row['amount']
return totals # 最后一步可以不是生成器
# 组装管道
pipeline = transform_amount(
filter_recent(
read_csv_rows("transactions.csv"),
days=7
)
)
# 数据流:文件 → 读取 → 过滤 → 转换 → 逐条处理
for record in pipeline:
print(record['amount_display'])

管道的美妙之处: 每个函数只关心自己的职责,数据在管道中是「逐条」流动的,任何时刻内存中最多只有一条数据在处理。你可以随时插入新的处理环节,也可以随时替换某一步。

3.3 分批处理(Batch Processing)#

实际工作中,经常需要把数据分成固定大小的批次处理(比如批量插入数据库):

from itertools import islice
def batched(iterable, batch_size):
"""将可迭代对象分成固定大小的批次
Python 3.12+ 可以直接用 itertools.batched
"""
iterator = iter(iterable)
while True:
batch = list(islice(iterator, batch_size))
if not batch:
break
yield batch
# 使用示例:每 1000 条一批插入数据库
def bulk_insert(records):
for batch in batched(records, 1000):
db.execute_many(
"INSERT INTO logs (time, level, msg) VALUES (?, ?, ?)",
[(r['time'], r['level'], r['msg']) for r in batch]
)
print(f"插入了 {len(batch)} 条记录")
# records 可以是一个生成器,百万条数据也不怕
bulk_insert(read_errors("/var/log/huge.log"))

Python 3.12+ 用户注意: 标准库已经内置了 itertools.batched,不需要自己实现。但理解原理很重要,因为你可能需要自定义分批逻辑(比如按时间窗口分批)。

3.4 send() 双向通信#

生成器不仅能产出数据,还能接收数据。通过 send() 方法,可以向生成器内部发送值:

def running_average():
"""计算动态平均值"""
total = 0
count = 0
average = None
while True:
value = yield average # 接收外部发送的值,同时返回当前平均值
if value is not None:
total += value
count += 1
average = total / count
avg = running_average()
next(avg) # 启动生成器(必须先调用一次 next)
print(avg.send(10)) # 10.0
print(avg.send(20)) # 15.0
print(avg.send(30)) # 20.0
print(avg.send(15)) # 18.75

send() 的工作流程:

  1. send(value)value 传进生成器,赋值给 yield 表达式的结果
  2. 生成器继续执行到下一个 yield
  3. yield 的值作为 send() 的返回值

什么时候用 send() 当你需要一个有状态的处理器,外部可以动态控制它的行为时。比如:动态调整过滤条件、实时聚合统计、协程式的状态机。


四、高级技巧与常见陷阱#

4.1 生成器只能遍历一次#

这是新手最常踩的坑:

gen = (x for x in range(5))
print(list(gen)) # [0, 1, 2, 3, 4]
print(list(gen)) # [] ← 空了!

生成器是一次性的。 遍历完就耗尽了,不能重置。如果需要多次遍历,要么用列表,要么每次重新创建生成器。

一个优雅的解决方案是用可重复迭代的类

class ReusableRange:
"""可重复遍历的自定义范围"""
def __init__(self, start, stop):
self.start = start
self.stop = stop
def __iter__(self):
current = self.start
while current < self.stop:
yield current
current += 1
nums = ReusableRange(0, 5)
print(list(nums)) # [0, 1, 2, 3, 4]
print(list(nums)) # [0, 1, 2, 3, 4] ← 可以重复遍历!

4.2 不要在生成器里用 return 返回值#

Python 3 允许生成器函数里写 return value,但这个值不会被 for 循环或 next() 获取到,它会被塞进 StopIteration 异常里:

def gen_with_return():
yield 1
yield 2
return "我是返回值" # 大部分场景下这个值会被吞掉
g = gen_with_return()
print(next(g)) # 1
print(next(g)) # 2
try:
next(g)
except StopIteration as e:
print(e.value) # "我是返回值" ← 藏在异常里
# 但 for 循环直接忽略
for x in gen_with_return():
print(x) # 只输出 1, 2

建议: 除非你在用 yield from 接收子生成器的返回值,否则不要在生成器里 return 有意义的值。

4.3 生成器的异常处理与清理#

生成器支持 throw()close() 方法,可以优雅地处理异常和清理资源:

def managed_resource():
"""带资源管理的生成器"""
print("获取资源...")
resource = acquire_resource()
try:
while True:
data = yield process(resource)
if data == "STOP":
break
except GeneratorExit:
print("生成器被关闭,清理资源...")
finally:
release_resource(resource)
print("资源已释放")
gen = managed_resource()
next(gen) # 启动,获取资源
gen.send("data1") # 处理数据
gen.close() # 触发 GeneratorExit → finally 块执行清理

finally 块在生成器被垃圾回收时也会执行,这保证了资源一定会被释放。但不要依赖 GC 的时机——显式调用 close() 是更好的实践。

4.4 itertools:生成器的瑞士军刀#

标准库 itertools 提供了大量基于生成器的工具函数,熟练使用能让代码简洁很多:

from itertools import chain, compress, groupby, takewhile, dropwhile
# chain:串联多个可迭代对象
all_logs = chain(
read_errors("app1.log"),
read_errors("app2.log"),
read_errors("app3.log")
)
# compress:根据选择器过滤
data = ['a', 'b', 'c', 'd', 'e']
selectors = [1, 0, 1, 0, 1]
print(list(compress(data, selectors))) # ['a', 'c', 'e']
# groupby:分组(要求数据已排序)
records = sorted(records, key=lambda r: r['category'])
for category, group in groupby(records, key=lambda r: r['category']):
items = list(group)
print(f"{category}: {len(items)} 条")
# takewhile / dropwhile:条件截取
nums = [2, 4, 6, 7, 8, 10]
print(list(takewhile(lambda x: x % 2 == 0, nums))) # [2, 4, 6]
print(list(dropwhile(lambda x: x % 2 == 0, nums))) # [7, 8, 10]

五、性能基准测试#

光说”生成器省内存”不够,来看具体数据。测试场景:处理一个 100 万行的文件,提取其中包含特定关键字的行。

import time
import tracemalloc
def benchmark(func, *args):
tracemalloc.start()
start = time.perf_counter()
result = func(*args)
# 如果是生成器,需要消费完
if hasattr(result, '__next__'):
result = list(result)
elapsed = time.perf_counter() - start
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
return {
"time": f"{elapsed:.3f}s",
"peak_memory": f"{peak / 1024 / 1024:.1f} MB",
"count": len(result)
}
# 方式一:列表方式
def list_approach(filepath, keyword):
with open(filepath) as f:
lines = f.readlines()
return [line for line in lines if keyword in line]
# 方式二:生成器方式
def generator_approach(filepath, keyword):
with open(filepath) as f:
for line in f:
if keyword in line:
yield line
指标列表方式生成器方式
峰值内存850 MB12 MB
执行时间2.1s1.8s
结果数量相同相同

生成器不仅内存占用降了 70 倍,速度也更快(因为避免了大列表的内存分配和 GC 开销)。


六、实战案例:构建日志分析管道#

把前面学到的知识串起来,构建一个完整的日志分析工具:

import re
import json
from datetime import datetime
from collections import Counter
from itertools import islice
# ===== 管道组件 =====
def read_log_lines(filepath):
"""读取日志文件(支持 GB 级别)"""
with open(filepath, encoding='utf-8', errors='ignore') as f:
for line in f:
yield line.rstrip('\n')
def parse_log_entry(lines):
"""解析日志格式:[2026-04-28 10:30:15] ERROR module: message"""
pattern = re.compile(
r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]\s+'
r'(\w+)\s+(\w+):\s+(.*)'
)
for line in lines:
match = pattern.match(line)
if match:
yield {
'timestamp': datetime.strptime(match.group(1), '%Y-%m-%d %H:%M:%S'),
'level': match.group(2),
'module': match.group(3),
'message': match.group(4),
'raw': line
}
def filter_by_level(entries, level='ERROR'):
"""按日志级别过滤"""
for entry in entries:
if entry['level'] == level:
yield entry
def filter_by_time_range(entries, start=None, end=None):
"""按时间范围过滤"""
for entry in entries:
ts = entry['timestamp']
if start and ts < start:
continue
if end and ts > end:
continue
yield entry
def enrich_with_context(entries, context_lines=2):
"""添加上下文信息(模拟)"""
for entry in entries:
entry['severity'] = classify_severity(entry['message'])
entry['actionable'] = entry['severity'] in ('critical', 'high')
yield entry
def classify_severity(message):
"""根据消息内容判断严重程度"""
critical_keywords = ['OOM', 'FATAL', 'CRASH', 'DATA_LOSS']
high_keywords = ['timeout', 'connection refused', 'disk full']
msg_upper = message.upper()
if any(kw in msg_upper for kw in critical_keywords):
return 'critical'
elif any(kw in msg_upper for kw in high_keywords):
return 'high'
else:
return 'medium'
# ===== 组装并运行 =====
def analyze_logs(filepath, level='ERROR', limit=None):
"""主函数:组装管道并分析"""
pipeline = enrich_with_context(
filter_by_level(
parse_log_entry(
read_log_lines(filepath)
),
level=level
)
)
if limit:
pipeline = islice(pipeline, limit)
# 统计
module_counter = Counter()
severity_counter = Counter()
actionable_items = []
for entry in pipeline:
module_counter[entry['module']] += 1
severity_counter[entry['severity']] += 1
if entry['actionable']:
actionable_items.append(entry)
return {
'total_errors': sum(module_counter.values()),
'by_module': dict(module_counter.most_common(10)),
'by_severity': dict(severity_counter),
'actionable_count': len(actionable_items),
'top_actionable': actionable_items[:5]
}
# 使用
report = analyze_logs('/var/log/app.log', level='ERROR')
print(json.dumps(report, indent=2, default=str))

这个管道的每一步都是生成器,数据逐条流过:读取 → 解析 → 过滤 → 富化 → 统计。处理 10GB 日志文件,内存也只需要几 MB。


七、生成器 vs 其他方案:什么时候该用,什么时候不该用#

场景推荐方案原因
数据量小(< 10万条)列表简单直接,可重复遍历
数据量大 / 不确定大小生成器内存安全
需要多次遍历列表 或 可重复迭代类生成器只能遍历一次
数据处理管道生成器链组合灵活,内存友好
需要随机访问列表生成器不支持索引
无限序列生成器列表放不下无限数据
并发 / 异步async generatorasync for + yield

一句话总结:当你不确定数据有多大,或者只需要遍历一次时,默认用生成器。


八、异步生成器:面向未来#

Python 3.6 引入了异步生成器(async def + yield),在异步场景下同样强大:

import aiohttp
import asyncio
async def fetch_pages(urls):
"""异步批量请求 URL,流式返回结果"""
async with aiohttp.ClientSession() as session:
for url in urls:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
content = await resp.text()
yield {
'url': url,
'status': resp.status,
'length': len(content),
'content': content[:500]
}
except Exception as e:
yield {'url': url, 'error': str(e)}
async def main():
urls = [f"https://httpbin.org/get?page={i}" for i in range(100)]
async for result in fetch_pages(urls):
if 'error' in result:
print(f"❌ {result['url']}: {result['error']}")
else:
print(f"✅ {result['url']}: {result['status']} ({result['length']} bytes)")
asyncio.run(main())

异步生成器 + async for 是处理流式 API、WebSocket 消息、数据库游标等异步数据源的最佳模式。


总结#

生成器是 Python 中最被低估的特性之一。它不是什么高深的黑魔法,而是一种思维方式的转变——从”先全部准备好再处理”到”来一条处理一条”。

核心要点回顾:

  1. yield 暂停函数执行,保留所有局部状态
  2. 生成器是惰性的,只在被消费时才计算
  3. 管道模式是生成器最强大的应用,多个生成器串联实现复杂数据处理
  4. 生成器只能遍历一次,需要重复遍历时用类实现 __iter__
  5. send() 实现双向通信,适合有状态的处理器
  6. itertools 是你的好朋友,别重复造轮子

下次写代码时,遇到 readlines() 或大列表推导式,停下来想想:这里能不能用生成器? 多数时候,答案是”能,而且应该”。

💡 记住:好的 Python 代码不是跑得最快的,而是在任何数据规模下都能稳定运行的。生成器帮你做到这一点。

文章分享

如果这篇文章对你有帮助,欢迎分享给更多人!

Python 生成器深度实战:用 yield 优雅处理百万级数据
https://boke.hackerdream.xyz/posts/python-generator-yield-deep-dive/
作者
晴天
发布于
2026-04-28
许可协议
CC BY-NC-SA 4.0
Profile Image of the Author
晴天
Hello, I'm 晴天.
公告
欢迎来到我的博客!这是一则示例公告。
音乐
封面

音乐

暂未播放

0:00 0:00
暂无歌词
分类
标签
站点统计
文章
125
分类
17
标签
287
总字数
257,955
运行时长
0
最后活动
0 天前

目录