ADK 双向流式处理开发指南:第 1 部分 - 介绍¶
欢迎来到使用 Agent Development Kit (ADK) 进行双向流式处理的世界。这篇文章将彻底改变你对 AI 智能体通信的理解,从传统的请求 - 响应模式转变为动态的、实时的对话,就像与真人交谈一样自然。
想象一下构建一个 AI 助手,它不仅仅是等你说完才回应,而是积极倾听,当你突然有想法时还可以在句子中间被打断。想象一下创建能够同时处理音频、视频和文本的客户支持机器人,并在整个对话过程中保持上下文。这就是双向流式处理的力量,ADK 让每个开发者都能使用这种技术。
1.1 什么是双向流式处理?¶
双向流式处理(Bidirectional streaming)代表了传统 AI 交互的根本性转变。它不再使用僵化的"询问 - 等待"模式,而是实现了实时的双向通信,人类和 AI 可以同时说话、听取和回应。这创造了自然的、类似人类的对话,具有即时响应和可以中断正在进行的交互的革命性能力。
想想发送电子邮件和打电话之间的区别。传统的 AI 交互就像电子邮件——你发送一条完整的消息,等待一个完整的回复,然后再发送另一条完整的消息。双向流式处理就像打电话——流畅、自然,能够打断、澄清和实时回应。
关键特征¶
这些特征将双向流式处理与传统 AI 交互区分开来,使其在创建引人入胜的用户体验方面具有独特的强大功能:
-
双向通信:无需等待完整响应的持续数据交换。用户和 AI 都可以在你还在说话时就开始回应你问题的前几个词,创造出真正对话式而非事务性的体验。
-
响应式中断:这可能是自然用户体验最重要的功能——用户可以在智能体回应中途用新输入打断它,就像在人类对话中一样。如果 AI 正在解释量子物理学,而你突然问"等等,什么是电子?",AI 会立即停止并解决你的问题。
-
最适合多模态:同时支持文本、音频和视频输入,创造丰富、自然的交互。用户可以在展示文档时说话,在语音通话期间输入后续问题,或者在不失去上下文的情况下无缝切换通信模式。
sequenceDiagram
participant Client as 用户
participant Agent as 智能体
Client->>Agent: "你好!"
Client->>Agent: "解释一下日本的历史"
Agent->>Client: "你好!"
Agent->>Client: "当然!日本的历史是..." (部分内容)
Client->>Agent: "啊,等等。"
Agent->>Client: "好的,我能帮你什么?" (interrupted = True)
与其他流式处理类型的区别¶
理解双向流式处理与其他方法的区别对于理解其独特价值至关重要。流式处理领域包括几种不同的模式,每种都服务于不同的用例:
流式处理类型比较
双向流式处理与其他流式处理方法有根本区别:
-
服务端流式处理:从服务器到客户端的单向数据流。就像观看实时视频流——你接收连续数据,但无法实时与其交互。对仪表板或实时信息流有用,但不适合对话。
-
令牌级流式处理:无中断的顺序文本令牌传递。AI 逐词生成响应,但你必须等待完成后才能发送新输入。就像实时观看某人打字一样——你看到它在形成,但无法打断。
-
双向流式处理:支持中断的完整双向通信。真正的对话式 AI,双方可以同时说话、听取和回应。这使得自然对话成为可能,你可以打断、澄清或在谈话中途改变话题。
真实世界应用¶
双向流式处理通过使智能体能够以类似人类的响应性和智能进行操作,从而彻底改变了智能体 AI 应用。这些应用展示了流式处理如何将静态 AI 交互转变为动态的、智能体驱动的体验,感觉真正智能和主动。
在 购物礼宾演示 的视频中,多模态双向流式处理功能通过实现更快、更直观的购物体验,显著改善了电子商务的用户体验。对话理解和快速并行搜索的结合最终实现了虚拟试穿等高级功能,提高了买家信心并减少了在线购物的摩擦。
另外,你可以想象双向流式处理的许多可能的现实世界应用:
-
客户服务和联络中心:这是最直接的应用。该技术可以创建远超传统聊天机器人的复杂虚拟智能体。
- 用例:客户致电零售公司的支持热线询问有缺陷的产品。
- 多模态(视频):客户可以说:"我的咖啡机底部在漏水,让我给你看看。"然后他们可以使用手机摄像头实时传输问题的视频。AI 智能体可以使用其视觉能力来识别型号和具体的故障点。
- 实时交互和中断:如果智能体说:"好的,我正在为你的 Model X 咖啡机处理退货",客户可以打断说:"不,等等,是 Model Y Pro",智能体可以立即纠正其路线,而无需重新开始对话。
-
现场服务和技术协助:现场工作的技术人员可以使用免提、语音激活的助手来获得实时帮助。
- 用例:一名 HVAC 技术人员在现场尝试诊断复杂的商用空调机组。
- 多模态(视频和语音):技术人员戴着智能眼镜或使用手机,可以将他们的视角流式传输给 AI 智能体。他们可以问:"我听到这个压缩机发出奇怪的噪音。你能识别它并调出这个型号的诊断流程图吗?"
- 实时交互:智能体可以逐步指导技术人员,技术人员可以随时提出澄清问题或打断,而无需把手从工具上拿开。
-
医疗保健和远程医疗:智能体可以作为患者接收、分诊和基本咨询的第一接触点。
- 用例:患者使用医疗机构的应用程序就皮肤病进行初步咨询。
- 多模态(视频/图像):患者可以安全地分享皮疹的实时视频或高分辨率图像。AI 可以进行初步分析并提出澄清问题。
-
金融服务和财富管理:智能体可以为客户提供安全、交互式、数据丰富的财务管理方式。
- 用例:客户想要审查他们的投资组合并讨论市场趋势。
- 多模态(屏幕共享):智能体可以共享其屏幕来显示图表、图形和投资组合绩效数据。客户也可以共享他们的屏幕指向特定的新闻文章并问:"这个事件对我的科技股有什么潜在影响?"
- 实时交互:通过访问客户的账户数据分析客户当前的投资组合配置。模拟潜在交易对投资组合风险状况的影响。
1.2 ADK 双向流式处理架构概述¶
ADK 双向流式处理架构使双向 AI 对话感觉像人类对话一样自然。该架构通过一个为低延迟和高吞吐量通信而设计的复杂流水线,与 Google 的 Gemini Live API 无缝集成。
该系统处理实时流式处理所需的复杂编排——管理多个并发数据流、优雅地处理中断、同时处理多模态输入,以及在动态交互中维护对话状态。ADK 双向流式处理将这种复杂性抽象为简单、直观的 API,开发者可以使用而无需理解流式处理协议或 AI 模型通信模式的复杂细节。
高级架构¶
graph TB
subgraph "应用程序"
subgraph "客户端"
C1["Web / 移动端"]
end
subgraph "传输层"
T1["WebSocket / SSE (例如 FastAPI)"]
end
end
subgraph "ADK"
subgraph "ADK 双向流式处理"
L1[LiveRequestQueue]
L2[Runner]
L3[Agent]
L4[LLM Flow]
end
subgraph "LLM 集成"
G1[GeminiLlmConnection]
G2[Gemini Live API]
end
end
C1 <--> T1
T1 -->|"live_request_queue.send()"| L1
L1 -->|"runner.run_live(queue)"| L2
L2 -->|"agent.run_live()"| L3
L3 -->|"_llm_flow.run_live()"| L4
L4 -->|"llm.connect()"| G1
G1 <--> G2
G1 -->|"yield LlmResponse"| L4
L4 -->|"yield Event"| L3
L3 -->|"yield Event"| L2
L2 -->|"yield Event"| T1
classDef external fill:#e1f5fe,stroke:#01579b,stroke-width:2px
classDef adk fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
class C1,T1,L3 external
class L1,L2,L4,G1,G2 adk
开发者提供: | ADK 提供: | Gemini 提供: |
---|---|---|
Web / 移动端:用户交互的前端应用程序,处理 UI/UX、用户输入捕获和响应显示 WebSocket / SSE 服务器:实时通信服务器(如 FastAPI),管理客户端连接、处理流式处理协议,并在客户端和 ADK 之间路由消息 智能体:针对你应用程序需求量身定制的自定义 AI 智能体定义,包含特定指令、工具和行为 |
LiveRequestQueue:消息队列,缓冲和排序传入的用户消息(文本内容、音频块、控制信号)以供智能体有序处理 Runner:执行引擎,编排智能体会话、管理对话状态,并提供 run_live() 流式处理接口LLM Flow:处理流式对话逻辑、管理上下文并与语言模型协调的处理流水线 GeminiLlmConnection:抽象层,连接 ADK 的流式处理架构与 Gemini Live API,处理协议转换和连接管理 |
Gemini Live API:Google 的实时语言模型服务,处理流式输入、生成响应、处理中断、支持多模态内容(文本、音频、视频),并提供函数调用和上下文理解等高级 AI 功能 |
1.3 设置你的开发环境¶
现在你了解了 ADK 双向流式处理架构的要点及其提供的价值,是时候获得实际操作经验了。本节将准备你的开发环境,这样你就可以开始构建前面章节描述的流式处理智能体和应用程序。
在这个设置结束时,你将拥有创建智能语音助手、主动客户支持智能体和多智能体协作平台所需的一切。设置过程很简单——ADK 处理复杂的流式处理基础设施,因此你可以专注于构建智能体的独特功能,而不是与低级流式处理协议斗争。
安装步骤¶
1. 创建虚拟环境(推荐)¶
# 创建虚拟环境
python -m venv .venv
# 激活虚拟环境
# macOS/Linux:
source .venv/bin/activate
# Windows CMD:
# .venv\Scripts\activate.bat
# Windows PowerShell:
# .venv\Scripts\Activate.ps1
2. 安装 ADK¶
在项目根目录创建 requirements.txt
文件。注意 google-adk
库包含 FastAPI 和 uvicorn,你可以将它们用作双向流式处理应用程序的 Web 服务器。
安装所有依赖项:
3. 设置 SSL 证书路径(仅限 macOS)¶
4. 设置 API 密钥¶
选择你运行智能体的首选平台:
- 从 Google AI Studio 获取 API 密钥
- 在项目根目录创建
.env
文件:
- 设置 Google Cloud 项目
- 安装并配置 gcloud CLI
- 认证:
gcloud auth login
- 启用 Vertex AI API
- 在项目根目录创建
.env
文件:
5. 创建环境设置脚本¶
我们将创建验证脚本来验证你的安装:
创建 src/part1/1-3-1_environment_setup.py
:
#!/usr/bin/env python3
"""
第 1.3.1 部分:环境设置验证
验证 ADK 流式处理环境配置的综合脚本。
"""
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
def validate_environment():
"""验证 ADK 流式处理环境设置。"""
print("🔧 ADK 流式处理环境验证")
print("=" * 45)
# 加载环境变量
env_path = Path(__file__).parent.parent.parent / '.env'
if env_path.exists():
load_dotenv(env_path)
print(f"✓ 环境文件已加载:{env_path}")
else:
print(f"❌ 环境文件未找到:{env_path}")
return False
# 检查 Python 版本
python_version = sys.version_info
if python_version >= (3, 8):
print(f"✓ Python 版本:{python_version.major}.{python_version.minor}.{python_version.micro}")
else:
print(f"❌ Python 版本 {python_version.major}.{python_version.minor} - 需要 3.8+")
return False
# 测试 ADK 安装
try:
import google.adk
print(f"✓ ADK 导入成功")
# 如果可用,尝试获取版本
try:
from google.adk.version import __version__
print(f"✓ ADK 版本:{__version__}")
except:
print("ℹ️ ADK 版本信息不可用")
except ImportError as e:
print(f"❌ ADK 导入失败:{e}")
return False
# 检查基本导入
essential_imports = [
('google.adk.agents', 'Agent, LiveRequestQueue'),
('google.adk.runners', 'InMemoryRunner'),
('google.genai.types', 'Content, Part, Blob'),
]
for module, components in essential_imports:
try:
__import__(module)
print(f"✓ 导入:{module}")
except ImportError as e:
print(f"❌ 导入失败:{module} - {e}")
return False
# 验证环境变量
env_checks = [
('GOOGLE_GENAI_USE_VERTEXAI', '平台配置'),
('GOOGLE_API_KEY', 'API 认证'),
]
for env_var, description in env_checks:
value = os.getenv(env_var)
if value:
# 为安全起见屏蔽 API 密钥
display_value = value if env_var != 'GOOGLE_API_KEY' else f"{value[:10]}..."
print(f"✓ {description}:{display_value}")
else:
print(f"❌ 缺失:{env_var} ({description})")
return False
# 测试基本 ADK 功能
try:
from google.adk.agents import LiveRequestQueue
from google.genai.types import Content, Part
# 创建测试队列
queue = LiveRequestQueue()
test_content = Content(parts=[Part(text="测试消息")])
queue.send_content(test_content)
queue.close()
print("✓ 基本 ADK 功能测试通过")
except Exception as e:
print(f"❌ ADK 功能测试失败:{e}")
return False
print("\n🎉 环境验证成功!")
print("\n下一步:")
print("• 开始在 src/agents/ 中构建你的流式处理智能体")
print("• 在 src/tools/ 中创建自定义工具")
print("• 在 src/utils/ 中添加实用功能")
print("• 使用第 3 部分示例进行测试")
return True
def main():
"""运行环境验证。"""
try:
success = validate_environment()
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n\n⚠️ 验证被用户中断")
sys.exit(1)
except Exception as e:
print(f"\n❌ 意外错误:{e}")
sys.exit(1)
if __name__ == "__main__":
main()
项目结构¶
现在你的流式处理项目应该具有以下结构:
your-streaming-project/
├── .env # 环境变量(API 密钥)
├── requirements.txt # Python 依赖项
└── src/
└── part1/
└── 1-3-1_environment_setup.py # 环境验证脚本
运行它¶
使用我们完整的环境设置脚本来确保一切配置正确:
预期输出
当你运行验证脚本时,你应该看到类似这样的输出:
🔧 ADK 流式处理环境验证
=============================================
✓ 环境文件已加载:/path/to/your-streaming-project/.env
✓ Python 版本:3.12.8
✓ ADK 导入成功
✓ ADK 版本:1.3.0
✓ 导入:google.adk.agents
✓ 导入:google.adk.runners
✓ 导入:google.genai.types
✓ 平台配置:FALSE
✓ API 认证:AIzaSyAolZ...
✓ 基本 ADK 功能测试通过
🎉 环境验证成功!
这个综合验证脚本检查:
- ADK 安装和版本
- 所需的环境变量
- API 密钥验证
- 基本导入验证
下一步¶
设置好环境后,你就可以深入了解核心流式处理 API 了。在下一部分(即将推出),你将学习:
- LiveRequestQueue:双向通信的核心
- run_live() 方法:启动流式处理会话
- 事件处理:处理实时响应
- Gemini Live API:直接集成模式