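Google Cloud Pub/Sub tool for ADK
Supported in ADK: Python v1.22.0
PubSubToolset lets an agent interact with the Google Cloud Pub/Sub service to publish, pull, and acknowledge messages.
Prerequisites
Before using PubSubToolset, you need to:
- Enable the Pub/Sub API in your Google Cloud project.
- Set up authentication and authorization: ensure that the principal running the agent (for example, a user or a service account) has the IAM permissions required to perform Pub/Sub operations. For more information about Pub/Sub roles, see the Pub/Sub access control documentation.
- Create a topic or subscription: create a topic to publish messages to, and a subscription to receive messages from.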
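The last prerequisite can be met from the Cloud console, the gcloud CLI, or a client library. The following is a minimal sketch using the `google-cloud-pubsub` client library, assuming it is installed and Application Default Credentials are configured. The helper names and the `my-project`, `my-topic`, and `my-sub` IDs are placeholders for illustration, not part of ADK.

```python
def topic_path(project_id: str, topic_id: str) -> str:
    """Build the fully qualified topic name that Pub/Sub expects."""
    return f"projects/{project_id}/topics/{topic_id}"


def subscription_path(project_id: str, subscription_id: str) -> str:
    """Build the fully qualified subscription name that Pub/Sub expects."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


def create_topic_and_subscription(project_id, topic_id, subscription_id):
    """Create a topic and a pull subscription attached to it."""
    # Imported lazily so the path helpers above work without the library.
    from google.cloud import pubsub_v1

    topic = topic_path(project_id, topic_id)
    subscription = subscription_path(project_id, subscription_id)

    publisher = pubsub_v1.PublisherClient()
    publisher.create_topic(request={"name": topic})

    with pubsub_v1.SubscriberClient() as subscriber:
        subscriber.create_subscription(
            request={"name": subscription, "topic": topic}
        )
    return topic, subscription


# Example call (requires credentials and the Pub/Sub API to be enabled):
# create_topic_and_subscription("my-project", "my-topic", "my-sub")
```

The fully qualified names returned here have the same form as the `topic_name` and `subscription_name` values the tools accept.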
Usage
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import os
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.pubsub.config import PubSubToolConfig
from google.adk.tools.pubsub.pubsub_credentials import PubSubCredentialsConfig
from google.adk.tools.pubsub.pubsub_toolset import PubSubToolset
from google.genai import types
import google.auth
# Define constants for this example agent
AGENT_NAME = "pubsub_agent"
APP_NAME = "pubsub_app"
USER_ID = "user1234"
SESSION_ID = "1234"
GEMINI_MODEL = "gemini-2.0-flash"
# Define Pub/Sub tool config.
# You can optionally set the project_id here, or let the agent infer it from context/user input.
tool_config = PubSubToolConfig(project_id=os.getenv("GOOGLE_CLOUD_PROJECT"))
# Uses externally-managed Application Default Credentials (ADC) by default.
# This decouples authentication from the agent / tool lifecycle.
# https://cloud.google.com/docs/authentication/provide-credentials-adc
application_default_credentials, _ = google.auth.default()
credentials_config = PubSubCredentialsConfig(
    credentials=application_default_credentials
)
# Instantiate a Pub/Sub toolset
pubsub_toolset = PubSubToolset(
    credentials_config=credentials_config, pubsub_tool_config=tool_config
)
# Agent Definition
pubsub_agent = Agent(
    model=GEMINI_MODEL,
    name=AGENT_NAME,
    description=(
        "Agent to publish, pull, and acknowledge messages from Google Cloud"
        " Pub/Sub."
    ),
    instruction="""\
        You are a cloud engineer agent with access to Google Cloud Pub/Sub tools.
        You can publish messages to topics, pull messages from subscriptions, and acknowledge messages.
    """,
    tools=[pubsub_toolset],
)
# Session and Runner
session_service = InMemorySessionService()
session = asyncio.run(
    session_service.create_session(
        app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
    )
)
runner = Runner(
    agent=pubsub_agent, app_name=APP_NAME, session_service=session_service
)
# Agent Interaction
def call_agent(query):
    """
    Helper function to call the agent with a query.
    """
    content = types.Content(role="user", parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)
    print("USER:", query)
    for event in events:
        if event.is_final_response():
            final_response = event.content.parts[0].text
            print("AGENT:", final_response)
call_agent("publish 'Hello World' to 'my-topic'")
call_agent("pull messages from 'my-subscription'")
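Tools
PubSubToolset includes the following tools:
publish_message
Publishes a message to a Pub/Sub topic.
| Parameter | Type | Description |
|---|---|---|
| `topic_name` | `str` | Name of the Pub/Sub topic (for example, `projects/my-project/topics/my-topic`). |
| `message` | `str` | Content of the message to publish. |
| `attributes` | `dict[str, str]` | (Optional) Attributes to attach to the message. |
| `ordering_key` | `str` | (Optional) Ordering key for the message. If you set this parameter, messages are published in order. |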
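To make the parameters concrete, here is a rough sketch of how they could map onto a direct call to the `google-cloud-pubsub` client library. This is an illustration under stated assumptions, not necessarily how PubSubToolset implements the tool: `build_publish_kwargs` and `publish` are hypothetical helpers, and the library must be installed with credentials configured for `publish` to run.

```python
from typing import Optional

# Keyword names that PublisherClient.publish already uses for itself.
RESERVED_KEYS = {"data", "ordering_key", "retry", "timeout"}


def build_publish_kwargs(
    message: str,
    attributes: Optional[dict] = None,
    ordering_key: str = "",
) -> dict:
    """Turn the tool parameters into keyword arguments for publish()."""
    attributes = attributes or {}
    for key, value in attributes.items():
        if not isinstance(key, str) or not isinstance(value, str):
            raise TypeError("attribute keys and values must be strings")
        if key in RESERVED_KEYS:
            raise ValueError(f"attribute name {key!r} is reserved")
    # Pub/Sub payloads are bytes; attributes travel as extra keywords.
    kwargs = {"data": message.encode("utf-8"), **attributes}
    if ordering_key:
        kwargs["ordering_key"] = ordering_key
    return kwargs


def publish(topic_name, message, attributes=None, ordering_key=""):
    """Publish one message and return its server-assigned message ID."""
    from google.cloud import pubsub_v1  # lazy import; needs the library

    # The client rejects ordering keys unless ordering is enabled.
    options = pubsub_v1.types.PublisherOptions(
        enable_message_ordering=bool(ordering_key)
    )
    publisher = pubsub_v1.PublisherClient(publisher_options=options)
    future = publisher.publish(
        topic_name, **build_publish_kwargs(message, attributes, ordering_key)
    )
    return future.result()
```

Note that in-order delivery to subscribers also requires a subscription with message ordering enabled; the ordering key alone only groups messages on the publishing side.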
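pull_messages
Pulls messages from a Pub/Sub subscription.
| Parameter | Type | Description |
|---|---|---|
| `subscription_name` | `str` | Name of the Pub/Sub subscription (for example, `projects/my-project/subscriptions/my-sub`). |
| `max_messages` | `int` | (Optional) Maximum number of messages to pull. Defaults to 1. |
| `auto_ack` | `bool` | (Optional) Whether to acknowledge the messages automatically. Defaults to False. |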
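acknowledge_messages
Acknowledges one or more messages on a Pub/Sub subscription.
| Parameter | Type | Description |
|---|---|---|
| `subscription_name` | `str` | Name of the Pub/Sub subscription (for example, `projects/my-project/subscriptions/my-sub`). |
| `ack_ids` | `list[str]` | List of acknowledgment IDs to acknowledge. |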
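When `auto_ack` is left at False, pulling and acknowledging are two separate steps, which lets a consumer acknowledge only the messages it actually processed. The sketch below shows that flow against a `google-cloud-pubsub` style subscriber client. It is an assumption-labeled illustration rather than the toolset's implementation: `pull_then_ack` and `handle` are hypothetical names, and the client is passed in so the logic can be exercised without network access.

```python
def pull_then_ack(subscriber, subscription_name, handle, max_messages=1):
    """Pull messages, run `handle` on each payload, and ack only successes.

    `subscriber` is expected to behave like pubsub_v1.SubscriberClient.
    Returns the list of acknowledgment IDs that were acknowledged.
    """
    response = subscriber.pull(
        request={
            "subscription": subscription_name,
            "max_messages": max_messages,
        }
    )
    ack_ids = []
    for received in response.received_messages:
        try:
            handle(received.message.data.decode("utf-8"))
        except Exception:
            # Leave the message unacknowledged so Pub/Sub redelivers it
            # after the acknowledgment deadline expires.
            continue
        ack_ids.append(received.ack_id)
    if ack_ids:
        subscriber.acknowledge(
            request={"subscription": subscription_name, "ack_ids": ack_ids}
        )
    return ack_ids


# Example call (requires the library, credentials, and a real subscription):
# from google.cloud import pubsub_v1
# with pubsub_v1.SubscriberClient() as subscriber:
#     pull_then_ack(
#         subscriber, "projects/my-project/subscriptions/my-sub", print
#     )
```

Acknowledging after processing trades a possible duplicate delivery for not losing a message when the handler fails, which is the usual reason to keep `auto_ack` off.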