Skip to content

自定义音频流应用

本文概述了使用 ADK Streaming 和 FastAPI 构建的自定义异步 Web 应用的服务器和客户端代码,实现实时、双向的音频和文本通信。

注意: 本指南假设你已经具备 JavaScript 和 Python asyncio 编程经验。

语音/视频流支持的模型

为了在 ADK 中使用语音/视频流,你需要使用支持 Live API 的 Gemini 模型。你可以在文档中找到支持 Gemini Live API 的模型 ID

1. 安装 ADK

创建并激活虚拟环境(推荐):

# 创建
python -m venv .venv
# 激活(每个新终端)
# macOS/Linux: source .venv/bin/activate
# Windows CMD: .venv\Scripts\activate.bat
# Windows PowerShell: .venv\Scripts\Activate.ps1

安装 ADK:

pip install google-adk

使用以下命令设置 SSL_CERT_FILE 变量(当客户端通过 wss:// 连接连接到服务器时需要)。

export SSL_CERT_FILE=$(python -m certifi)

下载示例代码:

git clone --no-checkout https://github.com/google/adk-docs.git
cd adk-docs
git sparse-checkout init --cone
git sparse-checkout set examples/python/snippets/streaming/adk-streaming
git checkout main
cd examples/python/snippets/streaming/adk-streaming/app

此示例代码包含以下文件和文件夹:

adk-streaming/
└── app/ # Web 应用文件夹
    ├── .env # Gemini API 密钥 / Google Cloud 项目 ID
    ├── main.py # FastAPI Web 应用
    ├── static/ # 静态内容文件夹
    |   ├── js # JavaScript 文件文件夹(包含 app.js)
    |   └── index.html # Web 客户端页面
    └── google_search_agent/ # 智能体文件夹
        ├── __init__.py # Python 包
        └── agent.py # 智能体定义

2. 设置平台

要运行示例应用,请从 Google AI Studio 或 Google Cloud Vertex AI 中选择一个平台:

  1. Google AI Studio 获取 API 密钥。
  2. 打开位于 (app/) 内的 .env 文件,并复制粘贴以下代码。

    .env
    GOOGLE_GENAI_USE_VERTEXAI=FALSE
    GOOGLE_API_KEY=PASTE_YOUR_ACTUAL_API_KEY_HERE
    
  3. PASTE_YOUR_ACTUAL_API_KEY_HERE 替换为你的实际 API KEY

  1. 你需要一个现有的 Google Cloud 账户和一个项目。
  2. 打开位于 (app/) 内的 .env 文件。复制粘贴以下代码并更新项目 ID 和位置。

    .env
    GOOGLE_GENAI_USE_VERTEXAI=TRUE
    GOOGLE_CLOUD_PROJECT=PASTE_YOUR_ACTUAL_PROJECT_ID
    GOOGLE_CLOUD_LOCATION=us-central1
    

agent.py

google_search_agent 文件夹中的智能体定义代码 agent.py 是编写智能体逻辑的地方:

from google.adk.agents import Agent
from google.adk.tools import google_search  # Import the tool

root_agent = Agent(
   # 智能体的唯一名称。
   name="google_search_agent",
   # 智能体将使用的大语言模型 (LLM)。
   model="gemini-2.0-flash-exp",
   # model="gemini-2.0-flash-live-001",  # 截至 2025 年 2 月的新流媒体模型版本
   # 智能体目的的简短描述。
   description="Agent to answer questions using Google Search.",
   # 设置智能体行为的指令。
   instruction="Answer the question using the Google Search tool.",
   # 添加 google_search 工具以使用 Google 搜索进行接地。
   tools=[google_search],
)

注意: 要同时启用文本和音频/视频输入,模型必须支持 generateContent(用于文本)和 bidiGenerateContent 方法。请参考 模型列表文档 验证这些功能。此快速入门使用 gemini-2.0-flash-exp 模型进行演示。

注意你如何轻松地集成 与 Google 搜索接地 功能。Agent 类和 google_search 工具处理与 LLM 和搜索 API 接地的复杂交互,使你能够专注于智能体的目的行为

intro_components.png

3. 与你的流应用交互

1. 导航到正确的目录:

要有效运行你的智能体,请确保你在 app 文件夹 (adk-streaming/app)

2. 启动 Fast API:运行以下命令启动 CLI 界面

uvicorn main:app --reload

3. 以文本模式访问应用: 一旦应用启动,终端将显示一个本地 URL(例如 http://localhost:8000)。点击此链接在浏览器中打开 UI。

现在你应该看到如下所示的 UI:

ADK Streaming app

尝试提问 What time is it now?。智能体将使用 Google 搜索回应你的查询。你会注意到 UI 以流式文本显示智能体的回应。即使在智能体仍在回应时,你也可以随时向智能体发送消息。这展示了 ADK Streaming 的双向通信能力。

4. 以音频模式访问应用: 现在点击 Start Audio 按钮。应用将以音频模式重新连接服务器,首次使用时 UI 将显示以下对话框:

ADK Streaming app

点击 Allow while visiting the site,然后你将看到麦克风图标显示在浏览器顶部:

ADK Streaming app

现在你可以用语音与智能体交谈。用语音提问类似 What time is it now? 的问题,你也会听到智能体用语音回应。由于 ADK 的 Streaming 支持 多种语言,它也可以用支持的语言回应问题。

5. 查看控制台日志

如果你使用 Chrome 浏览器,右键点击并选择 Inspect 打开 DevTools。在 Console 中,你可以看到传入和传出的音频数据,例如 [CLIENT TO AGENT][AGENT TO CLIENT],表示浏览器和服务器之间流入和流出的音频数据。

同时,在应用服务器控制台中,你应该会看到类似这样的内容:

INFO:     ('127.0.0.1', 50068) - "WebSocket /ws/70070018?is_audio=true" [accepted]
Client #70070018 connected, audio mode: true
INFO:     connection open
INFO:     127.0.0.1:50061 - "GET /static/js/pcm-player-processor.js HTTP/1.1" 200 OK
INFO:     127.0.0.1:50060 - "GET /static/js/pcm-recorder-processor.js HTTP/1.1" 200 OK
[AGENT TO CLIENT]: audio/pcm: 9600 bytes.
INFO:     127.0.0.1:50082 - "GET /favicon.ico HTTP/1.1" 404 Not Found
[AGENT TO CLIENT]: audio/pcm: 11520 bytes.
[AGENT TO CLIENT]: audio/pcm: 11520 bytes.

如果你开发自己的流应用,这些控制台日志非常重要。在许多情况下,浏览器和服务器之间的通信失败是流应用 bug 的主要原因。

4. 服务器代码概述

这个服务器应用通过 WebSockets 实现与 ADK 智能体的实时流交互。客户端向 ADK 智能体发送文本/音频,并接收流式文本/音频回应。

核心功能: 1. 初始化/管理 ADK 智能体会话。 2. 处理客户端 WebSocket 连接。 3. 将客户端消息转发给 ADK 智能体。 4. 向客户端流式传输 ADK 智能体回应(文本/音频)。

ADK Streaming 设置

import os
import json
import asyncio
import base64

from pathlib import Path
from dotenv import load_dotenv

from google.genai.types import (
    Part,
    Content,
    Blob,
)

from google.adk.runners import Runner
from google.adk.agents import LiveRequestQueue
from google.adk.agents.run_config import RunConfig
from google.adk.sessions.in_memory_session_service import InMemorySessionService

from fastapi import FastAPI, WebSocket
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse

from google_search_agent.agent import root_agent
  • 导入: 包括标准 Python 库、用于环境变量的 dotenv、Google ADK 和 FastAPI。
  • load_dotenv() 加载环境变量。
  • APP_NAME:ADK 的应用标识符。
  • session_service = InMemorySessionService():初始化 ADK 内存会话服务,适用于单实例或开发使用。生产环境可能使用持久存储。

start_agent_session(session_id, is_audio=False)

def start_agent_session(session_id, is_audio=False):
    """Starts an agent session"""

    # Create a Session
    session = await session_service.create_session(
        app_name=APP_NAME,
        user_id=session_id,
        session_id=session_id,
    )

    # Create a Runner
    runner = Runner(
        app_name=APP_NAME,
        agent=root_agent,
        session_service=session_service,
    )

    # Set response modality
    modality = "AUDIO" if is_audio else "TEXT"
    run_config = RunConfig(response_modalities=[modality])

    # Create a LiveRequestQueue for this session
    live_request_queue = LiveRequestQueue()

    # Start agent session
    live_events = runner.run_live(
        session=session,
        live_request_queue=live_request_queue,
        run_config=run_config,
    )
    return live_events, live_request_queue

此函数初始化 ADK 智能体的实时会话。

参数 类型 描述
session_id str 唯一客户端会话标识符。
is_audio bool True 表示音频回应,False 表示文本回应(默认)。

关键步骤: 1. 创建会话: 建立 ADK 会话。 2. 创建 Runner:root_agent 实例化 ADK 运行器。 3. 设置回应模态: 配置智能体回应为 "AUDIO" 或 "TEXT"。 4. 创建 LiveRequestQueue: 为客户端输入到智能体创建队列。 5. 启动智能体会话: runner.run_live(...) 启动智能体,返回: * live_events:智能体事件的异步可迭代对象(文本、音频、完成)。 * live_request_queue:向智能体发送数据的队列。

返回: (live_events, live_request_queue)

agent_to_client_messaging(websocket, live_events)

async def agent_to_client_messaging(websocket, live_events):
    """Agent to client communication"""
    while True:
        async for event in live_events:

            # If the turn complete or interrupted, send it
            if event.turn_complete or event.interrupted:
                message = {
                    "turn_complete": event.turn_complete,
                    "interrupted": event.interrupted,
                }
                await websocket.send_text(json.dumps(message))
                print(f"[AGENT TO CLIENT]: {message}")
                continue

            # Read the Content and its first Part
            part: Part = (
                event.content and event.content.parts and event.content.parts[0]
            )
            if not part:
                continue

            # If it's audio, send Base64 encoded audio data
            is_audio = part.inline_data and part.inline_data.mime_type.startswith("audio/pcm")
            if is_audio:
                audio_data = part.inline_data and part.inline_data.data
                if audio_data:
                    message = {
                        "mime_type": "audio/pcm",
                        "data": base64.b64encode(audio_data).decode("ascii")
                    }
                    await websocket.send_text(json.dumps(message))
                    print(f"[AGENT TO CLIENT]: audio/pcm: {len(audio_data)} bytes.")
                    continue

            # If it's text and a parial text, send it
            if part.text and event.partial:
                message = {
                    "mime_type": "text/plain",
                    "data": part.text
                }
                await websocket.send_text(json.dumps(message))
                print(f"[AGENT TO CLIENT]: text/plain: {message}")

这个异步函数将 ADK 智能体事件流式传输到 WebSocket 客户端。

逻辑: 1. 遍历智能体的 live_events。 2. 回合完成/中断: 向客户端发送状态标志。 3. 内容处理: * 从事件内容中提取第一个 Part。 * 音频数据: 如果是音频(PCM),Base64 编码并作为 JSON 发送:{ "mime_type": "audio/pcm", "data": "<base64_audio>" }。 * 文本数据: 如果是部分文本,作为 JSON 发送:{ "mime_type": "text/plain", "data": "<partial_text>" }。 4. 记录消息日志。

client_to_agent_messaging(websocket, live_request_queue)

async def client_to_agent_messaging(websocket, live_request_queue):
    """Client to agent communication"""
    while True:
        # Decode JSON message
        message_json = await websocket.receive_text()
        message = json.loads(message_json)
        mime_type = message["mime_type"]
        data = message["data"]

        # Send the message to the agent
        if mime_type == "text/plain":
            # Send a text message
            content = Content(role="user", parts=[Part.from_text(text=data)])
            live_request_queue.send_content(content=content)
            print(f"[CLIENT TO AGENT]: {data}")
        elif mime_type == "audio/pcm":
            # Send an audio data
            decoded_data = base64.b64decode(data)
            live_request_queue.send_realtime(Blob(data=decoded_data, mime_type=mime_type))
        else:
            raise ValueError(f"Mime type not supported: {mime_type}")

这个异步函数将来自 WebSocket 客户端的消息转发给 ADK 智能体。

逻辑: 1. 接收并解析 WebSocket 中的 JSON 消息,期望格式:{ "mime_type": "text/plain" | "audio/pcm", "data": "<data>" }。 2. 文本输入: 对于 "text/plain",通过 live_request_queue.send_content() 向智能体发送 Content。 3. 音频输入: 对于 "audio/pcm",解码 Base64 数据,包装在 Blob 中,并通过 live_request_queue.send_realtime() 发送。 4. 对不支持的 MIME 类型引发 ValueError。 5. 记录消息日志。

FastAPI Web 应用

app = FastAPI()

STATIC_DIR = Path("static")
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")


@app.get("/")
async def root():
    """Serves the index.html"""
    return FileResponse(os.path.join(STATIC_DIR, "index.html"))


@app.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: int, is_audio: str):
    """Client websocket endpoint"""

    # Wait for client connection
    await websocket.accept()
    print(f"Client #{session_id} connected, audio mode: {is_audio}")

    # Start agent session
    session_id = str(session_id)
    live_events, live_request_queue = start_agent_session(session_id, is_audio == "true")

    # Start tasks
    agent_to_client_task = asyncio.create_task(
        agent_to_client_messaging(websocket, live_events)
    )
    client_to_agent_task = asyncio.create_task(
        client_to_agent_messaging(websocket, live_request_queue)
    )
    await asyncio.gather(agent_to_client_task, client_to_agent_task)

    # Disconnected
    print(f"Client #{session_id} disconnected")
  • app = FastAPI():初始化应用。
  • 静态文件:/static 下提供 static 目录中的文件。
  • @app.get("/") (根端点): 提供 index.html
  • @app.websocket("/ws/{session_id}") (WebSocket 端点):
    • 路径参数: session_id (int) 和 is_audio (str: "true"/"false")。
    • 连接处理:
      1. 接受 WebSocket 连接。
      2. 使用 session_idis_audio 调用 start_agent_session()
      3. 并发消息任务: 使用 asyncio.gather 并发创建和运行 agent_to_client_messagingclient_to_agent_messaging。这些任务处理双向消息流。
      4. 记录客户端连接和断开连接。

工作原理(整体流程){ #How-It-Works }

  1. 客户端连接到 ws://<server>/ws/<session_id>?is_audio=<true_or_false>
  2. 服务器的 websocket_endpoint 接受连接,启动 ADK 会话 (start_agent_session)。
  3. 两个 asyncio 任务管理通信:
    • client_to_agent_messaging:客户端 WebSocket 消息 -> ADK live_request_queue
    • agent_to_client_messaging:ADK live_events -> 客户端 WebSocket。
  4. 双向流式传输持续进行,直到断开连接或出错。

5. 客户端代码概述

JavaScript app.js(在 app/static/js 中)管理客户端与 ADK Streaming WebSocket 后端的交互。它处理发送文本/音频和接收/显示流式回应。

主要功能: 1. 管理 WebSocket 连接。 2. 处理文本输入。 3. 捕获麦克风音频(Web Audio API,AudioWorklets)。 4. 向后端发送文本/音频。 5. 接收并渲染文本/音频智能体回应。 6. 管理 UI。

前提条件

  • HTML 结构: 需要特定的元素 ID(例如 messageFormmessagemessagessendButtonstartAudioButton)。
  • 后端服务器: Python FastAPI 服务器必须在运行。
  • 音频 Worklet 文件: 用于音频处理的 audio-player.jsaudio-recorder.js

WebSocket 处理

// 通过 WebSocket 连接连接到服务器
const sessionId = Math.random().toString().substring(10);
const ws_url =
  "ws://" + window.location.host + "/ws/" + sessionId;
let websocket = null;
let is_audio = false;

// 获取 DOM 元素
const messageForm = document.getElementById("messageForm");
const messageInput = document.getElementById("message");
const messagesDiv = document.getElementById("messages");
let currentMessageId = null;

// WebSocket 处理程序
function connectWebsocket() {
  // 连接 websocket
  websocket = new WebSocket(ws_url + "?is_audio=" + is_audio);

  // 处理连接打开
  websocket.onopen = function () {
    // 连接已打开消息
    console.log("WebSocket connection opened.");
    document.getElementById("messages").textContent = "Connection opened";

    // 启用发送按钮
    document.getElementById("sendButton").disabled = false;
    addSubmitHandler();
  };

  // 处理传入消息
  websocket.onmessage = function (event) {
    // 解析传入消息
    const message_from_server = JSON.parse(event.data);
    console.log("[AGENT TO CLIENT] ", message_from_server);

    // 检查回合是否完成
    // 如果回合完成,添加新消息
    if (
      message_from_server.turn_complete &&
      message_from_server.turn_complete == true
    ) {
      currentMessageId = null;
      return;
    }

    // 如果是音频,播放它
    if (message_from_server.mime_type == "audio/pcm" && audioPlayerNode) {
      audioPlayerNode.port.postMessage(base64ToArray(message_from_server.data));
    }

    // 如果是文本,打印它
    if (message_from_server.mime_type == "text/plain") {
      // 为新回合添加新消息
      if (currentMessageId == null) {
        currentMessageId = Math.random().toString(36).substring(7);
        const message = document.createElement("p");
        message.id = currentMessageId;
        // 将消息元素附加到 messagesDiv
        messagesDiv.appendChild(message);
      }

      // 将消息文本添加到现有消息元素
      const message = document.getElementById(currentMessageId);
      message.textContent += message_from_server.data;

      // 向下滚动到 messagesDiv 底部
      messagesDiv.scrollTop = messagesDiv.scrollHeight;
    }
  };

  // 处理连接关闭
  websocket.onclose = function () {
    console.log("WebSocket connection closed.");
    document.getElementById("sendButton").disabled = true;
    document.getElementById("messages").textContent = "Connection closed";
    setTimeout(function () {
      console.log("Reconnecting...");
      connectWebsocket();
    }, 5000);
  };

  websocket.onerror = function (e) {
    console.log("WebSocket error: ", e);
  };
}
connectWebsocket();

// 向表单添加提交处理程序
function addSubmitHandler() {
  messageForm.onsubmit = function (e) {
    e.preventDefault();
    const message = messageInput.value;
    if (message) {
      const p = document.createElement("p");
      p.textContent = "> " + message;
      messagesDiv.appendChild(p);
      messageInput.value = "";
      sendMessage({
        mime_type: "text/plain",
        data: message,
      });
      console.log("[CLIENT TO AGENT] " + message);
    }
    return false;
  };
}

// 将消息作为 JSON 字符串发送到服务器
function sendMessage(message) {
  if (websocket && websocket.readyState == WebSocket.OPEN) {
    const messageJson = JSON.stringify(message);
    websocket.send(messageJson);
  }
}

// 将 Base64 数据解码为数组
function base64ToArray(base64) {
  const binaryString = window.atob(base64);
  const len = binaryString.length;
  const bytes = new Uint8Array(len);
  for (let i = 0; i < len; i++) {
    bytes[i] = binaryString.charCodeAt(i);
  }
  return bytes.buffer;
}
  • 连接设置: 生成 sessionId,构造 ws_urlis_audio 标志(初始为 false)在激活时向 URL 附加 ?is_audio=trueconnectWebsocket() 初始化连接。
  • websocket.onopen:启用发送按钮,更新 UI,调用 addSubmitHandler()
  • websocket.onmessage:解析来自服务器的传入 JSON。
    • 回合完成: 如果智能体回合完成,重置 currentMessageId
    • 音频数据 (audio/pcm): 解码 Base64 音频 (base64ToArray()) 并发送到 audioPlayerNode 进行播放。
    • 文本数据 (text/plain): 如果是新回合(currentMessageId 为 null),创建新的 <p>。将接收的文本附加到当前消息段落,产生流式效果。滚动 messagesDiv
  • websocket.onclose:禁用发送按钮,更新 UI,5 秒后尝试自动重新连接。
  • websocket.onerror:记录错误。
  • 初始连接: 脚本加载时调用 connectWebsocket()

DOM 交互与消息提交

  • 元素检索: 获取所需的 DOM 元素。
  • addSubmitHandler():附加到 messageForm 的提交。阻止默认提交,获取 messageInput 中的文本,显示用户消息,清除输入,并使用 { mime_type: "text/plain", data: messageText } 调用 sendMessage()
  • sendMessage(messagePayload):如果 WebSocket 打开,发送 JSON 字符串化的 messagePayload

音频处理

let audioPlayerNode;
let audioPlayerContext;
let audioRecorderNode;
let audioRecorderContext;
let micStream;

// 导入音频 worklets
import { startAudioPlayerWorklet } from "./audio-player.js";
import { startAudioRecorderWorklet } from "./audio-recorder.js";

// 启动音频
function startAudio() {
  // 启动音频输出
  startAudioPlayerWorklet().then(([node, ctx]) => {
    audioPlayerNode = node;
    audioPlayerContext = ctx;
  });
  // 启动音频输入
  startAudioRecorderWorklet(audioRecorderHandler).then(
    ([node, ctx, stream]) => {
      audioRecorderNode = node;
      audioRecorderContext = ctx;
      micStream = stream;
    }
  );
}

// 仅当用户点击按钮时启动音频
// (由于 Web Audio API 的手势要求)
const startAudioButton = document.getElementById("startAudioButton");
startAudioButton.addEventListener("click", () => {
  startAudioButton.disabled = true;
  startAudio();
  is_audio = true;
  connectWebsocket(); // 以音频模式重新连接
});

// 音频录制器处理程序
function audioRecorderHandler(pcmData) {
  // 将 pcm 数据作为 base64 发送
  sendMessage({
    mime_type: "audio/pcm",
    data: arrayBufferToBase64(pcmData),
  });
  console.log("[CLIENT TO AGENT] sent %s bytes", pcmData.byteLength);
}

// 用 Base64 编码数组缓冲区
function arrayBufferToBase64(buffer) {
  let binary = "";
  const bytes = new Uint8Array(buffer);
  const len = bytes.byteLength;
  for (let i = 0; i < len; i++) {
    binary += String.fromCharCode(bytes[i]);
  }
  return window.btoa(binary);
}
  • 音频 Worklets: 通过 audio-player.js(用于播放)和 audio-recorder.js(用于捕获)使用 AudioWorkletNode
  • 状态变量: 存储 AudioContexts 和 WorkletNodes(例如 audioPlayerNode)。
  • startAudio():初始化播放器和录制器 worklets。将 audioRecorderHandler 作为回调传递给录制器。
  • "Start Audio" 按钮 (startAudioButton):
    • 需要用户手势才能使用 Web Audio API。
    • 点击时:禁用按钮,调用 startAudio(),设置 is_audio = true,然后调用 connectWebsocket() 以音频模式重新连接(URL 包含 ?is_audio=true)。
  • audioRecorderHandler(pcmData):来自录制器 worklet 的回调,带有 PCM 音频块。将 pcmData 编码为 Base64 (arrayBufferToBase64()) 并通过 sendMessage() 发送到服务器,带有 mime_type: "audio/pcm"
  • 辅助函数: base64ToArray() (服务器音频 -> 客户端播放器)和 arrayBufferToBase64() (客户端麦克风音频 -> 服务器)。

工作原理(客户端流程){ #How-It-Works-Client-Side-Flow }

  1. 页面加载: 以文本模式建立 WebSocket。
  2. 文本交互: 用户输入/提交文本;发送到服务器。服务器文本回应显示,流式传输。
  3. 切换到音频模式: "Start Audio" 按钮点击初始化音频 worklets,设置 is_audio=true,并以音频模式重新连接 WebSocket。
  4. 音频交互: 录制器将麦克风音频(Base64 PCM)发送到服务器。服务器音频/文本回应由 websocket.onmessage 处理,用于播放/显示。
  5. 连接管理: WebSocket 关闭时自动重新连接。

总结

本文概述了使用 ADK Streaming 和 FastAPI 构建的自定义异步 Web 应用的服务器和客户端代码,实现实时、双向的语音和文本通信。

Python FastAPI 服务器代码初始化 ADK 智能体会话,配置为文本或音频回应。它使用 WebSocket 端点处理客户端连接。异步任务管理双向消息传递:将客户端文本或 Base64 编码的 PCM 音频转发给 ADK 智能体,并将文本或 Base64 编码的 PCM 音频回应从智能体流式传输回客户端。

客户端 JavaScript 代码管理 WebSocket 连接,可以重新建立以在文本和音频模式之间切换。它发送用户输入(文本或通过 Web Audio API 和 AudioWorklets 捕获的麦克风音频)到服务器。来自服务器的传入消息被处理:文本被显示(流式传输),Base64 编码的 PCM 音频被解码并使用 AudioWorklet 播放。

生产环境的后续步骤

当你在生产应用中使用 ADK 的 Streaming 时,你可能需要考虑以下几点:

  • 部署多个实例: 运行多个 FastAPI 应用实例,而不是单个实例。
  • 实现负载均衡: 在应用实例前放置负载均衡器,以分配传入的 WebSocket 连接。
    • 为 WebSockets 配置: 确保负载均衡器支持长连接 WebSocket 连接,并考虑"粘性会话"(会话亲和性)将客户端路由到同一个后端实例,或者 设计为无状态实例(见下一点)。
  • 外部化会话状态: 将 ADK 的 InMemorySessionService 替换为分布式、持久的会话存储。这允许任何服务器实例处理任何用户的会话,实现应用服务器级别的真正无状态性,并提高容错能力。
  • 实现健康检查: 为 WebSocket 服务器实例设置强大的健康检查,以便负载均衡器可以自动从轮换中移除不健康的实例。
  • 利用编排: 考虑使用 Kubernetes 等编排平台,实现 WebSocket 服务器实例的自动部署、扩展、自我修复和管理。