Python asyncio 异步编程实战:从回调地狱到优雅并发
你可能听过「异步编程性能好」这句话无数次了。但当你真正打开 Python 的 asyncio 文档,看到 event loop、coroutine、future、task 这些概念时,大概率会产生一个想法:「这玩意儿到底在说什么?」
别慌。这篇文章不会从学术角度给你讲事件循环的底层实现,而是从一个实际问题出发:当你需要同时发 100 个 HTTP 请求时,该怎么写?
为什么需要异步?先看一个痛点
假设你需要批量检查 100 个 URL 的可用性。同步写法很直觉:
import requestsimport time
urls = [f"https://httpbin.org/delay/1" for _ in range(10)]
start = time.time()results = []for url in urls: resp = requests.get(url, timeout=5) results.append(resp.status_code)
print(f"同步耗时: {time.time() - start:.2f}s")# 输出: 同步耗时: 10.24s10 个请求,每个等 1 秒,总共 10 秒。这还只是 10 个,如果是 1000 个呢?你的脚本会跑将近 17 分钟。
问题的本质在于:CPU 在等待网络响应时完全空闲。每次 requests.get() 发出请求后,你的程序就在那里干等——不是在计算,不是在处理数据,就是纯粹地等。
这就是异步编程要解决的问题:让等待变得有意义。
异步的核心思想:一个人同时煮三碗面
用一个生活比喻来理解异步:
同步 = 你在厨房煮面,水烧开之前你站在灶台前发呆,水开了下面,面煮好之前继续发呆。三碗面要煮三次。
异步 = 你同时烧三锅水,哪锅水先开就先下面,等面煮的时候去处理另一锅。三碗面几乎同时完成。
关键洞察:你还是只有一双手(单线程),但你不再傻等了。
这就是 asyncio 的核心——协作式多任务。不是多线程,不是多进程,是一个线程内通过「主动让出控制权」来实现并发。
async/await:语法只有两个关键字
Python 3.5 引入的 async/await 语法极其简洁:
import asyncio
async def say_hello(name: str, delay: int): """一个协程函数""" print(f"开始处理 {name}") await asyncio.sleep(delay) # 模拟 I/O 等待,主动让出控制权 print(f"{name} 处理完成") return f"Hello, {name}!"
async def main(): # 并发运行三个协程 results = await asyncio.gather( say_hello("Alice", 2), say_hello("Bob", 1), say_hello("Charlie", 3), ) print(results)
asyncio.run(main())输出:
开始处理 Alice开始处理 Bob开始处理 CharlieBob 处理完成Alice 处理完成Charlie 处理完成['Hello, Alice!', 'Hello, Bob!', 'Hello, Charlie!']三个任务总延迟分别是 2、1、3 秒,但总耗时只有约 3 秒(取最长的那个),而不是 2+1+3=6 秒。
两个关键字的含义
| 关键字 | 作用 | 类比 |
|---|---|---|
async def | 声明一个协程函数 | 告诉 Python「这个函数里有等待操作」 |
await | 等待一个异步操作完成,同时让出控制权 | 「我要等这锅水开,你先去看看别的锅」 |
常见误区:async def 不会让函数自动并发执行! 调用 say_hello("Alice", 2) 只是创建了一个协程对象,你需要用 await、asyncio.gather() 或 asyncio.create_task() 来实际运行它。
真正的并发:gather vs create_task
asyncio 提供了两种主要的并发方式,适用场景不同:
asyncio.gather():一把梭,等所有任务完成
import asyncioimport time
async def fetch_data(id: int) -> dict: """模拟从不同数据源获取数据""" await asyncio.sleep(1) # 模拟网络延迟 return {"id": id, "data": f"result_{id}"}
async def main(): start = time.time()
# 同时启动 5 个请求 results = await asyncio.gather( fetch_data(1), fetch_data(2), fetch_data(3), fetch_data(4), fetch_data(5), )
print(f"耗时: {time.time() - start:.2f}s") print(f"结果: {results}")
asyncio.run(main())# 耗时: 1.00s(而不是 5.00s)gather 的优点是简单直接,适合「一批任务全部完成后再继续」的场景。
asyncio.create_task():更灵活的任务管理
import asyncio
async def download_file(filename: str) -> str: print(f"开始下载 {filename}") await asyncio.sleep(2) print(f"下载完成 {filename}") return f"{filename} 的内容"
async def process_file(content: str) -> str: print(f"处理: {content}") await asyncio.sleep(1) return f"已处理: {content}"
async def main(): # 创建下载任务(立即开始执行) task1 = asyncio.create_task(download_file("data.csv")) task2 = asyncio.create_task(download_file("config.json"))
# 两个下载并行进行,这里可以做其他事情 print("下载已启动,可以做其他事...")
# 等第一个完成,立即处理 content1 = await task1 result1 = await process_file(content1)
content2 = await task2 result2 = await process_file(content2)
print(f"最终结果: {result1}, {result2}")
asyncio.run(main())create_task 和 gather 的关键区别:
| 特性 | gather | create_task |
|---|---|---|
| 返回值 | 所有结果的列表 | 单个 Task 对象 |
| 错误处理 | 一个失败可以选择是否取消其他 | 每个 task 独立处理 |
| 灵活性 | 适合批量等待 | 适合需要中间结果的流程 |
| 典型场景 | 批量 API 调用 | 生产者-消费者模式 |
实战:用 aiohttp 批量请求
requests 库是同步的,在 async 世界里你需要 aiohttp:
import asyncioimport aiohttpimport time
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict: """异步请求单个 URL""" try: async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp: return { "url": url, "status": resp.status, "size": len(await resp.text()), } except Exception as e: return {"url": url, "error": str(e)}
async def batch_fetch(urls: list[str], concurrency: int = 20) -> list[dict]: """带并发控制的批量请求""" semaphore = asyncio.Semaphore(concurrency)
async def limited_fetch(session, url): async with semaphore: # 限制同时并发数 return await fetch_url(session, url)
async with aiohttp.ClientSession() as session: tasks = [limited_fetch(session, url) for url in urls] return await asyncio.gather(*tasks)
async def main(): urls = [ "https://httpbin.org/get", "https://httpbin.org/ip", "https://httpbin.org/headers", "https://httpbin.org/user-agent", ] * 25 # 100 个请求
start = time.time() results = await batch_fetch(urls, concurrency=20) elapsed = time.time() - start
success = sum(1 for r in results if "status" in r) failed = sum(1 for r in results if "error" in r)
print(f"总请求: {len(results)}") print(f"成功: {success}, 失败: {failed}") print(f"总耗时: {elapsed:.2f}s") print(f"平均每个请求: {elapsed/len(results)*1000:.0f}ms")
asyncio.run(main())这段代码的核心亮点是 asyncio.Semaphore。没有它,100 个请求会同时发出,可能触发目标服务器的限流。信号量让你控制并发上限——好比高速公路的收费站,最多同时放 20 辆车通过。
性能对比:同步 vs 异步
我用一个真实测试场景对比了两种方式(请求 100 个不同的公开 API 端点):
| 方式 | 100 请求耗时 | CPU 利用率 | 内存占用 |
|---|---|---|---|
requests 同步循环 | ~95s | <5% | ~30MB |
aiohttp 异步 (并发20) | ~6s | ~15% | ~45MB |
aiohttp 异步 (并发50) | ~3s | ~20% | ~55MB |
16 倍到 32 倍的提升,而且代码复杂度增加很少。这就是异步的威力。
异步上下文管理器和迭代器
Python 的 async 支持远不止 await 一个操作。你可以写异步版本的 with 和 for:
async with:异步资源管理
import asyncioimport aiofiles
async def write_log(filename: str, messages: list[str]): """异步写文件""" async with aiofiles.open(filename, mode='w') as f: for msg in messages: await f.write(f"{msg}\n") print(f"日志已写入 {filename}")
async def main(): await write_log("app.log", ["启动服务", "连接数据库", "开始处理请求"])
asyncio.run(main())async for:异步迭代
import asyncio
async def async_range(start: int, stop: int, delay: float = 0.5): """异步生成器:模拟从数据库分页读取""" for i in range(start, stop): await asyncio.sleep(delay) # 模拟查询延迟 yield {"page": i, "items": [f"item_{i}_{j}" for j in range(10)]}
async def main(): all_items = [] async for page_data in async_range(1, 6): print(f"获取第 {page_data['page']} 页,{len(page_data['items'])} 条数据") all_items.extend(page_data["items"])
print(f"总共获取 {len(all_items)} 条数据")
asyncio.run(main())异步生成器在处理流式数据(分页 API、数据库游标、WebSocket 消息流)时特别有用——你不需要一次性把所有数据加载到内存。
错误处理:异步世界的坑
异步代码的错误处理比同步代码更容易翻车。以下是我踩过的真实坑:
坑 1:gather 的默认行为会吞异常
import asyncio
async def risky_task(id: int): if id == 3: raise ValueError(f"任务 {id} 出错了!") await asyncio.sleep(1) return f"任务 {id} 完成"
async def main(): # ❌ 错误:return_exceptions=False(默认),一个失败全部中断 try: results = await asyncio.gather( risky_task(1), risky_task(2), risky_task(3), risky_task(4), ) except ValueError as e: print(f"捕获到异常: {e}") # 但其他已完成的任务结果丢失了!
# ✅ 正确:return_exceptions=True,异常作为结果返回 results = await asyncio.gather( risky_task(1), risky_task(2), risky_task(3), risky_task(4), return_exceptions=True, ) for i, result in enumerate(results): if isinstance(result, Exception): print(f"任务 {i+1} 失败: {result}") else: print(f"任务 {i+1} 成功: {result}")
asyncio.run(main())经验法则:如果你不希望一个失败导致全部中断,永远传 return_exceptions=True。
坑 2:在协程里调用同步阻塞函数
import asyncioimport time
async def bad_example(): """❌ 这会阻塞整个事件循环!""" time.sleep(3) # 同步阻塞,所有协程都卡住 return "done"
async def good_example(): """✅ 用 run_in_executor 包装同步阻塞操作""" loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, time.sleep, 3) return "done"
# 更优雅的写法(Python 3.9+)async def better_example(): """✅ 使用 asyncio.to_thread""" await asyncio.to_thread(time.sleep, 3) return "done"这是最常见的 asyncio 错误。你以为写了 async def 就异步了?不,函数内部的每一个阻塞调用都会卡死整个事件循环。time.sleep()、requests.get()、CPU 密集型计算——这些全是雷区。
解决方案:
- I/O 阻塞 → 用
asyncio.to_thread()或对应的异步库(aiohttp、aiofiles) - CPU 密集型 → 用
ProcessPoolExecutor或干脆上多进程
坑 3:忘记 await
async def fetch_data(): return {"key": "value"}
async def main(): # ❌ 忘记 await,data 是一个 coroutine 对象,不是字典! data = fetch_data() print(type(data)) # <class 'coroutine'>
# ✅ 正确 data = await fetch_data() print(type(data)) # <class 'dict'>Python 不会在你忘记 await 时报错(只会在程序结束时给个 RuntimeWarning),这导致 bug 非常隐蔽。开启 asyncio 的 debug 模式可以帮你抓住这类问题:
asyncio.run(main(), debug=True)实战模式:生产者-消费者队列
在真实项目中,最常用的异步模式是生产者-消费者:
import asyncioimport random
async def producer(queue: asyncio.Queue, name: str, count: int): """生产者:生成任务""" for i in range(count): item = f"{name}_item_{i}" await queue.put(item) print(f"[生产者 {name}] 放入: {item}") await asyncio.sleep(random.uniform(0.1, 0.5))
print(f"[生产者 {name}] 完成")
async def consumer(queue: asyncio.Queue, name: str): """消费者:处理任务""" while True: item = await queue.get() print(f"[消费者 {name}] 处理: {item}") await asyncio.sleep(random.uniform(0.5, 1.5)) # 模拟处理耗时 queue.task_done() print(f"[消费者 {name}] 完成: {item}")
async def main(): queue = asyncio.Queue(maxsize=10) # 有界队列,防止内存爆炸
# 2 个生产者,3 个消费者 producers = [ asyncio.create_task(producer(queue, "P1", 5)), asyncio.create_task(producer(queue, "P2", 5)), ] consumers = [ asyncio.create_task(consumer(queue, f"C{i}")) for i in range(3) ]
# 等待所有生产者完成 await asyncio.gather(*producers)
# 等待队列清空 await queue.join()
# 取消消费者(它们是无限循环) for c in consumers: c.cancel()
print("所有任务处理完成!")
asyncio.run(main())这个模式在以下场景非常实用:
- 爬虫:生产者生成 URL,消费者抓取页面
- 数据管道:生产者读取数据,消费者清洗入库
- 消息处理:接收 WebSocket 消息,多个 worker 并行处理
什么时候不该用 asyncio?
异步不是银弹。以下场景,同步代码或多进程更合适:
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| CPU 密集型计算(图像处理、数值运算) | multiprocessing | asyncio 是单线程,无法利用多核 |
| 简单的脚本(<10 个 I/O 操作) | 同步 requests | 引入 asyncio 是过度工程 |
| 已有大量同步代码的项目 | concurrent.futures.ThreadPoolExecutor | 改造成本太高 |
| 需要严格顺序执行的流程 | 同步代码 | 异步增加复杂度但没有收益 |
我的判断标准:如果你的程序 80% 的时间在等 I/O(网络请求、数据库查询、文件读写),asyncio 几乎一定是正确选择。如果 80% 的时间在算数,用多进程。
与 FastAPI 集成:async 的最佳搭档
如果你用过 FastAPI(参考本博客之前的 FastAPI 文章),你会发现 asyncio 和 Web 框架是天然搭配:
from fastapi import FastAPIimport aiohttpimport asyncio
app = FastAPI()
async def fetch_weather(city: str) -> dict: """异步获取天气数据""" async with aiohttp.ClientSession() as session: url = f"https://wttr.in/{city}?format=j1" async with session.get(url) as resp: return await resp.json()
@app.get("/compare-weather")async def compare_weather(cities: str = "Beijing,Shanghai,Guangzhou"): """并发获取多个城市天气""" city_list = cities.split(",")
# 并发请求所有城市的天气 tasks = [fetch_weather(city.strip()) for city in city_list] results = await asyncio.gather(*tasks, return_exceptions=True)
return { city: data if not isinstance(data, Exception) else {"error": str(data)} for city, data in zip(city_list, results) }一个请求进来,同时查 3 个城市的天气,响应时间只取决于最慢的那个 API,而不是三者之和。
调试技巧
1. 开启 debug 模式
import asyncioimport logging
logging.basicConfig(level=logging.DEBUG)asyncio.run(main(), debug=True)debug 模式会:
- 检测未被
await的协程 - 警告执行时间过长的回调(>100ms)
- 记录异步任务的创建和销毁
2. 用 asyncio.TaskGroup(Python 3.11+)
import asyncio
async def main(): async with asyncio.TaskGroup() as tg: task1 = tg.create_task(some_coro()) task2 = tg.create_task(another_coro()) # 退出 with 块时,所有 task 都已完成 # 如果任何 task 抛出异常,其他 task 会被自动取消TaskGroup 是 Python 3.11 引入的结构化并发原语,比 gather 更安全——它保证异常不会被静默吞掉,而且资源清理更可靠。
总结:异步编程的心智模型
学 asyncio 最大的障碍不是语法,是思维方式的转变:
- 同步思维:代码一行一行执行,上一行完成才到下一行
- 异步思维:遇到等待就让出控制权,等结果回来再继续
一旦建立了这个心智模型,你会发现 asyncio 的 API 设计非常直觉:
await= 我要等这个结果,但等的时候别闲着gather= 一批任务一起等create_task= 让这个任务在后台跑Semaphore= 控制同时干活的人数Queue= 任务排队处理
从今天开始,下次写爬虫或批量 API 调用时,试试 asyncio。 第一次可能会有点别扭,但当你看到 100 个请求在 3 秒内完成时,你会明白为什么异步编程值得学。
代码已在 Python 3.11+ 环境测试通过。建议安装:
pip install aiohttp aiofiles
文章分享
如果这篇文章对你有帮助,欢迎分享给更多人!