使用 ADK 构建你的第一个智能体团队:一个渐进式天气机器人¶
本教程基于 Agent Development Kit 的快速入门示例托展而来。现在,你可以更深入地探索,并构建一个更复杂、多代理系统。
我们将着手构建一个天气机器人智能体团队,在简单的基础上逐步添加高级功能。从一个可以查询天气的单个智能体开始,我们将逐步添加以下功能:
- 利用不同的 AI 模型(Gemini、GPT、Claude)
- 为不同任务设计专门的子智能体(如问候和告别)
- 实现智能体之间的智能任务分配
- 使用持久会话状态为智能体提供记忆能力
- 通过回调实现关键的安全防护机制
为什么选择天气机器人团队?
这个看似简单的用例提供了一个实用且易于理解的场景,用于探索构建复杂、真实世界智能体应用所需的核心 ADK 概念。你将学习如何构建交互结构、管理状态、确保安全性,以及协调多个协同工作的 AI "大脑"。
再说一下什么是 ADK?
提醒一下,ADK 是一个 Python 框架,旨在简化由大型语言模型(LLM)驱动的应用程序的开发。它为创建能够推理、计划、使用工具、与用户动态交互以及在团队中有效协作的智能体提供了强大的构建模块。
在这个高级教程中,你将掌握:
- ✅ 工具定义与使用:创建 Python 函数(
工具
)赋予智能体特定能力(如获取数据),并指导智能体如何有效使用它们。 - ✅ 多 LLM 灵活性:通过 LiteLLM 集成配置智能体使用各种领先的 LLM(Gemini、GPT-4o、Claude Sonnet),让你为每个任务选择最佳模型。
- ✅ 智能体任务分配与协作:设计专门的子智能体,并实现用户请求的自动路由(
自动流程
)到团队中最合适的智能体。 - ✅ 会话状态记忆:利用
会话状态
和工具上下文
使智能体能够记住对话轮次之间的信息,实现更具上下文的交互。 - ✅ 使用回调实现安全防护:实现
before_model_callback
和before_tool_callback
,根据预定义规则检查、修改或阻止请求/工具使用,增强应用程序的安全性和控制。
最终预期:
完成本教程后,你将构建一个功能齐全的多智能体天气机器人系统。这个系统不仅能提供天气信息,还能处理对话礼仪,记住最后查询的城市,并在定义的安全边界内运行,所有这些都由 ADK 编排。
前提条件:
- ✅ 扎实的 Python 编程理解
- ✅ 熟悉大型语言模型(LLM)、API 和智能体概念
- ❗ 至关重要:完成 ADK 快速入门教程或具备 ADK 基础知识(Agent、Runner、SessionService、基本工具使用)。本教程直接建立在这些概念之上。
- ✅ 你计划使用的 LLM 的 API 密钥(例如,Google AI Studio 用于 Gemini、OpenAI Platform、Anthropic Console)
准备好构建你的智能体团队了吗?让我们开始吧!
步骤 0:设置和安装¶
库安装¶
关于执行环境的说明:
本教程专为 Google Colab、Colab Enterprise 或 Jupyter 笔记本等交互式笔记本环境而设计。请注意以下几点:
- 运行异步代码: 笔记本环境处理异步代码的方式不同。你会看到使用
await
的示例(适用于事件循环已经运行的情况,常见于笔记本中)或asyncio.run()
(在作为独立.py
脚本运行或在特定笔记本设置中通常需要)。代码块为两种场景提供指导。 - 手动 Runner/Session 设置: 这些步骤涉及显式创建
Runner
和SessionService
实例。之所以展示这种方法,是因为它为你提供了对智能体执行生命周期、会话管理和状态持久性的细粒度控制。
替代方案:使用 ADK 的内置工具(Web UI / CLI / API 服务器)
如果你更喜欢使用 ADK 标准工具自动处理运行器和会话管理的设置,你可以在这里找到为此目的构建的等效代码。该版本设计为可以直接使用 adk web
(用于 Web UI)、adk run
(用于 CLI 交互)或 adk api_server
(暴露 API)等命令运行。请遵循该替代资源中提供的 README.md
说明。
准备好构建你的智能体团队了吗?让我们开始吧!
注意: 本教程适用于 adk 1.0.0 及更高版本
# @title Step 0: Setup and Installation
# Install ADK and LiteLLM for multi-model support
!pip install google-adk -q
!pip install litellm -q
print("Installation complete.")
导入库¶
import os
import asyncio
from google.adk.agents import Agent
from google.adk.models.lite_llm import LiteLlm # 用于多模型支持
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types # 用于创建消息 Content/Parts
import warnings
# 忽略所有警告
warnings.filterwarnings("ignore")
import logging
logging.basicConfig(level=logging.ERROR)
print("Libraries imported.")
设置 API 密钥¶
# --- 重要:用你的真实 API 密钥替换占位符 ---
# Gemini API 密钥(从 Google AI Studio 获取:https://aistudio.google.com/app/apikey)
os.environ["GOOGLE_API_KEY"] = "YOUR_GOOGLE_API_KEY" # <--- 替换
# OpenAI API 密钥(从 OpenAI Platform 获取:https://platform.openai.com/api-keys)
os.environ['OPENAI_API_KEY'] = 'YOUR_OPENAI_API_KEY' # <--- 替换
# Anthropic API 密钥(从 Anthropic Console 获取:https://console.anthropic.com/settings/keys)
os.environ['ANTHROPIC_API_KEY'] = 'YOUR_ANTHROPIC_API_KEY' # <--- 替换
# --- 验证密钥(可选检查) ---
print("API Keys Set:")
print(f"Google API Key set: {'Yes' if os.environ.get('GOOGLE_API_KEY') and os.environ['GOOGLE_API_KEY'] != 'YOUR_GOOGLE_API_KEY' else 'No (REPLACE PLACEHOLDER!)'}")
print(f"OpenAI API Key set: {'Yes' if os.environ.get('OPENAI_API_KEY') and os.environ['OPENAI_API_KEY'] != 'YOUR_OPENAI_API_KEY' else 'No (REPLACE PLACEHOLDER!)'}")
print(f"Anthropic API Key set: {'Yes' if os.environ.get('ANTHROPIC_API_KEY') and os.environ['ANTHROPIC_API_KEY'] != 'YOUR_ANTHROPIC_API_KEY' else 'No (REPLACE PLACEHOLDER!)'}")
# 配置 ADK 直接使用 API 密钥(本多模型设置不使用 Vertex AI)
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
# @markdown **安全提示:** 最佳做法是安全地管理 API 密钥(例如,使用 Colab Secrets 或环境变量),而不是直接在笔记本中硬编码。替换上面的占位符字符串。
定义模型常量以便于使用¶
# --- Define Model Constants for easier use ---
# More supported models can be referenced here: https://ai.google.dev/gemini-api/docs/models#model-variations
MODEL_GEMINI_2_0_FLASH = "gemini-2.0-flash"
# More supported models can be referenced here: https://docs.litellm.ai/docs/providers/openai#openai-chat-completion-models
MODEL_GPT_4O = "openai/gpt-4.1" # You can also try: gpt-4.1-mini, gpt-4o etc.
# More supported models can be referenced here: https://docs.litellm.ai/docs/providers/anthropic
MODEL_CLAUDE_SONNET = "anthropic/claude-sonnet-4-20250514" # You can also try: claude-opus-4-20250514 , claude-3-7-sonnet-20250219 etc
print("\nEnvironment configured.")
步骤 1:你的第一个智能体 - 基本天气查询¶
让我们首先构建天气机器人的基础组件:一个能够执行特定任务 - 查询天气信息的单个智能体。这涉及创建两个核心部分:
- 工具:一个 Python 函数,赋予智能体获取天气数据的能力。
- 智能体:理解用户请求,知道它拥有天气工具,并决定何时以及如何使用它的 AI "大脑"。
1. 定义工具¶
在 ADK 中,工具是赋予智能体超越纯文本生成能力的构建块。它们通常是执行特定操作的常规 Python 函数,如调用 API、查询数据库或执行计算。
我们的第一个工具将提供模拟天气报告。这使我们能够专注于智能体结构,而无需暂时使用外部 API 密钥。之后,你可以轻松地将这个模拟函数替换为调用真实天气服务的函数。
关键概念:文档字符串至关重要! 智能体的 LLM 严重依赖函数的文档字符串来理解:
- 工具做什么
- 何时使用它
- 它需要什么参数(
city: str
) - 它返回什么信息
最佳实践:为你的工具编写清晰、描述性和准确的文档字符串。这对于 LLM 正确使用工具至关重要。
# @title 定义 get_weather 工具
def get_weather(city: str) -> dict:
"""获取指定城市的当前天气报告。
Args:
city (str): 城市名称(例如,"New York"、"London"、"Tokyo")。
Returns:
dict: 包含天气信息的字典。
包含一个 'status' 键('success' 或 'error')。
如果是 'success',包含 'report' 键与天气详情。
如果是 'error',包含 'error_message' 键。
"""
# 最佳实践:记录工具执行以便于调试
print(f"--- Tool: get_weather called for city: {city} ---")
city_normalized = city.lower().replace(" ", "") # 基本输入标准化
# 为简单起见的模拟天气数据
mock_weather_db = {
"newyork": {"status": "success", "report": "纽约的天气是晴朗的,温度为 25°C。"},
"london": {"status": "success", "report": "伦敦多云,温度为 15°C。"},
"tokyo": {"status": "success", "report": "东京有小雨,温度为 18°C。"},
}
# 最佳实践:在工具内优雅地处理潜在错误
if city_normalized in mock_weather_db:
return mock_weather_db[city_normalized]
else:
return {"status": "error", "error_message": f"抱歉,我没有 '{city}' 的天气信息。"}
# 工具使用示例(可选自测)
print(get_weather("New York"))
print(get_weather("Paris"))
2. 定义智能体¶
现在,让我们创建智能体本身。ADK 中的 Agent
编排用户、LLM 和可用工具之间的交互。
我们用几个关键参数配置它:
name
:此智能体的唯一标识符(例如,"weather_agent_v1")。model
:指定使用哪个 LLM(例如,MODEL_GEMINI_2_5_PRO
)。我们将从特定的 Gemini 模型开始。description
:智能体总体目的的简洁摘要。当其他智能体需要决定是否将任务委派给这个智能体时,这一点变得至关重要。instruction
:为 LLM 提供详细指导,包括其行为方式、角色、目标,特别是如何以及何时使用其分配的工具
。tools
:包含智能体被允许使用的实际 Python 工具函数的列表(例如,[get_weather]
)。
最佳实践:提供清晰具体的instruction
提示。指令越详细,LLM 就越能理解其角色以及如何有效使用其工具。如果需要,明确说明错误处理。
最佳实践:选择描述性的name
和description
值。这些在 ADK 内部使用,对于自动任务分配等功能(稍后介绍)至关重要。
# @title 定义天气智能体
# 使用前面定义的模型常量之一
AGENT_MODEL = MODEL_GEMINI_2_5_PRO # 从强大的 Gemini 模型开始
weather_agent = Agent(
name="weather_agent_v1",
model=AGENT_MODEL, # 指定底层 LLM
description="为特定城市提供天气信息。", # 对于稍后的任务分配至关重要
instruction="你是一个有用的天气助手。你的主要目标是提供当前天气报告。"
"当用户询问特定城市的天气时,"
"你必须使用 'get_weather' 工具查找信息。"
"分析工具的响应:如果状态为 'error',礼貌地告知用户错误信息。"
"如果状态为 'success',向用户清晰简洁地呈现天气 'report'。"
"仅在提到城市进行天气请求时使用该工具。",
tools=[get_weather], # 使该工具可用于此智能体
)
print(f"Agent '{weather_agent.name}' created using model '{AGENT_MODEL}'.")
3. 设置 Runner 和 Session Service¶
为了管理和执行智能体,我们需要两个更多的组件:
SessionService
:负责管理不同用户和会话的对话历史和状态。InMemorySessionService
是一个简单的实现,将所有内容存储在内存中,适用于测试和简单的应用程序。它跟踪交换的消息。我们将在步骤 4 中探索状态持久性。Runner
:引擎负责编排交互流程。它接受用户输入,将其路由到适当的智能体,根据智能体的逻辑管理对 LLM 和工具的调用,通过SessionService
处理会话更新,并生成表示交互进展的事件。
# @title Setup Session Service and Runner
# --- Session Management ---
# Key Concept: SessionService stores conversation history & state.
# InMemorySessionService is simple, non-persistent storage for this tutorial.
session_service = InMemorySessionService()
# Define constants for identifying the interaction context
APP_NAME = "weather_tutorial_app"
USER_ID = "user_1"
SESSION_ID = "session_001" # Using a fixed ID for simplicity
# Create the specific session where the conversation will happen
session = await session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID,
session_id=SESSION_ID
)
print(f"Session created: App='{APP_NAME}', User='{USER_ID}', Session='{SESSION_ID}'")
# --- Runner ---
# Key Concept: Runner orchestrates the agent execution loop.
runner = Runner(
agent=weather_agent, # The agent we want to run
app_name=APP_NAME, # Associates runs with our app
session_service=session_service # Uses our session manager
)
print(f"Runner created for agent '{runner.agent.name}'.")
4. 与智能体交互¶
我们需要一种方法向智能体发送消息并接收其响应。由于 LLM 调用和工具执行可能需要时间,ADK 的 Runner
是异步运行的。
我们将定义一个 async
辅助函数 (call_agent_async
),它:
- 接受用户查询字符串。
- 将其打包到 ADK
Content
格式中。 - 调用
runner.run_async
,提供用户/会话上下文和新的消息。 - 遍历
runner.run_async
产生的事件。事件表示智能体执行步骤(例如,工具调用请求、工具结果接收、中间 LLM 思考、最终响应)。 - 使用
event.is_final_response()
识别并打印最终响应事件。
为什么是 async
?与 LLM 和潜在工具(如外部 API)的交互是 I/O 绑定的操作。使用 asyncio
允许程序高效地处理这些操作,而不会阻塞执行。
# @title Define Agent Interaction Function
import asyncio
from google.genai import types # For creating message Content/Parts
async def call_agent_async(query: str):
"""Sends a query to the agent and prints the final response."""
print(f"\n>>> User Query: {query}")
# Prepare the user's message in ADK format
content = types.Content(role='user', parts=[types.Part(text=query)])
final_response_text = "智能体没有产生最终响应。" # 默认值
# 关键概念:run_async 执行智能体逻辑并产生事件。
# 我们遍历事件以找到最终答案。
async for event in runner.run_async(user_id=USER_ID, session_id=SESSION_ID, new_message=content):
# 你可以取消注释下面的行以查看执行期间的*所有*事件
# print(f" [事件] 作者:{event.author},类型:{type(event).__name__},最终:{event.is_final_response()},内容:{event.content}")
# 关键概念:is_final_response() 标记轮次的结束消息。
if event.is_final_response():
if event.content and event.content.parts:
# 假设第一部分中的文本响应
final_response_text = event.content.parts[0].text
elif event.actions and event.actions.escalate: # 处理潜在错误/升级
final_response_text = f"智能体升级:{event.error_message or '无特定消息。'}"
# 如果需要,在这里添加更多检查(例如,特定错误代码)
break # 找到最终响应后停止处理事件
print(f"<<< Agent Response: {final_response_text}")
5. 运行对话¶
最后,让我们通过发送一些查询来测试我们的设置。我们将我们的 async
调用包装在一个主 async
函数中,并使用 await
运行它。
观察输出:
- 看到用户查询。
- 注意
--- Tool: get_weather called... ---
日志,当智能体使用工具时。 - 观察智能体的最终响应,包括它如何处理天气数据不可用的情况(对于巴黎)。
# @title Run the Initial Conversation
# We need an async function to await our interaction helper
async def run_conversation():
await call_agent_async("伦敦的天气如何?")
await call_agent_async("巴黎怎么样?") # 期望工具的错误消息
await call_agent_async("告诉我纽约的天气")
# Execute the conversation using await in an async context (like Colab/Jupyter)
await run_conversation()
预期输出:
>>> User Query: What is the weather like in London?
--- Tool: get_weather called for city: London ---
<<< Agent Response: The weather in London is cloudy with a temperature of 15°C.
>>> User Query: How about Paris?
--- Tool: get_weather called for city: Paris ---
<<< Agent Response: Sorry, I don't have weather information for Paris.
>>> User Query: Tell me the weather in New York
--- Tool: get_weather called for city: New York ---
<<< Agent Response: The weather in New York is sunny with a temperature of 25°C.
恭喜你!你已经成功构建并交互了你的第一个 ADK 智能体。它理解用户请求,使用工具查找信息,并根据工具结果适当响应。
在下一步中,我们将探索如何轻松切换为这个智能体提供支持的底层语言模型。
步骤 2:使用 LiteLLM 进行多模型¶
在步骤 1 中,我们构建了一个由特定 Gemini 模型驱动的功能性天气智能体。虽然有效,但现实世界的应用程序通常从使用不同的大型语言模型(LLM)中受益。为什么?
- 性能:某些模型在特定任务(例如,编程、推理、创意写作)方面表现出色。
- 成本:不同模型具有不同的价格点。
- 功能:模型提供多样化的功能、上下文窗口大小和微调选项。
- 可用性/冗余:拥有替代方案可以确保即使一个提供商出现问题,应用程序仍能正常运行。
ADK 通过其与 LiteLLM 库的集成,使在不同模型之间切换变得无缝。LiteLLM 作为 100 多个 LLM 的统一接口。
在本步骤中,我们将:
- 学习如何配置 ADK
Agent
以使用来自提供商(如 OpenAI)和 Anthropic 的模型。 - 定义、配置(使用自己的会话和运行器)并立即测试我们的天气智能体,每个都由不同的 LLM 支持。
- 与这些不同的智能体交互,观察它们在相同底层工具的情况下可能出现的响应变化。
1. 导入 LiteLlm
¶
我们在初始设置(步骤 0)中导入了这个,但它是在多模型支持的关键组件:
2. 定义和测试多模型智能体¶
与仅传递模型名称字符串(默认情况下为 Google 的 Gemini 模型)不同,我们使用 LiteLlm
类包装所需的模型标识符字符串。
- 关键概念:
LiteLlm
包装器:LiteLlm(model="provider/model_name")
语法告诉 ADK 通过 LiteLLM 库将请求路由到指定的模型提供商。
每个块将:
* 使用特定 LiteLLM 模型(MODEL_GPT_4O
或 MODEL_CLAUDE_SONNET
)定义智能体。
* 创建一个 新的、独立的 InMemorySessionService
和会话,专门用于该智能体的测试运行。这使得对话历史记录在演示中保持隔离。
* 创建一个为特定智能体和其会话服务配置的 Runner
。
* 立即调用 call_agent_async
发送查询并测试智能体。
最佳实践:使用常量作为模型名称(如 MODEL_GPT_4O
、MODEL_CLAUDE_SONNET
定义在步骤 0),以避免拼写错误并使代码更易于管理。
错误处理:我们用 try...except
块包装智能体定义。这防止了整个代码单元失败,如果特定提供商的 API 密钥丢失或无效,则允许教程继续进行配置的模型。
首先,让我们使用 OpenAI 的 GPT-4o 创建和测试智能体。
# @title 定义和测试 GPT 智能体
# 确保环境中定义了步骤 1 中的 'get_weather' 函数。
# 确保之前定义了 'call_agent_async'。
# --- 使用 GPT-4o 的智能体 ---
weather_agent_gpt = None # 初始化为 None
runner_gpt = None # 初始化运行器为 None
try:
weather_agent_gpt = Agent(
name="weather_agent_gpt",
# 关键变化:包装 LiteLLM 模型标识符
model=LiteLlm(model=MODEL_GPT_4O),
description="提供天气信息(使用 GPT-4o)。",
instruction="你是一个由 GPT-4o 驱动的有用的天气助手。"
"对于城市天气请求使用 'get_weather' 工具。"
"根据工具的输出状态清楚地呈现成功报告或礼貌的错误消息。",
tools=[get_weather], # 重新使用相同的工具
)
print(f"智能体 '{weather_agent_gpt.name}' 已使用模型 '{MODEL_GPT_4O}' 创建。")
# InMemorySessionService 是本教程的简单、非持久存储。
session_service_gpt = InMemorySessionService() # 创建专用服务
# 定义用于标识交互上下文的常量
APP_NAME_GPT = "weather_tutorial_app_gpt" # 此测试的唯一应用名称
USER_ID_GPT = "user_1_gpt"
SESSION_ID_GPT = "session_001_gpt" # 为简单起见使用固定 ID
# 创建将发生对话的特定会话
session_gpt = await session_service_gpt.create_session(
app_name=APP_NAME_GPT,
user_id=USER_ID_GPT,
session_id=SESSION_ID_GPT
)
print(f"会话已创建:App='{APP_NAME_GPT}',User='{USER_ID_GPT}',Session='{SESSION_ID_GPT}'")
# 创建特定于此智能体及其会话服务的运行器
runner_gpt = Runner(
agent=weather_agent_gpt,
app_name=APP_NAME_GPT, # 使用特定的应用名称
session_service=session_service_gpt # 使用特定的会话服务
)
print(f"已为智能体 '{runner_gpt.agent.name}' 创建运行器。")
# --- 测试 GPT 智能体 ---
print("\n--- 测试 GPT 智能体 ---")
# 确保 call_agent_async 使用正确的运行器、用户 ID、会话 ID
await call_agent_async("东京的天气怎么样?",
runner=runner_gpt,
user_id=USER_ID_GPT,
session_id=SESSION_ID_GPT)
except Exception as e:
print(f"❌ 无法创建或运行 GPT 智能体 '{MODEL_GPT_4O}'。检查 API 密钥和模型名称。错误:{e}")
接下来,我们将对 Anthropic 的 Claude Sonnet 做同样的事情。
# @title 定义和测试 Claude 智能体
# 确保环境中定义了步骤 1 中的 'get_weather' 函数。
# 确保之前定义了 'call_agent_async'。
# --- 使用 Claude Sonnet 的智能体 ---
weather_agent_claude = None # 初始化为 None
runner_claude = None # 初始化运行器为 None
try:
weather_agent_claude = Agent(
name="weather_agent_claude",
# 关键变化:包装 LiteLLM 模型标识符
model=LiteLlm(model=MODEL_CLAUDE_SONNET),
description="提供天气信息(使用 Claude Sonnet)。",
instruction="你是一个由 Claude Sonnet 驱动的有用的天气助手。"
"对于城市天气请求使用 'get_weather' 工具。"
"分析工具的字典输出('status'、'report'/'error_message')。"
"清楚地呈现成功报告或礼貌的错误消息。",
tools=[get_weather], # 重新使用相同的工具
)
print(f"智能体 '{weather_agent_claude.name}' 已使用模型 '{MODEL_CLAUDE_SONNET}' 创建。")
# InMemorySessionService 是本教程的简单、非持久存储。
session_service_claude = InMemorySessionService() # 创建专用服务
# 定义用于标识交互上下文的常量
APP_NAME_CLAUDE = "weather_tutorial_app_claude" # 唯一应用名称
USER_ID_CLAUDE = "user_1_claude"
SESSION_ID_CLAUDE = "session_001_claude" # 为简单起见使用固定 ID
# 创建将发生对话的特定会话
session_claude = await session_service_claude.create_session(
app_name=APP_NAME_CLAUDE,
user_id=USER_ID_CLAUDE,
session_id=SESSION_ID_CLAUDE
)
print(f"会话已创建:App='{APP_NAME_CLAUDE}',User='{USER_ID_CLAUDE}',Session='{SESSION_ID_CLAUDE}'")
# 创建特定于此智能体及其会话服务的运行器
runner_claude = Runner(
agent=weather_agent_claude,
app_name=APP_NAME_CLAUDE, # 使用特定的应用名称
session_service=session_service_claude # 使用特定的会话服务
)
print(f"已为智能体 '{runner_claude.agent.name}' 创建运行器。")
# --- 测试 Claude 智能体 ---
print("\n--- 测试 Claude 智能体 ---")
# 确保 call_agent_async 使用正确的运行器、用户 ID、会话 ID
await call_agent_async("请提供伦敦的天气。",
runner=runner_claude,
user_id=USER_ID_CLAUDE,
session_id=SESSION_ID_CLAUDE)
except Exception as e:
print(f"❌ 无法创建或运行 Claude 智能体 '{MODEL_CLAUDE_SONNET}'。检查 API 密钥和模型名称。错误:{e}")
观察两个代码块的输出。你应该看到:
- 每个智能体(
weather_agent_gpt
、weather_agent_claude
)成功创建(如果 API 密钥有效)。 - 为每个智能体创建了一个专用会话和运行器。
- 每个智能体在处理查询时都正确识别需要使用
get_weather
工具(你会看到--- Tool: get_weather called... ---
日志)。 - 底层工具逻辑保持不变,始终返回我们的模拟数据。
- 然而,每个智能体生成的最终文本响应可能略有不同,这取决于指令提示由不同的 LLM(GPT-4o 与 Claude Sonnet)解释和执行。
这一步展示了 ADK + LiteLLM 提供的强大功能和灵活性。你可以轻松地尝试和部署使用各种 LLM 的智能体,同时保持核心应用程序逻辑(工具、基本智能体结构)一致。
在下一步中,我们将超越单个智能体,构建一个小团队,其中智能体可以相互委托任务!
步骤 3:构建智能体团队 - 为问候和告别进行委托¶
在步骤 1 和步骤 2 中,我们构建了一个专注于天气查询的单个智能体。虽然对于其特定任务有效,但现实世界的应用程序通常涉及处理更广泛的用例。我们可以添加更多工具和复杂指令到我们的单个天气智能体中,但这很快就会变得难以管理和效率低下。
更强大的方法是构建一个智能体团队。这涉及:
- 创建多个专门智能体,每个智能体设计用于执行特定功能(例如,一个用于天气、一个用于问候、一个用于计算)。
- 指定一个根智能体(或协调器)来接收初始用户请求。
- 使根智能体能够委托请求给最合适的专门子智能体,基于用户的意图。
为什么构建智能体团队?
- 模块化:更容易开发、测试和维护各个智能体。
- 专业化:每个智能体可以根据其特定任务进行微调(指令、模型选择)。
- 可扩展性:通过添加新智能体来更容易地添加新功能。
- 效率:允许使用更简单/更便宜的模型来处理更简单任务(如问候)。
在本步骤中,我们将:
- 为处理问候(
say_hello
)和告别(say_goodbye
)创建简单工具。 - 创建两个新的专门子智能体:
greeting_agent
和farewell_agent
。 - 更新我们的主要天气智能体(
weather_agent_v2
)作为根智能体。 - 配置根智能体及其子智能体,实现自动委托。
- 通过发送不同类型的请求来测试委托流程。
1. 为子智能体定义工具¶
首先,让我们为我们的新专家智能体创建简单的 Python 函数。记住,清晰的文档字符串对于将要使用它们的智能体至关重要。
# @title 为问候和告别智能体定义工具
from typing import Optional # 确保导入 Optional
# 确保步骤 1 中的 'get_weather' 如果独立运行此步骤时可用。
# def get_weather(city: str) -> dict: ... (来自步骤 1)
def say_hello(name: Optional[str] = None) -> str:
"""提供简单的问候。如果提供姓名,将使用它。
Args:
name (str, optional): 要问候的人的姓名。如果未提供,默认为通用问候。
Returns:
str: 友好的问候消息。
"""
if name:
greeting = f"你好,{name}!"
print(f"--- 工具:say_hello 被调用,姓名:{name} ---")
else:
greeting = "你好!" # 如果姓名为 None 或未明确传递时的默认问候
print(f"--- 工具:say_hello 被调用,无特定姓名(姓名参数值:{name})---")
return greeting
def say_goodbye() -> str:
"""提供简单的告别消息来结束对话。"""
print(f"--- 工具:say_goodbye 被调用 ---")
return "再见!祝你有美好的一天。"
print("问候和告别工具已定义。")
# 可选自测
print(say_hello("Alice"))
print(say_hello()) # 测试无参数(应该使用默认"你好!")
print(say_hello(name=None)) # 测试 name 明确为 None(应该使用默认"你好!")
2. 定义子智能体(问候和告别)¶
现在,为我们的专家创建 Agent
实例。注意他们高度集中的 instruction
和,关键的是,他们的清晰 description
。description
是 根智能体 用来决定 何时 将任务委托给这些子智能体的关键信息。
我们甚至可以使用不同的 LLM 为这些子智能体!让我们将 GPT-4o 分配给问候智能体,并将告别智能体也使用 GPT-4o(如果你愿意,可以轻松切换到 Claude 或 Gemini,如果设置了 API 密钥)。
最佳实践:子智能体 description
字段应该准确且简洁地总结他们的特定功能。这对于有效自动委托至关重要。
最佳实践:子智能体 instruction
字段应该针对他们的有限范围,告诉他们具体要做什么,什么不要做(例如,"你的唯一任务是...")。
# @title 定义问候和告别子智能体
# 确保导入 LiteLlm 并设置 API 密钥(来自步骤 0/2)
# from google.adk.models.lite_llm import LiteLlm
# MODEL_GPT_4O、MODEL_CLAUDE_SONNET 等应该已定义
# --- 问候智能体 ---
greeting_agent = None
try:
greeting_agent = Agent(
# 为简单任务使用潜在不同/更便宜的模型
model=LiteLlm(model=MODEL_GPT_4O),
name="greeting_agent",
instruction="你是问候智能体。你的唯一任务是为用户提供友好的问候。"
"使用 'say_hello' 工具生成问候。"
"如果用户提供了他们的姓名,确保将其传递给工具。"
"不要参与任何其他对话或任务。",
description="使用 'say_hello' 工具处理简单问候和你好。", # 对委托至关重要
tools=[say_hello],
)
print(f"✅ 智能体 '{greeting_agent.name}' 已使用模型 '{MODEL_GPT_4O}' 创建。")
except Exception as e:
print(f"❌ 无法创建问候智能体。检查 API 密钥({MODEL_GPT_4O})。错误:{e}")
# --- 告别智能体 ---
farewell_agent = None
try:
farewell_agent = Agent(
# 可以使用相同或不同的模型
model=LiteLlm(model=MODEL_GPT_4O), # 此示例继续使用 GPT
name="farewell_agent",
instruction="你是告别智能体。你的唯一任务是提供礼貌的再见消息。"
"当用户表示他们要离开或结束对话时使用 'say_goodbye' 工具"
"(例如,使用"再见"、"再见"、"谢谢再见"、"回头见"等词语)。"
"不要执行任何其他操作。",
description="使用 'say_goodbye' 工具处理简单的告别和再见。", # 对委托至关重要
tools=[say_goodbye],
)
print(f"✅ 智能体 '{farewell_agent.name}' 已使用模型 '{MODEL_GPT_4O}' 创建。")
except Exception as e:
print(f"❌ 无法创建告别智能体。检查 API 密钥({MODEL_GPT_4O})。错误:{e}")
3. 定义根智能体及其子智能体¶
现在,我们升级我们的 weather_agent
。关键变化是:
- 添加
sub_agents
参数:我们传递一个包含刚刚创建的greeting_agent
和farewell_agent
实例的列表。 - 更新
instruction
:我们明确告诉根智能体关于它的子智能体以及何时应该将任务委托给它们。
关键概念:自动委托(自动流程)通过提供 sub_agents
列表,ADK 实现了自动委托。当根智能体收到用户查询时,它的 LLM 不仅考虑其自身的指令和工具,还考虑每个子智能体的 description
。如果 LLM 确定查询与子智能体描述的能力(例如,"处理简单问候")相符,它将自动生成一个特殊的内部动作,将控制权转移到该子智能体,以便该轮次。子智能体然后使用自己的模型、指令和工具处理查询。
最佳实践:确保根智能体的指令清楚地指导其委托决策。提及子智能体名称并描述委托发生的条件。
# @title 定义带有子智能体的根智能体
# 确保在定义根智能体之前成功创建了子智能体。
# 同时确保定义了原始的 'get_weather' 工具。
root_agent = None
runner_root = None # 初始化运行器
if greeting_agent and farewell_agent and 'get_weather' in globals():
# 让我们为根智能体使用功能强大的 Gemini 模型来处理编排
root_agent_model = MODEL_GEMINI_2_0_FLASH
weather_agent_team = Agent(
name="weather_agent_v2", # 给它一个新的版本名称
model=root_agent_model,
description="主要协调智能体。处理天气请求并将问候/告别委托给专家。",
instruction="你是协调团队的主要天气智能体。你的主要职责是提供天气信息。"
"仅对特定天气请求使用 'get_weather' 工具(例如,'伦敦的天气')。"
"你有专门的子智能体:"
"1. 'greeting_agent':处理像'嗨'、'你好'这样的简单问候。将此类委托给它。"
"2. 'farewell_agent':处理像'再见'、'回头见'这样的简单告别。将此类委托给它。"
"分析用户的查询。如果是问候,委托给 'greeting_agent'。如果是告别,委托给 'farewell_agent'。"
"如果是天气请求,使用 'get_weather' 自己处理。"
"对于其他任何事情,适当回应或说明你无法处理。",
tools=[get_weather], # 根智能体仍需要天气工具用于其核心任务
# 关键变化:在这里链接子智能体!
sub_agents=[greeting_agent, farewell_agent]
)
print(f"✅ 根智能体 '{weather_agent_team.name}' 已使用模型 '{root_agent_model}' 和子智能体创建:{[sa.name for sa in weather_agent_team.sub_agents]}")
else:
print("❌ 无法创建根智能体,因为一个或多个子智能体初始化失败或缺少 'get_weather' 工具。")
if not greeting_agent: print(" - 问候智能体缺失。")
if not farewell_agent: print(" - 告别智能体缺失。")
if 'get_weather' not in globals(): print(" - get_weather 函数缺失。")
4. 与智能体团队交互¶
现在,我们已经定义了根智能体(weather_agent_team
- 注意:确保此变量名称与之前代码块中定义的名称匹配,可能为 # @title Define the Root Agent with Sub-Agents
,可能将其命名为 root_agent
) 及其专门子智能体,让我们测试委托机制。
以下代码块将:
- 定义一个
async
函数run_team_conversation
。 - 在此函数中,创建一个 新的、独立的
InMemorySessionService
和一个特定的会话(session_001_agent_team
),专门用于此测试运行。这隔离了对话历史记录,用于测试团队动态。 - 创建一个
Runner
(runner_agent_team
),配置为使用我们的weather_agent_team
(根智能体)和专用会话服务。 - 使用我们更新的
call_agent_async
函数向runner_agent_team
发送不同类型的查询(问候、天气请求、告别)。我们明确传递运行器、用户 ID 和会话 ID 用于此特定测试。 - 立即执行
run_team_conversation
函数。
我们期望以下流程:
- "你好!" 查询进入
runner_agent_team
。 - 根智能体(
weather_agent_team
)收到它并根据其指令和greeting_agent
的描述将任务委托给greeting_agent
。 greeting_agent
处理查询,调用其say_hello
工具并生成响应。- "纽约的天气如何?" 查询未委托给根智能体,而是直接由根智能体使用其
get_weather
工具处理。 - "谢谢,再见!" 查询委托给
farewell_agent
,它使用其say_goodbye
工具。
# @title 与智能体团队交互
# 确保定义了根智能体(例如,来自前一个单元的 'weather_agent_team' 或 'root_agent')。
# 确保定义了 call_agent_async 函数。
# 在定义对话函数之前检查根智能体变量是否存在
root_agent_var_name = 'root_agent' # 来自步骤 3 指南的默认名称
if 'weather_agent_team' in globals(): # 检查用户是否使用了这个名称
root_agent_var_name = 'weather_agent_team'
elif 'root_agent' not in globals():
print("⚠️ 未找到根智能体('root_agent' 或 'weather_agent_team')。无法定义 run_team_conversation。")
# 分配一个虚拟值以防止稍后代码块运行时出现 NameError
root_agent = None
if root_agent_var_name in globals() and globals()[root_agent_var_name]:
async def run_team_conversation():
print("\n--- 测试智能体团队委托 ---")
# InMemorySessionService 是本教程的简单、非持久存储。
session_service = InMemorySessionService()
# 定义用于标识交互上下文的常量
APP_NAME = "weather_tutorial_agent_team"
USER_ID = "user_1_agent_team"
SESSION_ID = "session_001_agent_team"
session = await session_service.create_session(
app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
print(f"会话已创建:App='{APP_NAME}',User='{USER_ID}',Session='{SESSION_ID}'")
# --- 获取实际根智能体对象 ---
# 使用确定的变量名称
actual_root_agent = globals()[root_agent_var_name]
# 创建特定于此智能体团队测试的运行器
runner_agent_team = Runner(
agent=actual_root_agent, # 使用根智能体对象
app_name=APP_NAME, # 使用特定的应用名称
session_service=session_service # 使用特定的会话服务
)
# 更正打印语句以显示实际根智能体的名称
print(f"已为智能体 '{actual_root_agent.name}' 创建运行器。")
# 总是通过根智能体的运行器交互,传递正确的 ID
await call_agent_async("你好!",
runner=runner_agent_team,
user_id=USER_ID,
session_id=SESSION_ID)
await call_agent_async("纽约的天气如何?",
runner=runner_agent_team,
user_id=USER_ID,
session_id=SESSION_ID)
await call_agent_async("谢谢,再见!",
runner=runner_agent_team,
user_id=USER_ID,
session_id=SESSION_ID)
# 执行对话
# 注意:这可能需要根智能体和子智能体使用的模型的 API 密钥!
await run_team_conversation()
else:
print("\n⚠️ Skipping agent team conversation as the root agent was not successfully defined in the previous step.")
仔细观察输出日志,特别是 --- Tool: ... called ---
消息。你应该观察到:
- 对于 "你好!",调用了
say_hello
工具(表示greeting_agent
处理了它)。 - 对于 "纽约的天气如何?",调用了
get_weather
工具(表示根智能体处理了它)。 - 对于 "谢谢,再见!",调用了
say_goodbye
工具(表示farewell_agent
处理了它)。
这确认了成功的自动委托!根智能体根据其指令和 sub_agents
的描述,正确地将用户请求路由到团队中适当的专门智能体。
你现在已经在应用程序中构建了多个协同工作的智能体。这种模块化设计是构建更复杂和能力的智能体系统的关键。在下一步中,我们将为智能体提供记忆能力,使用会话状态。
步骤 4:添加记忆和个性化使用会话状态¶
到目前为止,我们的智能体团队可以通过委托处理不同的任务,但每次交互都是全新的——智能体没有记忆,也没有会话内的用户偏好。为了创建更复杂和上下文感知的体验,智能体需要记忆。ADK 通过会话状态提供这一点。
什么是会话状态?
- 它是一个 Python 字典(
session.state
),与特定的用户会话(由APP_NAME
、USER_ID
、SESSION_ID
标识)相关联。 - 它跨多个对话轮次保持信息*。
- 智能体和工具可以从和写入此状态,允许它们记住细节、适应行为并个性化响应。
智能体如何与状态交互:
ToolContext
(主要方法):工具可以接受ToolContext
对象(自动由 ADK 提供,如果作为最后一个参数声明)。此对象通过tool_context.state
直接访问会话状态,允许工具读取偏好或保存结果在执行过程中。output_key
(自动保存智能体响应):智能体可以配置output_key="your_key"
。ADK 将自动保存智能体在轮次结束时的最终文本响应到session.state["your_key"]
。
在本步骤中,我们将增强我们的天气机器人团队:
- 使用新的
InMemorySessionService
来演示隔离状态。 - 使用用户偏好初始化会话状态,即
temperature_unit
。 - 创建一个状态感知版本的天气工具(
get_weather_stateful
),它通过ToolContext
读取此偏好,并调整输出格式(摄氏度/华氏度)。 - 更新根智能体以使用此状态工具,并将其配置为
output_key
,以便自动保存其最终天气报告到会话状态。 - 运行对话以观察初始状态如何影响工具、手动状态变化如何影响后续行为,以及
output_key
如何保持智能体的响应。
1. 初始化新会话服务和状态¶
为了清楚地演示状态管理,而不会干扰之前的步骤,我们将实例化一个新的 InMemorySessionService
。我们还将创建一个会话,其中包含一个初始状态,定义用户的温度单位偏好。
# @title 1. 初始化新会话服务和状态
# 导入必要的会话组件
from google.adk.sessions import InMemorySessionService
# 为此状态演示创建新的会话服务实例
session_service_stateful = InMemorySessionService()
print("✅ 已为状态演示创建新的 InMemorySessionService。")
# 为教程的这一部分定义新的会话 ID
SESSION_ID_STATEFUL = "session_state_demo_001"
USER_ID_STATEFUL = "user_state_demo"
# 定义初始状态数据 - 用户最初偏好摄氏度
initial_state = {
"user_preference_temperature_unit": "Celsius"
}
# 创建会话,提供初始状态
session_stateful = await session_service_stateful.create_session(
app_name=APP_NAME, # 使用一致的应用名称
user_id=USER_ID_STATEFUL,
session_id=SESSION_ID_STATEFUL,
state=initial_state # <<< 在创建期间初始化状态
)
print(f"✅ 已为用户 '{USER_ID_STATEFUL}' 创建会话 '{SESSION_ID_STATEFUL}'。")
# 验证初始状态是否正确设置
retrieved_session = await session_service_stateful.get_session(app_name=APP_NAME,
user_id=USER_ID_STATEFUL,
session_id = SESSION_ID_STATEFUL)
print("\n--- 初始会话状态 ---")
if retrieved_session:
print(retrieved_session.state)
else:
print("错误:无法检索会话。")
2. 创建状态感知天气工具¶
现在,我们创建一个新的天气工具。其关键功能是接受 tool_context: ToolContext
,它允许访问 tool_context.state
。它将读取 user_preference_temperature_unit
并根据状态格式化温度。
关键概念:ToolContext
这个对象是允许你的工具逻辑与会话上下文交互的桥梁,包括读取和写入状态变量。ADK 自动注入它,如果作为工具函数的最后一个参数定义。
最佳实践:在读取状态时,使用 dictionary.get('key', default_value)
来处理键可能不存在的情况,确保工具不会崩溃。
# @title 2. 创建状态感知天气工具
from google.adk.tools.tool_context import ToolContext
def get_weather_stateful(city: str, tool_context: ToolContext) -> dict:
"""检索天气,根据会话状态转换温度单位。"""
print(f"--- 工具:get_weather_stateful 被调用,城市:{city} ---")
# --- 从状态读取偏好 ---
preferred_unit = tool_context.state.get("user_preference_temperature_unit", "Celsius") # 默认为摄氏度
print(f"--- 工具:读取状态 'user_preference_temperature_unit':{preferred_unit} ---")
city_normalized = city.lower().replace(" ", "")
# 模拟天气数据(内部始终以摄氏度存储)
mock_weather_db = {
"newyork": {"temp_c": 25, "condition": "sunny"},
"london": {"temp_c": 15, "condition": "cloudy"},
"tokyo": {"temp_c": 18, "condition": "light rain"},
}
if city_normalized in mock_weather_db:
data = mock_weather_db[city_normalized]
temp_c = data["temp_c"]
condition = data["condition"]
# 根据状态偏好格式化温度
if preferred_unit == "Fahrenheit":
temp_value = (temp_c * 9/5) + 32 # 计算华氏度
temp_unit = "°F"
else: # 默认为摄氏度
temp_value = temp_c
temp_unit = "°C"
report = f"{city.capitalize()} 的天气是 {condition},温度为 {temp_value:.0f}{temp_unit}。"
result = {"status": "success", "report": report}
print(f"--- 工具:以 {preferred_unit} 生成报告。结果:{result} ---")
# 写入状态的示例(此工具可选)
tool_context.state["last_city_checked_stateful"] = city
print(f"--- 工具:更新状态 'last_city_checked_stateful':{city} ---")
return result
else:
# 处理未找到的城市
error_msg = f"抱歉,我没有 '{city}' 的天气信息。"
print(f"--- 工具:未找到城市 '{city}'。---")
return {"status": "error", "error_message": error_msg}
print("✅ 已定义状态感知的 'get_weather_stateful' 工具。")
3. 重新定义子智能体并更新根智能体¶
为了确保这一步是自包含的并且正确构建,我们首先重新定义 greeting_agent
和 farewell_agent
完全相同,然后我们定义新的根智能体(weather_agent_v4_stateful
):
- 它使用新的
get_weather_stateful
工具。 - 它包括问候和告别子智能体进行委托。
- 关键的是,它设置
output_key="last_weather_report"
,它自动将最终天气响应保存到会话状态。
# @title 3. Redefine Sub-Agents and Update Root Agent with output_key
# Ensure necessary imports: Agent, LiteLlm, Runner
from google.adk.agents import Agent
from google.adk.models.lite_llm import LiteLlm
from google.adk.runners import Runner
# Ensure tools 'say_hello', 'say_goodbye' are defined (from Step 3)
# Ensure model constants MODEL_GPT_4O, MODEL_GEMINI_2_5_PRO etc. are defined
# --- Redefine Greeting Agent (from Step 3) ---
greeting_agent = None
try:
greeting_agent = Agent(
model=MODEL_GEMINI_2_0_FLASH,
name="greeting_agent",
instruction="You are the Greeting Agent. Your ONLY task is to provide a friendly greeting using the 'say_hello' tool. Do nothing else.",
description="Handles simple greetings and hellos using the 'say_hello' tool.",
tools=[say_hello],
)
print(f"✅ Agent '{greeting_agent.name}' redefined.")
except Exception as e:
print(f"❌ Could not redefine Greeting agent. Error: {e}")
# --- Redefine Farewell Agent (from Step 3) ---
farewell_agent = None
try:
farewell_agent = Agent(
model=MODEL_GEMINI_2_0_FLASH,
name="farewell_agent",
instruction="You are the Farewell Agent. Your ONLY task is to provide a polite goodbye message using the 'say_goodbye' tool. Do not perform any other actions.",
description="Handles simple farewells and goodbyes using the 'say_goodbye' tool.",
tools=[say_goodbye],
)
print(f"✅ Agent '{farewell_agent.name}' redefined.")
except Exception as e:
print(f"❌ Could not redefine Farewell agent. Error: {e}")
# --- Define the Updated Root Agent ---
root_agent_stateful = None
runner_root_stateful = None # Initialize runner
# Check prerequisites before creating the root agent
if greeting_agent and farewell_agent and 'get_weather_stateful' in globals():
root_agent_model = MODEL_GEMINI_2_0_FLASH # Choose orchestration model
root_agent_stateful = Agent(
name="weather_agent_v4_stateful", # New version name
model=root_agent_model,
description="Main agent: Provides weather (state-aware unit), delegates greetings/farewells, saves report to state.",
instruction="You are the main Weather Agent. Your job is to provide weather using 'get_weather_stateful'. "
"The tool will format the temperature based on user preference stored in state. "
"Delegate simple greetings to 'greeting_agent' and farewells to 'farewell_agent'. "
"Handle only weather requests, greetings, and farewells.",
tools=[get_weather_stateful], # Use the state-aware tool
sub_agents=[greeting_agent, farewell_agent], # Include sub-agents
output_key="last_weather_report" # <<< Auto-save agent's final weather response
)
print(f"✅ Root Agent '{root_agent_stateful.name}' created using stateful tool and output_key.")
# --- Create Runner for this Root Agent & NEW Session Service ---
runner_root_stateful = Runner(
agent=root_agent_stateful,
app_name=APP_NAME,
session_service=session_service_stateful # Use the NEW stateful session service
)
print(f"✅ Runner created for stateful root agent '{runner_root_stateful.agent.name}' using stateful session service.")
else:
print("❌ Cannot create stateful root agent. Prerequisites missing.")
if not greeting_agent: print(" - greeting_agent definition missing.")
if not farewell_agent: print(" - farewell_agent definition missing.")
if 'get_weather_stateful' not in globals(): print(" - get_weather_stateful tool missing.")
4. 交互并测试状态流程¶
现在,让我们执行一个旨在测试状态交互的设计对话,使用 runner_root_stateful
(与我们状态智能体和 session_service_stateful
相关联)。我们将使用之前定义的 call_agent_async
函数,确保我们传递正确的运行器、用户 ID(USER_ID_STATEFUL
)和会话 ID(SESSION_ID_STATEFUL
)。
对话流程将是:
- 检查天气(伦敦):
get_weather_stateful
工具应该从步骤 1 中初始化的 "Celsius" 状态读取user_preference_temperature_unit
。根智能体的最终响应(以摄氏度为单位的天气报告)应该保存到state['last_weather_report']
通过output_key
配置。 - 手动更新状态:我们将直接修改
InMemorySessionService
实例的内部存储值(session_service_stateful
)。- 为什么直接修改?
session_service.get_session()
方法返回一个副本的会话。修改该副本不会影响后续智能体运行中使用的状态。对于此测试场景,我们访问内部sessions
字典来更改user_preference_temperature_unit
的实际存储状态值为 "Fahrenheit"。注意:在实际应用程序中,状态变化通常由工具或智能体逻辑返回EventActions(state_delta=...)
触发,而不是直接手动更新。
- 为什么直接修改?
- 再次检查天气(纽约):
get_weather_stateful
工具现在应该从状态读取更新后的 "Fahrenheit" 偏好,并相应地转换温度。根智能体的新响应(以华氏度为单位的天气)将覆盖state['last_weather_report']
中的先前值,因为output_key
。 - 向智能体问好:验证在状态操作的同时,委托给
greeting_agent
仍然正确。这种交互将成为此特定序列中output_key
保存的最后一个响应。 - 检查最终状态:在对话结束后,我们最后一次获取会话(获取副本)并打印其状态,以确认
user_preference_temperature_unit
确实是 "Fahrenheit",观察output_key
保存的最终值(此运行中为问候语),并查看工具写入的last_city_checked_stateful
值。
# Ensure the stateful runner (runner_root_stateful) is available from the previous cell
# Ensure call_agent_async, USER_ID_STATEFUL, SESSION_ID_STATEFUL, APP_NAME are defined
if 'runner_root_stateful' in globals() and runner_root_stateful:
async def run_stateful_conversation():
print("\n--- Testing State: Temp Unit Conversion & output_key ---")
# 1. Check weather (Uses initial state: Celsius)
print("--- Turn 1: Requesting weather in London (expect Celsius) ---")
await call_agent_async(query= "What's the weather in London?",
runner=runner_root_stateful,
user_id=USER_ID_STATEFUL,
session_id=SESSION_ID_STATEFUL
)
# 2. Manually update state preference to Fahrenheit - DIRECTLY MODIFY STORAGE
print("\n--- Manually Updating State: Setting unit to Fahrenheit ---")
try:
# Access the internal storage directly - THIS IS SPECIFIC TO InMemorySessionService for testing
stored_session = session_service_stateful.sessions[APP_NAME][USER_ID_STATEFUL][SESSION_ID_STATEFUL]
stored_session.state["user_preference_temperature_unit"] = "Fahrenheit"
# Optional: You might want to update the timestamp as well if any logic depends on it
# import time
# stored_session.last_update_time = time.time()
print(f"--- 已更新存储的会话状态。当前 'user_preference_temperature_unit':{stored_session.state['user_preference_temperature_unit']} ---")
except KeyError:
print(f"--- 错误:无法从内部存储中检索会话 '{SESSION_ID_STATEFUL}',用户 '{USER_ID_STATEFUL}',应用 '{APP_NAME}' 以更新状态。检查 ID 和是否创建了会话。---")
except Exception as e:
print(f"--- 更新内部会话状态时出错:{e} ---")
# 3. Check weather again (Tool should now use Fahrenheit)
# This will also update 'last_weather_report' via output_key
print("\n--- Turn 2: Requesting weather in New York (expect Fahrenheit) ---")
await call_agent_async("Tell me the weather in New York.",
runner=runner_root_stateful,
user_id=USER_ID_STATEFUL,
session_id=SESSION_ID_STATEFUL
)
# 4. Test basic delegation (should still work)
# This will update 'last_weather_report' again, overwriting the NY weather report
print("\n--- Turn 3: Sending a greeting ---")
await call_agent_async(query= "Hi!",
runner=runner_root_stateful,
user_id=USER_ID_STATEFUL,
session_id=SESSION_ID_STATEFUL
)
# Execute the conversation
await run_stateful_conversation()
# 方法 1:直接 await(笔记本/异步 REPL 的默认方式)
# 如果你的环境支持顶级 await(如 Colab/Jupyter 笔记本),
# 这意味着事件循环已经在运行,所以你可以直接 await 函数。
print("尝试使用 'await' 执行(笔记本默认)...")
await run_stateful_conversation()
# METHOD 2: asyncio.run (For Standard Python Scripts [.py])
# If running this code as a standard Python script from your terminal,
# the script context is synchronous. `asyncio.run()` is needed to
# create and manage an event loop to execute your async function.
# To use this method:
# 1. Comment out the `await run_stateful_conversation()` line above.
# 2. Uncomment the following block:
"""
import asyncio
if __name__ == "__main__": # Ensures this runs only when script is executed directly
print("Executing using 'asyncio.run()' (for standard Python scripts)...")
try:
# This creates an event loop, runs your async function, and closes the loop.
asyncio.run(run_stateful_conversation())
except Exception as e:
print(f"An error occurred: {e}")
"""
# --- Inspect final session state after the conversation ---
# This block runs after either execution method completes.
print("\n--- Inspecting Final Session State ---")
final_session = await session_service_stateful.get_session(app_name=APP_NAME,
user_id= USER_ID_STATEFUL,
session_id=SESSION_ID_STATEFUL)
if final_session:
# Use .get() for safer access to potentially missing keys
print(f"Final Preference: {final_session.state.get('user_preference_temperature_unit', 'Not Set')}")
print(f"Final Last Weather Report (from output_key): {final_session.state.get('last_weather_report', 'Not Set')}")
print(f"Final Last City Checked (by tool): {final_session.state.get('last_city_checked_stateful', 'Not Set')}")
# Print full state for detailed view
# print(f"Full State Dict: {final_session.state}") # For detailed view
else:
print("\n❌ Error: Could not retrieve final session state.")
else:
print("\n⚠️ Skipping state test conversation. Stateful root agent runner ('runner_root_stateful') is not available.")
通过审查对话流程和最终会话状态打印,你可以确认:
- 状态读取:天气工具(
get_weather_stateful
)正确读取user_preference_temperature_unit
从状态,最初使用 "Celsius" 用于伦敦。 - 状态更新:直接修改成功地将存储偏好更改为 "Fahrenheit"。
- 状态读取(更新):工具随后读取 "Fahrenheit" 当被问及纽约的天气时,并执行转换。根智能体的新响应(以华氏度为单位的天气)将覆盖
state['last_weather_report']
中的先前值,因为output_key
。 - 工具状态写入:工具成功地将
last_city_checked_stateful
("纽约" 在第二次天气检查后)写入状态,通过tool_context.state
。 - 委托:
greeting_agent
对于 "Hi!" 的委托仍然正确,即使状态发生了变化。 output_key
:output_key="last_weather_report"
成功保存根智能体的每个轮次的最终响应。在这个序列中,最后一个响应是问候语("Hello, there!"),所以它覆盖了状态键中的天气报告。- 最终状态:最终检查确认偏好保持为 "Fahrenheit"。
你已经成功地将会话状态集成到使用 ToolContext
个性化智能体行为,手动修改状态以测试 InMemorySessionService
,并观察 output_key
如何提供一种简单的方法来保持智能体的最后一个响应到状态。这种状态管理的基础理解对于我们继续实施使用回调的安全防护机制至关重要。
步骤 5:添加安全 - 输入保护机制使用 before_model_callback
¶
我们的智能体团队变得越来越有能力,记住偏好并有效使用工具。然而,在现实场景中,我们经常需要安全机制来控制智能体的行为在潜在问题请求到达核心大型语言模型(LLM)之前。
ADK 提供了回调——允许你钩入智能体执行生命周期中的特定点。before_model_callback
特别适用于输入安全。
什么是 before_model_callback
?
- 它是一个你定义的 Python 函数,ADK 在将编译后的请求(包括对话历史记录、指令、最新用户消息)发送到底层 LLM 之前执行。
- 目的:检查请求,如果需要,修改它,或完全阻止它基于预定义规则。
常见用例:
- 输入验证/过滤:检查用户输入是否符合标准或包含禁止内容(如 PII 或关键词)。
- 安全防护:防止有害、离题或违反政策的请求被 LLM 处理。
- 动态提示修改:在将请求发送到 LLM 之前,向 LLM 请求上下文添加及时信息(例如,从会话状态中)。
如何工作:
- 定义一个接受
callback_context: CallbackContext
和llm_request: LlmRequest
的函数。 callback_context
:提供对智能体信息、会话状态(callback_context.state
)等的访问。llm_request
:包含要发送给 LLM 的完整载荷(contents
、config
)。- 在函数内部:
- 检查:检查
llm_request.contents
(特别是最后一条用户消息)。 - 修改(小心使用):你可以更改
llm_request
的部分。 - 阻止(保护机制):返回一个
LlmResponse
对象。ADK 将立即发送此响应,跳过该轮次的 LLM 调用。 - 允许:返回
None
。ADK 继续使用(可能修改的)请求调用 LLM。
在本步骤中,我们将:
- 定义一个
before_model_callback
函数(block_keyword_guardrail
),检查用户输入是否包含特定关键词("BLOCK")。 - 更新我们的状态智能体(
weather_agent_v4_stateful
从步骤 4)以使用此回调。 - 创建一个与更新后的智能体相关联的新运行器,但使用相同的会话状态服务来保持状态连续性。
- 通过发送正常和包含关键词的请求来测试安全防护。
1. 定义保护机制回调函数¶
此函数将检查 llm_request
内容中的最后一条用户消息。如果找到 "BLOCK"(不分大小写),它会构造并返回一个 LlmResponse
对象以阻止流程;否则,它会返回 None
以继续。
# @title 1. 定义 before_model_callback 保护机制
# 确保必要的导入可用
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse
from google.genai import types # 用于创建响应内容
from typing import Optional
def block_keyword_guardrail(
callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
"""
检查最新用户消息中的 'BLOCK'。如果找到,阻止 LLM 调用
并返回预定义的 LlmResponse。否则,返回 None 以继续。
"""
agent_name = callback_context.agent_name # 获取被拦截模型调用的智能体名称
print(f"--- 回调:block_keyword_guardrail 正在为智能体运行:{agent_name} ---")
# 从请求历史中提取最新用户消息的文本
last_user_message_text = ""
if llm_request.contents:
# 查找角色为 'user' 的最近消息
for content in reversed(llm_request.contents):
if content.role == 'user' and content.parts:
# 为简单起见,假设文本在第一部分中
if content.parts[0].text:
last_user_message_text = content.parts[0].text
break # 找到最后一条用户消息文本
print(f"--- 回调:检查最后一条用户消息:'{last_user_message_text[:100]}...' ---") # 记录前 100 个字符
# --- 保护机制逻辑 ---
keyword_to_block = "BLOCK"
if keyword_to_block in last_user_message_text.upper(): # 不区分大小写检查
print(f"--- 回调:找到 '{keyword_to_block}'。阻止 LLM 调用!---")
# 可选,在状态中设置标志以记录阻止事件
callback_context.state["guardrail_block_keyword_triggered"] = True
print(f"--- 回调:设置状态 'guardrail_block_keyword_triggered':True ---")
# 构造并返回 LlmResponse 以停止流程并发送此回复
return LlmResponse(
content=types.Content(
role="model", # 从智能体的角度模拟响应
parts=[types.Part(text=f"我无法处理此请求,因为它包含被阻止的关键词 '{keyword_to_block}'。")],
)
# 注意:如果需要,你也可以在这里设置 error_message 字段
)
else:
# 未找到关键词,允许请求继续到 LLM
print(f"--- 回调:未找到关键词。允许 {agent_name} 的 LLM 调用。---")
return None # 返回 None 表示 ADK 正常继续
print("✅ block_keyword_guardrail 函数已定义。")
2. 更新根智能体以使用回调¶
我们重新定义根智能体,添加 before_model_callback
参数并将其指向我们的新保护机制函数。我们将给它一个新的版本名称以供清晰。
重要:我们需要重新定义子智能体(greeting_agent
、farewell_agent
)和状态工具(get_weather_stateful
),以确保根智能体定义可以访问所有组件。
# @title 2. Update Root Agent with before_model_callback
# --- Redefine Sub-Agents (Ensures they exist in this context) ---
greeting_agent = None
try:
# Use a defined model constant
greeting_agent = Agent(
model=MODEL_GEMINI_2_0_FLASH,
name="greeting_agent", # Keep original name for consistency
instruction="You are the Greeting Agent. Your ONLY task is to provide a friendly greeting using the 'say_hello' tool. Do nothing else.",
description="Handles simple greetings and hellos using the 'say_hello' tool.",
tools=[say_hello],
)
print(f"✅ Sub-Agent '{greeting_agent.name}' redefined.")
except Exception as e:
print(f"❌ Could not redefine Greeting agent. Check Model/API Key ({MODEL_GPT_4O}). Error: {e}")
farewell_agent = None
try:
# Use a defined model constant
farewell_agent = Agent(
model=MODEL_GEMINI_2_0_FLASH,
name="farewell_agent", # Keep original name
instruction="You are the Farewell Agent. Your ONLY task is to provide a polite goodbye message using the 'say_goodbye' tool. Do not perform any other actions.",
description="Handles simple farewells and goodbyes using the 'say_goodbye' tool.",
tools=[say_goodbye],
)
print(f"✅ Sub-Agent '{farewell_agent.name}' redefined.")
except Exception as e:
print(f"❌ Could not redefine Farewell agent. Check Model/API Key ({MODEL_GPT_4O}). Error: {e}")
# --- Define the Root Agent with the Callback ---
root_agent_model_guardrail = None
runner_root_model_guardrail = None
# Check all components before proceeding
if greeting_agent and farewell_agent and 'get_weather_stateful' in globals() and 'block_keyword_guardrail' in globals():
# Use a defined model constant like MODEL_GEMINI_2_5_PRO
root_agent_model = MODEL_GEMINI_2_0_FLASH
root_agent_model_guardrail = Agent(
name="weather_agent_v5_model_guardrail", # New version name for clarity
model=root_agent_model,
description="Main agent: Handles weather, delegates greetings/farewells, includes input keyword guardrail.",
instruction="You are the main Weather Agent. Provide weather using 'get_weather_stateful'. "
"Delegate simple greetings to 'greeting_agent' and farewells to 'farewell_agent'. "
"Handle only weather requests, greetings, and farewells.",
tools=[get_weather],
sub_agents=[greeting_agent, farewell_agent], # Reference the redefined sub-agents
output_key="last_weather_report", # Keep output_key from Step 4
before_model_callback=block_keyword_guardrail # <<< Assign the guardrail callback
)
print(f"✅ Root Agent '{root_agent_model_guardrail.name}' created with before_model_callback.")
# --- Create Runner for this Agent, Using SAME Stateful Session Service ---
# Ensure session_service_stateful exists from Step 4
if 'session_service_stateful' in globals():
runner_root_model_guardrail = Runner(
agent=root_agent_model_guardrail,
app_name=APP_NAME, # Use consistent APP_NAME
session_service=session_service_stateful # <<< Use the service from Step 4
)
print(f"✅ Runner created for guardrail agent '{runner_root_model_guardrail.agent.name}', using stateful session service.")
else:
print("❌ Cannot create runner. 'session_service_stateful' from Step 4 is missing.")
else:
print("❌ Cannot create root agent with model guardrail. One or more prerequisites are missing or failed initialization:")
if not greeting_agent: print(" - Greeting Agent")
if not farewell_agent: print(" - Farewell Agent")
if 'get_weather_stateful' not in globals(): print(" - 'get_weather_stateful' tool")
if 'block_keyword_guardrail' not in globals(): print(" - 'block_keyword_guardrail' callback")
3. 交互以测试保护机制¶
让我们测试保护机制的行为。我们将使用与步骤 4 相同的会话(SESSION_ID_STATEFUL
)来显示状态在更改时保持不变。
- 发送正常天气请求(应该通过保护机制并执行)。
- 发送包含 "BLOCK" 的请求(应该被回调拦截)。
- 发送问候(应该通过根智能体的保护机制,被委托,并正常执行)。
# @title 3. Interact to Test the Model Input Guardrail
# Ensure the runner for the guardrail agent is available
if runner_root_model_guardrail:
async def run_guardrail_test_conversation():
print("\n--- Testing Model Input Guardrail ---")
# Use the runner for the agent with the callback and the existing stateful session ID
interaction_func = lambda query: call_agent_async(query,
runner_root_model_guardrail, USER_ID_STATEFUL, SESSION_ID_STATEFUL # <-- Pass correct IDs
)
# 1. Normal request (Callback allows, should use Fahrenheit from Step 4 state change)
await interaction_func("What is the weather in London?")
# 2. Request containing the blocked keyword
await interaction_func("BLOCK the request for weather in Tokyo")
# 3. Normal greeting (Callback allows root agent, delegation happens)
await interaction_func("Hello again")
# Execute the conversation
await run_guardrail_test_conversation()
# METHOD 2: asyncio.run (For Standard Python Scripts [.py])
# If running this code as a standard Python script from your terminal,
# the script context is synchronous. `asyncio.run()` is needed to
# create and manage an event loop to execute your async function.
# To use this method:
# 1. Comment out the `await run_guardrail_test_conversation()` line above.
# 2. Uncomment the following block:
"""
import asyncio
if __name__ == "__main__": # Ensures this runs only when script is executed directly
print("Executing using 'asyncio.run()' (for standard Python scripts)...")
try:
# This creates an event loop, runs your async function, and closes the loop.
asyncio.run(run_guardrail_test_conversation())
except Exception as e:
print(f"An error occurred: {e}")
"""
# --- Inspect final session state after the conversation ---
# This block runs after either execution method completes.
# Optional: Check state for the trigger flag set by the callback
print("\n--- Inspecting Final Session State (After Guardrail Test) ---")
# Use the session service instance associated with this stateful session
final_session = await session_service_stateful.get_session(app_name=APP_NAME,
user_id=USER_ID_STATEFUL,
session_id=SESSION_ID_STATEFUL)
if final_session:
# Use .get() for safer access
print(f"Guardrail Triggered Flag: {final_session.state.get('guardrail_block_keyword_triggered', 'Not Set (or False)')}")
print(f"Last Weather Report: {final_session.state.get('last_weather_report', 'Not Set')}") # Should be London weather if successful
print(f"Temperature Unit: {final_session.state.get('user_preference_temperature_unit', 'Not Set')}") # Should be Fahrenheit
# print(f"Full State Dict: {final_session.state}") # For detailed view
else:
print("\n❌ Error: Could not retrieve final session state.")
else:
print("\n⚠️ Skipping model guardrail test. Runner ('runner_root_model_guardrail') is not available.")
观察执行流程:
- 伦敦天气:回调运行于
weather_agent_v5_model_guardrail
,检查消息,打印 "Keyword not found. Allowing LLM call.",并返回None
。代理继续,调用get_weather_stateful
工具(使用步骤 4 状态变化中的 "Fahrenheit" 偏好),并返回天气。此响应更新last_weather_report
通过output_key
。 - 包含关键词的请求:回调再次运行于
weather_agent_v5_model_guardrail
,检查消息,找到 "BLOCK",打印 "Blocking LLM call!",设置状态标志,并返回预定义的LlmResponse
。代理的底层 LLM 在此轮次中从未被调用。用户看到回调的阻止消息。 - 再次问候:回调运行于
weather_agent_v5_model_guardrail
,允许请求。根智能体然后委托给greeting_agent
。注意:根智能体上定义的before_model_callback
不会自动应用于子智能体。greeting_agent
正常进行,调用其say_hello
工具,并返回问候。
You have successfully implemented an input safety layer! The before_model_callback
provides a powerful mechanism to enforce rules and control agent behavior before expensive or potentially risky LLM calls are made. Next, we'll apply a similar concept to add guardrails around tool usage itself.
Step 6: Adding Safety - Tool Argument Guardrail (before_tool_callback
)¶
In Step 5, we added a guardrail to inspect and potentially block user input before it reached the LLM. Now, we'll add another layer of control after the LLM has decided to use a tool but before that tool actually executes. This is useful for validating the arguments the LLM wants to pass to the tool.
ADK provides the before_tool_callback
for this precise purpose.
What is before_tool_callback
?
- It's a Python function executed just before a specific tool function runs, after the LLM has requested its use and decided on the arguments.
- Purpose: Validate tool arguments, prevent tool execution based on specific inputs, modify arguments dynamically, or enforce resource usage policies.
Common Use Cases:
- Argument Validation: Check if arguments provided by the LLM are valid, within allowed ranges, or conform to expected formats.
- Resource Protection: Prevent tools from being called with inputs that might be costly, access restricted data, or cause unwanted side effects (e.g., blocking API calls for certain parameters).
- Dynamic Argument Modification: Adjust arguments based on session state or other contextual information before the tool runs.
How it Works:
-
Define a function accepting
tool: BaseTool
,args: Dict[str, Any]
, andtool_context: ToolContext
.tool
: The tool object about to be called (inspecttool.name
).args
: The dictionary of arguments the LLM generated for the tool.tool_context
: Provides access to session state (tool_context.state
), agent info, etc.
-
Inside the function:
- Inspect: Examine the
tool.name
and theargs
dictionary. - Modify: Change values within the
args
dictionary directly. If you returnNone
, the tool runs with these modified args. - Block/Override (Guardrail): Return a dictionary. ADK treats this dictionary as the result of the tool call, completely skipping the execution of the original tool function. The dictionary should ideally match the expected return format of the tool it's blocking.
- Allow: Return
None
. ADK proceeds to execute the actual tool function with the (potentially modified) arguments.
- Inspect: Examine the
In this step, we will:
- Define a
before_tool_callback
function (block_paris_tool_guardrail
) that specifically checks if theget_weather_stateful
tool is called with the city "Paris". - If "Paris" is detected, the callback will block the tool and return a custom error dictionary.
- Update our root agent (
weather_agent_v6_tool_guardrail
) to include both thebefore_model_callback
and this newbefore_tool_callback
. - Create a new runner for this agent, using the same stateful session service.
- Test the flow by requesting weather for allowed cities and the blocked city ("Paris").
步骤 6:添加安全 - 工具参数保护机制(before_tool_callback
)¶
在步骤 5 中,我们添加了一个保护机制来检查和潜在阻止用户输入在 LLM 决定使用工具之前。现在,我们将添加另一层控制在 LLM 决定使用工具之后但在工具实际执行之前。这对于验证LLM 想要传递给工具的参数很有用。
ADK 提供了 before_tool_callback
用于此目的。
什么是 before_tool_callback
?
- 它是一个在特定工具函数运行之前执行的 Python 函数,在 LLM 决定使用该工具并生成参数之后。
- 目的:验证工具参数、防止工具执行基于特定输入、动态修改参数或执行资源使用策略。
常见用例:
- 参数验证:检查 LLM 提供的参数是否有效、在允许范围内或符合预期格式。
- 资源保护:防止工具被调用带有可能会产生负面影响或访问受限数据的输入(例如,阻止某些参数的 API 调用)。
- 动态参数修改:在工具运行之前基于会话状态或其他上下文信息调整参数。
如何工作:
- 定义一个接受
tool: BaseTool
、args: Dict[str, Any]
和tool_context: ToolContext
的函数。 tool
:即将被调用的工具对象(检查tool.name
)。args
:LLM 为工具生成的参数字典。tool_context
:提供对会话状态(tool_context.state
)、智能体信息等的访问。- 在函数内部:
- 检查:检查
tool.name
和args
字典。 - 修改:直接更改
args
字典中的值。如果返回None
,工具将使用这些修改后的参数运行。 - 阻止/覆盖(保护机制):返回 字典。ADK 将此字典视为工具调用的结果,完全跳过原始工具函数的执行。字典应该理想地匹配被阻止的工具的预期返回格式。
- 允许:返回
None
。ADK 继续使用(可能修改后的)参数执行实际工具函数。
在本步骤中,我们将:
- 定义一个
before_tool_callback
函数(block_paris_tool_guardrail
),专门检查get_weather_stateful
工具是否被调用时带有城市 "Paris"。 - 如果检测到 "Paris",回调将阻止工具并返回自定义错误字典。
- 更新我们的根智能体(
weather_agent_v6_tool_guardrail
)以包括两个before_model_callback
和这个新的before_tool_callback
。 - 创建一个新的运行器用于此智能体,使用相同的会话状态服务。
- 通过请求允许的城市和被阻止的城市("Paris")来测试流程。
1. 定义工具保护机制回调函数¶
此函数针对 get_weather_stateful
工具。它检查 city
参数。如果它是 "Paris",它返回一个自定义错误字典,看起来像工具的错误响应。否则,它允许工具运行,返回 None
。
# @title 1. Define the before_tool_callback Guardrail
# Ensure necessary imports are available
from google.adk.tools.base_tool import BaseTool
from google.adk.tools.tool_context import ToolContext
from typing import Optional, Dict, Any # For type hints
def block_paris_tool_guardrail(
tool: BaseTool, args: Dict[str, Any], tool_context: ToolContext
) -> Optional[Dict]:
"""
Checks if 'get_weather_stateful' is called for 'Paris'.
If so, blocks the tool execution and returns a specific error dictionary.
Otherwise, allows the tool call to proceed by returning None.
"""
tool_name = tool.name
agent_name = tool_context.agent_name # Agent attempting the tool call
print(f"--- Callback: block_paris_tool_guardrail running for tool '{tool_name}' in agent '{agent_name}' ---")
print(f"--- Callback: Inspecting args: {args} ---")
# --- Guardrail Logic ---
target_tool_name = "get_weather_stateful" # Match the function name used by FunctionTool
blocked_city = "paris"
# Check if it's the correct tool and the city argument matches the blocked city
if tool_name == target_tool_name:
city_argument = args.get("city", "") # Safely get the 'city' argument
if city_argument and city_argument.lower() == blocked_city:
print(f"--- Callback: Detected blocked city '{city_argument}'. Blocking tool execution! ---")
# Optionally update state
tool_context.state["guardrail_tool_block_triggered"] = True
print(f"--- Callback: Set state 'guardrail_tool_block_triggered': True ---")
# Return a dictionary matching the tool's expected output format for errors
# This dictionary becomes the tool's result, skipping the actual tool run.
return {
"status": "error",
"error_message": f"Policy restriction: Weather checks for '{city_argument.capitalize()}' are currently disabled by a tool guardrail."
}
else:
print(f"--- Callback: City '{city_argument}' is allowed for tool '{tool_name}'. ---")
else:
print(f"--- Callback: Tool '{tool_name}' is not the target tool. Allowing. ---")
# If the checks above didn't return a dictionary, allow the tool to execute
print(f"--- Callback: Allowing tool '{tool_name}' to proceed. ---")
return None # Returning None allows the actual tool function to run
print("✅ block_paris_tool_guardrail function defined.")
2. 更新根智能体以使用两个回调¶
我们再次重新定义根智能体(weather_agent_v6_tool_guardrail
),这次添加 before_tool_callback
参数,与步骤 5 中的 before_model_callback
一起。
自包含执行说明:与步骤 5 类似,确保所有先决条件(子智能体、工具、before_model_callback
)已定义或可用,然后再定义此智能体。
# @title 2. Update Root Agent with BOTH Callbacks (Self-Contained)
# --- Ensure Prerequisites are Defined ---
# (Include or ensure execution of definitions for: Agent, LiteLlm, Runner, ToolContext,
# MODEL constants, say_hello, say_goodbye, greeting_agent, farewell_agent,
# get_weather_stateful, block_keyword_guardrail, block_paris_tool_guardrail)
# --- Redefine Sub-Agents (Ensures they exist in this context) ---
greeting_agent = None
try:
# Use a defined model constant like MODEL_GPT_4O
greeting_agent = Agent(
model=MODEL_GEMINI_2_0_FLASH,
name="greeting_agent", # Keep original name for consistency
instruction="You are the Greeting Agent. Your ONLY task is to provide a friendly greeting using the 'say_hello' tool. Do nothing else.",
description="Handles simple greetings and hellos using the 'say_hello' tool.",
tools=[say_hello],
)
print(f"✅ Sub-Agent '{greeting_agent.name}' redefined.")
except Exception as e:
print(f"❌ Could not redefine Greeting agent. Check Model/API Key ({MODEL_GPT_4O}). Error: {e}")
farewell_agent = None
try:
# Use a defined model constant like MODEL_GPT_4O
farewell_agent = Agent(
model=MODEL_GEMINI_2_0_FLASH,
name="farewell_agent", # Keep original name
instruction="You are the Farewell Agent. Your ONLY task is to provide a polite goodbye message using the 'say_goodbye' tool. Do not perform any other actions.",
description="Handles simple farewells and goodbyes using the 'say_goodbye' tool.",
tools=[say_goodbye],
)
print(f"✅ Sub-Agent '{farewell_agent.name}' redefined.")
except Exception as e:
print(f"❌ Could not redefine Farewell agent. Check Model/API Key ({MODEL_GPT_4O}). Error: {e}")
# --- Define the Root Agent with Both Callbacks ---
root_agent_tool_guardrail = None
runner_root_tool_guardrail = None
if ('greeting_agent' in globals() and greeting_agent and
'farewell_agent' in globals() and farewell_agent and
'get_weather_stateful' in globals() and
'block_keyword_guardrail' in globals() and
'block_paris_tool_guardrail' in globals()):
root_agent_model = MODEL_GEMINI_2_0_FLASH
root_agent_tool_guardrail = Agent(
name="weather_agent_v6_tool_guardrail", # New version name
model=root_agent_model,
description="Main agent: Handles weather, delegates, includes input AND tool guardrails.",
instruction="You are the main Weather Agent. Provide weather using 'get_weather_stateful'. "
"Delegate greetings to 'greeting_agent' and farewells to 'farewell_agent'. "
"Handle only weather, greetings, and farewells.",
tools=[get_weather_stateful],
sub_agents=[greeting_agent, farewell_agent],
output_key="last_weather_report",
before_model_callback=block_keyword_guardrail, # Keep model guardrail
before_tool_callback=block_paris_tool_guardrail # <<< Add tool guardrail
)
print(f"✅ Root Agent '{root_agent_tool_guardrail.name}' created with BOTH callbacks.")
# --- Create Runner, Using SAME Stateful Session Service ---
if 'session_service_stateful' in globals():
runner_root_tool_guardrail = Runner(
agent=root_agent_tool_guardrail,
app_name=APP_NAME,
session_service=session_service_stateful # <<< Use the service from Step 4/5
)
print(f"✅ Runner created for tool guardrail agent '{runner_root_tool_guardrail.agent.name}', using stateful session service.")
else:
print("❌ Cannot create runner. 'session_service_stateful' from Step 4/5 is missing.")
else:
print("❌ Cannot create root agent with tool guardrail. Prerequisites missing.")
3. 交互以测试工具保护机制¶
让我们再次使用相同的会话状态服务(SESSION_ID_STATEFUL
)从之前的步骤中进行交互。
- 请求天气(纽约):通过两个回调,工具执行(使用步骤 4 状态变化中的 "Fahrenheit" 偏好)。
- 请求天气(巴黎):通过
before_model_callback
。LLM 决定调用get_weather_stateful(city='Paris')
。before_tool_callback
拦截,阻止工具,并返回错误字典。智能体传递此错误。 - 请求天气(伦敦):通过两个回调,工具正常执行。
# @title 3. Interact to Test the Tool Argument Guardrail
# Ensure the runner for the tool guardrail agent is available
if runner_root_tool_guardrail:
async def run_tool_guardrail_test():
print("\n--- Testing Tool Argument Guardrail ('Paris' blocked) ---")
# Use the runner for the agent with both callbacks and the existing stateful session
interaction_func = lambda query: call_agent_async(query, runner_root_tool_guardrail, USER_ID_STATEFUL, SESSION_ID_STATEFUL)
# 1. Allowed city (Should pass both callbacks, use Fahrenheit state)
await interaction_func("What's the weather in New York?")
# 2. Blocked city (Should pass model callback, but be blocked by tool callback)
await interaction_func("How about Paris?")
# 3. Another allowed city (Should work normally again)
await interaction_func("Tell me the weather in London.")
# Execute the conversation
await run_tool_guardrail_test()
# METHOD 1: Direct await (Default for Notebooks/Async REPLs)
# If your environment supports top-level await (like Colab/Jupyter notebooks),
# it means an event loop is already running, so you can directly await the function.
print("尝试使用 'await' 执行(笔记本默认)...")
await run_tool_guardrail_test()
# METHOD 2: asyncio.run (For Standard Python Scripts [.py])
# If running this code as a standard Python script from your terminal,
# the script context is synchronous. `asyncio.run()` is needed to
# create and manage an event loop to execute your async function.
# To use this method:
# 1. Comment out the `await run_tool_guardrail_test()` line above.
# 2. Uncomment the following block:
"""
import asyncio
if __name__ == "__main__": # Ensures this runs only when script is executed directly
print("Executing using 'asyncio.run()' (for standard Python scripts)...")
try:
# This creates an event loop, runs your async function, and closes the loop.
asyncio.run(run_tool_guardrail_test())
except Exception as e:
print(f"An error occurred: {e}")
"""
# --- Inspect final session state after the conversation ---
# This block runs after either execution method completes.
# Optional: Check state for the tool block trigger flag
print("\n--- Inspecting Final Session State (After Tool Guardrail Test) ---")
# Use the session service instance associated with this stateful session
final_session = await session_service_stateful.get_session(app_name=APP_NAME,
user_id=USER_ID_STATEFUL,
session_id= SESSION_ID_STATEFUL)
if final_session:
# Use .get() for safer access
print(f"Tool Guardrail Triggered Flag: {final_session.state.get('guardrail_tool_block_triggered', 'Not Set (or False)')}")
print(f"Last Weather Report: {final_session.state.get('last_weather_report', 'Not Set')}") # Should be London weather if successful
print(f"Temperature Unit: {final_session.state.get('user_preference_temperature_unit', 'Not Set')}") # Should be Fahrenheit
# print(f"Full State Dict: {final_session.state}") # For detailed view
else:
print("\n❌ Error: Could not retrieve final session state.")
else:
print("\n⚠️ Skipping tool guardrail test. Runner ('runner_root_tool_guardrail') is not available.")
(运行上面的代码单元以生成输出。保持下面的输出单元在 markdown 中)
分析输出:
- 纽约:
before_model_callback
允许请求。LLM 请求get_weather_stateful
。before_tool_callback
运行,检查参数({'city': 'New York'}
),看到它不是 "Paris",打印 "Allowing tool...",并返回None
。实际get_weather_stateful
函数执行,读取 "Fahrenheit" 从状态,并返回天气报告。智能体传递此信息,并通过output_key
保存。 - 巴黎:
before_model_callback
允许请求。LLM 请求get_weather_stateful(city='Paris')
。before_tool_callback
运行,检查参数,检测到 "Paris",打印 "Blocking tool execution!",设置状态标志,并返回错误字典{'status': 'error', 'error_message': 'Policy restriction...'}
。实际get_weather_stateful
函数从未执行。智能体收到错误字典好像它是工具的输出,并根据该错误消息制定响应。 - 伦敦:行为与纽约相同,通过两个回调并成功执行工具。新的伦敦天气报告覆盖
state['last_weather_report']
中的先前值。
你已经成功添加了一个关键的安全层,不仅控制什么到达 LLM,还控制如何基于特定参数生成的工具可以被使用。before_model_callback
和 before_tool_callback
这样的回调对于构建强大、安全且符合政策的智能体应用程序至关重要。
结论:你的智能体团队已经准备好了!¶
恭喜你!你已经成功地从构建一个基本的天气智能体转变为使用 ADK 构建一个复杂的、多智能体团队。
让我们回顾一下你已经完成了什么:
- 你从一个基本智能体开始,配备了一个单一工具(
get_weather
)。 - 你探索了 ADK 的多模型灵活性,使用 LiteLLM,在不同的 LLM 上运行相同的内核逻辑。
- 你拥抱了模块化,通过创建专门的子智能体(
greeting_agent
、farewell_agent
)和启用自动委托从根智能体。 - 你为智能体提供了记忆,使用会话状态,允许他们记住用户偏好(
temperature_unit
)和过去的交互(output_key
)。 - 你实现了关键的安全防护机制,使用
before_model_callback
(阻止特定输入关键词)和before_tool_callback
(阻止基于参数的工具使用,如城市 "Paris")。
通过构建这个渐进的天气机器人团队,你获得了使用核心 ADK 概念构建复杂、智能应用程序的实践经验。
关键收获:
- 智能体和工具:定义能力和推理的基本构建块。清晰的指令和文档字符串至关重要。
- 运行器和会话服务:引擎和内存管理系统,负责编排智能体执行并保持对话上下文。
- 委托:设计多智能体团队允许专业化、模块化和更好地管理复杂任务。智能体
description
是自动流程的关键。 - 会话状态(
ToolContext
、output_key
):对于创建上下文感知、个性化和多轮对话智能体至关重要。 - 回调(
before_model
、before_tool
):强大的钩子,用于实现安全、验证、政策执行和动态修改在关键操作(LLM 调用或工具执行)之前。 - 灵活性(
LiteLlm
):ADK 让你为工作选择最佳的 LLM,平衡性能、成本和功能。
接下来做什么?
你的天气机器人团队是一个很好的起点。这里有一些想法可以进一步探索 ADK 并增强你的应用程序:
Happy building!