智能体工作流的数据处理¶
在智能体与基于图的节点之间构建并管理清晰的数据结构,是使用 ADK 打造可靠业务流程的关键。本指南详细解析了基于图的工作流以及协作智能体中的数据处理机制,包括信息如何利用事件 (Events) 在节点间流转。我们将深入探讨事件、数据、内容和状态的核心参数,并演示如何通过数据架构 (Schemas) 与特定的指令语法,在函数节点和智能体节点间实现精准的结构化数据互传。
Alpha 发布版说明
ADK 2.0 目前处于 Alpha 阶段,与之前的版本相比可能存在破坏性变更。如果你在生产环境中对稳定性有极高要求,建议暂不使用。我们非常期待你测试该版本并提交反馈!
严重警告:严禁混用 ADK 2.0 与 1.0 的存储系统
如果你在 ADK 2.0 项目中启用了持久化存储,请务必确保其存储空间(包括会话存储、记忆库及评估数据)与 1.0 项目完全隔离。混用会导致不可恢复的数据丢失或由于格式不兼容导致 1.0 项目瘫痪。
工作流图中的事件 (Events)¶
在基于图的工作流中,事件 (Events) 是承载数据的核心容器。工作流中的每一个执行节点都在不断地接收(消费)并发送事件。
事件包含多个专用参数,用于满足不同维度的数据传输需求。以下是你需要掌握的关键参数:
output:用于在节点之间传递业务处理结果的标准参数。message:专门用于面向用户的响应内容。state:在整个 ADK 会话生命周期内,通过事件在各节点间自动流转并持久化的小型元数据。
此外,事件还封装了诸如“源节点信息”等元数据,以便工作流追踪执行链路。
节点的输入与输出机制¶
图中的每个节点都通过 Event 类进行数据吞吐。通常使用 yield 语法将处理结果移交给后续节点。
from google.adk import Event
# 定义一个简单的函数节点
def my_function_node(node_input: str):
# 将输入转为大写
output_value = node_input.upper()
# 使用 return 或 yield 返回事件对象
return Event(output=output_value)
语法建议:
* return:当节点只需要产出一个最终 Event 且不需要额外后续处理时使用。
* yield:当需要生成一系列数据项,或者某些数据项需要异步/延后处理时使用。每次调用 yield 都会将内容追加到当前事件的数据对象列表中。
* 空返回:不带参数的 return 或 yield 会向下游传递一个 None 值。
Event.output 参数详解¶
output 是节点间协作的“通用语言”。下游节点会自动将该参数中的内容识别为节点输入 (node_input)。
# 节点 1:生成结果数据
def my_function_node_1():
return Event(output="The Result")
# 节点 2:接收并处理输入
def my_function_node_2(node_input: Content):
# 提取并转换数据
output_value = node_input.parts[0].text.lower()
return Event(output=output_value) # 最终产出 "the result"
你可以利用此参数传递更为复杂的结构化字典或对象:
# 传递 JSON 风格的结构化数据
def my_function_node_3():
yield Event(
output={
"city_name": "Paris",
"city_time": "10:10 AM",
},
)
注意:Event.output 负载限制
在单次执行周期内,节点仅允许发送一个主 Event.output。虽然可以使用多次 yield,但如果存在两个或更多包含 output 负载的 yield 命令,运行时将会抛出错误。
Event.message 参数详解¶
message 专门用于同最终用户进行沟通。除非你的节点逻辑涉及向用户同步当前进度或询问补充问题,否则应优先使用 output。
# 该节点用于向前端用户反馈进度,而不影响核心业务逻辑的数据传输
async def user_progress_notifier(node_input: str):
"""告知用户后台研究已启动。"""
yield Event(message="正在为你启动深度研究,请稍候...")
Event.state 参数详解¶
state 允许你在会话中维护一组轻量级的全局变量。这些值在各个节点间透明持久化,常用于控制循环逻辑或分支走向。
# 1. 初始化重试状态
async def init_state_node(attempts: int = 0):
yield Event(
state={
"attempts": attempts,
},
)
# 2. 执行任务并自增计数器
async def task_attempt_node(node_input: Content, attempts: int):
yield Event(
state={
"attempts": attempts + 1,
},
)
# 3. 读取并展示状态
async def read_state_node(ctx: WorkflowContext):
print(f"当前尝试次数状态: {ctx.state}") # 输出: attempts: 1
# 定义工作流
root_agent = Workflow(
name="root_agent",
edges=[("START", init_state_node, task_attempt_node, read_state_node)],
)
注意:state 属性的存储容量限制
state 严禁用于存储大规模业务数据(如文件内容或长列表)。大宗数据应当通过 Artifact 或外部数据库进行持久化。
使用架构 (Schema) 约束数据流¶
为了增强代码的健壮性,你可以设置输入和输出数据架构,以约束任何节点(包括函数节点 (FunctionNodes) 和智能体 (Agents))的输入和输出数据格式。以下参数是任何节点的可选设置。你可以根据智能体项目的需要,在任何工作流节点上设置这两个参数或其中之一。
input_schema:定义该节点期望接收的数据模型(继承自BaseModel)。output_schema:定义该节点保证产出的数据模型。 下面的代码示例展示了如何为子智能体设置输入和输出架构。
from google.adk import Agent
from pydantic import BaseModel
# 定义强类型的搜索输入模型
class FlightSearchInput(BaseModel):
origin: str # 机场代码,例如 "SFO"
destination: str # 机场代码,例如 "CDG"
departure_date: date # 日期,例如 date(2026, 3, 15)
passengers: int = 1 # 乘客人数
# 定义搜索结果输出模型
class FlightSearchOutput(BaseModel):
flights: list[Flight]
cheapest_price: float
# 配置智能体
flight_searcher = Agent(
name="flight_searcher",
instruction="搜索可用航班。",
input_schema=FlightSearchInput,
output_schema=FlightSearchOutput,
tools=[search_flights_api],
mode="single-turn",
...
)
# 协调者智能体
assistant = Agent(
name="assistant",
instruction="你帮助用户规划行程。",
sub_agents=[flight_searcher],
...
)
在智能体指令中引用结构化数据¶
当上游节点将结构化数据流入智能体时,你可以在智能体的 instruction(指令)中通过模板语法直接引用这些属性。
当你从子智能体或工作流节点(如函数节点)将结构化数据传递到智能体时,可以使用特定语法将该数据添加到智能体的指令中。具体而言,你可以使用花括号 { } 选择输入架构属性,或者使用 < > 指定输入架构属性、from 关键字以及提供数据的节点名称。以下代码片段展示了两种包含通过智能体输入架构 (input schema) 传递的数据的方法:
- 花括号语法
{ }:基于当前智能体input_schema的属性进行选择。 - 箭头语法
< >:当存在多个数据源时,指定属性来源的节点名称。
# 定义城市时间架构
class CityTime(BaseModel):
time_info: str # 时间信息
city: str # 城市名称
# 函数节点:获取时间
def lookup_time_function(city: str):
"""模拟返回指定城市的当前时间。"""
return Event(output=CityTime(time_info='10:10 AM', city=city))
# 智能体:报告城市时间
city_report_agent = Agent(
name="city_report_agent",
model="gemini-2.5-flash",
input_schema=CityTime,
# 方式一:基于类和参数的数据选择
# instruction="""
# 请按以下格式返回一句话:
# 现在 {CityTime.city} 的时间是 {CityTime.time_info}。
# """,
# 方式二:基于源节点名称的更严格数据选择
instruction="""
请按以下格式返回一句话:
现在 <CityTime.city from lookup_time_function> 的时间是
<CityTime.time_info from lookup_time_function>。
""",
)
# 根级工作流
root_agent = Workflow(
name="root_agent",
edges=[
(START, city_generator_agent, lookup_time_function, city_report_agent)
],
)
如果你想了解此类工作流的快速入门示例,请参阅基于图的智能体工作流设计。