Skip to content

动态工作流

Supported in ADKPython v2.0.0Alpha

ADK 框架提供了一种以编程方式定义工作流的方法,作为基于图的工作流的一种更灵活且强大的替代方案。使用基于图的方法提供了一种便捷的方式,通过工作流节点来组合多步骤、静态的过程结构。然而,如果你工作流中的逻辑路径更为复杂,例如包含迭代循环或复杂的分支逻辑,基于图的方法可能不再适合你的需求,或者会因为过于庞杂而难以管理。

ADK 中的动态工作流允许你抛开基于图的路径结构,充分利用所选编程语言的全部功能来构建工作流。通过动态工作流,你可以使用简单的装饰器创建工作流,像函数一样调用工作流节点,并构建复杂的路由逻辑。以下是 ADK 中动态工作流的一些优势:

  • 灵活的控制流:使用循环、条件判断和递归动态定义执行顺序,而这些在静态图中很难或无法表示。
  • 编程体验:使用熟悉的构造(如 while 循环和 async/await)而不是基于图的路由。
  • 自动检查点 (Automatic Checkpointing):动态工作流会跟踪每个节点的执行情况。在恢复工作流时,已成功执行的子节点会被自动跳过,从而使复杂逻辑默认具备持久性和可恢复性。
  • 封装:将业务逻辑包装进节点中,父节点在内部组合底层节点,从而保持整体工作流图的整洁和可管理。

Alpha 发布版

ADK 2.0 是一个 Alpha 发布版,与之前的 ADK 版本一起使用时可能会导致破坏性变更。如果你需要向后兼容性(例如在生产环境中),请不要使用 ADK 2.0。我们鼓励你测试此版本,并欢迎提供你的 反馈

有关安装 ADK 2.0 以测试此功能的信息,请参见 欢迎使用 ADK 2.0

快速入门

以下动态工作流代码示例展示了如何定义一个包含单个函数节点的基础工作流:

from google.adk import Workflow
from google.adk import Event
from google.adk import Context
from typing import Any

# 定义一个简单的节点
@node(name="hello_node")
def my_node(node_input: Any):
    return "你好,世界"

# 定义一个动态工作流节点
@node(rerun_on_resume=True)
async def my_workflow(ctx: Context, node_input: str) -> str:
    # run_node 执行一个节点并返回其输出
    result = await ctx.run_node(my_node, input_data="hello")
    return result

# 运行工作流
root_agent = Workflow(
    name="root_agent",
    edges=[("START", my_workflow)],
)

此示例为了方便并保持代码尽可能简洁,使用了 @node 注解。此注解生成包装器,允许代码在 ADK 动态工作流的上下文中运行。

构建块:节点与工作流

节点(Node)和工作流(Workflow)代表了 ADK 动态工作流的基础构建块。这些类提供了包装代码所需的功能,使其能够集成到 ADK 的代码化工作流中。

节点与 @node

ADK 中的动态工作流由节点组成,这些节点是派生自 BaseNode 的类。可用工作流节点的一个简单版本是 FunctionNode,它允许你将代码包装成在 Workflow 中运行所需的功能。为了方便,ADK 框架提供了 @node 注解来生成节点包装器,从而将样板代码减少到最低:

# 使用装饰器定义节点
@node(name="hello_node")
def my_function_node(node_input: Any):
    return "你好,世界"

以下代码片段展示了不使用 @node 注解时的等效代码:

# 基础函数
def my_function_node(node_input: Any):
    return "你好,世界"

# 带有选项的 FunctionNode 包装器
success_node = FunctionNode(
    my_function_node,
    name="hello",
    rerun_on_resume=True,
)

如果你正在包装来自外部库的函数、需要基于同一个函数创建具有不同配置的多个节点,或者正在注册表中管理节点引用以进行高级编排,那么手动创建节点包装器代码会非常有用。

工作流

在 ADK 动态工作流中,你使用 Workflow 类作为编排节点的主要容器。你可以使用一个节点来定义一个动态工作流,其中包含管理节点运行的代码以及这些节点的执行逻辑(顺序和路径),如下列代码示例所示:

# 定义工作流逻辑
@node(rerun_on_resume=True)
async def my_workflow(ctx):
    # run_node 执行一个节点并返回其输出
    result = await ctx.run_node(my_function_node, input_data="你好")
    result_formatted = await ctx.run_node(my_formatting_node, input_data=result)
    return result_formatted

# 运行工作流
root_agent = Workflow(
    name="root_agent",
    edges=[("START", my_workflow)],
)

数据处理

在 ADK 中使用动态工作流时,传递数据比基于图的工作流更简单,因为在工作流中,Context 类的 run_node() 方法会直接返回节点的输出。这消除了直接处理会话状态或由于数据传输而需要处理复杂路由输出的需求。以下代码示例展示了如何在智能体节点和函数节点之间传递字符串数据:

from google.adk import Context

@node(rerun_on_resume=True)
async def editorial_workflow(ctx: Context, user_request: str):
    # 智能体节点生成输出
    raw_draft = await ctx.run_node(draft_agent, user_request)

    # 函数节点格式化文本
    formatted_text = await ctx.run_node(format_function_node, raw_draft)

    return formatted_text

你也可以使用定义的类来传递特定的数据架构,并配置输入和输出架构,这与基于图的工作流节点类似,如下列代码示例所示:

from google.adk import Agent
from google.adk import Context
from pydantic import BaseModel

# 定义数据模型
class CityTime(BaseModel):
    time_info: str  # 时间信息
    city: str       # 城市名称

@node
def city_time_function(city: str):
    """模拟返回指定城市的当前时间。"""
    return CityTime(time_info="10:10 AM", city=city)

# 定义智能体,并使用 CityTime 作为输入架构
city_report_agent = Agent(
    name="city_report_agent",
    model="gemini-2.5-flash",
    input_schema=CityTime,
    instruction="""输出上一个节点提供的数据。""",
)

@node # 工作流节点
async def city_workflow(ctx: Context):
    city_time = await ctx.run_node(city_time_function, "Paris")
    report_text = await ctx.run_node(city_report_agent, city_time)

    return report_text

有关工作流节点之间数据处理的更多信息,请参见 智能体工作流的数据处理

工作流路由

基于图的工作流相比,ADK 中的动态工作流在路由逻辑方面提供了更大的灵活性,包括迭代循环或更复杂的分支逻辑。本节介绍了一些你可以使用的路由技术。

顺序路由

你可以使用 ADK 动态工作流创建顺序任务处理,就像使用基于图的工作流一样。以下代码片段展示了一个包含一个智能体、一个函数节点和第二个智能体的动态工作流:

@node # 工作流节点
async def city_workflow(ctx: Context):
    # 按顺序执行三个节点
    city = await ctx.run_node(city_generator_agent)
    city_time = await ctx.run_node(city_time_function, city)
    report_text = await ctx.run_node(city_report_agent, city_time)

    return report_text

循环路由

对于希望对任务使用迭代循环的工作流,动态工作流为你提供了更大的灵活性来定义所需的路由逻辑。以下代码示例展示了如何使用动态工作流构建一个用于生成、评审和更新代码的工作流循环:

# 生成代码的智能体
coder_agent = LlmAgent(
    name="generator_agent",
    model="gemini-2.5-flash",
    instruction="根据用户请求编写 Python 代码。",
    output_schema=str,
)

# 代码静态检查节点
@node(name="lint_reviewer")
compile_lint_check = ApiNode()

# 修复代码的智能体
fixer_agent = LlmAgent(
    name="generator_agent",
    model="gemini-2.5-flash",
    instruction="""重构当前代码 {code}
        基于编译和 lint 评审结果:{findings}""",
    output_schema=str,
)

@node # 工作流节点
async def code_workflow(ctx):
  code = await ctx.run_node(coder_agent)
  check_resp = await ctx.run_node(compile_lint_check, code)

  # 循环,直到没有发现问题
  while check_resp.findings:
    yield Event(state={"code": code, "findings": check_resp.findings})
    code = await ctx.run_node(fixer_agent)

    check_resp = await ctx.run_node(compile_lint_check, code)

  return code

并行执行路由

ADK 中的动态工作流可以支持并行执行,你可以使用标准异步库(如 asyncio)来构建此功能。以下代码示例展示了如何构建一个支持并行执行的工作流节点,该节点随后可以集成到更大的工作流中:

from google.adk.workflow import BaseNode
from google.adk import Context
from typing import Any
import asyncio

class ParallelNode(BaseNode):
    """一个并行运行工作节点的监督节点。"""
    real_node: BaseNode

    async def run(self, ctx: Context, node_input: list[Any]):
        tasks = []

        # 为输入列表中的每一项动态调度工作节点
        for item in node_input:
            # ctx.run_node 为临时节点返回一个可等待的 future
            tasks.append(ctx.run_node(self.real_node, item))

        # 使用 asyncio 并行收集结果
        results = await asyncio.gather(*tasks)

        return results

提示:恢复运行中的并行节点

工作流框架确保如果动态工作流恢复运行,只有失败或中断的工作节点会被重新执行,这包括并行的工作节点。

人类输入

ADK 中的动态工作流还可以包含人类输入或人机协同 (HITL) 步骤。你通过创建一个派生自 BaseNode 的子类来将人类输入构建到工作流中,该子类会中断工作流,并结合 RequestInput 实例向用户提供请求并检索其响应。以下代码示例展示了如何构建人类输入节点并将其包含在工作流中:

from google.adk.workflow import BaseNode
from google.adk import Context
from google.adk.events import RequestInput
from typing import Any, AsyncGenerator

class GetInput(BaseNode):
    """一个暂停执行并等待人类输入的节点。"""
    rerun_on_resume = False  # 确保恢复时将响应作为输出产出

    def __init__(self, request: RequestInput, name: str):
        self.request = request
        self.name = name

    def get_name(self) -> str:
        return self.name

    async def run(self) -> AsyncGenerator[Any, None]:
        # 产出(Yield)请求会告诉工作流暂停并等待输入
        yield self.request

async def approval_process_node(ctx: Context, node_input: Any):
    """一个协调人类审批步骤的父节点。"""

    # 为用户定义请求内容
    request = RequestInput(message="请批准此请求(是/否)")

    # 动态调用 HITL 节点。工作流在此处暂停。
    user_response = await ctx.run_node(GetInput(request, name="approval_step"))

    if user_response.lower() == "yes" or user_response == "是":
        return "请求已批准"
    else:
        return "请求被拒绝"

高级特性

动态工作流提供了一些旨在处理更复杂开发场景的高级特性。这些能力允许对执行进行更精细的控制,并能更好地与现有的技术基础设施集成。

执行 ID

ADK 框架基于父级 ID 和一个计数器,为子节点的执行生成一个确定性标识符 (ID)。ADK 工作流为每个调度的节点使用确定性 ID 来识别之前的结果。这些 ID 是根据动态节点调度的顺序生成的,用于设置检查点,并在工作流恢复或重新运行时按正确顺序重新运行任务。

自定义执行 ID

在某些罕见的情况下,你可能需要稳定的标识符,例如在处理可重新排序的列表时,你可以在运行节点时提供自定义 ID。通常情况下,你应该避免这样做,因为这会影响工作流任务的重试和进程恢复。具体来说,这些 ID 用于检查节点状态,如果节点已经运行,则跳过执行。如果你提供自定义 ID,请确保它们对于工作流重新运行保持确定性,并且对于输入而言逻辑上保持不变。以下示例展示了在工作流中执行节点时如何添加此类标识符:

警告:自定义执行 ID

请尽量避免创建自定义执行 ID。由于执行 ID 用于确定节点的执行顺序,自定义执行 ID 可能会在系统尝试重新运行工作流中的这些节点时引发问题。

class Order(BaseModel):
  order_id: str
  cart_items: list[Product]

# 处理订单的工作流节点
async def process_orders_workflow(ctx, node_input: str):

  orders = await get_orders()

  process_tasks = []
  for i, order in enumerate(orders):
    # 使用自定义 ID(order_id)运行节点
    task = ctx.run_node(process_order, order, name=order.order_id)

    process_tasks.append(task)

  # 并行执行
  result = await asyncio.gather(*process_tasks)

  return result