ADK 运行时¶
什么是运行时?¶
ADK 运行时 (ADK Runtime) 是在用户交互期间为你的智能体应用程序提供动力的底层引擎。它是一个系统,接收你定义的智能体、工具和回调,并协调它们对用户输入的执行,管理信息流、状态变化以及与外部服务(如 LLM 或存储)的交互。
可以将运行时视为你的智能体应用程序的"引擎"。你定义部件(智能体、工具),而运行时处理它们如何连接并一起运行以满足用户请求。
核心思想:事件循环¶
ADK 运行时的核心是一个事件循环。这个循环促进了Runner
组件与你定义的"执行逻辑"(包括你的智能体、它们进行的 LLM 调用、回调和工具)之间的来回通信。
简而言之:
Runner
接收用户查询并请求主Agent
开始处理。Agent
(及其相关逻辑)运行,直到有内容需要报告(如响应、工具调用或状态变更)——此时它会产出或发出一个Event
。Runner
接收该Event
,处理相关操作(如通过Services
保存状态变更),并将事件转发(如发送到用户界面)。- 只有在
Runner
处理完事件后,Agent
的逻辑才会从暂停处恢复,此时它可以看到 Runner 已提交的更改。 - 该循环重复,直到智能体对当前用户查询不再产出事件。
这种事件驱动的循环是管理 ADK 如何执行你的智能体代码的基本模式。
心跳:事件循环 - 内部工作原理¶
事件循环是定义Runner
和你的自定义代码(智能体、工具、回调,统称为"执行逻辑"或"逻辑组件")之间交互的核心操作模式。它建立了明确的责任划分:
Note
不同 SDK 语言的方法名和参数名可能略有不同(如 Java 中为 agent_to_run.runAsync(...)
,Python 中为 agent_to_run.run_async(...)
)。详情请参阅各语言的 API 文档。
Runner 的角色(协调者)¶
Runner
作为单个用户调用的中央协调器。其在循环中的责任包括:
- 启动: 接收终端用户的查询(
new_message
),通常通过SessionService
将其追加到会话历史。 - 启动事件流: 通过调用主智能体的执行方法(如
agent_to_run.run_async(...)
)开始事件生成过程。 - 接收与处理: 等待智能体逻辑
yield
或emit
一个Event
。收到事件后,Runner 立即处理它。这包括:- 使用配置的
Services
(SessionService
、ArtifactService
、MemoryService
)提交event.actions
指示的更改(如state_delta
、artifact_delta
)。 - 执行其他内部记录。
- 使用配置的
- 向上游产出: 将处理后的事件转发(如发送到应用或 UI 渲染)。
- 迭代: 通知智能体逻辑已完成当前事件的处理,允许其恢复并生成下一个事件。
概念性 Runner 循环:
# Runner 主循环逻辑简化视图
def run(new_query, ...) -> Generator[Event]:
# 1. 通过 SessionService 将 new_query 追加到会话事件历史
session_service.append_event(session, Event(author='user', content=new_query))
# 2. 调用智能体,启动事件循环
agent_event_generator = agent_to_run.run_async(context)
async for event in agent_event_generator:
# 3. 处理生成的事件并提交更改
session_service.append_event(session, event) # 提交 state/artifact delta 等
# memory_service.update_memory(...) # 如适用
# artifact_service 可能已在 agent run 期间通过 context 调用
# 4. 向上游产出事件(如 UI 渲染)
yield event
# Runner 隐式通知 agent generator 可以继续
// Java 中 Runner 主循环逻辑的简化概念视图。
public Flowable<Event> runConceptual(
Session session,
InvocationContext invocationContext,
Content newQuery
) {
// 1. 通过 SessionService 将 new_query 追加到会话事件历史
// ...
sessionService.appendEvent(session, userEvent).blockingGet();
// 2. 调用智能体,启动事件流
Flowable<Event> agentEventStream = agentToRun.runAsync(invocationContext);
// 3. 处理每个生成的事件,提交更改,并 "yield" 或 "emit"
return agentEventStream.map(event -> {
// 这会修改 session 对象(添加事件,应用 stateDelta)。
// appendEvent 的返回值(Single<Event>)概念上
// 就是处理后的事件本身。
sessionService.appendEvent(session, event).blockingGet(); // 简化的阻塞调用
// memory_service.update_memory(...) // 如适用 - 概念性
// artifact_service 可能已在 agent run 期间通过 context 调用
// 4. "Yield" 事件到上游处理
// 在 RxJava 中,map 返回事件即将其产出给下一个操作符或订阅者。
return event;
});
}
执行逻辑的角色(智能体、工具、回调)¶
你在智能体、工具和回调中的代码负责实际的计算和决策。它与循环的交互包括:
- 执行: 基于当前
InvocationContext
运行逻辑,包括恢复时的会话状态。 - 产出: 当逻辑需要通信(发送消息、调用工具、报告状态变更)时,构造包含相关内容和操作的
Event
,然后yield
该事件给Runner
。 - 暂停: 关键点在于,智能体逻辑在
yield
语句(或 RxJava 中的return
)后立即暂停。它等待 Runner 完成第 3 步(处理和提交)。 - 恢复: 仅当 Runner 处理完产出的事件后,智能体逻辑才从
yield
后的语句恢复。 - 看到已更新状态: 恢复后,智能体逻辑可以可靠地访问反映 Runner 已提交更改的会话状态(
ctx.session.state
)。
概念性执行逻辑:
# Agent.run_async、回调或工具内部逻辑简化视图
# ... 之前的代码基于当前状态运行 ...
# 1. 决定需要变更或输出,构造事件
# 示例:更新状态
update_data = {'field_1': 'value_2'}
event_with_state_change = Event(
author=self.name,
actions=EventActions(state_delta=update_data),
content=types.Content(parts=[types.Part(text="State updated.")])
# ... 其他事件字段 ...
)
# 2. 产出事件给 Runner 处理和提交
yield event_with_state_change
# <<<<<<<<<<<< 此处执行暂停 >>>>>>>>>>>>
# <<<<<<<<<<<< RUNNER 处理并提交事件 >>>>>>>>>>>>
# 3. 仅在 Runner 处理完上述事件后恢复执行。
# 现在,Runner 提交的状态已可靠反映。
# 后续代码可以安全假设产出事件的更改已生效。
val = ctx.session.state['field_1']
# 此时 val 保证为 "value_2"(假设 Runner 成功提交)
print(f"恢复执行。field_1 的值现在是: {val}")
# ... 后续代码继续 ...
# 可能稍后再产出另一个事件 ...
// Agent.runAsync、回调或工具内部逻辑简化视图
// ... 之前的代码基于当前状态运行 ...
// 1. 决定需要变更或输出,构造事件
// 示例:更新状态
ConcurrentMap<String, Object> updateData = new ConcurrentHashMap<>();
updateData.put("field_1", "value_2");
EventActions actions = EventActions.builder().stateDelta(updateData).build();
Content eventContent = Content.builder().parts(Part.fromText("State updated.")).build();
Event eventWithStateChange = Event.builder()
.author(self.name())
.actions(actions)
.content(Optional.of(eventContent))
// ... 其他事件字段 ...
.build();
// 2. "Yield" 事件。在 RxJava 中,这意味着将其发射到流中。
// Runner(或上游消费者)会订阅此 Flowable。
// Runner 收到该事件后会处理它(如调用 sessionService.appendEvent)。
// Java ADK 的 'appendEvent' 会修改 'ctx'(InvocationContext)中的 'Session' 对象。
// <<<<<<<<<<<< 概念性暂停点 >>>>>>>>>>>>
// 在 RxJava 中,'eventWithStateChange' 的发射发生后,流
// 可能继续用 'flatMap' 或 'concatMap' 操作符表示
// Runner 处理该事件后的后续逻辑。
// 要模拟 "仅在 Runner 处理完事件后恢复执行":
// Runner 的 `appendEvent` 通常本身是异步操作(返回 Single<Event>)。
// 智能体的流程需要结构化,使依赖已提交状态的后续逻辑在 `appendEvent` 完成后运行。
// Runner 通常这样编排:
// Runner:
// agent.runAsync(ctx)
// .concatMapEager(eventFromAgent ->
// sessionService.appendEvent(ctx.session(), eventFromAgent) // 这会更新 ctx.session().state()
// .toFlowable() // 处理后发射事件
// )
// .subscribe(processedEvent -> { /* UI 渲染 processedEvent */ });
// 所以,在智能体自身逻辑中,如果需要在产出事件被处理并且 ctx.session().state() 已更新后做事,
// 这部分逻辑通常在其响应式链的下一个步骤中。
// 本例中,我们发射事件,然后在 Flowable 链的后续操作中模拟 "恢复"。
return Flowable.just(eventWithStateChange) // 步骤 2: 产出事件
.concatMap(yieldedEvent -> {
// <<<<<<<<<<<< RUNNER 概念性处理并提交事件 >>>>>>>>>>>>
// 此时,在真实 Runner 中,ctx.session().appendEvent(yieldedEvent) 已被调用
// 并且 ctx.session().state() 已更新。
// 由于我们在智能体概念逻辑内部建模,
// 假设 Runner 的操作已隐式更新了 'ctx.session()'。
// 3. 恢复执行。
// 现在,Runner(通过 sessionService.appendEvent)提交的状态
// 已可靠反映在 ctx.session().state() 中。
Object val = ctx.session().state().get("field_1");
// 此时 val 保证为 "value_2",因为 Runner 调用的 sessionService.appendEvent
// 已更新 ctx 对象内的 session state。
System.out.println("恢复执行。field_1 的值现在是: " + val);
// ... 后续代码继续 ...
// 如果后续代码需要产出另一个事件,可在此处实现。
这种Runner
和你的执行逻辑之间的协作产出/暂停/恢复循环,通过Event
对象调解,构成了 ADK 运行时的核心。
运行时的关键组件¶
在 ADK 运行时内部,几个组件协同工作以执行智能体调用。了解它们的角色可以清楚地说明事件循环是如何运作的:
-
Runner
¶- 角色: 单次用户查询(
run_async
)的主入口和协调者。 - 功能: 管理整个事件循环,接收执行逻辑产出的事件,与 Services 协作处理并提交事件操作(状态/制品变更),并将处理后的事件转发到上游(如 UI)。它本质上根据产出的事件逐轮驱动对话。(定义在
google.adk.runners.runner
)。
- 角色: 单次用户查询(
-
执行逻辑组件¶
- 角色: 包含你的自定义代码和核心智能体功能的部分。
- 组件:
Agent
(BaseAgent
,LlmAgent
等):你的主要逻辑单元,处理信息并决定操作。它们实现_run_async_impl
方法,产出事件。Tools
(BaseTool
,FunctionTool
,AgentTool
等):智能体(通常是LlmAgent
)用于与外部世界交互或执行特定任务的外部函数或功能。它们执行并返回结果,然后这些结果被包装在事件中。Callbacks
(函数):附加到智能体的用户定义函数(例如,before_agent_callback
,after_model_callback
),挂钩到执行流程中的特定点,可能修改行为或状态,其效果被捕获在事件中。- 功能: 执行实际的思考、计算或外部交互。它们通过产出
Event
对象并暂停直到 Runner 处理它们来传达结果或需求。
-
Event
¶- 角色: 在
Runner
和执行逻辑之间传递的消息。 - 功能: 表示一个原子性事件(用户输入、智能体文本、工具调用/结果、状态变更请求、控制信号)。它携带事件内容和预期副作用(如
actions
里的state_delta
)。
- 角色: 在
-
Services
¶- 角色: 负责管理持久性或共享资源的后端组件。主要由
Runner
在事件处理期间使用。 - 组件:
SessionService
(BaseSessionService
,InMemorySessionService
等):管理Session
对象,包括保存/加载它们,将state_delta
应用到会话状态,以及将事件追加到事件历史
。ArtifactService
(BaseArtifactService
,InMemoryArtifactService
,GcsArtifactService
等):管理二进制制品(Artifacts)数据的存储和检索。虽然save_artifact
是通过执行逻辑期间的上下文调用的,但事件中的artifact_delta
确认了 Runner/SessionService 的操作。MemoryService
(BaseMemoryService
等):(可选)管理用户跨会话的长期语义记忆。- 功能: 提供持久层。
Runner
与它们交互,确保由event.actions
信号传递的更改在执行逻辑恢复之前可靠地存储。
- 角色: 负责管理持久性或共享资源的后端组件。主要由
-
Session
¶- 角色: 保存单次会话用户与应用交互状态和历史的数据容器。
- 功能: 存储当前
state
字典、所有历史events
(事件历史)和相关制品引用。它是交互的主记录,由SessionService
管理。
-
Invocation
¶- 角色: 概念性术语,表示从
Runner
接收单个用户查询到智能体逻辑为该查询产出事件的全过程。 - 功能: 一次调用可能涉及多个智能体运行(如智能体转移或
AgentTool
)、多个 LLM 调用、工具执行和回调执行,所有这些都通过InvocationContext
中的单个invocation_id
联系在一起。
- 角色: 概念性术语,表示从
这些参与者通过事件循环持续交互,处理用户的请求。
工作原理:简化的调用¶
让我们追踪一个典型的用户查询的简化流程,该查询涉及一个调用工具的 LLM 智能体:
步骤分解¶
- 用户输入: 用户发送查询(如"法国的首都是哪里?")。
- Runner 启动:
Runner.run_async
开始。它与SessionService
交互加载相关Session
,并将用户查询作为第一个Event
添加到会话历史。准备好InvocationContext
(ctx
)。 - 智能体执行:
Runner
调用根智能体(如LlmAgent
)的agent.run_async(ctx)
。 - LLM 调用(示例):
Agent_Llm
判断需要信息,可能通过调用工具。它准备 LLM 请求。假设 LLM 决定调用MyTool
。 - 产出 FunctionCall 事件:
Agent_Llm
收到 LLM 的FunctionCall
响应,将其包装为Event(author='Agent_Llm', content=Content(parts=[Part(function_call=...)]))
,并yield
或emit
该事件。 - 智能体暂停:
Agent_Llm
执行在yield
后立即暂停。 - Runner 处理:
Runner
接收 FunctionCall 事件,传递给SessionService
记录到历史。然后 Runner 将事件产出到上游(如用户或应用)。 - 智能体恢复: Runner 通知事件已处理,
Agent_Llm
恢复执行。 - 工具执行:
Agent_Llm
内部流程继续执行请求的MyTool
,调用tool.run_async(...)
。 - 工具返回结果:
MyTool
执行并返回结果(如{'result': 'Paris'}
)。 - 产出 FunctionResponse 事件: 智能体(
Agent_Llm
)将工具结果包装为包含FunctionResponse
部分的Event
(如Event(author='Agent_Llm', content=Content(role='user', parts=[Part(function_response=...)]))
)。如果工具修改了状态(state_delta
)或保存了制品(artifact_delta
),该事件也会包含这些操作。智能体yield
该事件。 - 智能体暂停:
Agent_Llm
再次暂停。 - Runner 处理: Runner 接收 FunctionResponse 事件,传递给
SessionService
应用state_delta
/artifact_delta
并添加到历史。Runner 产出事件到上游。 - 智能体恢复:
Agent_Llm
恢复,知道工具结果和状态变更已提交。 - 最终 LLM 调用(示例):
Agent_Llm
将工具结果返回给 LLM 生成自然语言响应。 - 产出最终文本事件:
Agent_Llm
收到 LLM 的最终文本,将其包装为Event(author='Agent_Llm', content=Content(parts=[Part(text=...)]))
并yield
。 - 智能体暂停:
Agent_Llm
暂停。 - Runner 处理: Runner 接收最终文本事件,传递给
SessionService
记录到历史,并产出到上游(如用户)。这通常被标记为is_final_response()
。 - 智能体恢复并结束:
Agent_Llm
恢复。完成本次调用后,其run_async
生成器结束。 - Runner 完成: Runner 发现智能体生成器已耗尽,结束本次调用的循环。
这种产出/暂停/处理/恢复循环确保状态更改被一致地应用,并且执行逻辑在产出事件后始终操作在最近提交的状态上。
重要的运行时行为¶
了解 ADK 运行时如何处理状态、流式传输和异步操作的几个关键方面对于构建可预测和高效的智能体至关重要。
状态更新和提交时机¶
-
规则: 当你的代码(在智能体、工具或回调中)修改会话状态(例如,
context.state['my_key'] = 'new_value'
)时,这种更改最初在当前的InvocationContext
中本地记录。只有在携带相应state_delta
的Event
被你的代码yield
并随后被Runner
处理后,此更改才保证被持久化(由SessionService
保存)。 -
含义: 在从
yield
恢复后运行的代码可以可靠地假设已产出事件中信号的状态更改已经被提交。
# 智能体逻辑内部(概念性)
# 1. 修改状态
ctx.session.state['status'] = 'processing'
event1 = Event(..., actions=EventActions(state_delta={'status': 'processing'}))
# 2. 产出带 delta 的事件
yield event1
# --- 暂停 --- Runner 处理 event1,SessionService 提交 'status' = 'processing' ---
# 3. 恢复执行
# 现在可以安全依赖已提交的状态
current_status = ctx.session.state['status'] # 保证为 'processing'
print(f"恢复后状态: {current_status}")
// 智能体逻辑内部(概念性)
// ... 之前的代码基于当前状态运行 ...
// 1. 准备状态变更并构造事件
ConcurrentHashMap<String, Object> stateChanges = new ConcurrentHashMap<>();
stateChanges.put("status", "processing");
EventActions actions = EventActions.builder().stateDelta(stateChanges).build();
Content content = Content.builder().parts(Part.fromText("Status update: processing")).build();
Event event1 = Event.builder()
.actions(actions)
// ...
.build();
// 2. 产出带 delta 的事件
return Flowable.just(event1)
.map(
emittedEvent -> {
// --- 概念性暂停 & RUNNER 处理 ---
// 3. 恢复执行(概念性)
// 现在可以安全依赖已提交的状态。
String currentStatus = (String) ctx.session().state().get("status");
System.out.println("恢复后状态(智能体逻辑内): " + currentStatus); // 保证为 'processing'
// 事件本身(event1)被传递。
// 如果智能体后续逻辑还要产出*另一个*事件,
// 可用 concatMap 发射新事件。
return emittedEvent;
});
// ... 智能体后续逻辑可能涉及更多响应式操作
// 或基于已更新的 ctx.session().state() 产出更多事件。
会话状态的"脏读取"¶
- 定义: 虽然提交发生在产出后,但在同一调用内稍后运行的代码,但在状态变更事件实际产出和处理之前,通常可以看到本地的、未提交的更改。这有时被称为"脏读取"。
- 示例:
# before_agent_callback 中的代码
callback_context.state['field_1'] = 'value_1'
# 状态本地设为 'value_1',但 Runner 尚未提交
# ... agent 运行 ...
# *同一次调用*中稍后工具里的代码
# 可读(脏读),但 'value_1' 尚未保证持久化。
val = tool_context.state['field_1'] # 这里 val 很可能是 'value_1'
print(f"工具中的脏读值: {val}")
# 假设携带 state_delta={'field_1': 'value_1'} 的事件
# 在该工具运行后才产出并被 Runner 处理。
// 修改状态 - BeforeAgentCallback 中的代码
// 并将此变更暂存到 callbackContext.eventActions().stateDelta()。
callbackContext.state().put("field_1", "value_1");
// --- agent 运行 ... ---
// --- *同一次调用*中稍后工具里的代码 ---
// 可读(脏读),但 'value_1' 尚未保证持久化。
Object val = toolContext.state().get("field_1"); // 这里 val 很可能是 'value_1'
System.out.println("工具中的脏读值: " + val);
// 假设携带 state_delta={'field_1': 'value_1'} 的事件
// 在该工具运行后才产出并被 Runner 处理。
- 含义:
- 好处: 允许单个复杂步骤内(例如,在下一个 LLM 轮次之前的多个回调或工具调用)的不同部分使用状态进行协调,而无需等待完整的产出/提交循环。
- 警告: 严重依赖脏读取进行关键逻辑可能有风险。如果调用在携带
state_delta
的事件产出并由Runner
处理之前失败,未提交的状态更改将丢失。对于关键状态转换,确保它们与成功处理的事件相关联。
流式 vs. 非流式输出(partial=True
)¶
这主要涉及 LLM 响应的处理方式,特别是当使用流式生成 API 时。
- 流式: LLM 逐个标记或小块生成其响应。
- 框架(通常在
BaseLlmFlow
内)为单个概念性响应产出多个Event
对象。这些事件大多数将具有partial=True
。 Runner
在接收到带有partial=True
的事件时,通常会立即转发到上游(用于 UI 显示),但跳过处理其actions
(如state_delta
)。- 最终,框架为该响应产出一个最终事件,标记为非部分(
partial=False
或通过turn_complete=True
隐式)。 Runner
只完全处理这个最终事件,提交任何相关的state_delta
或artifact_delta
。- 非流式: LLM 一次生成整个响应。框架产出单个标记为非部分的事件,
Runner
完全处理该事件。 - 为什么重要: 确保状态更改基于来自 LLM 的完整响应原子地且只应用一次,同时仍允许 UI 在生成文本时逐步显示。
异步是主要的(run_async
)¶
- 核心设计: ADK 运行时本质上基于异步库(如 Python 的
asyncio
和 Java 的RxJava
)构建,以高效处理并发操作(如等待 LLM 响应或工具执行),避免阻塞。 - 主入口:
Runner.run_async
是执行智能体调用的主要方法。所有核心可运行组件(智能体、特定 flow)内部都使用asynchronous
方法。 - 同步便捷方法(
run
): 同步的Runner.run
方法主要用于便捷(如简单脚本或测试环境)。但其内部通常只是调用Runner.run_async
并为你管理异步事件循环。 - 开发体验: 推荐你将应用(如基于 ADK 的 web 服务器)设计为异步,以获得最佳性能。Python 推荐用
asyncio
,Java 推荐用 RxJava 的响应式编程模型。 - 同步回调/工具: ADK 框架支持工具和回调的异步与同步函数。
- 阻塞 I/O: 对于长时间运行的同步 I/O 操作,框架会尝试防止阻塞。Python ADK 可能用 asyncio.to_thread,Java ADK 通常依赖合适的 RxJava 调度器或阻塞调用包装。
- CPU 密集型任务: 纯 CPU 密集型同步任务在两种环境下都会阻塞其执行线程。
理解这些行为有助于编写更强大的 ADK 应用程序并调试与状态一致性、流式更新和异步执行相关的问题。