Skip to content

动态工作流

Supported in ADKPython v2.0.0Alpha

ADK 框架提供了一种编程化的方式来定义工作流,这为传统的基于图的工作流提供了一个更灵活、更强大的替代方案。虽然基于图的方法非常适合组合多步骤、静态的流程结构,但如果你的业务逻辑包含复杂的迭代循环、递归或高度动态的分支判断,静态图可能会变得难以管理。

ADK 中的动态工作流 (Dynamic Workflows) 允许你抛开预设的图路径限制,充分利用 Python 等编程语言的全部功能。通过动态工作流,你可以使用简单的装饰器创建工作流,像函数一样调用工作流节点,并构建复杂的路由逻辑。以下是 ADK 中动态工作流的一些优势:

  • 灵活的控制流:使用循环、条件判断和递归动态定义执行顺序,而这些在静态图中很难或无法表示。
  • 编程体验:使用熟悉的构造(如 while 循环和 async/await)而不是基于图的路由。
  • 自动检查点 (Automatic Checkpointing):动态工作流会跟踪每个节点的执行情况。在恢复工作流时,已成功执行的子节点会被自动跳过,从而使复杂逻辑默认具备持久性和可恢复性。
  • 封装:将业务逻辑包装进节点中,父节点在内部组合底层节点,从而保持整体工作流图的整洁和可管理。

Alpha 发布版

ADK 2.0 是一个 Alpha 发布版,与之前的 ADK 版本一起使用时可能会导致破坏性变更。如果你需要向后兼容性(例如在生产环境中),请不要使用 ADK 2.0。我们鼓励你测试此版本,并欢迎提供你的使用反馈

关于如何安装 ADK 2.0 以测试此功能,请参阅 ADK 2.0 欢迎页面

快速入门

以下动态工作流代码示例展示了如何定义一个包含单个函数节点的基础工作流:

from google.adk import Workflow
from google.adk import Event
from google.adk import Context
from typing import Any

# 1. 定义一个简单的功能节点
@node(name="hello_node")
def my_node(node_input: Any):
    return "你好,欢迎使用 ADK 动态工作流!"

# 2. 定义动态工作流协调节点
@node(rerun_on_resume=True)
async def my_workflow(ctx: Context, node_input: str) -> str:
    # ctx.run_node 负责执行节点并捕获输出
    result = await ctx.run_node(my_node, input_data="hello")
    return result

# 3. 封装为工作流对象以便运行
root_agent = Workflow(
    name="root_agent",
    edges=[("START", my_workflow)],
)

提示: 此示例使用了 @node 装饰器,它会自动为函数生成必要的包装逻辑,使其能够在 ADK 运行时上下文(Context)中被正确识别和调度。

核心概念:节点与工作流

节点和工作流是构建动态系统的基础基石。

节点与 @node

ADK 中的动态工作流由节点组成,这些节点是派生自 BaseNode 的类。可用工作流节点的一个简单版本是 FunctionNode,它允许你将代码包装成在 Workflow 中运行所需的功能。为了最大程度减少样板代码,ADK 推荐使用 @node 装饰器:

# 使用装饰器定义节点
@node(name="hello_node")
def my_function_node(node_input: Any):
    return "你好,世界"

以下代码片段展示了不使用 @node 注解时的等效代码:

# 基础函数
def my_function_node(node_input: Any):
    return "你好,世界"

# 带有选项的 FunctionNode 包装器
success_node = FunctionNode(
    my_function_node,
    name="hello",
    rerun_on_resume=True,
)

如果你正在包装来自外部库的函数、需要基于同一个函数创建具有不同配置的多个节点,或者正在注册表中管理节点引用以进行高级编排,那么手动创建节点包装器代码会非常有用。

工作流

在 ADK 动态工作流中,你使用 Workflow 类作为编排节点的主要容器。你可以使用一个节点来定义一个动态工作流,其中包含管理节点运行的代码以及这些节点的执行逻辑(顺序和路径),如下列代码示例所示:

# 定义工作流逻辑
@node(rerun_on_resume=True)
async def my_workflow(ctx):
    # run_node 执行一个节点并返回其输出
    result = await ctx.run_node(my_function_node, input_data="你好")
    result_formatted = await ctx.run_node(my_formatting_node, input_data=result)
    return result_formatted

# 运行工作流
root_agent = Workflow(
    name="root_agent",
    edges=[("START", my_workflow)],
)

数据处理机制

在动态工作流中,数据传递比静态图更加直观。由于使用了 async/await 模式,ctx.run_node() 会直接异步返回该节点的处理结果,你无需手动在不同事件对象间搬运数据。

from google.adk import Context

@node(rerun_on_resume=True)
async def editorial_workflow(ctx: Context, user_request: str):
    # 智能体节点生成输出
    raw_draft = await ctx.run_node(draft_agent, user_request)

    # 函数节点格式化文本
    formatted_text = await ctx.run_node(format_function_node, raw_draft)

    return formatted_text

你也可以使用定义的类来传递特定的数据架构,并配置输入和输出架构,这与基于图的工作流节点类似,如下列代码示例所示:

from google.adk import Agent
from google.adk import Context
from pydantic import BaseModel

# 定义数据模型
class CityTime(BaseModel):
    time_info: str  # 时间信息
    city: str       # 城市名称

@node
def city_time_function(city: str):
    """模拟返回指定城市的当前时间。"""
    return CityTime(time_info="10:10 AM", city=city)

# 定义智能体,并使用 CityTime 作为输入架构
city_report_agent = Agent(
    name="city_report_agent",
    model="gemini-2.5-flash",
    input_schema=CityTime,
    instruction="""输出上一个节点提供的数据。""",
)

@node # 工作流节点
async def city_workflow(ctx: Context):
    city_time = await ctx.run_node(city_time_function, "Paris")
    report_text = await ctx.run_node(city_report_agent, city_time)

    return report_text

For more information on data handling between workflow nodes, see Data handling for agent workflows.

工作流路由策略

Dynamic workflows in ADK provide more flexibility in terms of routing logic compared to graph-based workflows, including iterative loops or more complex branching logic. This section describes some of the techniques that you can use for routing.

顺序路由

你可以使用 ADK 动态工作流创建顺序任务处理,就像使用基于图的工作流一样。以下代码片段展示了一个包含一个智能体、一个函数节点和第二个智能体的动态工作流:

@node # 工作流节点
async def city_workflow(ctx: Context):
    # 按顺序执行三个节点
    city = await ctx.run_node(city_generator_agent)
    city_time = await ctx.run_node(city_time_function, city)
    report_text = await ctx.run_node(city_report_agent, city_time)

    return report_text

循环路由

对于希望对任务使用迭代循环的工作流,动态工作流为你提供了更大的灵活性来定义所需的路由逻辑。以下代码示例展示了如何使用动态工作流构建一个用于生成、评审和更新代码的工作流循环:

# 生成代码的智能体
coder_agent = LlmAgent(
    name="generator_agent",
    model="gemini-2.5-flash",
    instruction="根据用户请求编写 Python 代码。",
    output_schema=str,
)

# 代码静态检查节点
@node(name="lint_reviewer")
compile_lint_check = ApiNode()

# 修复代码的智能体
fixer_agent = LlmAgent(
    name="generator_agent",
    model="gemini-2.5-flash",
    instruction="""重构当前代码 {code}
        基于编译和 lint 评审结果:{findings}""",
    output_schema=str,
)

@node # 工作流节点
async def code_workflow(ctx):
  code = await ctx.run_node(coder_agent)
  check_resp = await ctx.run_node(compile_lint_check, code)

  # 循环,直到没有发现问题
  while check_resp.findings:
    yield Event(state={"code": code, "findings": check_resp.findings})
    code = await ctx.run_node(fixer_agent)

    check_resp = await ctx.run_node(compile_lint_check, code)

    return code

并行执行路由

ADK 中的动态工作流可以支持并行执行,你可以使用标准异步库(如 asyncio)来构建此功能。以下代码示例展示了如何构建一个支持并行执行的工作流节点,该节点随后可以集成到更大的工作流中:

from google.adk.workflow import BaseNode
from google.adk import Context
from typing import Any
import asyncio

class ParallelNode(BaseNode):
    """一个并行运行工作节点的监督节点。"""
    real_node: BaseNode

    async def run(self, ctx: Context, node_input: list[Any]):
        tasks = []

        # 为输入列表中的每一项动态调度工作节点
        for item in node_input:
            # ctx.run_node 为临时节点返回一个可等待的 future
            tasks.append(ctx.run_node(self.real_node, item))

        # 使用 asyncio 并行收集结果
        results = await asyncio.gather(*tasks)

        return results

提示:恢复运行中的并行节点

工作流框架确保如果动态工作流恢复运行,只有失败或中断的工作节点会被重新执行,这包括并行的工作节点。

人机协作 (Human Input)

动态工作流天然支持“人机协同 (Human in the Loop)”步骤。通过 RequestInput 信号,你可以指示工作流在关键点暂停执行,等待用户反馈后再继续。

async def approval_step(ctx: Context, data: Any):
    # 发起审批请求
    req = RequestInput(message="请确认是否批准此操作?(yes/no)")

    # 执行到此处时,工作流会挂起并保存现场,直到用户在前端完成响应
    user_choice = await ctx.run_node(GetInputNode(req))

    if user_choice.lower() == "yes":
        return await ctx.run_node(process_task, data)
    return "用户已否决该操作。"

高级进阶特性

执行标识符 (Execution IDs)

ADK 为每一个节点调用生成一个确定性的 ID(由于父 ID 与计数器组成)。这构成了检查点机制的基石。在恢复执行时,框架根据这些 ID 识别该步骤是否已经成功跑过,从而决定是直接读取历史结果还是重新执行。

自定义执行 ID

谨慎使用自定义 ID

除非你非常清楚自己在做什么(例如处理顺序会发生重排的任务列表),否则请避免手动干扰执行 ID。不当的 ID 可能会破坏 ADK 的自动重试和恢复逻辑。

如果你正在处理如订单列表等具有天然唯一键的场景,可以手动指定:

for order in orders:
    # 使用业务自带的 order_id 确保在即使重排也能精准命中检查点
    await ctx.run_node(process_node, order, name=order.order_id)