Python 异步编程 asyncio:协程、事件循环与 Task

FreeGuideOnline 最新 2026-06-16

什么是异步编程

在传统的同步编程中,代码按顺序逐行执行。如果一个操作需要等待(比如网络请求或文件读取),整个程序就会“阻塞”在那里,直到该操作完成。这种模型简单易懂,但在处理大量 I/O 密集型任务时效率低下。

异步编程则不同:当程序遇到一个需要等待的操作时,它不会傻等,而是可以先切换到其他任务继续执行,等那个操作完成后再回来处理结果。这样,一个线程就能同时处理成百上千个连接或任务,极大提升了程序的并发能力和响应速度。

Python 通过 asyncio 库实现了原生的异步编程支持。它的核心概念是 协程事件循环Task。本教程将带你从零开始,掌握这三大支柱。

快速上手:你的第一个异步程序

在深入理论之前,先看一个简单的例子。我们来模拟两个任务:一个等待 2 秒,一个等待 1 秒。同步版本总耗时为 3 秒,而异步版本只需 2 秒。

import asyncio
import time

# 定义一个协程:使用 async def
async def say_after(delay, what):
    await asyncio.sleep(delay)  # 异步等待,不阻塞
    print(what)

async def main():
    print(f"开始时间: {time.strftime('%X')}")
    
    # 依次执行两个协程(这不是并发!)
    await say_after(2, '你好')
    await say_after(1, '世界')
    
    print(f"结束时间: {time.strftime('%X')}")

# 运行顶层入口点
asyncio.run(main())

输出:

开始时间: 14:23:01
你好
世界
结束时间: 14:23:04

耗时 3 秒,和同步没有区别。因为我们用 await 顺序等待每个协程,第二个任务必须等第一个完成。要真正并发,你需要 Task。稍后会讲到。

协程 (Coroutine)

协程是 asyncio 的基本单元,你可以把它理解为“可以暂停和恢复的函数”。

定义协程:async def

使用 async def 定义的函数都会返回一个 协程对象,直接调用它不会执行内部代码。

async def my_coro():
    return 42

result = my_coro()   # 返回协程对象,不是 42
print(result)        # <coroutine object my_coro at 0x7f...>

要真正运行协程,需要把它交给事件循环,或者用 asyncio.run() 这样的顶层入口。

挂起协程:await

在协程内部,使用 await 关键字可以挂起当前协程,将控制权交还给事件循环,并允许它去执行其他任务。await 后面必须跟一个可等待对象(awaitable),常见的有:

  • 另一个协程
  • asyncio.Task
  • asyncio.Future
async def fetch_data():
    print("开始抓取数据...")
    await asyncio.sleep(2)   # 模拟 I/O 等待
    print("数据抓取完成")
    return {'data': 100}

await asyncio.sleep(2) 让当前协程暂停 2 秒,在此期间事件循环可以自由调度其他协程。

注意:只能在 async def 定义的函数内部使用 await。在普通函数或全局作用域中使用会引发语法错误。

协程的几种运行方式

  1. asyncio.run():运行一个顶层协程,通常作为主入口。
  2. await:在一个协程中等待另一个协程的结果。
  3. asyncio.create_task():将协程包装成 Task,实现后台并发(下一节详解)。

事件循环 (Event Loop)

事件循环是 asyncio 的心脏,它负责调度和执行所有异步任务。你可以把它想象成一个不断运行的循环,重复执行以下步骤:

  1. 检查哪些任务处于就绪状态。
  2. 选择一个任务执行一小段代码。
  3. 当任务遇到 await 时,暂停它并记录等待的事件。
  4. 当某个事件完成(如定时器到期、网络数据到达),标记相应任务为就绪。
  5. 返回步骤 1。

在 Python 3.7+ 中,你几乎不需要直接操作事件循环对象,因为 asyncio.run() 会帮你创建、运行并清理循环。

# 典型的程序结构
async def main():
    # 这里是你的异步逻辑
    pass

if __name__ == "__main__":
    asyncio.run(main())

一些底层场景下,你可能需要获取当前运行的循环:

loop = asyncio.get_running_loop()
# 但通常不需要直接操作 loop

Task:真正的并发

现在回到并发问题。前面 await 顺序调用的例子并没有并发效果。要让多个协程同时运行,就要用到 Task

Task 是 asyncio 提供的、用于并发调度协程的高级抽象。当你用 asyncio.create_task() 将一个协程包装成 Task 时,该协程会被立刻提交到事件循环中准备运行,你可以同时保留多个 Task,事件循环会在它们之间切换。

创建 Task 并并发运行

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    # 创建 Task,它们会立即开始排程
    task1 = asyncio.create_task(say_after(2, '你好'))
    task2 = asyncio.create_task(say_after(1, '世界'))
    
    print(f"开始时间: {time.strftime('%X')}")
    
    # 等待所有 task 完成
    await task1
    await task2
    
    print(f"结束时间: {time.strftime('%X')}")

asyncio.run(main())

输出:

开始时间: 14:27:00
世界
你好
结束时间: 14:27:02

这次总耗时 2 秒(最长的那个),两个协程是并发执行的。“世界”先打印是因为它只睡了 1 秒。

使用 asyncio.gather() 收集结果

如果要运行多个 Task 并汇总结果,asyncio.gather() 是一个方便的助手。它接收多个可等待对象,并发执行它们,并返回一个包含所有结果的列表。

async def fetch_data(id, delay):
    await asyncio.sleep(delay)
    return f"数据 {id}"

async def main():
    # gather 内部会将协程自动包装成 Task 并发运行
    results = await asyncio.gather(
        fetch_data(1, 2),
        fetch_data(2, 1),
        fetch_data(3, 3)
    )
    print(results)  # ['数据 1', '数据 2', '数据 3'],按传入顺序

asyncio.run(main())

Task 的常用操作

  • 取消任务task.cancel() 请求取消,协程内会抛出 CancelledError,你需要处理以保证资源正确释放。
  • 设置超时asyncio.wait_for(task, timeout=5) 在指定秒数内等待任务完成,否则引发 TimeoutError
  • 等待多个任务asyncio.wait(tasks, return_when=...) 提供了更细粒度的控制,例如等待第一个完成(FIRST_COMPLETED)或全部完成(ALL_COMPLETED)。

错误处理与 Task

如果某个 Task 中发生了异常,该异常会在你 await 该 Task 时被抛出。使用 gather() 时,如果某个任务抛出异常,默认会立即传播该异常并等待其他任务(但可以通过 return_exceptions=True 参数让异常作为结果返回)。

async def buggy():
    await asyncio.sleep(1)
    raise ValueError("出错啦")

async def main():
    task = asyncio.create_task(buggy())
    try:
        await task
    except ValueError as e:
        print(f"捕获到异常: {e}")

实战示例:异步 HTTP 请求

让我们用一个更贴近实际的例子来巩固:模拟并发下载多个网页。我们使用 aiohttp(需要 pip install aiohttp)作为异步 HTTP 客户端。

import asyncio
import aiohttp
import time

async def download_site(url, session):
    async with session.get(url) as response:
        # 读取响应内容但不处理(仅演示)
        await response.text()
        return f"{url}: 状态 {response.status}"

async def main():
    urls = [
        "https://example.com",
        "https://python.org",
        "https://google.com",
        "https://github.com",
        "https://stackoverflow.com"
    ] * 2  # 10 个请求

    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [download_site(url, session) for url in urls]
        results = await asyncio.gather(*tasks)
    print(f"完成 {len(urls)} 个请求,耗时: {time.time() - start:.2f} 秒")
    # 你可以进一步处理 results

if __name__ == "__main__":
    asyncio.run(main())

这个版本利用了 aiohttp 的异步特性,10 个请求并发执行,总耗时只取决于最慢的那个请求,而非所有请求的时间之和。

重要注意事项与避坑指南

  1. 不要在协程中使用阻塞调用
    如果你在协程内调用 time.sleep()requests.get() 等同步阻塞库,整个事件循环都会被卡住,失去并发优势。务必使用 asyncio.sleep()aiohttp 等异步替代品。

  2. await 的顺序 ≠ 并发
    一个常见的误区是以为只要把协程都 await 一遍就能并发。记住:await 是顺序等待;create_task + awaitgather 才能并行。

  3. 异常与取消处理
    编写长期运行的协程时,注意捕获 asyncio.CancelledError 来执行清理逻辑(例如关闭数据库连接)。

  4. 仅 I/O 密集型受益
    异步编程适合 I/O 密集型任务(网络、文件读写等)。如果是 CPU 密集型任务(如图像处理),应该使用多进程或线程池,并在 asyncio 中使用 loop.run_in_executor()

总结

你已经掌握了 Python 异步编程的核心:

  • 协程async def 定义,await 挂起。
  • 事件循环:调度器,由 asyncio.run() 自动管理。
  • Task:并发的最小单元,通过 create_task 创建,配合 gather 等收集结果。

asyncio 生态丰富,许多现代 Python 库都提供了异步接口(例如 aiohttpasyncpghttpx 等)。理解协程、事件循环和 Task 这三者后,你就能充分利用异步 I/O 的能力,编写高效、可扩展的 Python 程序。

下一步建议:

  • 阅读官方文档中关于 asyncio.Queue 的内容,实现生产者‑消费者模式。
  • 学习 asyncio.Semaphore 来限制并发数。
  • 探索第三方库 trioanyio,它们提供了更结构化的并发模型。