Skip to content

恢复停止的智能体

ADK 智能体的执行可能因各种因素而中断,包括网络连接断开、电源故障或必需的外部系统离线。 ADK 的恢复功能允许智能体工作流从上次停止的地方继续,避免重新启动整个工作流。 在 ADK Python 1.16 及更高版本中,你可以将 ADK 工作流配置为可恢复,以便它可以跟踪工作流的执行,然后允许你在意外中断后恢复它。

本指南说明如何将你的 ADK 智能体工作流配置为可恢复。 如果你使用自定义智能体,你可以将它们更新为可恢复。 有关详细信息,请参阅向自定义智能体添加恢复功能

添加可恢复配置

通过将可恢复性配置应用于你的 ADK 工作流的 App 对象,为智能体工作流启用恢复功能,如下代码示例所示:

app = App(
    name='my_resumable_agent',
    root_agent=root_agent,
    # 设置可恢复性配置以启用可恢复性。
    resumability_config=ResumabilityConfig(
        is_resumable=True,
    ),
)

注意:长时间运行函数、确认、身份验证

对于使用长时间运行函数确认身份验证需要用户输入的智能体,添加可恢复确认会改变这些功能的操作方式。有关详细信息,请参阅这些功能的文档。

注意:自定义智能体

自定义智能体默认不支持恢复。你必须更新自定义智能体的智能体代码以支持恢复功能。有关修改自定义智能体以支持增量恢复功能的信息,请参阅向自定义智能体添加恢复功能

恢复停止的工作流

当 ADK 工作流停止执行时,你可以使用包含工作流实例调用 ID 的命令来恢复工作流,该 ID 可在工作流的事件历史记录中找到。确保 ADK API 服务器正在运行,以防它被中断或断电,然后运行以下命令来恢复工作流,如以下 API 请求示例所示。

# 如需要重启 API 服务器:
adk api_server my_resumable_agent/

# 恢复智能体:
curl -X POST http://localhost:8000/run_sse \
 -H "Content-Type: application/json" \
 -d '{
   "app_name": "my_resumable_agent",
   "user_id": "u_123",
   "session_id": "s_abc",
   "invocation_id": "invocation-123",
 }'

你也可以使用 Runner 对象的 Run Async 方法来恢复工作流,如下所示:

runner.run_async(user_id='u_123', session_id='s_abc', 
    invocation_id='invocation-123')

# 当 new_message 设置为函数响应时,
# 我们尝试恢复长时间运行的函数。

注意

目前不支持从 ADK Web 用户界面或使用 ADK 命令行(CLI)工具恢复工作流。

工作原理

恢复功能通过记录已完成的智能体工作流任务来工作,包括使用事件事件操作跟踪可恢复工作流中智能体任务的完成情况。如果工作流被中断然后稍后重新启动,系统通过设置每个智能体的完成状态来恢复工作流。如果智能体未完成,工作流系统会恢复该智能体的任何已完成事件,并从部分完成状态重新启动工作流。 对于多智能体工作流,具体的恢复行为会有所不同,基于你工作流中的多智能体类,如下所述:

  • 顺序智能体:从保存的状态读取 current_sub_agent 以找到序列中要运行的下一个子智能体。
  • 循环智能体:使用 current_sub_agent 和 times_looped 值从上次完成的迭代和子智能体继续循环。
  • 并行智能体:确定哪些子智能体已经完成并仅运行那些尚未完成的智能体。

事件记录包括成功返回结果的工具的结果。因此,如果智能体成功执行了函数工具 A 和 B,然后在执行工具 C 期间失败,系统会恢复工具 A 和 B 的结果,并通过重新运行工具 C 请求来恢复工作流。

注意:工具执行行为

使用工具恢复工作流时,恢复功能确保智能体中的工具至少运行一次,并且在恢复工作流时可能运行多次。 如果你的智能体使用重复运行会产生负面影响的工具,如购买,你应该修改工具以检查并防止重复运行。

注意:不支持使用恢复修改工作流

在恢复之前不要修改已停止的智能体工作流。 例如,不支持向已停止的工作流添加或移除智能体然后恢复该工作流。

向自定义智能体添加恢复功能

自定义智能体有特定的实现要求以支持可恢复性。 你必须在自定义智能体内决定并定义工作流步骤,这些步骤产生可以在传递给下一步处理之前保存的结果。 以下步骤概述了如何修改自定义智能体以支持工作流恢复。

  • 创建 CustomAgentState 类:扩展 BaseAgentState 以创建 保留你的智能体状态的对象。
    • 可选,创建 WorkFlowStep 类:如果你的自定义智能体有顺序步骤,请考虑创建一个 WorkFlowStep 列表对象,定义智能体的离散、可保存的步骤。
  • 添加初始智能体状态:修改你的智能体的异步运行函数以设置你的智能体的初始状态。
  • 添加智能体状态检查点:修改你的智能体的异步运行函数以为智能体整体任务的每个已完成步骤生成和保存智能体状态。
  • 添加智能体结束状态以跟踪智能体状态:修改你的智能体的异步运行函数以在成功完成智能体的全部任务时包含 end_of_agent=True 状态。

以下示例显示了对自定义智能体指南中显示的 StoryFlowAgent 类所需的代码修改:

class WorkflowStep(int, Enum):
 INITIAL_STORY_GENERATION = 1
 CRITIC_REVISER_LOOP = 2
 POST_PROCESSING = 3
 CONDITIONAL_REGENERATION = 4

# 扩展 BaseAgentState

### class StoryFlowAgentState(BaseAgentState):

###   step = WorkflowStep

@override
async def _run_async_impl(
    self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
    """
    实现故事工作流的自定义编排逻辑。
    使用由 Pydantic 分配的实例属性(例如,self.story_generator)。
    """
    agent_state = self._load_agent_state(ctx, WorkflowStep)

    if agent_state is None:
      # 记录智能体的开始
      agent_state = StoryFlowAgentState(step=WorkflowStep.INITIAL_STORY_GENERATION)
      yield self._create_agent_state_event(ctx, agent_state)

    next_step = agent_state.step
    logger.info(f"[{self.name}] 开始故事生成工作流。")

    # 步骤 1. 初始故事生成
    if next_step <= WorkflowStep.INITIAL_STORY_GENERATION:
      logger.info(f"[{self.name}] 运行 StoryGenerator...")
      async for event in self.story_generator.run_async(ctx):
          yield event

      # 检查故事是否在继续之前生成
      if "current_story" not in ctx.session.state or not ctx.session.state[
          "current_story"
      ]:
          return  # 如果初始故事失败则停止处理

    agent_state = StoryFlowAgentState(step=WorkflowStep.CRITIC_REVISER_LOOP)
    yield self._create_agent_state_event(ctx, agent_state)

    # 步骤 2. 批评 - 修订循环
    if next_step <= WorkflowStep.CRITIC_REVISER_LOOP:
      logger.info(f"[{self.name}] 运行 CriticReviserLoop...")
      async for event in self.loop_agent.run_async(ctx):
          logger.info(
              f"[{self.name}] 来自 CriticReviserLoop 的事件:"
              f"{event.model_dump_json(indent=2, exclude_none=True)}"
          )
          yield event

    agent_state = StoryFlowAgentState(step=WorkflowStep.POST_PROCESSING)
    yield self._create_agent_state_event(ctx, agent_state)

    # 步骤 3. 顺序后处理(语法和语调检查)
    if next_step <= WorkflowStep.POST_PROCESSING:
      logger.info(f"[{self.name}] 运行 PostProcessing...")
      async for event in self.sequential_agent.run_async(ctx):
          logger.info(
              f"[{self.name}] 来自 PostProcessing 的事件:"
              f"{event.model_dump_json(indent=2, exclude_none=True)}"
          )
          yield event

    agent_state = StoryFlowAgentState(step=WorkflowStep.CONDITIONAL_REGENERATION)
    yield self._create_agent_state_event(ctx, agent_state)

    # 步骤 4. 基于语调的条件逻辑
    if next_step <= WorkflowStep.CONDITIONAL_REGENERATION:
      tone_check_result = ctx.session.state.get("tone_check_result")
      if tone_check_result == "negative":
          logger.info(f"[{self.name}] 语调是负面的。重新生成故事...")
          async for event in self.story_generator.run_async(ctx):
              logger.info(
                  f"[{self.name}] 来自 StoryGenerator (重新生成) 的事件:"
                  f"{event.model_dump_json(indent=2, exclude_none=True)}"
              )
              yield event
      else:
          logger.info(f"[{self.name}] 语调不是负面的。保留当前故事。")

    logger.info(f"[{self.name}] 工作流完成。")
    yield self._create_agent_state_event(ctx, end_of_agent=True)