Skip to content

ADK 运行时

什么是运行时?

ADK 运行时 (ADK Runtime) 是在用户交互期间为你的智能体应用程序提供动力的底层引擎。它是一个系统,接收你定义的智能体、工具和回调,并协调它们对用户输入的执行,管理信息流、状态变化以及与外部服务(如 LLM 或存储)的交互。

可以将运行时视为你的智能体应用程序的"引擎"。你定义部件(智能体、工具),而运行时处理它们如何连接并一起运行以满足用户请求。

核心思想:事件循环

ADK 运行时的核心是一个事件循环。这个循环促进了Runner组件与你定义的"执行逻辑"(包括你的智能体、它们进行的 LLM 调用、回调和工具)之间的来回通信。

intro_components.png

简而言之:

  1. Runner接收用户查询并要求主Agent开始处理。
  2. Agent(及其相关逻辑)运行,直到它有东西要报告(如响应、使用工具的请求或状态变更)—然后它产出一个Event
  3. Runner接收这个Event,处理任何相关操作(如通过Services保存状态变更),并将事件向前传递(例如,到用户界面)。
  4. 只有在Runner处理完事件后,Agent的逻辑才从暂停处恢复,现在可能会看到 Runner 提交的变更效果。
  5. 这个循环重复进行,直到智能体对当前用户查询没有更多事件要产出。

这种事件驱动的循环是管理 ADK 如何执行你的智能体代码的基本模式。

心跳:事件循环 - 内部工作原理

事件循环是定义Runner和你的自定义代码(智能体、工具、回调,统称为"执行逻辑"或"逻辑组件")之间交互的核心操作模式。它建立了明确的责任划分:

Runner 的角色(协调者)

Runner作为单个用户调用的中央协调器。其在循环中的责任包括:

  1. 初始化: 接收终端用户的查询(new_message)并通常通过SessionService将其附加到会话历史记录。
  2. 启动: 通过调用主智能体的执行方法(例如,agent_to_run.run_async(...))开始事件生成过程。
  3. 接收和处理: 等待智能体逻辑yield一个Event。在收到事件后,Runner立即处理它。这包括:
    • 使用配置的ServicesSessionServiceArtifactServiceMemoryService)来提交event.actions中指示的更改(如state_deltaartifact_delta)。
    • 执行其他内部簿记工作。
  4. 向上传递: 将处理后的事件转发(例如,到调用应用程序或 UI 以进行渲染)。
  5. 迭代: 向智能体逻辑发出信号,表明产出事件的处理已完成,允许它恢复并生成下一个事件。

概念性 Runner 循环:

# Runner 主循环逻辑的简化视图
def run(new_query, ...) -> Generator[Event]:
    # 1. 将 new_query 追加到会话事件历史(通过 SessionService)
    session_service.append_event(session, Event(author='user', content=new_query))

    # 2. 通过调用智能体启动事件循环
    agent_event_generator = agent_to_run.run_async(context)

    async for event in agent_event_generator:
        # 3. 处理生成的事件并提交更改
        session_service.append_event(session, event) # 提交 state/artifact deltas 等
        # memory_service.update_memory(...) # 如果适用
        # artifact_service 可能已通过上下文在智能体运行期间被调用

        # 4. 产出事件供上游处理(例如,UI 渲染)
        yield event
        # Runner 在产出后隐式地向智能体生成器发出可以继续的信号

执行逻辑的角色(智能体、工具、回调)

你在智能体、工具和回调中的代码负责实际的计算和决策。它与循环的交互包括:

  1. 执行: 基于当前InvocationContext运行其逻辑,包括恢复执行时的会话状态。
  2. 产出: 当逻辑需要通信(发送消息、调用工具、报告状态变更)时,它构造一个包含相关内容和操作的Event,然后将这个事件yieldRunner
  3. 暂停: 关键的是,智能体逻辑在yield语句后立即暂停。它等待Runner完成步骤 3(处理和提交)。
  4. 恢复: 只有在Runner处理完产出的事件后,智能体逻辑才从紧接在yield之后的语句恢复执行。
  5. 查看更新的状态: 恢复后,智能体逻辑现在可以可靠地访问反映Runner之前产出的事件提交的更改的会话状态(ctx.session.state)。

概念性执行逻辑:

# Agent.run_async、回调或工具内部逻辑的简化视图

# ... 先前的代码基于当前状态运行 ...

# 1. 确定需要更改或输出,构造事件
# 示例:更新状态
update_data = {'field_1': 'value_2'}
event_with_state_change = Event(
    author=self.name,
    actions=EventActions(state_delta=update_data),
    content=types.Content(parts=[types.Part(text="状态已更新。")])
    # ... 其他事件字段 ...
)

# 2. 将事件产出给 Runner 进行处理和提交
yield event_with_state_change
# <<<<<<<<<<<< 执行在此暂停 >>>>>>>>>>>>

# <<<<<<<<<<<< RUNNER 处理并提交事件 >>>>>>>>>>>>

# 3. 仅在 Runner 处理完上述事件后恢复执行。
# 现在,Runner 提交的状态变更可靠地反映出来。
# 后续代码可以安全地假设产出事件中的更改已发生。
val = ctx.session.state['field_1']
# 这里`val`保证是"value_2"(假设 Runner 成功提交)
print(f"恢复执行。field_1 的值现在是:{val}")

# ... 后续代码继续 ...
# 可能稍后会产出另一个事件...

这种Runner和你的执行逻辑之间的协作产出/暂停/恢复循环,通过Event对象调解,构成了 ADK 运行时的核心。

运行时的关键组件

在 ADK 运行时内部,几个组件协同工作以执行智能体调用。了解它们的角色可以清楚地说明事件循环是如何运作的:

  1. Runner

    • 角色: 单个用户查询的主要入口点和协调器(run_async)。
    • 功能: 管理整体事件循环,接收执行逻辑产出的事件,与服务协调处理和提交事件操作(状态/制品(Artifacts)更改),并将处理过的事件向上游转发(例如,到 UI)。它基本上是逐个回合地根据产出的事件驱动对话。(定义在google.adk.runners.runner.py中)。
  2. 执行逻辑组件

    • 角色: 包含你的自定义代码和核心智能体功能的部分。
    • 组件:
    • AgentBaseAgentLlmAgent等):你的主要逻辑单元,处理信息并决定操作。它们实现_run_async_impl方法,产出事件。
    • ToolsBaseToolFunctionToolAgentTool等):智能体(通常是LlmAgent)用于与外部世界交互或执行特定任务的外部函数或功能。它们执行并返回结果,然后这些结果被包装在事件中。
    • Callbacks(函数):附加到智能体的用户定义函数(例如,before_agent_callbackafter_model_callback),挂钩到执行流程中的特定点,可能修改行为或状态,其效果被捕获在事件中。
    • 功能: 执行实际的思考、计算或外部交互。它们通过产出Event对象并暂停直到 Runner 处理它们来传达结果或需求。
  3. Event

    • 角色:Runner和执行逻辑之间来回传递的消息。
    • 功能: 表示一个原子性事件(用户输入、智能体文本、工具调用/结果、状态变更请求、控制信号)。它既携带事件内容,也携带预期的副作用(如state_deltaactions)。(定义在google.adk.events.event.py中)。
  4. Services

    • 角色: 负责管理持久性或共享资源的后端组件。主要由Runner在事件处理期间使用。
    • 组件:
    • SessionServiceBaseSessionServiceInMemorySessionService等):管理Session对象,包括保存/加载它们,将state_delta应用到会话状态,以及将事件追加到事件历史
    • ArtifactServiceBaseArtifactServiceInMemoryArtifactServiceGcsArtifactService等):管理二进制制品(Artifacts)数据的存储和检索。虽然save_artifact是通过执行逻辑期间的上下文调用的,但事件中的artifact_delta确认了 Runner/SessionService 的操作。
    • MemoryServiceBaseMemoryService等):(可选)管理用户跨会话的长期语义记忆。
    • 功能: 提供持久层。Runner与它们交互,确保由event.actions信号传递的更改在执行逻辑恢复之前可靠地存储。
  5. Session

    • 角色: 一个数据容器,保存用户和应用程序之间一个特定对话的状态和历史。
    • 功能: 存储当前state字典、所有过去events事件历史)的列表以及与相关制品(Artifacts)的引用。它是交互的主要记录,由SessionService管理。(定义在google.adk.sessions.session.py中)。
  6. Invocation

    • 角色: 一个概念性术语,表示从Runner接收单个用户查询的那一刻到智能体逻辑完成为该查询产出事件的过程中发生的一切。
    • 功能: 一次调用可能涉及多个智能体运行(如果使用智能体转移或AgentTool)、多个 LLM 调用、工具执行和回调执行,所有这些都通过InvocationContext中的单个invocation_id联系在一起。

这些参与者通过事件循环持续交互,处理用户的请求。

工作原理:简化的调用

让我们追踪一个典型的用户查询的简化流程,该查询涉及一个调用工具的 LLM 智能体:

intro_components.png

步骤分解

  1. 用户输入: 用户发送查询(例如,"法国的首都是什么?")。
  2. Runner 启动: Runner.run_async开始。它与SessionService交互以加载相关的Session,并将用户查询作为第一个Event添加到会话历史中。准备一个InvocationContextctx)。
  3. 智能体执行: Runner在指定的根智能体(如LlmAgent)上调用agent.run_async(ctx)
  4. LLM 调用(示例): Agent_Llm确定它需要信息,可能通过调用工具。它为LLM准备请求。假设 LLM 决定调用MyTool
  5. 产出 FunctionCall 事件: Agent_Llm从 LLM 接收FunctionCall响应,将其包装在Event(author='Agent_Llm', content=Content(parts=[Part(function_call=...)]))中,并yield此事件。
  6. 智能体暂停: Agent_Llm的执行在yield后立即暂停。
  7. Runner 处理: Runner接收 FunctionCall 事件。它将事件传递给SessionService记录在历史中。然后Runner向上游(User或应用程序)产出事件。
  8. 智能体恢复: Runner发出事件已处理的信号,Agent_Llm恢复执行。
  9. 工具执行: Agent_Llm的内部流程现在继续执行请求的MyTool。它调用tool.run_async(...)
  10. 工具返回结果: MyTool执行并返回其结果(例如,{'result': 'Paris'})。
  11. 产出 FunctionResponse 事件: 智能体(Agent_Llm)将工具结果包装成包含FunctionResponse部分的Event(例如,Event(author='Agent_Llm', content=Content(role='user', parts=[Part(function_response=...)])))。如果工具修改了状态(state_delta)或保存了制品(Artifacts)(artifact_delta),此事件也可能包含actions。智能体yield此事件。
  12. 智能体再次暂停: Agent_Llm再次暂停。
  13. Runner 处理: Runner接收 FunctionResponse 事件。它将事件传递给SessionService,后者应用任何state_delta/artifact_delta并将事件添加到历史中。Runner向上游产出事件。
  14. 智能体恢复: Agent_Llm恢复,现在知道工具结果和任何状态更改都已提交。
  15. 最终 LLM 调用(示例): Agent_Llm将工具结果发送回LLM以生成自然语言响应。
  16. 产出最终文本事件: Agent_LlmLLM接收最终文本,将其包装在Event(author='Agent_Llm', content=Content(parts=[Part(text=...)]))中,并yield它。
  17. 智能体暂停: Agent_Llm暂停。
  18. Runner 处理: Runner接收最终文本事件,将其传递给SessionService记录历史,并将其产出到上游User。这可能被标记为is_final_response()
  19. 智能体恢复并完成: Agent_Llm恢复。已完成此次调用的任务,其run_async生成器完成。
  20. Runner 完成: Runner看到智能体的生成器已耗尽,并完成此次调用的循环。

这种产出/暂停/处理/恢复循环确保状态更改被一致地应用,并且执行逻辑在产出事件后始终操作在最近提交的状态上。

重要的运行时行为

了解 ADK 运行时如何处理状态、流式传输和异步操作的几个关键方面对于构建可预测和高效的智能体至关重要。

状态更新和提交时机

  • 规则: 当你的代码(在智能体、工具或回调中)修改会话状态(例如,context.state['my_key'] = 'new_value')时,这种更改最初在当前的InvocationContext中本地记录。只有在携带相应state_deltaEvent被你的代码yield并随后被Runner处理后,此更改才保证被持久化(由SessionService保存)。

  • 含义: 在从yield恢复后运行的代码可以可靠地假设已产出事件中信号的状态更改已经被提交。

# 智能体逻辑内部(概念性)

# 1. 修改状态
ctx.session.state['status'] = 'processing'
event1 = Event(..., actions=EventActions(state_delta={'status': 'processing'}))

# 2. 产出带有增量的事件
yield event1
# --- 暂停 --- Runner 处理 event1,SessionService 提交'status' = 'processing' ---

# 3. 恢复执行
# 现在可以安全地依赖已提交的状态
current_status = ctx.session.state['status'] # 保证是'processing'
print(f"恢复后的状态:{current_status}")

会话状态的"脏读取"

  • 定义: 虽然提交发生在产出后,但在同一调用内稍后运行的代码,但在状态变更事件实际产出和处理之前,通常可以看到本地的、未提交的更改。这有时被称为"脏读取"。
  • 示例:
# before_agent_callback 中的代码
callback_context.state['field_1'] = 'value_1'
# 状态在本地设置为'value_1',但尚未由 Runner 提交

# ... 智能体运行 ...

# 稍后*在同一调用内*调用的工具中的代码
# 可读取(脏读取),但'value_1'尚未保证持久化。
val = tool_context.state['field_1'] # 这里的'val'可能是'value_1'
print(f"工具中的脏读取值:{val}")

# 假设携带 state_delta={'field_1': 'value_1'}的事件
# 在此工具运行并由 Runner 处理*之后*产出。
  • 含义:
  • 好处: 允许单个复杂步骤内(例如,在下一个 LLM 回合之前的多个回调或工具调用)的不同部分使用状态进行协调,而无需等待完整的产出/提交循环。
  • 警告: 严重依赖脏读取进行关键逻辑可能有风险。如果调用在携带state_delta的事件产出并由Runner处理之前失败,未提交的状态更改将丢失。对于关键状态转换,确保它们与成功处理的事件相关联。

流式 vs. 非流式输出(partial=True

这主要涉及 LLM 响应的处理方式,特别是当使用流式生成 API 时。

  • 流式: LLM 逐个标记或小块生成其响应。
  • 框架(通常在BaseLlmFlow内)为单个概念性响应产出多个Event对象。这些事件大多数将具有partial=True
  • Runner在接收到带有partial=True的事件时,通常会立即转发到上游(用于 UI 显示),但跳过处理其actions(如state_delta)。
  • 最终,框架为该响应产出一个最终事件,标记为非部分(partial=False或通过turn_complete=True隐式)。
  • Runner只完全处理这个最终事件,提交任何相关的state_deltaartifact_delta
  • 非流式: LLM 一次生成整个响应。框架产出单个标记为非部分的事件,Runner完全处理该事件。
  • 为什么重要: 确保状态更改基于来自 LLM 的完整响应原子地且只应用一次,同时仍允许 UI 在生成文本时逐步显示。

异步是主要的(run_async

  • 核心设计: ADK 运行时基本上建立在 Python 的asyncio库上,以有效处理并发操作(如等待 LLM 响应或工具执行)而不阻塞。
  • 主入口点: Runner.run_async是执行智能体调用的主要方法。所有核心可运行组件(智能体、特定流程)使用内部async def方法。
  • 同步便利(run): 主要为方便(例如,在简单脚本或测试环境中)存在同步Runner.run方法。然而,内部,Runner.run通常只是调用Runner.run_async并管理异步事件循环执行。
  • 开发者体验: 你应该通常设计你的应用程序逻辑(例如,使用 ADK 的 web 服务器)使用asyncio
  • 同步回调/工具: 框架旨在处理作为工具或回调提供的async def和常规def函数无缝。长时间运行的同步工具或回调,特别是那些执行阻塞 I/O 的,可能会潜在地阻塞主asyncio事件循环。框架可能会使用机制,如asyncio.to_thread,以减轻通过将此类阻塞同步代码运行到单独的线程池,防止它阻塞其他异步任务。然而,CPU 绑定同步代码将仍然阻塞它运行的线程。

理解这些行为有助于编写更强大的 ADK 应用程序并调试与状态一致性、流式更新和异步执行相关的 issues。