ADK 流式处理快速入门¶
通过本快速入门,你将学习创建一个简单的智能体,并使用 ADK 流式处理来实现低延迟、双向的语音和视频通信。我们将安装 ADK,设置一个基本的"Google 搜索"智能体,尝试使用 adk web
工具进行流式处理运行智能体,然后解释如何使用 ADK 流式处理和 FastAPI 自己构建一个简单的异步 web 应用。
注意: 本指南假设你有在 Windows、Mac 和 Linux 环境中使用终端的经验。
支持语音/视频流式处理的模型¶
为了在 ADK 中使用语音/视频流式处理,你需要使用支持 Live API 的 Gemini 模型。你可以在文档中找到支持 Gemini Live API 的模型 ID:
1. 设置环境并安装 ADK¶
创建并激活虚拟环境(推荐):
# 创建
python -m venv .venv
# 激活(每次打开新终端时)
# macOS/Linux: source .venv/bin/activate
# Windows CMD: .venv\Scripts\activate.bat
# Windows PowerShell: .venv\Scripts\Activate.ps1
安装 ADK:
2. 项目结构¶
创建以下带有空文件的文件夹结构:
adk-streaming/ # 项目文件夹
└── app/ # web 应用文件夹
├── .env # Gemini API 密钥
└── google_search_agent/ # 智能体文件夹
├── __init__.py # Python 包
└── agent.py # 智能体定义
agent.py¶
将以下代码块复制粘贴到 agent.py
。
对于 model
,请按照前面模型部分中的描述仔细检查模型 ID。
from google.adk.agents import Agent
from google.adk.tools import google_search # 导入工具
root_agent = Agent(
# 智能体的唯一名称
name="basic_search_agent",
# 智能体将使用的大型语言模型 (LLM)
model="gemini-2.0-flash-exp", # Google AI Studio
#model="gemini-2.0-flash-live-preview-04-09" # Vertex AI Studio
# 智能体目的的简短描述
description="Agent to answer questions using Google Search.",
# 设置智能体行为的指令
instruction="You are an expert researcher. You always stick to the facts.",
# 添加 google_search 工具,使用 Google 搜索进行接地
tools=[google_search]
)
注意: 要启用文本和音频/视频输入,模型必须支持 generateContent(用于文本)和 bidiGenerateContent 方法。通过参考 Gemini 模型列表 (models.list) 文档 验证这些功能。此快速入门使用了 gemini-2.0-flash-exp 模型进行演示。
agent.py
是存储所有智能体逻辑的地方,你必须定义一个 root_agent
。
注意你是如何轻松集成 使用 Google 搜索进行基础信息获取 功能的。Agent
类和 google_search
工具处理与 LLM 的复杂交互以及与搜索 API 的基础信息获取,让你可以专注于智能体的目的和行为。
将以下代码块复制粘贴到 __init__.py
和 main.py
文件中。
3. 设置平台¶
要运行智能体,请从 Google AI Studio 或 Google Cloud Vertex AI 中选择一个平台:
- 从 Google AI Studio 获取 API 密钥。
-
打开位于 (
app/
) 中的.env
文件并复制粘贴以下代码。 -
将
PASTE_YOUR_ACTUAL_API_KEY_HERE
替换为你的实际API KEY
。
- 你需要一个现有的 Google Cloud 账户和项目。
- 设置 Google Cloud 项目
- 设置 gcloud CLI
- 通过在终端运行
gcloud auth login
来验证 Google Cloud。 - 启用 Vertex AI API。
-
打开位于 (
app/
) 中的.env
文件。复制粘贴以下代码并更新项目 ID 和位置。
4. 使用 adk web
尝试智能体¶
现在可以尝试智能体了。运行以下命令来启动开发 UI。首先,确保将当前目录设置为 app
:
同时,使用以下命令设置 SSL_CERT_FILE
变量。这是后面进行语音和视频测试所必需的。
然后,运行开发 UI:
直接在浏览器中打开提供的 URL(通常是 http://localhost:8000
或 http://127.0.0.1:8000
)。此连接完全保持在你的本地机器上。选择 basic_search_agent
。
尝试文本¶
通过在 UI 中输入以下提示来尝试。
- 纽约的天气如何?
- 纽约现在几点?
- 巴黎的天气如何?
- 巴黎现在几点?
智能体将使用 google_search 工具获取最新信息来回答这些问题。
尝试语音和视频¶
要尝试语音功能,重新加载网页浏览器,点击麦克风按钮启用语音输入,并用语音提出相同的问题。你将实时听到回答。
要尝试视频功能,重新加载网页浏览器,点击摄像头按钮启用视频输入,并提出类似"你看到什么?"的问题。智能体将回答它在视频输入中看到的内容。
停止工具¶
通过在控制台按 Ctrl-C
停止 adk web
。
ADK 流式处理说明¶
以下功能将在未来版本的 ADK 流式处理中支持:回调、长时间运行工具、示例工具和 Shell 智能体(如 SequentialAgent)。
5. 构建自定义流式处理应用(可选)¶
在上一节中,我们已经使用 adk web
工具检查了我们的基本搜索智能体是否可以与 ADK 流式处理一起工作。在本节中,我们将学习如何使用 FastAPI 构建你自己的能够进行流式通信的 Web 应用。
在 app
下添加 static
目录,并添加 main.py
和 index.html
作为空文件,如下结构所示:
adk-streaming/ # 项目文件夹
└── app/ # web 应用文件夹
├── main.py # FastAPI web 应用
└── static/ # 静态内容文件夹
└── index.html # Web 客户端页面
通过添加上述目录和文件,整个目录结构和文件将如下所示:
adk-streaming/ # Project folder
└── app/ # the web app folder
├── main.py # FastAPI web app
├── static/ # Static content folder
| └── index.html # The web client page
├── .env # Gemini API key
└── google_search_agent/ # Agent folder
├── __init__.py # Python package
└── agent.py # Agent definition
main.py
将以下代码块复制粘贴到 main.py 文件中。
import os
import json
import asyncio
from pathlib import Path
from dotenv import load_dotenv
from google.genai.types import (
Part,
Content,
)
from google.adk.runners import Runner
from google.adk.agents import LiveRequestQueue
from google.adk.agents.run_config import RunConfig
from google.adk.sessions.in_memory_session_service import InMemorySessionService
from fastapi import FastAPI, WebSocket
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from google_search_agent.agent import root_agent
#
# ADK 流式处理
#
# 加载 Gemini API 密钥
load_dotenv()
APP_NAME = "ADK 流式处理示例"
session_service = InMemorySessionService()
def start_agent_session(session_id: str):
"""启动智能体会话"""
# 创建会话
session = session_service.create_session(
app_name=APP_NAME,
user_id=session_id,
session_id=session_id,
)
# 创建运行器
runner = Runner(
app_name=APP_NAME,
agent=root_agent,
session_service=session_service,
)
# 设置响应模态 = TEXT
run_config = RunConfig(response_modalities=["TEXT"])
# 为此会话创建 LiveRequestQueue
live_request_queue = LiveRequestQueue()
# 启动智能体会话
live_events = runner.run_live(
session=session,
live_request_queue=live_request_queue,
run_config=run_config,
)
return live_events, live_request_queue
async def agent_to_client_messaging(websocket, live_events):
"""智能体到客户端通信"""
while True:
async for event in live_events:
# turn_complete
if event.turn_complete:
await websocket.send_text(json.dumps({"turn_complete": True}))
print("[回合完成]")
if event.interrupted:
await websocket.send_text(json.dumps({"interrupted": True}))
print("[已中断]")
# 读取 Content 及其第一个 Part
part: Part = (
event.content and event.content.parts and event.content.parts[0]
)
if not part or not event.partial:
continue
# 获取文本
text = event.content and event.content.parts and event.content.parts[0].text
if not text:
continue
# 将文本发送给客户端
await websocket.send_text(json.dumps({"message": text}))
print(f"[智能体到客户端]: {text}")
await asyncio.sleep(0)
async def client_to_agent_messaging(websocket, live_request_queue):
"""客户端到智能体通信"""
while True:
text = await websocket.receive_text()
content = Content(role="user", parts=[Part.from_text(text=text)])
live_request_queue.send_content(content=content)
print(f"[客户端到智能体]: {text}")
await asyncio.sleep(0)
#
# FastAPI web 应用
#
app = FastAPI()
STATIC_DIR = Path("static")
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
@app.get("/")
async def root():
"""提供 index.html"""
return FileResponse(os.path.join(STATIC_DIR, "index.html"))
@app.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: int):
"""客户端 websocket 端点"""
# 等待客户端连接
await websocket.accept()
print(f"客户端 #{session_id} 已连接")
# 启动智能体会话
session_id = str(session_id)
live_events, live_request_queue = start_agent_session(session_id)
# 启动任务
agent_to_client_task = asyncio.create_task(
agent_to_client_messaging(websocket, live_events)
)
client_to_agent_task = asyncio.create_task(
client_to_agent_messaging(websocket, live_request_queue)
)
await asyncio.gather(agent_to_client_task, client_to_agent_task)
# 已断开连接
print(f"客户端 #{session_id} 已断开连接")
这段代码使用 ADK 和 FastAPI 创建了一个实时聊天应用。它设置了一个 WebSocket 端点,客户端可以在此连接并与 Google 搜索智能体交互。
主要功能:
- 加载 Gemini API 密钥。
- 使用 ADK 管理智能体会话并运行
google_search_agent
。 start_agent_session
初始化一个智能体会话,带有用于实时通信的实时请求队列。agent_to_client_messaging
异步地将智能体的文本响应和状态更新(回合完成、已中断)流式传输到已连接的 WebSocket 客户端。client_to_agent_messaging
异步接收来自 WebSocket 客户端的文本消息,并将其作为用户输入发送给智能体。- FastAPI 提供静态前端并在
/ws/{session_id}
处理 WebSocket 连接。 - 当客户端连接时,它启动一个智能体会话并创建并发任务,通过 WebSocket 在客户端和智能体之间进行双向通信。
将以下代码块复制粘贴到 index.html
文件中。
<!doctype html>
<html>
<head>
<title>ADK 流式处理测试</title>
</head>
<body>
<h1>ADK 流式处理测试</h1>
<div
id="messages"
style="height: 300px; overflow-y: auto; border: 1px solid black"></div>
<br />
<form id="messageForm">
<label for="message">消息:</label>
<input type="text" id="message" name="message" />
<button type="submit" id="sendButton" disabled>发送</button>
</form>
</body>
<script>
// 使用 WebSocket 连接连接服务器
const sessionId = Math.random().toString().substring(10);
const ws_url = "ws://" + window.location.host + "/ws/" + sessionId;
let ws = new WebSocket(ws_url);
// 获取 DOM 元素
const messageForm = document.getElementById("messageForm");
const messageInput = document.getElementById("message");
const messagesDiv = document.getElementById("messages");
let currentMessageId = null;
// WebSocket 处理程序
function addWebSocketHandlers(ws) {
ws.onopen = function () {
console.log("WebSocket 连接已打开。");
document.getElementById("sendButton").disabled = false;
document.getElementById("messages").textContent = "连接已打开";
addSubmitHandler(this);
};
ws.onmessage = function (event) {
// 解析传入的消息
const packet = JSON.parse(event.data);
console.log(packet);
// 检查回合是否完成
// 如果回合完成,添加新消息
if (packet.turn_complete && packet.turn_complete == true) {
currentMessageId = null;
return;
}
// 为新回合添加新消息
if (currentMessageId == null) {
currentMessageId = Math.random().toString(36).substring(7);
const message = document.createElement("p");
message.id = currentMessageId;
// 将消息元素附加到 messagesDiv
messagesDiv.appendChild(message);
}
// 将消息文本添加到现有消息元素
const message = document.getElementById(currentMessageId);
message.textContent += packet.message;
// 滚动到 messagesDiv 底部
messagesDiv.scrollTop = messagesDiv.scrollHeight;
};
// 当连接关闭时,尝试重新连接
ws.onclose = function () {
console.log("WebSocket 连接已关闭。");
document.getElementById("sendButton").disabled = true;
document.getElementById("messages").textContent = "连接已关闭";
setTimeout(function () {
console.log("正在重新连接...");
ws = new WebSocket(ws_url);
addWebSocketHandlers(ws);
}, 5000);
};
ws.onerror = function (e) {
console.log("WebSocket 错误:", e);
};
}
addWebSocketHandlers(ws);
// 向表单添加提交处理程序
function addSubmitHandler(ws) {
messageForm.onsubmit = function (e) {
e.preventDefault();
const message = messageInput.value;
if (message) {
const p = document.createElement("p");
p.textContent = "> " + message;
messagesDiv.appendChild(p);
ws.send(message);
messageInput.value = "";
}
return false;
};
}
</script>
</html>
这个 HTML 文件设置了一个基本网页,包含:
- 一个表单(
messageForm
),带有用于输入消息的输入字段和"发送"按钮。 - JavaScript,它:
- 连接到
wss://[当前主机]/ws/[随机会话 ID]
的 WebSocket 服务器。 - 在成功连接时启用"发送"按钮。
- 将从 WebSocket 接收到的消息附加到
messages
div,处理流式响应和回合完成。 - 当表单提交时,将输入字段中输入的文本发送到 WebSocket 服务器。
- 如果 WebSocket 连接关闭,尝试重新连接。
6. 与你的流式处理应用交互¶
- 导航到正确的目录:
要有效运行你的智能体,你需要在 app 文件夹(adk-streaming/app
) 中
- 启动 Fast API:运行以下命令启动 CLI 界面
- 访问 UI: 一旦 UI 服务器启动,终端将显示一个本地 URL(例如,http://localhost:8000)。点击此链接在浏览器中打开 UI。
现在你应该看到如下 UI:
尝试问一个问题 Gemini 是什么?
。智能体将使用 Google 搜索来回答你的查询。你会注意到 UI 以流式文本的形式显示智能体的响应。你也可以随时向智能体发送消息,即使智能体仍在响应中。这展示了 ADK 流式处理的双向通信能力。
相比传统同步 Web 应用的优势:
- 实时双向通信:无缝交互。
- 响应更快、更具吸引力:无需等待完整响应或持续刷新。感觉像实时对话。
- 可以扩展为支持音频、图像和视频流的多模态应用。
恭喜!你已经成功创建并与你的第一个使用 ADK 的流式处理智能体进行了交互!
下一步¶
- 添加音频/图像模态: 通过流式处理,你还可以使用音频和图像与智能体进行实时通信。我们将在未来添加更多多模态支持的示例。敬请期待!