Skip to content

流式工具

Supported in ADKPython v0.5.0Java v0.2.0Experimental

流式工具允许工具(函数)将中间结果以流的形式返回给智能体,智能体随后可以立即对这些中间结果做出响应。

例如,你可以使用流式工具连续监控股票价格的变化,并让智能体在达到特定阈值时即时反应。另一个例子是让智能体监控视频流,当画面中出现特定对象(如人数变化)时,智能体能够立即汇报这些动态。

支持说明

此功能目前仅在 ADK Gemini Live API 中受支持。

要定义一个流式工具,你必须遵循以下要求:

  1. 异步函数:工具必须定义为 async 异步函数。
  2. AsyncGenerator 返回类型:该函数必须声明返回 AsyncGenerator
    • 第一个类型参数是你 yield 产出的数据类型(例如,文本消息使用 str,结构化数据使用自定义对象)。
    • 第二个类型参数通常为 None(如果生成器不通过 send() 接收外部输入)。

我们支持两类流式工具: - 简单流式工具:接收常规数据流(即你通过 ADK Web 或 ADK Runner 传递的非媒体流)。 - 视频流式工具:专门用于视频场景,实时视频流(JPEG 帧)会被直接传递到该函数中进行处理。

示例实现

下面我们定义一个能够同时监控股票价格和视频画面动态的智能体。

import asyncio
from typing import AsyncGenerator

from google.adk.agents import LiveRequestQueue
from google.adk.agents.llm_agent import Agent
from google.adk.tools.function_tool import FunctionTool
from google.genai import Client
from google.genai import types as genai_types


async def monitor_stock_price(stock_symbol: str) -> AsyncGenerator[str, None]:
  """该函数将以持续流式、异步的方式监控指定股票代码的价格。"""
  print(f"开始监控股票价格: {stock_symbol}!")

  # 模拟股价波动
  await asyncio.sleep(4)
  price_alert1 = f"{stock_symbol} 的当前价格为 300"
  yield price_alert1
  print(price_alert1)

  await asyncio.sleep(4)
  price_alert1 = f"{stock_symbol} 的当前价格为 400"
  yield price_alert1
  print(price_alert1)

  await asyncio.sleep(20)
  price_alert1 = f"{stock_symbol} 的当前价格为 900"
  yield price_alert1
  print(price_alert1)


# 对于视频流监控,`input_stream: LiveRequestQueue` 是 ADK 用于传递视频帧的保留关键参数。
async def monitor_video_stream(
    input_stream: LiveRequestQueue,
) -> AsyncGenerator[str, None]:
  """实时监控视频流中出现了多少人。"""
  print("开始执行视频监控...")
  client = Client(vertexai=False)
  prompt_text = "统计这张图像中的人数。仅返回一个纯数字。"

  last_count = None
  while True:
    last_valid_req = None

    # 消耗队列中的所有图像,仅保留最新的一帧以减少延迟
    while input_stream._queue.qsize() != 0:
      live_req = await input_stream.get()
      if live_req.blob is not None and live_req.blob.mime_type == "image/jpeg":
        last_valid_req = live_req

    # 如果找到了有效的图像帧,则进行处理
    if last_valid_req is not None:
      # 使用获取到的原始数据创建图像 Part
      image_part = genai_types.Part.from_bytes(
          data=last_valid_req.blob.data, mime_type=last_valid_req.blob.mime_type
      )

      contents = genai_types.Content(
          role="user",
          parts=[image_part, genai_types.Part.from_text(prompt_text)],
      )

      # 调用模型对图像进行理解与人数统计
      response = client.models.generate_content(
          model="gemini-flash-latest",
          contents=contents,
          config=genai_types.GenerateContentConfig(
              system_instruction="你是一个专业的视频分析助手,可以准确统计图像或视频中的人数。请仅返回结果数字。"
          ),
      )

      current_count = response.candidates[0].content.parts[0].text
      if not last_count:
        last_count = current_count
      elif last_count != current_count:
        last_count = current_count
        yield response # 将变化汇报给智能体
        print("人数发生变化:", current_count)

    # 在检查下一帧之前稍作等待
    await asyncio.sleep(0.5)


# 此函数必须存在,以协助 ADK 在必要时(如用户打断)停止你的流式工具执行。
# 例如,如果智能体要停止 `monitor_stock_price`,它将通过 stop_streaming(function_name="monitor_stock_price") 调用。
def stop_streaming(function_name: str):
  """停止正在运行的流式处理职能。

  Args:
    function_name: 要停止的流式函数名称。
  """
  pass


root_agent = Agent(
    model="gemini-flash-latest",
    name="video_streaming_agent",
    instruction="""
      你是一个监控专家智能体。你可以利用提供的工具进行视频实时监控和股票价格监控。
      当用户想要监控视频流时,请使用 monitor_video_stream 函数。当该函数返回告警时,你应立即告知用户。
      当用户想要监控股价时,请使用 monitor_stock_price。
      保持回答精简,不要啰嗦。
    """,
    tools=[
        monitor_video_stream,
        monitor_stock_price,
        FunctionTool(stop_streaming),
    ]
)
import com.google.adk.agents.LiveRequestQueue;
import com.google.adk.agents.LlmAgent;
import com.google.adk.tools.Annotations.Schema;
import com.google.adk.tools.FunctionTool;
import com.google.genai.Client;
import com.google.genai.types.Content;
import com.google.genai.types.GenerateContentConfig;
import com.google.genai.types.GenerateContentResponse;
import com.google.genai.types.Part;
import io.reactivex.rxjava3.core.Flowable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class StreamingTools {

  @Schema(description = "该函数将以持续流式、异步的方式监控指定股票代码的价格。")
  public static Flowable<Map<String, Object>> monitorStockPrice(@Schema(name = "stockSymbol") String stockSymbol) {
    System.out.println("开始监控股票价格: " + stockSymbol);

    return Flowable.concat(
        Flowable.<Map<String, Object>>just(Collections.singletonMap("result", stockSymbol + " 的当前价格为 300")).delay(4, TimeUnit.SECONDS),
        Flowable.<Map<String, Object>>just(Collections.singletonMap("result", stockSymbol + " 的当前价格为 400")).delay(4, TimeUnit.SECONDS),
        Flowable.<Map<String, Object>>just(Collections.singletonMap("result", stockSymbol + " 的当前价格为 900")).delay(20, TimeUnit.SECONDS),
        Flowable.<Map<String, Object>>just(Collections.singletonMap("result", stockSymbol + " 的当前价格为 500")).delay(20, TimeUnit.SECONDS)
    );
  }

  // 视频流监控中,`inputStream` 参数用于 ADK 注入实时的视频帧队列。
  @Schema(description = "实时监控视频流中出现了多少人。")
  public static Flowable<Map<String, Object>> monitorVideoStream(@Schema(name = "inputStream") LiveRequestQueue inputStream) {
    Client client = Client.builder().build();
    String promptText = "统计这张图像中的人数。仅返回一个纯数字。";

    // 使用 RxJava 异步流进行处理
    return inputStream.get()
        .filter(req -> req.blob().isPresent() && "image/jpeg".equals(req.blob().get().mimeType()))
        .sample(500, TimeUnit.MILLISECONDS) // 每 0.5 秒处理一帧
        .map(req -> {
          System.out.println("正在处理队列中的最新帧");
          Part imagePart = Part.builder().inlineData(req.blob().get()).build();
          Content contents = Content.builder()
              .role("user")
              .parts(Arrays.asList(imagePart, Part.fromText(promptText)))
              .build();

          GenerateContentResponse response = client.models().generateContent(
              "gemini-flash-latest",
              contents,
              GenerateContentConfig.builder()
                  .systemInstruction(Content.builder().parts(Arrays.asList(
                      Part.fromText("你是一个专业的视频分析助手,可以准确统计图像或视频中的人数。请仅返回结果数字。")
                  )).build())
                  .build()
          );
          return (Map<String, Object>) Collections.<String, Object>singletonMap("result", response.text());
        })
        .distinctUntilChanged();
  }

  // 使用此精确函数来帮助 ADK 在请求时停止你的流式工具。
  @Schema(description = "停止流式处理")
  public static void stopStreaming(
      @Schema(name = "functionName", description = "要停止的流式函数名称。") String functionName) {
    // 停止流式处理逻辑
  }

  public static void main(String[] args) {
    LlmAgent rootAgent = LlmAgent.builder()
        .model("gemini-flash-latest")
        .name("video_streaming_agent")
        .instruction(
            "你是一个监控智能体。你可以利用提供的工具进行视频监控和股票价格监控。\n" +
            "当用户想要监控视频流时,你可以使用 monitorVideoStream 函数。\n" +
            "当 monitorVideoStream 返回告警时,你应该告知用户。\n" +
            "当用户想要监控股票价格时,你可以使用 monitorStockPrice。\n" +
            "不要问太多问题。不要过于啰嗦。"
        )
        .tools(Arrays.asList(
            FunctionTool.create(StreamingTools.class, "monitorVideoStream"),
            FunctionTool.create(StreamingTools.class, "monitorStockPrice"),
            FunctionTool.create(StreamingTools.class, "stopStreaming")
        ))
        .build();
  }
}

以下是一些推荐的测试 Query:

  • 帮我监控 $XYZ 股票的价格动态。
  • 请帮我盯着视频流,看看里面有多少人。