Skip to content

ADK 的 Dapr 插件

Supported in ADKPython

Dapr 是一个分布式工作流编排引擎,使 ADK 智能体能够抵御故障。LLM 调用和工具执行作为 Dapr 工作流活动运行,具有自动重试和恢复功能。如果出现任何故障,你的智能体会自动从断点处继续执行。

使用场景

Dapr 插件为你的智能体提供:

  • 持久执行:永不丢失进度。如果你的智能体崩溃或停滞,Dapr 自动从最后一个成功的活动恢复,无需手动恢复
  • 内置重试和退避:可配置的重试策略具有指数退避功能,可处理来自 LLM 提供方和工具 API 的瞬时故障。
  • 长时间运行和常驻智能体:支持运行数小时、数天或无限期的智能体和工具,由 Dapr 的持久状态存储支持。
  • 可移植基础设施:在不更改智能体代码的情况下切换 15 种以上的数据库(Redis、GCP Firestore、PostgreSQL、DynamoDB、Cosmos DB 以及更多)。Dapr 的可插拔组件模型让你可以从本地开发迁移到任何云环境。
  • 可观测性和调试:使用 Dapr 的工作流 API 检查智能体执行的每一步,并通过 Dapr 内置的 OpenTelemetry 集成发出追踪和指标。

前置条件

安装

安装用于 Dapr 的 Diagrid Agent 包,其中包含 ADK 扩展:

pip install diagrid

初始化 Dapr:

dapr init

在智能体中使用

基础设置

该集成包装了你的 ADK 智能体,使得每次 LLM 调用和每次工具执行都作为持久的 Dapr 工作流活动运行。运行器处理工作流注册、启动 Dapr 工作流运行时,并公开用于调用智能体的异步接口。

定义智能体和运行器

照常创建 ADK 智能体,并将其传递给 DaprWorkflowAgentRunner

import asyncio
from google.adk.agents import LlmAgent
from google.adk.tools import FunctionTool
from diagrid.agent.adk import DaprWorkflowAgentRunner


def get_weather(city: str) -> str:
    """获取某个城市的当前天气。

    参数:
        city:要查询天气的城市名称。

    返回:
        描述天气的字符串。
    """
    # 在此处调用你的天气 API
    return f"{city} 天气晴朗,72°F"


# 定义 ADK 智能体
agent = LlmAgent(
    name="weather_agent",
    model="gemini-flash-latest",
    instruction="你是一个可以查询天气的得力助手。",
    tools=[FunctionTool(get_weather)],
)


async def main():
    # 包装智能体,使每次工具调用都作为持久的 Dapr 活动运行
    runner = DaprWorkflowAgentRunner(
        agent=agent,
        name="weather-agent",
        max_iterations=10,
    )

    # 启动 Dapr 工作流运行时
    runner.start()

    try:
        async for event in runner.run_async(
            user_message="旧金山的天气怎么样?",
            session_id="session-001",
        ):
            if event["type"] == "workflow_completed":
                print(event["final_response"])
    finally:
        runner.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

使用 Dapr 运行智能体

Dapr 工作流使用轻量级 Sidecar 和已配置的状态存储。使用以下 命令在本地运行 Dapr 并同时运行你的智能体:

dapr run --app-id weather-agent -- python3 agent.py

Note

默认情况下,Dapr 使用随 Dapr 安装的 Redis 组件,位于 ~/.dapr/components/statestore.yaml。请参阅支持的状态存储以更改所使用的状态存储。

崩溃恢复

如果托管智能体的进程在执行过程中崩溃,Dapr 会在应用重启时自动从最后一个成功的活动恢复工作流——无需自定义重放逻辑。

# 第一次运行:工具 1 完成后进程崩溃。
# 第二次运行:Dapr 自动恢复并执行工具 2 和 3。
runner = DaprWorkflowAgentRunner(agent=agent, name="sequential-agent")
runner.start()

async for event in runner.run_async(
    user_message="运行三步流水线。",
    session_id="pipeline-001",
):
    if event["type"] == "workflow_completed":
        print(event["final_response"])

由于 session_id 和工作流实例 ID 是稳定的,使用 Dapr 重新启动同一应用会使 Sidecar 拾取正在执行的工作流并驱动它们完成,无需手动恢复。

工作原理

该插件将 ADK 智能体循环转换为 Dapr 工作流,使得每一步都被检查点记录、重试和自动重放:

  • LLM 调用作为 Dapr 工作流活动执行。如果调用失败或工作进程崩溃,Dapr 根据配置的重试策略进行重试,或从最后一个成功的活动重放工作流,从而增加弹性并减少令牌消耗。
  • 工具执行作为独立活动运行,每个工具调用一个活动。工作流通过 Dapr 的 when_all 原语分发出并行工具调用,并在重新调用 LLM 之前等待它们完成。
  • 工作流状态(消息、工具调用、工具结果)在每个活动之后被序列化并存储到配置的 Dapr 状态存储中,因此任何有权访问状态存储的副本都可以接管执行。
  • 确定性编排agent_workflow 函数仅包含确定性控制流;所有副作用(LLM 调用、工具调用)都在活动内部发生,这是 Dapr 工作流对重放安全性的要求。

功能特性

功能 描述
持久工具执行 每个 ADK 工具作为 Dapr 工作流活动运行,具有自动重试、退避和失败重放功能
并行工具调用 单次 LLM 响应中的多个工具调用被并发分派为活动,并在下一个 LLM 步骤之前合并
可移植状态存储 通过 Dapr 组件在 Redis、GCP Firestore、PostgreSQL、Cosmos DB 等多种存储之间切换,无需更改代码
长时间运行的智能体 工作流可以运行数小时、数天或无限期;状态保留在 Dapr 状态存储中直到完成
可观测性 每次 LLM 调用和工具执行都是一个工作流活动,可通过 Dapr 的 OpenTelemetry 集成追踪并通过工作流 API 检查
Kubernetes 原生 使用 Dapr 的 Sidecar 注入将同一智能体部署到 Kubernetes,无需更改代码

其他资源