Skip to content

恢复停止的智能体

Supported in ADKPython v1.14.0

ADK 智能体的执行可能因各种因素而中断,包括网络连接断开、电源故障或必需的外部系统离线。ADK 的恢复功能允许智能体工作流从上次停止的地方继续,避免重新启动整个工作流。在 ADK Python 1.16 及更高版本中,你可以将 ADK 工作流配置为可恢复,以便它可以跟踪工作流的执行,从而允许你在意外中断后恢复它。

本指南介绍了如何将 ADK 智能体工作流配置为可恢复。如果你使用的是自定义智能体,也可以将其更新为可恢复。有关更多信息,请参阅 向自定义智能体添加恢复功能

添加可恢复配置

通过将可恢复性配置应用到你的 ADK 工作流的 App 对象,为智能体工作流启用恢复功能,如下代码示例所示:

app = App(
    name='my_resumable_agent',
    root_agent=root_agent,
    # 设置可恢复性配置以启用可恢复性。
    resumability_config=ResumabilityConfig(
        is_resumable=True,
    ),
)

警告:长时间运行的函数、确认、身份验证

对于使用 长时间运行的函数 (Long Running Functions)确认 (Confirmations)身份验证 (Authentication) 且需要用户输入的智能体,添加可恢复确认会改变这些功能的运行方式。有关更多信息,请参阅这些功能的文档。

注意:自定义智能体

自定义智能体默认不支持恢复功能。你必须更新自定义智能体的代码以支持恢复功能。有关修改自定义智能体以支持增量恢复功能的信息,请参阅 向自定义智能体添加恢复功能

恢复停止的工作流

当 ADK 工作流停止执行时,你可以使用包含该工作流实例的调用 ID (Invocation ID) 的命令来恢复工作流。调用 ID 可以在工作流的 事件 (Event) 历史中找到。请确保 ADK API 服务器正在运行(以防其被中断或关机),然后运行以下命令来恢复工作流,如下面的 API 请求示例所示。

# 如需要重启 API 服务器:
adk api_server my_resumable_agent/

# 恢复智能体:
curl -X POST http://localhost:8000/run_sse \
 -H "Content-Type: application/json" \
 -d '{
   "app_name": "my_resumable_agent",
   "user_id": "u_123",
   "session_id": "s_abc",
   "invocation_id": "invocation-123",
 }'

你也可以使用 Runner 对象的 run_async 方法来恢复工作流,如下所示:

runner.run_async(user_id='u_123', session_id='s_abc',
    invocation_id='invocation-123')

# 当 new_message 设置为函数响应时,
# 我们尝试恢复长时间运行的函数。

注意

目前不支持从 ADK Web 用户界面或使用 ADK 命令行 (CLI) 工具恢复工作流。

工作原理

恢复功能的工作原理是:通过使用 事件 (Events)事件动作 (Event Actions) 记录已完成的智能体工作流任务,并在可恢复的工作流中跟踪智能体任务的完成情况。如果工作流被中断并在稍后重新启动,系统会通过设置每个智能体的完成状态来恢复工作流。如果某个智能体未完成,工作流系统会恢复该智能体已完成的所有事件,并从部分完成的状态重新启动工作流。对于多智能体工作流,具体的恢复行为会有所不同,这取决于你工作流中的多智能体类,具体说明如下:

  • 顺序智能体:从保存的状态读取 current_sub_agent 以找到序列中要运行的下一个子智能体。
  • 循环智能体:使用 current_sub_agenttimes_looped 值从上次完成的迭代和子智能体继续循环。
  • 并行智能体:确定哪些子智能体已经完成,并仅运行那些尚未完成的智能体。

事件记录包括成功返回结果的工具的结果。因此,如果智能体成功执行了函数工具 A 和 B,然后在执行工具 C 期间失败,系统会恢复工具 A 和 B 的结果,并通过重新运行工具 C 请求来恢复工作流。

注意:工具执行行为

使用工具恢复工作流时,恢复功能确保智能体中的工具至少运行一次,并且在恢复工作流时可能运行多次。如果你的智能体使用重复运行会产生负面影响的工具(如购买),你应该修改工具以检查并防止重复运行。

注意:不支持在恢复前修改工作流

在恢复停止的智能体工作流之前,请勿对其进行修改。例如,不支持在工作流停止后添加或删除智能体,然后恢复该工作流。

向自定义智能体添加恢复功能

自定义智能体有特定的实现要求以支持可恢复性。你必须在自定义智能体内决定并定义工作流步骤,这些步骤产生可以在传递给下一步处理之前保存的结果。以下步骤概述了如何修改自定义智能体以支持工作流恢复:

  • 创建 CustomAgentState:扩展 BaseAgentState 以创建保留你智能体状态的对象。
    • 可选,创建 WorkFlowStep:如果你的自定义智能体有顺序步骤,请考虑创建一个 WorkFlowStep 列表对象,定义智能体的离散、可保存的步骤。
  • 添加初始智能体状态:修改你的智能体的异步运行函数以设置你的智能体的初始状态。
  • 添加智能体状态检查点:修改你的智能体的异步运行函数,为智能体整体任务的每个已完成步骤生成和保存智能体状态。
  • 添加智能体结束状态以跟踪智能体状态:修改你的智能体的异步运行函数,在成功完成智能体的全部任务时包含 end_of_agent=True 状态。

以下示例展示了对 自定义智能体 (Custom Agents) 指南中所示的 StoryFlowAgent 类进行的必要代码修改:

class WorkflowStep(int, Enum):
 INITIAL_STORY_GENERATION = 1
 CRITIC_REVISER_LOOP = 2
 POST_PROCESSING = 3
 CONDITIONAL_REGENERATION = 4

# 扩展 BaseAgentState

### class StoryFlowAgentState(BaseAgentState):

###   step = WorkflowStep

@override
async def _run_async_impl(
    self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
    """
    实现故事工作流的自定义编排逻辑。
    使用由 Pydantic 分配的实例属性(例如,self.story_generator)。
    """
    agent_state = self._load_agent_state(ctx, WorkflowStep)

    if agent_state is None:
      # 记录智能体的开始
      agent_state = StoryFlowAgentState(step=WorkflowStep.INITIAL_STORY_GENERATION)
      yield self._create_agent_state_event(ctx, agent_state)

    next_step = agent_state.step
    logger.info(f"[{self.name}] 开始故事生成工作流。")

    # 步骤 1. 初始故事生成
    if next_step <= WorkflowStep.INITIAL_STORY_GENERATION:
      logger.info(f"[{self.name}] 运行 StoryGenerator...")
      async for event in self.story_generator.run_async(ctx):
          yield event

      # 检查故事是否在继续之前生成
      if "current_story" not in ctx.session.state or not ctx.session.state[
          "current_story"
      ]:
          return  # 如果初始故事失败则停止处理

    agent_state = StoryFlowAgentState(step=WorkflowStep.CRITIC_REVISER_LOOP)
    yield self._create_agent_state_event(ctx, agent_state)

    # 步骤 2. 批评 - 修订循环
    if next_step <= WorkflowStep.CRITIC_REVISER_LOOP:
      logger.info(f"[{self.name}] 运行 CriticReviserLoop...")
      async for event in self.loop_agent.run_async(ctx):
          logger.info(
              f"[{self.name}] 来自 CriticReviserLoop 的事件:"
              f"{event.model_dump_json(indent=2, exclude_none=True)}"
          )
          yield event

    agent_state = StoryFlowAgentState(step=WorkflowStep.POST_PROCESSING)
    yield self._create_agent_state_event(ctx, agent_state)

    # 步骤 3. 顺序后处理(语法和语调检查)
    if next_step <= WorkflowStep.POST_PROCESSING:
      logger.info(f"[{self.name}] 运行 PostProcessing...")
      async for event in self.sequential_agent.run_async(ctx):
          logger.info(
              f"[{self.name}] 来自 PostProcessing 的事件:"
              f"{event.model_dump_json(indent=2, exclude_none=True)}"
          )
          yield event

    agent_state = StoryFlowAgentState(step=WorkflowStep.CONDITIONAL_REGENERATION)
    yield self._create_agent_state_event(ctx, agent_state)

    # 步骤 4. 基于语调的条件逻辑
    if next_step <= WorkflowStep.CONDITIONAL_REGENERATION:
      tone_check_result = ctx.session.state.get("tone_check_result")
      if tone_check_result == "negative":
          logger.info(f"[{self.name}] 语调是负面的。重新生成故事...")
          async for event in self.story_generator.run_async(ctx):
              logger.info(
                  f"[{self.name}] 来自 StoryGenerator (重新生成) 的事件:"
                  f"{event.model_dump_json(indent=2, exclude_none=True)}"
              )
              yield event
      else:
          logger.info(f"[{self.name}] 语调不是负面的。保留当前故事。")

    logger.info(f"[{self.name}] 工作流完成。")
    yield self._create_agent_state_event(ctx, end_of_agent=True)