Skip to content

ADK 流式处理快速入门

通过本快速入门,你将学习创建一个简单的智能体,并使用 ADK 流式处理来实现低延迟、双向的语音和视频通信。我们将安装 ADK,设置一个基本的"Google 搜索"智能体,尝试使用 adk web 工具进行流式处理运行智能体,然后解释如何使用 ADK 流式处理和 FastAPI 自己构建一个简单的异步 web 应用。

注意: 本指南假设你有在 Windows、Mac 和 Linux 环境中使用终端的经验。

支持语音/视频流式处理的模型

为了在 ADK 中使用语音/视频流式处理,你需要使用支持 Live API 的 Gemini 模型。你可以在文档中找到支持 Gemini Live API 的模型 ID

1. 设置环境并安装 ADK

创建并激活虚拟环境(推荐):

# 创建
python -m venv .venv
# 激活(每次打开新终端时)
# macOS/Linux: source .venv/bin/activate
# Windows CMD: .venv\Scripts\activate.bat
# Windows PowerShell: .venv\Scripts\Activate.ps1

安装 ADK:

pip install google-adk

2. 项目结构

创建以下带有空文件的文件夹结构:

adk-streaming/  # 项目文件夹
└── app/ # web 应用文件夹
    ├── .env # Gemini API 密钥
    └── google_search_agent/ # 智能体文件夹
        ├── __init__.py # Python 包
        └── agent.py # 智能体定义

agent.py

将以下代码块复制粘贴到 agent.py

对于 model,请按照前面模型部分中的描述仔细检查模型 ID。

from google.adk.agents import Agent
from google.adk.tools import google_search  # 导入工具

root_agent = Agent(
   # 智能体的唯一名称
   name="basic_search_agent",
   # 智能体将使用的大型语言模型 (LLM)
   model="gemini-2.0-flash-exp", # Google AI Studio
   #model="gemini-2.0-flash-live-preview-04-09" # Vertex AI Studio
   # 智能体目的的简短描述
   description="Agent to answer questions using Google Search.",
   # 设置智能体行为的指令
   instruction="You are an expert researcher. You always stick to the facts.",
   # 添加 google_search 工具,使用 Google 搜索进行接地
   tools=[google_search]
)

注意: 要启用文本和音频/视频输入,模型必须支持 generateContent(用于文本)和 bidiGenerateContent 方法。通过参考 Gemini 模型列表 (models.list) 文档 验证这些功能。此快速入门使用了 gemini-2.0-flash-exp 模型进行演示。

agent.py 是存储所有智能体逻辑的地方,你必须定义一个 root_agent

注意你是如何轻松集成 使用 Google 搜索进行基础信息获取 功能的。Agent 类和 google_search 工具处理与 LLM 的复杂交互以及与搜索 API 的基础信息获取,让你可以专注于智能体的目的行为

intro_components.png

将以下代码块复制粘贴到 __init__.pymain.py 文件中。

__init__.py
from . import agent

3. 设置平台

要运行智能体,请从 Google AI Studio 或 Google Cloud Vertex AI 中选择一个平台:

  1. Google AI Studio 获取 API 密钥。
  2. 打开位于 (app/) 中的 .env 文件并复制粘贴以下代码。

    .env
    GOOGLE_GENAI_USE_VERTEXAI=FALSE
    GOOGLE_API_KEY=PASTE_YOUR_ACTUAL_API_KEY_HERE
    
  3. PASTE_YOUR_ACTUAL_API_KEY_HERE 替换为你的实际 API KEY

  1. 你需要一个现有的 Google Cloud 账户和项目。
  2. 打开位于 (app/) 中的 .env 文件。复制粘贴以下代码并更新项目 ID 和位置。

    .env
    GOOGLE_GENAI_USE_VERTEXAI=TRUE
    GOOGLE_CLOUD_PROJECT=PASTE_YOUR_ACTUAL_PROJECT_ID
    GOOGLE_CLOUD_LOCATION=us-central1
    

4. 使用 adk web 尝试智能体

现在可以尝试智能体了。运行以下命令来启动开发 UI。首先,确保将当前目录设置为 app

cd app

同时,使用以下命令设置 SSL_CERT_FILE 变量。这是后面进行语音和视频测试所必需的。

export SSL_CERT_FILE=$(python -m certifi)

然后,运行开发 UI:

adk web

直接在浏览器中打开提供的 URL(通常是 http://localhost:8000http://127.0.0.1:8000)。此连接完全保持在你的本地机器上。选择 basic_search_agent

尝试文本

通过在 UI 中输入以下提示来尝试。

  • 纽约的天气如何?
  • 纽约现在几点?
  • 巴黎的天气如何?
  • 巴黎现在几点?

智能体将使用 google_search 工具获取最新信息来回答这些问题。

尝试语音和视频

要尝试语音功能,重新加载网页浏览器,点击麦克风按钮启用语音输入,并用语音提出相同的问题。你将实时听到回答。

要尝试视频功能,重新加载网页浏览器,点击摄像头按钮启用视频输入,并提出类似"你看到什么?"的问题。智能体将回答它在视频输入中看到的内容。

停止工具

通过在控制台按 Ctrl-C 停止 adk web

ADK 流式处理说明

以下功能将在未来版本的 ADK 流式处理中支持:回调、长时间运行工具、示例工具和 Shell 智能体(如 SequentialAgent)。

5. 构建自定义流式处理应用(可选)

在上一节中,我们已经使用 adk web 工具检查了我们的基本搜索智能体是否可以与 ADK 流式处理一起工作。在本节中,我们将学习如何使用 FastAPI 构建你自己的能够进行流式通信的 Web 应用。

app 下添加 static 目录,并添加 main.pyindex.html 作为空文件,如下结构所示:

adk-streaming/  # 项目文件夹
└── app/ # web 应用文件夹
    ├── main.py # FastAPI web 应用
    └── static/ # 静态内容文件夹
        └── index.html # Web 客户端页面

通过添加上述目录和文件,整个目录结构和文件将如下所示:

adk-streaming/  # Project folder
└── app/ # the web app folder
    ├── main.py # FastAPI web app
    ├── static/ # Static content folder
    |   └── index.html # The web client page
    ├── .env # Gemini API key
    └── google_search_agent/ # Agent folder
        ├── __init__.py # Python package
        └── agent.py # Agent definition

main.py

将以下代码块复制粘贴到 main.py 文件中。

import os
import json
import asyncio

from pathlib import Path
from dotenv import load_dotenv

from google.genai.types import (
    Part,
    Content,
)

from google.adk.runners import Runner
from google.adk.agents import LiveRequestQueue
from google.adk.agents.run_config import RunConfig
from google.adk.sessions.in_memory_session_service import InMemorySessionService

from fastapi import FastAPI, WebSocket
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse

from google_search_agent.agent import root_agent

#
# ADK 流式处理
#

# 加载 Gemini API 密钥
load_dotenv()

APP_NAME = "ADK 流式处理示例"
session_service = InMemorySessionService()


def start_agent_session(session_id: str):
    """启动智能体会话"""

    # 创建会话
    session = session_service.create_session(
        app_name=APP_NAME,
        user_id=session_id,
        session_id=session_id,
    )

    # 创建运行器
    runner = Runner(
        app_name=APP_NAME,
        agent=root_agent,
        session_service=session_service,
    )

    # 设置响应模态 = TEXT
    run_config = RunConfig(response_modalities=["TEXT"])

    # 为此会话创建 LiveRequestQueue
    live_request_queue = LiveRequestQueue()

    # 启动智能体会话
    live_events = runner.run_live(
        session=session,
        live_request_queue=live_request_queue,
        run_config=run_config,
    )
    return live_events, live_request_queue


async def agent_to_client_messaging(websocket, live_events):
    """智能体到客户端通信"""
    while True:
        async for event in live_events:
            # turn_complete
            if event.turn_complete:
                await websocket.send_text(json.dumps({"turn_complete": True}))
                print("[回合完成]")

            if event.interrupted:
                await websocket.send_text(json.dumps({"interrupted": True}))
                print("[已中断]")

            # 读取 Content 及其第一个 Part
            part: Part = (
                event.content and event.content.parts and event.content.parts[0]
            )
            if not part or not event.partial:
                continue

            # 获取文本
            text = event.content and event.content.parts and event.content.parts[0].text
            if not text:
                continue

            # 将文本发送给客户端
            await websocket.send_text(json.dumps({"message": text}))
            print(f"[智能体到客户端]: {text}")
            await asyncio.sleep(0)


async def client_to_agent_messaging(websocket, live_request_queue):
    """客户端到智能体通信"""
    while True:
        text = await websocket.receive_text()
        content = Content(role="user", parts=[Part.from_text(text=text)])
        live_request_queue.send_content(content=content)
        print(f"[客户端到智能体]: {text}")
        await asyncio.sleep(0)


#
# FastAPI web 应用
#

app = FastAPI()

STATIC_DIR = Path("static")
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")


@app.get("/")
async def root():
    """提供 index.html"""
    return FileResponse(os.path.join(STATIC_DIR, "index.html"))


@app.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: int):
    """客户端 websocket 端点"""

    # 等待客户端连接
    await websocket.accept()
    print(f"客户端 #{session_id} 已连接")

    # 启动智能体会话
    session_id = str(session_id)
    live_events, live_request_queue = start_agent_session(session_id)

    # 启动任务
    agent_to_client_task = asyncio.create_task(
        agent_to_client_messaging(websocket, live_events)
    )
    client_to_agent_task = asyncio.create_task(
        client_to_agent_messaging(websocket, live_request_queue)
    )
    await asyncio.gather(agent_to_client_task, client_to_agent_task)

    # 已断开连接
    print(f"客户端 #{session_id} 已断开连接")

这段代码使用 ADK 和 FastAPI 创建了一个实时聊天应用。它设置了一个 WebSocket 端点,客户端可以在此连接并与 Google 搜索智能体交互。

主要功能:

  • 加载 Gemini API 密钥。
  • 使用 ADK 管理智能体会话并运行 google_search_agent
  • start_agent_session 初始化一个智能体会话,带有用于实时通信的实时请求队列。
  • agent_to_client_messaging 异步地将智能体的文本响应和状态更新(回合完成、已中断)流式传输到已连接的 WebSocket 客户端。
  • client_to_agent_messaging 异步接收来自 WebSocket 客户端的文本消息,并将其作为用户输入发送给智能体。
  • FastAPI 提供静态前端并在 /ws/{session_id} 处理 WebSocket 连接。
  • 当客户端连接时,它启动一个智能体会话并创建并发任务,通过 WebSocket 在客户端和智能体之间进行双向通信。

将以下代码块复制粘贴到 index.html 文件中。

index.html
<!doctype html>
<html>
  <head>
    <title>ADK 流式处理测试</title>
  </head>

  <body>
    <h1>ADK 流式处理测试</h1>
    <div
      id="messages"
      style="height: 300px; overflow-y: auto; border: 1px solid black"></div>
    <br />

    <form id="messageForm">
      <label for="message">消息</label>
      <input type="text" id="message" name="message" />
      <button type="submit" id="sendButton" disabled>发送</button>
    </form>
  </body>

  <script>
    // 使用 WebSocket 连接连接服务器
    const sessionId = Math.random().toString().substring(10);
    const ws_url = "ws://" + window.location.host + "/ws/" + sessionId;
    let ws = new WebSocket(ws_url);

    // 获取 DOM 元素
    const messageForm = document.getElementById("messageForm");
    const messageInput = document.getElementById("message");
    const messagesDiv = document.getElementById("messages");
    let currentMessageId = null;

    // WebSocket 处理程序
    function addWebSocketHandlers(ws) {
      ws.onopen = function () {
        console.log("WebSocket 连接已打开。");
        document.getElementById("sendButton").disabled = false;
        document.getElementById("messages").textContent = "连接已打开";
        addSubmitHandler(this);
      };

      ws.onmessage = function (event) {
        // 解析传入的消息
        const packet = JSON.parse(event.data);
        console.log(packet);

        // 检查回合是否完成
        // 如果回合完成,添加新消息
        if (packet.turn_complete && packet.turn_complete == true) {
          currentMessageId = null;
          return;
        }

        // 为新回合添加新消息
        if (currentMessageId == null) {
          currentMessageId = Math.random().toString(36).substring(7);
          const message = document.createElement("p");
          message.id = currentMessageId;
          // 将消息元素附加到 messagesDiv
          messagesDiv.appendChild(message);
        }

        // 将消息文本添加到现有消息元素
        const message = document.getElementById(currentMessageId);
        message.textContent += packet.message;

        // 滚动到 messagesDiv 底部
        messagesDiv.scrollTop = messagesDiv.scrollHeight;
      };

      // 当连接关闭时,尝试重新连接
      ws.onclose = function () {
        console.log("WebSocket 连接已关闭。");
        document.getElementById("sendButton").disabled = true;
        document.getElementById("messages").textContent = "连接已关闭";
        setTimeout(function () {
          console.log("正在重新连接...");
          ws = new WebSocket(ws_url);
          addWebSocketHandlers(ws);
        }, 5000);
      };

      ws.onerror = function (e) {
        console.log("WebSocket 错误:", e);
      };
    }
    addWebSocketHandlers(ws);

    // 向表单添加提交处理程序
    function addSubmitHandler(ws) {
      messageForm.onsubmit = function (e) {
        e.preventDefault();
        const message = messageInput.value;
        if (message) {
          const p = document.createElement("p");
          p.textContent = "> " + message;
          messagesDiv.appendChild(p);
          ws.send(message);
          messageInput.value = "";
        }
        return false;
      };
    }
  </script>
</html>

这个 HTML 文件设置了一个基本网页,包含:

  • 一个表单(messageForm),带有用于输入消息的输入字段和"发送"按钮。
  • JavaScript,它:
  • 连接到 wss://[当前主机]/ws/[随机会话 ID] 的 WebSocket 服务器。
  • 在成功连接时启用"发送"按钮。
  • 将从 WebSocket 接收到的消息附加到 messages div,处理流式响应和回合完成。
  • 当表单提交时,将输入字段中输入的文本发送到 WebSocket 服务器。
  • 如果 WebSocket 连接关闭,尝试重新连接。

6. 与你的流式处理应用交互

  1. 导航到正确的目录:

要有效运行你的智能体,你需要在 app 文件夹(adk-streaming/app

  1. 启动 Fast API:运行以下命令启动 CLI 界面
uvicorn main:app --reload
  1. 访问 UI: 一旦 UI 服务器启动,终端将显示一个本地 URL(例如,http://localhost:8000)。点击此链接在浏览器中打开 UI。

现在你应该看到如下 UI:

ADK 流式处理测试

尝试问一个问题 Gemini 是什么?。智能体将使用 Google 搜索来回答你的查询。你会注意到 UI 以流式文本的形式显示智能体的响应。你也可以随时向智能体发送消息,即使智能体仍在响应中。这展示了 ADK 流式处理的双向通信能力。

相比传统同步 Web 应用的优势:

  • 实时双向通信:无缝交互。
  • 响应更快、更具吸引力:无需等待完整响应或持续刷新。感觉像实时对话。
  • 可以扩展为支持音频、图像和视频流的多模态应用。

恭喜!你已经成功创建并与你的第一个使用 ADK 的流式处理智能体进行了交互!

下一步

  • 添加音频/图像模态: 通过流式处理,你还可以使用音频和图像与智能体进行实时通信。我们将在未来添加更多多模态支持的示例。敬请期待!