Skip to content

为智能体工作流构建图路由

Supported in ADKPython v2.0.0Alpha

ADK 中的基于图的工作流将智能体逻辑定义为执行节点和边 (edges) 的图,允许你构建结合了人工智能 (AI) 推理和代码逻辑的更可靠流程。这些工作流允许你创建可以封装代码函数、AI 驱动的智能体、工具以及人类输入的执行逻辑路由。通过明确地绘制出路由逻辑,这种方法允许你在代码中定义特定的、分步的过程工作流,相比纯提示词驱动的智能体,它提供了更高的精确度和可靠性。

基于图的航班升舱智能体

# 定义路由工作流
root_agent = Workflow(
  name="routing_workflow",
  edges=[
    ("START", process_message, router),
    (router,
      {
        "output-1": response_1,
        "output-2": response_2,
        "output-3": response_3,
      },
    ),
  ],
)

图 1. 任务图的可视化以及实现它的 Workflow 代码。

使用基于图的智能体工作流的优势在于,相比基于提示词的智能体,它在控制力、可预测性和可靠性方面有了显著提升。通过在代码中定义整体流程工作流,你可以更精细地控制任务的路由和执行方式。这种结构化的节点定义提高了智能体的可预测性,并增强了对于需要明确步骤和过程管理的复杂任务的可靠性。

Alpha 发布版

ADK 2.0 是一个 Alpha 发布版,与之前的 ADK 版本一起使用时可能会导致破坏性变更。如果你需要向后兼容性(例如在生产环境中),请不要使用 ADK 2.0。我们鼓励你测试此版本,并欢迎提供你的 反馈

通过查看 基于图的智能体工作流 开始使用 ADK 中的基于图的工作流。

节点

一个图由执行节点组成。这些节点 (nodes) 可以是智能体 (Agents)、ADK 工具 (Tools)、人类输入任务或你编写的代码函数。节点可以接收先前执行节点的输入,并通过 Event 对象发送数据。下面展示了一个处理文本输入并发送文本输出的简单 FunctionNode

from google.adk import Event

# 定义一个简单的函数节点
def my_function_node(node_input: str):
    input_text_modified = node_input.upper()
    return Event(output=input_text_modified)

有关在节点之间传输数据的更多信息,请参阅 智能体工作流的数据处理

工作流图语法

你通过创建一个 edges 数组来定义图,它定义了要遵循的节点逻辑执行路径和条件。本节提供了 edges 数组中图语法的概况。以下代码示例展示了一个包含两个按顺序执行的节点的基础工作流:

from google.adk import Workflow

# 定义顺序工作流
root_agent = Workflow(
    name="sequential_workflow",
    edges=[("START", task_A_node, task_B_node)],
)

注意:工作流与智能体限制

你可以将智能体 (Agents)LlmAgents 添加到基于图的工作流中,但必须将它们设置为任务 (task) 或单次对话 (single-turn) 模式。有关智能体模式的更多信息,请参见 构建协作智能体团队

路由序列

edges 数组根据数组中呈现的顺序或节点执行节点,从第一行开始并进行到随后的行,直到执行完成。edges 数组的第一行使用 START 关键字来表示图执行的开始,每个列出的节点按顺序执行,如下列代码片段所示:

# 运行单个节点
edges=[("START", task_A_node)]
# 按顺序运行 3 个节点
edges=[("START",
        task_A_node,
        task_B_node,
        task_C_node)]

你也可以多次使用 START 在工作流图的开头启动并行任务,如下列代码片段所示:

# 启动多个并行任务
edges=[
    ("START", parallel_task_A),
    ("START", parallel_task_B),
    ("START", parallel_task_C),
]

注意:并行节点的局限性

并非所有的工作流节点或子智能体都可以并行运行。特别是,你不能在同一个智能体会话中运行多个交互式聊天会话。

路由分支

START 关键字之后 edges 数组的后续行定义了节点的额外执行逻辑。对于分支路径,你需要定义一个节点(通常是 FunctionNode),它输出一个带有多个路由值的路由 (route)。在 edges 图中,你使用路由值和目标节点定义执行逻辑,如下列代码示例所示:

# 模拟路由决策
def router(node_input: str):
    """模拟路由决策"""
    return Event(route="RUN_TASK_C")

root_agent = Workflow(
  name="routing_workflow",
  edges=[
    ("START", task_A_node, router),
    (router,
      {
        # "路由值": 运行的节点
        "RUN_TASK_B": task_B_node,
        "RUN_TASK_C": task_C_node,
      },
    ),
  ],
)

并行任务:扇出与合并路径

你可以创建将执行分散到多个并行节点上的图,并且通常你需要汇聚每个节点的输出以进行进一步处理。你可以通过使用 JoinNode 对象来完成此操作,它会等待每个并行任务完成,然后将这些节点的输出集合传递给下一个节点。

连接到 JoinNode 的任务

图 2. 并行任务节点的输出可以使用 JoinNode 对象进行汇聚。

以下代码片段展示了如何实现一个基础的 JoinNode 对象并使用它来汇聚所有节点的输出:

from google.adk.workflow import JoinNode

# 定义合并节点
my_join_node = JoinNode(name="my_join_node")

edges=[
    ("START", parallel_task_A, my_join_node),
    ("START", parallel_task_B, my_join_node),
    ("START", parallel_task_C, my_join_node),
    (my_join_node, final_task_D),
]

注意:未完成节点导致 JoinNode 阻塞

JoinNode 对象仅在所有上游节点都提供了一个 Event 输出后才会继续。如果其中一个上游节点未能提供输出,JoinNode 就会阻塞,工作流执行就会停止。请确保在任何输出到 JoinNode 的节点中包含故障安全输出(兜底输出)。

嵌套工作流

构建更复杂的工作流时,你可能希望将特定任务的功能封装进可重用的工作流中。一个或多个 Workflow 对象可以作为另一个工作流智能体图中的节点使用,以实现这一目标。

父工作流中的嵌套工作流

图 3. 嵌套 Workflow 作为父级 Workflow 内部的节点。

以下代码片段展示了如何实现一个带有两个嵌套 Workflow 对象 (workflow_B, workflow_C) 作为图中节点的工作流智能体:

from google.adk import Workflow

# 定义包含嵌套工作流的根级智能体
root_agent = Workflow(
  name="parent_workflow",
  edges=[
    ("START", task_A1, router),
    (router, {
      "RUN_WORKFLOW_B": workflow_B,
      "RUN_WORKFLOW_C": workflow_C,
    }),
  ],
)

嵌套工作流的数据输出

嵌套 Workflow 对象的工作方式与单个节点略有不同。当嵌套工作流完成其其中一个节点时,它会将数据传输到嵌套工作流图中下一个节点,并且系统会将该节点的 Event 冒泡(向上传递)到父工作流,以实现过程可追溯性。当嵌套工作流完成其过程中的最后一个节点时,父节点会从最终的叶子节点提取数据,并将其作为嵌套工作流的输出发出。 low.