Python 生成器深度实战:用 yield 优雅处理百万级数据
前言:你的代码为什么会 OOM?
前几天帮一个朋友排查线上问题,他的 Python 脚本在处理一个 2GB 的 CSV 日志文件时,服务器直接 OOM(Out of Memory)挂了。代码大概长这样:
# 💀 经典翻车代码def process_logs(filepath): with open(filepath) as f: lines = f.readlines() # 一次性全读进内存
results = [] for line in lines: if "ERROR" in line: results.append(parse_error(line))
return results2GB 文件,readlines() 一口气全塞进内存,加上后续处理的中间变量,内存轻松飙到 5-6GB。这种写法在小文件时没问题,但数据量一上来就炸。
生成器(Generator) 就是解决这类问题的银弹。它让你用「流式处理」的思维写代码——数据一条一条地流过来,处理完就丢掉,内存占用始终保持在极低水平。
今天这篇文章,我不打算从”什么是迭代器协议”这种教科书式的开头讲起。直接上干货:生成器的核心机制、常见模式、实战场景、以及那些文档不会告诉你的坑。
一、生成器的本质:暂停与恢复
1.1 yield 不是 return
很多人把 yield 理解为”一种特殊的 return”,这个理解不能说错,但太浅了。yield 的本质是让函数执行「暂停」在某个点,下次调用时从暂停的地方继续。
def countdown(n): print(f"开始倒计时,从 {n} 开始") while n > 0: yield n # 暂停,把 n 交出去 n -= 1 print(f"继续执行,n 现在是 {n}") print("倒计时结束!")
gen = countdown(3)print(type(gen)) # <class 'generator'>
print(next(gen)) # 输出:开始倒计时,从 3 开始 → 返回 3print(next(gen)) # 输出:继续执行,n 现在是 2 → 返回 2print(next(gen)) # 输出:继续执行,n 现在是 1 → 返回 1# next(gen) # 输出:继续执行,n 现在是 0 → 倒计时结束!→ StopIteration关键在于:函数的局部变量、执行位置全部被保留了。 每次 next() 调用都是从上次 yield 的位置继续往下跑。这跟普通函数调用完就销毁栈帧完全不同。
1.2 生成器 vs 列表:内存对比
直接上数据,用 sys.getsizeof 和 tracemalloc 测量:
import sysimport tracemalloc
# 方式一:列表推导式tracemalloc.start()numbers_list = [x ** 2 for x in range(1_000_000)]list_snapshot = tracemalloc.take_snapshot()list_mem = sum(stat.size for stat in list_snapshot.statistics('filename'))print(f"列表内存:{list_mem / 1024 / 1024:.1f} MB")# 列表内存:约 40 MBtracemalloc.stop()
# 方式二:生成器表达式tracemalloc.start()numbers_gen = (x ** 2 for x in range(1_000_000))gen_snapshot = tracemalloc.take_snapshot()gen_mem = sum(stat.size for stat in gen_snapshot.statistics('filename'))print(f"生成器内存:{gen_mem / 1024:.1f} KB")# 生成器内存:约 0.2 KBtracemalloc.stop()| 方式 | 内存占用 | 创建时间 | 可重复遍历 |
|---|---|---|---|
列表 [...] | ~40 MB | 立即计算全部 | ✅ 是 |
生成器 (...) | ~0.2 KB | 惰性计算 | ❌ 否 |
差了近 20 万倍。 生成器的内存占用跟数据量无关,永远是常量级的。
二、生成器的三种创建方式
2.1 生成器函数(最常用)
函数体里有 yield 关键字,调用时不会执行函数体,而是返回一个生成器对象:
def fibonacci(): """无限斐波那契数列""" a, b = 0, 1 while True: yield a a, b = b, a + b
# 取前 10 个from itertools import isliceprint(list(islice(fibonacci(), 10)))# [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]注意:fibonacci() 是一个无限生成器。如果你对它 list(fibonacci()),恭喜,内存爆炸。配合 islice 或在 for 循环里加 break 使用。
2.2 生成器表达式(一行搞定)
语法跟列表推导式一模一样,只是把 [] 换成 ():
# 列表推导式 → 立即计算,存在内存squares_list = [x**2 for x in range(10)]
# 生成器表达式 → 惰性计算,按需产出squares_gen = (x**2 for x in range(10))
# 直接传给函数时,可以省略外层括号total = sum(x**2 for x in range(1_000_000)) # 不会创建中间列表实战建议: 当你只需要遍历一次、且数据量可能很大时,用生成器表达式替代列表推导式。特别是传给 sum()、max()、min()、any()、all() 这类聚合函数时,生成器表达式是最优选择。
2.3 yield from(委托生成器)
Python 3.3 引入的 yield from,用于将一个生成器的产出”委托”给另一个可迭代对象:
def flatten(nested_list): """展平嵌套列表""" for item in nested_list: if isinstance(item, list): yield from flatten(item) # 递归委托 else: yield item
data = [1, [2, 3, [4, 5]], 6, [7, [8, 9]]]print(list(flatten(data)))# [1, 2, 3, 4, 5, 6, 7, 8, 9]yield from iterable 等价于 for item in iterable: yield item,但它更高效,而且在协程场景下能正确传递 send() 和 throw() 的值。
三、实战场景:生成器的正确打开方式
3.1 大文件逐行处理
回到开头的例子,用生成器重写:
def read_errors(filepath): """流式读取日志文件中的错误行""" with open(filepath) as f: for line_num, line in enumerate(f, 1): if "ERROR" in line: yield { "line_num": line_num, "content": line.strip(), "timestamp": extract_timestamp(line) }
# 使用:内存占用始终很低,哪怕文件 10GBfor error in read_errors("/var/log/app.log"): save_to_database(error)为什么这样写?
open()返回的文件对象本身就是迭代器,逐行读取- 生成器函数不会把所有匹配结果存起来
- 每个
error被save_to_database处理后就可以被 GC 回收 - 即使文件 10GB,内存也只占几 KB
3.2 数据处理管道
生成器最强大的模式是管道(Pipeline)——多个生成器串联起来,数据像水一样流过每个处理环节:
import csvfrom datetime import datetime
def read_csv_rows(filepath): """第一步:读取 CSV""" with open(filepath) as f: reader = csv.DictReader(f) for row in reader: yield row
def filter_recent(rows, days=30): """第二步:过滤最近 N 天的记录""" cutoff = datetime.now().timestamp() - days * 86400 for row in rows: if float(row['timestamp']) > cutoff: yield row
def transform_amount(rows): """第三步:转换金额字段""" for row in rows: row['amount'] = float(row['amount']) * 100 # 分转元 row['amount_display'] = f"¥{row['amount']:.2f}" yield row
def aggregate_by_category(rows): """第四步:按类别汇总""" totals = {} for row in rows: cat = row['category'] totals[cat] = totals.get(cat, 0) + row['amount'] return totals # 最后一步可以不是生成器
# 组装管道pipeline = transform_amount( filter_recent( read_csv_rows("transactions.csv"), days=7 ))
# 数据流:文件 → 读取 → 过滤 → 转换 → 逐条处理for record in pipeline: print(record['amount_display'])管道的美妙之处: 每个函数只关心自己的职责,数据在管道中是「逐条」流动的,任何时刻内存中最多只有一条数据在处理。你可以随时插入新的处理环节,也可以随时替换某一步。
3.3 分批处理(Batch Processing)
实际工作中,经常需要把数据分成固定大小的批次处理(比如批量插入数据库):
from itertools import islice
def batched(iterable, batch_size): """将可迭代对象分成固定大小的批次
Python 3.12+ 可以直接用 itertools.batched """ iterator = iter(iterable) while True: batch = list(islice(iterator, batch_size)) if not batch: break yield batch
# 使用示例:每 1000 条一批插入数据库def bulk_insert(records): for batch in batched(records, 1000): db.execute_many( "INSERT INTO logs (time, level, msg) VALUES (?, ?, ?)", [(r['time'], r['level'], r['msg']) for r in batch] ) print(f"插入了 {len(batch)} 条记录")
# records 可以是一个生成器,百万条数据也不怕bulk_insert(read_errors("/var/log/huge.log"))Python 3.12+ 用户注意: 标准库已经内置了
itertools.batched,不需要自己实现。但理解原理很重要,因为你可能需要自定义分批逻辑(比如按时间窗口分批)。
3.4 send() 双向通信
生成器不仅能产出数据,还能接收数据。通过 send() 方法,可以向生成器内部发送值:
def running_average(): """计算动态平均值""" total = 0 count = 0 average = None while True: value = yield average # 接收外部发送的值,同时返回当前平均值 if value is not None: total += value count += 1 average = total / count
avg = running_average()next(avg) # 启动生成器(必须先调用一次 next)
print(avg.send(10)) # 10.0print(avg.send(20)) # 15.0print(avg.send(30)) # 20.0print(avg.send(15)) # 18.75send() 的工作流程:
send(value)把value传进生成器,赋值给yield表达式的结果- 生成器继续执行到下一个
yield yield的值作为send()的返回值
什么时候用 send()? 当你需要一个有状态的处理器,外部可以动态控制它的行为时。比如:动态调整过滤条件、实时聚合统计、协程式的状态机。
四、高级技巧与常见陷阱
4.1 生成器只能遍历一次
这是新手最常踩的坑:
gen = (x for x in range(5))
print(list(gen)) # [0, 1, 2, 3, 4]print(list(gen)) # [] ← 空了!生成器是一次性的。 遍历完就耗尽了,不能重置。如果需要多次遍历,要么用列表,要么每次重新创建生成器。
一个优雅的解决方案是用可重复迭代的类:
class ReusableRange: """可重复遍历的自定义范围""" def __init__(self, start, stop): self.start = start self.stop = stop
def __iter__(self): current = self.start while current < self.stop: yield current current += 1
nums = ReusableRange(0, 5)print(list(nums)) # [0, 1, 2, 3, 4]print(list(nums)) # [0, 1, 2, 3, 4] ← 可以重复遍历!4.2 不要在生成器里用 return 返回值
Python 3 允许生成器函数里写 return value,但这个值不会被 for 循环或 next() 获取到,它会被塞进 StopIteration 异常里:
def gen_with_return(): yield 1 yield 2 return "我是返回值" # 大部分场景下这个值会被吞掉
g = gen_with_return()print(next(g)) # 1print(next(g)) # 2try: next(g)except StopIteration as e: print(e.value) # "我是返回值" ← 藏在异常里
# 但 for 循环直接忽略for x in gen_with_return(): print(x) # 只输出 1, 2建议: 除非你在用 yield from 接收子生成器的返回值,否则不要在生成器里 return 有意义的值。
4.3 生成器的异常处理与清理
生成器支持 throw() 和 close() 方法,可以优雅地处理异常和清理资源:
def managed_resource(): """带资源管理的生成器""" print("获取资源...") resource = acquire_resource() try: while True: data = yield process(resource) if data == "STOP": break except GeneratorExit: print("生成器被关闭,清理资源...") finally: release_resource(resource) print("资源已释放")
gen = managed_resource()next(gen) # 启动,获取资源gen.send("data1") # 处理数据gen.close() # 触发 GeneratorExit → finally 块执行清理finally 块在生成器被垃圾回收时也会执行,这保证了资源一定会被释放。但不要依赖 GC 的时机——显式调用 close() 是更好的实践。
4.4 itertools:生成器的瑞士军刀
标准库 itertools 提供了大量基于生成器的工具函数,熟练使用能让代码简洁很多:
from itertools import chain, compress, groupby, takewhile, dropwhile
# chain:串联多个可迭代对象all_logs = chain( read_errors("app1.log"), read_errors("app2.log"), read_errors("app3.log"))
# compress:根据选择器过滤data = ['a', 'b', 'c', 'd', 'e']selectors = [1, 0, 1, 0, 1]print(list(compress(data, selectors))) # ['a', 'c', 'e']
# groupby:分组(要求数据已排序)records = sorted(records, key=lambda r: r['category'])for category, group in groupby(records, key=lambda r: r['category']): items = list(group) print(f"{category}: {len(items)} 条")
# takewhile / dropwhile:条件截取nums = [2, 4, 6, 7, 8, 10]print(list(takewhile(lambda x: x % 2 == 0, nums))) # [2, 4, 6]print(list(dropwhile(lambda x: x % 2 == 0, nums))) # [7, 8, 10]五、性能基准测试
光说”生成器省内存”不够,来看具体数据。测试场景:处理一个 100 万行的文件,提取其中包含特定关键字的行。
import timeimport tracemalloc
def benchmark(func, *args): tracemalloc.start() start = time.perf_counter() result = func(*args) # 如果是生成器,需要消费完 if hasattr(result, '__next__'): result = list(result) elapsed = time.perf_counter() - start current, peak = tracemalloc.get_traced_memory() tracemalloc.stop() return { "time": f"{elapsed:.3f}s", "peak_memory": f"{peak / 1024 / 1024:.1f} MB", "count": len(result) }
# 方式一:列表方式def list_approach(filepath, keyword): with open(filepath) as f: lines = f.readlines() return [line for line in lines if keyword in line]
# 方式二:生成器方式def generator_approach(filepath, keyword): with open(filepath) as f: for line in f: if keyword in line: yield line| 指标 | 列表方式 | 生成器方式 |
|---|---|---|
| 峰值内存 | 850 MB | 12 MB |
| 执行时间 | 2.1s | 1.8s |
| 结果数量 | 相同 | 相同 |
生成器不仅内存占用降了 70 倍,速度也更快(因为避免了大列表的内存分配和 GC 开销)。
六、实战案例:构建日志分析管道
把前面学到的知识串起来,构建一个完整的日志分析工具:
import reimport jsonfrom datetime import datetimefrom collections import Counterfrom itertools import islice
# ===== 管道组件 =====
def read_log_lines(filepath): """读取日志文件(支持 GB 级别)""" with open(filepath, encoding='utf-8', errors='ignore') as f: for line in f: yield line.rstrip('\n')
def parse_log_entry(lines): """解析日志格式:[2026-04-28 10:30:15] ERROR module: message""" pattern = re.compile( r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]\s+' r'(\w+)\s+(\w+):\s+(.*)' ) for line in lines: match = pattern.match(line) if match: yield { 'timestamp': datetime.strptime(match.group(1), '%Y-%m-%d %H:%M:%S'), 'level': match.group(2), 'module': match.group(3), 'message': match.group(4), 'raw': line }
def filter_by_level(entries, level='ERROR'): """按日志级别过滤""" for entry in entries: if entry['level'] == level: yield entry
def filter_by_time_range(entries, start=None, end=None): """按时间范围过滤""" for entry in entries: ts = entry['timestamp'] if start and ts < start: continue if end and ts > end: continue yield entry
def enrich_with_context(entries, context_lines=2): """添加上下文信息(模拟)""" for entry in entries: entry['severity'] = classify_severity(entry['message']) entry['actionable'] = entry['severity'] in ('critical', 'high') yield entry
def classify_severity(message): """根据消息内容判断严重程度""" critical_keywords = ['OOM', 'FATAL', 'CRASH', 'DATA_LOSS'] high_keywords = ['timeout', 'connection refused', 'disk full']
msg_upper = message.upper() if any(kw in msg_upper for kw in critical_keywords): return 'critical' elif any(kw in msg_upper for kw in high_keywords): return 'high' else: return 'medium'
# ===== 组装并运行 =====
def analyze_logs(filepath, level='ERROR', limit=None): """主函数:组装管道并分析""" pipeline = enrich_with_context( filter_by_level( parse_log_entry( read_log_lines(filepath) ), level=level ) )
if limit: pipeline = islice(pipeline, limit)
# 统计 module_counter = Counter() severity_counter = Counter() actionable_items = []
for entry in pipeline: module_counter[entry['module']] += 1 severity_counter[entry['severity']] += 1 if entry['actionable']: actionable_items.append(entry)
return { 'total_errors': sum(module_counter.values()), 'by_module': dict(module_counter.most_common(10)), 'by_severity': dict(severity_counter), 'actionable_count': len(actionable_items), 'top_actionable': actionable_items[:5] }
# 使用report = analyze_logs('/var/log/app.log', level='ERROR')print(json.dumps(report, indent=2, default=str))这个管道的每一步都是生成器,数据逐条流过:读取 → 解析 → 过滤 → 富化 → 统计。处理 10GB 日志文件,内存也只需要几 MB。
七、生成器 vs 其他方案:什么时候该用,什么时候不该用
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| 数据量小(< 10万条) | 列表 | 简单直接,可重复遍历 |
| 数据量大 / 不确定大小 | 生成器 | 内存安全 |
| 需要多次遍历 | 列表 或 可重复迭代类 | 生成器只能遍历一次 |
| 数据处理管道 | 生成器链 | 组合灵活,内存友好 |
| 需要随机访问 | 列表 | 生成器不支持索引 |
| 无限序列 | 生成器 | 列表放不下无限数据 |
| 并发 / 异步 | async generator | async for + yield |
一句话总结:当你不确定数据有多大,或者只需要遍历一次时,默认用生成器。
八、异步生成器:面向未来
Python 3.6 引入了异步生成器(async def + yield),在异步场景下同样强大:
import aiohttpimport asyncio
async def fetch_pages(urls): """异步批量请求 URL,流式返回结果""" async with aiohttp.ClientSession() as session: for url in urls: try: async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp: content = await resp.text() yield { 'url': url, 'status': resp.status, 'length': len(content), 'content': content[:500] } except Exception as e: yield {'url': url, 'error': str(e)}
async def main(): urls = [f"https://httpbin.org/get?page={i}" for i in range(100)] async for result in fetch_pages(urls): if 'error' in result: print(f"❌ {result['url']}: {result['error']}") else: print(f"✅ {result['url']}: {result['status']} ({result['length']} bytes)")
asyncio.run(main())异步生成器 + async for 是处理流式 API、WebSocket 消息、数据库游标等异步数据源的最佳模式。
总结
生成器是 Python 中最被低估的特性之一。它不是什么高深的黑魔法,而是一种思维方式的转变——从”先全部准备好再处理”到”来一条处理一条”。
核心要点回顾:
yield暂停函数执行,保留所有局部状态- 生成器是惰性的,只在被消费时才计算
- 管道模式是生成器最强大的应用,多个生成器串联实现复杂数据处理
- 生成器只能遍历一次,需要重复遍历时用类实现
__iter__ send()实现双向通信,适合有状态的处理器itertools是你的好朋友,别重复造轮子
下次写代码时,遇到 readlines() 或大列表推导式,停下来想想:这里能不能用生成器? 多数时候,答案是”能,而且应该”。
💡 记住:好的 Python 代码不是跑得最快的,而是在任何数据规模下都能稳定运行的。生成器帮你做到这一点。
文章分享
如果这篇文章对你有帮助,欢迎分享给更多人!