ADK 的 Dapr 插件¶
Supported in ADKPython
Dapr 是一个分布式工作流编排引擎,使 ADK 智能体能够抵御故障。LLM 调用和工具执行作为 Dapr 工作流活动运行,具有自动重试和恢复功能。如果出现任何故障,你的智能体会自动从断点处继续执行。
使用场景¶
Dapr 插件为你的智能体提供:
- 持久执行:永不丢失进度。如果你的智能体崩溃或停滞,Dapr 自动从最后一个成功的活动恢复,无需手动恢复。
- 内置重试和退避:可配置的重试策略具有指数退避功能,可处理来自 LLM 提供方和工具 API 的瞬时故障。
- 长时间运行和常驻智能体:支持运行数小时、数天或无限期的智能体和工具,由 Dapr 的持久状态存储支持。
- 可移植基础设施:在不更改智能体代码的情况下切换 15 种以上的数据库(Redis、GCP Firestore、PostgreSQL、DynamoDB、Cosmos DB 以及更多)。Dapr 的可插拔组件模型让你可以从本地开发迁移到任何云环境。
- 可观测性和调试:使用 Dapr 的工作流 API 检查智能体执行的每一步,并通过 Dapr 内置的 OpenTelemetry 集成发出追踪和指标。
前置条件¶
- Python 3.11+
- 一个 Gemini API 密钥(或任何支持的模型)
- 已安装 Dapr CLI 和运行时(安装指南)
- 为工作流持久化配置了 Dapr 状态存储组件
安装¶
安装用于 Dapr 的 Diagrid Agent 包,其中包含 ADK 扩展:
初始化 Dapr:
在智能体中使用¶
基础设置¶
该集成包装了你的 ADK 智能体,使得每次 LLM 调用和每次工具执行都作为持久的 Dapr 工作流活动运行。运行器处理工作流注册、启动 Dapr 工作流运行时,并公开用于调用智能体的异步接口。
定义智能体和运行器
照常创建 ADK 智能体,并将其传递给 DaprWorkflowAgentRunner。
import asyncio
from google.adk.agents import LlmAgent
from google.adk.tools import FunctionTool
from diagrid.agent.adk import DaprWorkflowAgentRunner
def get_weather(city: str) -> str:
"""获取某个城市的当前天气。
参数:
city:要查询天气的城市名称。
返回:
描述天气的字符串。
"""
# 在此处调用你的天气 API
return f"{city} 天气晴朗,72°F"
# 定义 ADK 智能体
agent = LlmAgent(
name="weather_agent",
model="gemini-flash-latest",
instruction="你是一个可以查询天气的得力助手。",
tools=[FunctionTool(get_weather)],
)
async def main():
# 包装智能体,使每次工具调用都作为持久的 Dapr 活动运行
runner = DaprWorkflowAgentRunner(
agent=agent,
name="weather-agent",
max_iterations=10,
)
# 启动 Dapr 工作流运行时
runner.start()
try:
async for event in runner.run_async(
user_message="旧金山的天气怎么样?",
session_id="session-001",
):
if event["type"] == "workflow_completed":
print(event["final_response"])
finally:
runner.shutdown()
if __name__ == "__main__":
asyncio.run(main())
使用 Dapr 运行智能体
Dapr 工作流使用轻量级 Sidecar 和已配置的状态存储。使用以下 命令在本地运行 Dapr 并同时运行你的智能体:
Note
默认情况下,Dapr 使用随 Dapr 安装的 Redis 组件,位于 ~/.dapr/components/statestore.yaml。请参阅支持的状态存储以更改所使用的状态存储。
崩溃恢复¶
如果托管智能体的进程在执行过程中崩溃,Dapr 会在应用重启时自动从最后一个成功的活动恢复工作流——无需自定义重放逻辑。
# 第一次运行:工具 1 完成后进程崩溃。
# 第二次运行:Dapr 自动恢复并执行工具 2 和 3。
runner = DaprWorkflowAgentRunner(agent=agent, name="sequential-agent")
runner.start()
async for event in runner.run_async(
user_message="运行三步流水线。",
session_id="pipeline-001",
):
if event["type"] == "workflow_completed":
print(event["final_response"])
由于 session_id 和工作流实例 ID 是稳定的,使用 Dapr 重新启动同一应用会使 Sidecar 拾取正在执行的工作流并驱动它们完成,无需手动恢复。
工作原理¶
该插件将 ADK 智能体循环转换为 Dapr 工作流,使得每一步都被检查点记录、重试和自动重放:
- LLM 调用作为 Dapr 工作流活动执行。如果调用失败或工作进程崩溃,Dapr 根据配置的重试策略进行重试,或从最后一个成功的活动重放工作流,从而增加弹性并减少令牌消耗。
- 工具执行作为独立活动运行,每个工具调用一个活动。工作流通过 Dapr 的
when_all原语分发出并行工具调用,并在重新调用 LLM 之前等待它们完成。 - 工作流状态(消息、工具调用、工具结果)在每个活动之后被序列化并存储到配置的 Dapr 状态存储中,因此任何有权访问状态存储的副本都可以接管执行。
- 确定性编排:
agent_workflow函数仅包含确定性控制流;所有副作用(LLM 调用、工具调用)都在活动内部发生,这是 Dapr 工作流对重放安全性的要求。
功能特性¶
| 功能 | 描述 |
|---|---|
| 持久工具执行 | 每个 ADK 工具作为 Dapr 工作流活动运行,具有自动重试、退避和失败重放功能 |
| 并行工具调用 | 单次 LLM 响应中的多个工具调用被并发分派为活动,并在下一个 LLM 步骤之前合并 |
| 可移植状态存储 | 通过 Dapr 组件在 Redis、GCP Firestore、PostgreSQL、Cosmos DB 等多种存储之间切换,无需更改代码 |
| 长时间运行的智能体 | 工作流可以运行数小时、数天或无限期;状态保留在 Dapr 状态存储中直到完成 |
| 可观测性 | 每次 LLM 调用和工具执行都是一个工作流活动,可通过 Dapr 的 OpenTelemetry 集成追踪并通过工作流 API 检查 |
| Kubernetes 原生 | 使用 Dapr 的 Sidecar 注入将同一智能体部署到 Kubernetes,无需更改代码 |
其他资源¶
- Dapr 工作流文档 - Dapr 工作流构建块的完整参考
- Diagrid Agent SDK on GitHub - Dapr ADK 集成的源代码
- Dapr 社区 Discord - 提问、报告错误和社区讨论
- 支持的状态存储 - 与 Dapr 工作流兼容的状态存储组件列表