Skip to content

动态工作流

Supported in ADKPython v2.0.0Beta

ADK 框架提供了一种编程化的方式来定义工作流,这为传统的基于图的工作流提供了一个更灵活、更强大的替代方案。虽然基于图的方法非常适合组合多步骤、静态的流程结构,但如果你的业务逻辑包含复杂的迭代循环、递归或高度动态的分支判断,静态图可能会变得难以管理。

ADK 中的动态工作流 (Dynamic Workflows) 允许你抛开预设的图路径限制,充分利用 Python 等编程语言的全部功能。通过动态工作流,你可以使用简单的装饰器创建工作流,像函数一样调用工作流节点,并构建复杂的路由逻辑。以下是 ADK 中动态工作流的一些优势:

  • 灵活的控制流:使用循环、条件判断和递归动态定义执行顺序,而这些在静态图中很难或无法表示。
  • 编程体验:使用熟悉的构造(如 while 循环和 async/await)而不是基于图的路由。
  • 自动检查点 (Automatic Checkpointing):动态工作流会跟踪每个节点的执行情况。在恢复工作流时,已成功执行的子节点会被自动跳过,从而使复杂逻辑默认具备持久性和可恢复性。
  • 封装:将业务逻辑包装进节点中,父节点在内部组合底层节点,从而保持整体工作流图的整洁和可管理。

Beta 版本

ADK 2.0 是一个 Beta 版本,在与之前版本的 ADK 一起使用时可能会导致重大变更。如果你需要向后兼容性(例如在生产环境中),请不要使用 ADK 2.0。我们鼓励你测试此版本,并欢迎你的反馈

关于如何安装 ADK 2.0 以测试此功能,请参阅 ADK 2.0 欢迎页面

快速入门

以下动态工作流代码示例展示了如何定义一个包含单个函数节点的基础工作流:

from google.adk import Context
from google.adk import Workflow
from google.adk.workflow import node
from typing import Any

# 1. 定义一个简单的功能节点
@node(name="hello_node")
def my_node(node_input: Any):
    return "你好,欢迎使用 ADK 动态工作流!"

# 2. 定义动态工作流协调节点
@node(rerun_on_resume=True)
async def my_workflow(ctx: Context, node_input: str) -> str:
    # run_node 执行一个节点并返回其输出
    result = await ctx.run_node(my_node, node_input="hello")
    return result

# 3. 封装为工作流对象以便运行
root_agent = Workflow(
    name="root_agent",
    edges=[("START", my_workflow)],
)

提示: 此示例使用了 @node 装饰器,它会自动为函数生成必要的包装逻辑,使其能够在 ADK 运行时上下文(Context)中被正确识别和调度。

核心概念:节点与工作流

节点和工作流是构建动态系统的基础基石。

节点与 @node

ADK 中的动态工作流由节点组成,这些节点是派生自 BaseNode 的类。可用工作流节点的一个简单版本是 FunctionNode,它允许你将代码包装成在 Workflow 中运行所需的功能。为了最大程度减少样板代码,ADK 推荐使用 @node 装饰器:

# 使用装饰器定义节点
@node(name="hello_node")
def my_function_node(node_input: Any):
    return "你好,世界"

以下代码片段展示了不使用 @node 注解时的等效代码:

# 基础函数
def my_function_node(node_input: Any):
    return "你好,世界"

# 带有选项的 FunctionNode 包装器
success_node = FunctionNode(
    my_function_node,
    name="hello",
    rerun_on_resume=True,
)

如果你正在包装来自外部库的函数、需要基于同一个函数创建具有不同配置的多个节点,或者正在注册表中管理节点引用以进行高级编排,那么手动创建节点包装器代码会非常有用。

工作流

在 ADK 动态工作流中,你使用 Workflow 类作为编排节点的主要容器。你可以使用一个节点来定义一个动态工作流,其中包含管理节点运行的代码以及这些节点的执行逻辑(顺序和路径),如下列代码示例所示:

# 定义工作流逻辑
@node(rerun_on_resume=True)
async def my_workflow(ctx):
    # run_node 执行一个节点并返回其输出
    result = await ctx.run_node(my_function_node, node_input="Hello")
    result_formatted = await ctx.run_node(my_formatting_node, node_input=result)
    return result_formatted

# 运行工作流
root_agent = Workflow(
    name="root_agent",
    edges=[("START", my_workflow)],
)

数据处理机制

在动态工作流中,数据传递比静态图更加直观。由于使用了 async/await 模式,ctx.run_node() 会直接异步返回该节点的处理结果,你无需手动在不同事件对象间搬运数据。

from google.adk import Context
from google.adk.workflow import node

@node(rerun_on_resume=True)
async def editorial_workflow(ctx: Context, user_request: str):
    # 智能体节点生成输出
    raw_draft = await ctx.run_node(draft_agent, user_request)

    # 函数节点格式化文本
    formatted_text = await ctx.run_node(format_function_node, raw_draft)

    return formatted_text

你也可以使用定义的类来传递特定的数据架构,并配置输入和输出架构,这与基于图的工作流节点类似,如下列代码示例所示:

from google.adk import Agent
from google.adk import Context
from google.adk.workflow import node
from pydantic import BaseModel

# 定义数据模型
class CityTime(BaseModel):
    time_info: str  # 时间信息
    city: str       # 城市名称

@node
def city_time_function(city: str):
    """模拟返回指定城市的当前时间。"""
    return CityTime(time_info="10:10 AM", city=city)

# 定义智能体,并使用 CityTime 作为输入架构
city_report_agent = Agent(
    name="city_report_agent",
    model="gemini-flash-latest",
    input_schema=CityTime,
    instruction="""输出上一个节点提供的数据。""",
)

@node # 工作流节点
async def city_workflow(ctx: Context):
    city_time = await ctx.run_node(city_time_function, "Paris")
    report_text = await ctx.run_node(city_report_agent, city_time)

    return report_text

有关工作流节点之间数据处理的信息,请参阅智能体工作流的数据处理

工作流路由策略

基于图的工作流相比,ADK 中的动态工作流在路由逻辑方面提供了更大的灵活性,包括迭代循环或更复杂的分支逻辑。本节描述了一些可用于路由的技术。

顺序路由

你可以使用 ADK 动态工作流创建顺序任务处理,就像使用基于图的工作流一样。以下代码片段展示了一个包含一个智能体、一个函数节点和第二个智能体的动态工作流:

@node # 工作流节点
async def city_workflow(ctx: Context):
    # 按顺序执行三个节点
    city = await ctx.run_node(city_generator_agent)
    city_time = await ctx.run_node(city_time_function, city)
    report_text = await ctx.run_node(city_report_agent, city_time)

    return report_text

循环路由

对于希望对任务使用迭代循环的工作流,动态工作流为你提供了更大的灵活性来定义所需的路由逻辑。以下代码示例展示了如何使用动态工作流构建一个用于生成、评审和更新代码的工作流循环:

from google.adk import Context
from google.adk import Event
from google.adk.agents import LlmAgent
from google.adk.workflow import node

coder_agent = LlmAgent(
    name="generator_agent",
    model="gemini-flash-latest",
    instruction="Write python code for user request.",
    output_schema=str,
)

# 代码静态检查节点
@node(name="lint_reviewer")
async def compile_lint_check(ctx: Context, code: str):
    # 模拟 API 调用或代码静态检查
    class Response:
        findings = ""
    return Response()

# 修复代码的智能体
fixer_agent = LlmAgent(
    name="fixer_agent",
    model="gemini-flash-latest",
    instruction="""Refactor current code {code}.
        Based on compile & lint review: {findings}""",
    output_schema=str,
)

@node # 工作流节点
async def code_workflow(ctx: Context, user_request: str):
  code = await ctx.run_node(coder_agent, user_request)
  check_resp = await ctx.run_node(compile_lint_check, code)

  # 循环,直到没有发现问题
  while check_resp.findings:
    yield Event(state={"code": code, "findings": check_resp.findings})
    code = await ctx.run_node(fixer_agent, {"code": code, "findings": check_resp.findings})

    check_resp = await ctx.run_node(compile_lint_check, code)

    return code

并行执行路由

ADK 中的动态工作流可以支持并行执行,你可以使用标准的异步库(如 asyncio)来构建此功能。以下代码示例展示了如何使用 @nodeasyncio.gather 构建支持并行执行的工作流节点:

import asyncio
from typing import Any
from google.adk import Context
from google.adk.workflow import BaseNode, node


@node(rerun_on_resume=True)
async def parallel_supervisor(
    ctx: Context, node_input: list[Any], real_node: BaseNode
):
    """对输入列表中的每一项并行运行一个工作节点。"""
    tasks = []
    for item in node_input:
        # ctx.run_node 返回一个 future。先追加再等待,而不是立即等待。
        tasks.append(ctx.run_node(real_node, item))

    # 并行收集所有结果
    results = await asyncio.gather(*tasks)
    return results

提示:恢复运行中的并行节点

工作流框架确保如果动态工作流恢复运行,只有失败或中断的工作节点会被重新执行,这包括并行的工作节点。

人机协作 (Human Input)

ADK 中的动态工作流也可以包含人工输入或人机回环(HITL)步骤。你通过在工作流节点中 yield 一个 RequestInput 来将人工输入构建到工作流中,这会暂停工作流并等待用户输入。以下代码示例展示了如何构建一个人工输入节点并将其包含在工作流中:

from typing import Any
from google.adk import Context
from google.adk.events import RequestInput
from google.adk.workflow import node


@node(rerun_on_resume=False)
async def get_user_approval(ctx: Context, node_input: Any):
    """yield 一个 RequestInput 以暂停工作流并等待用户输入。"""
    yield RequestInput(message="Please approve this request (Yes/No)")


@node(rerun_on_resume=True)
async def handle_process(ctx: Context, node_input: Any):
    """调用交互步骤的编排器。"""
    user_response = await ctx.run_node(get_user_approval)

    if user_response.lower() == "yes":
        return "Approved"
    return "Denied"

重要提示:使用 ctx.run_node 的父节点

在动态工作流中调用 ctx.run_node 的父节点必须设置 rerun_on_resume=True 以正确处理中断。

高级进阶特性

执行标识符 (Execution IDs)

ADK 为每一个节点调用生成一个确定性的 ID(由于父 ID 与计数器组成)。这构成了检查点机制的基石。在恢复执行时,框架根据这些 ID 识别该步骤是否已经成功跑过,从而决定是直接读取历史结果还是重新执行。

自定义执行 ID

谨慎使用自定义 ID

除非你非常清楚自己在做什么(例如处理顺序会发生重排的任务列表),否则请避免手动干扰执行 ID。不当的 ID 可能会破坏 ADK 的自动重试和恢复逻辑。

如果你正在处理如订单列表等具有天然唯一键的场景,可以手动指定:

from google.adk import Context
from google.adk.workflow import node
from pydantic import BaseModel
from typing import Any
import asyncio

class Order(BaseModel):
  order_id: str
  cart_items: list[Product]

@node(rerun_on_resume=True)
async def process_all_orders(ctx: Context, node_input: Any):
  orders = await get_orders()

  process_tasks = []
  for order in orders:
    # 使用 run_id 提供自定义标识符。
    # 自定义 run_id 必须包含至少一个非数字字符,
    # 以避免与自动生成的顺序数字 ID 冲突。
    task = ctx.run_node(process_order, order, run_id=f"order-{order.order_id}")
    process_tasks.append(task)

  results = await asyncio.gather(*process_tasks)
  return results

默认情况下,自动生成的运行 ID 是从 "1" 开始的顺序整数(表示为字符串)。自定义 run_id 值必须包含至少一个非数字字符,以避免与这些自动生成的 ID 冲突。