Skip to content

通过并行执行提高工具性能

ADK 已支持Python v1.10.0

从适用于 Python 的智能体开发工具包 (ADK) v1.10.0 版本开始,框架会自动尝试并行运行智能体请求的所有函数工具 (Function Tools)

这种并行机制可以显著提升智能体的性能和响应速度,特别是在智能体需要依赖多个外部 API 或执行长时间任务的情况下。例如,如果你有 3 个工具,每个工具耗时 2 秒,通过并行运行,总执行时间将接近 2 秒,而不是累积的 6 秒。

并行执行在以下场景中效果尤为显著:

  • 研究任务:智能体在进入下一阶段工作流之前,需要从多个来源(如 Google 搜索、财报文件、内网数据库)同时收集信息。
  • API 聚合调用:智能体需要独立访问多个 API(例如,同时对比多家航空公司的机票价格)。
  • 分发与通信:当智能体需要同时向多个接收者或通过多个独立渠道发送通知时。

为了启用这一性能改进,你的自定义工具必须构建为支持异步执行。本指南将解释并行工具执行在 ADK 中的工作原理,以及如何构建工具以充分利用这一处理能力。

注意

如果在工具调用集中有任何一个工具使用了同步处理,它将会阻塞其他工具的并行执行,导致整个批次的性能下降。


构建并行就绪工具

要启用并行执行,你需要将工具函数定义为异步函数。在 Python 中,这意味着使用 async defawait 语法,这使得 ADK 能够在 asyncio 事件循环中并发运行它们。

以下是一些针对并行处理和异步操作优化的工具示例:

HTTP 网络调用示例

下面的代码演示了如何使用 aiohttp 修改 get_weather() 函数以支持异步操作:

async def get_weather(city: str) -> dict:
    """获取指定城市的天气。"""
    async with aiohttp.ClientSession() as session:
        async with session.get(f"http://api.weather.com/{city}") as response:
            return await response.json()

数据库调用示例

你可以编写异步数据库调用函数,避免 IO 阻塞:

async def query_database(query: str) -> list:
    """异步执行数据库查询。"""
    async with asyncpg.connect("postgresql://...") as conn:
        return await conn.fetch(query)

长时间循环的控制权释放 (Yielding)

如果工具需要处理大量循环任务,建议主动释放控制权,以允许其他工具执行:

async def process_data(data: list) -> dict:
    results = []
    for i, item in enumerate(data):
        processed = await process_item(item)  # 异步等待点
        results.append(processed)

        # 针对长循环,定期释放事件循环控制权
        if i % 100 == 0:
            await asyncio.sleep(0)  
    return {"results": results}

Tip

使用 await asyncio.sleep(0) 可以在不产生实际延迟的情况下,将控制权交还给事件循环,从而避免单任务独占 CPU。

密集型操作的线程池

对于计算密集型(CPU Bound)函数,建议使用线程池来管理计算资源,避免阻塞异步主线程:

async def cpu_intensive_tool(data: list) -> dict:
    loop = asyncio.get_event_loop()

    # 使用线程池处理 CPU 密集型工作
    with ThreadPoolExecutor() as executor:
        result = await loop.run_in_executor(
            executor,
            expensive_computation, # 同步计算函数
            data
        )
    return {"result": result}

进程分块处理

处理海量数据时,可以将线程池技术与数据分块结合:

async def process_large_dataset(dataset: list) -> dict:
    results = []
    chunk_size = 1000

    for i in range(0, len(dataset), chunk_size):
        chunk = dataset[i:i + chunk_size]

        # 分块在线程池中处理
        loop = asyncio.get_event_loop()
        with ThreadPoolExecutor() as executor:
            chunk_result = await loop.run_in_executor(
                executor, process_chunk, chunk
            )

        results.extend(chunk_result)

        # 块处理间隙释放控制权
        await asyncio.sleep(0)

    return {"total_processed": len(results), "results": results}

编写并行就绪提示词和工具描述

在辅助 AI 模型进行决策时,你可以在提示词(Prompts)中显式暗示或引导其并行发出调用:

建议的提示词示例:

当用户请求多项信息时,请尽可能并行调用函数。

示例场景:
- 用户问:“获取伦敦的天气和美元对欧元的汇率” → 同时调用 get_weather 和 get_exchange_rate。
- 用户问:“对比城市 A 和 B” → 并行调用 get_weather, get_population, get_distance。
- 用户问:“分析多只股票” → 为每只股票并行调用 get_stock_price。

优先选择多个专注的特定函数调用,而非单个复杂的全能函数。

此外,在工具描述中明确指出支持并行也会有所帮助:

async def get_weather(city: str) -> dict:
    """获取单个城市的实时天气。

    此工具已针对并行执行优化。你可以同时针对不同城市进行多次调用。

    Args:
        city: 城市名称,如 'London'、'New York'。

    Returns:
        包含温度、天气状况、湿度的字典。
    """
    await asyncio.sleep(2)  # 模拟 API 耗时
    return {"city": city, "temp": 72, "condition": "sunny"}

后续步骤