Skip to content

ADK 的 DBOS 插件

Supported in ADKPython

DBOS 是一个持久执行框架,用于构建可靠的工作流和 AI 智能体。它与 ADK 集成,使 LLM 调用、工具执行和智能体编排具有容错性和可扩展性。智能体在崩溃、部署或重启后精确地从断点处恢复——这一切都由你自己拥有的数据库支持,无需单独编排服务。

使用场景

DBOS 插件为 ADK 智能体增加了生产级的可靠性和编排能力:

  • 持久执行:持久化 LLM 和工具输出。从崩溃、部署或机器故障中自动恢复智能体,不会丢失进度或重复副作用。无需手动恢复会话
  • 内置重试和退避:可配置的重试策略,具有指数退避功能,可处理来自 LLM 提供方和工具执行的瞬时故障。
  • 长时间运行的智能体:运行智能体和工具数小时、数天或数月。
  • 人工参与:暂停执行并在收到外部信号或人工批准后恢复执行。
  • 具有速率限制的可扩展执行:在工作流中组合多个智能体,或使用持久队列和内置速率限制在分布式工作进程间扩展智能体工作流。
  • 可观测性和管理:从 DBOS 控制台检查、取消、恢复和分叉智能体工作流。

前置条件

安装

pip install dbos-google-adk

在智能体中使用

该集成包装了你的 ADK 智能体,使得每次 LLM 调用都作为持久的 DBOS 工作流步骤运行。使用 @DBOS.step() 装饰的工具函数会被单独检查点记录,并具有可配置的重试。

基础设置

通过将 DBOSPlugin 添加到你的 Runner 来定义智能体和工作流,并从 @DBOS.workflow() 驱动智能体:

import asyncio
import logging

from dbos import DBOS, DBOSConfig
from dbos_google_adk import DBOSPlugin
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

# 使用 @DBOS.step() 装饰工具调用以实现持久执行
@DBOS.step()
async def get_weather(city: str) -> str:
    """获取某个城市的天气。"""
    return f"{city} 天气晴朗"

agent = LlmAgent(name="weather", model="gemini-flash-latest", tools=[get_weather])
runner = Runner(
    app_name="my-agent",
    agent=agent,
    plugins=[DBOSPlugin()],
    session_service=InMemorySessionService(),
)

# 从 DBOS 工作流驱动智能体以实现持久执行
@DBOS.workflow()
async def run_agent(user_id: str, session_id: str, message: str) -> str:
    new_message = types.Content(role="user", parts=[types.Part.from_text(text=message)])
    async for event in runner.run_async(
        user_id=user_id, session_id=session_id, new_message=new_message
    ):
        if event.is_final_response():
            return event.content.parts[0].text
    return ""


async def main():
    # DBOS 默认检查点到 SQLite。生产环境建议使用 Postgres。
    config: DBOSConfig = {"name": "my-agent", "system_database_url": "sqlite:///dbostest.sqlite"}
    DBOS(config=config)
    DBOS.launch()

    await runner.session_service.create_session(
        app_name="my-agent", user_id="u", session_id="s"
    )
    print(await run_agent("u", "s", "旧金山的天气怎么样?"))


if __name__ == "__main__":
    asyncio.run(main())

持久事件压缩

对于持久事件压缩,使用 DBOSEventSummarizer 包装你的摘要器, 以便压缩 LLM 调用也被检查点记录:

from dbos_google_adk import DBOSEventSummarizer
from google.adk.models.google_llm import Gemini

summarizer = DBOSEventSummarizer.from_llm(Gemini(model="gemini-flash-latest"))

工作原理

DBOSPluginDBOSEventSummarizer 在持久的 DBOS 工作流中运行你的 ADK 智能体:

  • LLM 调用DBOSPlugin 拦截并作为 DBOS 步骤执行。如果调用失败或工作进程崩溃,DBOS 从最后一个成功步骤恢复,减少浪费的令牌消耗。
  • 工具函数使用 @DBOS.step() 装饰,被单独检查点记录。它们的输出存储在数据库中,因此重放会完全跳过已完成的工具执行。
  • 工作流执行在每一步后被序列化并存储在你的数据库(SQLite 或 Postgres)中。任何有权访问同一数据库的工作进程都可以接管执行,从而实现分布式故障转移和水平扩展。

功能特性

功能 描述
持久工具执行 除 LLM 调用外,使用 @DBOS.step() 装饰的工具函数也会在数据库中被检查点记录,并可在失败时进行可配置的重试
故障恢复 DBOS 在进程重启时从最后一个成功步骤恢复正在执行的工作流,或在分布式环境中通过 DBOS Conductor 自动故障转移
并行工具调用 单次 LLM 响应中的多个工具调用被并发分派,具有重放安全性,并在下一个 LLM 步骤之前合并
调试 逐步重放任何过去的工作流执行。从特定步骤分叉和重启工作流以修复错误
长时间运行的智能体 工作流可以运行数小时、数天或数月;状态保留在数据库中直到完成
可观测性 每次 LLM 调用和工具执行都是一个记录步骤,可在 DBOS 控制台仪表板或通过 OpenTelemetry 查看
人工参与 通过 DBOS 工作流通知暂停执行并在收到外部信号或人工批准后恢复
具有速率限制的可扩展执行 在工作流中组合多个智能体,或使用持久队列在分布式工作进程间执行智能体工作流。内置速率限制以处理 API 背压
安全版本管理 使用 DBOS 补丁或版本管理升级和部署新的智能体版本,不会中断正在执行的实例

其他资源