Skip to content

第 3 部分:使用 run_live() 进行事件处理

run_live() 方法是 ADK 用于流式对话的主要入口点,实现了一个异步生成器,随着对话展开产生事件。本部分侧重于理解和处理这些事件——这是在你的应用程序、用户和 AI 模型之间实现实时交互的核心通信机制。

你将学习如何处理不同的事件类型(文本、音频、转录、工具调用),使用中断和回合完成信号管理对话流,为网络传输序列化事件,以及利用 ADK 的自动工具执行。理解事件处理对于构建对用户来说感觉自然和实时的响应式流式应用程序至关重要。

需要异步上下文

所有 run_live() 代码都需要异步上下文。有关详细信息和生产示例,请参阅 第 1 部分:FastAPI 应用示例

run_live() 的工作原理

run_live() 是一个异步生成器,实时流式传输对话事件。它在生成时立即产生事件——没有缓冲,没有轮询,没有回调。事件在没有内部缓冲的情况下流式传输。总体内存取决于会话持久性(例如,记忆中与数据库),使其适用于快速交流和延长会话。

方法签名和流程

用法:

源引用: runners.py
# 方法签名揭示了周到的设计
async def run_live(
    self,
    *,                                      # 仅关键字参数
    user_id: Optional[str] = None,          # 用户标识(除非提供会话,否则为必需)
    session_id: Optional[str] = None,       # 会话跟踪(除非提供会话,否则为必需)
    live_request_queue: LiveRequestQueue,   # 双向通信通道
    run_config: Optional[RunConfig] = None, # 流式处理行为配置
    session: Optional[Session] = None,      # 已弃用:改用 user_id 和 session_id
) -> AsyncGenerator[Event, None]:           # 产生对话事件的生成器

正如其签名所示,每个流式会话都需要身份(user_id)、连续性(session_id)、通信(live_request_queue)和配置(run_config)。返回类型——事件的异步生成器——承诺实时传输而不会压倒系统资源。

sequenceDiagram
participant Client as 客户端
participant Runner
participant Agent as 智能体
participant LLMFlow
participant Gemini

Client->>Runner: runner.run_live(user_id, session_id, queue, config)
Runner->>Agent: agent.run_live(context)
Agent->>LLMFlow: _llm_flow.run_live(context)
LLMFlow->>Gemini: "连接并流式传输"

loop "持续流式传输"
    Gemini-->>LLMFlow: LlmResponse
    LLMFlow-->>Agent: Event
    Agent-->>Runner: Event
    Runner-->>Client: Event (yield)
end

Basic Usage Pattern

消费来自 run_live() 的事件的最简单方法是使用 for 循环迭代异步生成器:

Demo implementation: main.py:225-233
async for event in runner.run_live(
    user_id=user_id,
    session_id=session_id,
    live_request_queue=live_request_queue,
    run_config=run_config
):
    event_json = event.model_dump_json(exclude_none=True, by_alias=True)
    logger.debug(f"[SERVER] Event: {event_json}")
    await websocket.send_text(event_json)

会话标识符

user_idsession_id 都必须与你通过 SessionService.create_session() 创建会话时使用的标识符匹配。这些可以是基于你的应用程序需求的任何字符串值(例如,UUID、电子邮件地址、自定义令牌)。有关会话标识符的详细指导,请参阅 第 1 部分:获取或创建会话

run_live() 中的连接生命周期

run_live() 方法自动管理底层 Live API 连接生命周期:

连接状态: 1. 初始化:调用 run_live() 时建立连接 2. 活动流式传输:通过 LiveRequestQueue(上游到模型)和 run_live()(下游来自模型)进行双向通信 3. 优雅关闭:调用 LiveRequestQueue.close() 时连接关闭 4. 错误恢复:ADK 支持透明会话恢复;通过 RunConfig.session_resumption 启用以处理临时故障。详情请参阅 第 4 部分:Live API 会话恢复

run_live() 产生什么

run_live() 方法实时产生 Event 对象流,因为智能体处理用户输入并生成响应。了解不同的事件类型有助于你构建响应式的 UI,适当地处理文本、音频、转录、工具调用、元数据和错误。每种事件类型在下面章节中有详细解释。

事件类型 描述
文本事件 使用 response_modalities=["TEXT"] 时的模型文本响应;包括用于流式 UI 管理的 partialturn_completeinterrupted 标志
带有内联数据的音频事件 使用 response_modalities=["AUDIO"] 时实时流式传输的原始音频字节 (inline_data);短暂的(不持久化到会话)
带有文件数据的音频事件 聚合到文件中并存储在制品中的音频;包含 file_data 引用而不是原始字节;可以持久化到会话历史记录
元数据事件 用于成本监控和配额跟踪的令牌使用信息 (prompt_token_count, candidates_token_count, total_token_count)
转录事件 RunConfig 中启用转录时,用于用户输入 (input_transcription) 和模型输出 (output_transcription) 的语音转文本
工具调用事件 来自模型的函数调用请求;ADK 自动处理执行
错误事件 带有 error_codeerror_message 字段的模型错误和连接问题

源引用

参见 runners.py 中的完整事件类型处理实现

When run_live() Exits

The run_live() event loop can exit under various conditions. Understanding these exit scenarios is crucial for proper resource cleanup and error handling:

Exit Condition Trigger Graceful? Description
手动关闭 live_request_queue.close() ✅ 是 用户显式关闭队列,发送 LiveRequest(close=True) 信号
所有智能体完成 SequentialAgent 中的最后一个智能体调用 task_completed() ✅ 是 在所有顺序智能体完成任务后
会话超时 达到 Live API 持续时间限制 ⚠️ 连接关闭 会话超过最大持续时间(见下面的限制)
提前退出 设置 end_invocation 标志 ✅ 是 在预处理期间或由工具/回调设置以提前终止
空事件 队列关闭信号 ✅ 是 指示事件流已结束的内部信号
错误 连接错误,异常 ❌ 否 未处理的异常或连接故障

SequentialAgent 行为

使用 SequentialAgent 时,task_completed() 函数不会退出你的应用程序的 run_live() 循环。它仅发出当前智能体工作结束的信号,触发序列中下一个智能体的无缝转换。你的事件循环继续接收来自后续智能体的事件。循环仅在序列中的最后一个智能体完成时退出。

了解更多

有关会话恢复和连接恢复的详细信息,请参见 第 4 部分:Live API 会话恢复。有关多智能体工作流,请参见 多智能体工作流最佳实践

保存到 ADK Session 的事件

并非 run_live() 产生的所有事件都持久化到 ADK Session。当 run_live() 退出时,只有某些事件保存到会话,而其他事件仍然是短暂的。了解哪些事件被保存与哪些是短暂的对于使用会话持久性、恢复或需要回顾对话历史的应用程序至关重要。

源引用

参见 runners.py 中的会话事件持久性逻辑

保存到 ADK Session 的事件:

这些事件持久化到 ADK Session 并在会话历史记录中可用:

  • 带有文件数据的音频事件:仅当 RunConfig.save_live_blobTrue 时保存到 ADK Session;音频数据使用 file_data 引用聚合到制品中的文件中
  • 使用元数据事件:始终保存以跟踪 ADK Session 中的令牌消耗
  • 非部分转录事件:最终转录被保存;部分转录不会持久化
  • 函数调用和响应事件:始终保存以维护工具执行历史记录
  • 其他控制事件:大多数控制事件(例如,turn_completefinish_reason)被保存

未保存到 ADK Session 的事件:

这些事件是短暂的,仅在活跃流式处理期间产生给调用者:

  • 带有内联数据的音频事件inline_data 中的原始音频 Blob 数据从未保存到 ADK Session(仅用于实时播放)
  • 部分转录事件:仅用于实时显示;保存最终转录

音频持久化

要将音频对话保存到 ADK Session 以进行回顾或恢复,请启用 RunConfig.save_live_blob = True。这将音频流持久化到制品中。有关配置详细信息,请参见 第 4 部分:save_live_blob

理解事件

事件是 ADK 双向流式系统中的核心通信机制。本节探讨事件的完整生命周期——从它们如何通过多层流水线生成,到启用真正实时交互的并发处理模式,再到中断和回合完成的实际处理。你将了解事件类型(文本、音频、转录、工具调用)、网络传输的序列化策略,以及管理 Gemini Live API 和 Vertex AI Live API 平台流式会话的连接生命周期。

事件类

ADK 的 Event 类是一个 Pydantic 模型,代表流式对话中的所有通信。它扩展了 LlmResponse,并作为模型响应、用户输入、转录和控制信号的统一容器。

源引用

参见 event.py:30-128llm_response.py:28-200 中的事件类实现

关键字段

所有应用程序必需: - content: 包含文本、音频或函数调用为 Content.parts - author: 识别谁创建了事件("user" 或智能体名称) - partial: 区分增量块和完整文本 - turn_complete: 发出何时再次启用用户输入的信号 - interrupted: 指示何时停止渲染当前输出

对于语音/音频应用程序: - input_transcription: 用户的口语(在 RunConfig 中启用时) - output_transcription: 模型的口语(在 RunConfig 中启用时) - content.parts[].inline_data: 播放音频数据

对于工具执行: - content.parts[].function_call: 模型的工具调用请求 - content.parts[].function_response: 工具执行结果 - long_running_tool_ids: 跟踪异步工具执行

用于调试和诊断: - usage_metadata: 令牌计数和计费信息 - cache_metadata: 上下文缓存命中/未命中统计 - finish_reason: 模型为什么停止生成(例如,STOP、MAX_TOKENS、SAFETY) - error_code / error_message: 故障诊断

作者语义

转录事件的作者为 "user";模型响应/事件使用智能体名称作为 author(不是 "model")。详情请参见 事件作者身份

理解事件标识

事件有两个重要的 ID 字段:

  • event.id: 此特定事件的唯一标识符(格式:UUID)。每个事件得到一个新 ID,即使是部分文本块。
  • event.invocation_id: 当前调用中所有事件的共享标识符(格式:"e-" + UUID)。在 run_live() 中,来自单个流式会话的所有事件共享相同的 invocation_id。(有关调用的更多信息,请参见 InvocationContext

用法:

# 此流式处理会话中的所有事件都将具有相同的 invocation_id
async for event in runner.run_live(...):
    print(f"Event ID: {event.id}")              # 每个事件唯一
    print(f"Invocation ID: {event.invocation_id}")  # 会话中的所有事件相同

用例: - event.id:在日志中跟踪单个事件,对事件进行重复数据删除 - event.invocation_id:按对话会话对事件进行分组,过滤特定于会话的事件

事件作者身份

在实时流式处理模式下,Event.author 字段遵循特殊语义以保持对话清晰:

模型响应:由智能体名称(例如,"my_agent")创作,而不是字面字符串 "model"

  • 这启用了多智能体场景,你需要跟踪哪个智能体生成了响应
  • 示例:Event(author="customer_service_agent", content=...)

用户转录:当事件包含转录的用户音频时,作者为 "user"

它是如何工作的

  1. Gemini Live API 返回带有 content.role == 'user' 的用户音频转录
  2. ADK 的 get_author_for_event() 函数检查此角色标记
  3. 如果 content.role == 'user',ADK 将 Event.author 设置为 "user"
  4. 否则,ADK 将 Event.author 设置为智能体名称(例如,"my_agent"

这种转换确保转录的用户输入在你的应用程序的对话历史记录中正确归因于用户,即使它流经模型的响应流。

  • 示例:输入音频转录 → Event(author="user", input_transcription=..., content.role="user")

这为什么重要

  • 在多智能体应用程序中,你可以按智能体过滤事件:events = [e for e in stream if e.author == "my_agent"]
  • 当显示对话历史记录时,使用 event.author 显示谁说了什么
  • 转录事件正确归因于用户,即使它们流经模型

源引用

参见 base_llm_flow.py:292-326 中的作者归因逻辑

事件类型和处理

ADK 通过 runner.run_live() 流式传输不同的事件类型以支持不同的交互模式:用于传统聊天的文本响应、用于语音输出的音频块、用于无障碍和日志记录的转录,以及用于函数执行的工具调用通知。每个事件包括控制 UI 状态转换并启用自然、类人对话流的元数据标志(partialturn_completeinterrupted)。了解如何识别和处理这些事件类型对于构建响应式流式应用程序至关重要。

文本事件

最常见的事件类型,当你在 RunConfig 中将 response_modalities 指定为 ["TEXT"] 模式时,包含模型的文本响应:

用法:

async for event in runner.run_live(...):
    if event.content and event.content.parts:
        if event.content.parts[0].text:
            text = event.content.parts[0].text

            if not event.partial:
                # 你的更新流式显示的逻辑
                update_streaming_display(text)

默认响应模态行为

当未显式设置 response_modalities(即 None)时,ADK 在 run_live() 开始时自动默认为 ["AUDIO"] 模式。这意味着:

  • 如果你不提供 RunConfig:默认为 ["AUDIO"]
  • 如果你提供没有 response_modalities 的 RunConfig:默认为 ["AUDIO"]
  • 如果你显式设置 response_modalities:使用你的设置(不应用默认值)

为什么存在此默认值:某些原生音频模型需要显式设置响应模态。为了确保与所有模型的兼容性,ADK 默认为 ["AUDIO"]

对于纯文本应用程序:始终在 RunConfig 中显式设置 response_modalities=["TEXT"],以避免接收意外的音频事件。

示例:

# 显式文本模式
run_config = RunConfig(
    response_modalities=["TEXT"],
    streaming_mode=StreamingMode.BIDI
)

关键事件标志:

这些标志帮助你在 UI 中管理流式文本显示和对话流:

  • event.partial:流式传输期间增量文本块为 True;完整合并文本为 False
  • event.turn_complete:当模型完成其完整响应时为 True
  • event.interrupted:当用户打断模型的响应时为 True

了解更多

有关使用 partialturn_completeinterrupted 标志管理对话流和 UI 状态的详细指导,请参阅 处理文本事件

音频事件

RunConfig 中的 response_modalities 配置为 ["AUDIO"] 时,模型生成音频输出而不是文本,你将在事件流中接收音频数据:

配置:

# 为音频响应配置 RunConfig
run_config = RunConfig(
    response_modalities=["AUDIO"],
    streaming_mode=StreamingMode.BIDI
)

# 音频作为 inline_data 到达 event.content.parts
async for event in runner.run_live(..., run_config=run_config):
    if event.content and event.content.parts:
        part = event.content.parts[0]
        if part.inline_data:
            # 音频事件结构:
            # part.inline_data.data: 字节(原始 PCM 音频)
            # part.inline_data.mime_type: 字符串(例如,"audio/pcm")
            audio_data = part.inline_data.data
            mime_type = part.inline_data.mime_type

            print(f"Received {len(audio_data)} bytes of {mime_type}")
            # 你的播放音频的逻辑
            await play_audio(audio_data)

了解更多

  • response_modalities 控制模型如何生成输出——你必须为每个会话选择 ["TEXT"] 用于文本响应或 ["AUDIO"] 用于音频响应。你不能同时使用两种模态。有关配置详细信息,请参阅 第 4 部分:响应模态
  • 有关音频格式、发送/接收音频和音频处理流的全面覆盖,请参阅 第 5 部分:如何使用音频、图像和视频

带有文件数据的音频事件

当音频数据聚合并不保存为制品中的文件时,ADK 产生包含 file_data 引用而不是原始 inline_data 的事件。这对于将音频持久化到会话历史记录很有用。

源引用

参见 audio_cache_manager.py:156-178 中的音频文件聚合逻辑

接收音频文件引用:

async for event in runner.run_live(
    user_id=user_id,
    session_id=session_id,
    live_request_queue=queue,
    run_config=run_config
):
    if event.content and event.content.parts:
        for part in event.content.parts:
            if part.file_data:
                # 音频聚合到保存在制品中的文件中
                file_uri = part.file_data.file_uri
                mime_type = part.file_data.mime_type

                print(f"音频文件已保存: {file_uri} ({mime_type})")
                # 从制品服务检索音频文件以进行播放

文件数据与内联数据:

  • 内联数据 (part.inline_data):实时流式传输的原始音频字节;短暂且不保存到会话
  • 文件数据 (part.file_data):存储在制品中的音频文件的引用;可以持久化到会话历史记录

输入和输出音频数据都聚合到音频文件中并保存在制品服务中。文件引用作为 file_data 包含在事件中,允许你稍后检索音频。

会话持久性

要将带文件数据的音频事件保存到会话历史记录,请启用 RunConfig.save_live_blob = True。这允许从持久化会话中回顾或重播音频对话。

元数据事件

使用元数据事件包含用于监控成本和配额消耗的令牌使用信息。run_live() 方法将这些事件与内容事件分开产生。

源引用

参见 llm_response.py:105 中的使用元数据结构

访问令牌使用情况:

async for event in runner.run_live(
    user_id=user_id,
    session_id=session_id,
    live_request_queue=queue,
    run_config=run_config
):
    if event.usage_metadata:
        print(f"Prompt tokens: {event.usage_metadata.prompt_token_count}")
        print(f"Response tokens: {event.usage_metadata.candidates_token_count}")
        print(f"Total tokens: {event.usage_metadata.total_token_count}")

        # 跟踪整个会话的累积使用情况
        total_tokens += event.usage_metadata.total_token_count or 0

可用元数据字段:

  • prompt_token_count:输入(提示和上下文)中的令牌数
  • candidates_token_count:模型响应中的令牌数
  • total_token_count:提示和响应令牌的总和
  • cached_content_token_count:从缓存提供的令牌数(当使用上下文缓存时)

成本监控

使用元数据事件允许在流式处理会话期间进行实时成本跟踪。你可以实施配额限制,向用户显示使用情况,或记录指标以进行计费和分析。

转录事件

当在 RunConfig 中启用转录时,你将作为单独的事件接收转录:

配置:

async for event in runner.run_live(...):
    # 用户的口语(当启用 input_audio_transcription 时)
    if event.input_transcription:
        # 你的显示用户转录的逻辑
        display_user_transcription(event.input_transcription)

    # 模型的口语(当启用 output_audio_transcription 时)
    if event.output_transcription:
        # 你的显示模型转录的逻辑
        display_model_transcription(event.output_transcription)

这些启用了可访问性功能和对话日志记录,而无需单独的转录服务。

了解更多

有关在 RunConfig 中启用转录和理解转录交付的详细信息,请参阅 第 5 部分:音频转录

工具调用事件

当模型请求工具执行时:

用法:

async for event in runner.run_live(...):
    if event.content and event.content.parts:
        for part in event.content.parts:
            if part.function_call:
                # 模型正在请求工具执行
                tool_name = part.function_call.name
                tool_args = part.function_call.args
                # ADK 自动处理执行

ADK 自动处理工具调用——除非实施自定义工具执行逻辑,否则你通常不需要直接处理这些。

了解更多

有关 ADK 如何自动执行工具、处理函数响应以及支持长时间运行和流式工具的详细信息,请参阅 run_live() 中的自动工具执行

错误事件

生产应用程序需要强大的错误处理来优雅地处理模型错误和连接问题。ADK 通过 error_codeerror_message 字段显示错误:

用法:

import logging

logger = logging.getLogger(__name__)

try:
    async for event in runner.run_live(...):
        # 处理来自模型或连接的错误
        if event.error_code:
            logger.error(f"Model error: {event.error_code} - {event.error_message}")

            # 向客户端发送错误通知
            await websocket.send_json({
                "type": "error",
                "code": event.error_code,
                "message": event.error_message
            })

            # 根据错误严重性决定是继续还是中断
            if event.error_code in ["SAFETY", "PROHIBITED_CONTENT", "BLOCKLIST"]:
                # 内容策略违规 - 通常无法重试
                break  # 终端错误 - 退出循环
            elif event.error_code == "MAX_TOKENS":
                # 达到令牌限制 - 可能需要调整配置
                break
            # 对于其他错误,你可能会继续或实施重试逻辑
            continue  # 瞬态错误 - 继续处理

        # 仅在没有错误时进行正常事件处理
        if event.content and event.content.parts:
            # ... 处理内容
            pass
finally:
    queue.close()  # 始终清理连接

Note

上面的示例显示了检查 error_codeerror_message 的基本结构。有关带有用户通知、重试逻辑和上下文日志记录的生产就绪错误处理,请参阅下面的实际场景。

何时使用 breakcontinue

关键决定是:模型的响应能否有意义地继续?

场景 1:内容策略违规(使用 break

你正在构建一个客户支持聊天机器人。用户问了一个触发 SAFETY 过滤器的不当问题:

示例:

if event.error_code in ["SAFETY", "PROHIBITED_CONTENT", "BLOCKLIST"]:
    # 模型已停止生成 - 无法继续
    await websocket.send_json({
        "type": "error",
        "message": "我不能帮助这个请求。请换一个问题。"
    })
    break  # 退出循环 - 模型不会为此回合发送更多事件

为什么 break 模型已终止其响应。此回合不会再有事件到来。继续只会浪费资源等待不会到达的事件。


场景 2:流式传输期间的网络故障(使用 continue

你正在构建一个语音转录服务。在转录过程中,出现了短暂的网络故障:

示例:

if event.error_code == "UNAVAILABLE":
    # 临时网络问题
    logger.warning(f"网络波动: {event.error_message}")
    # 不要为可能自行解决的短暂瞬态问题通知用户
    continue  # 继续监听 - 模型可能会恢复并继续

为什么 continue 这是一个瞬态错误。连接可能会恢复,模型可能会继续流式传输转录。中断会过早结束可能恢复的流。

用户通知

对于短暂的瞬态错误(持续 <1 秒),不要通知用户——他们不会注意到故障。但是,如果错误持续存在或影响用户体验(例如,流式传输暂停 >3 秒),请优雅地通知他们:“遇到连接问题,正在重试...”


场景 3:达到令牌限制(使用 break

你正在生成一篇长篇文章并达到了最大令牌限制:

示例:

if event.error_code == "MAX_TOKENS":
    # 模型已达到输出限制
    await websocket.send_json({
        "type": "complete",
        "message": "响应达到最大长度",
        "truncated": True
    })
    break  # 模型已完成 - 不会再生成更多令牌

为什么 break 模型已达到其输出限制并停止。继续不会产生更多令牌。


场景 4:带有重试逻辑的速率限制(使用 continue 并回退)

你正在运行一个偶尔达到速率限制的高流量应用程序:

示例:

retry_count = 0
max_retries = 3

async for event in runner.run_live(...):
    if event.error_code == "RESOURCE_EXHAUSTED":
        retry_count += 1
        if retry_count > max_retries:
            logger.error("Max retries exceeded")
            break  # 多次失败后放弃

        # 等待并重试
        await asyncio.sleep(2 ** retry_count)  # 指数回退
        continue  # 继续监听 - 速率限制可能会清除

    # 成功事件重置计数器
    retry_count = 0

为什么 continue(最初)? 速率限制通常是暂时的。通过指数回退,流可能会恢复。但在多次失败后,break 以避免无限等待。


决策框架:

错误类型 操作 原因
SAFETY, PROHIBITED_CONTENT break 模型终止响应
MAX_TOKENS break 模型完成生成
UNAVAILABLE, DEADLINE_EXCEEDED continue 瞬态网络/超时问题
RESOURCE_EXHAUSTED (rate limit) continue 并带有重试逻辑 短暂等待后可能会恢复
未知错误 continue(带有日志记录) 谨慎行事

关键:始终使用 finally 进行清理

用法:

try:
    async for event in runner.run_live(...):
        # ... 错误处理 ...
finally:
    queue.close()  # 无论你中断还是正常完成,清理都会运行

无论你 break 还是循环自然结束,finally 都能确保连接正确关闭。

错误代码参考:

ADK 错误代码来自底层 Gemini API。以下是你将遇到的最常见错误代码:

错误代码 类别 描述 推荐操作
SAFETY 内容策略 内容违反安全策略 break - 通知用户,记录事件
PROHIBITED_CONTENT 内容策略 内容包含禁止材料 break - 显示策略违规消息
BLOCKLIST 内容策略 内容匹配阻止列表 break - 警报用户,不要重试
MAX_TOKENS 限制 输出达到最大令牌限制 break - 优雅截断,总结
RESOURCE_EXHAUSTED 速率限制 超过配额或速率限制 continue 并回退 - 延迟后重试
UNAVAILABLE 瞬态 服务暂时不可用 continue - 重试,可能自行解决
DEADLINE_EXCEEDED 瞬态 请求超时 continue - 考虑带有回退的重试
CANCELLED 客户端 客户端取消了请求 break - 清理资源
UNKNOWN 系统 发生未指定的错误 continue 并带有日志记录 - 记录以进行分析

有关完整的错误代码列表和描述,请参阅官方文档:

官方文档

错误处理的最佳实践:

  • 始终首先检查错误:在处理内容之前处理 error_code 以避免处理无效事件
  • 记录带有上下文的错误:在错误日志中包含 session_id 和 user_id 以进行调试
  • 分类错误:区分可重试错误(瞬态故障)和终端错误(内容策略违规)
  • 优雅地通知用户:显示用户友好的错误消息,而不是原始错误代码
  • 实施重试逻辑:对于瞬态错误,考虑使用指数回退自动重试
  • 监控错误率:跟踪错误类型和频率以识别系统性问题
  • 处理内容策略错误:对于 SAFETYPROHIBITED_CONTENTBLOCKLIST 错误,通知用户其内容违反策略

处理文本事件

理解 partialinterruptedturn_complete 标志对于构建响应迅速的流式 UI 至关重要。这些标志使你能够在流式传输期间提供实时反馈,优雅地处理用户中断,并检测对话边界以进行适当的状态管理。

处理 partial

此标志帮助你区分增量文本块和完整合并文本,从而实现带有适当最终确认的平滑流式显示。

用法:

async for event in runner.run_live(...):
    if event.content and event.content.parts:
        if event.content.parts[0].text:
            text = event.content.parts[0].text

            if event.partial:
                # 你的流式 UI 更新逻辑在这里
                update_streaming_display(text)
            else:
                # 你的完整消息显示逻辑在这里
                display_complete_message(text)

partial 标志语义:

  • partial=True:此事件中的文本是增量的——它仅包含自上一个事件以来的新文本
  • partial=False:此事件中的文本是完整的——它包含此响应段的完整合并文本

Note

partial 标志仅对文本内容 (event.content.parts[].text) 有意义。对于其他内容类型:

  • 音频事件inline_data 中的每个音频块都是独立的(不发生合并)
  • 工具调用:函数调用和响应始终是完整的(部分不适用)
  • 转录:转录事件在产生时始终是完整的

示例流:

Event 1: partial=True,  text="Hello",        turn_complete=False
Event 2: partial=True,  text=" world",       turn_complete=False
Event 3: partial=False, text="Hello world",  turn_complete=False
Event 4: partial=False, text="",             turn_complete=True  # 回合完成

重要的时间关系: - partial=False 在一个回合中可能发生多次(例如,在每句话之后) - turn_complete=True 在模型完整响应的最后发生,在一个单独的事件中 - 你可能会收到:partial=False(句子 1)→ partial=False(句子 2)→ turn_complete=True - 合并的文本事件(带有内容的 partial=False)总是 turn_complete=True 事件之前产生

Note

ADK 在内部累积来自 partial=True 事件的所有文本。当你收到 partial=False 的事件时,文本内容等于所有前面的 partial=True 块的总和。这意味着:

  • 如果你不需要流式显示,你可以安全地忽略所有 partial=True 事件并仅处理 partial=False 事件
  • 如果你显示 partial=True 事件,partial=False 事件提供用于验证或存储的完整合并文本
  • 这种累积由 ADK 的 StreamingResponseAggregator 自动处理——你不需要手动连接部分文本块

处理 interrupted 标志

这通过检测用户何时在响应中途打断模型来启用自然对话流,允许你立即停止渲染过时的内容。

当用户在模型仍在生成响应时发送新输入(在语音对话中很常见)时,你将收到 interrupted=True 的事件:

用法:

async for event in runner.run_live(...):
    if event.interrupted:
        # 你的停止显示部分文本并清除打字指示器的逻辑
        stop_streaming_display()

        # 你的在 UI 中显示中断的逻辑(可选)
        show_user_interruption_indicator()

示例 - 中断场景:

Model: "The weather in San Francisco is currently..."
User: [interrupts] "Actually, I meant San Diego"
→ event.interrupted=True received
→ Your app: stop rendering model response, clear UI
→ Model processes new input
Model: "The weather in San Diego is..."

何时使用中断处理:

  • 语音对话:当用户开始说话时立即停止音频播放
  • 清除 UI 状态:删除打字指示器和部分文本显示
  • 对话日志记录:标记哪些响应被中断(不完整)
  • 用户反馈:显示已识别中断的视觉指示

处理 turn_complete 标志

这发出对话边界的信号,允许你更新 UI 状态(启用输入控件,隐藏指示器)并在日志和分析中标记适当的回合边界。

当模型完成其完整响应时,你将收到 turn_complete=True 的事件:

用法:

async for event in runner.run_live(...):
    if event.turn_complete:
        # 你的更新 UI 以显示“准备输入”状态的逻辑
        enable_user_input()
        # 你的隐藏打字指示器的逻辑
        hide_typing_indicator()

        # 你的在日志中标记对话边界的逻辑
        log_turn_boundary()

事件标志组合:

理解 turn_completeinterrupted 如何组合有助于你处理所有对话状态:

场景 turn_complete interrupted 你的应用程序应该
正常完成 True False 启用输入,显示“准备就绪”状态
用户在响应中途打断 False True 停止显示,清除部分内容
在结束时打断 True True 与正常完成相同(回合完成)
响应中途(部分文本) False False 继续显示流式文本

实现:

async for event in runner.run_live(...):
    # 处理流式文本
    if event.content and event.content.parts and event.content.parts[0].text:
        if event.partial:
            # 你的显示打字指示器并更新部分文本的逻辑
            update_streaming_text(event.content.parts[0].text)
        else:
            # Your logic to display complete text chunk
            display_text(event.content.parts[0].text)

    # Handle interruption
    if event.interrupted:
        # Your logic to stop audio playback and clear indicators
        stop_audio_playback()
        clear_streaming_indicators()

    # Handle turn completion
    if event.turn_complete:
        # Your logic to enable user input
        show_input_ready_state()
        enable_microphone()

常见用例:

  • UI 状态管理:显示/隐藏"准备输入"指示器、打字动画、麦克风状态
  • 音频播放控制:知道何时停止渲染来自模型的音频块
  • 对话日志记录:在历史记录/分析中标记回合之间的清晰边界
  • 流式优化:回合完成时停止缓冲

回合完成和缓存: 音频/转录缓存在流式传输期间的特定点自动刷新: - 在回合完成时 (turn_complete=True):用户和模型音频缓存都被刷新 - 在中断时 (interrupted=True):模型音频缓存被刷新 - 在生成完成时:模型音频缓存被刷新

将事件序列化为 JSON

ADK Event 对象是 Pydantic 模型,这意味着它们具有强大的序列化功能。model_dump_json() 方法特别适用于通过 WebSockets 或 Server-Sent Events (SSE) 等网络协议流式传输事件。

使用 event.model_dump_json()

这提供了一个简单的单行代码,将 ADK 事件转换为可以发送到 WebSocket 或 SSE 等网络协议的 JSON 格式。

model_dump_json() 方法将 Event 对象序列化为 JSON 字符串:

Demo implementation: main.py:219-234
async def downstream_task() -> None:
    """Receives Events from run_live() and sends to WebSocket."""
    async for event in runner.run_live(
        user_id=user_id,
        session_id=session_id,
        live_request_queue=live_request_queue,
        run_config=run_config
    ):
        event_json = event.model_dump_json(exclude_none=True, by_alias=True)
        await websocket.send_text(event_json)

序列化什么:

  • 事件元数据(作者,服务器内容字段)
  • 内容(文本,音频数据,函数调用)
  • 事件标志(partial,turn_complete,interrupted)
  • 转录数据(input_transcription,output_transcription)
  • 工具执行信息

何时使用 model_dump_json()

  • ✅ 通过网络流式传输事件(WebSocket,SSE)
  • ✅ 记录/持久化到 JSON 文件
  • ✅ 调试和检查
  • ✅ 与基于 JSON 的 API 集成

何时不使用它:

  • ❌ 内存中处理(直接使用事件对象)
  • ❌ 序列化开销很重要的高频事件
  • ❌ 当你只需要少数字段时(直接提取它们)

性能警告

event.content.parts[].inline_data 中的二进制音频数据在序列化为 JSON 时将进行 base64 编码,显著增加负载大小(~133% 开销)。对于带音频的生产应用程序,使用 WebSocket 二进制帧或 multipart HTTP 单独发送二进制数据。有关详细信息,请参阅 音频传输优化

序列化选项

这允许你通过排除不必要的字段来减小负载大小,提高网络性能和客户端处理速度。

Pydantic 的 model_dump_json() 支持几个有用的参数:

用法:

# 排除 None 值以获得更小的有效载荷(使用 camelCase 字段名)
event_json = event.model_dump_json(exclude_none=True, by_alias=True)

# 自定义排除(例如,跳过大的二进制音频)
event_json = event.model_dump_json(
    exclude={'content': {'parts': {'__all__': {'inline_data'}}}},
    by_alias=True
)

# 仅包含特定字段
event_json = event.model_dump_json(
    include={'content', 'author', 'turn_complete', 'interrupted'},
    by_alias=True
)

# 格式化打印 JSON(用于调试)
event_json = event.model_dump_json(indent=2, by_alias=True)

bidi-demo 使用 exclude_none=True 通过省略值为 None 的字段来最小化有效载荷大小。

客户端反序列化

这展示了如何在客户端解析和处理序列化的事件,启用基于回合完成和中断等事件属性的响应式 UI 更新。

在客户端(JavaScript/TypeScript)上,将 JSON 解析回对象:

Demo implementation: app.js:339-688
// Handle incoming messages
websocket.onmessage = function (event) {
    // 解析传入的 ADK 事件
    const adkEvent = JSON.parse(event.data);

    // 处理回合完成事件
    if (adkEvent.turnComplete === true) {
        // 从当前消息中移除打字指示器
        if (currentBubbleElement) {
            const textElement = currentBubbleElement.querySelector(".bubble-text");
            const typingIndicator = textElement.querySelector(".typing-indicator");
            if (typingIndicator) {
                typingIndicator.remove();
            }
        }
        currentMessageId = null;
        currentBubbleElement = null;
        return;
    }

    // 处理中断事件
    if (adkEvent.interrupted === true) {
        // 如果正在播放,则停止音频播放
        if (audioPlayerNode) {
            audioPlayerNode.port.postMessage({ command: "endOfAudio" });
        }

        // 保留部分消息但标记为中断
        if (currentBubbleElement) {
            const textElement = currentBubbleElement.querySelector(".bubble-text");

            // 移除打字指示器
            const typingIndicator = textElement.querySelector(".typing-indicator");
            if (typingIndicator) {
                typingIndicator.remove();
            }

            // 添加中断标记
            currentBubbleElement.classList.add("interrupted");
        }

        currentMessageId = null;
        currentBubbleElement = null;
        return;
    }

    // 处理内容事件(文本或音频)
    if (adkEvent.content && adkEvent.content.parts) {
        const parts = adkEvent.content.parts;

        for (const part of parts) {
            // 处理文本
            if (part.text) {
                // 为新回合添加新的消息气泡
                if (currentMessageId == null) {
                    currentMessageId = Math.random().toString(36).substring(7);
                    currentBubbleElement = createMessageBubble(part.text, false, true);
                    currentBubbleElement.id = currentMessageId;
                    messagesDiv.appendChild(currentBubbleElement);
                } else {
                    // 使用累积的文本更新现有消息气泡
                    const existingText = currentBubbleElement.querySelector(".bubble-text").textContent;
                    const cleanText = existingText.replace(/\.\.\.$/, '');
                    updateMessageBubble(currentBubbleElement, cleanText + part.text, true);
                }

                scrollToBottom();
            }
        }
    }
};

演示实现

See the complete WebSocket message handler in app.js:339-688

音频传输优化

JSON 中的 Base64 编码二进制音频显著增加负载大小。对于生产应用程序,使用单个 WebSocket 连接,同时使用二进制帧(用于音频)和文本帧(用于元数据):

用法:

async for event in runner.run_live(...):
    # 检查二进制音频
    has_audio = (
        event.content and
        event.content.parts and
        any(p.inline_data for p in event.content.parts)
    )

    if has_audio:
        # 通过二进制 WebSocket 帧发送音频
        for part in event.content.parts:
            if part.inline_data:
                await websocket.send_bytes(part.inline_data.data)

        # 仅发送元数据(小得多)
        metadata_json = event.model_dump_json(
            exclude={'content': {'parts': {'__all__': {'inline_data'}}}},
            by_alias=True
        )
        await websocket.send_text(metadata_json)
    else:
        # 仅文本事件可以作为 JSON 发送
        await websocket.send_text(event.model_dump_json(exclude_none=True, by_alias=True))

这种方法将音频密集流的带宽减少约 75%,同时保持完整的事件元数据。

run_live() 中的自动工具执行

源引用

See automatic tool execution implementation in functions.py

ADK 的 run_live() 最强大的功能之一是自动工具执行。与原始 Gemini Live API 不同,原始 Gemini Live API 要求你手动处理工具调用和响应,而 ADK 完全抽象了这种复杂性。

原始 Live API 的挑战

当直接使用 Gemini Live API(不使用 ADK)时,工具使用需要手动编排:

  1. 接收 来自模型的函数调用
  2. 执行 你自己工具
  3. 格式化 函数响应
  4. 发送 响应回模型

这会产生重大实现开销,尤其是在需要处理多个并发工具调用、管理错误以及与正在进行的音频/文本流协调的流式传输环境中。

ADK 如何简化工具使用

使用 ADK 时,工具执行变成声明式。只需在你的智能体上定义工具:

Demo implementation: agent.py:11-16
import os
from google.adk.agents import Agent
from google.adk.tools import google_search

agent = Agent(
    name="google_search_agent",
    model=os.getenv("DEMO_AGENT_MODEL", "gemini-2.5-flash-native-audio-preview-12-2025"),
    tools=[google_search],
    instruction="你是一个可以搜索网络的有用助手。"
)

当你调用 runner.run_live() 时,ADK 会自动:

  • 检测 模型何时在流式响应中返回函数调用
  • 执行 工具以获得最大性能
  • 处理 自定义逻辑的工具前后回调
  • 格式化 函数响应以符合 Live API 要求
  • 发送 响应回模型
  • 产生 函数调用和响应事件到你的应用程序

工具执行事件

当工具执行时,你将通过 run_live() 异步生成器接收事件:

用法:

async for event in runner.run_live(...):
    # 函数调用事件 - 模型请求工具执行
    if event.get_function_calls():
        print(f"模型调用: {event.get_function_calls()[0].name}")

    # 函数响应事件 - 工具执行结果
    if event.get_function_responses():
        print(f"工具结果: {event.get_function_responses()[0].response}")

你不需要自己处理执行——ADK 会自动处理。你只需观察事件如何流经对话。

了解更多

The bidi-demo sends all events (including function calls and responses) directly to the WebSocket client without server-side filtering. This allows the client to observe tool execution in real-time through the event stream. See the downstream task in main.py:219-234

长时间运行和流式工具

ADK 支持与 run_live() 无缝集成的高级工具模式:

长时间运行的工具:需要人工批准或需要很长时间才能完成的工具。使用 is_long_running=True 标记它们。在可恢复的异步流中,ADK 可以在长时间运行的调用后暂停。在实时流中,流式传输继续;long_running_tool_ids 指示待处理的操作,客户端可以显示适当的 UI。

流式工具:接受 input_stream 参数(类型为 LiveRequestQueue)的工具可以在执行期间向模型发送实时更新,启用渐进式响应。

流式工具如何工作

当你调用 runner.run_live() 时,ADK 在初始化时检查你的智能体工具(runners.py 中的第 828-865 行),通过检查 LiveRequestQueue 的参数类型注释来识别流式工具。

队列创建和生命周期

  1. 创建:ADK 在 run_live() 开始时(在处理任何事件之前)为每个流式工具创建一个 ActiveStreamingTool 和专用的 LiveRequestQueue
  2. 存储:这些队列在调用期间存储在 invocation_context.active_streaming_tools[tool_name]
  3. 注入:当模型调用工具时,ADK 自动将工具的队列注入为 input_stream 参数(function_tool.py 中的第 238-253 行)
  4. 使用:工具可以在执行期间使用此队列向模型发送实时更新
  5. 生命周期:队列在完整 run_live() 调用期间保持(一个 InvocationContext = 一个 run_live() 调用),并在 run_live() 退出时被销毁

队列区别

  • 主队列 (live_request_queue 参数):由你的应用程序创建,用于客户端到模型的通信
  • 工具队列 (active_streaming_tools[tool_name].stream):由 ADK 自动创建,用于执行期间的工具到模型通信

两种队列都是 LiveRequestQueue 实例,但在流式架构中服务于不同目的。

这使工具能够在长时间运行的操作期间提供增量更新、进度通知或部分结果。

代码参考: 有关实现详情,请参见 runners.py:828-865(工具检测)和 function_tool.py:238-253(参数注入)。

有关实现示例,请参见 工具指南

关键要点

原始 Live API 工具使用与 ADK 之间的区别是显著的:

方面 原始 Live API ADK run_live()
工具声明 手动模式定义 来自 Python 函数的自动定义
工具执行 在应用代码中手动处理 自动并行执行
响应格式化 手动 JSON 构造 自动
错误处理 手动 try/catch 和格式化 自动捕获和报告
流式集成 手动协调 自动事件产生
开发人员体验 复杂,容易出错 声明式,简单

这种自动处理是 ADK 的核心价值之一——它将 Live API 工具使用的复杂性转化为简单、声明式的开发人员体验。

InvocationContext:执行状态容器

来源引用

invocation_context.py 中的 InvocationContext 实现

虽然 run_live() 返回一个用于消费事件的异步生成器,但在内部,它创建并管理一个 InvocationContext——ADK 统一状态载体,封装完成完整对话调用所需的一切。一个 InvocationContext 对应一个 run_live() 循环——在你调用 run_live() 时创建,并在完整流式会话期间保持。

将它视为一本伴随对话从开始到结束的笔记本,收集信息,跟踪进度,并在途中为每个组件提供上下文。这是 ADK 上下文概念的运行时实现,在实时对话期间提供执行时状态和服务。有关 ADK 中上下文的更广泛概述,请参见 ADK 中的上下文

什么是调用?

一个 调用 代表一个完整的交互周期: - 从用户输入开始(文本、音频或控制信号) - 可能涉及一个或多个智能体调用 - 在生成最终响应或显式终止时结束 - 由 runner.run_live()runner.run_async() 协调

这与 智能体调用(执行单个智能体的逻辑)和 步骤(单个 LLM 调用加上任何生成的工具执行)不同。

层次结构如下:

   ┌─────────────────────── 调用 ──────────────────────────┐
   ┌──────────── llm_agent_call_1 ────────────┐ ┌─ agent_call_2 ─┐
   ┌──── step_1 ────────┐ ┌───── step_2 ──────┐
   [call_llm] [call_tool] [call_llm] [transfer]

谁使用 InvocationContext?

InvocationContext 在不同层面服务于不同的受众:

  • ADK 的内部组件(主要用户):Runner、Agent、LLMFlow 和 GeminiLlmConnection 都接收、读取和写入 InvocationContext,因为它流经堆栈。这种共享上下文实现了无缝协调,而无需紧密耦合。

  • 应用程序开发人员(间接受益者):你通常不会在应用程序代码中直接创建或操作 InvocationContext。相反,你受益于 InvocationContext 在幕后实现的清晰、简化的 API——例如优雅的 async for event in runner.run_live() 模式。

  • 工具和回调开发人员(直接访问):当你实现自定义工具或回调时,你会收到 InvocationContext 作为参数。这使你可以直接访问对话状态、会话服务和控制标志(如 end_invocation),以实现复杂的行为。

InvocationContext 包含什么

当你实现自定义工具或回调时,你会收到 InvocationContext 作为参数。以下是你可以使用的内容:

工具/回调开发人员的基本字段:

  • context.invocation_id:当前调用标识符(每个 run_live() 调用唯一)
  • context.session
    • context.session.events:会话历史中的所有事件(跨所有调用)
    • context.session.state:用于会话数据的持久键值存储
    • context.session.user_id:用户身份
  • context.run_config:当前流式配置(响应模态、转录设置、成本限制)
  • context.end_invocation:将其设置为 True 可立即终止对话(对于错误处理或策略执行很有用)

工具开发中的示例用例:

# 示例:显示常见 InvocationContext 模式的综合工具实现
def my_tool(context: InvocationContext, query: str):
    # 访问用户身份
    user_id = context.session.user_id

    # 检查这是否是用户的第一条消息
    event_count = len(context.session.events)
    if event_count == 0:
        return "Welcome! This is your first message."

    # 访问对话历史记录
    recent_events = context.session.events[-5:]  # 最后 5 个事件

    # 访问持久会话状态
    # 会话状态跨调用持久存在(不仅仅是此流式会话)
    user_preferences = context.session.state.get('user_preferences', {})

    # 更新会话状态(将被持久化)
    context.session.state['last_query_time'] = datetime.now().isoformat()

    # 访问持久化服务
    if context.artifact_service:
        # 存储大文件/音频
        await context.artifact_service.save_artifact(
            app_name=context.session.app_name,
            user_id=context.session.user_id,
            session_id=context.session.id,
            filename="result.bin",
            artifact=types.Part(inline_data=types.Blob(mime_type="application/octet-stream", data=data)),
        )

    # 使用上下文处理查询
    result = process_query(query, context=recent_events, preferences=user_preferences)

    # 在特定场景下终止对话
    if result.get('error'):
        # 处理错误 - 停止对话
        context.end_invocation = True

    return result

理解 InvocationContext 对于掌握 ADK 如何维护状态、协调执行以及启用多智能体工作流和可恢复性等高级功能至关重要。即使你从未直接接触它,了解流经应用程序的内容也有助于你设计更好的智能体并更有效地调试问题。

多智能体工作流的最佳实践

ADK 的双向流式处理支持三种智能体架构:单智能体(一个智能体处理整个对话)、带子智能体的多智能体(协调器智能体使用 transfer_to_agent 动态路由到专家智能体)和 顺序工作流智能体(智能体使用 task_completed 在固定管道中执行)。本节重点介绍顺序工作流的最佳实践,其中理解智能体转换和状态共享对于流畅的 BIDI 通信至关重要。

了解更多

有关多智能体模式的全面覆盖,请参阅 ADK 文档中的 作为编排器的工作流智能体

在使用 ADK 构建多智能体系统时,理解智能体在实时流式传输期间如何转换和共享状态对于流畅的 BIDI 通信至关重要。

带有 BIDI 流式处理的 SequentialAgent

SequentialAgent 启用工作流管道,其中智能体一个接一个地执行。每个智能体在下一个开始之前完成其任务。实时流式处理的挑战在于确定智能体何时完成了连续音频或视频输入的处理。

源引用

See SequentialAgent implementation in sequential_agent.py:119-158

它是如何工作的:

ADK 自动向序列中的每个智能体添加一个 task_completed() 函数。当模型调用此函数时,它发出完成信号并触发向下一个智能体的转换:

用法:

# SequentialAgent 自动将此工具添加到每个子智能体
def task_completed():
    """
    发出智能体已成功完成用户的问题或任务的信号。
    """
    return 'Task completion signaled.'

关键见解是 智能体转换在同一个 run_live() 事件流中透明地发生。你的应用程序不需要管理转换——只需统一消费事件:

用法:

async def handle_sequential_workflow():
    """带有 BIDI 流式处理的 SequentialAgent 的推荐模式。"""

    # 1. 跨序列中所有智能体共享的单个队列
    queue = LiveRequestQueue()

    # 2. 后台任务持续捕获用户输入
    async def capture_user_input():
        while True:
            # 你的从麦克风读取音频的逻辑
            audio_chunk = await microphone.read()
            queue.send_realtime(
                blob=types.Blob(data=audio_chunk, mime_type="audio/pcm")
            )

    input_task = asyncio.create_task(capture_user_input())

    try:
        # 3. 单个事件循环无缝处理所有智能体
        async for event in runner.run_live(
            user_id="user_123",
            session_id="session_456",
            live_request_queue=queue,
        ):
            # 事件跨智能体转换无缝流动
            current_agent = event.author

            # 处理音频和文本输出
            if event.content and event.content.parts:
                for part in event.content.parts:
                    # 检查音频数据
                    if part.inline_data and part.inline_data.mime_type.startswith("audio/"):
                        # 你的播放音频的逻辑
            await play_audio(part.inline_data.data)

                    # 检查文本数据
                    if part.text:
                        await display_text(f"[{current_agent}] {part.text}")

            # 不需要特殊的转换处理!

    finally:
        input_task.cancel()
        queue.close()

智能体转换期间的事件流

以下是智能体转换时你的应用程序看到的内容:

# 智能体 1(研究员)完成其工作
Event: author="researcher", text="我已经收集了所有数据。"
Event: author="researcher", function_call: task_completed()
Event: author="researcher", function_response: task_completed

# --- 自动转换(对你的代码不可见) ---

# 智能体 2(作家)开始
Event: author="writer", text="让我根据研究写报告..."
Event: author="writer", text=" 研究结果表明..."
Event: author="writer", function_call: task_completed()
Event: author="writer", function_response: task_completed

# --- 自动转换 ---

# 智能体 3(审阅者)开始 - 序列中的最后一个智能体
Event: author="reviewer", text="让我审阅这份报告..."
Event: author="reviewer", text="报告看起来不错。完成!"
Event: author="reviewer", function_call: task_completed()
Event: author="reviewer", function_response: task_completed

# --- 最后一个智能体完成:run_live() 退出 ---
# 你的 async for 循环在这里结束

设计原则

1. 单个事件循环

对序列中的所有智能体使用一个事件循环:

用法:

# ✅ 正确:一个循环处理所有智能体
async for event in runner.run_live(...):
    # 你的事件处理逻辑在这里
    await handle_event(event)  # 适用于智能体 1、智能体 2、智能体 3...

# ❌ 不正确:不要中断循环或创建多个循环
for agent in agents:
    async for event in runner.run_live(...):  # 错误!
        ...

2. 持久队列

同一个 LiveRequestQueue 服务于所有智能体:

# 用户输入流向当前活动的任何智能体
用户说话 → 队列 → 智能体1(研究员)
用户说话 → 队列 → 智能体2(作家)
用户说话 → 队列 → 智能体3(审阅者)

不要为每个智能体创建新队列:

# ❌ 不正确:每个智能体的新队列
for agent in agents:
    new_queue = LiveRequestQueue()  # 错误!

# ✅ 正确:整个工作流的单个队列
queue = LiveRequestQueue()
async for event in runner.run_live(live_request_queue=queue):
    ...

3. 智能体感知 UI(可选)

跟踪哪个智能体处于活动状态以获得更好的用户体验:

用法:

current_agent_name = None

async for event in runner.run_live(...):
    # 检测智能体转换
    if event.author and event.author != current_agent_name:
        current_agent_name = event.author
        # 你的更新 UI 指示器的逻辑
        await update_ui_indicator(f"现在: {current_agent_name}")

    # 你的事件处理逻辑在这里
    await handle_event(event)

4. 转换通知

可选地在智能体移交时通知用户:

用法:

async for event in runner.run_live(...):
    # 检测任务完成(转换信号)
    if event.content and event.content.parts:
        for part in event.content.parts:
            if (part.function_response and
                part.function_response.name == "task_completed"):
                # 你的显示转换通知的逻辑
                await display_notification(
                    f"✓ {event.author} 完成。移交给下一个智能体..."
                )
                continue

    # 你的事件处理逻辑在这里
    await handle_event(event)

关键区别:transfer_to_agent 与 task_completed

理解这两个函数有助于你选择正确的多智能体模式:

函数 智能体模式 run_live() 何时退出 用例
transfer_to_agent 协调器(动态路由) LiveRequestQueue.close() 根据意图将用户路由到专家
task_completed 顺序(管道) LiveRequestQueue.close() 或最后一个智能体的 task_completed 固定工作流:研究 → 写作 → 审阅

transfer_to_agent 示例:

# 协调器根据用户意图路由
用户: "我需要帮助处理账单"
事件: author="coordinator", function_call: transfer_to_agent(agent_name="billing")
# 流继续与账单智能体 - 同一个 run_live() 循环
事件: author="billing", text="我可以帮助你的账单问题..."

task_completed 示例:

# 顺序工作流在管道中进行
事件: author="researcher", function_call: task_completed()
# 当前智能体退出,序列中的下一个智能体开始
事件: author="writer", text="基于研究..."

最佳实践总结

实践 原因
使用单个事件循环 ADK 在内部处理转换
跨智能体保持队列活动 同一个队列服务于所有顺序智能体
跟踪 event.author 知道哪个智能体当前正在响应
不要重置会话/上下文 对话状态跨智能体持久存在
统一处理事件 所有智能体产生相同的事件类型
task_completed 发出转换信号 不要手动管理顺序流

SequentialAgent 设计确保了平滑的转换——你的应用程序只需看到来自不同智能体的连续事件流,并由 ADK 管理自动移交。

总结

在本部分中,你掌握了 ADK 双向流式架构中的事件处理。我们探讨了智能体生成的不同事件类型——文本响应、音频块、转录、工具调用和控制信号——并学习了如何有效地处理每种事件类型。你现在了解了如何处理中断和回合完成信号以实现自然的对话流,使用 Pydantic 的模型序列化为网络传输序列化事件,利用 ADK 的自动工具执行来简化智能体工作流,并访问 InvocationContext 以进行高级状态管理场景。有了这些事件处理模式,你就能够构建响应迅速的流式应用程序,为用户提供实时反馈。接下来,你将学习如何通过 RunConfig 配置复杂的流式行为,包括多模态交互、会话恢复和成本控制。


Previous: Part 2: Sending Messages with LiveRequestQueue | Next: Part 4: Understanding RunConfig