Skip to content

环境智能体 (Ambient Agents)

Supported in ADKPythonGo

ADK 支持环境智能体,这些自主智能体可以处理数据、监控事件并异步响应,无需人工干预。使用环境智能体可以:

  • 响应云事件。 当文件上传到 Cloud Storage 时处理文件,响应数据库更改,或处理审计日志条目。
  • 处理队列中的消息。 分析传入的支持工单、审核内容、分类文档,或在项目到达时运行质量保证。
  • 按计划运行。 生成日报表、运行定期监控检查,或在固定间隔处理批处理作业。
  • 监控基础设施。 响应基础设施中的连续事件流,并自主对变化采取行动。

从环境智能体获取结果

由于环境智能体无需人工交互即可运行,你需要将其输出路由到通知通道。常见模式包括:

如何构建环境智能体

ADK 提供两种方法:

/run 触发器端点
事件源 任意(Pub/Sub、webhooks、cron、自定义服务) Cloud Pub/SubEventarcStandardAdvanced
负载解析 由你处理 自动(Base64 解码、CloudEvent 解析)
会话创建 启用 --auto_create_session 自动(每个事件一个)
会话存储 你配置的 SessionService 你配置的 SessionService
并发控制 由你处理 内置信号量,可配置限制
重试逻辑 由你处理 带抖动的指数退避,用于瞬态错误
最适合 自定义集成、非 GCP 来源 GCP 原生事件驱动工作负载

使用 /run

当你需要对集成进行全面控制或使用非 GCP 事件源时,请使用 /run 端点。启用 --auto_create_session,以便自动创建会话,然后在事件到达时连接任何 HTTP 客户端调用 /run

adk api_server --auto_create_session path/to/your/agent

此模式适用于任何可以发出 HTTP 请求的事件源。

Example: Processing incoming webhooks

以下 Cloud Run 函数 接收来自外部服务(例如 GitHub)的 webhook 并将其转发给智能体:

import json
import uuid

import functions_framework
import requests

AGENT_URL = "https://my-agent-service-xxxxx.run.app"

@functions_framework.http
def handle_webhook(request):
    """接收 webhook 并转发给智能体的 Cloud Run 函数。"""
    payload = request.get_json(silent=True) or {}
    ...
Example: Send an event with curl
curl -X POST http://localhost:8000/apps/my_agent/run \
  -H "Content-Type: application/json" \
  -d '{
    "app_name": "my_agent",
    "user_id": "webhook-caller",
    "session_id": "session-123",
    "new_message": {
      "role": "user",
      "parts": [{"text": "{\"order_id\": \"1234\", \"status\": \"new\"}"}]
    }
  }'

使用触发器端点

当你的事件源是 Pub/Sub 或 Eventarc,并且你希望 ADK 处理负载解析、会话创建、并发和重试时,请使用触发器端点。

事件如何处理

Pub/Sub 和 Eventarc 以 HTTP POST 请求的形式将事件传递给智能体。当触发器端点收到事件时,它会:

  1. 解析请求:根据源格式(Pub/Sub 推送消息或 CloudEvent)解析请求。
  2. 解码负载:Base64 编码的消息数据被解码,如果可能,解析为 JSON。
  3. 自动创建会话:使用生成的 UUID 自动创建会话。与 /run 端点不同,你无需启用 --auto_create_session——触发器端点始终为每个事件创建一个新会话。
  4. 运行智能体:将解码后的事件作为用户消息运行智能体。
  5. 返回状态码200 响应告知 Pub/Sub 或 Eventarc 事件已成功处理。500 响应表示失败,事件源根据其重试策略重新尝试传递。

支持的来源

来源 端点 描述
Pub/Sub /apps/{app_name}/trigger/pubsub 接收来自 Pub/Sub 推送订阅 的消息。
Eventarc /apps/{app_name}/trigger/eventarc 接收由 Eventarc 传递的 CloudEventsStandardAdvanced),支持结构化和二进制内容模式。

示例智能体

以下智能体处理来自触发器端点的事件。它使用 parse_event 工具提取事件数据和属性,然后分析内容。

Agent code (event_processing_agent/agent.py)
import json

from google.adk.agents import LlmAgent


def parse_event(raw_event: str) -> dict:
    """Parse and extract structured data from a trigger event.

    Trigger endpoints deliver events as a JSON string with 'data' and
    'attributes' fields. This tool extracts those fields so the agent
    can reason about the event contents.
    """
    try:
        event = json.loads(raw_event)
    except json.JSONDecodeError as e:
        return {"error": f"Failed to parse event JSON: {e}"}
    return {
        "data": event.get("data"),
        "attributes": event.get("attributes", {}),
    }


root_agent = LlmAgent(
    model="gemini-flash-latest",
    name="event_processor",
    instruction="""You are an event-processing agent that handles incoming
events from Pub/Sub and Eventarc triggers.

When you receive an event:
1. Use the `parse_event` tool to extract the event data and attributes.
2. Analyze the event contents and determine what action to take.
3. Summarize what you found and what action you would recommend.

Be concise and structured in your responses.""",
    tools=[parse_event],
)

以下智能体处理来自触发器端点的事件。它提取事件数据和属性,然后分析内容。

智能体代码 (event_processing_agent.go)
import (
    "context"
    "log"
    "os"

    "google.golang.org/genai"

    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/cmd/launcher"
    "google.golang.org/adk/cmd/launcher/full"
    "google.golang.org/adk/model/gemini"
)

func main() {
    ctx := context.Background()

    model, err := gemini.NewModel(ctx, "gemini-2.5-flash", &genai.ClientConfig{
        APIKey: os.Getenv("GOOGLE_API_KEY"),
    })
    if err != nil {
        log.Fatalf("Failed to create model: %v", err)
    }

    a, err := llmagent.New(llmagent.Config{
        Name:        "event_processor",
        Model:       model,
        Description: "Agent to process the events from Pub/Sub and Eventarc triggers.",
        Instruction: `
        You are an event-processing agent that handles incoming
        events from Pub/Sub and Eventarc triggers.

        When you receive an event:
        1. Analyze the event contents and determine what action to take.
        2. Summarize what you found and what action you would recommend.

        Be concise and structured in your responses.`,
    })
    if err != nil {
        log.Fatalf("Failed to create agent: %v", err)
    }

    config := &launcher.Config{
        AgentLoader: agent.NewSingleLoader(a),
    }

    l := full.NewLauncher()
    if err = l.Execute(ctx, config, os.Args[1:]); err != nil {
        log.Fatalf("Run failed: %v\n\n%s", err, l.CommandLineSyntax())
    }
}

启用触发器

触发器端点默认处于禁用状态。使用 --trigger_sources 标志启用它们:

adk api_server --trigger_sources "pubsub,eventarc" path/to/your/agent

对于生产部署,你可以在自定义 FastAPI 入口点中以编程方式启用触发器:

部署入口点 (main.py)
import os

import uvicorn
from google.adk.cli.fast_api import get_fast_api_app

AGENT_DIR = os.path.dirname(os.path.abspath(__file__))

app = get_fast_api_app(
    agents_dir=AGENT_DIR,
    web=False,
    trigger_sources=["pubsub", "eventarc"],
)

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 8000)))

触发器端点默认处于禁用状态。使用相应的触发器标志启用它们:

go run agent.go web api pubsub eventarc

本地尝试

1. 以启用触发器的方式启动服务器:

adk api_server --trigger_sources "pubsub" event_processing_agent
go run event_processing_agent.go web api pubsub

2. 发送测试事件:

curl -X POST http://localhost:8000/apps/event_processing_agent/trigger/pubsub \
  -H "Content-Type: application/json" \
  -d '{
    "message": {
      "data": "eyJvcmRlcl9pZCI6ICIxMjM0IiwgInN0YXR1cyI6ICJuZXcifQ==",
      "attributes": {"source": "orders-service"}
    },
    "subscription": "projects/my-project/subscriptions/orders-sub"
  }'

Base64 值解码为 {"order_id": "1234", "status": "new"}

成功的响应:

{"status": "success"}

触发器来源

参数映射

/run 端点要求你提供 app_nameuser_idsession_id。触发器端点会自动推导这些参数:

参数 来源
app_name 从 URL 路径中提取(/apps/{app_name}/trigger/...
session_id 每个事件自动生成的 UUID
user_id Pub/Sub:subscription 字段。Eventarc:sourcece-source 标头。

消息格式

所有触发器端点在将传入事件作为用户消息传递给智能体之前,都会将其规范化为一致的 JSON 结构:

{
  "data": "<decoded event payload>",
  "attributes": {"key": "value"}
}
  • data: The decoded event payload. If the original data is JSON, it is parsed into a structured object. Otherwise, it is passed as a plain string.
  • attributes: Key-value metadata from the event source (for example, Pub/Sub message attributes or CloudEvents headers like ce-type, ce-source).

Your agent receives this JSON string as the input message and can parse it to extract the data and attributes.

Pub/Sub

Pub/Sub 触发器端点处理来自 Pub/Sub 推送订阅 的消息。当你的应用程序或服务将消息发布到主题时使用它,例如:

  • 支持门户发布传入工单进行分类和路由。
  • 内容流水线发送文档进行分类或审核。
  • 监控服务发布警报以进行自动分析。

请求格式

Pub/Sub 推送订阅以此格式发送请求:

{
  "message": {
    "data": "eyJvcmRlcl9pZCI6ICIxMjM0IiwgInN0YXR1cyI6ICJuZXcifQ==",
    "attributes": {"source": "orders-service"},
    "messageId": "123456789",
    "publishTime": "2026-04-08T12:00:00Z"
  },
  "subscription": "projects/my-project/subscriptions/my-sub"
}

data 字段是 Base64 编码的。触发器端点会自动解码它。

响应

HTTP 状态 含义
200 事件已成功处理。Pub/Sub 确认消息。
400 请求无效(格式错误的 Base64 编码)。消息不会被重试。
500 处理失败(瞬态或非瞬态智能体错误)。Pub/Sub 根据其重试策略 重新尝试传递。配置死信队列 以捕获重复失败的消息。

Eventarc

Eventarc 触发器端点处理由 Eventarc 传递的 CloudEvents,包括 StandardAdvanced 版本。用于响应 Google Cloud 中的事件,例如:

  • 文件上传到 Cloud Storage(分类、总结或从文档中提取数据)。
  • 记录写入 BigQuery(运行异常检测或生成警报)。
  • 创建了审计日志条目(标记策略违规或可疑活动)。

支持两种内容模式:

  • Binary content mode (Eventarc default): CloudEvents attributes are sent as ce-* HTTP headers, and the body contains the event data (typically a Pub/Sub message wrapper).
  • Structured content mode: All CloudEvents attributes and data are in the JSON body.
Test with curl (structured mode)
curl -X POST http://localhost:8000/apps/my_agent/trigger/eventarc \
  -H "Content-Type: application/json" \
  -d '{
    "specversion": "1.0",
    "type": "google.cloud.storage.object.v1.finalized",
    "source": "//storage.googleapis.com/projects/my-project",
    "id": "event-123",
    "data": {
      "bucket": "my-bucket",
      "name": "uploads/document.pdf"
    }
  }'
Test with curl (binary mode)
curl -X POST http://localhost:8000/apps/my_agent/trigger/eventarc \
  -H "Content-Type: application/json" \
  -H "ce-type: google.cloud.storage.object.v1.finalized" \
  -H "ce-source: //storage.googleapis.com/projects/my-project" \
  -H "ce-id: event-456" \
  -H "ce-specversion: 1.0" \
  -d '{
    "message": {
      "data": "eyJidWNrZXQiOiAibXktYnVja2V0IiwgIm5hbWUiOiAiZG9jLnBkZiJ9",
      "attributes": {"eventType": "OBJECT_FINALIZE"}
    },
    "subscription": "projects/my-project/subscriptions/eventarc-sub"
  }'

响应

HTTP 状态 含义
200 事件已成功处理。Eventarc 确认传递。
500 处理失败。Eventarc 根据其重试策略重新尝试传递。

配置

并发控制

触发器端点使用信号量来限制并发智能体调用的数量。这可以防止在事件突发期间智能体超过你的 LLM 模型配额。

Setting Default Environment Variable
Max concurrent invocations 10 ADK_TRIGGER_MAX_CONCURRENT
Setting Default Flag
Max concurrent invocations 10 --trigger_max_concurrent_runs

When the concurrency limit is reached, incoming requests are queued and processed as slots become available. Concurrency control is per process. If you deploy multiple Cloud Run instances, each instance maintains its own independent semaphore.

# Allow up to 5 concurrent agent invocations
export ADK_TRIGGER_MAX_CONCURRENT=5
go run event_processing_agent.go web api pubsub --trigger_max_concurrent_runs=5

带退避的自动重试

触发器端点包含针对瞬态错误(例如 429 RESOURCE_EXHAUSTED 响应)的内置重试逻辑。当检测到瞬态错误时,将使用指数退避和抖动重试请求。

Setting Default Environment Variable
Max retry attempts 3 ADK_TRIGGER_MAX_RETRIES
Base backoff delay 1.0s ADK_TRIGGER_RETRY_BASE_DELAY
Max backoff delay 30.0s ADK_TRIGGER_RETRY_MAX_DELAY
Setting Default Flag
Max retry attempts 3 --trigger_max_retries
Base backoff delay 1.0s --trigger_base_delay
Max backoff delay 30.0s --trigger_max_delay

如果所有重试都已用尽,端点返回 HTTP 500,通知 Pub/Sub 或 Eventarc 在更高级别重试传递。非瞬态错误会立即失败,不进行重试。

错误处理和灾难恢复

基于触发器的工作负载的灾难恢复由触发服务处理,而非 ADK:

  • 如果你的智能体崩溃或返回错误,Pub/Sub 或 Eventarc 不会收到确认,并会自动重新传递消息。
  • 在最大重试次数用尽后,未处理的消息会移动到死信队列 (DLQ)(如果已配置)。
  • 每次重新传递都会创建一个新会话。触发器工作负载在本质上是无状态的。

超时考虑

所有触发器端点同步处理并在返回响应之前等待智能体完成。这是有意设计的:保持 HTTP 请求活动可确保托管基础设施在智能体仍在工作时不会终止进程。同步响应代码(200 或 500)使得 Pub/Sub 和 Eventarc 能够正确确认成功或触发重试。

最大处理时间由上游服务决定:

服务 最大超时
Pub/Sub 推送 10 分钟(确认截止时间)
Eventarc 10 分钟(Standard 使用 Pub/Sub 作为传输;Advanced 通过流水线传递)

触发器端点专为在 10 分钟内完成的智能体而设计。这适用于处理单个事件、运行验证、分类文档以及将结果写入下游服务。

长时间运行的智能体

触发器端点不适合耗时超过 10 分钟的智能体。对于长时间运行的工作负载,请使用 Pub/Sub 拉取订阅Cloud Run Jobs 或工作池架构。

会话生命周期

会话遵循与所有其他 ADK 入口点相同的模式。它们通过你配置的 SessionService 创建。默认情况下,ADK 使用 InMemorySessionService,这使得触发器会话是短暂的:每个事件创建,处理后丢弃。

如果你配置了持久化 SessionService(例如 DatabaseSessionService),触发器会话会自动存储。这对于事件驱动工作负载的审计、调试和事后分析非常有用。

部署

以下示例使用 Cloud Run 作为部署目标。Cloud Run 是目前推荐用于部署具有触发器端点的环境智能体的平台。

身份验证和安全性

触发器端点是 ADK Web 服务器中的标准 HTTP 路由。身份验证和安全性在部署级别执行,与任何其他 ADK 端点相同。启用身份验证部署时(推荐),所有端点都需要有效凭据。GCP 服务使用服务账号 身份进行身份验证。有关详细信息,请参阅每个服务的文档。

使用 --trigger_sources 标志将启用了触发器的智能体部署到 Cloud Run:

adk deploy cloud_run \
  --project=$GOOGLE_CLOUD_PROJECT \
  --region=$GOOGLE_CLOUD_LOCATION \
  --trigger_sources="pubsub,eventarc" \
  path/to/your/agent

使用相应的触发器标志将启用了触发器的智能体部署到 Cloud Run(所有设置均以触发器类型为前缀)

adk deploy cloud_run \
  --project=$GOOGLE_CLOUD_PROJECT \
  --region=$GOOGLE_CLOUD_LOCATION \
  --pubsub \
  --pubsub_max_concurrent_runs=5 \
  --eventarc \
  --eventarc_max_concurrent_runs=5

部署后,将适当的 GCP 基础设施连接到智能体的触发器端点:

有关完整的部署说明,请参阅部署到 Cloud Run

下一步?