学AI,懂这些Python就够了(二):async/await 异步编程 —— 让AI智能体快10倍的并发秘籍
系列:「学AI,懂这些Python就够了」—— 面向AI智能体开发者的Python速成指南
标签:
Python异步编程asyncioAI Agent并发难度:⭐⭐⭐☆☆
前言
假设你的智能体需要同时调用3个API(天气查询、新闻搜索、股价获取),每个API响应需要2秒。如果是同步代码,你需要等6秒。但用了异步编程,你只需要等2秒。
这就是异步编程在AI智能体开发中的核心价值:I/O密集型任务(API调用、数据库查询、文件读写)等待的时间,可以用来干别的事。
本文是 「学AI,懂这些Python就够了」 系列的第二篇,带你从零掌握Python异步编程,让你的智能体"快人一步"。
1. 为什么AI开发者必须学异步编程
1.1 AI智能体的I/O密集型特征
典型的AI智能体工作流是这样的:
用户输入 → 向量检索(100ms) → LLM推理(2s) → 工具调用(500ms) → LLM总结(1.5s) → 返回结果
每一步都是 I/O密集型操作(等待网络响应),而不是 CPU密集型操作(大量计算)。在等待网络响应时,CPU是空闲的——这就是异步编程发挥价值的地方。
1.2 收益量化
| 场景 | 同步耗时 | 异步耗时 | 加速比 |
|---|---|---|---|
| 调用1个API | 2.0s | 2.0s | 1x |
| 调用5个API(串行) | 10.0s | 2.1s | ~5x |
| 调用20个API(串行) | 40.0s | 2.5s | ~16x |
| 混合(检索+LLM+工具) | 5.0s | 2.3s | ~2x |
1.3 什么场景适合异步
| ✅ 适合异步 | ❌ 不适合异步 |
|---|---|
| 并发调用多个LLM API | 纯数学计算 |
| 同时检索多个知识库 | 本地文件压缩 |
| 批量发送消息/邮件 | 图像渲染处理 |
| WebSocket实时通信 | 大规模矩阵运算 |
| 流式输出+后台处理 | 单线程CPU密集型任务 |
2. 同步 vs 异步:一张图看懂
2.1 代码对比
先看一段直观的对比代码:
import time
import asyncio
# ========== 同步版本 ==========
def sync_task(name: str, delay: float):
"""模拟一个同步I/O任务"""
print(f"[同步] {name} 开始")
time.sleep(delay) # 阻塞当前线程!
print(f"[同步] {name} 完成")
return f"{name} 结果"
def run_sync():
start = time.time()
# 三个任务串行执行
result1 = sync_task("任务A", 1)
result2 = sync_task("任务B", 1)
result3 = sync_task("任务C", 1)
print(f"同步总耗时: {time.time() - start:.2f}秒")
# 输出: 同步总耗时: 3.00秒
# ========== 异步版本 ==========
async def async_task(name: str, delay: float):
"""模拟一个异步I/O任务"""
print(f"[异步] {name} 开始")
await asyncio.sleep(delay) # 不阻塞!让出控制权
print(f"[异步] {name} 完成")
return f"{name} 结果"
async def run_async():
start = time.time()
# 三个任务并发执行
results = await asyncio.gather(
async_task("任务A", 1),
async_task("任务B", 1),
async_task("任务C", 1),
)
print(f"异步总耗时: {time.time() - start:.2f}秒")
# 输出: 异步总耗时: 1.00秒
# 运行
# run_sync()
# asyncio.run(run_async())
2.2 核心原理:事件循环(Event Loop)
┌─────────────────────────────────────────────────────┐
│ 事件循环 (Event Loop) │
│ │
│ 任务A: [运行]──[等待I/O]──────────────[继续] │
│ 任务B: [运行]──[等待I/O]──────────[继续] │
│ 任务C: [运行]──[等待I/O]──────[继续] │
│ │
│ 时间轴: 0s ───── 0.5s ───── 1.0s ───── 1.5s │
│ │
│ 总耗时 ≈ 最慢的任务耗时,而不是各任务耗时之和 │
└─────────────────────────────────────────────────────┘
关键理解:
time.sleep()会阻塞整个线程,期间什么都不能做await asyncio.sleep()只是挂起当前协程,事件循环可以去执行其他任务- 所有异步任务都在 单线程 中运行,通过协程切换实现并发
3. async/await:异步编程的核心语法
3.1 定义和运行协程
import asyncio
# 1. async def 定义协程函数
async def hello():
print("Hello")
await asyncio.sleep(1) # await 等待另一个协程
print("World")
return "Done"
# 2. asyncio.run() 运行协程 —— 推荐方式
result = asyncio.run(hello())
print(result) # "Done"
⚠️ 重要:
asyncio.run()会创建事件循环,只能调用一次。在Jupyter Notebook中可以直接await。
3.2 create_task —— 创建并发任务
async def fetch_data(source: str, delay: float):
print(f"正在从 {source} 获取数据...")
await asyncio.sleep(delay)
print(f"{source} 数据获取完成")
return f"{source}_data"
async def main():
# 方式1:创建Task(立即开始执行)
task1 = asyncio.create_task(fetch_data("数据库A", 2))
task2 = asyncio.create_task(fetch_data("数据库B", 1))
task3 = asyncio.create_task(fetch_data("API-C", 1.5))
# 可以在这期间做其他事情
print("任务已启动,等待中...")
# 等待所有任务完成
result1 = await task1
result2 = await task2
result3 = await task3
print(f"结果: {result1}, {result2}, {result3}")
asyncio.run(main())
3.3 顺序await vs 并发await
async def bad_example():
"""❌ 错误示范:虽然用了async,但还是串行执行"""
r1 = await fetch_data("API-1", 1) # 等1秒
r2 = await fetch_data("API-2", 1) # 再等1秒
r3 = await fetch_data("API-3", 1) # 再等1秒
return [r1, r2, r3]
# 总耗时: 3秒 —— 跟同步一样慢!
async def good_example():
"""✅ 正确示范:真正并发执行"""
tasks = [
asyncio.create_task(fetch_data(f"API-{i}", 1))
for i in range(3)
]
results = []
for task in tasks:
results.append(await task)
return results
# 总耗时: 1秒 —— 真正的并发!
async def better_example():
"""✅ 更简洁:使用gather"""
results = await asyncio.gather(
fetch_data("API-1", 1),
fetch_data("API-2", 1),
fetch_data("API-3", 1),
)
return results
# 总耗时: 1秒 —— 最简洁的写法!
3.4 超时控制与异常处理
async def safe_api_call():
try:
# 设置3秒超时
result = await asyncio.wait_for(
fetch_data("慢速API", 10),
timeout=3.0
)
return result
except asyncio.TimeoutError:
print("API调用超时,返回默认值")
return "default_value"
except Exception as e:
print(f"API调用出错: {e}")
return None
async def batch_with_timeout():
"""批量调用,部分失败不影响整体"""
results = await asyncio.gather(
fetch_data("API-A", 1),
fetch_data("API-B", 5), # 这个会超时
fetch_data("API-C", 0.5),
return_exceptions=True # 关键参数!异常不中断其他任务
)
# results: ["API-A_data", TimeoutError(), "API-C_data"]
return [r for r in results if not isinstance(r, Exception)]
4. 任务管理:gather、wait、as_completed
这是异步编程最核心的3个任务编排工具。
4.1 gather —— 并发执行,按序返回
async def demo_gather():
"""最常用的并发工具"""
results = await asyncio.gather(
fetch_data("源1", 3), # 最慢的
fetch_data("源2", 0.5), # 最快的
fetch_data("源3", 1),
)
# 结果顺序 = 参数顺序,不是完成顺序!
# results = ["源1_data", "源2_data", "源3_data"]
return results
# 适用场景:你需要所有结果,且关心结果与请求的对应关系
4.2 wait —— 灵活等待策略
async def demo_wait():
tasks = [
asyncio.create_task(fetch_data(f"源{i}", i * 0.5))
for i in range(5)
]
# 策略1:等待所有完成
done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
# 策略2:等待第一个完成就开始处理
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
first_result = done.pop().result()
print(f"最快的任务返回了: {first_result}")
# pending中的任务还在继续运行!
# 策略3:遇到第一个异常就返回
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_EXCEPTION
)
4.3 as_completed —— 按完成顺序处理
async def demo_as_completed():
"""谁先完成就先处理谁 —— 流式处理的最佳工具"""
coros = [fetch_data(f"源{i}", i * 0.8) for i in range(5)]
for coro in asyncio.as_completed(coros):
# 谁先完成,这个循环就先把谁的结果给你
result = await coro
print(f"收到结果: {result}")
# 可以立即处理,不用等最慢的那个!
# 实际AI应用:多个模型同时推理,展示最先返回的结果
async def multi_model_inference(prompt: str, models: list):
"""多个模型同时推理,按完成顺序流式返回"""
async def call_model(model_name: str):
return await call_llm(prompt, model=model_name)
for coro in asyncio.as_completed(
[call_model(m) for m in models]
):
result = await coro
yield result # 先完成的先输出
4.4 工具对比决策树
你需要的行为是什么?
├── 等所有完成,按请求顺序拿结果 → 用 gather()
├── 等第一个完成就开始处理 → 用 wait(FIRST_COMPLETED)
├── 边完成边处理,流式输出 → 用 as_completed()
└── 结构化并发(Python 3.11+) → 用 TaskGroup
5. 协程间通信与并发控制
5.1 asyncio.Queue —— 生产者-消费者模式
async def producer_consumer_demo():
"""智能体中常见的模式:LLM产出任务,工具消费者执行"""
task_queue = asyncio.Queue(maxsize=10)
result_queue = asyncio.Queue()
async def producer():
"""LLM 生成工具调用任务"""
for i in range(20):
await task_queue.put(f"task_{i}")
print(f"产出任务 task_{i}")
await asyncio.sleep(0.1) # LLM流式生成中
async def consumer(worker_id: int):
"""工具执行器"""
while True:
task = await task_queue.get()
if task is None: # 结束信号
break
result = f"{task}_done_by_worker_{worker_id}"
await result_queue.put(result)
task_queue.task_done()
await asyncio.sleep(0.3) # 模拟工具执行
# 启动生产者和消费者
producers = [asyncio.create_task(producer())]
consumers = [asyncio.create_task(consumer(i)) for i in range(3)]
# 等待所有任务完成
await asyncio.gather(*producers)
await task_queue.join() # 等待所有任务被处理
# 发送结束信号
for _ in consumers:
await task_queue.put(None)
await asyncio.gather(*consumers)
5.2 Semaphore —— 控制并发数
async def controlled_concurrency():
"""限制同时进行的请求数量 —— 防止被限流"""
semaphore = asyncio.Semaphore(5) # 最多同时5个请求
async def rate_limited_request(url: str):
async with semaphore: # 获取信号量
return await fetch_url(url)
# 离开with块时自动释放信号量
# 即使有100个URL,同一时间最多5个并发请求
urls = [f"https://api.example.com/endpoint_{i}" for i in range(100)]
tasks = [rate_limited_request(url) for url in urls]
results = await asyncio.gather(*tasks)
return results
5.3 asyncio.Lock —— 保护共享资源
async def shared_resource_demo():
"""多个协程操作同一资源时确保线程安全"""
lock = asyncio.Lock()
shared_data = {"count": 0}
async def safe_increment():
async with lock: # 同一时间只有一个协程能进入
current = shared_data["count"]
await asyncio.sleep(0.01) # 模拟计算
shared_data["count"] = current + 1
# 100个协程同时+1,结果正确为100
await asyncio.gather(*[safe_increment() for _ in range(100)])
print(shared_data["count"]) # 100
6. 实战:异步HTTP请求封装
6.1 基础异步请求(aiohttp)
import aiohttp
import asyncio
async def fetch_json(url: str) -> dict:
"""异步获取JSON数据"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
# 推荐使用 httpx(同时支持同步和异步,API更现代)
import httpx
async def fetch_with_httpx(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10.0)
response.raise_for_status()
return response.json()
6.2 完整的AI API调用客户端
import httpx
import asyncio
from typing import Optional
class AsyncAPIClient:
"""异步API客户端 —— 用于并发调用LLM接口"""
def __init__(
self,
base_url: str,
api_key: str,
max_concurrent: int = 10,
timeout: float = 30.0,
):
self.base_url = base_url
self.api_key = api_key
self._semaphore = asyncio.Semaphore(max_concurrent)
self._timeout = timeout
self._client: Optional[httpx.AsyncClient] = None
async def __aenter__(self):
"""异步上下文管理器入口"""
self._client = httpx.AsyncClient(
base_url=self.base_url,
timeout=self._timeout,
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
)
return self
async def __aexit__(self, *args):
"""异步上下文管理器出口"""
if self._client:
await self._client.aclose()
async def chat_completion(
self, messages: list, model: str = "gpt-4", **kwargs
) -> dict:
"""调用Chat Completions API(带并发控制)"""
async with self._semaphore:
for attempt in range(3): # 最多重试3次
try:
response = await self._client.post(
"/v1/chat/completions",
json={
"model": model,
"messages": messages,
**kwargs,
},
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code >= 500:
# 服务器错误,指数退避重试
await asyncio.sleep(2 ** attempt)
continue
raise
except httpx.TimeoutException:
if attempt == 2:
raise
await asyncio.sleep(1)
raise Exception("重试耗尽")
async def batch_completion(
self, prompts: list[list[dict]], model: str = "gpt-4"
) -> list[dict]:
"""批量并发调用LLM"""
tasks = [
self.chat_completion(messages, model=model)
for messages in prompts
]
return await asyncio.gather(*tasks, return_exceptions=True)
# 使用方式
async def main():
async with AsyncAPIClient(
base_url="https://api.openai.com",
api_key="sk-xxx",
max_concurrent=5,
) as client:
# 单个请求
result = await client.chat_completion(
messages=[{"role": "user", "content": "Hello"}]
)
# 批量并发(5个请求,最多同时5个)
prompts = [
[{"role": "user", "content": f"问题{i}"}]
for i in range(10)
]
results = await client.batch_completion(prompts)
return results
# asyncio.run(main())
7. 异步编程最佳实践
7.1 DO:应该做的事
# ✅ 使用 asyncio.run() 作为入口
if __name__ == "__main__":
asyncio.run(main())
# ✅ 使用 Semaphore 控制并发数
sem = asyncio.Semaphore(5)
# ✅ 设置超时
result = await asyncio.wait_for(long_task(), timeout=10.0)
# ✅ 用 return_exceptions=True 防止部分失败影响整体
results = await asyncio.gather(*tasks, return_exceptions=True)
# ✅ 使用异步上下文管理器管理资源
async with httpx.AsyncClient() as client:
...
# ✅ 流式响应用 async for
async for chunk in stream_response():
process(chunk)
7.2 DON’T:不应做的事
# ❌ 在异步函数中使用 time.sleep —— 会阻塞事件循环!
async def bad():
time.sleep(1) # 阻塞整个线程!
await asyncio.sleep(1) # ✅ 这才是正确的
# ❌ 在异步函数中使用 requests —— 没有异步版本
async def bad():
import requests
requests.get(url) # 阻塞!
# ✅ 用 httpx.AsyncClient 或 aiohttp
# ❌ 忘记 await
async def bad():
asyncio.sleep(1) # 返回一个coroutine对象,但没有执行!
await asyncio.sleep(1) # ✅ 正确
# ❌ 循环中逐个 await(串行执行)
async def bad():
for url in urls:
result = await fetch(url) # 一个接一个
# ✅ 用 gather/as_completed 并发
# ❌ 不处理 CancelledError
async def bad():
try:
while True:
await asyncio.sleep(1)
except asyncio.CancelledError:
print("清理资源...") # ✅ 正确处理后重新抛出或return
raise
7.3 异步编程检查清单
- 所有 I/O 操作都使用异步版本(
aiohttp/httpx.AsyncClient) - 永远不要用
time.sleep(),用await asyncio.sleep() - 并发任务用
gather()而非循环await - 用
Semaphore限制并发数防止限流 - 所有网络请求都设置超时
- 用
return_exceptions=True做容错处理 - 使用
async with管理异步资源
8. 总结与下一步
本篇核心知识点
异步编程知识体系
├── 核心概念:事件循环、协程、非阻塞I/O
├── 基础语法:async def / await / asyncio.run()
├── 任务编排:gather() / wait() / as_completed() / TaskGroup
├── 并发控制:Semaphore / Lock / Queue
├── HTTP实践:aiohttp / httpx.AsyncClient
└── 最佳实践:7要6不要
常用 asyncio API 速查
| API | 用途 | 签名 |
|---|---|---|
asyncio.run(coro) |
运行协程入口 | run(main()) |
asyncio.create_task(coro) |
创建Task立即执行 | task = create_task(coro) |
asyncio.gather(*tasks) |
并发执行,按序返回 | results = await gather(*tasks) |
asyncio.wait(tasks) |
灵活等待策略 | done, pending = await wait(tasks) |
asyncio.wait_for(coro, timeout) |
超时控制 | result = await wait_for(coro, 10) |
asyncio.as_completed(coros) |
按完成顺序迭代 | for coro in as_completed(coros) |
asyncio.sleep(delay) |
异步等待 | await sleep(1) |
asyncio.Semaphore(n) |
并发数控制 | async with Semaphore(5) |
asyncio.Queue(maxsize) |
协程间通信 | await queue.put(item) |
asyncio.Lock() |
互斥锁 | async with lock |
下一步
掌握了异步编程,你的智能体已经能从"慢悠悠的串行"变成"火力全开的并发"。下一篇文章中,我们将学习AI开发者最常用的4个Python第三方库——requests、httpx、pydantic、dataclasses,它们会让你的代码更简洁、更健壮。
上一篇:学AI,懂这些Python就够了(一):变量·数据类型·控制流·函数·类 —— 30分钟搞定Python核心语法
下一篇:学AI,懂这些Python就够了(三):requests·httpx·pydantic·dataclasses —— AI开发者必装的4个Python库
本文是「学AI,懂这些Python就够了」系列的第2篇。异步编程是区分"能写代码"和"能写高效代码"的分水岭,多写多练,把 gather 和 Semaphore 变成肌肉记忆。
更多推荐
所有评论(0)