Python 异步编程 asyncio:协程、事件循环与 Task
什么是异步编程
在传统的同步编程中,代码按顺序逐行执行。如果一个操作需要等待(比如网络请求或文件读取),整个程序就会“阻塞”在那里,直到该操作完成。这种模型简单易懂,但在处理大量 I/O 密集型任务时效率低下。
异步编程则不同:当程序遇到一个需要等待的操作时,它不会傻等,而是可以先切换到其他任务继续执行,等那个操作完成后再回来处理结果。这样,一个线程就能同时处理成百上千个连接或任务,极大提升了程序的并发能力和响应速度。
Python 通过 asyncio 库实现了原生的异步编程支持。它的核心概念是 协程、事件循环 和 Task。本教程将带你从零开始,掌握这三大支柱。
快速上手:你的第一个异步程序
在深入理论之前,先看一个简单的例子。我们来模拟两个任务:一个等待 2 秒,一个等待 1 秒。同步版本总耗时为 3 秒,而异步版本只需 2 秒。
import asyncio
import time
# 定义一个协程:使用 async def
async def say_after(delay, what):
await asyncio.sleep(delay) # 异步等待,不阻塞
print(what)
async def main():
print(f"开始时间: {time.strftime('%X')}")
# 依次执行两个协程(这不是并发!)
await say_after(2, '你好')
await say_after(1, '世界')
print(f"结束时间: {time.strftime('%X')}")
# 运行顶层入口点
asyncio.run(main())
输出:
开始时间: 14:23:01
你好
世界
结束时间: 14:23:04
耗时 3 秒,和同步没有区别。因为我们用 await 顺序等待每个协程,第二个任务必须等第一个完成。要真正并发,你需要 Task。稍后会讲到。
协程 (Coroutine)
协程是 asyncio 的基本单元,你可以把它理解为“可以暂停和恢复的函数”。
定义协程:async def
使用 async def 定义的函数都会返回一个 协程对象,直接调用它不会执行内部代码。
async def my_coro():
return 42
result = my_coro() # 返回协程对象,不是 42
print(result) # <coroutine object my_coro at 0x7f...>
要真正运行协程,需要把它交给事件循环,或者用 asyncio.run() 这样的顶层入口。
挂起协程:await
在协程内部,使用 await 关键字可以挂起当前协程,将控制权交还给事件循环,并允许它去执行其他任务。await 后面必须跟一个可等待对象(awaitable),常见的有:
- 另一个协程
asyncio.Taskasyncio.Future
async def fetch_data():
print("开始抓取数据...")
await asyncio.sleep(2) # 模拟 I/O 等待
print("数据抓取完成")
return {'data': 100}
await asyncio.sleep(2) 让当前协程暂停 2 秒,在此期间事件循环可以自由调度其他协程。
注意:只能在
async def定义的函数内部使用await。在普通函数或全局作用域中使用会引发语法错误。
协程的几种运行方式
asyncio.run():运行一个顶层协程,通常作为主入口。await:在一个协程中等待另一个协程的结果。asyncio.create_task():将协程包装成 Task,实现后台并发(下一节详解)。
事件循环 (Event Loop)
事件循环是 asyncio 的心脏,它负责调度和执行所有异步任务。你可以把它想象成一个不断运行的循环,重复执行以下步骤:
- 检查哪些任务处于就绪状态。
- 选择一个任务执行一小段代码。
- 当任务遇到
await时,暂停它并记录等待的事件。 - 当某个事件完成(如定时器到期、网络数据到达),标记相应任务为就绪。
- 返回步骤 1。
在 Python 3.7+ 中,你几乎不需要直接操作事件循环对象,因为 asyncio.run() 会帮你创建、运行并清理循环。
# 典型的程序结构
async def main():
# 这里是你的异步逻辑
pass
if __name__ == "__main__":
asyncio.run(main())
一些底层场景下,你可能需要获取当前运行的循环:
loop = asyncio.get_running_loop()
# 但通常不需要直接操作 loop
Task:真正的并发
现在回到并发问题。前面 await 顺序调用的例子并没有并发效果。要让多个协程同时运行,就要用到 Task。
Task 是 asyncio 提供的、用于并发调度协程的高级抽象。当你用 asyncio.create_task() 将一个协程包装成 Task 时,该协程会被立刻提交到事件循环中准备运行,你可以同时保留多个 Task,事件循环会在它们之间切换。
创建 Task 并并发运行
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
# 创建 Task,它们会立即开始排程
task1 = asyncio.create_task(say_after(2, '你好'))
task2 = asyncio.create_task(say_after(1, '世界'))
print(f"开始时间: {time.strftime('%X')}")
# 等待所有 task 完成
await task1
await task2
print(f"结束时间: {time.strftime('%X')}")
asyncio.run(main())
输出:
开始时间: 14:27:00
世界
你好
结束时间: 14:27:02
这次总耗时 2 秒(最长的那个),两个协程是并发执行的。“世界”先打印是因为它只睡了 1 秒。
使用 asyncio.gather() 收集结果
如果要运行多个 Task 并汇总结果,asyncio.gather() 是一个方便的助手。它接收多个可等待对象,并发执行它们,并返回一个包含所有结果的列表。
async def fetch_data(id, delay):
await asyncio.sleep(delay)
return f"数据 {id}"
async def main():
# gather 内部会将协程自动包装成 Task 并发运行
results = await asyncio.gather(
fetch_data(1, 2),
fetch_data(2, 1),
fetch_data(3, 3)
)
print(results) # ['数据 1', '数据 2', '数据 3'],按传入顺序
asyncio.run(main())
Task 的常用操作
- 取消任务:
task.cancel()请求取消,协程内会抛出CancelledError,你需要处理以保证资源正确释放。 - 设置超时:
asyncio.wait_for(task, timeout=5)在指定秒数内等待任务完成,否则引发TimeoutError。 - 等待多个任务:
asyncio.wait(tasks, return_when=...)提供了更细粒度的控制,例如等待第一个完成(FIRST_COMPLETED)或全部完成(ALL_COMPLETED)。
错误处理与 Task
如果某个 Task 中发生了异常,该异常会在你 await 该 Task 时被抛出。使用 gather() 时,如果某个任务抛出异常,默认会立即传播该异常并等待其他任务(但可以通过 return_exceptions=True 参数让异常作为结果返回)。
async def buggy():
await asyncio.sleep(1)
raise ValueError("出错啦")
async def main():
task = asyncio.create_task(buggy())
try:
await task
except ValueError as e:
print(f"捕获到异常: {e}")
实战示例:异步 HTTP 请求
让我们用一个更贴近实际的例子来巩固:模拟并发下载多个网页。我们使用 aiohttp(需要 pip install aiohttp)作为异步 HTTP 客户端。
import asyncio
import aiohttp
import time
async def download_site(url, session):
async with session.get(url) as response:
# 读取响应内容但不处理(仅演示)
await response.text()
return f"{url}: 状态 {response.status}"
async def main():
urls = [
"https://example.com",
"https://python.org",
"https://google.com",
"https://github.com",
"https://stackoverflow.com"
] * 2 # 10 个请求
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [download_site(url, session) for url in urls]
results = await asyncio.gather(*tasks)
print(f"完成 {len(urls)} 个请求,耗时: {time.time() - start:.2f} 秒")
# 你可以进一步处理 results
if __name__ == "__main__":
asyncio.run(main())
这个版本利用了 aiohttp 的异步特性,10 个请求并发执行,总耗时只取决于最慢的那个请求,而非所有请求的时间之和。
重要注意事项与避坑指南
-
不要在协程中使用阻塞调用
如果你在协程内调用time.sleep()、requests.get()等同步阻塞库,整个事件循环都会被卡住,失去并发优势。务必使用asyncio.sleep()、aiohttp等异步替代品。 -
await的顺序 ≠ 并发
一个常见的误区是以为只要把协程都await一遍就能并发。记住:await是顺序等待;create_task+await或gather才能并行。 -
异常与取消处理
编写长期运行的协程时,注意捕获asyncio.CancelledError来执行清理逻辑(例如关闭数据库连接)。 -
仅 I/O 密集型受益
异步编程适合 I/O 密集型任务(网络、文件读写等)。如果是 CPU 密集型任务(如图像处理),应该使用多进程或线程池,并在 asyncio 中使用loop.run_in_executor()。
总结
你已经掌握了 Python 异步编程的核心:
- 协程:
async def定义,await挂起。 - 事件循环:调度器,由
asyncio.run()自动管理。 - Task:并发的最小单元,通过
create_task创建,配合gather等收集结果。
asyncio 生态丰富,许多现代 Python 库都提供了异步接口(例如 aiohttp、asyncpg、httpx 等)。理解协程、事件循环和 Task 这三者后,你就能充分利用异步 I/O 的能力,编写高效、可扩展的 Python 程序。
下一步建议:
- 阅读官方文档中关于
asyncio.Queue的内容,实现生产者‑消费者模式。 - 学习
asyncio.Semaphore来限制并发数。 - 探索第三方库
trio或anyio,它们提供了更结构化的并发模型。