Skip to content

用于 ADK 的 Temporal 插件

ADK 已支持Python

Temporal 是一个通用的持久执行平台,使 ADK 智能体具有韧性、可扩展性和生产就绪性。LLM 调用和工具执行作为 Temporal 活动 运行,具有自动重试和恢复功能。如果发生任何故障,你的智能体会从上次中断的地方继续执行 - 无需手动会话管理或外部数据库。

使用场景

Temporal 插件为你的智能体提供:

  • 持久执行:永不丢失进度。如果你的智能体崩溃或停滞,Temporal 会自动从上一个成功的步骤恢复 —— 无需手动恢复会话
  • 内置重试和限流:可配置带有退避机制的重试策略 (Retry Policies),以及处理来自 LLM 提供者的背压机制。
  • 长时间运行的后台智能体:支持使用阻塞等待运行数小时、数天甚至无限期的智能体和工具。
  • 人机回环 (Human-in-the-loop):暂停执行直到人工批准,然后从中断处恢复。Temporal 的任务路由 (Task Routing) 可扩展地将传入信号(如用户聊天或审批)路由到正确的工作流。
  • 可观测性与调试:检查智能体执行的每一步,确定性地回放工作流,并使用 Temporal UI 精确定位故障。

先决条件

请注意,从 Temporal Python 1.24.0 开始,该集成尚处于实验阶段,未来可能会有破坏性更改。

安装

安装 Temporal Python SDK 以及 google-adk 扩展包:

pip install "temporalio[google-adk]"

在智能体中使用

基础设置

此集成包含两个部分:工作流侧 (Workflow side)(智能体运行的地方)和工作节点侧 (Worker side)(托管执行环境的地方)。

1. 定义智能体和工作流

创建一个 ADK 智能体并将其包装在 Temporal 工作流中。使用 TemporalModel 通过 Temporal 活动路由 LLM 调用。

from contextlib import aclosing
from datetime import timedelta
from google.adk.agents import Agent
from google.adk.runners import InMemoryRunner
from google.genai import types
from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.contrib.google_adk_agents import TemporalModel
from temporalio.contrib.google_adk_agents.workflow import activity_tool
from temporalio.workflow import ActivityConfig

# 定义一个 Temporal 活动

@activity.defn
async def get_weather(city: str) -> str:
    """获取城市的当前天气。"""
    # 在此处调用你的天气 API
    return f"{city} 天气晴朗,72°F"

# 将活动包装为 ADK 工具。该工具将具备记忆化、重试和超时功能。
weather_tool = activity_tool(
    get_weather,
    start_to_close_timeout=timedelta(seconds=30),
    retry_policy=RetryPolicy(maximum_attempts=3),
)

# 使用你的智能体
agent = Agent(
    name="weather_agent",
    model=TemporalModel(
      "gemini-flash-latest",
      activity_config=ActivityConfig(summary="天气智能体")),
    tools=[weather_tool],
)

# 将智能体放入工作流中,赋予其持久执行能力。

@workflow.defn
class WeatherAgentWorkflow:
    @workflow.run
    async def run(self, user_message: str) -> str:
        # 仅用于测试;生产环境请使用 Runner()
        runner = InMemoryRunner(agent=agent, app_name="weather_app")
        session = await runner.session_service.create_session(
            user_id="user", app_name="weather_app"
        )
        result = ""
        async with aclosing(runner.run_async(
            user_id="user",
            session_id=session.id,
            new_message=types.Content(
                role="user", parts=[types.Part.from_text(text=user_message)]
            ),
        )) as events:
            async for event in events:
                if event.content and event.content.parts:
                    for part in event.content.parts:
                        if part.text:
                            result = part.text
        return result

2. 配置并启动工作节点 (Worker)

使用 GoogleAdkPlugin 配置工作节点,使 ADK 能够在分布式系统上的工作流中运行:

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.contrib.google_adk_agents import GoogleAdkPlugin

async def main():
    client = await Client.connect(
        "localhost:7233",
        plugins=[GoogleAdkPlugin()]
    )

    worker = Worker(
        client,
        task_queue="my-agent-task-queue",
        workflows=[WeatherAgentWorkflow],
        activities=[get_weather],
    )
    await worker.run()

asyncio.run(main())

3. 启动工作流执行

import asyncio
from temporalio.client import Client
from temporalio.contrib.google_adk_agents import GoogleAdkPlugin

async def start():
    client = await Client.connect(
        "localhost:7233",
        plugins=[GoogleAdkPlugin()]
    )
    result = await client.execute_workflow(
        WeatherAgentWorkflow.run,
        "旧金山的天气怎么样?",
        id="weather-agent-1",
        task_queue="my-agent-task-queue",
    )
    print(result)

asyncio.run(start())

使用 MCP 工具

MCP 工具作为 Temporal 活动执行:

from google.adk.agents import Agent
from google.adk.tools.mcp_tool import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StdioConnectionParams
from mcp import StdioServerParameters
from temporalio.client import Client
from temporalio.contrib.google_adk_agents import (
    GoogleAdkPlugin,
    TemporalModel,
    TemporalMcpToolSet,
    TemporalMcpToolSetProvider,
)

# 定义一个共享的 MCP 工具集工厂。
# 工作节点(TemporalMcpToolSetProvider)和智能体(TemporalMcpToolSet)都使用该工厂。
def toolset_factory(_):
    return McpToolset(
        connection_params=StdioConnectionParams(
            server_params=StdioServerParameters(
                command="npx",
                args=["-y", "@modelcontextprotocol/server-filesystem", "/path/to/dir"],
            ),
        ),
    )

# 提供者告诉工作节点如何实例化工具集。
toolset_provider = TemporalMcpToolSetProvider("my-tools", toolset_factory)

# 使用工具集提供者配置客户端
client = await Client.connect(
    "localhost:7233",
    plugins=[GoogleAdkPlugin(toolset_providers=[toolset_provider])]
)

# 在声明智能体时按名称引用工具集(在 @workflow.run 内部)。
# not_in_workflow_toolset 允许该智能体也可以通过 `adk web` 在本地运行。
agent = Agent(
    name="tool_agent",
    model=TemporalModel("gemini-flash-latest"),
    tools=[TemporalMcpToolSet("my-tools", not_in_workflow_toolset=toolset_factory)],
)

使用 adk web 进行本地开发

为方便本地开发,Temporal 包装器在 Temporal 工作流之外运行时会自动回退到直接执行,因此你可以使用 adk web 和其他 ADK 开发命令,而无需运行 Temporal 服务器。在此模式下,你无法获得持久执行的好处,也无法精确测试生产环境行为。

  • TemporalModelactivity_tool 会自动工作——它们会检测到自己在工作流之外,并直接调用底层 LLM 或函数。
  • TemporalMcpToolSet 需要 not_in_workflow_toolset 参数(如上方的 MCP 示例所示),以便知道如何在本地实例化工具集。

工作原理

该插件确保你的 ADK 智能体在 Temporal 工作流代码中确定性地运行,并将输入和输出序列化并记录下来,以实现稳健的恢复。例如:

  • LLM 调用通过 TemporalModel 作为 Temporal 活动执行。如果调用失败或工作节点崩溃,Temporal 会从最后一个成功的步骤进行重试或回放,从而增强韧性并减少 Token 消耗。
  • 非确定性操作(如 time.time()uuid.uuid4())在工作流代码(而非活动代码)中运行时,会自动替换为 Temporal 的确定性等价实现(workflow.now()workflow.uuid4())。
  • ADK 和 Gemini 模块已配置为在 Temporal 的沙箱环境中运行,并自动通过白名单。
  • Pydantic 序列化已自动配置用于 ADK 的数据类型。

其他能力

能力 描述
Durable tool execution activity_tool 将工具函数包装为活动,支持长时间运行的工具、自动重试和心跳检测
MCP tool support TemporalMcpToolSet 将 MCP 工具作为活动执行,支持完整的事件传播
Human-in-the-loop 你的智能体工作流可以等待信号更新以等待人工输入,客户端可以发送这些信号以恢复智能体
Deterministic runtime GoogleAdkPlugin 将非确定性调用替换为 Temporal 安全的等价实现
Debuggability 每次 LLM 调用和工具执行在 Temporal UI 中都可见为一个活动,使调试变得简单
Observability 使用 OpenTelemetry 与你喜欢的可观测性解决方案配合使用,具有跨进程且能抵御崩溃的追踪能力
Safe versioning 使用 Temporal Worker 版本管理部署新的智能体版本,不会中断正在执行的任务
Multi-agent orchestration 在工作流内组合多个智能体,或通过使用子工作流Nexus 扩展到更复杂的用例

其他资源