Skip to content

第 3 部分:使用 run_live() 处理事件

run_live() 方法是 ADK 流式对话的主要入口点,它实现了一个异步生成器,随着对话的展开产生事件。本部分重点介绍理解和处理这些事件——这是实现你的应用程序、用户和 AI 模型之间实时交互的核心通信机制。

你将学习如何处理不同的事件类型(文本、音频、转录、工具调用),使用中断和回合完成信号管理对话流,序列化事件以进行网络传输,并利用 ADK 的自动工具执行。理解事件处理对于构建让用户感觉自然和实时的响应迅速的流式处理应用程序至关重要。

需要异步上下文

所有 run_live() 代码都需要异步上下文。有关详细信息和生产示例,请参阅 第 1 部分:FastAPI 应用程序示例

run_live() 如何工作

run_live() 是一个实时流式传输对话事件的异步生成器。它在生成事件时立即产生事件——没有缓冲,没有轮询,没有回调。事件在没有内部缓冲的情况下流式传输。总内存取决于会话持久性(例如,内存中与数据库),使其适用于快速交换和扩展会话。

方法签名和流程

用法:

Source reference: runners.py
# The method signature reveals the thoughtful design
async def run_live(
    self,
    *,                                      # 仅关键字参数
    user_id: Optional[str] = None,          # 用户标识(除非提供会话,否则为必需)
    session_id: Optional[str] = None,       # 会话跟踪(除非提供会话,否则为必需)
    live_request_queue: LiveRequestQueue,   # 双向通信通道
    run_config: Optional[RunConfig] = None, # 流式处理行为配置
    session: Optional[Session] = None,      # 已弃用:改用 user_id 和 session_id
) -> AsyncGenerator[Event, None]:           # 产生对话事件的生成器

正如其签名所示,每个流式对话都需要身份 (user_id)、连续性 (session_id)、通信 (live_request_queue) 和配置 (run_config)。返回类型——事件的异步生成器——承诺实时交付而不会压垮系统资源。

sequenceDiagram
participant Client as 客户端
participant Runner
participant Agent as 智能体
participant LLMFlow
participant Gemini

Client->>Runner: runner.run_live(user_id, session_id, queue, config)
Runner->>Agent: agent.run_live(context)
Agent->>LLMFlow: _llm_flow.run_live(context)
LLMFlow->>Gemini: "连接并流式传输"

loop "持续流式传输"
    Gemini-->>LLMFlow: LlmResponse
    LLMFlow-->>Agent: Event
    Agent-->>Runner: Event
    Runner-->>Client: Event (yield)
end

基本使用模式

消费来自 run_live() 的事件的最简单方法是使用 for 循环迭代异步生成器:

Demo implementation: main.py:182-190
async for event in runner.run_live(
    user_id=user_id,
    session_id=session_id,
    live_request_queue=live_request_queue,
    run_config=run_config
):
    event_json = event.model_dump_json(exclude_none=True, by_alias=True)
    logger.debug(f"[SERVER] Event: {event_json}")
    await websocket.send_text(event_json)

会话标识符

user_idsession_id 都必须与通过 SessionService.create_session() 创建会话时使用的标识符匹配。这些可以是基于你的应用程序需求的任何字符串值(例如,UUID、电子邮件地址、自定义令牌)。有关会话标识符的详细指导,请参阅 第 1 部分:获取或创建会话

run_live() 中的连接生命周期

run_live() 方法自动管理底层 Live API 连接生命周期:

连接状态:

  1. 初始化:调用 run_live() 时建立连接
  2. 活跃流式处理:通过 LiveRequestQueue(上行到模型)和 run_live()(下行从模型)进行双向通信
  3. 优雅关闭:调用 LiveRequestQueue.close() 时连接关闭
  4. 错误恢复:ADK 支持透明的会话恢复;通过 RunConfig.session_resumption 启用以处理瞬态故障。有关详细信息,请参阅 第 4 部分:Live API 会话恢复

run_live() 产生什么

run_live() 方法在智能体处理用户输入并生成响应时实时产生 Event 对象流。理解不同的事件类型有助于你构建能够适当处理文本、音频、转录、工具调用、元数据和错误的响应迅速的 UI。每种事件类型都在下面的部分中详细解释。

事件类型 描述
文本事件 使用 response_modalities=["TEXT"] 时的模型文本响应;包括用于流式 UI 管理的 partialturn_completeinterrupted 标志
带有内联数据的音频事件 使用 response_modalities=["AUDIO"] 时实时流式传输的原始音频字节 (inline_data);短暂的(不持久化到会话)
带有文件数据的音频事件 聚合到文件中并存储在工件中的音频;包含 file_data 引用而不是原始字节;可以持久化到会话历史记录
元数据事件 用于成本监控和配额跟踪的令牌使用信息 (prompt_token_count, candidates_token_count, total_token_count)
转录事件 RunConfig 中启用转录时,用于用户输入 (input_transcription) 和模型输出 (output_transcription) 的语音转文本
工具调用事件 来自模型的函数调用请求;ADK 自动处理执行
错误事件 带有 error_codeerror_message 字段的模型错误和连接问题

源引用

runners.py:746-775 中的完整事件类型处理实现

run_live() 何时退出

run_live() 事件循环可以在各种条件下退出。理解这些退出场景对于适当的资源清理和错误处理至关重要:

退出条件 触发器 优雅? 描述
手动关闭 live_request_queue.close() ✅ 是 用户显式关闭队列,发送 LiveRequest(close=True) 信号
所有智能体完成 SequentialAgent 中的最后一个智能体调用 task_completed() ✅ 是 在所有顺序智能体完成任务后
会话超时 达到 Live API 持续时间限制 ⚠️ 连接关闭 会话超过最大持续时间(见下面的限制)
提前退出 设置 end_invocation 标志 ✅ 是 在预处理期间或由工具/回调设置以提前终止
空事件 队列关闭信号 ✅ 是 指示事件流已结束的内部信号
错误 连接错误,异常 ❌ 否 未处理的异常或连接故障

SequentialAgent 行为

当使用 SequentialAgent 时,task_completed() 函数不会退出你的应用程序的 run_live() 循环。它仅发出当前智能体工作结束的信号,触发无缝过渡到序列中的下一个智能体。你的事件循环继续接收来自后续智能体的事件。循环仅在序列中的最后一个智能体完成时退出。

了解更多

有关会话恢复和连接恢复的详细信息,请参阅 第 4 部分:Live API 会话恢复。有关多智能体工作流,请参阅 多智能体工作流的最佳实践

保存到 ADK 会话的事件

并非 run_live() 产生的所有事件都会持久化到 ADK Session。当 run_live() 退出时,只有某些事件会保存到会话中,而其他事件则保持短暂。理解哪些事件被保存与哪些是短暂的,对于使用会话持久性、恢复或需要查看对话历史记录的应用程序至关重要。

源引用

runners.py:746-775 中的会话事件持久化逻辑

保存到 ADK Session 的事件:

这些事件持久化到 ADK Session 并在会话历史记录中可用:

  • 带有文件数据的音频事件:仅当 RunConfig.save_live_model_audio_to_sessionTrue 时才保存到 ADK Session;音频数据聚合到工件中的文件中,带有 file_data 引用
  • 使用元数据事件:始终保存以跟踪整个 ADK Session 的令牌消耗
  • 非部分转录事件:保存最终转录;部分转录不持久化
  • 函数调用和响应事件:始终保存以维护工具执行历史记录
  • 其他控制事件:保存大多数控制事件(例如,turn_completefinish_reason

未保存到 ADK Session 的事件:

这些事件是短暂的,仅在活跃流式处理期间产生给调用者:

  • 带有内联数据的音频事件inline_data 中的原始音频 Blob 数据从未保存到 ADK Session(仅用于实时播放)
  • 部分转录事件:仅用于实时显示;保存最终转录

音频持久化

要将音频对话保存到 ADK Session 以进行查看或恢复,请启用 RunConfig.save_live_blob = True。这将音频流持久化到工件。有关配置详细信息,请参阅 第 4 部分:save_live_blob

理解事件

事件是 ADK 双向流式处理系统中的核心通信机制。本节探索事件的完整生命周期——从它们如何通过多个管道层生成,到实现真正实时交互的并发处理模式,再到中断和回合完成的实际处理。你将了解事件类型(文本、音频、转录、工具调用)、用于网络传输的序列化策略,以及管理跨 Gemini Live API 和 Vertex AI Live API 平台的流式处理会话的连接生命周期。

Event 类

ADK 的 Event 类是一个 Pydantic 模型,代表流式对话中的所有通信。它扩展了 LlmResponse 并充当模型响应、用户输入、转录和控制信号的统一容器。

源引用

event.py:30-129llm_response.py:28-185 中的 Event 类实现

关键字段

所有应用程序必需:

  • content:包含文本、音频或函数调用作为 Content.parts
  • author:标识谁创建了事件("user" 或智能体名称)
  • partial:区分增量块和完整文本
  • turn_complete:指示何时再次启用用户输入
  • interrupted:指示何时停止渲染当前输出

对于语音/音频应用程序:

  • input_transcription:用户的口语(当在 RunConfig 中启用时)
  • output_transcription:模型的口语(当在 RunConfig 中启用时)
  • content.parts[].inline_data:用于播放的音频数据

对于工具执行:

  • content.parts[].function_call:模型的工具调用请求
  • content.parts[].function_response:工具执行结果
  • long_running_tool_ids:跟踪异步工具执行

对于调试和诊断:

  • usage_metadata:令牌计数和计费信息
  • cache_metadata:上下文缓存命中/未命中统计信息
  • finish_reason:模型停止生成的原因(例如,STOP、MAX_TOKENS、SAFETY)
  • error_code / error_message:故障诊断

作者语义

转录事件的作者为 "user";模型响应/事件使用智能体的名称作为 author(而不是 "model")。有关详细信息,请参阅 事件作者身份

理解事件标识

事件有两个重要的 ID 字段:

  • event.id:此特定事件的唯一标识符(格式:UUID)。每个事件都有一个新的 ID,即使是部分文本块。
  • event.invocation_id:当前调用中所有事件的共享标识符(格式:"e-" + UUID)。在 run_live() 中,来自单个流式处理会话的所有事件共享相同的 invocation_id。(有关调用的更多信息,请参阅 InvocationContext

用法:

# 此流式处理会话中的所有事件都将具有相同的 invocation_id
async for event in runner.run_live(...):
    print(f"Event ID: {event.id}")              # 每个事件唯一
    print(f"Invocation ID: {event.invocation_id}")  # 会话中的所有事件相同

用例: - event.id:在日志中跟踪单个事件,对事件进行重复数据删除 - event.invocation_id:按对话会话对事件进行分组,过滤特定于会话的事件

事件作者身份

在实时流式处理模式下,Event.author 字段遵循特殊语义以保持对话清晰:

模型响应:由智能体名称(例如,"my_agent")创作,而不是字面字符串 "model"

  • 这启用了多智能体场景,你需要跟踪哪个智能体生成了响应
  • 示例:Event(author="customer_service_agent", content=...)

用户转录:当事件包含转录的用户音频时,作者为 "user"

它是如何工作的

  1. Gemini Live API 返回带有 content.role == 'user' 的用户音频转录
  2. ADK 的 get_author_for_event() 函数检查此角色标记
  3. 如果 content.role == 'user',ADK 将 Event.author 设置为 "user"
  4. 否则,ADK 将 Event.author 设置为智能体名称(例如,"my_agent"

这种转换确保转录的用户输入在你的应用程序的对话历史记录中正确归因于用户,即使它流经模型的响应流。

  • 示例:输入音频转录 → Event(author="user", input_transcription=..., content.role="user")

这为什么重要

  • 在多智能体应用程序中,你可以按智能体过滤事件:events = [e for e in stream if e.author == "my_agent"]
  • 当显示对话历史记录时,使用 event.author 显示谁说了什么
  • 转录事件正确归因于用户,即使它们流经模型

源引用

base_llm_flow.py:281-294 中的作者归因逻辑

事件类型和处理

ADK 通过 runner.run_live() 流式传输不同的事件类型,以支持不同的交互模式:用于传统聊天的文本响应,用于语音输出的音频块,用于可访问性和日志记录的转录,以及用于函数执行的工具调用通知。每个事件都包含元数据标志(partialturn_completeinterrupted),用于控制 UI 状态转换并启用自然、类似人类的对话流。理解如何识别和处理这些事件类型对于构建响应迅速的流式处理应用程序至关重要。

文本事件

最常见的事件类型,包含使用 RunConfig 中的 response_modalities 指定为 ["TEXT"] 模式时的模型文本响应:

用法:

async for event in runner.run_live(...):
    if event.content and event.content.parts:
        if event.content.parts[0].text:
            text = event.content.parts[0].text

            if not event.partial:
                # 你的更新流式显示的逻辑
                update_streaming_display(text)

默认响应模态行为

当未显式设置 response_modalities(即 None)时,ADK 在 run_live() 开始时自动默认为 ["AUDIO"] 模式。这意味着:

  • 如果你不提供 RunConfig:默认为 ["AUDIO"]
  • 如果你提供没有 response_modalities 的 RunConfig:默认为 ["AUDIO"]
  • 如果你显式设置 response_modalities:使用你的设置(不应用默认值)

为什么存在此默认值:某些本机音频模型需要显式设置响应模态。为了确保与所有模型的兼容性,ADK 默认为 ["AUDIO"]

对于纯文本应用程序:始终在 RunConfig 中显式设置 response_modalities=["TEXT"],以避免接收意外的音频事件。

示例:

# 显式文本模式
run_config = RunConfig(
    response_modalities=["TEXT"],
    streaming_mode=StreamingMode.BIDI
)

关键事件标志:

这些标志帮助你在 UI 中管理流式文本显示和对话流:

  • event.partial:流式传输期间增量文本块为 True;完整合并文本为 False
  • event.turn_complete:当模型完成其完整响应时为 True
  • event.interrupted:当用户打断模型的响应时为 True

了解更多

有关使用 partialturn_completeinterrupted 标志管理对话流和 UI 状态的详细指导,请参阅 处理文本事件

音频事件

RunConfig 中的 response_modalities 配置为 ["AUDIO"] 时,模型生成音频输出而不是文本,你将在事件流中接收音频数据:

配置:

# 为音频响应配置 RunConfig
run_config = RunConfig(
    response_modalities=["AUDIO"],
    streaming_mode=StreamingMode.BIDI
)

# 音频作为 inline_data 到达 event.content.parts
async for event in runner.run_live(..., run_config=run_config):
    if event.content and event.content.parts:
        part = event.content.parts[0]
        if part.inline_data:
            # 音频事件结构:
            # part.inline_data.data: 字节(原始 PCM 音频)
            # part.inline_data.mime_type: 字符串(例如,"audio/pcm")
            audio_data = part.inline_data.data
            mime_type = part.inline_data.mime_type

            print(f"Received {len(audio_data)} bytes of {mime_type}")
            # 你的播放音频的逻辑
            await play_audio(audio_data)

了解更多

  • response_modalities 控制模型如何生成输出——你必须为每个会话选择 ["TEXT"] 用于文本响应或 ["AUDIO"] 用于音频响应。你不能同时使用两种模态。有关配置详细信息,请参阅 第 4 部分:响应模态
  • 有关音频格式、发送/接收音频和音频处理流的全面覆盖,请参阅 第 5 部分:如何使用音频、图像和视频

带有文件数据的音频事件

当音频数据聚合并不保存为工件中的文件时,ADK 产生包含 file_data 引用而不是原始 inline_data 的事件。这对于将音频持久化到会话历史记录很有用。

源引用

runners.py:752-754audio_cache_manager.py:192-194 中的音频文件聚合逻辑

接收音频文件引用:

async for event in runner.run_live(
    user_id=user_id,
    session_id=session_id,
    live_request_queue=queue,
    run_config=run_config
):
    if event.content and event.content.parts:
        for part in event.content.parts:
            if part.file_data:
                # 音频聚合到保存在工件中的文件中
                file_uri = part.file_data.file_uri
                mime_type = part.file_data.mime_type

                print(f"Audio file saved: {file_uri} ({mime_type})")
                # 从工件服务检索音频文件以进行播放

文件数据与内联数据:

  • 内联数据 (part.inline_data):实时流式传输的原始音频字节;短暂且不保存到会话
  • 文件数据 (part.file_data):存储在工件中的音频文件的引用;可以持久化到会话历史记录

输入和输出音频数据都聚合到音频文件中并保存在工件服务中。文件引用作为 file_data 包含在事件中,允许你稍后检索音频。

会话持久性

要将带有文件数据的音频事件保存到会话历史记录,请启用 RunConfig.save_live_model_audio_to_session = True。这允许从持久化会话中查看或重播音频对话。

元数据事件

使用元数据事件包含用于监控成本和配额消耗的令牌使用信息。run_live() 方法将这些事件与内容事件分开产生。

源引用

llm_response.py:105-106 中的使用元数据结构

访问令牌使用情况:

async for event in runner.run_live(
    user_id=user_id,
    session_id=session_id,
    live_request_queue=queue,
    run_config=run_config
):
    if event.usage_metadata:
        print(f"Prompt tokens: {event.usage_metadata.prompt_token_count}")
        print(f"Response tokens: {event.usage_metadata.candidates_token_count}")
        print(f"Total tokens: {event.usage_metadata.total_token_count}")

        # 跟踪整个会话的累积使用情况
        total_tokens += event.usage_metadata.total_token_count or 0

可用元数据字段:

  • prompt_token_count:输入(提示和上下文)中的令牌数
  • candidates_token_count:模型响应中的令牌数
  • total_token_count:提示和响应令牌的总和
  • cached_content_token_count:从缓存提供的令牌数(当使用上下文缓存时)

成本监控

使用元数据事件允许在流式处理会话期间进行实时成本跟踪。你可以实施配额限制,向用户显示使用情况,或记录指标以进行计费和分析。

转录事件

当在 RunConfig 中启用转录时,你将作为单独的事件接收转录:

配置:

async for event in runner.run_live(...):
    # 用户的口语(当启用 input_audio_transcription 时)
    if event.input_transcription:
        # 你的显示用户转录的逻辑
        display_user_transcription(event.input_transcription)

    # 模型的口语(当启用 output_audio_transcription 时)
    if event.output_transcription:
        # 你的显示模型转录的逻辑
        display_model_transcription(event.output_transcription)

这些启用了可访问性功能和对话日志记录,而无需单独的转录服务。

了解更多

有关在 RunConfig 中启用转录和理解转录交付的详细信息,请参阅 第 5 部分:音频转录

工具调用事件

当模型请求工具执行时:

用法:

async for event in runner.run_live(...):
    if event.content and event.content.parts:
        for part in event.content.parts:
            if part.function_call:
                # 模型正在请求工具执行
                tool_name = part.function_call.name
                tool_args = part.function_call.args
                # ADK 自动处理执行

ADK 自动处理工具调用——除非实施自定义工具执行逻辑,否则你通常不需要直接处理这些。

了解更多

有关 ADK 如何自动执行工具、处理函数响应以及支持长时间运行和流式工具的详细信息,请参阅 run_live() 中的自动工具执行

错误事件

生产应用程序需要强大的错误处理来优雅地处理模型错误和连接问题。ADK 通过 error_codeerror_message 字段显示错误:

用法:

import logging

logger = logging.getLogger(__name__)

try:
    async for event in runner.run_live(...):
        # 处理来自模型或连接的错误
        if event.error_code:
            logger.error(f"Model error: {event.error_code} - {event.error_message}")

            # 向客户端发送错误通知
            await websocket.send_json({
                "type": "error",
                "code": event.error_code,
                "message": event.error_message
            })

            # 根据错误严重性决定是继续还是中断
            if event.error_code in ["SAFETY", "PROHIBITED_CONTENT", "BLOCKLIST"]:
                # 内容策略违规 - 通常无法重试
                break  # 终端错误 - 退出循环
            elif event.error_code == "MAX_TOKENS":
                # 达到令牌限制 - 可能需要调整配置
                break
            # 对于其他错误,你可能会继续或实施重试逻辑
            continue  # 瞬态错误 - 继续处理

        # 仅在没有错误时进行正常事件处理
        if event.content and event.content.parts:
            # ... 处理内容
            pass
finally:
    queue.close()  # 始终清理连接

Note

上面的示例显示了检查 error_codeerror_message 的基本结构。有关带有用户通知、重试逻辑和上下文日志记录的生产就绪错误处理,请参阅下面的实际场景。

何时使用 breakcontinue

关键决定是:模型的响应能否有意义地继续?

场景 1:内容策略违规(使用 break

你正在构建一个客户支持聊天机器人。用户问了一个触发 SAFETY 过滤器的不当问题:

示例:

if event.error_code in ["SAFETY", "PROHIBITED_CONTENT", "BLOCKLIST"]:
    # 模型已停止生成 - 无法继续
    await websocket.send_json({
        "type": "error",
        "message": "I can't help with that request. Please ask something else."
    })
    break  # 退出循环 - 模型不会为此回合发送更多事件

为什么 break 模型已终止其响应。此回合不会再有事件到来。继续只会浪费资源等待不会到达的事件。


场景 2:流式传输期间的网络故障(使用 continue

你正在构建一个语音转录服务。在转录过程中,出现了短暂的网络故障:

示例:

if event.error_code == "UNAVAILABLE":
    # 临时网络问题
    logger.warning(f"Network hiccup: {event.error_message}")
    # 不要为可能自行解决的短暂瞬态问题通知用户
    continue  # 继续监听 - 模型可能会恢复并继续

为什么 continue 这是一个瞬态错误。连接可能会恢复,模型可能会继续流式传输转录。中断会过早结束可能恢复的流。

用户通知

对于短暂的瞬态错误(持续 <1 秒),不要通知用户——他们不会注意到故障。但是,如果错误持续存在或影响用户体验(例如,流式传输暂停 >3 秒),请优雅地通知他们:“遇到连接问题,正在重试...”


场景 3:达到令牌限制(使用 break

你正在生成一篇长篇文章并达到了最大令牌限制:

示例:

if event.error_code == "MAX_TOKENS":
    # 模型已达到输出限制
    await websocket.send_json({
        "type": "complete",
        "message": "Response reached maximum length",
        "truncated": True
    })
    break  # 模型已完成 - 不会再生成更多令牌

为什么 break 模型已达到其输出限制并停止。继续不会产生更多令牌。


场景 4:带有重试逻辑的速率限制(使用 continue 并回退)

你正在运行一个偶尔达到速率限制的高流量应用程序:

示例:

retry_count = 0
max_retries = 3

async for event in runner.run_live(...):
    if event.error_code == "RESOURCE_EXHAUSTED":
        retry_count += 1
        if retry_count > max_retries:
            logger.error("Max retries exceeded")
            break  # 多次失败后放弃

        # 等待并重试
        await asyncio.sleep(2 ** retry_count)  # 指数回退
        continue  # 继续监听 - 速率限制可能会清除

    # 成功事件重置计数器
    retry_count = 0

为什么 continue(最初)? 速率限制通常是暂时的。通过指数回退,流可能会恢复。但在多次失败后,break 以避免无限等待。


决策框架:

错误类型 操作 原因
SAFETY, PROHIBITED_CONTENT break 模型终止响应
MAX_TOKENS break 模型完成生成
UNAVAILABLE, DEADLINE_EXCEEDED continue 瞬态网络/超时问题
RESOURCE_EXHAUSTED (rate limit) continue 并带有重试逻辑 短暂等待后可能会恢复
未知错误 continue(带有日志记录) 谨慎行事

关键:始终使用 finally 进行清理

用法:

try:
    async for event in runner.run_live(...):
        # ... 错误处理 ...
finally:
    queue.close()  # 无论你中断还是正常完成,清理都会运行

无论你 break 还是循环自然结束,finally 都能确保连接正确关闭。

错误代码参考:

ADK 错误代码来自底层 Gemini API。以下是你将遇到的最常见错误代码:

错误代码 类别 描述 推荐操作
SAFETY 内容策略 内容违反安全策略 break - 通知用户,记录事件
PROHIBITED_CONTENT 内容策略 内容包含禁止材料 break - 显示策略违规消息
BLOCKLIST 内容策略 内容匹配阻止列表 break - 警报用户,不要重试
MAX_TOKENS 限制 输出达到最大令牌限制 break - 优雅截断,总结
RESOURCE_EXHAUSTED 速率限制 超过配额或速率限制 continue 并回退 - 延迟后重试
UNAVAILABLE 瞬态 服务暂时不可用 continue - 重试,可能自行解决
DEADLINE_EXCEEDED 瞬态 请求超时 continue - 考虑带有回退的重试
CANCELLED 客户端 客户端取消了请求 break - 清理资源
UNKNOWN 系统 发生未指定的错误 continue 并带有日志记录 - 记录以进行分析

有关完整的错误代码列表和描述,请参阅官方文档:

官方文档

错误处理的最佳实践:

  • 始终首先检查错误:在处理内容之前处理 error_code 以避免处理无效事件
  • 记录带有上下文的错误:在错误日志中包含 session_id 和 user_id 以进行调试
  • 分类错误:区分可重试错误(瞬态故障)和终端错误(内容策略违规)
  • 优雅地通知用户:显示用户友好的错误消息,而不是原始错误代码
  • 实施重试逻辑:对于瞬态错误,考虑使用指数回退自动重试
  • 监控错误率:跟踪错误类型和频率以识别系统性问题
  • 处理内容策略错误:对于 SAFETYPROHIBITED_CONTENTBLOCKLIST 错误,通知用户其内容违反策略

处理文本事件

理解 partialinterruptedturn_complete 标志对于构建响应迅速的流式 UI 至关重要。这些标志使你能够在流式传输期间提供实时反馈,优雅地处理用户中断,并检测对话边界以进行适当的状态管理。

处理 partial

此标志帮助你区分增量文本块和完整合并文本,从而实现带有适当最终确认的平滑流式显示。

用法:

async for event in runner.run_live(...):
    if event.content and event.content.parts:
        if event.content.parts[0].text:
            text = event.content.parts[0].text

            if event.partial:
                # 你的流式 UI 更新逻辑在这里
                update_streaming_display(text)
            else:
                # 你的完整消息显示逻辑在这里
                display_complete_message(text)

partial 标志语义:

  • partial=True:此事件中的文本是增量的——它仅包含自上一个事件以来的新文本
  • partial=False:此事件中的文本是完整的——它包含此响应段的完整合并文本

Note

partial 标志仅对文本内容 (event.content.parts[].text) 有意义。对于其他内容类型:

  • 音频事件inline_data 中的每个音频块都是独立的(不发生合并)
  • 工具调用:函数调用和响应始终是完整的(部分不适用)
  • 转录:转录事件在产生时始终是完整的

示例流:

Event 1: partial=True,  text="Hello",        turn_complete=False
Event 2: partial=True,  text=" world",       turn_complete=False
Event 3: partial=False, text="Hello world",  turn_complete=False
Event 4: partial=False, text="",             turn_complete=True  # 回合完成

重要的时间关系:

  • partial=False 可以在一个回合中发生多次(例如,在每个句子之后)
  • turn_complete=True 在模型完整响应的最末尾一次发生,在一个单独的事件
  • 你可能会收到:partial=False(句子 1)→ partial=False(句子 2)→ turn_complete=True
  • 合并的文本事件(带有内容的 partial=False)始终在 turn_complete=True 事件之前产生

Note

ADK 在内部累积来自 partial=True 事件的所有文本。当你收到 partial=False 的事件时,文本内容等于所有前面的 partial=True 块的总和。这意味着:

  • 如果你不需要流式显示,你可以安全地忽略所有 partial=True 事件并仅处理 partial=False 事件
  • 如果你显示 partial=True 事件,partial=False 事件提供用于验证或存储的完整合并文本
  • 这种累积由 ADK 的 StreamingResponseAggregator 自动处理——你不需要手动连接部分文本块

处理 interrupted 标志

这通过检测用户何时在响应中途打断模型来启用自然对话流,允许你立即停止渲染过时的内容。

当用户在模型仍在生成响应时发送新输入(在语音对话中很常见)时,你将收到 interrupted=True 的事件:

用法:

async for event in runner.run_live(...):
    if event.interrupted:
        # 你的停止显示部分文本并清除打字指示器的逻辑
        stop_streaming_display()

        # 你的在 UI 中显示中断的逻辑(可选)
        show_user_interruption_indicator()

示例 - 中断场景:

Model: "The weather in San Francisco is currently..."
User: [interrupts] "Actually, I meant San Diego"
→ event.interrupted=True received
→ Your app: stop rendering model response, clear UI
→ Model processes new input
Model: "The weather in San Diego is..."

何时使用中断处理:

  • 语音对话:当用户开始说话时立即停止音频播放
  • 清除 UI 状态:删除打字指示器和部分文本显示
  • 对话日志记录:标记哪些响应被中断(不完整)
  • 用户反馈:显示已识别中断的视觉指示

处理 turn_complete 标志

这发出对话边界的信号,允许你更新 UI 状态(启用输入控件,隐藏指示器)并在日志和分析中标记适当的回合边界。

当模型完成其完整响应时,你将收到 turn_complete=True 的事件:

用法:

async for event in runner.run_live(...):
    if event.turn_complete:
        # 你的更新 UI 以显示“准备输入”状态的逻辑
        enable_user_input()
        # 你的隐藏打字指示器的逻辑
        hide_typing_indicator()

        # 你的在日志中标记对话边界的逻辑
        log_turn_boundary()

事件标志组合:

理解 turn_completeinterrupted 如何组合有助于你处理所有对话状态:

场景 turn_complete interrupted 你的应用程序应该
正常完成 True False 启用输入,显示“准备就绪”状态
用户在响应中途打断 False True 停止显示,清除部分内容
在结束时打断 True True 与正常完成相同(回合完成)
响应中途(部分文本) False False 继续显示流式文本

实现:

async for event in runner.run_live(...):
    # 处理流式文本
    if event.content and event.content.parts and event.content.parts[0].text:
        if event.partial:
            # 你的显示打字指示器并更新部分文本的逻辑
            update_streaming_text(event.content.parts[0].text)
        else:
            # Your logic to display complete text chunk
            display_text(event.content.parts[0].text)

    # Handle interruption
    if event.interrupted:
        # Your logic to stop audio playback and clear indicators
        stop_audio_playback()
        clear_streaming_indicators()

    # Handle turn completion
    if event.turn_complete:
        # Your logic to enable user input
        show_input_ready_state()
        enable_microphone()

Common Use Cases:

  • UI state management: Show/hide "ready for input" indicators, typing animations, microphone states
  • Audio playback control: Know when to stop rendering audio chunks from the model
  • Conversation logging: Mark clear boundaries between turns for history/analytics
  • Streaming optimization: Stop buffering when turn is complete

Turn completion and caching: Audio/transcript caches are flushed automatically at specific points during streaming:

  • On turn completion (turn_complete=True): Both user and model audio caches are flushed
  • On interruption (interrupted=True): Model audio cache is flushed
  • On generation completion: Model audio cache is flushed

Serializing Events to JSON

ADK Event objects are Pydantic models, which means they come with powerful serialization capabilities. The model_dump_json() method is particularly useful for streaming events over network protocols like WebSockets or Server-Sent Events (SSE).

Using event.model_dump_json()

This provides a simple one-liner to convert ADK events into JSON format that can be sent over network protocols like WebSockets or SSE.

The model_dump_json() method serializes an Event object to a JSON string:

Demo implementation: main.py:178-191
async def downstream_task() -> None:
    """Receives Events from run_live() and sends to WebSocket."""
    async for event in runner.run_live(
        user_id=user_id,
        session_id=session_id,
        live_request_queue=live_request_queue,
        run_config=run_config
    ):
        event_json = event.model_dump_json(exclude_none=True, by_alias=True)
        await websocket.send_text(event_json)

What gets serialized:

  • Event metadata (author, server_content fields)
  • Content (text, audio data, function calls)
  • Event flags (partial, turn_complete, interrupted)
  • Transcription data (input_transcription, output_transcription)
  • Tool execution information

When to use model_dump_json():

  • ✅ Streaming events over network (WebSocket, SSE)
  • ✅ Logging/persistence to JSON files
  • ✅ Debugging and inspection
  • ✅ Integration with JSON-based APIs

When NOT to use it:

  • ❌ In-memory processing (use event objects directly)
  • ❌ High-frequency events where serialization overhead matters
  • ❌ When you only need a few fields (extract them directly instead)

Performance Warning

Binary audio data in event.content.parts[].inline_data will be base64-encoded when serialized to JSON, significantly increasing payload size (~133% overhead). For production applications with audio, send binary data separately using WebSocket binary frames or multipart HTTP. See Optimization for Audio Transmission for details.

Serialization options

This allows you to reduce payload sizes by excluding unnecessary fields, improving network performance and client processing speed.

Pydantic's model_dump_json() supports several useful parameters:

Usage:

# Exclude None values for smaller payloads (with camelCase field names)
event_json = event.model_dump_json(exclude_none=True, by_alias=True)

# Custom exclusions (e.g., skip large binary audio)
event_json = event.model_dump_json(
    exclude={'content': {'parts': {'__all__': {'inline_data'}}}},
    by_alias=True
)

# Include only specific fields
event_json = event.model_dump_json(
    include={'content', 'author', 'turn_complete', 'interrupted'},
    by_alias=True
)

# Pretty-printed JSON (for debugging)
event_json = event.model_dump_json(indent=2, by_alias=True)

The bidi-demo uses exclude_none=True to minimize payload size by omitting fields with None values.

Deserializing on the Client

This shows how to parse and handle serialized events on the client side, enabling responsive UI updates based on event properties like turn completion and interruptions.

On the client side (JavaScript/TypeScript), parse the JSON back to objects:

Demo implementation: app.js:297-576
// Handle incoming messages
websocket.onmessage = function (event) {
    // Parse the incoming ADK Event
    const adkEvent = JSON.parse(event.data);

    // Handle turn complete event
    if (adkEvent.turnComplete === true) {
        // Remove typing indicator from current message
        if (currentBubbleElement) {
            const textElement = currentBubbleElement.querySelector(".bubble-text");
            const typingIndicator = textElement.querySelector(".typing-indicator");
            if (typingIndicator) {
                typingIndicator.remove();
            }
        }
        currentMessageId = null;
        currentBubbleElement = null;
        return;
    }

    // Handle interrupted event
    if (adkEvent.interrupted === true) {
        // Stop audio playback if it's playing
        if (audioPlayerNode) {
            audioPlayerNode.port.postMessage({ command: "endOfAudio" });
        }

        // Keep the partial message but mark it as interrupted
        if (currentBubbleElement) {
            const textElement = currentBubbleElement.querySelector(".bubble-text");

            // Remove typing indicator
            const typingIndicator = textElement.querySelector(".typing-indicator");
            if (typingIndicator) {
                typingIndicator.remove();
            }

            // Add interrupted marker
            currentBubbleElement.classList.add("interrupted");
        }

        currentMessageId = null;
        currentBubbleElement = null;
        return;
    }

    // Handle content events (text or audio)
    if (adkEvent.content && adkEvent.content.parts) {
        const parts = adkEvent.content.parts;

        for (const part of parts) {
            // Handle text
            if (part.text) {
                // Add a new message bubble for a new turn
                if (currentMessageId == null) {
                    currentMessageId = Math.random().toString(36).substring(7);
                    currentBubbleElement = createMessageBubble(part.text, false, true);
                    currentBubbleElement.id = currentMessageId;
                    messagesDiv.appendChild(currentBubbleElement);
                } else {
                    // Update the existing message bubble with accumulated text
                    const existingText = currentBubbleElement.querySelector(".bubble-text").textContent;
                    const cleanText = existingText.replace(/\.\.\.$/, '');
                    updateMessageBubble(currentBubbleElement, cleanText + part.text, true);
                }

                scrollToBottom();
            }
        }
    }
};

📖 Demo Implementation: See the complete WebSocket message handler in app.js:297-576

Optimization for Audio Transmission

Base64-encoded binary audio in JSON significantly increases payload size. For production applications, use a single WebSocket connection with both binary frames (for audio) and text frames (for metadata):

Usage:

async for event in runner.run_live(...):
    # Check for binary audio
    has_audio = (
        event.content and
        event.content.parts and
        any(p.inline_data for p in event.content.parts)
    )

    if has_audio:
        # Send audio via binary WebSocket frame
        for part in event.content.parts:
            if part.inline_data:
                await websocket.send_bytes(part.inline_data.data)

        # Send metadata only (much smaller)
        metadata_json = event.model_dump_json(
            exclude={'content': {'parts': {'__all__': {'inline_data'}}}},
            by_alias=True
        )
        await websocket.send_text(metadata_json)
    else:
        # Text-only events can be sent as JSON
        await websocket.send_text(event.model_dump_json(exclude_none=True, by_alias=True))

This approach reduces bandwidth by ~75% for audio-heavy streams while maintaining full event metadata.

Automatic Tool Execution in run_live()

Source Reference

See automatic tool execution implementation in functions.py

One of the most powerful features of ADK's run_live() is automatic tool execution. Unlike the raw Gemini Live API, which requires you to manually handle tool calls and responses, ADK abstracts this complexity entirely.

The Challenge with Raw Live API

When using the Gemini Live API directly (without ADK), tool use requires manual orchestration:

  1. Receive function calls from the model
  2. Execute the tools yourself
  3. Format function responses correctly
  4. Send responses back to the model

This creates significant implementation overhead, especially in streaming contexts where you need to handle multiple concurrent tool calls, manage errors, and coordinate with ongoing audio/text streams.

How ADK Simplifies Tool Use

With ADK, tool execution becomes declarative. Simply define tools on your Agent:

Demo implementation: agent.py:11-16
import os
from google.adk.agents import Agent
from google.adk.tools import google_search

agent = Agent(
    name="google_search_agent",
    model=os.getenv("DEMO_AGENT_MODEL", "gemini-2.5-flash-native-audio-preview-09-2025"),
    tools=[google_search],
    instruction="You are a helpful assistant that can search the web."
)

When you call runner.run_live(), ADK automatically:

  • Detects when the model returns function calls in streaming responses
  • Executes tools in parallel for maximum performance
  • Handles before/after tool callbacks for custom logic
  • Formats function responses according to Live API requirements
  • Sends responses back to the model seamlessly
  • Yields both function call and response events to your application

Tool Execution Events

When tools execute, you'll receive events through the run_live() async generator:

Usage:

async for event in runner.run_live(...):
    # Function call event - model requesting tool execution
    if event.get_function_calls():
        print(f"Model calling: {event.get_function_calls()[0].name}")

    # Function response event - tool execution result
    if event.get_function_responses():
        print(f"Tool result: {event.get_function_responses()[0].response}")

You don't need to handle the execution yourself—ADK does it automatically. You just observe the events as they flow through the conversation.

Learn More

The bidi-demo sends all events (including function calls and responses) directly to the WebSocket client without server-side filtering. This allows the client to observe tool execution in real-time through the event stream. See the downstream task in main.py:178-191

Long-Running and Streaming Tools

ADK supports advanced tool patterns that integrate seamlessly with run_live():

Long-Running Tools: Tools that require human approval or take extended time to complete. Mark them with is_long_running=True. In resumable async flows, ADK can pause after long-running calls. In live flows, streaming continues; long_running_tool_ids indicate pending operations and clients can display appropriate UI.

Streaming Tools: Tools that accept an input_stream parameter with type LiveRequestQueue can send real-time updates back to the model during execution, enabling progressive responses.

How Streaming Tools Work

When you call runner.run_live(), ADK inspects your agent's tools at initialization (lines 828-865 in runners.py) to identify streaming tools by checking parameter type annotations for LiveRequestQueue.

Queue creation and lifecycle:

  1. Creation: ADK creates an ActiveStreamingTool with a dedicated LiveRequestQueue for each streaming tool at the start of run_live() (before processing any events)
  2. Storage: These queues are stored in invocation_context.active_streaming_tools[tool_name] for the duration of the invocation
  3. Injection: When the model calls the tool, ADK automatically injects the tool's queue as the input_stream parameter (lines 238-253 in function_tool.py)
  4. Usage: The tool can use this queue to send real-time updates back to the model during execution
  5. Lifecycle: The queues persist for the entire run_live() invocation (one InvocationContext = one run_live() call) and are destroyed when run_live() exits

Queue distinction:

  • Main queue (live_request_queue parameter): Created by your application, used for client-to-model communication
  • Tool queues (active_streaming_tools[tool_name].stream): Created automatically by ADK, used for tool-to-model communication during execution

Both types of queues are LiveRequestQueue instances, but they serve different purposes in the streaming architecture.

This enables tools to provide incremental updates, progress notifications, or partial results during long-running operations.

Code reference: See runners.py:828-865 (tool detection) and function_tool.py:238-253 (parameter injection) for implementation details.

See the Tools Guide for implementation examples.

关键要点

原始 Live API 工具使用与 ADK 之间的区别是显着的:

方面 原始 Live API ADK run_live()
工具声明 手动模式定义 从 Python 函数自动
工具执行 应用程序代码中的手动处理 自动并行执行
响应格式化 手动 JSON 构建 自动
错误处理 手动 try/catch 和格式化 自动捕获和报告
流式集成 手动协调 自动事件产生
开发人员体验 复杂,容易出错 声明性,简单

这种自动处理是 ADK 的核心价值主张之一——它将 Live API 工具使用的复杂性转变为简单、声明性的开发人员体验。

InvocationContext:执行状态容器

源引用

invocation_context.py 中的 InvocationContext 实现

虽然 run_live() 返回一个用于消费事件的 AsyncGenerator,但在内部它创建并管理一个 InvocationContext——ADK 的统一状态载体,封装了完整对话调用所需的一切。一个 InvocationContext 对应一个 run_live() 循环——它在你调用 run_live() 时创建,并在整个流式会话期间持续存在。

把它想象成一个旅行笔记本,从开始到结束伴随着对话,收集信息,跟踪进度,并为沿途的每个组件提供上下文。它是 ADK 对上下文概念的运行时实现,提供实时对话期间所需的执行时状态和服务。有关 ADK 中上下文的更广泛概述,请参阅 ADK 中的上下文

什么是调用?

一个 调用 代表一个完整的交互周期:

  • 从用户输入(文本、音频或控制信号)开始
  • 可能涉及一个或多个智能体调用
  • 在生成最终响应或明确终止时结束
  • runner.run_live()runner.run_async() 编排

这与 智能体调用(执行单个智能体的逻辑)和 步骤(单个 LLM 调用加上任何由此产生的工具执行)是不同的。

层级结构如下:

   ┌─────────────────────── 调用 ──────────────────────────┐
   ┌──────────── llm_agent_call_1 ────────────┐ ┌─ agent_call_2 ─┐
   ┌──── step_1 ────────┐ ┌───── step_2 ──────┐
   [call_llm] [call_tool] [call_llm] [transfer]

谁使用 InvocationContext?

InvocationContext 在不同层面服务于不同的受众:

  • ADK 的内部组件(主要用户):Runner、Agent、LLMFlow 和 GeminiLlmConnection 都接收、读取和写入 InvocationContext,因为它流经堆栈。这种共享上下文实现了无缝协调,而无需紧密耦合。

  • 应用程序开发人员(间接受益者):你通常不会在应用程序代码中直接创建或操作 InvocationContext。相反,你受益于 InvocationContext 在幕后实现的清晰、简化的 API——例如优雅的 async for event in runner.run_live() 模式。

  • 工具和回调开发人员(直接访问):当你实现自定义工具或回调时,你会收到 InvocationContext 作为参数。这使你可以直接访问对话状态、会话服务和控制标志(如 end_invocation),以实现复杂的行为。

InvocationContext 包含什么

当你实现自定义工具或回调时,你会收到 InvocationContext 作为参数。以下是你可以使用的内容:

工具/回调开发人员的基本字段:

  • context.invocation_id:当前调用标识符(每个 run_live() 调用唯一)
  • context.session
    • context.session.events:会话历史中的所有事件(跨所有调用)
    • context.session.state:用于会话数据的持久键值存储
    • context.session.user_id:用户身份
  • context.run_config:当前流式配置(响应模态、转录设置、成本限制)
  • context.end_invocation:将其设置为 True 可立即终止对话(对于错误处理或策略执行很有用)

工具开发中的示例用例:

# 示例:显示常见 InvocationContext 模式的综合工具实现
def my_tool(context: InvocationContext, query: str):
    # 访问用户身份
    user_id = context.session.user_id

    # 检查这是否是用户的第一条消息
    event_count = len(context.session.events)
    if event_count == 0:
        return "Welcome! This is your first message."

    # 访问对话历史记录
    recent_events = context.session.events[-5:]  # 最后 5 个事件

    # 访问持久会话状态
    # 会话状态跨调用持久存在(不仅仅是此流式会话)
    user_preferences = context.session.state.get('user_preferences', {})

    # 更新会话状态(将被持久化)
    context.session.state['last_query_time'] = datetime.now().isoformat()

    # 访问持久化服务
    if context.artifact_service:
        # 存储大文件/音频
        await context.artifact_service.save_artifact(
            app_name=context.session.app_name,
            user_id=context.session.user_id,
            session_id=context.session.id,
            filename="result.bin",
            artifact=types.Part(inline_data=types.Blob(mime_type="application/octet-stream", data=data)),
        )

    # 使用上下文处理查询
    result = process_query(query, context=recent_events, preferences=user_preferences)

    # 在特定场景下终止对话
    if result.get('error'):
        # 处理错误 - 停止对话
        context.end_invocation = True

    return result

理解 InvocationContext 对于掌握 ADK 如何维护状态、协调执行以及启用多智能体工作流和可恢复性等高级功能至关重要。即使你从未直接接触它,了解流经应用程序的内容也有助于你设计更好的智能体并更有效地调试问题。

多智能体工作流的最佳实践

ADK 的双向流式处理支持三种智能体架构:单智能体(一个智能体处理整个对话)、带子智能体的多智能体(协调器智能体使用 transfer_to_agent 动态路由到专家智能体)和 顺序工作流智能体(智能体使用 task_completed 在固定管道中执行)。本节重点介绍顺序工作流的最佳实践,其中理解智能体转换和状态共享对于流畅的 BIDI 通信至关重要。

了解更多

有关多智能体模式的全面覆盖,请参阅 ADK 文档中的 作为编排器的工作流智能体

在使用 ADK 构建多智能体系统时,理解智能体在实时流式传输期间如何转换和共享状态对于流畅的 BIDI 通信至关重要。

带有 BIDI 流式处理的 SequentialAgent

SequentialAgent 启用工作流管道,其中智能体一个接一个地执行。每个智能体在下一个开始之前完成其任务。实时流式处理的挑战在于确定智能体何时完成了连续音频或视频输入的处理。

源引用

sequential_agent.py:119-159 中的 SequentialAgent 实现

它是如何工作的:

ADK 自动向序列中的每个智能体添加一个 task_completed() 函数。当模型调用此函数时,它发出完成信号并触发向下一个智能体的转换:

用法:

# SequentialAgent 自动将此工具添加到每个子智能体
def task_completed():
    """
    发出智能体已成功完成用户的问题或任务的信号。
    """
    return 'Task completion signaled.'

关键见解是 智能体转换在同一个 run_live() 事件流中透明地发生。你的应用程序不需要管理转换——只需统一消费事件:

用法:

async def handle_sequential_workflow():
    """带有 BIDI 流式处理的 SequentialAgent 的推荐模式。"""

    # 1. 跨序列中所有智能体共享的单个队列
    queue = LiveRequestQueue()

    # 2. 后台任务持续捕获用户输入
    async def capture_user_input():
        while True:
            # 你的从麦克风读取音频的逻辑
            audio_chunk = await microphone.read()
            queue.send_realtime(
                blob=types.Blob(data=audio_chunk, mime_type="audio/pcm")
            )

    input_task = asyncio.create_task(capture_user_input())

    try:
        # 3. 单个事件循环无缝处理所有智能体
        async for event in runner.run_live(
            user_id="user_123",
            session_id="session_456",
            live_request_queue=queue,
        ):
            # 事件跨智能体转换无缝流动
            current_agent = event.author

            # 处理音频和文本输出
            if event.content and event.content.parts:
                for part in event.content.parts:
                    # 检查音频数据
                    if part.inline_data and part.inline_data.mime_type.startswith("audio/"):
                        # 你的播放音频的逻辑
            await play_audio(part.inline_data.data)

                    # 检查文本数据
                    if part.text:
                        await display_text(f"[{current_agent}] {part.text}")

            # 不需要特殊的转换处理!

    finally:
        input_task.cancel()
        queue.close()

智能体转换期间的事件流

以下是智能体转换时你的应用程序看到的内容:

# 智能体 1(研究员)完成其工作
Event: author="researcher", text="I've gathered all the data."
Event: author="researcher", function_call: task_completed()
Event: author="researcher", function_response: task_completed

# --- 自动转换(对你的代码不可见) ---

# 智能体 2(作家)开始
Event: author="writer", text="Let me write the report based on the research..."
Event: author="writer", text=" The findings show..."
Event: author="writer", function_call: task_completed()
Event: author="writer", function_response: task_completed

# --- 自动转换 ---

# 智能体 3(审阅者)开始 - 序列中的最后一个智能体
Event: author="reviewer", text="Let me review the report..."
Event: author="reviewer", text="The report looks good. All done!"
Event: author="reviewer", function_call: task_completed()
Event: author="reviewer", function_response: task_completed

# --- 最后一个智能体完成:run_live() 退出 ---
# 你的 async for 循环在这里结束

设计原则

1. 单个事件循环

对序列中的所有智能体使用一个事件循环:

用法:

# ✅ 正确:一个循环处理所有智能体
async for event in runner.run_live(...):
    # 你的事件处理逻辑在这里
    await handle_event(event)  # 适用于智能体 1、智能体 2、智能体 3...

# ❌ 不正确:不要中断循环或创建多个循环
for agent in agents:
    async for event in runner.run_live(...):  # 错误!
        ...

2. 持久队列

同一个 LiveRequestQueue 服务于所有智能体:

# 用户输入流向当前活动的任何智能体
User speaks → Queue → Agent1 (researcher)
User speaks → Queue → Agent2 (writer)
User speaks → Queue → Agent3 (reviewer)

不要为每个智能体创建新队列:

# ❌ 不正确:每个智能体的新队列
for agent in agents:
    new_queue = LiveRequestQueue()  # 错误!

# ✅ 正确:整个工作流的单个队列
queue = LiveRequestQueue()
async for event in runner.run_live(live_request_queue=queue):
    ...

3. 智能体感知 UI(可选)

跟踪哪个智能体处于活动状态以获得更好的用户体验:

用法:

current_agent_name = None

async for event in runner.run_live(...):
    # 检测智能体转换
    if event.author and event.author != current_agent_name:
        current_agent_name = event.author
        # 你的更新 UI 指示器的逻辑
        await update_ui_indicator(f"Now: {current_agent_name}")

    # 你的事件处理逻辑在这里
    await handle_event(event)

4. 转换通知

可选地在智能体移交时通知用户:

用法:

async for event in runner.run_live(...):
    # 检测任务完成(转换信号)
    if event.content and event.content.parts:
        for part in event.content.parts:
            if (part.function_response and
                part.function_response.name == "task_completed"):
                # 你的显示转换通知的逻辑
                await display_notification(
                    f"✓ {event.author} completed. Handing off to next agent..."
                )
                continue

    # 你的事件处理逻辑在这里
    await handle_event(event)

关键区别:transfer_to_agent 与 task_completed

理解这两个函数有助于你选择正确的多智能体模式:

函数 智能体模式 run_live() 何时退出 用例
transfer_to_agent 协调器(动态路由) LiveRequestQueue.close() 根据意图将用户路由到专家
task_completed 顺序(管道) LiveRequestQueue.close() 或最后一个智能体的 task_completed 固定工作流:研究 → 写作 → 审阅

transfer_to_agent 示例:

# 协调器根据用户意图路由
User: "I need help with billing"
Event: author="coordinator", function_call: transfer_to_agent(agent_name="billing")
# 流继续与账单智能体 - 同一个 run_live() 循环
Event: author="billing", text="I can help with your billing question..."

task_completed 示例:

# 顺序工作流在管道中进行
Event: author="researcher", function_call: task_completed()
# 当前智能体退出,序列中的下一个智能体开始
Event: author="writer", text="Based on the research..."

最佳实践总结

实践 原因
使用单个事件循环 ADK 在内部处理转换
跨智能体保持队列活动 同一个队列服务于所有顺序智能体
跟踪 event.author 知道哪个智能体当前正在响应
不要重置会话/上下文 对话状态跨智能体持久存在
统一处理事件 所有智能体产生相同的事件类型
task_completed 发出转换信号 不要手动管理顺序流

SequentialAgent 设计确保了平滑的转换——你的应用程序只需看到来自不同智能体的连续事件流,并由 ADK 管理自动移交。

总结

在本部分中,你掌握了 ADK 双向流式架构中的事件处理。我们探讨了智能体生成的不同事件类型——文本响应、音频块、转录、工具调用和控制信号——并学习了如何有效地处理每种事件类型。你现在了解了如何处理中断和回合完成信号以实现自然的对话流,使用 Pydantic 的模型序列化为网络传输序列化事件,利用 ADK 的自动工具执行来简化智能体工作流,并访问 InvocationContext 以进行高级状态管理场景。有了这些事件处理模式,你就能够构建响应迅速的流式应用程序,为用户提供实时反馈。接下来,你将学习如何通过 RunConfig 配置复杂的流式行为,包括多模态交互、会话恢复和成本控制。


上一篇:第 2 部分 - 使用 LiveRequestQueue 发送消息 | 下一篇:第 4 部分 - 了解 RunConfig