为智能体工作流构建图路由¶
ADK 中的基于图的工作流将智能体逻辑定义为由执行节点 (Nodes) 和边 (Edges) 组成的拓扑结构。这允许你构建一个将 AI 推理与确定性代码逻辑紧密结合的可靠流程。通过图路由,你可以轻松封装函数、AI 智能体、工具以及人机协作步骤。相比于纯提示词驱动的智能体,这种明确绘制执行路径的方法为复杂的、分步的业务过程提供了极高的精确度和可靠性。
# 定义具备路由逻辑的工作流
root_agent = Workflow(
name="routing_workflow",
edges=[
("START", process_message, router), # 从 START 开始,经过处理后进入路由节点
(router,
{
"output-1": response_1,
"output-2": response_2,
"output-3": response_3,
},
),
],
)
图 1. 任务路由图的可视化及其对应的 Workflow 代码实现。
使用基于图的智能体工作流的优势在于,相比基于提示词的智能体,它在控制力、可预测性和可靠性方面有了显著提升。通过在代码中定义整体流程工作流,你可以更精细地控制任务的路由和执行方式。这种结构化的节点定义提高了智能体的可预测性,并增强了对于需要明确步骤和过程管理的复杂任务的可靠性。
Alpha 发布版说明
ADK 2.0 是一个 Alpha 发布版,与之前的 ADK 版本一起使用时可能会导致破坏性变更。如果你需要向后兼容性(例如在生产环境中),请不要使用 ADK 2.0。我们鼓励你测试此版本,并欢迎提供你的反馈!
想要快速上手图工作流?请先查阅基于图的智能体工作流基础。
节点定义与类型¶
一个工作流图由多个执行节点 (Nodes) 组成。这些节点可以是: * 智能体 (Agents):执行推理任务的大模型实体。 * 自定义工具 (Tools):特定的 Python 函数。 * 人机协作步骤 (HITL):需要人类干预的节点。 * 原生代码函数:处理纯辅助逻辑的函数。
节点通过 Event 对象接收上游数据并向下游发送产出结果。
from google.adk import Event
# 定义一个将输入文本转为大写的简单函数节点
def my_function_node(node_input: str):
input_text_modified = node_input.upper()
return Event(output=input_text_modified)
关于节点间数据传输的更多细节,请参考智能体工作流的数据处理。
工作流图语法参考¶
你通过创建一个 edges 数组来定义图,它定义了要遵循的节点逻辑执行路径和条件。本节提供了 edges 数组中图语法的概况。以下代码示例展示了一个包含两个按顺序执行的节点的基础工作流:
from google.adk import Workflow
# 定义顺序工作流
root_agent = Workflow(
name="sequential_workflow",
edges=[("START", task_A_node, task_B_node)],
)
注意:工作流与智能体限制
在图工作流中使用 LlmAgent 时,必须将其设置为 task 或 single-turn 模式。关于模式的详细说明,请参阅协作智能体团队构建。
路由序列 (Sequence)¶
edges 数组中的节点会按照声明的物理顺序依次执行。数组的第一行通常以 START 关键字开头,表示执行的起点。
# 运行单个孤立节点
edges=[("START", single_task)]
# 按顺序链式运行多个节点
edges=[("START",
task_A_node,
task_B_node,
task_C_node)]
如果你需要并行启动多个任务,可以定义多行以 START 开头的边:
# 启动多个并行任务
edges=[
("START", parallel_task_A),
("START", parallel_task_B),
("START", parallel_task_C),
]
并行执行的局限性
并非所有节点都支持并行。例如,你无法在同一个会话中同时开启多个交互式聊天分支。
路由分支 (Branching)¶
对于非线性路径,你需要使用一个具备“决策能力”的节点(通常是 FunctionNode)。该节点会返回一个携带 route 标识的 Event。在 edges 数组中,你通过将路由标识映射到目标节点来实现分流。
def intent_router(node_input: str):
"""根据意图分析决定下一跳路劲"""
if "查询" in node_input:
return Event(route="GO_TO_SEARCH")
return Event(route="GO_TO_CHAT")
root_agent = Workflow(
name="branching_logic",
edges=[
("START", pre_processor, intent_router),
(intent_router,
{
"GO_TO_SEARCH": search_subagent,
"GO_TO_CHAT": chat_subagent,
},
),
],
)
并行任务:扇出与合并 (Fan-out & Join)¶
当你将任务分发到多个并行节点后,通常需要汇总结果以便后续处理。ADK 提供了 JoinNode 对象专门用于收束并行链路。它会阻塞等待所有上游任务完成,然后将产出的数据集合体传给后续节点。
图 2. 利用 JoinNode 汇聚并行节点的执行结果。
from google.adk.workflow import JoinNode
# 定义一个汇聚节点
combiner = JoinNode(name="result_combiner")
edges=[
("START", fetch_news, combiner),
("START", fetch_ads, combiner),
("START", fetch_user_pref, combiner),
(combiner, generate_summary_agent), # 所有结果到齐后,由智能体生成总结
]
JoinNode 阻塞警告
JoinNode 必须收集到所有上游支路的输出后才会激活。如果任一支路挂掉或未产出 Event,整个工作流将会永久阻塞。请务必为并行节点配置防御性的“兜底输出”。
嵌套工作流 (Nested Workflows)¶
为了实现业务逻辑的高内聚与低耦合,你可以将特定子系统封装为独立的 Workflow 对象,并将其作为大工作流中的一个节点。
图 3. 父工作流中嵌入的子工作流节点。
from google.adk import Workflow
# 根级主工作流引用了已经定义好的子系统 workflow_B 和 workflow_C
root_agent = Workflow(
name="enterprise_platform",
edges=[
("START", entry_node, router),
(router, {
"ORDER_SYSTEM": order_workflow, # 这是一个完整的子工作流
"USER_SYSTEM": user_mgmt_workflow, # 这也是一个完整的子工作流
}),
],
)
嵌套工作流的数据输出机制¶
嵌套工作流的数据流转逻辑与原子节点略有不同: 1. 链路流转:子工作流内部节点产生的数据会在其自身的图中正常传递。 2. 事件冒泡:系统会自动将子工作流的执行事件上报给父级,以确保全链路的可追溯性(Traceability)。 3. 最终产出:当子工作流执行到其自身的图终点(末梢节点)时,父级节点会自动捕获这些叶子节点产出的最终数据,并将其作为该“子工作流节点”的统一输出发送出去。