使用 ADK 构建你的第一个智能体团队:一个渐进式天气机器人¶
本教程基于 Agent Development Kit 的快速入门示例扩展而来。现在,你可以更深入地探索,并构建一个更复杂的多智能体系统。
我们将着手构建一个天气机器人智能体团队,在简单的基础上逐步添加高级功能。从一个可以查询天气的单个智能体开始,我们将逐步添加以下功能:
- 利用不同的 AI 模型(Gemini、GPT、Claude)
- 为不同任务设计专门的子智能体(如问候和告别)
- 实现智能体之间的智能委托
- 使用持久会话状态为智能体提供记忆能力
- 通过回调实现关键的安全防护机制
为什么选择天气机器人团队?
这个看似简单的用例提供了一个实用且易于理解的场景,用于探索构建复杂、真实世界智能体应用所需的核心 ADK 概念。你将学习如何构建交互结构、管理状态、确保安全性,以及协调多个协同工作的 AI "大脑"。
再说一下什么是 ADK?
提醒一下,ADK 是一个 Python 框架,旨在简化由大型语言模型(LLM)驱动的应用程序的开发。它为创建能够推理、计划、使用工具、与用户动态交互以及在团队中有效协作的智能体提供了强大的构建模块。
在这个高级教程中,你将掌握:
- ✅ 工具定义与使用:创建 Python 函数(
工具)赋予智能体特定能力(如获取数据),并指导智能体如何有效使用它们。 - ✅ 多 LLM 灵活性:通过 LiteLLM 集成配置智能体使用各种领先的 LLM(Gemini、GPT-4o、Claude Sonnet),让你为每个任务选择最佳模型。
- ✅ 智能体委托与协作:设计专门的子智能体,并实现用户请求的自动路由(
自动流程)到团队中最合适的智能体。 - ✅ 会话状态记忆:利用
会话状态和工具上下文使智能体能够记住对话轮次之间的信息,实现更具上下文的交互。 - ✅ 使用回调实现安全防护:实现
before_model_callback和before_tool_callback,根据预定义规则检查、修改或阻止请求/工具使用,增强应用程序的安全性和控制。
最终预期:
完成本教程后,你将构建一个功能齐全的多智能体天气机器人系统。这个系统不仅能提供天气信息,还能处理对话礼仪,记住最后查询的城市,并在定义的安全边界内运行,所有这些都由 ADK 编排。
前提条件:
- ✅ 扎实的 Python 编程理解
- ✅ 熟悉大型语言模型(LLM)、API 和智能体概念
- ❗ 至关重要:完成 ADK 快速入门教程或具备 ADK 基础知识(Agent、Runner、SessionService、基本工具使用)。本教程直接建立在这些概念之上。
- ✅ 你计划使用的 LLM 的 API 密钥(例如,Google AI Studio 用于 Gemini、OpenAI Platform、Anthropic Console)
关于执行环境的说明:
本教程专为 Google Colab、Colab Enterprise 或 Jupyter 笔记本等交互式笔记本环境而设计。请注意以下几点:
- 运行异步代码:笔记本环境处理异步代码的方式不同。你会看到使用
await的示例(适用于事件循环已经运行的情况,常见于笔记本中)或asyncio.run()(在作为独立.py脚本运行或在特定笔记本设置中通常需要)。代码块为两种场景提供指导。 - 手动 Runner/Session 设置:这些步骤涉及显式创建
Runner和SessionService实例。之所以展示这种方法,是因为它为你提供了对智能体执行生命周期、会话管理和状态持久性的细粒度控制。
替代方案:使用 ADK 的内置工具(Web UI / CLI / API 服务器)
如果你更喜欢使用 ADK 标准工具自动处理运行器和会话管理的设置,你可以在这里找到为此目的构建的等效代码。该版本设计为可以直接使用 adk web(用于 Web UI)、adk run(用于 CLI 交互)或 adk api_server(暴露 API)等命令运行。请遵循该替代资源中提供的 README.md 说明。
准备好构建你的智能体团队了吗?让我们开始吧!
注意:本教程适用于 adk 1.0.0 及更高版本
# @title 步骤 0:设置和安装
# 安装 ADK 和 LiteLLM 以支持多模型
!pip install google-adk -q
!pip install litellm -q
print("安装完成。")
# @title 导入必要的库
import os
import asyncio
from google.adk.agents import Agent
from google.adk.models.lite_llm import LiteLlm # 用于多模型支持
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types # 用于创建消息 Content/Parts
import warnings
# 忽略所有警告
warnings.filterwarnings("ignore")
import logging
logging.basicConfig(level=logging.ERROR)
print("库已导入。")
# @title 配置 API 密钥(替换为你的实际密钥!)
# --- 重要:用你的真实 API 密钥替换占位符 ---
# Gemini API 密钥(从 Google AI Studio 获取:https://aistudio.google.com/app/apikey)
os.environ["GOOGLE_API_KEY"] = "YOUR_GOOGLE_API_KEY" # <--- 替换
# [可选]
# OpenAI API 密钥(从 OpenAI Platform 获取:https://platform.openai.com/api-keys)
os.environ['OPENAI_API_KEY'] = 'YOUR_OPENAI_API_KEY' # <--- 替换
# [可选]
# Anthropic API 密钥(从 Anthropic Console 获取:https://console.anthropic.com/settings/keys)
os.environ['ANTHROPIC_API_KEY'] = 'YOUR_ANTHROPIC_API_KEY' # <--- 替换
# --- 验证密钥(可选检查)---
print("API 密钥设置:")
print(f"Google API 密钥已设置:{'Yes' if os.environ.get('GOOGLE_API_KEY') and os.environ['GOOGLE_API_KEY'] != 'YOUR_GOOGLE_API_KEY' else 'No (替换占位符!)'}")
print(f"OpenAI API 密钥已设置:{'Yes' if os.environ.get('OPENAI_API_KEY') and os.environ['OPENAI_API_KEY'] != 'YOUR_OPENAI_API_KEY' else 'No (替换占位符!)'}")
print(f"Anthropic API 密钥已设置:{'Yes' if os.environ.get('ANTHROPIC_API_KEY') and os.environ['ANTHROPIC_API_KEY'] != 'YOUR_ANTHROPIC_API_KEY' else 'No (替换占位符!)'}")
# 配置 ADK 直接使用 API 密钥(本多模型设置不使用 Vertex AI)
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
# @markdown **安全提示:**最佳做法是安全地管理 API 密钥(例如,使用 Colab Secrets 或环境变量),而不是直接在笔记本中硬编码。替换上面的占位符字符串。
# --- 定义模型常量以便于使用 ---
# 更多支持的模型可以在这里参考:https://ai.google.dev/gemini-api/docs/models#model-variations
MODEL_GEMINI_2_5_FLASH = "gemini-2.5-flash"
# 更多支持的模型可以在这里参考:https://docs.litellm.ai/docs/providers/openai#openai-chat-completion-models
MODEL_GPT_4O = "openai/gpt-4.1" # 你也可以尝试:gpt-4.1-mini、gpt-4o 等
# 更多支持的模型可以在这里参考:https://docs.litellm.ai/docs/providers/anthropic
MODEL_CLAUDE_SONNET = "anthropic/claude-sonnet-4-20250514" # 你也可以尝试:claude-opus-4-20250514、claude-3-7-sonnet-20250219 等
print("\n环境已配置。")
步骤 1:你的第一个智能体 - 基本天气查询¶
让我们首先构建天气机器人的基础组件:一个能够执行特定任务——查询天气信息的单个智能体。这涉及创建两个核心部分:
- 工具:一个 Python 函数,赋予智能体获取天气数据的能力。
- 智能体:理解用户请求,知道它拥有天气工具,并决定何时以及如何使用它的 AI "大脑"。
1. 定义工具(get_weather)¶
在 ADK 中,工具是赋予智能体超越纯文本生成能力的构建块。它们通常是执行特定操作的常规 Python 函数,如调用 API、查询数据库或执行计算。
我们的第一个工具将提供模拟天气报告。这使我们能够专注于智能体结构,而无需暂时使用外部 API 密钥。之后,你可以轻松地将这个模拟函数替换为调用真实天气服务的函数。
关键概念:文档字符串至关重要!智能体的 LLM 严重依赖函数的文档字符串来理解:
- 工具做什么
- 何时使用它
- 它需要什么参数(
city: str) - 它返回什么信息
最佳实践:为你的工具编写清晰、描述性和准确的文档字符串。这对于 LLM 正确使用工具至关重要。
# @title 定义 get_weather 工具
def get_weather(city: str) -> dict:
"""获取指定城市的当前天气报告。
Args:
city (str): 城市名称(例如,"New York"、"London"、"Tokyo")。
Returns:
dict: 包含天气信息的字典。
包含一个 'status' 键('success' 或 'error')。
如果是 'success',包含 'report' 键与天气详情。
如果是 'error',包含 'error_message' 键。
"""
print(f"--- 工具:get_weather 被调用,城市:{city} ---") # 记录工具执行
city_normalized = city.lower().replace(" ", "") # 基本标准化
# 模拟天气数据
mock_weather_db = {
"newyork": {"status": "success", "report": "纽约的天气是晴朗的,温度为 25°C。"},
"london": {"status": "success", "report": "伦敦多云,温度为 15°C。"},
"tokyo": {"status": "success", "report": "东京有小雨,温度为 18°C。"},
}
if city_normalized in mock_weather_db:
return mock_weather_db[city_normalized]
else:
return {"status": "error", "error_message": f"抱歉,我没有 '{city}' 的天气信息。"}
# 工具使用示例(可选测试)
print(get_weather("New York"))
print(get_weather("Paris"))
2. 定义智能体(weather_agent)¶
现在,让我们创建智能体本身。ADK 中的 Agent 编排用户、LLM 和可用工具之间的交互。
我们用几个关键参数配置它:
name:此智能体的唯一标识符(例如,"weather_agent_v1")。model:指定使用哪个 LLM(例如,MODEL_GEMINI_2_5_FLASH)。我们将从特定的 Gemini 模型开始。description:智能体总体目的的简洁摘要。当其他智能体需要决定是否将任务委派给这个智能体时,这一点变得至关重要。instruction:为 LLM 提供详细指导,包括其行为方式、角色、目标,特别是如何以及何时使用其分配的工具。tools:包含智能体被允许使用的实际 Python 工具函数的列表(例如,[get_weather])。
最佳实践:提供清晰具体的instruction提示。指令越详细,LLM 就越能理解其角色以及如何有效使用其工具。如果需要,明确说明错误处理。
最佳实践:选择描述性的name和description值。这些在 ADK 内部使用,对于自动委托等功能(稍后介绍)至关重要。
# @title 定义天气智能体
# 使用前面定义的模型常量之一
AGENT_MODEL = MODEL_GEMINI_2_5_FLASH # 从 Gemini 开始
weather_agent = Agent(
name="weather_agent_v1",
model=AGENT_MODEL, # 可以是 Gemini 的字符串或 LiteLlm 对象
description="为特定城市提供天气信息。",
instruction="你是一个有用的天气助手。"
"当用户询问特定城市的天气时,"
"使用 'get_weather' 工具查找信息。"
"如果工具返回错误,礼貌地告知用户。"
"如果工具成功,清晰地呈现天气报告。",
tools=[get_weather], # 直接传递函数
)
print(f"智能体 '{weather_agent.name}' 已使用模型 '{AGENT_MODEL}' 创建。")
3. 设置 Runner 和 Session Service¶
为了管理对话和执行智能体,我们需要两个更多的组件:
SessionService:负责管理不同用户和会话的对话历史和状态。InMemorySessionService是一个简单的实现,将所有内容存储在内存中,适用于测试和简单的应用程序。它跟踪交换的消息。我们将在步骤 4 中更多地探索状态持久性。Runner:引擎负责编排交互流程。它接受用户输入,将其路由到适当的智能体,根据智能体的逻辑管理对 LLM 和工具的调用,通过SessionService处理会话更新,并生成表示交互进展的事件。
# @title 设置 Session Service 和 Runner
# --- 会话管理 ---
# 关键概念:SessionService 存储对话历史和状态。
# InMemorySessionService 是本教程的简单、非持久存储。
session_service = InMemorySessionService()
# 定义用于标识交互上下文的常量
APP_NAME = "weather_tutorial_app"
USER_ID = "user_1"
SESSION_ID = "session_001" # 为简单起见使用固定 ID
# 创建将发生对话的特定会话
session = await session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID,
session_id=SESSION_ID
)
print(f"会话已创建:App='{APP_NAME}',User='{USER_ID}',Session='{SESSION_ID}'")
# --- 或者 ---
# 如果作为标准 Python 脚本(.py 文件)运行,请取消注释以下行:
# async def init_session(app_name:str,user_id:str,session_id:str) -> InMemorySessionService:
# session = await session_service.create_session(
# app_name=app_name,
# user_id=user_id,
# session_id=session_id
# )
# print(f"会话已创建:App='{app_name}',User='{user_id}',Session='{session_id}'")
# return session
#
# session = asyncio.run(init_session(APP_NAME,USER_ID,SESSION_ID))
# --- Runner ---
# 关键概念:Runner 编排智能体执行循环。
runner = Runner(
agent=weather_agent, # 我们要运行的智能体
app_name=APP_NAME, # 将运行与我们的应用关联
session_service=session_service # 使用我们的会话管理器
)
print(f"已为智能体 '{runner.agent.name}' 创建 Runner。")
4. 与智能体交互¶
我们需要一种方法向智能体发送消息并接收其响应。由于 LLM 调用和工具执行可能需要时间,ADK 的 Runner 是异步运行的。
我们将定义一个 async 辅助函数(call_agent_async),它:
- 接受用户查询字符串。
- 将其打包到 ADK
Content格式中。 - 调用
runner.run_async,提供用户/会话上下文和新的消息。 - 遍历运行器产生的事件。事件表示智能体执行步骤(例如,工具调用请求、工具结果接收、中间 LLM 思考、最终响应)。
- 使用
event.is_final_response()识别并打印最终响应事件。
为什么是 async?与 LLM 和潜在工具(如外部 API)的交互是 I/O 绑定的操作。使用 asyncio 允许程序高效地处理这些操作,而不会阻塞执行。
# @title 定义智能体交互函数
from google.genai import types # 用于创建消息 Content/Parts
async def call_agent_async(query: str, runner, user_id, session_id):
"""向智能体发送查询并打印最终响应。"""
print(f"\n>>> 用户查询:{query}")
# 以 ADK 格式准备用户的消息
content = types.Content(role='user', parts=[types.Part(text=query)])
final_response_text = "智能体没有产生最终响应。" # 默认值
# 关键概念:run_async 执行智能体逻辑并产生事件。
# 我们遍历事件以找到最终答案。
async for event in runner.run_async(user_id=user_id, session_id=session_id, new_message=content):
# 你可以取消注释下面的行以查看执行期间的*所有*事件
# print(f" [事件] 作者:{event.author},类型:{type(event).__name__},最终:{event.is_final_response()},内容:{event.content}")
# 关键概念:is_final_response() 标记轮次的结束消息。
if event.is_final_response():
if event.content and event.content.parts:
# 假设第一部分中的文本响应
final_response_text = event.content.parts[0].text
elif event.actions and event.actions.escalate: # 处理潜在错误/升级
final_response_text = f"智能体升级:{event.error_message or '无特定消息。'}"
# 如果需要,在这里添加更多检查(例如,特定错误代码)
break # 找到最终响应后停止处理事件
print(f"<<< 智能体响应:{final_response_text}")
5. 运行对话¶
最后,让我们通过发送一些查询来测试我们的设置。我们将我们的 async 调用包装在一个主 async 函数中,并使用 await 运行它。
观察输出:
- 看到用户查询。
- 注意
--- 工具:get_weather 被调用... ---日志,当智能体使用工具时。 - 观察智能体的最终响应,包括它如何处理天气数据不可用的情况(对于巴黎)。
# @title 运行初始对话
# 我们需要一个 async 函数来 await 我们的交互辅助函数
async def run_conversation():
await call_agent_async("伦敦的天气如何?",
runner=runner,
user_id=USER_ID,
session_id=SESSION_ID)
await call_agent_async("巴黎怎么样?",
runner=runner,
user_id=USER_ID,
session_id=SESSION_ID) # 期望工具的错误消息
await call_agent_async("告诉我纽约的天气",
runner=runner,
user_id=USER_ID,
session_id=SESSION_ID)
# 在异步上下文中使用 await 执行对话(如 Colab/Jupyter)
await run_conversation()
# --- 或者 ---
# 如果作为标准 Python 脚本(.py 文件)运行,请取消注释以下行:
# import asyncio
# if __name__ == "__main__":
# try:
# asyncio.run(run_conversation())
# except Exception as e:
# print(f"发生错误:{e}")
恭喜!你已经成功构建并交互了你的第一个 ADK 智能体。它理解用户的请求,使用工具查找信息,并根据工具的结果适当响应。
在下一步中,我们将探索如何轻松切换为这个智能体提供支持的底层语言模型。
步骤 2:使用 LiteLLM 实现多模型支持 [可选]¶
在步骤 1 中,我们构建了一个由特定 Gemini 模型驱动的功能性天气智能体。虽然有效,但现实世界的应用程序通常受益于使用不同大型语言模型(LLM)的灵活性。为什么?
- 性能:某些模型在特定任务上表现出色(例如,编码、推理、创意写作)。
- 成本:不同模型具有不同的价格点。
- 能力:模型提供多样化的功能、上下文窗口大小和微调选项。
- 可用性/冗余:拥有替代方案可确保即使一个提供商遇到问题,你的应用程序仍能正常运行。
ADK 通过其与 LiteLLM 库的集成,使模型之间的切换变得无缝。LiteLLM 作为 100 多个不同 LLM 的统一接口。
在本步骤中,我们将:
- 学习如何使用
LiteLlm包装器配置 ADKAgent以使用来自 OpenAI(GPT)和 Anthropic(Claude)等提供商的模型。 - 定义、配置(使用它们自己的会话和运行器)并立即测试我们的天气智能体的实例,每个实例都由不同的 LLM 支持。
- 与这些不同的智能体交互,观察它们响应中的潜在差异,即使使用相同的底层工具。
1. 导入 LiteLlm¶
我们在初始设置(步骤 0)期间导入了这个,但它是多模型支持的关键组件:
2. 定义和测试多模型智能体¶
我们不再只传递模型名称字符串(默认为 Google 的 Gemini 模型),而是将所需的模型标识符字符串包装在 LiteLlm 类中。
- 关键概念:
LiteLlm包装器:LiteLlm(model="provider/model_name")语法告诉 ADK 通过 LiteLLM 库将此智能体的请求路由到指定的模型提供商。
确保你已在步骤 0 中配置了 OpenAI 和 Anthropic 的必要 API 密钥。我们将使用 call_agent_async 函数(之前定义的,现在接受 runner、user_id 和 session_id)在每个智能体设置后立即与其交互。
下面的每个代码块将:
- 使用特定的 LiteLLM 模型(
MODEL_GPT_4O或MODEL_CLAUDE_SONNET)定义智能体。 - 专门为该智能体的测试运行创建一个新的、独立的
InMemorySessionService和会话。这使本演示的对话历史保持隔离。 - 创建一个为特定智能体及其会话服务配置的
Runner。 - 立即调用
call_agent_async发送查询并测试智能体。
最佳实践:使用模型名称的常量(如步骤 0 中定义的 MODEL_GPT_4O、MODEL_CLAUDE_SONNET)以避免拼写错误并使代码更易于管理。
错误处理:我们将智能体定义包装在 try...except 块中。这可以防止整个代码单元在特定提供商的 API 密钥缺失或无效时失败,允许教程继续使用已配置的模型。
首先,让我们创建并测试使用 OpenAI 的 GPT-4o 的智能体。
# @title 定义和测试 GPT 智能体
# 确保步骤 1 中的 'get_weather' 函数在你的环境中已定义。
# 确保之前定义的 'call_agent_async' 已定义。
# --- 使用 GPT-4o 的智能体 ---
weather_agent_gpt = None # 初始化为 None
runner_gpt = None # 初始化运行器为 None
try:
weather_agent_gpt = Agent(
name="weather_agent_gpt",
# 关键变化:包装 LiteLLM 模型标识符
model=LiteLlm(model=MODEL_GPT_4O),
description="提供天气信息(使用 GPT-4o)。",
instruction="你是一个由 GPT-4o 驱动的有用天气助手。"
"对于城市天气请求使用 'get_weather' 工具。"
"根据工具的输出状态清晰地呈现成功报告或礼貌的错误消息。",
tools=[get_weather], # 重用相同的工具
)
print(f"智能体 '{weather_agent_gpt.name}' 已使用模型 '{MODEL_GPT_4O}' 创建。")
# InMemorySessionService 是本教程的简单、非持久存储。
session_service_gpt = InMemorySessionService() # 创建专用服务
# 定义用于标识交互上下文的常量
APP_NAME_GPT = "weather_tutorial_app_gpt" # 此测试的唯一应用名称
USER_ID_GPT = "user_1_gpt"
SESSION_ID_GPT = "session_001_gpt" # 为简单起见使用固定 ID
# 创建将发生对话的特定会话
session_gpt = await session_service_gpt.create_session(
app_name=APP_NAME_GPT,
user_id=USER_ID_GPT,
session_id=SESSION_ID_GPT
)
print(f"会话已创建:App='{APP_NAME_GPT}',User='{USER_ID_GPT}',Session='{SESSION_ID_GPT}'")
# 创建特定于此智能体及其会话服务的运行器
runner_gpt = Runner(
agent=weather_agent_gpt,
app_name=APP_NAME_GPT, # 使用特定的应用名称
session_service=session_service_gpt # 使用特定的会话服务
)
print(f"已为智能体 '{runner_gpt.agent.name}' 创建 Runner。")
# --- 测试 GPT 智能体 ---
print("\n--- 测试 GPT 智能体 ---")
# 确保 call_agent_async 使用正确的 runner、user_id、session_id
await call_agent_async(query = "东京的天气怎么样?",
runner=runner_gpt,
user_id=USER_ID_GPT,
session_id=SESSION_ID_GPT)
# --- 或者 ---
# 如果作为标准 Python 脚本(.py 文件)运行,请取消注释以下行:
# import asyncio
# if __name__ == "__main__":
# try:
# asyncio.run(call_agent_async(query = "东京的天气怎么样?",
# runner=runner_gpt,
# user_id=USER_ID_GPT,
# session_id=SESSION_ID_GPT)
# except Exception as e:
# print(f"发生错误:{e}")
except Exception as e:
print(f"❌ 无法创建或运行 GPT 智能体 '{MODEL_GPT_4O}'。检查 API 密钥和模型名称。错误:{e}")
接下来,我们将对 Anthropic 的 Claude Sonnet 执行相同的操作。
# @title 定义和测试 Claude 智能体
# 确保步骤 1 中的 'get_weather' 函数在你的环境中已定义。
# 确保之前定义的 'call_agent_async' 已定义。
# --- 使用 Claude Sonnet 的智能体 ---
weather_agent_claude = None # 初始化为 None
runner_claude = None # 初始化运行器为 None
try:
weather_agent_claude = Agent(
name="weather_agent_claude",
# 关键变化:包装 LiteLLM 模型标识符
model=LiteLlm(model=MODEL_CLAUDE_SONNET),
description="提供天气信息(使用 Claude Sonnet)。",
instruction="你是一个由 Claude Sonnet 驱动的有用天气助手。"
"对于城市天气请求使用 'get_weather' 工具。"
"分析工具的字典输出('status'、'report'/'error_message')。"
"清晰地呈现成功报告或礼貌的错误消息。",
tools=[get_weather], # 重用相同的工具
)
print(f"智能体 '{weather_agent_claude.name}' 已使用模型 '{MODEL_CLAUDE_SONNET}' 创建。")
# InMemorySessionService 是本教程的简单、非持久存储。
session_service_claude = InMemorySessionService() # 创建专用服务
# 定义用于标识交互上下文的常量
APP_NAME_CLAUDE = "weather_tutorial_app_claude" # 唯一应用名称
USER_ID_CLAUDE = "user_1_claude"
SESSION_ID_CLAUDE = "session_001_claude" # 为简单起见使用固定 ID
# 创建将发生对话的特定会话
session_claude = await session_service_claude.create_session(
app_name=APP_NAME_CLAUDE,
user_id=USER_ID_CLAUDE,
session_id=SESSION_ID_CLAUDE
)
print(f"会话已创建:App='{APP_NAME_CLAUDE}',User='{USER_ID_CLAUDE}',Session='{SESSION_ID_CLAUDE}'")
# 创建特定于此智能体及其会话服务的运行器
runner_claude = Runner(
agent=weather_agent_claude,
app_name=APP_NAME_CLAUDE, # 使用特定的应用名称
session_service=session_service_claude # 使用特定的会话服务
)
print(f"已为智能体 '{runner_claude.agent.name}' 创建 Runner。")
# --- 测试 Claude 智能体 ---
print("\n--- 测试 Claude 智能体 ---")
# 确保 call_agent_async 使用正确的 runner、user_id、session_id
await call_agent_async(query = "请告诉我伦敦的天气。",
runner=runner_claude,
user_id=USER_ID_CLAUDE,
session_id=SESSION_ID_CLAUDE)
# --- 或者 ---
# 如果作为标准 Python 脚本(.py 文件)运行,请取消注释以下行:
# import asyncio
# if __name__ == "__main__":
# try:
# asyncio.run(call_agent_async(query = "请告诉我伦敦的天气。",
# runner=runner_claude,
# user_id=USER_ID_CLAUDE,
# session_id=SESSION_ID_CLAUDE)
# except Exception as e:
# print(f"发生错误:{e}")
except Exception as e:
print(f"❌ 无法创建或运行 Claude 智能体 '{MODEL_CLAUDE_SONNET}'。检查 API 密钥和模型名称。错误:{e}")
仔细观察两个代码块的输出。你应该看到:
- 每个智能体(
weather_agent_gpt、weather_agent_claude)都成功创建(如果 API 密钥有效)。 - 为每个智能体设置了专用的会话和运行器。
- 每个智能体在处理查询时都正确识别了使用
get_weather工具的需求(你会看到--- 工具:get_weather 被调用... ---日志)。 - 底层工具逻辑保持相同,始终返回我们的模拟数据。
- 然而,每个智能体生成的最终文本响应可能在措辞、语气或格式上略有不同。这是因为指令提示由不同的 LLM(GPT-4o 与 Claude Sonnet)解释和执行。
这一步展示了 ADK + LiteLLM 提供的强大功能和灵活性。你可以轻松地使用各种 LLM 进行实验和部署智能体,同时保持核心应用程序逻辑(工具、基本智能体结构)的一致性。
在下一步中,我们将超越单个智能体,构建一个小团队,智能体可以相互委派任务!
步骤 3:构建智能体团队 - 问候和告别的委托¶
在步骤 1 和 2 中,我们构建并实验了一个专注于天气查询的单个智能体。虽然对其特定任务有效,但现实世界的应用程序通常涉及处理更广泛的用户交互。我们可以继续向单个天气智能体添加更多工具和复杂指令,但这很快会变得难以管理且效率较低。
更稳健的方法是构建一个智能体团队。这涉及:
- 创建多个专门的智能体,每个都设计用于特定能力(例如,一个用于天气,一个用于问候,一个用于计算)。
- 指定一个根智能体(或编排器)来接收初始用户请求。
- 使根智能体能够根据用户的意图将请求委托给最合适的专门子智能体。
为什么要构建智能体团队?
- 模块化:更容易开发、测试和维护各个智能体。
- 专业化:每个智能体都可以针对其特定任务进行微调(指令、模型选择)。
- 可扩展性:通过添加新智能体来添加新功能更简单。
- 效率:允许对更简单的任务(如问候)使用可能更简单/更便宜的模型。
在本步骤中,我们将:
- 定义用于处理问候(
say_hello)和告别(say_goodbye)的简单工具。 - 创建两个新的专门子智能体:
greeting_agent和farewell_agent。 - 更新我们的主要天气智能体(
weather_agent_v2)以充当根智能体。 - 使用其子智能体配置根智能体,启用自动委托。
- 通过向根智能体发送不同类型的请求来测试委托流程。
1. 为子智能体定义工具¶
首先,让我们创建将作为新专家智能体工具的简单 Python 函数。请记住,清晰的文档字符串对于将使用它们的智能体至关重要。
# @title 为问候和告别智能体定义工具
from typing import Optional # 确保导入 Optional
# 如果独立运行此步骤,请确保步骤 1 中的 'get_weather' 可用。
# def get_weather(city: str) -> dict: ... (来自步骤 1)
def say_hello(name: Optional[str] = None) -> str:
"""提供简单的问候。如果提供了名字,将使用它。
Args:
name (str, optional): 要问候的人的名字。如果未提供,则默认为通用问候。
Returns:
str: 友好的问候消息。
"""
if name:
greeting = f"你好,{name}!"
print(f"--- 工具:say_hello 被调用,名字:{name} ---")
else:
greeting = "你好!" # 如果 name 为 None 或未明确传递,则使用默认问候
print(f"--- 工具:say_hello 被调用,没有特定名字(name_arg_value:{name})---")
return greeting
def say_goodbye() -> str:
"""提供简单的告别消息以结束对话。"""
print(f"--- 工具:say_goodbye 被调用 ---")
return "再见!祝你有美好的一天。"
print("问候和告别工具已定义。")
# 可选自测
print(say_hello("Alice"))
print(say_hello()) # 不带参数测试(应使用默认 "你好!")
print(say_hello(name=None)) # 明确将 name 设为 None 测试(应使用默认 "你好!")
2. 定义子智能体(问候和告别)¶
现在,为我们的专家创建 Agent 实例。注意它们高度集中的 instruction 和至关重要的清晰 description。description 是根智能体用来决定何时委托给这些子智能体的主要信息。
最佳实践:子智能体的 description 字段应准确简洁地总结其特定能力。这对于有效的自动委托至关重要。
最佳实践:子智能体的 instruction 字段应针对其有限范围量身定制,准确告诉它们该做什么和不该做什么(例如,"你的唯一任务是...")。
# @title 定义问候和告别子智能体
# 如果你想使用 Gemini 以外的模型,请确保导入 LiteLlm 并设置 API 密钥(来自步骤 0/2)
# from google.adk.models.lite_llm import LiteLlm
# MODEL_GPT_4O、MODEL_CLAUDE_SONNET 等应该已定义
# 否则,继续使用:model = MODEL_GEMINI_2_5_FLASH
# --- 问候智能体 ---
greeting_agent = None
try:
greeting_agent = Agent(
# 对于简单任务使用可能不同/更便宜的模型
model = MODEL_GEMINI_2_5_FLASH,
# model=LiteLlm(model=MODEL_GPT_4O), # 如果你想尝试其他模型
name="greeting_agent",
instruction="你是问候智能体。你的唯一任务是向用户提供友好的问候。"
"使用 'say_hello' 工具生成问候。"
"如果用户提供了他们的名字,确保将其传递给工具。"
"不要参与任何其他对话或任务。",
description="使用 'say_hello' 工具处理简单的问候和打招呼。", # 对委托至关重要
tools=[say_hello],
)
print(f"✅ 智能体 '{greeting_agent.name}' 已使用模型 '{greeting_agent.model}' 创建。")
except Exception as e:
print(f"❌ 无法创建问候智能体。检查 API 密钥({greeting_agent.model})。错误:{e}")
# --- 告别智能体 ---
farewell_agent = None
try:
farewell_agent = Agent(
# 可以使用相同或不同的模型
model = MODEL_GEMINI_2_5_FLASH,
# model=LiteLlm(model=MODEL_GPT_4O), # 如果你想尝试其他模型
name="farewell_agent",
instruction="你是告别智能体。你的唯一任务是提供礼貌的告别消息。"
"当用户表示他们要离开或结束对话时使用 'say_goodbye' 工具"
"(例如,使用 'bye'、'goodbye'、'thanks bye'、'see you' 等词)。"
"不要执行任何其他操作。",
description="使用 'say_goodbye' 工具处理简单的告别和再见。", # 对委托至关重要
tools=[say_goodbye],
)
print(f"✅ 智能体 '{farewell_agent.name}' 已使用模型 '{farewell_agent.model}' 创建。")
except Exception as e:
print(f"❌ 无法创建告别智能体。检查 API 密钥({farewell_agent.model})。错误:{e}")
3. 使用子智能体定义根智能体(天气智能体 v2)¶
现在,我们升级我们的 weather_agent。关键变化是:
- 添加
sub_agents参数:我们传递一个包含刚创建的greeting_agent和farewell_agent实例的列表。 - 更新
instruction:我们明确告诉根智能体关于其子智能体以及何时应该将任务委托给它们。
关键概念:自动委托(自动流程) 通过提供 sub_agents 列表,ADK 启用自动委托。当根智能体收到用户查询时,其 LLM 不仅考虑自己的指令和工具,还考虑每个子智能体的 description。如果 LLM 确定查询更符合子智能体描述的能力(例如,"处理简单的问候"),它将自动生成一个特殊的内部操作,将控制权转移到该子智能体进行该轮次。然后子智能体使用其自己的模型、指令和工具处理查询。
最佳实践:确保根智能体的指令清楚地指导其委托决策。按名称提及子智能体并描述应该发生委托的条件。
# @title 使用子智能体定义根智能体
# 在定义根智能体之前,确保子智能体已成功创建。
# 还要确保原始的 'get_weather' 工具已定义。
root_agent = None
runner_root = None # 初始化运行器
if greeting_agent and farewell_agent and 'get_weather' in globals():
# 让我们为根智能体使用一个强大的 Gemini 模型来处理编排
root_agent_model = MODEL_GEMINI_2_5_FLASH
weather_agent_team = Agent(
name="weather_agent_v2", # 给它一个新的版本名称
model=root_agent_model,
description="主协调智能体。处理天气请求并将问候/告别委托给专家。",
instruction="你是协调团队的主要天气智能体。你的主要职责是提供天气信息。"
"仅对特定天气请求使用 'get_weather' 工具(例如,'伦敦的天气')。"
"你有专门的子智能体:"
"1. 'greeting_agent':处理简单的问候,如 'Hi'、'Hello'。为这些委托给它。"
"2. 'farewell_agent':处理简单的告别,如 'Bye'、'See you'。为这些委托给它。"
"分析用户的查询。如果是问候,委托给 'greeting_agent'。如果是告别,委托给 'farewell_agent'。"
"如果是天气请求,使用 'get_weather' 自己处理。"
"对于其他任何事情,适当地回应或声明你无法处理。",
tools=[get_weather], # 根智能体仍然需要天气工具来完成其核心任务
# 关键变化:在这里链接子智能体!
sub_agents=[greeting_agent, farewell_agent]
)
print(f"✅ 根智能体 '{weather_agent_team.name}' 已使用模型 '{root_agent_model}' 创建,子智能体:{[sa.name for sa in weather_agent_team.sub_agents]}")
else:
print("❌ 无法创建根智能体,因为一个或多个子智能体初始化失败或 'get_weather' 工具缺失。")
if not greeting_agent: print(" - 问候智能体缺失。")
if not farewell_agent: print(" - 告别智能体缺失。")
if 'get_weather' not in globals(): print(" - get_weather 函数缺失。")
4. 与智能体团队交互¶
现在我们已经定义了带有专门子智能体的根智能体(weather_agent_team - 注意:确保此变量名称与前一个代码块中定义的名称匹配,可能是 # @title 使用子智能体定义根智能体,可能将其命名为 root_agent),让我们测试委托机制。
以下代码块将:
- 定义一个
async函数run_team_conversation。 - 在此函数内部,专门为此测试运行创建一个新的、专用的
InMemorySessionService和特定会话(session_001_agent_team)。这隔离了用于测试团队动态的对话历史。 - 创建一个配置为使用我们的
weather_agent_team(根智能体)和专用会话服务的Runner(runner_agent_team)。 - 使用我们更新的
call_agent_async函数向runner_agent_team发送不同类型的查询(问候、天气请求、告别)。我们明确传递此特定测试的运行器、用户 ID 和会话 ID。 - 立即执行
run_team_conversation函数。
我们期望以下流程:
- "你好!" 查询发送到
runner_agent_team。 - 根智能体(
weather_agent_team)接收它,并根据其指令和greeting_agent的描述委托任务。 greeting_agent处理查询,调用其say_hello工具,并生成响应。- "纽约的天气怎么样?" 查询不被委托,由根智能体使用其
get_weather工具直接处理。 - "谢谢,再见!" 查询被委托给
farewell_agent,它使用其say_goodbye工具。
# @title 与智能体团队交互
import asyncio # 确保导入 asyncio
# 确保根智能体(例如,前一个单元格中的 'weather_agent_team' 或 'root_agent')已定义。
# 确保 call_agent_async 函数已定义。
# 在定义对话函数之前检查根智能体变量是否存在
root_agent_var_name = 'root_agent' # 步骤 3 指南中的默认名称
if 'weather_agent_team' in globals(): # 检查用户是否使用了此名称
root_agent_var_name = 'weather_agent_team'
elif 'root_agent' not in globals():
print("⚠️ 未找到根智能体('root_agent' 或 'weather_agent_team')。无法定义 run_team_conversation。")
# 分配一个虚拟值以防止稍后代码块运行时出现 NameError
root_agent = None # 或设置一个标志以防止执行
# 仅在根智能体存在时定义和运行
if root_agent_var_name in globals() and globals()[root_agent_var_name]:
# 为对话逻辑定义主 async 函数。
# 此函数内部的 'await' 关键字对于异步操作是必需的。
async def run_team_conversation():
print("\n--- 测试智能体团队委托 ---")
session_service = InMemorySessionService()
APP_NAME = "weather_tutorial_agent_team"
USER_ID = "user_1_agent_team"
SESSION_ID = "session_001_agent_team"
session = await session_service.create_session(
app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
print(f"会话已创建:App='{APP_NAME}',User='{USER_ID}',Session='{SESSION_ID}'")
actual_root_agent = globals()[root_agent_var_name]
runner_agent_team = Runner( # 或使用 InMemoryRunner
agent=actual_root_agent,
app_name=APP_NAME,
session_service=session_service
)
print(f"已为智能体 '{actual_root_agent.name}' 创建 Runner。")
# --- 使用 await 的交互(在 async def 中正确)---
await call_agent_async(query = "你好!",
runner=runner_agent_team,
user_id=USER_ID,
session_id=SESSION_ID)
await call_agent_async(query = "纽约的天气怎么样?",
runner=runner_agent_team,
user_id=USER_ID,
session_id=SESSION_ID)
await call_agent_async(query = "谢谢,再见!",
runner=runner_agent_team,
user_id=USER_ID,
session_id=SESSION_ID)
# --- 执行 `run_team_conversation` async 函数 ---
# 根据你的环境选择以下方法之一。
# 注意:这可能需要所用模型的 API 密钥!
# 方法 1:直接 await(笔记本/异步 REPL 的默认方法)
# 如果你的环境支持顶级 await(如 Colab/Jupyter 笔记本),
# 这意味着事件循环已经在运行,因此你可以直接 await 函数。
print("尝试使用 'await' 执行(笔记本的默认方法)...")
await run_team_conversation()
# 方法 2:asyncio.run(用于标准 Python 脚本 [.py])
# 如果从终端将此代码作为标准 Python 脚本运行,
# 脚本上下文是同步的。需要 `asyncio.run()` 来
# 创建和管理事件循环以执行你的 async 函数。
# 要使用此方法:
# 1. 注释掉上面的 `await run_team_conversation()` 行。
# 2. 取消注释以下块:
"""
import asyncio
if __name__ == "__main__": # 确保仅在直接执行脚本时运行
print("使用 'asyncio.run()' 执行(用于标准 Python 脚本)...")
try:
# 这会创建一个事件循环,运行你的 async 函数,并关闭循环。
asyncio.run(run_team_conversation())
except Exception as e:
print(f"发生错误:{e}")
"""
else:
# 如果之前未找到根智能体变量,则打印此消息
print("\n⚠️ 跳过智能体团队对话执行,因为根智能体在前一步骤中未成功定义。")
仔细查看输出日志,特别是 --- 工具:... 被调用 --- 消息。你应该观察到:
- 对于 "你好!",调用了
say_hello工具(表示greeting_agent处理了它)。 - 对于 "纽约的天气怎么样?",调用了
get_weather工具(表示根智能体处理了它)。 - 对于 "谢谢,再见!",调用了
say_goodbye工具(表示farewell_agent处理了它)。
这确认了成功的自动委托!根智能体在其指令和 sub_agents 的 description 的指导下,正确地将用户请求路由到团队内适当的专家智能体。
你现在已经用多个协作智能体构建了你的应用程序。这种模块化设计是构建更复杂和更强大的智能体系统的基础。在下一步中,我们将使用会话状态赋予我们的智能体跨轮次记住信息的能力。
步骤 4:使用会话状态添加记忆和个性化¶
到目前为止,我们的智能体团队可以通过委托处理不同的任务,但每次交互都是全新开始——智能体在会话中没有过去对话或用户偏好的记忆。为了创建更复杂和上下文感知的体验,智能体需要记忆。ADK 通过会话状态提供此功能。
什么是会话状态?
- 它是一个与特定用户会话(由
APP_NAME、USER_ID、SESSION_ID标识)绑定的 Python 字典(session.state)。 - 它在该会话内的多个对话轮次中持久化信息。
- 智能体和工具可以读取和写入此状态,使它们能够记住详细信息、调整行为和个性化响应。
智能体如何与状态交互:
ToolContext(主要方法):工具可以接受一个ToolContext对象(如果声明为最后一个参数,ADK 会自动提供)。此对象通过tool_context.state提供对会话状态的直接访问,允许工具在执行期间读取偏好或保存结果。output_key(自动保存智能体响应):可以使用output_key="your_key"配置Agent。然后,ADK 将自动将智能体在一个轮次的最终文本响应保存到session.state["your_key"]中。
在本步骤中,我们将通过以下方式增强我们的天气机器人团队:
- 使用新的
InMemorySessionService来独立演示状态。 - 使用
temperature_unit的用户偏好初始化会话状态。 - 创建天气工具的状态感知版本(
get_weather_stateful),它通过ToolContext读取此偏好并调整其输出格式(摄氏度/华氏度)。 - 更新根智能体以使用此状态感知工具,并使用
output_key配置它以自动将其最终天气报告保存到会话状态。 - 运行对话以观察初始状态如何影响工具,手动状态更改如何改变后续行为,以及
output_key如何持久化智能体的响应。
1. 初始化新的 Session Service 和 State¶
为了清楚地演示状态管理而不受先前步骤的干扰,我们将实例化一个新的 InMemorySessionService。我们还将创建一个带有初始状态的会话,定义用户首选的温度单位。
# @title 1. 为状态演示初始化新会话
# 导入必要的会话组件
from google.adk.sessions import InMemorySessionService
# 为此步骤创建新的会话服务
session_service_state_demo = InMemorySessionService()
print("✅ 为状态演示创建了新的 InMemorySessionService。")
# 为本教程的这一部分定义新的会话 ID
SESSION_ID_STATEFUL = "session_state_demo_001"
USER_ID_STATEFUL = "user_state_demo"
# 定义初始状态数据 - 用户最初偏好摄氏度
initial_state = {
"user_preference_temperature_unit": "Celsius"
}
# 创建会话,提供初始状态
session_stateful = await session_service_state_demo.create_session(
app_name=APP_NAME, # 使用一致的应用程序名称
user_id=USER_ID_STATEFUL,
session_id=SESSION_ID_STATEFUL,
state=initial_state # <<< 在创建时初始化状态
)
print(f"✅ 为用户 '{USER_ID_STATEFUL}' 创建了会话 '{SESSION_ID_STATEFUL}'。")
# 验证初始状态是否设置正确
retrieved_session = await session_service_state_demo.get_session(app_name=APP_NAME,
user_id=USER_ID_STATEFUL,
session_id = SESSION_ID_STATEFUL)
print("\n--- 初始会话状态 ---")
if retrieved_session:
print(retrieved_session.state)
else:
print("错误:无法检索会话。")
2. 创建状态感知天气工具(get_weather_stateful)¶
现在,我们创建一个新版本的天气工具。它的关键功能是接受 tool_context: ToolContext,这允许它访问 tool_context.state。它将读取 user_preference_temperature_unit 并相应地格式化温度。
-
关键概念:
ToolContext此对象是允许你的工具逻辑与会话上下文交互的桥梁,包括读取和写入状态变量。如果定义为工具函数的最后一个参数,ADK 会自动注入它。 -
最佳实践:从状态读取时,使用
dictionary.get('key', default_value)来处理键可能尚不存在的情况,确保你的工具不会崩溃。
from google.adk.tools.tool_context import ToolContext
def get_weather_stateful(city: str, tool_context: ToolContext) -> dict:
"""检索天气,根据会话状态转换温度单位。"""
print(f"--- 工具:get_weather_stateful 为 {city} 调用 ---")
# --- 从状态读取偏好 ---
preferred_unit = tool_context.state.get("user_preference_temperature_unit", "Celsius") # 默认为摄氏度
print(f"--- 工具:读取状态 'user_preference_temperature_unit':{preferred_unit} ---")
city_normalized = city.lower().replace(" ", "")
# 模拟天气数据(内部始终以摄氏度存储)
mock_weather_db = {
"newyork": {"temp_c": 25, "condition": "sunny"},
"london": {"temp_c": 15, "condition": "cloudy"},
"tokyo": {"temp_c": 18, "condition": "light rain"},
}
if city_normalized in mock_weather_db:
data = mock_weather_db[city_normalized]
temp_c = data["temp_c"]
condition = data["condition"]
# 根据状态偏好格式化温度
if preferred_unit == "Fahrenheit":
temp_value = (temp_c * 9/5) + 32 # 计算华氏度
temp_unit = "°F"
else: # 默认为摄氏度
temp_value = temp_c
temp_unit = "°C"
report = f"{city.capitalize()} 的天气是 {condition},温度为 {temp_value:.0f}{temp_unit}。"
result = {"status": "success", "report": report}
print(f"--- 工具:以 {preferred_unit} 生成报告。结果:{result} ---")
# 写回状态的示例(此工具可选)
tool_context.state["last_city_checked_stateful"] = city
print(f"--- 工具:更新状态 'last_city_checked_stateful':{city} ---")
return result
else:
# 处理未找到城市的情况
error_msg = f"抱歉,我没有 '{city}' 的天气信息。"
print(f"--- 工具:未找到城市 '{city}'。---")
return {"status": "error", "error_message": error_msg}
print("✅ 状态感知 'get_weather_stateful' 工具已定义。")
3. 重新定义子智能体并更新根智能体¶
为了确保此步骤是独立的并能正确构建,我们首先完全按照步骤 3 中的方式重新定义 greeting_agent 和 farewell_agent。然后,我们定义新的根智能体(weather_agent_v4_stateful):
- 它使用新的
get_weather_stateful工具。 - 它包括用于委托的问候和告别子智能体。
- 至关重要的是,它设置了
output_key="last_weather_report",这会自动将其最终天气响应保存到会话状态。
# @title 3. 使用 output_key 重新定义子智能体和根智能体
# 确保必要的导入:Agent、LiteLlm、Runner
from google.adk.agents import Agent
from google.adk.models.lite_llm import LiteLlm
from google.adk.runners import Runner
# 确保工具 'say_hello'、'say_goodbye' 已定义(来自步骤 3)
# 确保模型常量 MODEL_GPT_4O、MODEL_GEMINI_2_5_FLASH 等已定义
# --- 重新定义问候智能体(来自步骤 3)---
greeting_agent = None
try:
greeting_agent = Agent(
model=MODEL_GEMINI_2_5_FLASH,
name="greeting_agent",
instruction="你是问候智能体。你的唯一任务是使用 'say_hello' 工具提供友好的问候。不要做其他任何事情。",
description="使用 'say_hello' 工具处理简单的问候和打招呼。",
tools=[say_hello],
)
print(f"✅ 智能体 '{greeting_agent.name}' 已重新定义。")
except Exception as e:
print(f"❌ 无法重新定义问候智能体。错误:{e}")
# --- 重新定义告别智能体(来自步骤 3)---
farewell_agent = None
try:
farewell_agent = Agent(
model=MODEL_GEMINI_2_5_FLASH,
name="farewell_agent",
instruction="你是告别智能体。你的唯一任务是使用 'say_goodbye' 工具提供礼貌的告别消息。不要执行任何其他操作。",
description="使用 'say_goodbye' 工具处理简单的告别和再见。",
tools=[say_goodbye],
)
print(f"✅ 智能体 '{farewell_agent.name}' 已重新定义。")
except Exception as e:
print(f"❌ 无法重新定义告别智能体。错误:{e}")
# --- 定义更新后的根智能体 ---
root_agent_stateful = None
runner_root_stateful = None # 初始化运行器
# 在创建根智能体之前检查前提条件
if greeting_agent and farewell_agent and 'get_weather_stateful' in globals():
root_agent_model = MODEL_GEMINI_2_5_FLASH # 选择编排模型
root_agent_stateful = Agent(
name="weather_agent_v4_stateful", # 新版本名称
model=root_agent_model,
description="主智能体:提供天气(状态感知单位),委托问候/告别,将报告保存到状态。",
instruction="你是主要的天气智能体。你的工作是使用 'get_weather_stateful' 提供天气。"
"该工具将根据存储在状态中的用户偏好格式化温度。"
"将简单的问候委托给 'greeting_agent',将告别委托给 'farewell_agent'。"
"仅处理天气请求、问候和告别。",
tools=[get_weather_stateful], # 使用状态感知工具
sub_agents=[greeting_agent, farewell_agent], # 包括子智能体
output_key="last_weather_report" # <<< 自动保存智能体的最终天气响应
)
print(f"✅ 根智能体 '{root_agent_stateful.name}' 已使用状态感知工具和 output_key 创建。")
# --- 为此根智能体和新会话服务创建 Runner ---
runner_root_stateful = Runner(
agent=root_agent_stateful,
app_name=APP_NAME,
session_service=session_service_stateful # 使用新的状态感知会话服务
)
print(f"✅ 已为状态感知根智能体 '{runner_root_stateful.agent.name}' 创建 Runner,使用状态感知会话服务。")
else:
print("❌ 无法创建状态感知根智能体。缺少前提条件。")
if not greeting_agent: print(" - 缺少 greeting_agent 定义。")
if not farewell_agent: print(" - 缺少 farewell_agent 定义。")
if 'get_weather_stateful' not in globals(): print(" - 缺少 get_weather_stateful 工具。")
4. 交互并测试状态流程¶
现在,让我们执行一个旨在测试状态交互的对话,使用 runner_root_stateful(与我们的状态感知智能体和 session_service_stateful 关联)。我们将使用之前定义的 call_agent_async 函数,确保传递正确的运行器、用户 ID(USER_ID_STATEFUL)和会话 ID(SESSION_ID_STATEFUL)。
对话流程将是:
- 检查天气(伦敦):
get_weather_stateful工具应该从第 1 节中初始化的会话状态中读取初始的 "Celsius" 偏好。根智能体的最终响应(摄氏度的天气报告)应该通过output_key配置保存到state['last_weather_report']。 - 手动更新状态:我们将直接修改存储在
InMemorySessionService实例(session_service_stateful)中的状态。- 为什么要直接修改?
session_service.get_session()方法返回会话的副本。修改该副本不会影响后续智能体运行中使用的状态。对于这个使用InMemorySessionService的测试场景,我们访问内部的sessions字典来更改user_preference_temperature_unit的实际存储状态值为 "Fahrenheit"。注意:在实际应用程序中,状态更改通常由工具或智能体逻辑返回EventActions(state_delta=...)触发,而不是直接手动更新。
- 为什么要直接修改?
- 再次检查天气(纽约):
get_weather_stateful工具现在应该从状态中读取更新后的 "Fahrenheit" 偏好并相应地转换温度。根智能体的新响应(华氏度的天气)将由于output_key而覆盖state['last_weather_report']中的之前值。 - 问候智能体:验证对
greeting_agent的委托仍然与状态感知操作一起正常工作。此交互将成为此特定序列中output_key保存的最后响应。 - 检查最终状态:对话结束后,我们最后一次检索会话(获取副本)并打印其状态,以确认
user_preference_temperature_unit确实是 "Fahrenheit",观察output_key保存的最终值(在此运行中将是问候语),并查看工具写入的last_city_checked_stateful值。
# @title 4. 交互以测试状态流程和 output_key
import asyncio # 确保导入 asyncio
# 确保状态感知运行器 (runner_root_stateful) 从前一个单元格可用
# 确保 call_agent_async、USER_ID_STATEFUL、SESSION_ID_STATEFUL、APP_NAME 已定义
if 'runner_root_stateful' in globals() and runner_root_stateful:
# 为状态感知对话逻辑定义主 async 函数。
# 此函数内部的 'await' 关键字对于异步操作是必需的。
async def run_stateful_conversation():
print("\n--- 测试状态:温度单位转换和 output_key ---")
# 1. 检查天气(使用初始状态:摄氏度)
print("--- 轮次 1:请求伦敦的天气(期望摄氏度)---")
await call_agent_async(query= "伦敦的天气怎么样?",
runner=runner_root_stateful,
user_id=USER_ID_STATEFUL,
session_id=SESSION_ID_STATEFUL
)
# 2. 手动更新状态偏好为华氏度 - 直接修改存储
print("\n--- 手动更新状态:将单位设置为华氏度 ---")
try:
# 直接访问内部存储 - 这是针对 InMemorySessionService 的测试特定方法
# 注意:在使用持久化服务(数据库、VertexAI)的生产环境中,你通常会
# 通过智能体操作或特定服务 API(如果可用)更新状态,
# 而不是直接操作内部存储。
stored_session = session_service_stateful.sessions[APP_NAME][USER_ID_STATEFUL][SESSION_ID_STATEFUL]
stored_session.state["user_preference_temperature_unit"] = "Fahrenheit"
# 可选:如果有任何逻辑依赖于时间戳,你可能还想更新时间戳
# import time
# stored_session.last_update_time = time.time()
print(f"--- 存储的会话状态已更新。当前 'user_preference_temperature_unit':{stored_session.state.get('user_preference_temperature_unit', '未设置')} ---") # 为安全起见添加了 .get
except KeyError:
print(f"--- 错误:无法从应用 '{APP_NAME}' 中用户 '{USER_ID_STATEFUL}' 的内部存储检索会话 '{SESSION_ID_STATEFUL}' 以更新状态。检查 ID 以及会话是否已创建。---")
except Exception as e:
print(f"--- 更新内部会话状态时出错:{e} ---")
# 3. 再次检查天气(工具现在应该使用华氏度)
# 这也将通过 output_key 更新 'last_weather_report'
print("\n--- 轮次 2:请求纽约的天气(期望华氏度)---")
await call_agent_async(query= "告诉我纽约的天气。",
runner=runner_root_stateful,
user_id=USER_ID_STATEFUL,
session_id=SESSION_ID_STATEFUL
)
# 4. 测试基本委托(应该仍然有效)
# 这将再次更新 'last_weather_report',覆盖纽约天气报告
print("\n--- 轮次 3:发送问候 ---")
await call_agent_async(query= "你好!",
runner=runner_root_stateful,
user_id=USER_ID_STATEFUL,
session_id=SESSION_ID_STATEFUL
)
# --- 执行 `run_stateful_conversation` async 函数 ---
# 根据你的环境选择以下方法之一。
# 方法 1:直接 await(笔记本/异步 REPL 的默认方法)
# 如果你的环境支持顶级 await(如 Colab/Jupyter 笔记本),
# 这意味着事件循环已经在运行,因此你可以直接 await 函数。
print("尝试使用 'await' 执行(笔记本的默认方法)...")
await run_stateful_conversation()
# 方法 2:asyncio.run(用于标准 Python 脚本 [.py])
# 如果从终端将此代码作为标准 Python 脚本运行,
# 脚本上下文是同步的。需要 `asyncio.run()` 来
# 创建和管理事件循环以执行你的 async 函数。
# 要使用此方法:
# 1. 注释掉上面的 `await run_stateful_conversation()` 行。
# 2. 取消注释以下块:
"""
import asyncio
if __name__ == "__main__": # 确保仅在直接执行脚本时运行
print("使用 'asyncio.run()' 执行(用于标准 Python 脚本)...")
try:
# 这会创建一个事件循环,运行你的 async 函数,并关闭循环。
asyncio.run(run_stateful_conversation())
except Exception as e:
print(f"发生错误:{e}")
"""
# --- 对话后检查最终会话状态 ---
# 此块在任一执行方法完成后运行。
print("\n--- 检查最终会话状态 ---")
final_session = await session_service_stateful.get_session(app_name=APP_NAME,
user_id= USER_ID_STATEFUL,
session_id=SESSION_ID_STATEFUL)
if final_session:
# 使用 .get() 进行更安全的访问以防键缺失
print(f"最终偏好:{final_session.state.get('user_preference_temperature_unit', '未设置')}")
print(f"最终最后天气报告(来自 output_key):{final_session.state.get('last_weather_report', '未设置')}")
print(f"最终最后检查的城市(由工具):{final_session.state.get('last_city_checked_stateful', '未设置')}")
# 打印完整状态以获取详细视图
# print(f"完整状态字典:{final_session.state}") # 用于详细视图
else:
print("\n❌ 错误:无法检索最终会话状态。")
else:
print("\n⚠️ 跳过状态测试对话。状态感知根智能体运行器 ('runner_root_stateful') 不可用。")
---
通过审查对话流程和最终会话状态输出,你可以确认:
* **状态读取:** 天气工具(`get_weather_stateful`)正确地从状态中读取了 `user_preference_temperature_unit`,最初为伦敦使用 "Celsius"。
* **状态更新:** 直接修改成功地将存储的偏好更改为 "Fahrenheit"。
* **状态读取(已更新):** 当被问及纽约的天气时,工具随后读取了 "Fahrenheit" 并执行了转换。
* **工具状态写入:** 工具成功地通过 `tool_context.state` 将 `last_city_checked_stateful`(第二次天气检查后的 "New York")写入状态。
* **委托:** 即使在状态修改后,对 `greeting_agent` 的 "你好!" 委托也正常工作。
* **`output_key`:** `output_key="last_weather_report"` 成功地为根智能体最终响应的*每一轮*保存了根智能体的*最终*响应。在此序列中,最后的响应是问候语("你好!"),因此它覆盖了状态键中的天气报告。
* **最终状态:** 最终检查确认偏好持久化为 "Fahrenheit"。
你现在已经成功集成了会话状态,使用 `ToolContext` 个性化智能体行为,手动操作状态以测试 `InMemorySessionService`,并观察了 `output_key` 如何提供一种简单的机制来将智能体的最后响应保存到状态。这种对状态管理的基础理解是关键,因为我们将在接下来的步骤中使用回调实现安全防护。
---
## 步骤 5:添加安全性 - 使用 `before_model_callback` 进行输入防护 {: #step-adding-safety-input-guardrail-with-before-model-callback }
我们的智能体团队正变得更加强大,能够记住偏好并有效使用工具。然而,在现实场景中,我们经常需要安全机制来控制智能体的行为,*在*潜在问题请求甚至到达核心大型语言模型(LLM)*之前*。
ADK 提供了**回调(Callbacks)**——允许你挂钩到智能体执行生命周期中特定点的函数。`before_model_callback` 对输入安全特别有用。
**什么是 `before_model_callback`?**
* 它是一个你定义的 Python 函数,ADK 会在智能体将其编译后的请求(包括对话历史、指令和最新的用户消息)发送到底层 LLM *之前*执行它。
* **目的**:检查请求,必要时修改它,或根据预定义规则完全阻止它。
**常见用例:**
* **输入验证/过滤**:检查用户输入是否符合标准或包含不允许的内容(如 PII 或关键字)。
* **防护栏(Guardrails)**:防止有害、离题或违反策略的请求被 LLM 处理。
* **动态提示修改**:在发送之前,将及时信息(例如,来自会话状态)添加到 LLM 请求上下文中。
**工作原理:**
1. 定义一个接受 `callback_context: CallbackContext` 和 `llm_request: LlmRequest` 的函数。
* `callback_context`:提供对智能体信息、会话状态(`callback_context.state`)等的访问。
* `llm_request`:包含打算发送给 LLM 的完整负载(`contents`、`config`)。
2. 在函数内部:
* **检查**:检查 `llm_request.contents`(特别是最后一条用户消息)。
* **修改(谨慎使用)**:你*可以*更改 `llm_request` 的部分内容。
* **阻止(防护)**:返回一个 `LlmResponse` 对象。ADK 将立即发回此响应,*跳过*该轮次的 LLM 调用。
* **允许**:返回 `None`。ADK 继续使用(可能修改的)请求调用 LLM。
**在本步骤中,我们将:**
1. 定义一个 `before_model_callback` 函数(`block_keyword_guardrail`),检查用户输入中是否包含特定关键字("BLOCK")。
2. 更新我们的状态感知根智能体(来自步骤 4 的 `weather_agent_v4_stateful`)以使用此回调。
3. 创建一个与此更新后的智能体关联的新运行器,但使用*相同的状态感知会话服务*以保持状态连续性。
4. 通过发送正常请求和包含关键字的请求来测试防护。
---
### 1. 定义防护回调函数 {: #define-the-guardrail-callback-function }
此函数将检查 `llm_request` 内容中的最后一条用户消息。如果找到 "BLOCK"(不区分大小写),它会构造并返回一个 `LlmResponse` 来阻止流程;否则,它返回 `None`。
```python
# @title 1. 定义 before_model_callback 防护
# 确保必要的导入可用
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse
from google.genai import types # 用于创建响应内容
from typing import Optional
def block_keyword_guardrail(
callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
"""
检查最新用户消息中的 'BLOCK'。如果找到,则阻止 LLM 调用
并返回预定义的 LlmResponse。否则,返回 None 以继续。
"""
agent_name = callback_context.agent_name # 获取被拦截模型调用的智能体名称
print(f"--- 回调:block_keyword_guardrail 正在为智能体运行:{agent_name} ---")
# 从请求历史中的最新用户消息中提取文本
last_user_message_text = ""
if llm_request.contents:
# 查找角色为 'user' 的最新消息
for content in reversed(llm_request.contents):
if content.role == 'user' and content.parts:
# 为简单起见,假设文本在第一部分
if content.parts[0].text:
last_user_message_text = content.parts[0].text
break # 找到最后一条用户消息文本
print(f"--- 回调:正在检查最后一条用户消息:'{last_user_message_text[:100]}...' ---") # 记录前 100 个字符
# --- 防护逻辑 ---
keyword_to_block = "BLOCK"
if keyword_to_block in last_user_message_text.upper(): # 不区分大小写检查
print(f"--- 回调:发现 '{keyword_to_block}'。正在阻止 LLM 调用!---")
# 可选:在状态中设置标志以记录阻止事件
callback_context.state["guardrail_block_keyword_triggered"] = True
print(f"--- 回调:设置状态 'guardrail_block_keyword_triggered': True ---")
# 构造并返回 LlmResponse 以停止流程并改回此响应
return LlmResponse(
content=types.Content(
role="model", # 模仿智能体的响应
parts=[types.Part(text=f"我无法处理此请求,因为它包含被阻止的关键字 '{keyword_to_block}'。")],
)
# 注意:如果需要,你也可以在这里设置 error_message 字段
)
else:
# 未找到关键字,允许请求继续到 LLM
print(f"--- 回调:未找到关键字。允许 {agent_name} 的 LLM 调用。---")
return None # 返回 None 信号 ADK 正常继续
print("✅ block_keyword_guardrail 函数已定义。")
2. 更新根智能体以使用回调函数¶
我们重新定义根智能体,添加 before_model_callback 参数并将其指向我们的新防护函数。为了清晰起见,我们将给它一个新的版本名称。
重要提示:如果子智能体(greeting_agent、farewell_agent)和状态感知工具(get_weather_stateful)尚未从前面的步骤中可用,我们需要在此上下文中重新定义它们,确保根智能体定义可以访问其所有组件。
# @title 2. 使用 before_model_callback 更新根智能体
# --- 重新定义子智能体(确保它们存在于此上下文中)---
greeting_agent = None
try:
# 使用定义的模型常量
greeting_agent = Agent(
model=MODEL_GEMINI_2_5_FLASH,
name="greeting_agent", # 保持原始名称以保持一致性
instruction="你是问候智能体。你的唯一任务是使用 'say_hello' 工具提供友好的问候。不要做其他任何事情。",
description="使用 'say_hello' 工具处理简单的问候和打招呼。",
tools=[say_hello],
)
print(f"✅ 子智能体 '{greeting_agent.name}' 已重新定义。")
except Exception as e:
print(f"❌ 无法重新定义问候智能体。检查模型/API 密钥 ({greeting_agent.model})。错误:{e}")
farewell_agent = None
try:
# 使用定义的模型常量
farewell_agent = Agent(
model=MODEL_GEMINI_2_5_FLASH,
name="farewell_agent", # 保持原始名称
instruction="你是告别智能体。你的唯一任务是使用 'say_goodbye' 工具提供礼貌的告别消息。不要执行任何其他操作。",
description="使用 'say_goodbye' 工具处理简单的告别和再见。",
tools=[say_goodbye],
)
print(f"✅ 子智能体 '{farewell_agent.name}' 已重新定义。")
except Exception as e:
print(f"❌ 无法重新定义告别智能体。检查模型/API 密钥 ({farewell_agent.model})。错误:{e}")
# --- 定义带有回调的根智能体 ---
root_agent_model_guardrail = None
runner_root_model_guardrail = None
# 在继续之前检查所有组件
if greeting_agent and farewell_agent and 'get_weather_stateful' in globals() and 'block_keyword_guardrail' in globals():
# 使用定义的模型常量
root_agent_model = MODEL_GEMINI_2_5_FLASH
root_agent_model_guardrail = Agent(
name="weather_agent_v5_model_guardrail", # 为了清晰起见的新版本名称
model=root_agent_model,
description="主智能体:处理天气,委托问候/告别,包括输入关键字防护。",
instruction="你是主要的天气智能体。使用 'get_weather_stateful' 提供天气。"
"将简单的问候委托给 'greeting_agent',将告别委托给 'farewell_agent'。"
"仅处理天气请求、问候和告别。",
tools=[get_weather_stateful],
sub_agents=[greeting_agent, farewell_agent], # 引用重新定义的子智能体
output_key="last_weather_report", # 保留步骤 4 中的 output_key
before_model_callback=block_keyword_guardrail # <<< 分配防护回调
)
print(f"✅ 根智能体 '{root_agent_model_guardrail.name}' 已使用 before_model_callback 创建。")
# --- 为此智能体创建 Runner,使用相同的状态感知会话服务 ---
# 确保 session_service_stateful 从步骤 4 存在
if 'session_service_stateful' in globals():
runner_root_model_guardrail = Runner(
agent=root_agent_model_guardrail,
app_name=APP_NAME, # 使用一致的 APP_NAME
session_service=session_service_stateful # <<< 使用步骤 4 中的服务
)
print(f"✅ 已为防护智能体 '{runner_root_model_guardrail.agent.name}' 创建 Runner,使用状态感知会话服务。")
else:
print("❌ 无法创建运行器。缺少步骤 4 中的 'session_service_stateful'。")
else:
print("❌ 无法创建带有模型防护的根智能体。一个或多个先决条件缺失或初始化失败:")
if not greeting_agent: print(" - 问候智能体")
if not farewell_agent: print(" - 告别智能体")
if 'get_weather_stateful' not in globals(): print(" - 'get_weather_stateful' 工具")
if 'block_keyword_guardrail' not in globals(): print(" - 'block_keyword_guardrail' 回调")
3. 交互并测试防护¶
让我们测试防护的行为。我们将使用与步骤 4 中相同的会话(SESSION_ID_STATEFUL)来展示状态在这些更改中持久存在。
- 发送正常的请求(应该通过防护并执行)。
- 发送包含 "BLOCK" 的请求(应该被回调拦截)。
- 发送问候(应该通过根智能体的防护,被委托,并正常执行)。
# @title 3. 交互以测试模型输入防护
import asyncio # 确保导入 asyncio
# 确保防护智能体的运行器可用
if 'runner_root_model_guardrail' in globals() and runner_root_model_guardrail:
# 定义用于防护测试对话的主 async 函数。
# 此函数内部的 'await' 关键字对于异步操作是必需的。
async def run_guardrail_test_conversation():
print("\n--- 测试模型输入防护 ---")
# 使用带有回调的智能体的运行器和现有的状态感知会话 ID
# 定义一个辅助 lambda 以进行更清晰的交互调用
interaction_func = lambda query: call_agent_async(query,
runner_root_model_guardrail,
USER_ID_STATEFUL, # 使用现有用户 ID
SESSION_ID_STATEFUL # 使用现有会话 ID
)
# 1. 正常请求(回调允许,应使用之前状态更改中的华氏度)
print("--- 轮次 1:请求伦敦的天气(期望允许,华氏度)---")
await interaction_func("伦敦的天气怎么样?")
# 2. 包含被阻止关键字的请求(回调拦截)
print("\n--- 轮次 2:使用被阻止关键字请求(期望被阻止)---")
await interaction_func("BLOCK 东京的天气请求") # 回调应捕获 "BLOCK"
# 3. 正常问候(回调允许根智能体,发生委托)
print("\n--- 轮次 3:发送问候(期望允许)---")
await interaction_func("再次你好")
# --- 执行 `run_guardrail_test_conversation` async 函数 ---
# 根据你的环境选择以下方法之一。
# 方法 1:直接 await(笔记本/异步 REPL 的默认方法)
# 如果你的环境支持顶级 await(如 Colab/Jupyter 笔记本),
# 这意味着事件循环已经在运行,因此你可以直接 await 函数。
print("尝试使用 'await' 执行(笔记本的默认方法)...")
await run_guardrail_test_conversation()
# 方法 2:asyncio.run(用于标准 Python 脚本 [.py])
# 如果从终端将此代码作为标准 Python 脚本运行,
# 脚本上下文是同步的。需要 `asyncio.run()` 来
# 创建和管理事件循环以执行你的 async 函数。
# 要使用此方法:
# 1. 注释掉上面的 `await run_guardrail_test_conversation()` 行。
# 2. 取消注释以下块:
"""
import asyncio
if __name__ == "__main__": # 确保仅在直接执行脚本时运行
print("使用 'asyncio.run()' 执行(用于标准 Python 脚本)...")
try:
# 这会创建一个事件循环,运行你的 async 函数,并关闭循环。
asyncio.run(run_guardrail_test_conversation())
except Exception as e:
print(f"发生错误:{e}")
"""
# --- 对话后检查最终会话状态 ---
# 此块在任一执行方法完成后运行。
# 可选:检查状态中由回调设置的触发标志
print("\n--- 检查最终会话状态(防护测试后)---")
# 使用与此状态感知会话关联的会话服务实例
final_session = await session_service_stateful.get_session(app_name=APP_NAME,
user_id=USER_ID_STATEFUL,
session_id=SESSION_ID_STATEFUL)
if final_session:
# 使用 .get() 进行更安全的访问
print(f"防护触发标志:{final_session.state.get('guardrail_block_keyword_triggered', '未设置 (或 False)')}")
print(f"最后天气报告:{final_session.state.get('last_weather_report', '未设置')}") # 如果成功,应该是伦敦天气
print(f"温度单位:{final_session.state.get('user_preference_temperature_unit', '未设置')}") # 应该是华氏度
# print(f"完整状态字典:{final_session.state}") # 用于详细视图
else:
print("\n❌ 错误:无法检索最终会话状态。")
else:
print("\n⚠️ 跳过模型防护测试。运行器 ('runner_root_model_guardrail') 不可用。")
观察执行流程:
- 伦敦天气: 回调函数为
weather_agent_v5_model_guardrail运行,检查消息,打印 "未找到关键字。允许 LLM 调用。",并返回None。智能体继续执行,调用get_weather_stateful工具(该工具使用步骤 4 状态更改中的 "Fahrenheit" 偏好),并返回天气。此响应通过output_key更新last_weather_report。 - BLOCK 请求: 回调函数再次为
weather_agent_v5_model_guardrail运行,检查消息,发现 "BLOCK",打印 "阻止 LLM 调用!",设置状态标志,并返回预定义的LlmResponse。智能体的底层 LLM 在此轮中从未被调用。用户看到回调函数的阻止消息。 - 再次问候: 回调函数为
weather_agent_v5_model_guardrail运行,允许请求。根智能体随后委托给greeting_agent。注意:在根智能体上定义的before_model_callback不会自动应用于子智能体。greeting_agent正常执行,调用其say_hello工具,并返回问候语。
你已经成功实现了输入安全层!before_model_callback 提供了一个强大的机制来执行规则并控制智能体行为,在进行昂贵或潜在风险的 LLM 调用之前。接下来,我们将应用类似的概念来为工具使用本身添加防护。
步骤 6:添加安全性 - 工具参数防护(before_tool_callback)¶
在步骤 5 中,我们添加了一个防护来检查并可能阻止用户输入,在它到达 LLM 之前。现在,我们将添加另一层控制,在 LLM 决定使用工具之后但在该工具实际执行之前。这对于验证 LLM 想要传递给工具的参数很有用。
ADK 为此精确目的提供了 before_tool_callback。
什么是 before_tool_callback?
- 它是一个 Python 函数,在 LLM 请求使用特定工具并决定参数后,在该工具函数运行之前执行。
- 目的:验证工具参数,根据特定输入阻止工具执行,动态修改参数,或执行资源使用策略。
常见用例:
- 参数验证:检查 LLM 提供的参数是否有效、在允许范围内或符合预期格式。
- 资源保护:防止使用可能昂贵、访问受限数据或导致不良副作用的输入调用工具(例如,阻止某些参数的 API 调用)。
- 动态参数修改:在工具运行之前,根据会话状态或其他上下文信息调整参数。
工作原理:
-
定义一个接受
tool: BaseTool、args: Dict[str, Any]和tool_context: ToolContext的函数。tool:即将被调用的工具对象(检查tool.name)。args:LLM 为工具生成的参数字典。tool_context:提供对会话状态(tool_context.state)、智能体信息等的访问。
-
在函数内部:
- 检查:检查
tool.name和args字典。 - 修改:直接更改
args字典中的值。如果返回None,工具将使用这些修改后的参数运行。 - 阻止/覆盖(防护):返回一个字典。ADK 将此字典视为工具调用的结果,完全跳过原始工具函数的执行。该字典理想情况下应匹配它所阻止的工具的预期返回格式。
- 允许:返回
None。ADK 继续使用(可能修改的)参数执行实际的工具函数。
- 检查:检查
在本步骤中,我们将:
- 定义一个
before_tool_callback函数(block_paris_tool_guardrail),专门检查是否使用城市 "Paris" 调用get_weather_stateful工具。 - 如果检测到 "Paris",回调将阻止工具并返回自定义错误字典。
- 更新我们的根智能体(
weather_agent_v6_tool_guardrail)以包含before_model_callback和这个新的before_tool_callback。 - 为此智能体创建新的运行器,使用相同的状态感知会话服务。
- 通过请求允许的城市和被阻止的城市("Paris")的天气来测试流程。
1. 定义工具防护回调函数¶
此函数针对 get_weather_stateful 工具。它检查 city 参数。如果是 "Paris",它返回一个看起来像工具自己的错误响应的错误字典。否则,它通过返回 None 允许工具运行。
# @title 1. 定义 before_tool_callback 防护
# 确保必要的导入可用
from google.adk.tools.base_tool import BaseTool
from google.adk.tools.tool_context import ToolContext
from typing import Optional, Dict, Any # 用于类型提示
def block_paris_tool_guardrail(
tool: BaseTool, args: Dict[str, Any], tool_context: ToolContext
) -> Optional[Dict]:
"""
检查是否为 'Paris' 调用了 'get_weather_stateful'。
如果是,则阻止工具执行并返回特定的错误字典。
否则,通过返回 None 允许工具调用继续。
"""
tool_name = tool.name
agent_name = tool_context.agent_name # 尝试工具调用的智能体
print(f"--- 回调:block_paris_tool_guardrail 正在为智能体 '{agent_name}' 中的工具 '{tool_name}' 运行 ---")
print(f"--- 回调:正在检查参数:{args} ---")
# --- 防护逻辑 ---
target_tool_name = "get_weather_stateful" # 匹配 FunctionTool 使用的函数名称
blocked_city = "paris"
# 检查是否是正确的工具以及城市参数是否匹配被阻止的城市
if tool_name == target_tool_name:
city_argument = args.get("city", "") # 安全地获取 'city' 参数
if city_argument and city_argument.lower() == blocked_city:
print(f"--- 回调:检测到被阻止的城市 '{city_argument}'。正在阻止工具执行!---")
# 可选:更新状态
tool_context.state["guardrail_tool_block_triggered"] = True
print(f"--- 回调:设置状态 'guardrail_tool_block_triggered': True ---")
# 返回与工具预期的错误输出格式匹配的字典
# 此字典成为工具的结果,跳过实际的工具运行。
return {
"status": "error",
"error_message": f"策略限制:目前通过工具防护已禁用针对 '{city_argument.capitalize()}' 的天气检查。"
}
else:
print(f"--- 回调:城市 '{city_argument}' 允许用于工具 '{tool_name}'。---")
else:
print(f"--- 回调:工具 '{tool_name}' 不是目标工具。允许。---")
# 如果上面的检查没有返回字典,则允许工具执行
print(f"--- 回调:允许工具 '{tool_name}' 继续。---")
return None # 返回 None 允许实际的工具函数运行
print("✅ block_paris_tool_guardrail 函数已定义。")
2. 更新根智能体以使用两个回调¶
我们再次重新定义根智能体(weather_agent_v6_tool_guardrail),这次在步骤 5 的 before_model_callback 旁边添加 before_tool_callback 参数。
自包含执行说明:与步骤 5 类似,在定义此智能体之前,请确保执行上下文中已定义或可用所有先决条件(子智能体、工具、before_model_callback)。
# @title 2. 使用两个回调更新根智能体(自包含)
# --- 确保先决条件已定义 ---
# (包括或确保执行以下定义:Agent, LiteLlm, Runner, ToolContext,
# MODEL 常量,say_hello, say_goodbye, greeting_agent, farewell_agent,
# get_weather_stateful, block_keyword_guardrail, block_paris_tool_guardrail)
# --- 重新定义子智能体(确保它们存在于此上下文中)---
greeting_agent = None
try:
# 使用定义的模型常量
greeting_agent = Agent(
model=MODEL_GEMINI_2_5_FLASH,
name="greeting_agent", # 保持原始名称以保持一致性
instruction="你是问候智能体。你的唯一任务是使用 'say_hello' 工具提供友好的问候。不要做其他任何事情。",
description="使用 'say_hello' 工具处理简单的问候和打招呼。",
tools=[say_hello],
)
print(f"✅ 子智能体 '{greeting_agent.name}' 已重新定义。")
except Exception as e:
print(f"❌ 无法重新定义问候智能体。检查模型/API 密钥 ({greeting_agent.model})。错误:{e}")
farewell_agent = None
try:
# 使用定义的模型常量
farewell_agent = Agent(
model=MODEL_GEMINI_2_5_FLASH,
name="farewell_agent", # 保持原始名称
instruction="你是告别智能体。你的唯一任务是使用 'say_goodbye' 工具提供礼貌的告别消息。不要执行任何其他操作。",
description="使用 'say_goodbye' 工具处理简单的告别和再见。",
tools=[say_goodbye],
)
print(f"✅ 子智能体 '{farewell_agent.name}' 已重新定义。")
except Exception as e:
print(f"❌ 无法重新定义告别智能体。检查模型/API 密钥 ({farewell_agent.model})。错误:{e}")
# --- 定义带有两个回调的根智能体 ---
root_agent_tool_guardrail = None
runner_root_tool_guardrail = None
if ('greeting_agent' in globals() and greeting_agent and
'farewell_agent' in globals() and farewell_agent and
'get_weather_stateful' in globals() and
'block_keyword_guardrail' in globals() and
'block_paris_tool_guardrail' in globals()):
root_agent_model = MODEL_GEMINI_2_5_FLASH
root_agent_tool_guardrail = Agent(
name="weather_agent_v6_tool_guardrail", # 新版本名称
model=root_agent_model,
description="主智能体:处理天气,委托,包括输入和工具防护。",
instruction="你是主要的天气智能体。使用 'get_weather_stateful' 提供天气。"
"将问候委托给 'greeting_agent',将告别委托给 'farewell_agent'。"
"仅处理天气、问候和告别。",
tools=[get_weather_stateful],
sub_agents=[greeting_agent, farewell_agent],
output_key="last_weather_report",
before_model_callback=block_keyword_guardrail, # 输入防护
before_tool_callback=block_paris_tool_guardrail # <<< 工具防护
)
print(f"✅ 根智能体 '{root_agent_tool_guardrail.name}' 已使用两个回调创建。")
# --- 为此智能体创建 Runner,使用相同的状态感知会话服务 ---
if 'session_service_stateful' in globals():
runner_root_tool_guardrail = Runner(
agent=root_agent_tool_guardrail,
app_name=APP_NAME,
session_service=session_service_stateful # 保持状态连续性
)
print(f"✅ 已为双重防护智能体 '{runner_root_tool_guardrail.agent.name}' 创建 Runner。")
else:
print("❌ 无法创建运行器。缺少 'session_service_stateful'。")
else:
print("❌ 无法创建带有双重防护的根智能体。缺少先决条件。")
3. 交互以测试工具参数防护¶
让我们测试交互流程,再次使用前面步骤中相同的状态感知会话(SESSION_ID_STATEFUL)。
- 请求伦敦的天气(允许)。
- 请求巴黎的天气(被工具防护阻止)。
- 再次请求伦敦的天气(允许,确认防护是特定的)。
# @title 3. 交互以测试工具参数防护
import asyncio # 确保导入 asyncio
# 确保防护智能体的运行器可用
if 'runner_root_tool_guardrail' in globals() and runner_root_tool_guardrail:
# 定义用于工具防护测试对话的主 async 函数。
async def run_tool_guardrail_test_conversation():
print("\n--- 测试工具参数防护 ---")
# 使用带有两个回调的智能体的运行器
interaction_func = lambda query: call_agent_async(query,
runner_root_tool_guardrail,
USER_ID_STATEFUL,
SESSION_ID_STATEFUL
)
# 1. 允许的城市(伦敦)
print("--- 轮次 1:请求伦敦的天气(期望允许)---")
await interaction_func("伦敦的天气怎么样?")
# 2. 被阻止的城市(巴黎)- 工具防护应触发
print("\n--- 轮次 2:请求巴黎的天气(期望被工具防护阻止)---")
await interaction_func("巴黎的天气怎么样?")
# 预期:智能体将调用工具,回调将拦截它,返回错误,
# 智能体将看到该错误并将其传达给用户。
# 3. 允许的城市(再次伦敦)- 确认系统未损坏
print("\n--- 轮次 3:再次请求伦敦的天气(期望允许)---")
await interaction_func("再检查一次伦敦")
# --- 执行 `run_tool_guardrail_test_conversation` async 函数 ---
# 根据你的环境选择以下方法之一。
# 方法 1:直接 await(笔记本/异步 REPL 的默认方法)
print("尝试使用 'await' 执行(笔记本的默认方法)...")
await run_tool_guardrail_test_conversation()
# 方法 2:asyncio.run(用于标准 Python 脚本 [.py])
# 如果从终端将此代码作为标准 Python 脚本运行,
# 请注释掉上面的 `await` 行并取消注释下面的块:
"""
import asyncio
if __name__ == "__main__":
print("使用 'asyncio.run()' 执行(用于标准 Python 脚本)...")
try:
asyncio.run(run_tool_guardrail_test_conversation())
except Exception as e:
print(f"发生错误:{e}")
"""
# --- 对话后检查最终会话状态 ---
print("\n--- 检查最终会话状态(工具防护测试后)---")
final_session = await session_service_stateful.get_session(app_name=APP_NAME,
user_id=USER_ID_STATEFUL,
session_id=SESSION_ID_STATEFUL)
if final_session:
# 检查工具阻止标志
print(f"工具防护触发标志:{final_session.state.get('guardrail_tool_block_triggered', '未设置 (或 False)')}")
# 检查输入防护标志(不应在此测试中触发)
print(f"输入防护触发标志:{final_session.state.get('guardrail_block_keyword_triggered', '未设置 (或 False)')}")
print(f"最后天气报告:{final_session.state.get('last_weather_report', '未设置')}")
else:
print("\n❌ 错误:无法检索最终会话状态。")
else:
print("\n⚠️ 跳过工具防护测试。运行器 ('runner_root_tool_guardrail') 不可用。")
分析输出
- 伦敦(允许): 智能体调用
get_weather_stateful。before_tool_callback看到 "London",打印 "允许工具...",并返回None。工具执行并返回真实(模拟)数据。 - 巴黎(阻止): 智能体尝试调用
get_weather_stateful(city='Paris')。before_tool_callback看到 "Paris",打印 "阻止工具执行!",并返回错误字典。实际的工具函数从未运行。 智能体接收此错误字典作为工具的输出,并告诉用户由于策略限制无法获取天气。 - 伦敦(再次允许): 确认防护仅针对特定参数,不会破坏一般功能。
结论¶
恭喜!你已经构建了一个复杂、多智能体、状态感知且安全的 AI 应用程序。
回顾一下我们所取得的成就:
- 基础知识: 创建了一个可以调用自定义工具(
get_weather)的单个智能体。 - 团队合作: 实施了一个根智能体,它可以根据任务描述自动将任务委托给专门的子智能体(问候、告别)。
- 状态管理: 使用会话状态来记住用户偏好(温度单位),并构建了可以读取/写入此状态的智能工具。
- 输入安全: 使用
before_model_callback实施了防护栏,以在潜在有害输入到达 LLM 之前阻止它。 - 工具安全: 使用
before_tool_callback添加了第二层防御,以验证和限制工具的使用方式(阻止特定城市)。
天气机器人团队架构:
graph TD
User["用户"] -->|查询| RootAgent["根智能体<br>(Weather Agent v6)"]
subgraph "安全层"
InputGuard["输入防护<br>(before_model_callback)"]
ToolGuard["工具防护<br>(before_tool_callback)"]
end
RootAgent --> InputGuard
InputGuard -- "阻止 (关键字)" --> User
InputGuard -- "允许" --> LLM[LLM]
LLM -->|工具调用| ToolGuard
ToolGuard -- "阻止 (巴黎)" --> RootAgent
ToolGuard -- "允许" --> Tools
subgraph "工具 & 状态"
Tools["工具: get_weather_stateful"]
State[("会话状态<br>偏好, 历史")]
Tools <--> State
end
subgraph "子智能体 (委托)"
Greeting["问候智能体"]
Farewell["告别智能体"]
end
RootAgent -- "委托" --> Greeting
RootAgent -- "委托" --> Farewell
Greeting --> User
Farewell --> User
Tools --> RootAgent
RootAgent --> User
关键要点:
- 智能体与工具:定义能力和推理的基本构建块。清晰的指令和文档字符串至关重要。
- 运行器与会话服务:编排智能体执行和维护对话上下文的引擎和内存管理系统。
- 自动委托:ADK 强大的功能,允许根智能体根据子智能体的描述智能地路由任务,实现模块化和可扩展的架构。
- 回调:用于实施安全、策略和自定义逻辑的强大挂钩,位于执行生命周期的关键点(模型调用前、工具执行前)。
下一步是什么?
- 探索更多模型:尝试将
MODEL_GEMINI_2_0_FLASH换成其他模型,看看它们如何处理委托。 - 持久化状态:将
InMemorySessionService替换为基于数据库的实现(如 Firestore),以构建真正的生产就绪型应用程序。 - 扩展团队:添加更多专门的智能体(例如,用于货币转换或新闻更新的智能体),并观察根智能体如何管理日益复杂的团队。
你现在已经掌握了使用 ADK 构建高级智能体系统的基础知识。快乐编码!