通过并行执行提高工具性能¶
从 Agent Development Kit (ADK) 版本 1.10.0 开始,框架尝试并行运行任何智能体请求的函数工具。此行为可以显著提高你的智能体的性能和响应性,特别是对于依赖多个外部 API 或长时间运行任务的智能体。例如,如果你有 3 个工具,每个需要 2 秒,通过并行运行它们,总执行时间将接近 2 秒,而不是 6 秒。并行运行工具函数的能力可以改善你的智能体的性能,特别是在以下场景中:
- 研究任务: 智能体在进入工作流的下一阶段之前从多个来源收集信息。
- API 调用: 智能体独立访问多个 API,例如使用来自多个航空公司的 API 搜索可用航班。
- 发布和通信任务: 当智能体需要通过多个独立渠道或多个接收者发布或通信时。
但是,你的自定义工具必须构建为支持异步执行才能启用此性能改进。本指南解释了并行工具执行在 ADK 中的工作原理以及如何构建你的工具以充分利用此处理功能。
Warning
任何在工具函数调用集中使用同步处理的 ADK 工具都会阻止其他工具并行执行,即使其他工具允许并行执行。
构建并行就绪工具¶
通过将工具函数定义为异步函数来启用并行执行。在 Python 代码中,这意味着使用 async def
和 await
语法,这允许 ADK 在 asyncio
事件循环中并发运行它们。以下部分显示了为并行处理和异步操作构建的智能体工具示例。
HTTP 网络调用示例¶
以下代码示例显示如何修改 get_weather()
函数以异步操作并允许并行执行:
async def get_weather(city: str) -> dict:
async with aiohttp.ClientSession() as session:
async with session.get(f"http://api.weather.com/{city}") as response:
return await response.json()
数据库调用示例¶
以下代码示例显示如何编写数据库调用函数以异步操作:
async def query_database(query: str) -> list:
async with asyncpg.connect("postgresql://...") as conn:
return await conn.fetch(query)
长时间循环的 yielding 行为示例¶
在工具处理多个请求或大量长时间运行请求的情况下,考虑添加 yielding 代码以允许其他工具执行,如下面的代码示例所示:
async def process_data(data: list) -> dict:
results = []
for i, item in enumerate(data):
processed = await process_item(item) # 产生点
results.append(processed)
# 为长时间循环添加定期产生点
if i % 100 == 0:
await asyncio.sleep(0) # 产生控制权
return {"results": results}
重要
使用 asyncio.sleep()
函数进行暂停以避免阻塞其他函数的执行。
密集操作的线程池示例¶
在执行处理密集型函数时,考虑创建线程池以更好地管理可用计算资源,如下面的示例所示:
async def cpu_intensive_tool(data: list) -> dict:
loop = asyncio.get_event_loop()
# 对 CPU 密集型工作使用线程池
with ThreadPoolExecutor() as executor:
result = await loop.run_in_executor(
executor,
expensive_computation,
data
)
return {"result": result}
进程分块示例¶
在对长列表或大量数据执行进程时,考虑将线程池技术与将处理分成数据块相结合,并在块之间产生处理时间,如下面的示例所示:
async def process_large_dataset(dataset: list) -> dict:
results = []
chunk_size = 1000
for i in range(0, len(dataset), chunk_size):
chunk = dataset[i:i + chunk_size]
# 在线程池中处理块
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
chunk_result = await loop.run_in_executor(
executor, process_chunk, chunk
)
results.extend(chunk_result)
# 在块之间产生控制权
await asyncio.sleep(0)
return {"total_processed": len(results), "results": results}
编写并行就绪提示和工具描述¶
在构建 AI 模型的提示时,考虑明确指定或暗示并行进行函数调用。以下 AI 提示示例指导模型并行使用工具:
当用户请求多条信息时,始终并行调用函数。
示例:
- "获取伦敦的天气和美元兑欧元汇率" → 同时调用两个函数
- "比较城市 A 和 B" → 并行调用 get_weather、get_population、get_distance
- "分析多只股票" → 并行调用每只股票的 get_stock_price
始终优先选择多个特定函数调用而不是单个复杂调用。
以下示例显示了暗示通过并行执行更高效使用的工具函数描述:
async def get_weather(city: str) -> dict:
"""获取单个城市的当前天气。
此函数针对并行执行进行了优化 - 为不同城市多次调用。
Args:
city: 城市名称,例如:'London'、'New York'
Returns:
天气数据,包括温度、条件、湿度
"""
await asyncio.sleep(2) # 模拟 API 调用
return {"city": city, "temp": 72, "condition": "sunny"}
后续步骤¶
有关为智能体构建工具和函数调用的更多信息,请参阅函数工具。有关利用并行处理的工具的更详细示例,请参阅 adk-python 存储库中的示例。