Python 异步编程 是一种处理并发任务的编程范式,它允许程序在等待某些操作(如 I/O 操作、网络请求、数据库查询)完成时,切换到执行其他任务,从而提高程序的吞吐量和响应速度。与传统的多线程/多进程并发模型不同,异步编程通常使用协程 (Coroutines)事件循环 (Event Loop) 来实现,避免了线程/进程切换的开销,也绕开了 Python 的全局解释器锁 (GIL) 对 CPU 密集型任务的限制(尽管异步编程主要适用于 I/O 密集型任务)。

核心思想:异步编程通过在等待 I/O 完成时“暂停”当前任务,并“切换”到其他可执行任务,从而在单线程内实现并发和最大化 I/O 利用率。


一、为什么需要异步编程?

传统的 Python 程序(同步阻塞式)在执行 I/O 操作时会阻塞整个程序,直到 I/O 完成。例如,一个 Web 服务器在处理一个耗时的网络请求时,就无法处理其他用户的请求,导致性能低下。

1.1 同步阻塞 (Synchronous Blocking)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
import requests

def fetch_url_sync(url):
print(f"Start fetching {url}")
response = requests.get(url) # 阻塞式I/O操作
print(f"Finished fetching {url} in {response.elapsed.total_seconds():.2f}s")
return response.text

urls = [
"https://jsonplaceholder.typicode.com/todos/1",
"https://jsonplaceholder.typicode.com/todos/2",
"https://jsonplaceholder.typicode.com/todos/3",
]

start_time = time.time()
for url in urls:
fetch_url_sync(url)
end_time = time.time()
print(f"Total sync time: {end_time - start_time:.2f}s")
# 假设每个请求耗时 0.2s,则总耗时约 0.6s

这段代码会逐个执行 URL 请求,每个请求都会阻塞程序的执行,直到响应返回。

1.2 多线程 (Multithreading)

多线程可以实现并行执行任务,但 Python 的 GIL 限制了同一时刻只有一个线程能在 CPU 上执行 Python 字节码。对于计算密集型任务,多线程并不能真正并行加速。对于 I/O 密集型任务,线程在等待 I/O 时会释放 GIL,所以多线程在 I/O 密集型场景下确实能提高并发度。然而,线程的创建和上下文切换有开销,且存在数据竞争和锁的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import time
import requests
import threading

def fetch_url_thread(url):
print(f"Start fetching {url} in thread {threading.current_thread().name}")
response = requests.get(url)
print(f"Finished fetching {url} in {response.elapsed.total_seconds():.2f}s in thread {threading.current_thread().name}")
return response.text

urls = [
"https://jsonplaceholder.typicode.com/todos/1",
"https://jsonplaceholder.typicode.com/todos/2",
"https://jsonplaceholder.typicode.com/todos/3",
]

start_time = time.time()
threads = []
for url in urls:
thread = threading.Thread(target=fetch_url_thread, args=(url,), name=f"Thread-{url.split('/')[-1]}")
threads.append(thread)
thread.start()

for thread in threads:
thread.join() # 等待所有线程完成
end_time = time.time()
print(f"Total multithread time: {end_time - start_time:.2f}s")
# 假设每个请求耗时 0.2s,多个请求并行,但受GIL和线程开销影响,总耗时可能接近最慢的那个请求,比如0.25s

1.3 异步编程 (Asynchronous Programming)

异步编程是一种单线程并发模型。它通过在高延迟操作(如网络请求、磁盘 I/O)发生时,将 CPU 资源让给其他任务,从而在单个线程内实现高并发。当 I/O 操作完成后,程序会“恢复”之前暂停的任务。

优点

  • 高并发、高性能 (I/O 密集型):避免了线程/进程切换的开销,以及 GIL 的限制(因为 I/O 操作时 Python 代码并没有运行)。
  • 资源消耗低:协程比线程/进程更轻量级。
  • 代码结构清晰async/await 语法使得异步代码看起来像同步代码,易于理解和维护。
  • 避免死锁问题:由于是单线程,不存在多线程/多进程的资源竞争和死锁问题。

缺点

  • 不适合计算密集型任务:由于是单线程,无法利用多核 CPU,计算密集型任务仍会阻塞事件循环。
  • 传染性 (“Async/Await is Contagious”):一旦引入异步,相关的函数和库也需要是异步的,否则同步阻塞调用会阻塞整个事件循环。
  • 调试难度稍高:异步程序的错误栈跟踪可能比同步程序复杂。

二、Python 异步编程的核心概念

Python 异步编程主要由 asyncio 库提供支持,并围绕以下核心概念展开:

2.1 协程 (Coroutines)

  • 定义:协程是一种用户态的轻量级线程,它允许函数在执行过程中暂停,并在稍后从暂停点恢复执行。在 Python 中,通过 async def 定义的函数就是协程函数,调用它会返回一个协程对象。
  • 关键字
    • async def:定义一个协程函数
    • await:用于等待一个可等待对象 (Awaitable) 的完成。当 await 一个 I/O 操作时,协程会暂停执行,并将控制权交还给事件循环,从而允许事件循环去运行其他协程。
1
2
3
4
async def my_coroutine():
print("Coroutine started")
await asyncio.sleep(1) # 模拟一个耗时1秒的I/O操作
print("Coroutine finished")

2.2 可等待对象 (Awaitables)

可等待对象是可以在 await 表达式中使用的对象。主要有三种可等待对象:
* 协程 (Coroutines):通过 async def 定义的函数被调用后返回的对象。
* 任务 (Tasks)asyncio.Task 对象,用于在事件循环中调度和运行协程。
* Future (未来对象)asyncio.Future 对象,表示一个尚未完成的操作的结果,可以被 await

2.3 事件循环 (Event Loop)

  • 定义:事件循环是异步编程的核心。它是一个无限循环,负责监听事件(如 I/O 完成、定时器到期等),并将这些事件分派给相应的协程来处理。
  • 作用:当一个协程 await 一个阻塞操作时,它会暂停并把控制权交还给事件循环。事件循环会去执行其他已准备好的协程。当原先的阻塞操作完成时,事件循环会通知并重新调度对应的协程继续执行。
  • 获取和运行
    • asyncio.get_event_loop():获取当前线程的事件循环。
    • loop.run_until_complete(coro):运行一个协程直到它完成。
    • asyncio.run(coro) (Python 3.7+ 推荐):一个更高级的函数,负责创建和关闭事件循环,运行协程,并处理一些细节。

2.4 任务 (Tasks)

  • 定义asyncio.Taskasyncio 中包装协程的可等待对象。它用于在事件循环中并发地调度和运行协程。通过 asyncio.create_task(coro) 创建一个任务,并将其注册到事件循环中。
  • 作用:如果你创建了多个任务,事件循环会在它们之间切换执行,从而实现并发。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio

async def task_func(name, duration):
print(f"Task {name}: Starting, will run for {duration} seconds.")
await asyncio.sleep(duration) # 模拟I/O操作
print(f"Task {name}: Finished.")

async def main():
print("Main: Creating tasks...")
# 创建任务,并让事件循环调度它们
task1 = asyncio.create_task(task_func("One", 2))
task2 = asyncio.create_task(task_func("Two", 1))

print("Main: Waiting for tasks to complete...")
# await 任务,等待它们完成
await task1
await task2
print("Main: All tasks completed.")

if __name__ == "__main__":
asyncio.run(main())

运行结果可能如下 (具体时间点由调度决定):

1
2
3
4
5
6
7
Main: Creating tasks...
Main: Waiting for tasks to complete...
Task One: Starting, will run for 2 seconds.
Task Two: Starting, will run for 1 second.
Task Two: Finished.
Task One: Finished.
Main: All tasks completed.

可以看到,”Task Two: Finished.” 比 “Task One: Finished.” 先输出,说明它们是并发执行的。总耗时接近最长的任务(2秒),而不是它们的总和(3秒)。

三、asyncio 模块的使用

asyncio 是 Python 用于编写并发代码的基础库,使用 async/await 语法。

3.1 基本的 async/await 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import time

async def say_after(delay, what):
await asyncio.sleep(delay) # 模拟耗时操作
print(what)

async def main():
print(f"started at {time.strftime('%X')}")

# 协程对象并不会立即执行,需要用await或者任务来驱动
await say_after(1, 'hello')
await say_after(2, 'world') # 这个会阻塞上面hello的完成

print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
asyncio.run(main())
# 预期输出:
# started at 00:00:00 (示例时间)
# hello
# world
# finished at 00:00:03 (示例时间)
# 总耗时约 3 秒,因为是顺序 await

3.2 实现并发 (使用 asyncio.create_taskasyncio.gather)

要真正实现并发而非顺序执行,需要将协程包装成任务。

a. 使用 asyncio.create_task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import time

async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)

async def main_concurrent_task():
print(f"started at {time.strftime('%X')}")

# 创建任务,让事件循环调度它们
task1 = asyncio.create_task(say_after(1, 'hello'))
task2 = asyncio.create_task(say_after(2, 'world'))

# await 任务,等待它们完成。它们是并发运行的
await task1
await task2

print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
asyncio.run(main_concurrent_task())
# 预期输出:
# started at 00:00:00 (示例时间)
# hello
# world
# finished at 00:00:02 (示例时间)
# 总耗时约 2 秒 (取最耗时任务的时间),因为是并发执行

b. 使用 asyncio.gather

asyncio.gather 是一个更方便的方式来同时运行多个可等待对象,并收集它们的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
import time

async def say_after(delay, what):
await asyncio.sleep(delay)
return what # 返回结果

async def main_gather():
print(f"started at {time.strftime('%X')}")

# 同时运行多个可等待对象,并等待所有完成
results = await asyncio.gather(
say_after(1, 'hello'),
say_after(2, 'world'),
say_after(0.5, 'python')
)

print(f"results: {results}")
print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
asyncio.run(main_gather())
# 预期输出:
# started at 00:00:00
# python
# hello
# world
# results: ['hello', 'world', 'python']
# finished at 00:00:02
# 总耗时约 2 秒

3.3 异步网络请求示例

结合 aiohttp(一个异步 HTTP 客户端/服务器库)进行异步网络请求,相比 requests 性能更高。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio
import aiohttp
import time

async def fetch_url_async(session, url):
print(f"Start fetching {url}")
async with session.get(url) as response: # 异步HTTP GET请求
content = await response.text() # 异步读取响应体
print(f"Finished fetching {url}")
return len(content)

async def main_async_fetch():
urls = [
"https://jsonplaceholder.typicode.com/todos/1",
"https://jsonplaceholder.typicode.com/todos/2",
"https://jsonplaceholder.typicode.com/todos/3",
"https://jsonplaceholder.typicode.com/posts/1",
"https://jsonplaceholder.typicode.com/posts/2",
]

start_time = time.time()
async with aiohttp.ClientSession() as session: # 异步HTTP客户端会话
tasks = [fetch_url_async(session, url) for url in urls]
# 同时运行所有任务,等待它们完成
results = await asyncio.gather(*tasks)

end_time = time.time()
print(f"Total async fetch time: {end_time - start_time:.2f}s")
print(f"Results (content lengths): {results}")

if __name__ == "__main__":
asyncio.run(main_async_fetch())
# 假设每个请求耗时 0.2s,5个请求并发,总耗时可能在 0.25s 左右

3.4 异步迭代器和异步生成器

  • 异步迭代器 (async for):允许你在异步地获取元素时暂停。
  • 异步生成器 (async yield):允许你创建一个生成器,其生成值的过程可以是异步的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio

class AsyncCounter:
def __init__(self, limit):
self.limit = limit
self.current = 0

async def __aiter__(self): # 异步迭代器协议
return self

async def __anext__(self): # 异步迭代器协议
if self.current < self.limit:
await asyncio.sleep(0.1) # 模拟异步操作
self.current += 1
return self.current
else:
raise StopAsyncIteration

async def async_generator_example():
for i in range(3):
await asyncio.sleep(0.05)
yield i * 2 # 异步生成值

async def main_async_iter_gen():
print("--- Async Iterator ---")
async for count in AsyncCounter(5):
print(f"Count: {count}")

print("--- Async Generator ---")
async for value in async_generator_example():
print(f"Generated: {value}")

if __name__ == "__main__":
asyncio.run(main_async_iter_gen())

四、异步编程的挑战与注意事项

  1. 同步阻塞函数会阻塞事件循环:如果在异步代码中调用了普通的同步阻塞函数,那么整个事件循环都会被阻塞,导致其他协程无法执行。

    • 解决方案:对于 I/O 密集型的同步阻塞函数,可以使用 loop.run_in_executor() 将其放到单独的线程池或进程池中运行,避免阻塞主事件循环。
    • 对于 CPU 密集型的同步阻塞函数,也应使用 run_in_executor() 放到进程池中运行,以充分利用多核 CPU 并绕过 GIL。
  2. 错误处理:异步代码中的异常处理与同步代码类似,使用 try...except 块。对于 asyncio.gather,如果 return_exceptions=True,则异常会被作为结果返回;否则,第一个发生的异常会立即传播。

  3. 取消任务 (Canceling Tasks)

    • task.cancel() 可以尝试取消一个正在运行的任务。
    • 任务应该优雅地处理取消请求,通常在 try...finally 块中使用 asyncio.CancelledError 进行捕获。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    async def cancelable_task():
    try:
    print("Cancelable task: Starting...")
    await asyncio.sleep(5)
    print("Cancelable task: Finished.")
    except asyncio.CancelledError:
    print("Cancelable task: Was cancelled!")
    raise # 重新抛出以表明任务被取消

    async def main_cancel():
    task = asyncio.create_task(cancelable_task())
    await asyncio.sleep(1) # 等待任务启动
    task.cancel() # 取消任务
    try:
    await task # 等待任务真正结束 (或取消)
    except asyncio.CancelledError:
    print("Main: Task was indeed cancelled.")
    print("Main: Done.")

    if __name__ == "__main__":
    asyncio.run(main_cancel())
  4. 死锁和竞争条件 (避免):由于 asyncio 是单线程模型,理论上不会有传统多线程的死锁问题。但如果使用 asyncio.Lock 等同步原语时,仍需小心编写逻辑以避免程序逻辑上的死锁(例如,一直等待一个不会被释放的锁)。

五、总结

Python 的异步编程,特别是基于 asyncio 库和 async/await 语法的协程模型,为处理 I/O 密集型任务提供了一种高效、轻量级的解决方案。

  • 核心优势:通过单线程内的任务切换,实现高并发、低资源消耗,特别适用于网络服务、爬虫等场景。
  • 关键概念
    • 协程 (async def, await):可暂停和恢复的函数。
    • 事件循环 (asyncio.run, asyncio.get_event_loop):调度和管理协程执行的核心。
    • 任务 (asyncio.create_task):将协程包装成可由事件循环调度的并发单位。
  • 常用工具asyncio.gather 用于并发运行多个任务并收集结果;aiohttp 等第三方库提供异步 I/O 功能。
  • 注意事项:避免同步阻塞代码阻塞事件循环;正确处理异常和任务取消;异步代码具有“传染性”。

随着 Python 3.7+ 对 asyncio.run() 的引入,异步编程的入门门槛已大大降低。掌握异步编程,将能极大地拓宽 Python 在高性能网络应用领域的应用范围。