用于 ADK 的 Temporal 插件¶
ADK 已支持Python
Temporal 是一个通用的持久执行 (Durable Execution) 平台,它使 ADK 智能体具有韧性 (Resilient)、可扩展性,并达到生产就绪状态。LLM 调用和工具执行将作为具有自动重试和恢复功能的 Temporal 活动 (Activities) 运行。如果发生任何故障,你的智能体将完整地从中断处继续运行 —— 无需手动会话管理或外部数据库。
使用场景¶
Temporal 插件为你的智能体提供:
- 持久执行:永不丢失进度。如果你的智能体崩溃或停滞,Temporal 会自动从上一个成功的步骤恢复 —— 无需手动恢复会话。
- 内置重试和限流:可配置带有退避机制的重试策略 (Retry Policies),以及处理来自 LLM 提供者的背压机制。
- 长时间运行的后台智能体:支持使用阻塞等待运行数小时、数天甚至无限期的智能体和工具。
- 人机回环 (Human-in-the-loop):暂停执行直到人工批准,然后从中断处恢复。Temporal 的任务路由 (Task Routing) 可扩展地将传入信号(如用户聊天或审批)路由到正确的工作流。
- 可观测性与调试:检查智能体执行的每一步,确定性地回放工作流,并使用 Temporal UI 精确定位故障。
先决条件¶
- Python 3.10+
- Gemini API 密钥(或任何受支持的模型)
- 一个运行中的 Temporal 服务器(本地开发服务器、自托管或 Temporal Cloud)
- Temporal Python SDK 1.24.0
请注意,从 Temporal Python 1.24.0 开始,该集成尚处于实验阶段,未来可能会有破坏性更改。
安装¶
安装 Temporal Python SDK 以及 google-adk 额外组件:
在智能体中使用¶
基础设置¶
此集成包含两个部分:工作流侧 (Workflow side)(智能体运行的地方)和工作节点侧 (Worker side)(托管执行环境的地方)。
1. 定义智能体和工作流
创建一个 ADK 智能体并将其包装在 Temporal 工作流中。使用 TemporalModel 通过 Temporal 活动路由 LLM 调用。
from contextlib import aclosing
from datetime import timedelta
from google.adk.agents import Agent
from google.adk.runners import InMemoryRunner
from google.genai import types
from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.contrib.google_adk_agents import TemporalModel
from temporalio.contrib.google_adk_agents.workflow import activity_tool
from temporalio.workflow import ActivityConfig
# 定义一个 Temporal 活动
@activity.defn
async def get_weather(city: str) -> str:
"""获取城市的当前天气。"""
# 在此处调用你的天气 API
return f"{city} 天气晴朗,72°F"
# 将活动包装为 ADK 工具。该工具将具备记忆化、重试和超时功能。
weather_tool = activity_tool(
get_weather,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=RetryPolicy(maximum_attempts=3),
)
# 使用你的智能体
agent = Agent(
name="weather_agent",
model=TemporalModel(
"gemini-2.5-pro",
activity_config=ActivityConfig(summary="天气智能体")),
tools=[weather_tool],
)
# 将智能体放入工作流中,赋予其持久执行能力。
@workflow.defn
class WeatherAgentWorkflow:
@workflow.run
async def run(self, user_message: str) -> str:
# 仅用于测试;生产环境请使用 Runner()
runner = InMemoryRunner(agent=agent, app_name="weather_app")
session = await runner.session_service.create_session(
user_id="user", app_name="weather_app"
)
result = ""
async with aclosing(runner.run_async(
user_id="user",
session_id=session.id,
new_message=types.Content(
role="user", parts=[types.Part.from_text(text=user_message)]
),
)) as events:
async for event in events:
if event.content and event.content.parts:
for part in event.content.parts:
if part.text:
result = part.text
return result
2. 配置并启动工作节点 (Worker)
使用 GoogleAdkPlugin 配置工作节点,使 ADK 准备好在分布式系统的工作流中运行:
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.contrib.google_adk_agents import GoogleAdkPlugin
async def main():
client = await Client.connect(
"localhost:7233",
plugins=[GoogleAdkPlugin()]
)
worker = Worker(
client,
task_queue="my-agent-task-queue",
workflows=[WeatherAgentWorkflow],
activities=[get_weather],
)
await worker.run()
asyncio.run(main())
3. 启动工作流执行
import asyncio
from temporalio.client import Client
from temporalio.contrib.google_adk_agents import GoogleAdkPlugin
async def start():
client = await Client.connect(
"localhost:7233",
plugins=[GoogleAdkPlugin()]
)
result = await client.execute_workflow(
WeatherAgentWorkflow.run,
"旧金山的天气怎么样?",
id="weather-agent-1",
task_queue="my-agent-task-queue",
)
print(result)
asyncio.run(start())
使用 MCP 工具¶
将 MCP 工具作为 Temporal 活动执行:
from google.adk.agents import Agent
from google.adk.tools.mcp_tool import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StdioConnectionParams
from mcp import StdioServerParameters
from temporalio.client import Client
from temporalio.contrib.google_adk_agents import (
GoogleAdkPlugin,
TemporalModel,
TemporalMcpToolSet,
TemporalMcpToolSetProvider,
)
# 定义一个工厂,让 Temporal 根据需要实例化你的 MCPToolset。
toolset_provider = TemporalMcpToolSetProvider(
"my-tools",
lambda _: McpToolset(
connection_params=StdioConnectionParams(
server_params=StdioServerParameters(
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem", "/path/to/dir"],
),
),
),
)
# 使用工具集提供者配置客户端
client = await Client.connect(
"localhost:7233",
plugins=[GoogleAdkPlugin(toolset_providers=[toolset_provider])]
)
# 在声明智能体时按名称引用工具集(在 @workflow.run 内部)
agent = Agent(
name="tool_agent",
model=TemporalModel("gemini-2.5-pro"),
tools=[TemporalMcpToolSet("my-tools")],
)
工作原理¶
该插件确保你的 ADK 智能体在 Temporal 工作流代码中确定性地运行,并使输入和输出序列化并被记录,以便实现可靠的恢复。例如:
- LLM 调用通过
TemporalModel作为 Temporal 活动执行。如果调用失败或工作节点崩溃,Temporal 会重试或从上一个成功的步骤回放,从而增加韧性并减少 Token 消耗。 - 非确定性操作(如
time.time()、uuid.uuid4())在工作流代码(而非活动代码)中运行时,会自动替换为 Temporal 的确定性等效操作(workflow.now()、workflow.uuid4())。 - ADK 和 Gemini 模块针对 Temporal 的沙箱 (Sandbox) 环境进行了配置,并支持自动透传。
- 自动为 ADK 的数据类型配置 Pydantic 序列化。
其他能力¶
| 能力 | 描述 |
|---|---|
| 持久工具执行 | activity_tool 将工具函数包装为活动,支持长时间运行的工具、自动重试和心跳机制 (Heartbeating) |
| MCP 工具支持 | TemporalMcpToolSet 将 MCP 工具作为具有完整事件传播能力的活动执行 |
| 人机回环 | 你的智能体工作流可以等待信号 (Signals) 和更新 (Updates) 以等待用户输入,客户端可以发送这些内容来恢复智能体运行 |
| 确定性运行时 | GoogleAdkPlugin 将非确定性调用替换为 Temporal 安全的等效调用 |
| 可调试性 | 每次 LLM 调用和工具执行在 Temporal UI 中都作为活动可见,使调试故障变得极其简单。 |
| 可观测性 | 使用 OpenTelemetry 结合你喜爱的可观测性方案,提供对崩溃具有韧性的跨进程跨度 (Spans)。 |
| 安全版本控制 | 使用 Temporal 工作节点版本控制 (Worker Versioning) 部署新的智能体版本,且不会干扰正在运行的执行 |
| 多智能体编排 | 在一个工作流中组合多个智能体,或通过使用子工作流 (Child Workflows) 或 Nexus 将其扩展到更复杂的用例 |
更多资源¶
- Temporal Python SDK 文档 (Temporal Python SDK Documentation) —— Temporal Python SDK 的完整参考
- PyPI 上的 Temporal Python SDK —— Python 包
- Temporal Cloud —— 托管的 Temporal 服务
- 使用 Temporal 编排后台智能体 (Orchestrating Ambient Agents with Temporal) —— 关于长时间运行智能体模式的博客文章