第 2 部分:使用 LiveRequestQueue 发送消息¶
在第 1 部分中,你了解了 ADK 双向流式处理应用程序的四阶段生命周期。本部分重点介绍上行流——你的应用程序如何使用 LiveRequestQueue 向智能体发送消息。
与不同消息类型需要不同端点或通道的传统 API 不同,ADK 通过 LiveRequestQueue 及其 LiveRequest 消息模型提供了一个单一的统一接口。本部分涵盖:
- 消息类型:通过
send_content()发送文本,通过send_realtime()流式传输音频/图像/视频,使用活动信号控制对话轮次,以及使用控制信号优雅地终止会话 - 并发模式:理解异步队列管理和事件循环线程安全
- 最佳实践:在异步上下文中创建队列,确保适当的资源清理,并理解消息顺序保证
- 故障排除:诊断常见问题,如消息未被处理和队列生命周期问题
理解 LiveRequestQueue 对于构建能够在异步事件循环中无缝处理多模态输入的响应迅速的流式处理应用程序至关重要。
LiveRequestQueue 和 LiveRequest¶
LiveRequestQueue 是你在流式对话中向智能体发送消息的主要接口。ADK 不再管理用于文本、音频和控制信号的单独通道,而是提供了一个统一的 LiveRequest 容器,通过单个优雅的 API 处理所有消息类型:
class LiveRequest(BaseModel):
content: Optional[Content] = None # 基于文本的内容和结构化数据
blob: Optional[Blob] = None # 音频/视频数据和二进制流
activity_start: Optional[ActivityStart] = None # 信号:用户活动开始
activity_end: Optional[ActivityEnd] = None # 信号:用户活动结束
close: bool = False # 优雅的连接终止信号
这种简化的设计处理了你将遇到的每个流式处理场景。content 和 blob 字段处理不同的数据类型,activity_start 和 activity_end 字段启用活动信号,close 标志提供优雅的终止语义。
content 和 blob 字段是互斥的——每个 LiveRequest 只能设置一个。虽然 ADK 不在客户端强制执行此操作,如果设置了两个都会尝试发送,但 Live API 后端会以验证错误拒绝此操作。ADK 的便捷方法 send_content() 和 send_realtime() 自动确保满足此约束,仅设置一个字段,因此使用这些方法(而不是手动创建 LiveRequest 对象)是推荐的方法。
下图说明了不同消息类型如何从你的应用程序通过 LiveRequestQueue 方法流向 LiveRequest 容器,最后流向 Live API:
graph LR
subgraph "Application"
A1["用户文本输入"]
A2["音频流"]
A3["活动信号"]
A4["关闭信号"]
end
subgraph "LiveRequestQueue Methods"
B1[send_content<br/>Content]
B2[send_realtime<br/>Blob]
B3[send_activity_start<br/>ActivityStart]
B3b[send_activity_end<br/>ActivityEnd]
B4[close<br/>close=True]
end
subgraph "LiveRequest Container"
C1[content: Content]
C2[blob: Blob]
C3[activity_start/end]
C4[close: bool]
end
subgraph "Gemini Live API"
D[WebSocket 连接]
end
A1 --> B1 --> C1 --> D
A2 --> B2 --> C2 --> D
A3 --> B3 --> C3 --> D
A3 --> B3b --> C3
A4 --> B4 --> C4 --> D
发送不同类型的消息¶
LiveRequestQueue 提供了向智能体发送不同消息类型的便捷方法。本节演示了文本消息、音频/视频流式传输、用于手动轮次控制的活动信号以及会话终止的实用模式。
send_content(): 发送带有轮次控制的文本¶
send_content() 方法以轮次模式发送文本消息,其中每条消息代表一个离散的对话轮次。这向模型发出完整轮次的信号,触发立即响应生成。
content = types.Content(parts=[types.Part(text=json_message["text"])])
live_request_queue.send_content(content)
在 ADK 双向流式处理中使用 Content 和 Part:
-
Content(google.genai.types.Content):代表对话中单个消息或轮次的容器。它包含组成完整消息的Part对象数组。 -
Part(google.genai.types.Part):消息中的单个内容片段。对于使用 Live API 的 ADK 双向流式处理,你将使用: text:发送给模型的文本内容(包括代码)
在实践中,大多数 ADK 双向流式处理消息使用单个文本 Part。多部分结构设计用于如下场景: - 混合文本与函数响应(由 ADK 自动处理) - 将文本解释与结构化数据结合 - 新内容类型的未来可扩展性
对于 Live API,多模态输入(音频/视频)使用不同的机制(见下面的 send_realtime()),而不是多部分 Content。
ADK 双向流式处理中的 Content 和 Part 用法
虽然 Gemini API Part 类型支持许多字段(inline_data、file_data、function_call、function_response 等),但大多数要么由 ADK 自动处理,要么在 Live API 中使用不同的机制:
- 函数调用:ADK 自动处理函数调用循环——从模型接收函数调用,执行你注册的函数,并发回响应。你不需要手动构建这些。
- 图像/视频:不要将
send_content()与inline_data一起使用。相反,使用send_realtime(Blob(mime_type="image/jpeg", data=...))进行连续流式传输。见 第 5 部分:如何使用图像和视频。
send_realtime(): 实时发送音频、图像和视频¶
send_realtime() 方法发送二进制数据流——主要是音频、图像和视频——通过 Blob 类型流动,该类型处理实时模式下的传输。与在轮次模式下处理的文本内容不同,blob 专为数据分块到达的连续流式处理场景而设计。你提供原始字节,Pydantic 在 JSON 序列化期间自动处理 base64 编码以进行安全网络传输(在 LiveRequest.model_config 中配置)。MIME 类型帮助模型理解内容格式。
audio_blob = types.Blob(
mime_type="audio/pcm;rate=16000",
data=audio_data
)
live_request_queue.send_realtime(audio_blob)
了解更多
有关音频、图像和视频规格、格式和最佳实践的完整详细信息,请参阅 第 5 部分:如何使用音频、图像和视频。
活动信号¶
活动信号 (ActivityStart/ActivityEnd) 仅当在你的 RunConfig 中显式禁用自动(服务端)语音活动检测时才能发送。当你的应用程序需要手动语音活动控制时使用它们,例如:
- 一键通界面:用户显式控制何时说话(例如,按住按钮)
- 嘈杂环境:背景噪音使自动 VAD 不可靠,因此你使用客户端 VAD 或手动控制
- 客户端 VAD:你在客户端实现自己的 VAD 算法,通过仅在检测到语音时发送音频来减少网络开销
- 自定义交互模式:非语音场景,如手势触发的交互或定时音频片段
活动信号告诉模型什么:
ActivityStart:"用户正在说话 - 开始累积音频以进行处理"ActivityEnd:"用户已说完话 - 处理累积的音频并生成响应"
没有这些信号(当 VAD 被禁用时),模型不知道何时开始/停止监听语音,因此你必须显式标记轮次边界。
发送活动信号:
from google.genai import types
# 手动活动信号模式(例如,一键通)
live_request_queue.send_activity_start() # 信号:用户开始说话
# 当用户按住通话按钮时流式传输音频块
while user_is_holding_button:
audio_blob = types.Blob(mime_type="audio/pcm;rate=16000", data=audio_chunk)
live_request_queue.send_realtime(audio_blob)
live_request_queue.send_activity_end() # 信号:用户停止说话
默认行为(自动 VAD): 如果你不发送活动信号,Live API 的内置 VAD 会自动检测你通过 send_realtime() 发送的音频流中的语音边界。这是大多数应用程序的推荐方法。
了解更多
有关自动 VAD 与手动活动信号的详细比较,包括何时禁用 VAD 和最佳实践,请参阅 第 5 部分:语音活动检测。
控制信号¶
close 信号为流式处理会话提供优雅的终止语义。它向系统发出信号,干净地关闭模型连接并结束双向流式处理。在 ADK 双向流式处理中,你的应用程序负责显式发送 close 信号:
BIDI 模式下的手动关闭: 当使用 StreamingMode.BIDI(双向流式处理)时,你的应用程序应在会话终止或发生错误时手动调用 close()。这种做法可以最大限度地减少会话资源使用。
SSE 模式下的自动关闭: 当使用旧版 StreamingMode.SSE(非双向流式处理)时,ADK 在收到来自模型的 turn_complete=True 事件时会自动在队列上调用 close()(见 base_llm_flow.py:754)。
见 第 4 部分:理解 RunConfig 以获取详细比较以及何时使用每种模式。
try:
logger.debug("Starting asyncio.gather for upstream and downstream tasks")
await asyncio.gather(
upstream_task(),
downstream_task()
)
logger.debug("asyncio.gather completed normally")
except WebSocketDisconnect:
logger.debug("Client disconnected normally")
except Exception as e:
logger.error(f"Unexpected error in streaming tasks: {e}", exc_info=True)
finally:
# 始终关闭队列,即使发生异常
logger.debug("Closing live_request_queue")
live_request_queue.close()
如果你不调用 close() 会发生什么?
虽然 ADK 会自动清理本地资源,但在 BIDI 模式下未能调用 close() 会阻止向 Live API 发送优雅的终止信号,Live API 将在一定的超时时间后收到突然断开连接。这可能导致“僵尸”Live API 会话保留在云服务上,即使你的应用程序已完成它们。这些滞留的会话可能会显著减少你的应用程序可以处理的并发会话数量,因为它们会继续计入你的配额限制,直到它们最终超时。
了解更多
有关流式处理期间的全面错误处理模式,包括何时使用 break 与 continue 以及处理不同错误类型,请参阅 第 3 部分:错误事件。
并发和线程安全¶
理解 LiveRequestQueue 如何处理并发对于构建可靠的流式处理应用程序至关重要。队列建立在 asyncio.Queue 之上,这意味着它在同一事件循环线程(常见情况)内并发访问是安全的,但在从不同线程(高级情况)调用时需要特殊处理。本节解释了 LiveRequestQueue API 背后的设计选择,何时可以在没有额外预防措施的情况下安全地使用它,以及何时需要像 loop.call_soon_threadsafe() 这样的线程安全机制。
异步队列管理¶
LiveRequestQueue 使用同步方法(send_content()、send_realtime())而不是异步方法,即使底层队列是异步消费的。这种设计选择使用了 asyncio.Queue.put_nowait() - 一种不需要 await 的非阻塞操作。
为什么使用同步发送方法? 方便和简单。你可以在异步代码的任何地方调用它们而无需 await:
async def upstream_task() -> None:
"""从 WebSocket 接收消息并发送到 LiveRequestQueue。"""
while True:
message = await websocket.receive()
if "bytes" in message:
audio_data = message["bytes"]
audio_blob = types.Blob(
mime_type="audio/pcm;rate=16000",
data=audio_data
)
live_request_queue.send_realtime(audio_blob)
elif "text" in message:
text_data = message["text"]
json_message = json.loads(text_data)
if json_message.get("type") == "text":
content = types.Content(parts=[types.Part(text=json_message["text"])])
live_request_queue.send_content(content)
这种模式自然地混合了异步 I/O 操作和同步 CPU 操作。发送方法立即返回而不阻塞,允许你的应用程序保持响应。
最佳实践:在异步上下文中创建队列¶
始终在异步上下文(异步函数或协程)中创建 LiveRequestQueue,以确保它使用正确的事件循环:
# ✅ 推荐 - 在异步上下文中创建
async def main():
queue = LiveRequestQueue() # 使用来自异步上下文的现有事件循环
# 这是首选模式 - 确保队列使用将运行流式处理操作的正确事件循环
# that will run your streaming operations
# ❌ 不推荐 - 自动创建事件循环
queue = LiveRequestQueue() # 有效但 ADK 自动创建新循环
# 这由于 ADK 的安全机制而有效,但在复杂应用程序或多线程场景中可能会导致循环协调问题
# loop coordination in complex applications or multi-threaded scenarios
这为什么重要: LiveRequestQueue 需要在实例化时存在事件循环。ADK 包含一个安全机制,如果不存在则自动创建循环,但在多线程场景或自定义事件循环配置中依赖此机制可能会导致意外行为。
消息顺序保证¶
LiveRequestQueue 提供可预测的消息传递行为:
| 保证 | 描述 | 影响 |
|---|---|---|
| FIFO 排序 | 按发送顺序处理消息(由底层 asyncio.Queue 保证) |
维护对话上下文和交互一致性 |
| 无合并 | 每条消息独立传递 | 无自动批处理——每个发送操作创建一个请求 |
| 默认无界 | 队列接受无限消息而不阻塞 | 好处:简化客户端代码(发送时不阻塞) 风险:如果发送速度快于处理速度,内存会增长 缓解:在生产中监控队列深度 |
生产提示:对于高吞吐量音频/视频流式处理,监控
live_request_queue._queue.qsize()以检测背压。如果队列深度持续增长,请减慢发送速率或实施批处理。注意:_queue是一个内部属性,可能会在未来版本中更改;请谨慎使用。
总结¶
在本部分中,你了解了 LiveRequestQueue 如何提供统一接口,用于在异步事件循环中向 ADK 流式处理智能体发送消息。我们涵盖了 LiveRequest 消息模型,并探索了如何发送不同消息类型:通过 send_content() 发送文本内容,通过 send_realtime() 发送音频/视频 blob,用于手动轮次控制的活动信号,以及通过 close() 发送用于优雅终止的控制信号。你还了解了异步队列管理的最佳实践,在异步上下文中创建队列,资源清理和消息顺序。你现在了解了如何在双向流式处理应用程序中将 LiveRequestQueue 用作上行通信通道,使用户能够并发发送消息并接收智能体响应。接下来,你将学习如何处理下行流——处理智能体响应这些消息而生成的事件。
← 上一篇:第 1 部分 - ADK 双向流式处理介绍 | 下一篇:第 3 部分 - 使用 run_live() 处理事件 →