环境智能体 (Ambient Agents)¶
ADK 支持环境智能体,这些自主智能体可以处理数据、监控事件并异步响应,无需人工干预。使用环境智能体可以:
- 响应云事件。 当文件上传到 Cloud Storage 时处理文件,响应数据库更改,或处理审计日志条目。
- 处理队列中的消息。 分析传入的支持工单、审核内容、分类文档,或在项目到达时运行质量保证。
- 按计划运行。 生成日报表、运行定期监控检查,或在固定间隔处理批处理作业。
- 监控基础设施。 响应基础设施中的连续事件流,并自主对变化采取行动。
从环境智能体获取结果¶
由于环境智能体无需人工交互即可运行,你需要将其输出路由到通知通道。常见模式包括:
- 结构化日志记录。 写入 JSON 日志并配置 Cloud Monitoring 警报,通过电子邮件、Slack 或 PagerDuty 通知。
- Pub/Sub。 将结果发布到主题供下游服务消费。
- 应用集成。 将智能体输出路由到电子邮件、Jira 或其他系统。
如何构建环境智能体¶
ADK 提供两种方法:
/run |
触发器端点 | |
|---|---|---|
| 事件源 | 任意(Pub/Sub、webhooks、cron、自定义服务) | Cloud Pub/Sub、Eventarc(Standard 和 Advanced) |
| 负载解析 | 由你处理 | 自动(Base64 解码、CloudEvent 解析) |
| 会话创建 | 启用 --auto_create_session |
自动(每个事件一个) |
| 会话存储 | 你配置的 SessionService |
你配置的 SessionService |
| 并发控制 | 由你处理 | 内置信号量,可配置限制 |
| 重试逻辑 | 由你处理 | 带抖动的指数退避,用于瞬态错误 |
| 最适合 | 自定义集成、非 GCP 来源 | GCP 原生事件驱动工作负载 |
使用 /run¶
当你需要对集成进行全面控制或使用非 GCP 事件源时,请使用 /run 端点。启用 --auto_create_session,以便自动创建会话,然后在事件到达时连接任何 HTTP 客户端调用 /run。
此模式适用于任何可以发出 HTTP 请求的事件源。
Example: Processing incoming webhooks
以下 Cloud Run 函数 接收来自外部服务(例如 GitHub)的 webhook 并将其转发给智能体:
Example: Send an event with curl
使用触发器端点¶
当你的事件源是 Pub/Sub 或 Eventarc,并且你希望 ADK 处理负载解析、会话创建、并发和重试时,请使用触发器端点。
事件如何处理¶
Pub/Sub 和 Eventarc 以 HTTP POST 请求的形式将事件传递给智能体。当触发器端点收到事件时,它会:
- 解析请求:根据源格式(Pub/Sub 推送消息或 CloudEvent)解析请求。
- 解码负载:Base64 编码的消息数据被解码,如果可能,解析为 JSON。
- 自动创建会话:使用生成的 UUID 自动创建会话。与
/run端点不同,你无需启用--auto_create_session——触发器端点始终为每个事件创建一个新会话。 - 运行智能体:将解码后的事件作为用户消息运行智能体。
- 返回状态码:
200响应告知 Pub/Sub 或 Eventarc 事件已成功处理。500响应表示失败,事件源根据其重试策略重新尝试传递。
支持的来源¶
| 来源 | 端点 | 描述 |
|---|---|---|
| Pub/Sub | /apps/{app_name}/trigger/pubsub |
接收来自 Pub/Sub 推送订阅 的消息。 |
| Eventarc | /apps/{app_name}/trigger/eventarc |
接收由 Eventarc 传递的 CloudEvents(Standard 或 Advanced),支持结构化和二进制内容模式。 |
示例智能体¶
以下智能体处理来自触发器端点的事件。它使用 parse_event 工具提取事件数据和属性,然后分析内容。
Agent code (event_processing_agent/agent.py)
import json
from google.adk.agents import LlmAgent
def parse_event(raw_event: str) -> dict:
"""Parse and extract structured data from a trigger event.
Trigger endpoints deliver events as a JSON string with 'data' and
'attributes' fields. This tool extracts those fields so the agent
can reason about the event contents.
"""
try:
event = json.loads(raw_event)
except json.JSONDecodeError as e:
return {"error": f"Failed to parse event JSON: {e}"}
return {
"data": event.get("data"),
"attributes": event.get("attributes", {}),
}
root_agent = LlmAgent(
model="gemini-flash-latest",
name="event_processor",
instruction="""You are an event-processing agent that handles incoming
events from Pub/Sub and Eventarc triggers.
When you receive an event:
1. Use the `parse_event` tool to extract the event data and attributes.
2. Analyze the event contents and determine what action to take.
3. Summarize what you found and what action you would recommend.
Be concise and structured in your responses.""",
tools=[parse_event],
)
以下智能体处理来自触发器端点的事件。它提取事件数据和属性,然后分析内容。
智能体代码 (event_processing_agent.go)
import (
"context"
"log"
"os"
"google.golang.org/genai"
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/cmd/launcher"
"google.golang.org/adk/cmd/launcher/full"
"google.golang.org/adk/model/gemini"
)
func main() {
ctx := context.Background()
model, err := gemini.NewModel(ctx, "gemini-2.5-flash", &genai.ClientConfig{
APIKey: os.Getenv("GOOGLE_API_KEY"),
})
if err != nil {
log.Fatalf("Failed to create model: %v", err)
}
a, err := llmagent.New(llmagent.Config{
Name: "event_processor",
Model: model,
Description: "Agent to process the events from Pub/Sub and Eventarc triggers.",
Instruction: `
You are an event-processing agent that handles incoming
events from Pub/Sub and Eventarc triggers.
When you receive an event:
1. Analyze the event contents and determine what action to take.
2. Summarize what you found and what action you would recommend.
Be concise and structured in your responses.`,
})
if err != nil {
log.Fatalf("Failed to create agent: %v", err)
}
config := &launcher.Config{
AgentLoader: agent.NewSingleLoader(a),
}
l := full.NewLauncher()
if err = l.Execute(ctx, config, os.Args[1:]); err != nil {
log.Fatalf("Run failed: %v\n\n%s", err, l.CommandLineSyntax())
}
}
启用触发器¶
触发器端点默认处于禁用状态。使用 --trigger_sources 标志启用它们:
对于生产部署,你可以在自定义 FastAPI 入口点中以编程方式启用触发器:
部署入口点 (main.py)
import os
import uvicorn
from google.adk.cli.fast_api import get_fast_api_app
AGENT_DIR = os.path.dirname(os.path.abspath(__file__))
app = get_fast_api_app(
agents_dir=AGENT_DIR,
web=False,
trigger_sources=["pubsub", "eventarc"],
)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 8000)))
本地尝试¶
1. 以启用触发器的方式启动服务器:
2. 发送测试事件:
curl -X POST http://localhost:8000/apps/event_processing_agent/trigger/pubsub \
-H "Content-Type: application/json" \
-d '{
"message": {
"data": "eyJvcmRlcl9pZCI6ICIxMjM0IiwgInN0YXR1cyI6ICJuZXcifQ==",
"attributes": {"source": "orders-service"}
},
"subscription": "projects/my-project/subscriptions/orders-sub"
}'
Base64 值解码为 {"order_id": "1234", "status": "new"}。
成功的响应:
触发器来源¶
参数映射¶
/run 端点要求你提供 app_name、user_id 和 session_id。触发器端点会自动推导这些参数:
| 参数 | 来源 |
|---|---|
app_name |
从 URL 路径中提取(/apps/{app_name}/trigger/...) |
session_id |
每个事件自动生成的 UUID |
user_id |
Pub/Sub:subscription 字段。Eventarc:source 或 ce-source 标头。 |
消息格式¶
所有触发器端点在将传入事件作为用户消息传递给智能体之前,都会将其规范化为一致的 JSON 结构:
data: The decoded event payload. If the original data is JSON, it is parsed into a structured object. Otherwise, it is passed as a plain string.attributes: Key-value metadata from the event source (for example, Pub/Sub message attributes or CloudEvents headers likece-type,ce-source).
Your agent receives this JSON string as the input message and can parse it to extract the data and attributes.
Pub/Sub¶
Pub/Sub 触发器端点处理来自 Pub/Sub 推送订阅 的消息。当你的应用程序或服务将消息发布到主题时使用它,例如:
- 支持门户发布传入工单进行分类和路由。
- 内容流水线发送文档进行分类或审核。
- 监控服务发布警报以进行自动分析。
请求格式¶
Pub/Sub 推送订阅以此格式发送请求:
{
"message": {
"data": "eyJvcmRlcl9pZCI6ICIxMjM0IiwgInN0YXR1cyI6ICJuZXcifQ==",
"attributes": {"source": "orders-service"},
"messageId": "123456789",
"publishTime": "2026-04-08T12:00:00Z"
},
"subscription": "projects/my-project/subscriptions/my-sub"
}
data 字段是 Base64 编码的。触发器端点会自动解码它。
响应¶
| HTTP 状态 | 含义 |
|---|---|
| 200 | 事件已成功处理。Pub/Sub 确认消息。 |
| 400 | 请求无效(格式错误的 Base64 编码)。消息不会被重试。 |
| 500 | 处理失败(瞬态或非瞬态智能体错误)。Pub/Sub 根据其重试策略 重新尝试传递。配置死信队列 以捕获重复失败的消息。 |
Eventarc¶
Eventarc 触发器端点处理由 Eventarc 传递的 CloudEvents,包括 Standard 和 Advanced 版本。用于响应 Google Cloud 中的事件,例如:
- 文件上传到 Cloud Storage(分类、总结或从文档中提取数据)。
- 记录写入 BigQuery(运行异常检测或生成警报)。
- 创建了审计日志条目(标记策略违规或可疑活动)。
支持两种内容模式:
- Binary content mode (Eventarc default): CloudEvents attributes are sent
as
ce-*HTTP headers, and the body contains the event data (typically a Pub/Sub message wrapper). - Structured content mode: All CloudEvents attributes and data are in the JSON body.
Test with curl (structured mode)
curl -X POST http://localhost:8000/apps/my_agent/trigger/eventarc \
-H "Content-Type: application/json" \
-d '{
"specversion": "1.0",
"type": "google.cloud.storage.object.v1.finalized",
"source": "//storage.googleapis.com/projects/my-project",
"id": "event-123",
"data": {
"bucket": "my-bucket",
"name": "uploads/document.pdf"
}
}'
Test with curl (binary mode)
curl -X POST http://localhost:8000/apps/my_agent/trigger/eventarc \
-H "Content-Type: application/json" \
-H "ce-type: google.cloud.storage.object.v1.finalized" \
-H "ce-source: //storage.googleapis.com/projects/my-project" \
-H "ce-id: event-456" \
-H "ce-specversion: 1.0" \
-d '{
"message": {
"data": "eyJidWNrZXQiOiAibXktYnVja2V0IiwgIm5hbWUiOiAiZG9jLnBkZiJ9",
"attributes": {"eventType": "OBJECT_FINALIZE"}
},
"subscription": "projects/my-project/subscriptions/eventarc-sub"
}'
响应¶
| HTTP 状态 | 含义 |
|---|---|
| 200 | 事件已成功处理。Eventarc 确认传递。 |
| 500 | 处理失败。Eventarc 根据其重试策略重新尝试传递。 |
配置¶
并发控制¶
触发器端点使用信号量来限制并发智能体调用的数量。这可以防止在事件突发期间智能体超过你的 LLM 模型配额。
| Setting | Default | Environment Variable |
|---|---|---|
| Max concurrent invocations | 10 | ADK_TRIGGER_MAX_CONCURRENT |
| Setting | Default | Flag |
|---|---|---|
| Max concurrent invocations | 10 | --trigger_max_concurrent_runs |
When the concurrency limit is reached, incoming requests are queued and processed as slots become available. Concurrency control is per process. If you deploy multiple Cloud Run instances, each instance maintains its own independent semaphore.
带退避的自动重试¶
触发器端点包含针对瞬态错误(例如 429 RESOURCE_EXHAUSTED 响应)的内置重试逻辑。当检测到瞬态错误时,将使用指数退避和抖动重试请求。
| Setting | Default | Environment Variable |
|---|---|---|
| Max retry attempts | 3 | ADK_TRIGGER_MAX_RETRIES |
| Base backoff delay | 1.0s | ADK_TRIGGER_RETRY_BASE_DELAY |
| Max backoff delay | 30.0s | ADK_TRIGGER_RETRY_MAX_DELAY |
| Setting | Default | Flag |
|---|---|---|
| Max retry attempts | 3 | --trigger_max_retries |
| Base backoff delay | 1.0s | --trigger_base_delay |
| Max backoff delay | 30.0s | --trigger_max_delay |
如果所有重试都已用尽,端点返回 HTTP 500,通知 Pub/Sub 或 Eventarc 在更高级别重试传递。非瞬态错误会立即失败,不进行重试。
错误处理和灾难恢复¶
基于触发器的工作负载的灾难恢复由触发服务处理,而非 ADK:
- 如果你的智能体崩溃或返回错误,Pub/Sub 或 Eventarc 不会收到确认,并会自动重新传递消息。
- 在最大重试次数用尽后,未处理的消息会移动到死信队列 (DLQ)(如果已配置)。
- 每次重新传递都会创建一个新会话。触发器工作负载在本质上是无状态的。
超时考虑¶
所有触发器端点同步处理并在返回响应之前等待智能体完成。这是有意设计的:保持 HTTP 请求活动可确保托管基础设施在智能体仍在工作时不会终止进程。同步响应代码(200 或 500)使得 Pub/Sub 和 Eventarc 能够正确确认成功或触发重试。
最大处理时间由上游服务决定:
| 服务 | 最大超时 |
|---|---|
| Pub/Sub 推送 | 10 分钟(确认截止时间) |
| Eventarc | 10 分钟(Standard 使用 Pub/Sub 作为传输;Advanced 通过流水线传递) |
触发器端点专为在 10 分钟内完成的智能体而设计。这适用于处理单个事件、运行验证、分类文档以及将结果写入下游服务。
长时间运行的智能体
触发器端点不适合耗时超过 10 分钟的智能体。对于长时间运行的工作负载,请使用 Pub/Sub 拉取订阅、Cloud Run Jobs 或工作池架构。
会话生命周期¶
会话遵循与所有其他 ADK 入口点相同的模式。它们通过你配置的 SessionService 创建。默认情况下,ADK 使用 InMemorySessionService,这使得触发器会话是短暂的:每个事件创建,处理后丢弃。
如果你配置了持久化 SessionService(例如 DatabaseSessionService),触发器会话会自动存储。这对于事件驱动工作负载的审计、调试和事后分析非常有用。
部署¶
以下示例使用 Cloud Run 作为部署目标。Cloud Run 是目前推荐用于部署具有触发器端点的环境智能体的平台。
身份验证和安全性
触发器端点是 ADK Web 服务器中的标准 HTTP 路由。身份验证和安全性在部署级别执行,与任何其他 ADK 端点相同。启用身份验证部署时(推荐),所有端点都需要有效凭据。GCP 服务使用服务账号 身份进行身份验证。有关详细信息,请参阅每个服务的文档。
使用 --trigger_sources 标志将启用了触发器的智能体部署到 Cloud Run:
部署后,将适当的 GCP 基础设施连接到智能体的触发器端点:
- Pub/Sub:创建一个指向
/apps/{app_name}/trigger/pubsub的推送订阅。 - Eventarc:创建一个Eventarc Standard 触发器 或一个Eventarc Advanced 流水线,路由到
/apps/{app_name}/trigger/eventarc。 - Cloud Scheduler:创建一个调度器作业,按 cron 计划发布到你的 Pub/Sub 主题。
有关完整的部署说明,请参阅部署到 Cloud Run。
下一步?¶
- 了解如何将智能体部署到 Cloud Run
- 探索API 服务器端点 用于交互式智能体调用
- 使用 Pub/Sub 工具集 为智能体提供发布和拉取消息的能力