
Google Cloud Pub/Sub tool for ADK

Supported in ADK Python v1.22.0

PubSubToolset lets an agent interact with the Google Cloud Pub/Sub service to publish, pull, and acknowledge messages.

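Prerequisites

Before using PubSubToolset, you need to:

  1. Enable the Pub/Sub API in your Google Cloud project.
  2. Authenticate and authorize: make sure the principal that runs the agent (for example, a user or a service account) has the IAM permissions required for Pub/Sub operations. For more information about Pub/Sub roles, see the Pub/Sub access control documentation.
  3. Create a topic or subscription: create a topic to publish messages to, and a subscription to receive messages from.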
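
Step 3 can be done in the Cloud console or from code. The following is a minimal sketch, assuming the `google-cloud-pubsub` client library is installed and Application Default Credentials are configured. The helper names `topic_path`, `subscription_path`, and `create_topic_and_subscription` are illustrative and are not part of ADK. The client import is deferred so the name helpers can be used without the library installed.

```python
def topic_path(project_id: str, topic_id: str) -> str:
    """Build a fully qualified topic name."""
    return f"projects/{project_id}/topics/{topic_id}"


def subscription_path(project_id: str, subscription_id: str) -> str:
    """Build a fully qualified subscription name."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


def create_topic_and_subscription(project_id, topic_id, subscription_id):
    """Create a topic and a subscription attached to it (illustrative helper)."""
    # Deferred import: requires `pip install google-cloud-pubsub`.
    from google.cloud import pubsub_v1

    topic = topic_path(project_id, topic_id)
    subscription = subscription_path(project_id, subscription_id)

    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()

    # Both calls raise an error if the resource already exists.
    publisher.create_topic(request={"name": topic})
    subscriber.create_subscription(
        request={"name": subscription, "topic": topic}
    )
    return topic, subscription
```

The returned names use the same `projects/...` form that the tool parameters below show in their examples.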

Usage

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import os

from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.pubsub.config import PubSubToolConfig
from google.adk.tools.pubsub.pubsub_credentials import PubSubCredentialsConfig
from google.adk.tools.pubsub.pubsub_toolset import PubSubToolset
from google.genai import types
import google.auth

# Define constants for this example agent
AGENT_NAME = "pubsub_agent"
APP_NAME = "pubsub_app"
USER_ID = "user1234"
SESSION_ID = "1234"
GEMINI_MODEL = "gemini-2.0-flash"

# Define Pub/Sub tool config.
# You can optionally set the project_id here, or let the agent infer it from context/user input.
tool_config = PubSubToolConfig(project_id=os.getenv("GOOGLE_CLOUD_PROJECT"))

# Uses externally-managed Application Default Credentials (ADC) by default.
# This decouples authentication from the agent / tool lifecycle.
# https://cloud.google.com/docs/authentication/provide-credentials-adc
application_default_credentials, _ = google.auth.default()
credentials_config = PubSubCredentialsConfig(
    credentials=application_default_credentials
)

# Instantiate a Pub/Sub toolset
pubsub_toolset = PubSubToolset(
    credentials_config=credentials_config, pubsub_tool_config=tool_config
)

# Agent Definition
pubsub_agent = Agent(
    model=GEMINI_MODEL,
    name=AGENT_NAME,
    description=(
        "Agent to publish, pull, and acknowledge messages from Google Cloud"
        " Pub/Sub."
    ),
    instruction="""\
        You are a cloud engineer agent with access to Google Cloud Pub/Sub tools.
        You can publish messages to topics, pull messages from subscriptions, and acknowledge messages.
    """,
    tools=[pubsub_toolset],
)

# Session and Runner
session_service = InMemorySessionService()
session = asyncio.run(
    session_service.create_session(
        app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
    )
)
runner = Runner(
    agent=pubsub_agent, app_name=APP_NAME, session_service=session_service
)


# Agent Interaction
def call_agent(query):
    """
    Helper function to call the agent with a query.
    """
    content = types.Content(role="user", parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

    print("USER:", query)
    for event in events:
        if event.is_final_response():
            final_response = event.content.parts[0].text
            print("AGENT:", final_response)


call_agent("publish 'Hello World' to 'my-topic'")
call_agent("pull messages from 'my-subscription'")

Tools

PubSubToolset includes the following tools:

publish_message

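Publishes a message to a Pub/Sub topic.

| Parameter | Type | Description |
| --- | --- | --- |
| topic_name | str | Name of the Pub/Sub topic (for example, projects/my-project/topics/my-topic). |
| message | str | Content of the message to publish. |
| attributes | dict[str, str] | (Optional) Attributes to attach to the message. |
| ordering_key | str | (Optional) Ordering key for the message. If you set this parameter, messages are published in order. |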
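
To make the parameter shapes concrete, here is a small sketch of a hypothetical helper, `build_publish_args`, that assembles the arguments listed above and checks that `attributes` maps strings to strings. It is not part of PubSubToolset and does not call Pub/Sub; it only illustrates which values are required and which are optional.

```python
from typing import Optional


def build_publish_args(
    topic_name: str,
    message: str,
    attributes: Optional[dict] = None,
    ordering_key: Optional[str] = None,
) -> dict:
    """Hypothetical helper: assemble publish_message arguments."""
    args = {"topic_name": topic_name, "message": message}
    if attributes:
        # The table types attributes as dict[str, str], so reject anything else.
        for key, value in attributes.items():
            if not isinstance(key, str) or not isinstance(value, str):
                raise TypeError("attributes must map str to str")
        args["attributes"] = dict(attributes)
    if ordering_key:
        args["ordering_key"] = ordering_key
    return args


args = build_publish_args(
    "projects/my-project/topics/my-topic",
    "Hello World",
    attributes={"source": "demo"},
)
print(args)
```

Optional parameters that are not supplied are left out of the result rather than passed as `None`.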

pull_messages

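Pulls messages from a Pub/Sub subscription.

| Parameter | Type | Description |
| --- | --- | --- |
| subscription_name | str | Name of the Pub/Sub subscription (for example, projects/my-project/subscriptions/my-sub). |
| max_messages | int | (Optional) Maximum number of messages to pull. Defaults to 1. |
| auto_ack | bool | (Optional) Whether to acknowledge messages automatically. Defaults to False. |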
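
The example value for `subscription_name` uses the fully qualified `projects/<project>/subscriptions/<subscription>` form. If you want to check that form before handing a name to the agent, a sketch along these lines may help. The function `parse_subscription_name` is hypothetical and not part of ADK, and whether the tools also accept short names is not covered here.

```python
import re

# Matches projects/<project>/subscriptions/<subscription> with no extra segments.
_SUBSCRIPTION_RE = re.compile(
    r"^projects/(?P<project>[^/]+)/subscriptions/(?P<subscription>[^/]+)$"
)


def parse_subscription_name(name: str) -> tuple[str, str]:
    """Hypothetical helper: split a fully qualified subscription name."""
    match = _SUBSCRIPTION_RE.match(name)
    if match is None:
        raise ValueError(
            f"expected projects/<project>/subscriptions/<subscription>, got {name!r}"
        )
    return match["project"], match["subscription"]


print(parse_subscription_name("projects/my-project/subscriptions/my-sub"))
# → ('my-project', 'my-sub')
```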

acknowledge_messages

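Acknowledges one or more messages on a Pub/Sub subscription.

| Parameter | Type | Description |
| --- | --- | --- |
| subscription_name | str | Name of the Pub/Sub subscription (for example, projects/my-project/subscriptions/my-sub). |
| ack_ids | list[str] | List of acknowledgment IDs to acknowledge. |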
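
The interplay between `pull_messages`, `auto_ack`, and `acknowledge_messages` can be illustrated with a toy in-memory model. This is a sketch, not the Pub/Sub client or the toolset's implementation: `ToySubscription` and its return shape (`ack_id`, `data`) are invented for illustration. It also simplifies redelivery: real Pub/Sub redelivers unacknowledged messages only after the acknowledgment deadline expires and issues a new ack ID for each delivery.

```python
class ToySubscription:
    """In-memory stand-in for a subscription (illustration only)."""

    def __init__(self, payloads):
        # Outstanding messages keyed by ack ID, kept in delivery order.
        self._pending = {
            f"ack-{i}": payload for i, payload in enumerate(payloads, start=1)
        }

    def pull(self, max_messages=1, auto_ack=False):
        batch = list(self._pending.items())[:max_messages]
        if auto_ack:
            # Acknowledge immediately so the messages are not delivered again.
            self.acknowledge([ack_id for ack_id, _ in batch])
        return [{"ack_id": ack_id, "data": data} for ack_id, data in batch]

    def acknowledge(self, ack_ids):
        for ack_id in ack_ids:
            self._pending.pop(ack_id, None)


sub = ToySubscription(["a", "b", "c"])

first = sub.pull(max_messages=2)   # delivered, but not acknowledged
again = sub.pull(max_messages=2)   # the same two messages are delivered again
sub.acknowledge([m["ack_id"] for m in again])

rest = sub.pull(max_messages=2, auto_ack=True)  # only "c" is left
print([m["data"] for m in first], [m["data"] for m in rest])
```

With `auto_ack` left at its default of False, a message stays outstanding until its ack ID is passed to an acknowledge call, which is why the second pull returns the same messages.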