通过并行执行提高工具性能¶
从适用于 Python 的智能体开发工具包 (ADK) v1.10.0 版本开始,框架会自动尝试并行运行智能体请求的所有函数工具 (Function Tools)。
这种并行机制可以显著提升智能体的性能和响应速度,特别是在智能体需要依赖多个外部 API 或执行长时间任务的情况下。例如,如果你有 3 个工具,每个工具耗时 2 秒,通过并行运行,总执行时间将接近 2 秒,而不是累积的 6 秒。
并行执行在以下场景中效果尤为显著:
- 研究任务:智能体在进入下一阶段工作流之前,需要从多个来源(如 Google 搜索、财报文件、内网数据库)同时收集信息。
- API 聚合调用:智能体需要独立访问多个 API(例如,同时对比多家航空公司的机票价格)。
- 分发与通信:当智能体需要同时向多个接收者或通过多个独立渠道发送通知时。
为了启用这一性能改进,你的自定义工具必须构建为支持异步执行。本指南将解释并行工具执行在 ADK 中的工作原理,以及如何构建工具以充分利用这一处理能力。
注意
如果在工具调用集中有任何一个工具使用了同步处理,它将会阻塞其他工具的并行执行,导致整个批次的性能下降。
构建并行就绪工具¶
要启用并行执行,你需要将工具函数定义为异步函数。在 Python 中,这意味着使用 async def 和 await 语法,这使得 ADK 能够在 asyncio 事件循环中并发运行它们。
以下是一些针对并行处理和异步操作优化的工具示例:
HTTP 网络调用示例¶
下面的代码演示了如何使用 aiohttp 修改 get_weather() 函数以支持异步操作:
async def get_weather(city: str) -> dict:
"""获取指定城市的天气。"""
async with aiohttp.ClientSession() as session:
async with session.get(f"http://api.weather.com/{city}") as response:
return await response.json()
数据库调用示例¶
你可以编写异步数据库调用函数,避免 IO 阻塞:
async def query_database(query: str) -> list:
"""异步执行数据库查询。"""
async with asyncpg.connect("postgresql://...") as conn:
return await conn.fetch(query)
长时间循环的控制权释放 (Yielding)¶
如果工具需要处理大量循环任务,建议主动释放控制权,以允许其他工具执行:
async def process_data(data: list) -> dict:
results = []
for i, item in enumerate(data):
processed = await process_item(item) # 异步等待点
results.append(processed)
# 针对长循环,定期释放事件循环控制权
if i % 100 == 0:
await asyncio.sleep(0)
return {"results": results}
Tip
使用 await asyncio.sleep(0) 可以在不产生实际延迟的情况下,将控制权交还给事件循环,从而避免单任务独占 CPU。
密集型操作的线程池¶
对于计算密集型(CPU Bound)函数,建议使用线程池来管理计算资源,避免阻塞异步主线程:
async def cpu_intensive_tool(data: list) -> dict:
loop = asyncio.get_event_loop()
# 使用线程池处理 CPU 密集型工作
with ThreadPoolExecutor() as executor:
result = await loop.run_in_executor(
executor,
expensive_computation, # 同步计算函数
data
)
return {"result": result}
进程分块处理¶
处理海量数据时,可以将线程池技术与数据分块结合:
async def process_large_dataset(dataset: list) -> dict:
results = []
chunk_size = 1000
for i in range(0, len(dataset), chunk_size):
chunk = dataset[i:i + chunk_size]
# 分块在线程池中处理
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
chunk_result = await loop.run_in_executor(
executor, process_chunk, chunk
)
results.extend(chunk_result)
# 块处理间隙释放控制权
await asyncio.sleep(0)
return {"total_processed": len(results), "results": results}
编写并行就绪提示词和工具描述¶
在辅助 AI 模型进行决策时,你可以在提示词(Prompts)中显式暗示或引导其并行发出调用:
建议的提示词示例:
当用户请求多项信息时,请尽可能并行调用函数。
示例场景:
- 用户问:“获取伦敦的天气和美元对欧元的汇率” → 同时调用 get_weather 和 get_exchange_rate。
- 用户问:“对比城市 A 和 B” → 并行调用 get_weather, get_population, get_distance。
- 用户问:“分析多只股票” → 为每只股票并行调用 get_stock_price。
优先选择多个专注的特定函数调用,而非单个复杂的全能函数。
此外,在工具描述中明确指出支持并行也会有所帮助:
async def get_weather(city: str) -> dict:
"""获取单个城市的实时天气。
此工具已针对并行执行优化。你可以同时针对不同城市进行多次调用。
Args:
city: 城市名称,如 'London'、'New York'。
Returns:
包含温度、天气状况、湿度的字典。
"""
await asyncio.sleep(2) # 模拟 API 耗时
return {"city": city, "temp": 72, "condition": "sunny"}
后续步骤¶
- 如需了解构建智能体工具的更多基础知识,请参阅函数工具指南。
- 如需查看利用并行处理的详细代码示例,请访问 adk-python 示例库。