Skip to content

ADK 运行时

Supported in ADKPython v0.1.0Typescript v0.2.0Go v0.1.0Java v0.1.0

ADK 运行时 (ADK Runtime) 是在用户交互期间为你的智能体应用程序提供动力的底层引擎。它是一个系统,接收你定义的智能体、工具和回调,并协调它们对用户输入的执行,管理信息流、状态变化以及与外部服务(如 LLM 或存储)的交互。

可以将运行时视为你的智能体应用程序的"引擎"。你定义部件(智能体、工具),而运行时处理它们如何连接并一起运行以满足用户请求。

核心思想:事件循环

ADK 运行时的核心是一个事件循环。这个循环促进了Runner组件与你定义的"执行逻辑"(包括你的智能体、它们进行的 LLM 调用、回调和工具)之间的来回通信。

intro_components.png

简而言之:

  1. Runner 接收用户查询并请求主 Agent 开始处理。
  2. Agent(及其相关逻辑)运行,直到有内容需要报告(如响应、工具调用或状态变更)——此时它会产出发出一个 Event
  3. Runner 接收该 Event,处理相关操作(如通过 Services 保存状态变更),并将事件转发(如发送到用户界面)。
  4. 只有在 Runner 处理完事件后,Agent 的逻辑才会从暂停处恢复,此时它可以看到 Runner 已提交的更改。
  5. 该循环重复,直到智能体对当前用户查询不再产出事件。

这种事件驱动的循环是管理 ADK 如何执行你的智能体代码的基本模式。

心跳:事件循环 - 内部工作原理

事件循环是定义Runner和你的自定义代码(智能体、工具、回调,统称为"执行逻辑"或"逻辑组件")之间交互的核心操作模式。它建立了明确的责任划分:

Note

特定的方法名称和参数名称可能因 SDK 语言而略有不同(例如,Python 中的 agent.run_async(...),Go 中的 agent.Run(...),Java 和 TypeScript 中的 agent.runAsync(...))。详情请参阅特定语言的 API 文档。

Runner 的角色(协调者)

Runner作为单个用户调用的中央协调器。其在循环中的责任包括:

  1. 启动: 接收终端用户的查询(new_message),通常通过 SessionService 将其追加到会话历史。
  2. 启动事件流: 通过调用主智能体的执行方法(如 agent_to_run.run_async(...))开始事件生成过程。
  3. 接收与处理: 等待智能体逻辑 yieldemit 一个 Event。收到事件后,Runner 立即处理它。这包括:
    • 使用配置的 ServicesSessionServiceArtifactServiceMemoryService)提交 event.actions 指示的更改(如 state_deltaartifact_delta)。
    • 执行其他内部记录。
  4. 向上游产出: 将处理后的事件转发(如发送到应用或 UI 渲染)。
  5. 迭代: 通知智能体逻辑已完成当前事件的处理,允许其恢复并生成下一个事件。

概念性 Runner 循环:

# Runner 主循环逻辑简化视图
def run(new_query, ...) -> Generator[Event]:
    # 1. 通过 SessionService 将 new_query 追加到会话事件历史
    session_service.append_event(session, Event(author='user', content=new_query))

    # 2. Kick off event loop by calling the agent
    agent_event_generator = agent_to_run.run_async(context)

    async for event in agent_event_generator:
        # 3. Process the generated event and commit changes
        session_service.append_event(session, event) # Commits state/artifact deltas etc.
        # memory_service.update_memory(...) # If applicable
        # artifact_service might have already been called via context during agent run

        # 4. Yield event for upstream processing (e.g., UI rendering)
        yield event
        # Runner 隐式通知 agent generator 可以继续
// Simplified view of Runner's main loop logic
async * runAsync(newQuery: Content, ...): AsyncGenerator<Event, void, void> {
    // 1. Append newQuery to session event history (via SessionService)
    await sessionService.appendEvent({
        session,
        event: createEvent({author: 'user', content: newQuery})
    });

    // 2. Kick off event loop by calling the agent
    const agentEventGenerator = agentToRun.runAsync(context);

    for await (const event of agentEventGenerator) {
        // 3. Process the generated event and commit changes
        // Commits state/artifact deltas etc.
        await sessionService.appendEvent({session, event});
        // memoryService.updateMemory(...) // If applicable
        // artifactService might have already been called via context during agent run

        // 4. Yield event for upstream processing (e.g., UI rendering)
        yield event;
        // Runner implicitly signals agent generator can continue after yielding
    }
}
// Go 中 Runner 主循环逻辑的简化概念视图
func (r *Runner) RunConceptual(ctx context.Context, session *session.Session, newQuery *genai.Content) iter.Seq2[*Event, error] {
    return func(yield func(*Event, error) bool) {
        // 1. 将 new_query 追加到会话事件历史(通过 SessionService)
        // ...
        userEvent := session.NewEvent(ctx.InvocationID()) // 简化用于概念视图
        userEvent.Author = "user"
        userEvent.LLMResponse = model.LLMResponse{Content: newQuery}

        if _, err := r.sessionService.Append(ctx, &session.AppendRequest{Event: userEvent}); err != nil {
            yield(nil, err)
            return
        }

        // 2. 通过调用智能体启动事件流
        // 假设 agent.Run 也返回 iter.Seq2[*Event, error]
        agentEventsAndErrs := r.agent.Run(ctx, &agent.RunRequest{Session: session, Input: newQuery})

        for event, err := range agentEventsAndErrs {
            if err != nil {
                if !yield(event, err) { // 即使有错误也产出事件,然后停止
                    return
                }
                return // 智能体以错误结束
            }

            // 3. 处理生成的事件并提交更改
            // 仅将非部分事件提交到会话服务(如实际代码所示)
            if !event.LLMResponse.Partial {
                if _, err := r.sessionService.Append(ctx, &session.AppendRequest{Event: event}); err != nil {
                    yield(nil, err)
                    return
                }
            }
            // memory_service.update_memory(...) // 如果适用
            // artifact_service 可能已在智能体运行期间通过上下文调用

            // 4. 产出事件以供上游处理
            if !yield(event, nil) {
                return // 上游消费者停止
            }
        }
        // 智能体成功完成
    }
}
// Simplified conceptual view of the Runner's main loop logic in Java.
public Flowable<Event> runConceptual(
    Session session,
    InvocationContext invocationContext,
    Content newQuery
    ) {

    // 1. Append new_query to session event history (via SessionService)
    // ...
    sessionService.appendEvent(session, userEvent).blockingGet();

    // 2. Kick off event stream by calling the agent
    Flowable<Event> agentEventStream = agentToRun.runAsync(invocationContext);

    // 3. Process each generated event, commit changes, and "yield" or "emit"
    return agentEventStream.map(event -> {
        // This mutates the session object (adds event, applies stateDelta).
        // The return value of appendEvent (a Single<Event>) is conceptually
        // just the event itself after processing.
        sessionService.appendEvent(session, event).blockingGet(); // Simplified blocking call

        // memory_service.update_memory(...) // If applicable - conceptual
        // artifact_service might have already been called via context during agent run

        // 4. "Yield" event for upstream processing
        //    In RxJava, returning the event in map effectively yields it to the next operator or subscriber.
        return event;
    });
}

执行逻辑的角色(智能体、工具、回调)

你在智能体、工具和回调中的代码负责实际的计算和决策。它与循环的交互包括:

  1. 执行: 基于当前 InvocationContext 运行逻辑,包括恢复时的会话状态。
  2. 产出: 当逻辑需要通信(发送消息、调用工具、报告状态变更)时,构造包含相关内容和操作的 Event,然后 yield 该事件给 Runner
  3. 暂停: 关键点在于,智能体逻辑在 yield 语句(或 RxJava 中的 return)后立即暂停。它等待 Runner 完成第 3 步(处理和提交)。
  4. 恢复: 仅当 Runner 处理完产出的事件后,智能体逻辑才从 yield 后的语句恢复。
  5. 看到已更新状态: 恢复后,智能体逻辑可以可靠地访问反映 Runner 已提交更改的会话状态(ctx.session.state)。

概念性执行逻辑:

# Simplified view of logic inside Agent.run_async, callbacks, or tools

# ... previous code runs based on current state ...

# 1. Determine a change or output is needed, construct the event
# Example: Updating state
update_data = {'field_1': 'value_2'}
event_with_state_change = Event(
    author=self.name,
    actions=EventActions(state_delta=update_data),
    content=types.Content(parts=[types.Part(text="State updated.")])
    # ... 其他事件字段 ...
)

# 2. Yield the event to the Runner for processing & commit
yield event_with_state_change
# <<<<<<<<<<<< EXECUTION PAUSES HERE >>>>>>>>>>>>

# <<<<<<<<<<<< RUNNER PROCESSES & COMMITS THE EVENT >>>>>>>>>>>>

# 3. Resume execution ONLY after Runner is done processing the above event.
# Now, the state committed by the Runner is reliably reflected.
# Subsequent code can safely assume the change from the yielded event happened.
val = ctx.session.state['field_1']
# here `val` is guaranteed to be "value_2" (assuming Runner committed successfully)
print(f"Resumed execution. Value of field_1 is now: {val}")

# ... subsequent code continues ...
# Maybe yield another event later...
// Simplified view of logic inside Agent.runAsync, callbacks, or tools

// ... previous code runs based on current state ...

// 1. Determine a change or output is needed, construct the event
// Example: Updating state
const updateData = {'field_1': 'value_2'};
const eventWithStateChange = createEvent({
    author: this.name,
    actions: createEventActions({stateDelta: updateData}),
    content: {parts: [{text: "State updated."}]}
    // ... other event fields ...
});

// 2. Yield the event to the Runner for processing & commit
yield eventWithStateChange;
// <<<<<<<<<<<< EXECUTION PAUSES HERE >>>>>>>>>>>>

// <<<<<<<<<<<< RUNNER PROCESSES & COMMITS THE EVENT >>>>>>>>>>>>

// 3. Resume execution ONLY after Runner is done processing the above event.
// Now, the state committed by the Runner is reliably reflected.
// Subsequent code can safely assume the change from the yielded event happened.
const val = ctx.session.state['field_1'];
// here `val` is guaranteed to be "value_2" (assuming Runner committed successfully)
console.log(`Resumed execution. Value of field_1 is now: ${val}`);

// ... subsequent code continues ...
// Maybe yield another event later...
// Simplified view of logic inside Agent.Run, callbacks, or tools

// ... previous code runs based on current state ...

// 1. Determine a change or output is needed, construct the event
// Example: Updating state
updateData := map[string]interface{}{"field_1": "value_2"}
eventWithStateChange := &Event{
    Author: self.Name(),
    Actions: &EventActions{StateDelta: updateData},
    Content: genai.NewContentFromText("State updated.", "model"),
    // ... other event fields ...
}

// 2. Yield the event to the Runner for processing & commit
// In Go, this is done by sending the event to a channel.
eventsChan <- eventWithStateChange
// <<<<<<<<<<<< EXECUTION PAUSES HERE (conceptually) >>>>>>>>>>>>
// The Runner on the other side of the channel will receive and process the event.
// The agent's goroutine might continue, but the logical flow waits for the next input or step.

// <<<<<<<<<<<< RUNNER PROCESSES & COMMITS THE EVENT >>>>>>>>>>>>

// 3. Resume execution ONLY after Runner is done processing the above event.
// In a real Go implementation, this would likely be handled by the agent receiving
// a new RunRequest or context indicating the next step. The updated state
// would be part of the session object in that new request.
// For this conceptual example, we'll just check the state.
val := ctx.State.Get("field_1")
// here `val` is guaranteed to be "value_2" because the Runner would have
// updated the session state before calling the agent again.
fmt.Printf("Resumed execution. Value of field_1 is now: %v\n", val)

// ... subsequent code continues ...
// Maybe send another event to the channel later...
// Simplified view of logic inside Agent.runAsync, callbacks, or tools
// ... previous code runs based on current state ...

// 1. Determine a change or output is needed, construct the event
// Example: Updating state
ConcurrentMap<String, Object> updateData = new ConcurrentHashMap<>();
updateData.put("field_1", "value_2");

EventActions actions = EventActions.builder().stateDelta(updateData).build();
Content eventContent = Content.builder().parts(Part.fromText("State updated.")).build();

Event eventWithStateChange = Event.builder()
    .author(self.name())
    .actions(actions)
    .content(Optional.of(eventContent))
    // ... 其他事件字段 ...
    .build();

// 2. "Yield" the event. In RxJava, this means emitting it into the stream.
//    The Runner (or upstream consumer) will subscribe to this Flowable.
//    When the Runner receives this event, it will process it (e.g., call sessionService.appendEvent).
//    The 'appendEvent' in Java ADK mutates the 'Session' object held within 'ctx' (InvocationContext).

// <<<<<<<<<<<< CONCEPTUAL PAUSE POINT >>>>>>>>>>>>
// In RxJava, the emission of 'eventWithStateChange' happens, and then the stream
// might continue with a 'flatMap' or 'concatMap' operator that represents
// the logic *after* the Runner has processed this event.

// To model the "resume execution ONLY after Runner is done processing":
// The Runner's `appendEvent` is usually an async operation itself (returns Single<Event>).
// The agent's flow needs to be structured such that subsequent logic
// that depends on the committed state runs *after* that `appendEvent` completes.

// This is how the Runner typically orchestrates it:
// Runner:
//   agent.runAsync(ctx)
//     .concatMapEager(eventFromAgent ->
//         sessionService.appendEvent(ctx.session(), eventFromAgent) // 这会更新 ctx.session().state()
//             .toFlowable() // 处理后发射事件
//     )
//     .subscribe(processedEvent -> { /* UI renders processedEvent */ });

// So, within the agent's own logic, if it needs to do something *after* an event it yielded
// has been processed and its state changes are reflected in ctx.session().state(),
// that subsequent logic would typically be in another step of its reactive chain.

// For this conceptual example, we'll emit the event, and then simulate the "resume"
// as a subsequent operation in the Flowable chain.

return Flowable.just(eventWithStateChange) // Step 2: Yield the event
    .concatMap(yieldedEvent -> {
        // <<<<<<<<<<<< RUNNER CONCEPTUALLY PROCESSES & COMMITS THE EVENT >>>>>>>>>>>>
        // At this point, in a real runner, ctx.session().appendEvent(yieldedEvent) would have been called
        // by the Runner, and ctx.session().state() would be updated.
        // Since we are *inside* the agent's conceptual logic trying to model this,
        // we assume the Runner's action has implicitly updated our 'ctx.session()'.

        // 3. Resume execution.
        // Now, the state committed by the Runner (via sessionService.appendEvent)
        // is reliably reflected in ctx.session().state().
        Object val = ctx.session().state().get("field_1");
        // here `val` is guaranteed to be "value_2" because the `sessionService.appendEvent`
        // called by the Runner would have updated the session state within the `ctx` object.

        System.out.println("Resumed execution. Value of field_1 is now: " + val);

        // ... subsequent code continues ...
        // If this subsequent code needs to yield another event, it would do so here.

这种Runner和你的执行逻辑之间的协作产出/暂停/恢复循环,通过Event对象调解,构成了 ADK 运行时的核心。

运行时的关键组件

在 ADK 运行时内部,几个组件协同工作以执行智能体调用。了解它们的角色可以清楚地说明事件循环是如何运作的:

  1. Runner

    • 角色: 单次用户查询(run_async)的主入口和协调者。
    • 功能: 管理整个事件循环,接收执行逻辑产出的事件,与 Services 协作处理并提交事件操作(状态/制品变更),并将处理后的事件转发到上游(如 UI)。它本质上根据产出的事件逐轮驱动对话。(定义在 google.adk.runners.runner)。
  2. 执行逻辑组件

    • 角色: 包含你的自定义代码和核心智能体功能的部分。
    • 组件:
    • AgentBaseAgentLlmAgent等):你的主要逻辑单元,处理信息并决定操作。它们实现_run_async_impl方法,产出事件。
    • ToolsBaseToolFunctionToolAgentTool等):智能体(通常是LlmAgent)用于与外部世界交互或执行特定任务的外部函数或功能。它们执行并返回结果,然后这些结果被包装在事件中。
    • Callbacks(函数):附加到智能体的用户定义函数(例如,before_agent_callbackafter_model_callback),挂钩到执行流程中的特定点,可能修改行为或状态,其效果被捕获在事件中。
    • 功能: 执行实际的思考、计算或外部交互。它们通过产出Event对象并暂停直到 Runner 处理它们来传达结果或需求。
  3. Event

    • 角色:Runner 和执行逻辑之间传递的消息。
    • 功能: 表示一个原子性事件(用户输入、智能体文本、工具调用/结果、状态变更请求、控制信号)。它携带事件内容和预期副作用(如 actions 里的 state_delta)。
  4. Services

    • 角色: 负责管理持久性或共享资源的后端组件。主要由Runner在事件处理期间使用。
    • 组件:
    • SessionServiceBaseSessionServiceInMemorySessionService等):管理Session对象,包括保存/加载它们,将state_delta应用到会话状态,以及将事件追加到事件历史
    • ArtifactServiceBaseArtifactServiceInMemoryArtifactServiceGcsArtifactService等):管理二进制制品(Artifacts)数据的存储和检索。虽然save_artifact是通过执行逻辑期间的上下文调用的,但事件中的artifact_delta确认了 Runner/SessionService 的操作。
    • MemoryServiceBaseMemoryService等):(可选)管理用户跨会话的长期语义记忆。
    • 功能: 提供持久层。Runner与它们交互,确保由event.actions信号传递的更改在执行逻辑恢复之前可靠地存储。
  5. Session

    • 角色: 保存单次会话用户与应用交互状态和历史的数据容器。
    • 功能: 存储当前 state 字典、所有历史 events(事件历史)和相关制品引用。它是交互的主记录,由 SessionService 管理。
  6. Invocation

    • 角色: 概念性术语,表示从Runner接收单个用户查询到智能体逻辑为该查询产出事件的全过程。
    • 功能: 一次调用可能涉及多个智能体运行(如智能体转移或AgentTool)、多个 LLM 调用、工具执行和回调执行,所有这些都通过InvocationContext中的单个invocation_id联系在一起。

这些参与者通过事件循环持续交互,处理用户的请求。

工作原理:简化的调用

让我们追踪一个典型的用户查询的简化流程,该查询涉及一个调用工具的 LLM 智能体:

intro_components.png

步骤分解

  1. 用户输入: 用户发送查询(如"法国的首都是哪里?")。
  2. Runner 启动: Runner.run_async 开始。它与 SessionService 交互加载相关 Session,并将用户查询作为第一个 Event 添加到会话历史。准备好 InvocationContextctx)。
  3. 智能体执行: Runner 调用根智能体(如 LlmAgent)的 agent.run_async(ctx)
  4. LLM 调用(示例): Agent_Llm 判断需要信息,可能通过调用工具。它准备 LLM 请求。假设 LLM 决定调用 MyTool
  5. 产出 FunctionCall 事件: Agent_Llm 收到 LLM 的 FunctionCall 响应,将其包装为 Event(author='Agent_Llm', content=Content(parts=[Part(function_call=...)])),并 yieldemit 该事件。
  6. 智能体暂停: Agent_Llm 执行在 yield 后立即暂停。
  7. Runner 处理: Runner 接收 FunctionCall 事件,传递给 SessionService 记录到历史。然后 Runner 将事件产出到上游(如用户或应用)。
  8. 智能体恢复: Runner 通知事件已处理,Agent_Llm 恢复执行。
  9. 工具执行: Agent_Llm 内部流程继续执行请求的 MyTool,调用 tool.run_async(...)
  10. 工具返回结果: MyTool 执行并返回结果(如 {'result': 'Paris'})。
  11. 产出 FunctionResponse 事件: 智能体(Agent_Llm)将工具结果包装为包含 FunctionResponse 部分的 Event(如 Event(author='Agent_Llm', content=Content(role='user', parts=[Part(function_response=...)])))。如果工具修改了状态(state_delta)或保存了制品(artifact_delta),该事件也会包含这些操作。智能体 yield 该事件。
  12. 智能体暂停: Agent_Llm 再次暂停。
  13. Runner 处理: Runner 接收 FunctionResponse 事件,传递给 SessionService 应用 state_delta/artifact_delta 并添加到历史。Runner 产出事件到上游。
  14. 智能体恢复: Agent_Llm 恢复,知道工具结果和状态变更已提交。
  15. 最终 LLM 调用(示例): Agent_Llm 将工具结果返回给 LLM 生成自然语言响应。
  16. 产出最终文本事件: Agent_Llm 收到 LLM 的最终文本,将其包装为 Event(author='Agent_Llm', content=Content(parts=[Part(text=...)]))yield
  17. 智能体暂停: Agent_Llm 暂停。
  18. Runner 处理: Runner 接收最终文本事件,传递给 SessionService 记录到历史,并产出到上游(如用户)。这通常被标记为 is_final_response()
  19. 智能体恢复并结束: Agent_Llm 恢复。完成本次调用后,其 run_async 生成器结束。
  20. Runner 完成: Runner 发现智能体生成器已耗尽,结束本次调用的循环。

这种产出/暂停/处理/恢复循环确保状态更改被一致地应用,并且执行逻辑在产出事件后始终操作在最近提交的状态上。

重要的运行时行为

了解 ADK 运行时如何处理状态、流式传输和异步操作的几个关键方面对于构建可预测和高效的智能体至关重要。

状态更新和提交时机

  • 规则: 当你的代码(在智能体、工具或回调中)修改会话状态(例如,context.state['my_key'] = 'new_value')时,这种更改最初在当前的InvocationContext中本地记录。只有在携带相应state_deltaEvent被你的代码yield并随后被Runner处理后,此更改才保证被持久化(由SessionService保存)。

  • 含义: 在从yield恢复后运行的代码可以可靠地假设已产出事件中信号的状态更改已经被提交。

# Inside agent logic (conceptual)

# 1. Modify state
ctx.session.state['status'] = 'processing'
event1 = Event(..., actions=EventActions(state_delta={'status': 'processing'}))

# 2. Yield event with the delta
yield event1
# --- PAUSE --- Runner processes event1, SessionService commits 'status' = 'processing' ---

# 3. Resume execution
# Now it's safe to rely on the committed state
current_status = ctx.session.state['status'] # Guaranteed to be 'processing'
print(f"Status after resuming: {current_status}")
// Inside agent logic (conceptual)

// 1. Modify state
// In TypeScript, you modify state via the context, which tracks the change.
ctx.state.set('status', 'processing');
// The framework will automatically populate actions with the state
// delta from the context. For illustration, it's shown here.
const event1 = createEvent({
    actions: createEventActions({stateDelta: {'status': 'processing'}}),
    // ... other event fields
});

// 2. 产出带有增量的事件
yield event1;
// --- 暂停 --- Runner 处理 event1,SessionService 提交 'status' = 'processing' ---

// 3. 恢复执行
// 现在可以安全地依赖会话对象中的已提交状态。
const currentStatus = ctx.session.state['status']; // Guaranteed to be 'processing'
console.log(`Status after resuming: ${currentStatus}`);

// 智能体逻辑内部(概念性)

func (a Agent) RunConceptual(ctx agent.InvocationContext) iter.Seq2[session.Event, error] { // 整个逻辑被包装在一个将作为迭代器返回的函数中。 return func(yield func(*session.Event, error) bool) { // ... 之前的代码根据输入 ctx 中的当前状态运行 ... // 例如,val := ctx.State().Get("field_1") 在这里可能返回 "value_1"。

  // 1. 确定需要更改或输出,构造事件
  // 示例:更新状态
  updateData := map[string]interface{}{"field_1": "value_2"}
  eventWithStateChange := session.NewEvent(ctx.InvocationID())
  eventWithStateChange.Author = a.Name()
  eventWithStateChange.Actions = &session.EventActions{StateDelta: updateData}
  // ... 其他事件字段 ...

  // 2. 产出事件给 Runner 进行处理和提交。
  // 智能体的执行在此调用后立即继续。
  if !yield(eventWithStateChange, nil) {
      // 如果 yield 返回 false,则表示消费者(Runner)
      // 已停止监听,因此我们应该停止产出事件。
      return
  }

  // <<<<<<<<<<<< RUNNER 处理并提交事件 >>>>>>>>>>>>
  // 这发生在智能体外部,在智能体的迭代器
  // 产出事件之后。

  // 3. 智能体无法立即看到它刚刚产出的状态更改。
  // 在单个 `Run` 调用中,状态是不可变的。
  // 这里的 `val` 仍然是 "value_1"(或开始时的任何值)。
  // 更新后的状态("value_2")将仅在 `ctx` 中可用
  // 在后续轮次的*下一个* `Run` 调用中。
  val := ctx.State().Get("field_1")
  // 这里的 `val` 仍然是 "value_1"(或开始时的任何值)。
  // 更新后的状态("value_2")将仅在 `ctx` 中可用
  // 在后续轮次的*下一个* `Run` 调用中。
  fmt.Printf("Resumed execution. Value of field_1 is now: %v\n", val)

  // ... 后续代码继续,可能会产出更多事件 ...
  finalEvent := session.NewEvent(ctx.InvocationID())
  finalEvent.Author = a.Name()
  // ...
  yield(finalEvent, nil)

} }

// Inside agent logic (conceptual)
// ... previous code runs based on current state ...

// 1. Prepare state modification and construct the event
ConcurrentHashMap<String, Object> stateChanges = new ConcurrentHashMap<>();
stateChanges.put("status", "processing");

EventActions actions = EventActions.builder().stateDelta(stateChanges).build();
Content content = Content.builder().parts(Part.fromText("Status update: processing")).build();

Event event1 = Event.builder()
    .actions(actions)
    // ...
    .build();

// 2. 产出带有增量的事件
return Flowable.just(event1)
    .map(
        emittedEvent -> {
            // --- 概念性暂停和 RUNNER 处理 ---
            // 3. 恢复执行(概念上)
            // 现在可以安全地依赖已提交的状态。
            String currentStatus = (String) ctx.session().state().get("status");
            System.out.println("Status after resuming (inside agent logic): " + currentStatus); // Guaranteed to be 'processing'

            // The event itself (event1) is passed on.
            // If subsequent logic within this agent step produced *another* event,
            // you'd use concatMap to emit that new event.
            return emittedEvent;
        });

// ... subsequent agent logic might involve further reactive operators
// or emitting more events based on the now-updated `ctx.session().state()`.

"Dirty Reads" of Session State

会话状态的"脏读取"

  • 定义: 虽然提交发生在产出后,但在同一调用内稍后运行的代码,但在状态变更事件实际产出和处理之前,通常可以看到本地的、未提交的更改。这有时被称为"脏读取"。
  • 示例:
# before_agent_callback 中的代码
callback_context.state['field_1'] = 'value_1'
# State is locally set to 'value_1', but not yet committed by Runner

# ... agent runs ...

# Code in a tool called later *within the same invocation*
# Readable (dirty read), but 'value_1' isn't guaranteed persistent yet.
val = tool_context.state['field_1'] # 'val' will likely be 'value_1' here
print(f"Dirty read value in tool: {val}")

# Assume the event carrying the state_delta={'field_1': 'value_1'}
# is yielded *after* this tool runs and is processed by the Runner.
// Code in beforeAgentCallback
callbackContext.state.set('field_1', 'value_1');
// State is locally set to 'value_1', but not yet committed by Runner

// --- agent runs ... ---

// --- Code in a tool called later *within the same invocation* ---
// Readable (dirty read), but 'value_1' isn't guaranteed persistent yet.
const val = toolContext.state.get('field_1'); // 'val' will likely be 'value_1' here
console.log(`Dirty read value in tool: ${val}`);

// Assume the event carrying the state_delta={'field_1': 'value_1'}
// is yielded *after* this tool runs and is processed by the Runner.

// before_agent_callback 中的代码 // 回调将直接修改上下文的会话状态。 // 此更改是当前调用上下文的本地更改。 ctx.State.Set("field_1", "value_1") // 状态在本地设置为 'value_1',但尚未由 Runner 提交

// ... 智能体运行 ...

// 在同一次调用中稍后调用的工具中的代码 // 可读(脏读),但 'value_1' 尚未保证持久化。 val := ctx.State().Get("field_1") // 这里的 'val' 很可能是 'value_1' fmt.Printf("工具中的脏读值: %v\n", val)

// 假设携带 state_delta={'field_1': 'value_1'} 的事件 // 在此工具运行后才产出并被 Runner 处理。

// Modify state - Code in BeforeAgentCallback
// AND stages this change in callbackContext.eventActions().stateDelta().
callbackContext.state().put("field_1", "value_1");

// --- agent runs ... ---

// --- Code in a tool called later *within the same invocation* ---
// Readable (dirty read), but 'value_1' isn't guaranteed persistent yet.
Object val = toolContext.state().get("field_1"); // 'val' will likely be 'value_1' here
System.out.println("Dirty read value in tool: " + val);
// Assume the event carrying the state_delta={'field_1': 'value_1'}
// is yielded *after* this tool runs and is processed by the Runner.
  • Implications:
  • 好处: 允许在单个复杂步骤中的不同逻辑部分(例如,下次 LLM 轮次之前的多个回调或工具调用)使用状态进行协调,而无需等待完整的产出/提交周期。
  • 注意事项: 过度依赖脏读进行关键逻辑可能存在风险。如果调用在 Runner 产出并处理携带 state_delta 的事件之前失败,则未提交的状态更改将丢失。对于关键状态转换,请确保它们与成功处理的事件相关联。

流式 vs. 非流式输出(partial=True

这主要涉及 LLM 响应的处理方式,特别是当使用流式生成 API 时。

  • 流式: LLM 逐个标记或小块生成其响应。
  • 框架(通常在BaseLlmFlow内)为单个概念性响应产出多个Event对象。这些事件大多数将具有partial=True
  • Runner在接收到带有partial=True的事件时,通常会立即转发到上游(用于 UI 显示),但跳过处理其actions(如state_delta)。
  • 最终,框架为该响应产出一个最终事件,标记为非部分(partial=False或通过turn_complete=True隐式)。
  • Runner只完全处理这个最终事件,提交任何相关的state_deltaartifact_delta
  • 非流式: LLM 一次生成整个响应。框架产出单个标记为非部分的事件,Runner完全处理该事件。
  • 为什么重要: 确保状态更改基于来自 LLM 的完整响应原子地且只应用一次,同时仍允许 UI 在生成文本时逐步显示。

异步是主要的(run_async

  • 核心设计: ADK 运行时从根本上建立在异步模式和库(如 Python 的 asyncio、Java 的 RxJava 以及 TypeScript 中的原生 PromiseAsyncGenerator)之上,以高效地处理并发操作(如等待 LLM 响应或工具执行),而不会阻塞。
  • 主要入口点: Runner.run_async 是执行智能体调用的主要方法。所有核心可运行组件(智能体、特定流)在内部使用 asynchronous 方法。
  • 同步便捷性(run): 同步的 Runner.run 方法主要为了方便(例如,在简单脚本或测试环境中)。然而,在内部,Runner.run 通常只是调用 Runner.run_async 并为你管理异步事件循环执行。
  • 开发者体验: 我们建议将你的应用程序(例如,使用 ADK 的 Web 服务器)设计为异步以获得最佳性能。在 Python 中,这意味着使用 asyncio;在 Java 中,利用 RxJava 的响应式编程模型;在 TypeScript 中,这意味着使用原生 PromiseAsyncGenerator 构建。
  • 同步回调/工具: ADK 框架同时支持工具和回调的异步和同步函数。
    • 阻塞 I/O: 对于长时间运行的同步 I/O 操作,框架尝试防止停滞。Python ADK 可能使用 asyncio.to_thread,而 Java ADK 通常依赖适当的 RxJava 调度器或阻塞调用的包装器。在 TypeScript 中,框架简单地等待函数;如果同步函数执行阻塞 I/O,它将使事件循环停滞。开发者应尽可能使用异步 I/O API(返回 Promise)。
    • CPU 密集型工作: 纯 CPU 密集型同步任务仍将在两个环境中阻塞其执行线程。

理解这些行为有助于编写更强大的 ADK 应用程序并调试与状态一致性、流式更新和异步执行相关的问题。