Python 异步编程 允许程序在等待 I/O (输入/输出) 操作完成时执行其他任务,从而提高程序的并发性和响应能力。它通过在单线程中切换任务,避免了传统同步阻塞 I/O 模型中因等待外部操作(如网络请求、文件读写、数据库查询)而导致的性能瓶颈。Python 3.5 引入的 asyncawait 关键字为异步编程提供了原生的语言支持,并通过标准库 asyncio 提供了事件循环、协程、任务和传输等核心组件,极大地推动了 Python 在构建高性能网络服务和并发应用方面的发展。

核心思想:

  • 单线程并发:通过任务切换实现并发,而非真正并行。
  • 非阻塞 I/O:在等待 I/O 完成时,CPU 不空闲,转而执行其他准备就绪的任务。
  • async/await:定义协程和暂停/恢复执行的语法糖。
  • asyncio 事件循环:调度和执行协程的核心组件。
  • 协程 (Coroutine):异步函数 (async def),是实现并发的基本单元。
  • 任务 (Task):对协程的封装,由事件循环调度执行。
  • 适用于 I/O 密集型任务:如 Web 服务、数据库访问、网络爬虫等。

一、为什么需要异步编程?理解传统并发模型的局限性

在深入异步编程之前,首先需要理解它解决了什么问题以及它与传统并发模型的区别。

1.1 传统同步编程的局限性 (阻塞 I/O)

传统的同步编程模型中,当程序执行一个 I/O 操作(例如,向一个远程服务器发送 HTTP 请求,或者读取一个大文件)时,当前线程会暂停执行,等待 I/O 操作完成并返回结果。在此期间,CPU 资源会因为等待而处于空闲状态,无法执行其他任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
import requests

def fetch_sync(url):
print(f"开始同步抓取: {url}")
response = requests.get(url) # 阻塞调用
time.sleep(1) # 模拟处理时间
print(f"完成同步抓取: {url}, 状态码: {response.status_code}")
return response.status_code

def main_sync():
urls = ["http://example.com/1", "http://example.com/2", "http://example.com/3"]
start_time = time.time()
for url in urls:
fetch_sync(url)
end_time = time.time()
print(f"同步执行总耗时: {end_time - start_time:.2f} 秒")

# 假设 example.com 响应很快,但由于请求是串行执行,
# 总耗时约为 3 * (请求时间 + 1秒模拟处理) ≈ 3 * (0.1 + 1) = 3.3 秒
# 实际上,fetch_sync 中的 requests.get 是网络请求,会阻塞当前线程,等待响应。

上述代码中,每次 requests.get() 调用都会阻塞,直到服务器响应。如果有 N 个请求,总时间大致是 N 个请求耗时之和。当 N 很大时,效率非常低下。

1.2 Python 中的并发与并行

在 Python 中,由于全局解释器锁 (Global Interpreter Lock, GIL) 的存在,单个进程的 CPython 解释器在任何时刻只能执行一条字节码指令。这意味着即使在多线程环境下,Python 代码也无法实现真正的 CPU 并行性。

  • 进程 (Multiprocessing):通过创建多个进程来绕过 GIL,实现真正的并行计算。适用于 CPU 密集型任务。
  • 线程 (Multithreading):在同一个进程中创建多个线程。由于 GIL,无法并行执行 Python 字节码。但对于 I/O 密集型任务,当一个线程因 I/O 操作而阻塞时(例如等待网络数据),GIL 会被释放,允许其他线程运行。因此,多线程在 I/O 密集型任务中仍能提升并发度。
  • 异步编程 (Asynchronous Programming):在单个线程中,通过协程 (async/await) 和事件循环来管理和调度多个 I/O 任务。当一个 I/O 操作启动后,协程会主动交出控制权给事件循环,事件循环则去执行其他已准备好的协程。当 I/O 完成后,事件循环再将控制权交还给原来的协程。这是一种协作式多任务不涉及线程切换的开销,非常适合 I/O 密集型任务。

异步编程是 Python 处理 I/O 密集型任务的理想选择,因为它避免了 GIL 的限制,并且上下文切换开销远小于线程。

二、Python 异步编程的核心概念

Python 异步编程主要围绕以下几个核心概念展开:

2.1 协程 (Coroutine)

协程是异步编程的基本单元。在 Python 中,一个用 async def 定义的函数就是一个协程。协程在被调用时不会立即执行,而是返回一个“协程对象”(coroutine object)。这个对象代表了一个在未来某个时间点可能完成的操作。

1
2
3
4
async def my_coroutine():
print("开始协程")
await asyncio.sleep(1) # 模拟异步操作
print("协程结束")

2.2 asyncawait 关键字

  • async:

    • async def:用于定义一个协程函数。
    • async for:异步迭代。
    • async with:异步上下文管理器。
  • await:

    • await 关键字只能在 async def 函数内部使用。
    • await 一个表达式时(例如 await some_other_coroutine()),它会暂停当前协程的执行,将控制权交还给事件循环。
    • 事件循环会去运行其他就绪的协程。
    • await 的操作完成后,事件循环会恢复之前暂停的协程,并从 await 点继续执行。

2.3 事件循环 (Event Loop)

事件循环是 $asyncio$ 的核心。它是一个永不停止的循环,负责:

  1. 调度:决定哪个协程在何时运行。
  2. 管理:维护所有激活的协程和任务的状态。
  3. 处理 I/O 事件:监测(通过底层机制如 epollkqueue 等)哪些 I/O 操作已完成,然后“唤醒”相应的协程。

你可以将事件循环想象成一个勤劳的指挥家,在多个等待 I/O 的乐手(协程)之间切换,确保在空闲时段总有其他乐手在演奏。

图:asyncio 事件循环工作原理简化图

2.4 Future 和 Task

  • Future (未来对象):表示一个异步操作的最终结果。它可能尚未完成,但会在未来的某个时间点完成。当它完成时,可以查询其结果或异常。协程内部的 await 表达式,期待的就是一个 Future 对象。
  • Task (任务对象):是 asyncio.Future 的一个子类。它将协程封装起来,并代表着协程在事件循环中的执行。当我们使用 asyncio.create_task() 调度一个协程时,它会返回一个 Task 对象。事件循环实际运行的是这些 Task

三、asyncio 模块的使用

Python 的标准库 asyncio 提供了构建异步应用程序的框架。

3.1 asyncio.run():异步程序的入口点

asyncio.run(coroutine) 用于运行最顶层的协程 (main entry point)。它负责启动事件循环,运行协程直到完成,然后关闭事件循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
import time

async def say_hello(name, delay):
print(f"{time.strftime('%H:%M:%S')} - {name}: 准备说你好...")
await asyncio.sleep(delay) # 模拟异步 I/O 操作
print(f"{time.strftime('%H:%M:%S')} - {name}: 你好!")

async def main():
start_time = time.time()

# 同时运行两个协程。由于 asyncio.sleep(delay) 会交出控制权,
# 两个协程会在一个事件循环中交替执行,而不是串行等待。
await say_hello("Alice", 2)
await say_hello("Bob", 1) # Bob 会在 Alice 'say_hello' (sleep) 期间先行执行

end_time = time.time()
print(f"总耗时: {end_time - start_time:.2f} 秒")

if __name__ == "__main__":
asyncio.run(main())

输出分析

1
2
3
4
5
14:30:00 - Alice: 准备说你好...
14:30:00 - Bob: 准备说你好...
14:30:01 - Bob: 你好!
14:30:02 - Alice: 你好!
总耗时: 2.00 秒

通过 awaitAlice 在等待的 2 秒期间,Bob 被事件循环调度并执行完毕。整个 main 函数的执行时间取决于最长的异步操作,而不是所有操作的总和。

3.2 asyncio.create_task()asyncio.gather():并发执行多个任务

要真正实现并发,我们需要让事件循环同时调度多个协程。

  • asyncio.create_task(coroutine): 将一个协程包装成一个 Task 并将其安排到事件循环中运行。它会立即返回 Task 对象,协程的执行将在事件循环的下一个可用周期开始。
  • asyncio.gather(*coros_or_futures, return_exceptions=False): 最常用的并发工具之一。它接受多个协程或 Future 对象,并以并发的方式运行它们。它会等待所有输入都完成,然后以列表的形式返回它们的结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
import time
import aiohttp # 异步 HTTP 客户端库

async def fetch_url(url):
print(f"开始抓取: {url}")
async with aiohttp.ClientSession() as session:
async with session.get(url) as response: # 异步 HTTP GET
await asyncio.sleep(0.5) # 模拟一些处理时间
status = response.status
print(f"完成抓取: {url}, 状态码: {status}")
return status

async def main_concurrent():
urls = ["https://www.google.com", "https://www.python.org", "https://docs.python.org/3/library/asyncio.html"]
start_time = time.time()

# 创建独立的任务,并让事件循环并发调度它们
tasks = [asyncio.create_task(fetch_url(url)) for url in urls]

# 等待所有任务完成并获取结果
results = await asyncio.gather(*tasks)

end_time = time.time()
print(f"并发执行总耗时: {end_time - start_time:.2f} 秒")
print(f"所有结果: {results}")

if __name__ == "__main__":
# Windows 用户可能需要特殊设置才能运行 aiohttp
# asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main_concurrent())

输出分析

1
2
3
4
5
6
7
8
开始抓取: https://www.google.com
开始抓取: https://www.python.org
开始抓取: https://docs.python.org/3/library/asyncio.html
完成抓取: https://www.google.com, 状态码: 200
完成抓取: https://www.python.org, 状态码: 200
完成抓取: https://docs.python.org/3/library/asyncio.html, 状态码: 200
并发执行总耗时: 约 0.5-0.8 秒 (取决于网络和模拟的 sleep 时间)
所有结果: [200, 200, 200]

相较于同步版本,三个网络请求几乎同时开始,总耗时显著减少。

3.3 处理阻塞代码 (asyncio.to_thread)

尽管异步编程在 I/O 密集型任务中表现出色,但它仍然是单线程的。这意味着任何 CPU 密集型或会阻塞当前线程的同步 I/O 操作都会阻塞整个事件循环,导致所有其他异步任务停滞。

为了在异步代码中安全地执行阻塞操作,asyncio 提供了 asyncio.to_thread() (Python 3.9+) 函数。它会在一个新的线程中运行同步函数,并将结果以 Future 的形式返回给事件循环,从而不阻塞主事件循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
import time

def blocking_cpu_task(label, duration):
"""一个模拟 CPU 密集型任务的阻塞函数"""
print(f"{time.strftime('%H:%M:%S')} - {label}: 开始阻塞 CPU 任务 (持续 {duration} 秒)...")
start = time.monotonic()
while time.monotonic() - start < duration:
pass # 纯 CPU 忙循环
print(f"{time.strftime('%H:%M:%S')} - {label}: 阻塞 CPU 任务完成。")
return f"{label} 完成"

async def async_io_task(label, delay):
"""一个模拟 I/O 密集型任务的异步函数"""
print(f"{time.strftime('%H:%M:%S')} - {label}: 开始异步 I/O 任务 (持续 {delay} 秒)...")
await asyncio.sleep(delay)
print(f"{time.strftime('%H:%M:%S')} - {label}: 异步 I/O 任务完成。")
return f"{label} 完成"

async def main_blocking_vs_async():
start_time = time.time()

# 直接调用阻塞任务会完全阻塞事件循环
# await blocking_cpu_task("Bad Blocking", 2) # 这会堵塞整个事件循环

# 使用 asyncio.to_thread() 在单独线程中运行阻塞任务
cpu_task_coro = asyncio.to_thread(blocking_cpu_task, "CPU Task", 2)
io_task_coro = async_io_task("IO Task", 1)

# 并发运行
results = await asyncio.gather(cpu_task_coro, io_task_coro)

end_time = time.time()
print(f"总耗时: {end_time - start_time:.2f} 秒")
print(f"所有结果: {results}")

if __name__ == "__main__":
asyncio.run(main_blocking_vs_async())

输出分析

1
2
3
4
5
6
14:30:00 - CPU Task: 开始阻塞 CPU 任务 (持续 2 秒)...
14:30:00 - IO Task: 开始异步 I/O 任务 (持续 1 秒)...
14:30:01 - IO Task: 异步 I/O 任务完成。
14:30:02 - CPU Task: 阻塞 CPU 任务完成。
总耗时: 2.00 秒
所有结果: ['CPU Task 完成', 'IO Task 完成']

通过 asyncio.to_thread,尽管 CPU Task 持续了 2 秒,但 IO Task 可以在其阻塞期间完成,整个程序耗时由最长的阻塞操作决定,实现了更好的并发性。

四、常见的异步第三方库

除了 asyncio 自身,Python 异步生态系统还涌现了许多优秀的第三方库:

  • aiohttp:一个功能强大的异步 HTTP 客户端/服务器框架,用于构建高性能 Web 应用程序和进行异步 HTTP 请求。
  • FastAPI:一个现代、快速 (高性能) 的 Web 框架,基于 Starlette 和 Pydantic 构建,原生支持 async/await
  • httpx:Python Http Standard,一个功能完备的 HTTP 客户端,同时支持同步和异步接口。
  • asyncpg / aiomysql / aiosqlite:异步数据库驱动,用于在异步应用中连接 PostgreSQL、MySQL、SQLite 等数据库。
  • SQLAlchemy (2.0+)*:Python 著名的 ORM/SQL 工具包,在 2.0 及以后版本提供了原生的异步 API 支持。
  • websockets:一个用于构建 WebSocket 服务器和客户端的库。

五、实践建议和常见陷阱

  1. 始终 await 异步操作: فراموش await 是初学者最常犯的错误。如果 await 一个协程,它将包装成 Future 并被调度执行。如果没有 await,仅是创建了一个协程对象,但它不会被事件循环激活。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    async def greet():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

    async def main():
    greet() # ❌ 协程对象已创建,但没有 await,不会运行
    await asyncio.sleep(0.1) # await 其他操作,确保事件循环有机会运行

    # 运行结果:只会输出 'Hello' (如果 greet() 没有 await sleep,且 main 中有 await)
    # 实际上由于没有 await greet(),greet()中的任何内容都不会被执行
    # 正确做法:await greet() 或 asyncio.create_task(greet())
  2. 避免在 async def 函数中直接执行阻塞代码:这会阻塞整个事件循环,导致所有其他异步任务停滞。如果必须执行阻塞操作,请使用 asyncio.to_thread() 或将阻塞操作放在单独的进程中。

  3. 异常处理:异步任务中的异常会传播到 await 它的调用者。使用 try...except 块来捕获异常。asyncio.gather 默认只返回第一个发生的异常,要返回所有异常并继续执行其他任务,可以设置 return_exceptions=True

  4. 取消任务Task 支持取消。可以通过 task.cancel() 来请求取消一个正在运行的任务。被取消的协程会抛出 asyncio.CancelledError,需要在协程内部进行优雅处理 (例如通过 try...except asyncio.CancelledError).

  5. 资源管理 (async with):对于需要打开和关闭的资源 (如文件、网络连接),应使用异步上下文管理器 (async with),如 aiohttp.ClientSession

    1
    2
    3
    async with aiohttp.ClientSession() as session:
    async with session.get(url) as response:
    data = await response.text()
  6. 调试asyncio 提供了方便的调试模式。可以通过 asyncio.run(main(), debug=True) 来启用,它会警告你未被 await 的协程、执行时间过长的回调等。

六、总结

Python 异步编程通过 async/await 关键字和 asyncio 模块,为处理 I/O 密集型任务提供了一种高效、单线程的并发模型。它解决了传统同步阻塞 I/O 的性能瓶颈,并避免了多线程在 Python 中因 GIL 带来的CPU并行限制。理解协程、事件循环、Future 和 Task 等核心概念,并掌握 asyncio.run()asyncio.create_task()asyncio.gather() 以及 asyncio.to_thread() 等工具的使用,是构建高性能、响应式 Python 应用程序的关键。随着 FastAPI、aiohttp 等异步生态的日益成熟,异步编程已经成为现代 Python 开发中不可或缺的技能。