系列:「学AI,懂这些Python就够了」—— 面向AI智能体开发者的Python速成指南

标签:Python 异步编程 asyncio AI Agent 并发

难度:⭐⭐⭐☆☆


前言

假设你的智能体需要同时调用3个API(天气查询、新闻搜索、股价获取),每个API响应需要2秒。如果是同步代码,你需要等6秒。但用了异步编程,你只需要等2秒

这就是异步编程在AI智能体开发中的核心价值:I/O密集型任务(API调用、数据库查询、文件读写)等待的时间,可以用来干别的事。

本文是 「学AI,懂这些Python就够了」 系列的第二篇,带你从零掌握Python异步编程,让你的智能体"快人一步"。


1. 为什么AI开发者必须学异步编程

1.1 AI智能体的I/O密集型特征

典型的AI智能体工作流是这样的:

用户输入 → 向量检索(100ms) → LLM推理(2s) → 工具调用(500ms) → LLM总结(1.5s) → 返回结果

每一步都是 I/O密集型操作(等待网络响应),而不是 CPU密集型操作(大量计算)。在等待网络响应时,CPU是空闲的——这就是异步编程发挥价值的地方。

1.2 收益量化

场景 同步耗时 异步耗时 加速比
调用1个API 2.0s 2.0s 1x
调用5个API(串行) 10.0s 2.1s ~5x
调用20个API(串行) 40.0s 2.5s ~16x
混合(检索+LLM+工具) 5.0s 2.3s ~2x

1.3 什么场景适合异步

✅ 适合异步 ❌ 不适合异步
并发调用多个LLM API 纯数学计算
同时检索多个知识库 本地文件压缩
批量发送消息/邮件 图像渲染处理
WebSocket实时通信 大规模矩阵运算
流式输出+后台处理 单线程CPU密集型任务

2. 同步 vs 异步:一张图看懂

2.1 代码对比

先看一段直观的对比代码:

import time
import asyncio

# ========== 同步版本 ==========
def sync_task(name: str, delay: float):
    """模拟一个同步I/O任务"""
    print(f"[同步] {name} 开始")
    time.sleep(delay)  # 阻塞当前线程!
    print(f"[同步] {name} 完成")
    return f"{name} 结果"

def run_sync():
    start = time.time()
    # 三个任务串行执行
    result1 = sync_task("任务A", 1)
    result2 = sync_task("任务B", 1)
    result3 = sync_task("任务C", 1)
    print(f"同步总耗时: {time.time() - start:.2f}秒")
    # 输出: 同步总耗时: 3.00秒

# ========== 异步版本 ==========
async def async_task(name: str, delay: float):
    """模拟一个异步I/O任务"""
    print(f"[异步] {name} 开始")
    await asyncio.sleep(delay)  # 不阻塞!让出控制权
    print(f"[异步] {name} 完成")
    return f"{name} 结果"

async def run_async():
    start = time.time()
    # 三个任务并发执行
    results = await asyncio.gather(
        async_task("任务A", 1),
        async_task("任务B", 1),
        async_task("任务C", 1),
    )
    print(f"异步总耗时: {time.time() - start:.2f}秒")
    # 输出: 异步总耗时: 1.00秒

# 运行
# run_sync()
# asyncio.run(run_async())

2.2 核心原理:事件循环(Event Loop)

┌─────────────────────────────────────────────────────┐
│                    事件循环 (Event Loop)              │
│                                                     │
│  任务A: [运行]──[等待I/O]──────────────[继续]        │
│  任务B:        [运行]──[等待I/O]──────────[继续]     │
│  任务C:                [运行]──[等待I/O]──────[继续] │
│                                                     │
│  时间轴: 0s ───── 0.5s ───── 1.0s ───── 1.5s       │
│                                                     │
│  总耗时 ≈ 最慢的任务耗时,而不是各任务耗时之和         │
└─────────────────────────────────────────────────────┘

关键理解

  • time.sleep() 会阻塞整个线程,期间什么都不能做
  • await asyncio.sleep() 只是挂起当前协程,事件循环可以去执行其他任务
  • 所有异步任务都在 单线程 中运行,通过协程切换实现并发

3. async/await:异步编程的核心语法

3.1 定义和运行协程

import asyncio

# 1. async def 定义协程函数
async def hello():
    print("Hello")
    await asyncio.sleep(1)  # await 等待另一个协程
    print("World")
    return "Done"

# 2. asyncio.run() 运行协程 —— 推荐方式
result = asyncio.run(hello())
print(result)  # "Done"

⚠️ 重要asyncio.run() 会创建事件循环,只能调用一次。在Jupyter Notebook中可以直接 await

3.2 create_task —— 创建并发任务

async def fetch_data(source: str, delay: float):
    print(f"正在从 {source} 获取数据...")
    await asyncio.sleep(delay)
    print(f"{source} 数据获取完成")
    return f"{source}_data"

async def main():
    # 方式1:创建Task(立即开始执行)
    task1 = asyncio.create_task(fetch_data("数据库A", 2))
    task2 = asyncio.create_task(fetch_data("数据库B", 1))
    task3 = asyncio.create_task(fetch_data("API-C", 1.5))

    # 可以在这期间做其他事情
    print("任务已启动,等待中...")

    # 等待所有任务完成
    result1 = await task1
    result2 = await task2
    result3 = await task3

    print(f"结果: {result1}, {result2}, {result3}")

asyncio.run(main())

3.3 顺序await vs 并发await

async def bad_example():
    """❌ 错误示范:虽然用了async,但还是串行执行"""
    r1 = await fetch_data("API-1", 1)  # 等1秒
    r2 = await fetch_data("API-2", 1)  # 再等1秒
    r3 = await fetch_data("API-3", 1)  # 再等1秒
    return [r1, r2, r3]
    # 总耗时: 3秒 —— 跟同步一样慢!

async def good_example():
    """✅ 正确示范:真正并发执行"""
    tasks = [
        asyncio.create_task(fetch_data(f"API-{i}", 1))
        for i in range(3)
    ]
    results = []
    for task in tasks:
        results.append(await task)
    return results
    # 总耗时: 1秒 —— 真正的并发!

async def better_example():
    """✅ 更简洁:使用gather"""
    results = await asyncio.gather(
        fetch_data("API-1", 1),
        fetch_data("API-2", 1),
        fetch_data("API-3", 1),
    )
    return results
    # 总耗时: 1秒 —— 最简洁的写法!

3.4 超时控制与异常处理

async def safe_api_call():
    try:
        # 设置3秒超时
        result = await asyncio.wait_for(
            fetch_data("慢速API", 10),
            timeout=3.0
        )
        return result
    except asyncio.TimeoutError:
        print("API调用超时,返回默认值")
        return "default_value"
    except Exception as e:
        print(f"API调用出错: {e}")
        return None

async def batch_with_timeout():
    """批量调用,部分失败不影响整体"""
    results = await asyncio.gather(
        fetch_data("API-A", 1),
        fetch_data("API-B", 5),    # 这个会超时
        fetch_data("API-C", 0.5),
        return_exceptions=True      # 关键参数!异常不中断其他任务
    )
    # results: ["API-A_data", TimeoutError(), "API-C_data"]
    return [r for r in results if not isinstance(r, Exception)]

4. 任务管理:gather、wait、as_completed

这是异步编程最核心的3个任务编排工具。

4.1 gather —— 并发执行,按序返回

async def demo_gather():
    """最常用的并发工具"""
    results = await asyncio.gather(
        fetch_data("源1", 3),   # 最慢的
        fetch_data("源2", 0.5), # 最快的
        fetch_data("源3", 1),
    )
    # 结果顺序 = 参数顺序,不是完成顺序!
    # results = ["源1_data", "源2_data", "源3_data"]
    return results

# 适用场景:你需要所有结果,且关心结果与请求的对应关系

4.2 wait —— 灵活等待策略

async def demo_wait():
    tasks = [
        asyncio.create_task(fetch_data(f"源{i}", i * 0.5))
        for i in range(5)
    ]

    # 策略1:等待所有完成
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)

    # 策略2:等待第一个完成就开始处理
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first_result = done.pop().result()
    print(f"最快的任务返回了: {first_result}")
    # pending中的任务还在继续运行!

    # 策略3:遇到第一个异常就返回
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_EXCEPTION
    )

4.3 as_completed —— 按完成顺序处理

async def demo_as_completed():
    """谁先完成就先处理谁 —— 流式处理的最佳工具"""
    coros = [fetch_data(f"源{i}", i * 0.8) for i in range(5)]

    for coro in asyncio.as_completed(coros):
        # 谁先完成,这个循环就先把谁的结果给你
        result = await coro
        print(f"收到结果: {result}")
        # 可以立即处理,不用等最慢的那个!

# 实际AI应用:多个模型同时推理,展示最先返回的结果
async def multi_model_inference(prompt: str, models: list):
    """多个模型同时推理,按完成顺序流式返回"""
    async def call_model(model_name: str):
        return await call_llm(prompt, model=model_name)

    for coro in asyncio.as_completed(
        [call_model(m) for m in models]
    ):
        result = await coro
        yield result  # 先完成的先输出

4.4 工具对比决策树

你需要的行为是什么?
├── 等所有完成,按请求顺序拿结果 → 用 gather()
├── 等第一个完成就开始处理       → 用 wait(FIRST_COMPLETED)
├── 边完成边处理,流式输出       → 用 as_completed()
└── 结构化并发(Python 3.11+)  → 用 TaskGroup

5. 协程间通信与并发控制

5.1 asyncio.Queue —— 生产者-消费者模式

async def producer_consumer_demo():
    """智能体中常见的模式:LLM产出任务,工具消费者执行"""
    task_queue = asyncio.Queue(maxsize=10)
    result_queue = asyncio.Queue()

    async def producer():
        """LLM 生成工具调用任务"""
        for i in range(20):
            await task_queue.put(f"task_{i}")
            print(f"产出任务 task_{i}")
            await asyncio.sleep(0.1)  # LLM流式生成中

    async def consumer(worker_id: int):
        """工具执行器"""
        while True:
            task = await task_queue.get()
            if task is None:  # 结束信号
                break
            result = f"{task}_done_by_worker_{worker_id}"
            await result_queue.put(result)
            task_queue.task_done()
            await asyncio.sleep(0.3)  # 模拟工具执行

    # 启动生产者和消费者
    producers = [asyncio.create_task(producer())]
    consumers = [asyncio.create_task(consumer(i)) for i in range(3)]

    # 等待所有任务完成
    await asyncio.gather(*producers)
    await task_queue.join()  # 等待所有任务被处理

    # 发送结束信号
    for _ in consumers:
        await task_queue.put(None)
    await asyncio.gather(*consumers)

5.2 Semaphore —— 控制并发数

async def controlled_concurrency():
    """限制同时进行的请求数量 —— 防止被限流"""
    semaphore = asyncio.Semaphore(5)  # 最多同时5个请求

    async def rate_limited_request(url: str):
        async with semaphore:  # 获取信号量
            return await fetch_url(url)
        # 离开with块时自动释放信号量

    # 即使有100个URL,同一时间最多5个并发请求
    urls = [f"https://api.example.com/endpoint_{i}" for i in range(100)]
    tasks = [rate_limited_request(url) for url in urls]
    results = await asyncio.gather(*tasks)
    return results

5.3 asyncio.Lock —— 保护共享资源

async def shared_resource_demo():
    """多个协程操作同一资源时确保线程安全"""
    lock = asyncio.Lock()
    shared_data = {"count": 0}

    async def safe_increment():
        async with lock:  # 同一时间只有一个协程能进入
            current = shared_data["count"]
            await asyncio.sleep(0.01)  # 模拟计算
            shared_data["count"] = current + 1

    # 100个协程同时+1,结果正确为100
    await asyncio.gather(*[safe_increment() for _ in range(100)])
    print(shared_data["count"])  # 100

6. 实战:异步HTTP请求封装

6.1 基础异步请求(aiohttp)

import aiohttp
import asyncio

async def fetch_json(url: str) -> dict:
    """异步获取JSON数据"""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# 推荐使用 httpx(同时支持同步和异步,API更现代)
import httpx

async def fetch_with_httpx(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=10.0)
        response.raise_for_status()
        return response.json()

6.2 完整的AI API调用客户端

import httpx
import asyncio
from typing import Optional

class AsyncAPIClient:
    """异步API客户端 —— 用于并发调用LLM接口"""

    def __init__(
        self,
        base_url: str,
        api_key: str,
        max_concurrent: int = 10,
        timeout: float = 30.0,
    ):
        self.base_url = base_url
        self.api_key = api_key
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._timeout = timeout
        self._client: Optional[httpx.AsyncClient] = None

    async def __aenter__(self):
        """异步上下文管理器入口"""
        self._client = httpx.AsyncClient(
            base_url=self.base_url,
            timeout=self._timeout,
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
        )
        return self

    async def __aexit__(self, *args):
        """异步上下文管理器出口"""
        if self._client:
            await self._client.aclose()

    async def chat_completion(
        self, messages: list, model: str = "gpt-4", **kwargs
    ) -> dict:
        """调用Chat Completions API(带并发控制)"""
        async with self._semaphore:
            for attempt in range(3):  # 最多重试3次
                try:
                    response = await self._client.post(
                        "/v1/chat/completions",
                        json={
                            "model": model,
                            "messages": messages,
                            **kwargs,
                        },
                    )
                    response.raise_for_status()
                    return response.json()
                except httpx.HTTPStatusError as e:
                    if e.response.status_code >= 500:
                        # 服务器错误,指数退避重试
                        await asyncio.sleep(2 ** attempt)
                        continue
                    raise
                except httpx.TimeoutException:
                    if attempt == 2:
                        raise
                    await asyncio.sleep(1)
            raise Exception("重试耗尽")

    async def batch_completion(
        self, prompts: list[list[dict]], model: str = "gpt-4"
    ) -> list[dict]:
        """批量并发调用LLM"""
        tasks = [
            self.chat_completion(messages, model=model)
            for messages in prompts
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

# 使用方式
async def main():
    async with AsyncAPIClient(
        base_url="https://api.openai.com",
        api_key="sk-xxx",
        max_concurrent=5,
    ) as client:
        # 单个请求
        result = await client.chat_completion(
            messages=[{"role": "user", "content": "Hello"}]
        )

        # 批量并发(5个请求,最多同时5个)
        prompts = [
            [{"role": "user", "content": f"问题{i}"}]
            for i in range(10)
        ]
        results = await client.batch_completion(prompts)
        return results

# asyncio.run(main())

7. 异步编程最佳实践

7.1 DO:应该做的事

# ✅ 使用 asyncio.run() 作为入口
if __name__ == "__main__":
    asyncio.run(main())

# ✅ 使用 Semaphore 控制并发数
sem = asyncio.Semaphore(5)

# ✅ 设置超时
result = await asyncio.wait_for(long_task(), timeout=10.0)

# ✅ 用 return_exceptions=True 防止部分失败影响整体
results = await asyncio.gather(*tasks, return_exceptions=True)

# ✅ 使用异步上下文管理器管理资源
async with httpx.AsyncClient() as client:
    ...

# ✅ 流式响应用 async for
async for chunk in stream_response():
    process(chunk)

7.2 DON’T:不应做的事

# ❌ 在异步函数中使用 time.sleep —— 会阻塞事件循环!
async def bad():
    time.sleep(1)      # 阻塞整个线程!
    await asyncio.sleep(1)  # ✅ 这才是正确的

# ❌ 在异步函数中使用 requests —— 没有异步版本
async def bad():
    import requests
    requests.get(url)  # 阻塞!
    # ✅ 用 httpx.AsyncClient 或 aiohttp

# ❌ 忘记 await
async def bad():
    asyncio.sleep(1)   # 返回一个coroutine对象,但没有执行!
    await asyncio.sleep(1)  # ✅ 正确

# ❌ 循环中逐个 await(串行执行)
async def bad():
    for url in urls:
        result = await fetch(url)  # 一个接一个
    # ✅ 用 gather/as_completed 并发

# ❌ 不处理 CancelledError
async def bad():
    try:
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("清理资源...")  # ✅ 正确处理后重新抛出或return
        raise

7.3 异步编程检查清单

  • 所有 I/O 操作都使用异步版本(aiohttp / httpx.AsyncClient
  • 永远不要用 time.sleep(),用 await asyncio.sleep()
  • 并发任务用 gather() 而非循环 await
  • Semaphore 限制并发数防止限流
  • 所有网络请求都设置超时
  • return_exceptions=True 做容错处理
  • 使用 async with 管理异步资源

8. 总结与下一步

本篇核心知识点

异步编程知识体系
├── 核心概念:事件循环、协程、非阻塞I/O
├── 基础语法:async def / await / asyncio.run()
├── 任务编排:gather() / wait() / as_completed() / TaskGroup
├── 并发控制:Semaphore / Lock / Queue
├── HTTP实践:aiohttp / httpx.AsyncClient
└── 最佳实践:7要6不要

常用 asyncio API 速查

API 用途 签名
asyncio.run(coro) 运行协程入口 run(main())
asyncio.create_task(coro) 创建Task立即执行 task = create_task(coro)
asyncio.gather(*tasks) 并发执行,按序返回 results = await gather(*tasks)
asyncio.wait(tasks) 灵活等待策略 done, pending = await wait(tasks)
asyncio.wait_for(coro, timeout) 超时控制 result = await wait_for(coro, 10)
asyncio.as_completed(coros) 按完成顺序迭代 for coro in as_completed(coros)
asyncio.sleep(delay) 异步等待 await sleep(1)
asyncio.Semaphore(n) 并发数控制 async with Semaphore(5)
asyncio.Queue(maxsize) 协程间通信 await queue.put(item)
asyncio.Lock() 互斥锁 async with lock

下一步

掌握了异步编程,你的智能体已经能从"慢悠悠的串行"变成"火力全开的并发"。下一篇文章中,我们将学习AI开发者最常用的4个Python第三方库——requests、httpx、pydantic、dataclasses,它们会让你的代码更简洁、更健壮。

上一篇学AI,懂这些Python就够了(一):变量·数据类型·控制流·函数·类 —— 30分钟搞定Python核心语法

下一篇学AI,懂这些Python就够了(三):requests·httpx·pydantic·dataclasses —— AI开发者必装的4个Python库


本文是「学AI,懂这些Python就够了」系列的第2篇。异步编程是区分"能写代码"和"能写高效代码"的分水岭,多写多练,把 gather 和 Semaphore 变成肌肉记忆。

Logo

这里是“一人公司”的成长家园。我们提供从产品曝光、技术变现到法律财税的全栈内容,并连接云服务、办公空间等稀缺资源,助你专注创造,无忧运营。

更多推荐