恢复停止的智能体¶
ADK 智能体的执行可能因各种因素而中断,包括网络连接断开、电源故障或必需的外部系统离线。ADK 的恢复功能允许智能体工作流从上次停止的地方继续,避免重新启动整个工作流。在 ADK Python 1.16 及更高版本中,你可以将 ADK 工作流配置为可恢复,以便它可以跟踪工作流的执行,从而允许你在意外中断后恢复它。
本指南介绍了如何将 ADK 智能体工作流配置为可恢复。如果你使用的是自定义智能体,也可以将其更新为可恢复。有关更多信息,请参阅 向自定义智能体添加恢复功能。
添加可恢复配置¶
通过将可恢复性配置应用到你的 ADK 工作流的 App 对象,为智能体工作流启用恢复功能,如下代码示例所示:
app = App(
name='my_resumable_agent',
root_agent=root_agent,
# 设置可恢复性配置以启用可恢复性。
resumability_config=ResumabilityConfig(
is_resumable=True,
),
)
警告:长时间运行的函数、确认、身份验证
对于使用 长时间运行的函数 (Long Running Functions)、确认 (Confirmations) 或 身份验证 (Authentication) 且需要用户输入的智能体,添加可恢复确认会改变这些功能的运行方式。有关更多信息,请参阅这些功能的文档。
注意:自定义智能体
自定义智能体默认不支持恢复功能。你必须更新自定义智能体的代码以支持恢复功能。有关修改自定义智能体以支持增量恢复功能的信息,请参阅 向自定义智能体添加恢复功能。
恢复停止的工作流¶
当 ADK 工作流停止执行时,你可以使用包含该工作流实例的调用 ID (Invocation ID) 的命令来恢复工作流。调用 ID 可以在工作流的 事件 (Event) 历史中找到。请确保 ADK API 服务器正在运行(以防其被中断或关机),然后运行以下命令来恢复工作流,如下面的 API 请求示例所示。
# 如需要重启 API 服务器:
adk api_server my_resumable_agent/
# 恢复智能体:
curl -X POST http://localhost:8000/run_sse \
-H "Content-Type: application/json" \
-d '{
"app_name": "my_resumable_agent",
"user_id": "u_123",
"session_id": "s_abc",
"invocation_id": "invocation-123",
}'
你也可以使用 Runner 对象的 run_async 方法来恢复工作流,如下所示:
runner.run_async(user_id='u_123', session_id='s_abc',
invocation_id='invocation-123')
# 当 new_message 设置为函数响应时,
# 我们尝试恢复长时间运行的函数。
注意
目前不支持从 ADK Web 用户界面或使用 ADK 命令行 (CLI) 工具恢复工作流。
工作原理¶
恢复功能的工作原理是:通过使用 事件 (Events) 和 事件动作 (Event Actions) 记录已完成的智能体工作流任务,并在可恢复的工作流中跟踪智能体任务的完成情况。如果工作流被中断并在稍后重新启动,系统会通过设置每个智能体的完成状态来恢复工作流。如果某个智能体未完成,工作流系统会恢复该智能体已完成的所有事件,并从部分完成的状态重新启动工作流。对于多智能体工作流,具体的恢复行为会有所不同,这取决于你工作流中的多智能体类,具体说明如下:
- 顺序智能体:从保存的状态读取
current_sub_agent以找到序列中要运行的下一个子智能体。 - 循环智能体:使用
current_sub_agent和times_looped值从上次完成的迭代和子智能体继续循环。 - 并行智能体:确定哪些子智能体已经完成,并仅运行那些尚未完成的智能体。
事件记录包括成功返回结果的工具的结果。因此,如果智能体成功执行了函数工具 A 和 B,然后在执行工具 C 期间失败,系统会恢复工具 A 和 B 的结果,并通过重新运行工具 C 请求来恢复工作流。
注意:工具执行行为
使用工具恢复工作流时,恢复功能确保智能体中的工具至少运行一次,并且在恢复工作流时可能运行多次。如果你的智能体使用重复运行会产生负面影响的工具(如购买),你应该修改工具以检查并防止重复运行。
注意:不支持在恢复前修改工作流
在恢复停止的智能体工作流之前,请勿对其进行修改。例如,不支持在工作流停止后添加或删除智能体,然后恢复该工作流。
向自定义智能体添加恢复功能¶
自定义智能体有特定的实现要求以支持可恢复性。你必须在自定义智能体内决定并定义工作流步骤,这些步骤产生可以在传递给下一步处理之前保存的结果。以下步骤概述了如何修改自定义智能体以支持工作流恢复:
- 创建
CustomAgentState类:扩展BaseAgentState以创建保留你智能体状态的对象。- 可选,创建
WorkFlowStep类:如果你的自定义智能体有顺序步骤,请考虑创建一个WorkFlowStep列表对象,定义智能体的离散、可保存的步骤。
- 可选,创建
- 添加初始智能体状态:修改你的智能体的异步运行函数以设置你的智能体的初始状态。
- 添加智能体状态检查点:修改你的智能体的异步运行函数,为智能体整体任务的每个已完成步骤生成和保存智能体状态。
- 添加智能体结束状态以跟踪智能体状态:修改你的智能体的异步运行函数,在成功完成智能体的全部任务时包含
end_of_agent=True状态。
以下示例展示了对 自定义智能体 (Custom Agents) 指南中所示的 StoryFlowAgent 类进行的必要代码修改:
class WorkflowStep(int, Enum):
INITIAL_STORY_GENERATION = 1
CRITIC_REVISER_LOOP = 2
POST_PROCESSING = 3
CONDITIONAL_REGENERATION = 4
# 扩展 BaseAgentState
### class StoryFlowAgentState(BaseAgentState):
### step = WorkflowStep
@override
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
"""
实现故事工作流的自定义编排逻辑。
使用由 Pydantic 分配的实例属性(例如,self.story_generator)。
"""
agent_state = self._load_agent_state(ctx, WorkflowStep)
if agent_state is None:
# 记录智能体的开始
agent_state = StoryFlowAgentState(step=WorkflowStep.INITIAL_STORY_GENERATION)
yield self._create_agent_state_event(ctx, agent_state)
next_step = agent_state.step
logger.info(f"[{self.name}] 开始故事生成工作流。")
# 步骤 1. 初始故事生成
if next_step <= WorkflowStep.INITIAL_STORY_GENERATION:
logger.info(f"[{self.name}] 运行 StoryGenerator...")
async for event in self.story_generator.run_async(ctx):
yield event
# 检查故事是否在继续之前生成
if "current_story" not in ctx.session.state or not ctx.session.state[
"current_story"
]:
return # 如果初始故事失败则停止处理
agent_state = StoryFlowAgentState(step=WorkflowStep.CRITIC_REVISER_LOOP)
yield self._create_agent_state_event(ctx, agent_state)
# 步骤 2. 批评 - 修订循环
if next_step <= WorkflowStep.CRITIC_REVISER_LOOP:
logger.info(f"[{self.name}] 运行 CriticReviserLoop...")
async for event in self.loop_agent.run_async(ctx):
logger.info(
f"[{self.name}] 来自 CriticReviserLoop 的事件:"
f"{event.model_dump_json(indent=2, exclude_none=True)}"
)
yield event
agent_state = StoryFlowAgentState(step=WorkflowStep.POST_PROCESSING)
yield self._create_agent_state_event(ctx, agent_state)
# 步骤 3. 顺序后处理(语法和语调检查)
if next_step <= WorkflowStep.POST_PROCESSING:
logger.info(f"[{self.name}] 运行 PostProcessing...")
async for event in self.sequential_agent.run_async(ctx):
logger.info(
f"[{self.name}] 来自 PostProcessing 的事件:"
f"{event.model_dump_json(indent=2, exclude_none=True)}"
)
yield event
agent_state = StoryFlowAgentState(step=WorkflowStep.CONDITIONAL_REGENERATION)
yield self._create_agent_state_event(ctx, agent_state)
# 步骤 4. 基于语调的条件逻辑
if next_step <= WorkflowStep.CONDITIONAL_REGENERATION:
tone_check_result = ctx.session.state.get("tone_check_result")
if tone_check_result == "negative":
logger.info(f"[{self.name}] 语调是负面的。重新生成故事...")
async for event in self.story_generator.run_async(ctx):
logger.info(
f"[{self.name}] 来自 StoryGenerator (重新生成) 的事件:"
f"{event.model_dump_json(indent=2, exclude_none=True)}"
)
yield event
else:
logger.info(f"[{self.name}] 语调不是负面的。保留当前故事。")
logger.info(f"[{self.name}] 工作流完成。")
yield self._create_agent_state_event(ctx, end_of_agent=True)