为智能体工作流构建图路由¶
ADK 中的基于图的工作流将智能体逻辑定义为执行节点和边 (edges) 的图,允许你构建结合了人工智能 (AI) 推理和代码逻辑的更可靠流程。这些工作流允许你创建可以封装代码函数、AI 驱动的智能体、工具以及人类输入的执行逻辑路由。通过明确地绘制出路由逻辑,这种方法允许你在代码中定义特定的、分步的过程工作流,相比纯提示词驱动的智能体,它提供了更高的精确度和可靠性。
# 定义路由工作流
root_agent = Workflow(
name="routing_workflow",
edges=[
("START", process_message, router),
(router,
{
"output-1": response_1,
"output-2": response_2,
"output-3": response_3,
},
),
],
)
图 1. 任务图的可视化以及实现它的 Workflow 代码。
使用基于图的智能体工作流的优势在于,相比基于提示词的智能体,它在控制力、可预测性和可靠性方面有了显著提升。通过在代码中定义整体流程工作流,你可以更精细地控制任务的路由和执行方式。这种结构化的节点定义提高了智能体的可预测性,并增强了对于需要明确步骤和过程管理的复杂任务的可靠性。
Alpha 发布版
ADK 2.0 是一个 Alpha 发布版,与之前的 ADK 版本一起使用时可能会导致破坏性变更。如果你需要向后兼容性(例如在生产环境中),请不要使用 ADK 2.0。我们鼓励你测试此版本,并欢迎提供你的 反馈!
通过查看 基于图的智能体工作流 开始使用 ADK 中的基于图的工作流。
节点¶
一个图由执行节点组成。这些节点 (nodes) 可以是智能体 (Agents)、ADK 工具 (Tools)、人类输入任务或你编写的代码函数。节点可以接收先前执行节点的输入,并通过 Event 对象发送数据。下面展示了一个处理文本输入并发送文本输出的简单 FunctionNode:
from google.adk import Event
# 定义一个简单的函数节点
def my_function_node(node_input: str):
input_text_modified = node_input.upper()
return Event(output=input_text_modified)
有关在节点之间传输数据的更多信息,请参阅 智能体工作流的数据处理。
工作流图语法¶
你通过创建一个 edges 数组来定义图,它定义了要遵循的节点逻辑执行路径和条件。本节提供了 edges 数组中图语法的概况。以下代码示例展示了一个包含两个按顺序执行的节点的基础工作流:
from google.adk import Workflow
# 定义顺序工作流
root_agent = Workflow(
name="sequential_workflow",
edges=[("START", task_A_node, task_B_node)],
)
注意:工作流与智能体限制
你可以将智能体 (Agents) 或 LlmAgents 添加到基于图的工作流中,但必须将它们设置为任务 (task) 或单次对话 (single-turn) 模式。有关智能体模式的更多信息,请参见 构建协作智能体团队。
路由序列¶
edges 数组根据数组中呈现的顺序或节点执行节点,从第一行开始并进行到随后的行,直到执行完成。edges 数组的第一行使用 START 关键字来表示图执行的开始,每个列出的节点按顺序执行,如下列代码片段所示:
# 运行单个节点
edges=[("START", task_A_node)]
# 按顺序运行 3 个节点
edges=[("START",
task_A_node,
task_B_node,
task_C_node)]
你也可以多次使用 START 在工作流图的开头启动并行任务,如下列代码片段所示:
# 启动多个并行任务
edges=[
("START", parallel_task_A),
("START", parallel_task_B),
("START", parallel_task_C),
]
注意:并行节点的局限性
并非所有的工作流节点或子智能体都可以并行运行。特别是,你不能在同一个智能体会话中运行多个交互式聊天会话。
路由分支¶
START 关键字之后 edges 数组的后续行定义了节点的额外执行逻辑。对于分支路径,你需要定义一个节点(通常是 FunctionNode),它输出一个带有多个路由值的路由 (route)。在 edges 图中,你使用路由值和目标节点定义执行逻辑,如下列代码示例所示:
# 模拟路由决策
def router(node_input: str):
"""模拟路由决策"""
return Event(route="RUN_TASK_C")
root_agent = Workflow(
name="routing_workflow",
edges=[
("START", task_A_node, router),
(router,
{
# "路由值": 运行的节点
"RUN_TASK_B": task_B_node,
"RUN_TASK_C": task_C_node,
},
),
],
)
并行任务:扇出与合并路径¶
你可以创建将执行分散到多个并行节点上的图,并且通常你需要汇聚每个节点的输出以进行进一步处理。你可以通过使用 JoinNode 对象来完成此操作,它会等待每个并行任务完成,然后将这些节点的输出集合传递给下一个节点。
图 2. 并行任务节点的输出可以使用 JoinNode 对象进行汇聚。
以下代码片段展示了如何实现一个基础的 JoinNode 对象并使用它来汇聚所有节点的输出:
from google.adk.workflow import JoinNode
# 定义合并节点
my_join_node = JoinNode(name="my_join_node")
edges=[
("START", parallel_task_A, my_join_node),
("START", parallel_task_B, my_join_node),
("START", parallel_task_C, my_join_node),
(my_join_node, final_task_D),
]
注意:未完成节点导致 JoinNode 阻塞
JoinNode 对象仅在所有上游节点都提供了一个 Event 输出后才会继续。如果其中一个上游节点未能提供输出,JoinNode 就会阻塞,工作流执行就会停止。请确保在任何输出到 JoinNode 的节点中包含故障安全输出(兜底输出)。
嵌套工作流¶
构建更复杂的工作流时,你可能希望将特定任务的功能封装进可重用的工作流中。一个或多个 Workflow 对象可以作为另一个工作流智能体图中的节点使用,以实现这一目标。
图 3. 嵌套 Workflow 作为父级 Workflow 内部的节点。
以下代码片段展示了如何实现一个带有两个嵌套 Workflow 对象 (workflow_B, workflow_C) 作为图中节点的工作流智能体:
from google.adk import Workflow
# 定义包含嵌套工作流的根级智能体
root_agent = Workflow(
name="parent_workflow",
edges=[
("START", task_A1, router),
(router, {
"RUN_WORKFLOW_B": workflow_B,
"RUN_WORKFLOW_C": workflow_C,
}),
],
)
嵌套工作流的数据输出¶
嵌套 Workflow 对象的工作方式与单个节点略有不同。当嵌套工作流完成其其中一个节点时,它会将数据传输到嵌套工作流图中下一个节点,并且系统会将该节点的 Event 冒泡(向上传递)到父工作流,以实现过程可追溯性。当嵌套工作流完成其过程中的最后一个节点时,父节点会从最终的叶子节点提取数据,并将其作为嵌套工作流的输出发出。 low.