BigQuery 智能体分析插件¶
版本要求
使用 ADK 的 最新版本(版本 1.21.0 或更高版本)以充分利用本文档中描述的功能。
BigQuery 智能体分析插件通过提供深入的智能体行为分析的健壮解决方案,显著增强了智能体开发工具包(ADK)。使用 ADK 插件架构和 BigQuery 存储写入 API,它直接捕获和记录关键操作事件到 Google BigQuery 表中,为你提供高级功能,用于调试、实时监控和全面的离线性能评估。
版本 1.21.0 引入了 混合多模态日志记录,允许你通过将大型负载(图像、音频、数据块)卸载到 Google Cloud Storage(GCS)来记录它们,同时在 BigQuery 中保留结构化引用(ObjectRef)。
预览版发布
BigQuery 智能体分析插件处于预览版发布阶段。欲了解更多信息,请参见发布阶段描述。
BigQuery 存储写入 API
此功能使用 BigQuery 存储写入 API,这是一个付费服务。有关成本信息,请参见 BigQuery 文档。
使用场景¶
- 智能体工作流调试和分析: 捕获广泛的插件生命周期事件(LLM 调用、工具使用)和 智能体生成的事件(用户输入、模型响应),记录到一个定义良好的模式中。
- 高容量分析和调试: 日志操作使用存储写入 API 异步执行,以实现高吞吐量和低延迟。
- 多模态分析:记录和分析文本、图像和其他模态。大文件被卸载到 GCS,使它们可以通过对象表访问 BigQuery ML。
- 分布式跟踪:内置支持 OpenTelemetry 风格的跟踪(
trace_id,span_id)以可视化智能体执行流。
记录的智能体事件数据根据 ADK 事件类型而有所不同。欲了解更多信息,请参见 事件类型和负载。
先决条件¶
- Google Cloud 项目 启用了 BigQuery API。
- BigQuery 数据集: 在使用插件之前创建一个数据集来存储日志表。如果表不存在,插件会在数据集中自动创建必要的事件表。
- Google Cloud Storage 存储桶(可选): 如果你计划记录多模态内容(图像、音频等),建议创建一个 GCS 存储桶来卸载大文件。
- 身份验证:
- 本地: 运行
gcloud auth application-default login。 - 云: 确保你的服务账户具有所需的权限。
- 本地: 运行
IAM 权限¶
为了让智能体正常工作,运行智能体的主体(例如,服务账户、用户账户)需要这些 Google Cloud 角色:
* roles/bigquery.jobUser 在项目级别运行 BigQuery 查询。
* roles/bigquery.dataEditor 在表级别写入日志/事件数据。
* 如果使用 GCS 卸载: roles/storage.objectCreator 和 roles/storage.objectViewer 在目标存储桶上。
与智能体一起使用¶
你通过配置并将 BigQuery 智能体分析插件注册到你的 ADK 智能体的 App 对象来使用它。以下示例显示了使用此插件的智能体实现,包括 GCS 卸载:
# my_bq_agent/agent.py
import os
import google.auth
from google.adk.apps import App
from google.adk.plugins.bigquery_agent_analytics_plugin import BigQueryAgentAnalyticsPlugin, BigQueryLoggerConfig
from google.adk.agents import Agent
from google.adk.models.google_llm import Gemini
from google.adk.tools.bigquery import BigQueryToolset, BigQueryCredentialsConfig
# --- 配置 ---
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "your-gcp-project-id")
DATASET_ID = os.environ.get("BIG_QUERY_DATASET_ID", "your-big-query-dataset-id")
LOCATION = os.environ.get("GOOGLE_CLOUD_LOCATION", "US") # 插件中的默认位置是 US
GCS_BUCKET = os.environ.get("GCS_BUCKET_NAME", "your-gcs-bucket-name") # 可选
if PROJECT_ID == "your-gcp-project-id":
raise ValueError("请设置 GOOGLE_CLOUD_PROJECT 或更新代码。")
# --- 关键:在 Gemini 实例化之前设置环境变量 ---
os.environ['GOOGLE_CLOUD_PROJECT'] = PROJECT_ID
os.environ['GOOGLE_CLOUD_LOCATION'] = LOCATION
os.environ['GOOGLE_GENAI_USE_VERTEXAI'] = 'True'
# --- 使用配置初始化插件 ---
bq_config = BigQueryLoggerConfig(
enabled=True,
gcs_bucket_name=GCS_BUCKET, # 为多模态内容启用 GCS 卸载
log_multi_modal_content=True,
max_content_length=500 * 1024, # 500 KB 限制用于内联文本
batch_size=1, # 默认为 1 以实现低延迟,增加以实现高吞吐量
shutdown_timeout=10.0
)
bq_logging_plugin = BigQueryAgentAnalyticsPlugin(
project_id=PROJECT_ID,
dataset_id=DATASET_ID,
table_id="agent_events_v2", # 默认表名是 agent_events_v2
config=bq_config,
location=LOCATION
)
# --- 初始化工具和模型 ---
credentials, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
bigquery_toolset = BigQueryToolset(
credentials_config=BigQueryCredentialsConfig(credentials=credentials)
)
llm = Gemini(model="gemini-2.5-flash")
root_agent = Agent(
model=llm,
name='my_bq_agent',
instruction="你是一个可以访问 BigQuery 工具的有用助手。",
tools=[bigquery_toolset]
)
# --- 创建 App ---
app = App(
name="my_bq_agent",
root_agent=root_agent,
plugins=[bq_logging_plugin],
)
运行和测试智能体¶
通过运行智能体并通过聊天界面进行一些请求来测试插件,例如"告诉我你能做什么"或"列出我的云项目
SELECT timestamp, event_type, content
FROM `your-gcp-project-id.your-big-query-dataset-id.agent_events_v2`
ORDER BY timestamp DESC
LIMIT 20;
配置选项¶
你可以使用 BigQueryLoggerConfig 自定义插件。
enabled(bool, 默认:True): 要禁用插件记录智能体数据到 BigQuery 表,请将此参数设置为 False。clustering_fields(List[str], 默认:["event_type", "agent", "user_id"]): 自动创建 BigQuery 表时用于聚类的字段。gcs_bucket_name(Optional[str], 默认:None): 用于卸载大内容(图像、数据块、大文本)的 GCS 存储桶的名称。如果未提供,大内容可能会被截断或替换为占位符。connection_id(Optional[str], 默认:None): 用作ObjectRef列的授权者的 BigQuery 连接 ID(例如,us.my-connection)。使用ObjectRef与 BigQuery ML 时需要。max_content_length(int, 默认:500 * 1024): 在 BigQuery 中存储 内联 的文本内容的最大长度(以字符为单位),在此之前卸载到 GCS(如果配置)或截断。默认为 500 KB。batch_size(int, 默认:1): 写入 BigQuery 之前要批处理的事件数。batch_flush_interval(float, 默认:1.0): 在刷新部分批处理之前等待的最大时间(以秒为单位)。shutdown_timeout(float, 默认:10.0): 关闭期间等待日志刷新的秒数。event_allowlist(Optional[List[str]], 默认:None): 要记录的事件类型列表。如果为None,则记录所有事件,除了event_denylist中的事件。有关支持的事件类型的完整列表,请参见 事件类型和负载 部分。event_denylist(Optional[List[str]], 默认:None): 要跳过记录的事件类型列表。有关支持的事件类型的完整列表,请参见 事件类型和负载 部分。content_formatter(Optional[Callable[[Any, str], Any]], 默认:None): 记录前格式化事件内容的可选函数。log_multi_modal_content(bool, 默认:True): 是否记录详细的内容部分(包括 GCS 引用)。
以下代码示例显示了如何为 BigQuery 智能体分析插件定义配置:
import json
import re
from google.adk.plugins.bigquery_agent_analytics_plugin import BigQueryLoggerConfig
def redact_dollar_amounts(event_content: Any) -> str:
"""
自定义格式化程序,用于删除美元金额(例如,$600,$12.50)
并确保如果输入是字典,则输出 JSON。
"""
text_content = ""
if isinstance(event_content, dict):
text_content = json.dumps(event_content)
else:
text_content = str(event_content)
# 正则表达式查找美元金额:$ 后跟数字,可选择带有逗号或小数。
# 示例:$600,$1,200.50,$0.99
redacted_content = re.sub(r'\$\d+(?:,\d{3})*(?:\.\d+)?', 'xxx', text_content)
return redacted_content
config = BigQueryLoggerConfig(
enabled=True,
event_allowlist=["LLM_REQUEST", "LLM_RESPONSE"], # 仅记录这些事件
# event_denylist=["TOOL_STARTING"], # 跳过这些事件
shutdown_timeout=10.0, # 退出时等待最多 10 秒以刷新日志
client_close_timeout=2.0, # 等待最多 2 秒以关闭 BQ 客户端
max_content_length=500, # 将内容截断为 500 个字符
content_formatter=redact_dollar_amounts, # 在日志内容中删除美元金额
)
plugin = BigQueryAgentAnalyticsPlugin(..., config=config)
模式和生产设置¶
如果表不存在,插件会自动创建表。但是,对于生产环境,我们建议使用以下 DDL 手动创建表,该 DDL 利用 JSON 类型的灵活性和 重复记录 的多模态内容。
推荐的 DDL:
CREATE TABLE `your-gcp-project-id.adk_agent_logs.agent_events_v2`
(
timestamp TIMESTAMP NOT NULL OPTIONS(description="记录事件的 UTC 时间。"),
event_type STRING OPTIONS(description="指示正在记录的事件类型(例如,'LLM_REQUEST','TOOL_COMPLETED')。"),
agent STRING OPTIONS(description="与事件关联的 ADK 智能体或作者的名称。"),
session_id STRING OPTIONS(description="用于在单个对话或用户会话内对事件进行分组的唯一标识符。"),
invocation_id STRING OPTIONS(description="会话中每次单独的智能体执行或回合的唯一标识符。"),
user_id STRING OPTIONS(description="与当前会话关联的用户标识符。"),
trace_id STRING OPTIONS(description="用于分布式跟踪的 OpenTelemetry 跟踪 ID。"),
span_id STRING OPTIONS(description="此特定操作的 OpenTelemetry 跨度 ID。"),
parent_span_id STRING OPTIONS(description="OpenTelemetry 父跨度 ID 以重建层次结构。"),
content JSON OPTIONS(description="作为 JSON 存储的事件特定数据(负载)。"),
content_parts ARRAY<STRUCT<
mime_type STRING,
uri STRING,
object_ref STRUCT<
uri STRING,
version STRING,
authorizer STRING,
details JSON
>,
text STRING,
part_index INT64,
part_attributes STRING,
storage_mode STRING
>> OPTIONS(description="用于多模态数据的详细内容部分。"),
attributes JSON OPTIONS(description="用于附加元数据的任意键值对。"),
latency_ms JSON OPTIONS(description="延迟测量(例如,total_ms)。"),
status STRING OPTIONS(description="事件的结果,通常为 'OK' 或 'ERROR'。"),
error_message STRING OPTIONS(description="如果发生错误则填充。"),
is_truncated BOOLEAN OPTIONS(description="标志指示内容是否被截断。")
)
PARTITION BY DATE(timestamp)
CLUSTER BY event_type, agent, user_id;
事件类型和负载¶
content 列现在包含特定于 event_type 的 JSON 对象。content_parts 列提供了内容的结构化视图,对于图像或卸载的数据特别有用。
内容截断
- 可变内容字段被截断为
max_content_length(在BigQueryLoggerConfig中配置,默认 500KB)。 - 如果配置了
gcs_bucket_name,大内容将被卸载到 GCS 而不是被截断,并且引用存储在content_parts.object_ref中。
LLM 交互(插件生命周期)¶
这些事件跟踪发送到 LLM 的原始请求和从 LLM 接收到的响应。
| 事件类型 | 内容(JSON)结构 | 属性(JSON) | 示例内容(简化) |
|---|---|---|---|
LLM_REQUEST |
{
"prompt": [
{"role": "user", "content": "..."}
],
"system_prompt": "..."
}
|
{
"tools": ["tool_a", "tool_b"],
"llm_config": {"temperature": 0.5}
}
|
{
"prompt": [
{"role": "user", "content": "法国的首都是什么?"}
],
"system_prompt": "你是一个有用的地理助手。"
}
|
LLM_RESPONSE |
{
"response": "...",
"usage": {...}
}
|
{} |
{
"response": "法国的首都是巴黎。",
"usage": {
"prompt": 15,
"completion": 7,
"total": 22
}
}
|
LLM_ERROR |
null |
{} |
null(参见 error_message 列) |
工具使用(插件生命周期)¶
这些事件跟踪智能体执行的工具。
| 事件类型 | 内容(JSON)结构 | 属性(JSON) | 示例内容 |
|---|---|---|---|
TOOL_STARTING |
{
"tool": "...",
"args": {...}
}
|
{} |
{"tool": "list_datasets", "args": {"project_id": "my-project"}}
|
TOOL_COMPLETED |
{
"tool": "...",
"result": "..."
}
|
{} |
{"tool": "list_datasets", "result": ["ds1", "ds2"]}
|
TOOL_ERROR |
{
"tool": "...",
"args": {...}
}
|
{} |
{"tool": "list_datasets", "args": {}}
|
智能体生命周期和通用事件¶
| 事件类型 | 内容(JSON)结构 |
|---|---|
INVOCATION_STARTING |
{} |
INVOCATION_COMPLETED |
{} |
AGENT_STARTING |
"你是一个有用的智能体..." |
AGENT_COMPLETED |
{} |
USER_MESSAGE_RECEIVED |
{"text_summary": "帮我预订航班。"} |
GCS 卸载示例(多模态和大文本)¶
当配置了 gcs_bucket_name 时,大文本和多模态内容(图像、音频等)会自动卸载到 GCS。content 列将包含摘要或占位符,而 content_parts 包含指向 GCS URI 的 object_ref。
卸载文本示例
{
"event_type": "LLM_REQUEST",
"content_parts": [
{
"part_index": 1,
"mime_type": "text/plain",
"storage_mode": "GCS_REFERENCE",
"text": "AAAA... [OFFLOADED]",
"object_ref": {
"uri": "gs://haiyuan-adk-debug-verification-1765319132/2025-12-10/e-f9545d6d/ae5235e6_p1.txt",
"authorizer": "us.bqml_connection",
"details": {"gcs_metadata": {"content_type": "text/plain"}}
}
}
]
}
卸载图像示例
{
"event_type": "LLM_REQUEST",
"content_parts": [
{
"part_index": 2,
"mime_type": "image/png",
"storage_mode": "GCS_REFERENCE",
"text": "[MEDIA OFFLOADED]",
"object_ref": {
"uri": "gs://haiyuan-adk-debug-verification-1765319132/2025-12-10/e-f9545d6d/ae5235e6_p2.png",
"authorizer": "us.bqml_connection",
"details": {"gcs_metadata": {"content_type": "image/png"}}
}
}
]
}
查询卸载内容(获取签名 URL)
SELECT
timestamp,
event_type,
part.mime_type,
part.storage_mode,
part.object_ref.uri AS gcs_uri,
-- 生成签名 URL 以直接读取内容(需要 connection_id 配置)
STRING(OBJ.GET_ACCESS_URL(part.object_ref, 'r').access_urls.read_url) AS signed_url
FROM `your-gcp-project-id.your-dataset-id.agent_events_v2`,
UNNEST(content_parts) AS part
WHERE part.storage_mode = 'GCS_REFERENCE'
ORDER BY timestamp DESC
LIMIT 10;
高级分析查询¶
使用 trace_id 跟踪特定对话回合
SELECT timestamp, event_type, agent, JSON_VALUE(content, '$.response') as summary
FROM `your-gcp-project-id.your-dataset-id.agent_events_v2`
WHERE trace_id = 'your-trace-id'
ORDER BY timestamp ASC;
令牌使用分析(访问 JSON 字段)
SELECT
AVG(CAST(JSON_VALUE(content, '$.usage.total') AS INT64)) as avg_tokens
FROM `your-gcp-project-id.your-dataset-id.agent_events_v2`
WHERE event_type = 'LLM_RESPONSE';
查询多模态内容(使用 content_parts 和 ObjectRef)
SELECT
timestamp,
part.mime_type,
part.object_ref.uri as gcs_uri
FROM `your-gcp-project-id.your-dataset-id.agent_events_v2`,
UNNEST(content_parts) as part
WHERE part.mime_type LIKE 'image/%'
ORDER BY timestamp DESC;
使用 BigQuery 远程模型(Gemini)分析多模态内容
SELECT
logs.session_id,
-- 获取图像的签名 URL
STRING(OBJ.GET_ACCESS_URL(parts.object_ref, "r").access_urls.read_url) as signed_url,
-- 使用远程模型(例如,gemini-pro-vision)分析图像
AI.GENERATE(
('简要描述此图像。什么公司标志?', parts.object_ref)
) AS generated_result
FROM
`your-gcp-project-id.your-dataset-id.agent_events_v2` logs,
UNNEST(logs.content_parts) AS parts
WHERE
parts.mime_type LIKE 'image/%'
ORDER BY logs.timestamp DESC
LIMIT 1;
延迟分析(LLM 和工具)
SELECT
event_type,
AVG(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64)) as avg_latency_ms
FROM `your-gcp-project-id.your-dataset-id.agent_events_v2`
WHERE event_type IN ('LLM_RESPONSE', 'TOOL_COMPLETED')
GROUP BY event_type;
跨度层次结构和持续时间分析
SELECT
span_id,
parent_span_id,
event_type,
timestamp,
-- 从 latency_ms 中提取已完成操作的持续时间
CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) as duration_ms,
-- 识别特定工具或操作
COALESCE(
JSON_VALUE(content, '$.tool'),
'LLM_CALL'
) as operation
FROM `your-gcp-project-id.your-dataset-id.agent_events_v2`
WHERE trace_id = 'your-trace-id'
AND event_type IN ('LLM_RESPONSE', 'TOOL_COMPLETED')
ORDER BY timestamp ASC;