Skip to content

ADK 中的多智能体系统

Supported in ADKPython v0.1.0Go v0.1.0Java v0.2.0

随着智能体应用程序变得越来越复杂,将它们构建为单一的、整体式智能体可能会变得难以开发、维护和理解。智能体开发套件 (ADK) 支持通过将多个不同的 BaseAgent 实例组合成多智能体系统 (MAS) 来构建复杂的应用程序。

在 ADK 中,多智能体系统是一个应用程序,其中不同的智能体(通常形成层次结构)协作或协调以实现更大的目标。以这种方式构建你的应用程序提供了显著的优势,包括增强的模块化、专业化、可重用性、可维护性,以及使用专用工作流智能体定义结构化控制流的能力。

你可以组合从 BaseAgent 派生的各种类型的智能体来构建这些系统:

  • LLM 智能体: 由大型语言模型驱动的智能体。(参见 LLM 智能体
  • 工作流智能体: 专门设计用于管理其子智能体执行流程的智能体(SequentialAgentParallelAgentLoopAgent)。(参见工作流智能体
  • 自定义智能体: 你自己的继承自 BaseAgent 并具有特殊非 LLM 逻辑的智能体。(参见自定义智能体

以下部分详细介绍了使你能够有效构建和管理这些多智能体系统的核心 ADK 原语——如智能体层次结构、工作流智能体和交互机制。

1. ADK 智能体组合原语

ADK 提供了核心构建块——原语——使你能够在多智能体系统中构建和管理交互。

Note

这些原语的具体参数或方法名可能因 SDK 语言而略有不同(如 Python 中为 sub_agents,Java 中为 subAgents)。详情请参考对应语言的 API 文档。

1.1. 智能体层次结构(父智能体,子智能体)

多智能体系统结构的基础是 BaseAgent 中定义的父子关系。

  • 建立层次结构: 通过在初始化父智能体时,将一组智能体实例传递给 sub_agents 参数来创建树状结构。ADK 会自动在初始化时为每个子智能体设置 parent_agent 属性。
  • 单一父级规则: 一个智能体实例只能作为子智能体添加一次。尝试为其分配第二个父级会导致 ValueError
  • 重要性: 该层次结构定义了工作流智能体的作用域,并影响 LLM 驱动委托的潜在目标。你可以通过 agent.parent_agent 导航层次结构,或用 agent.find_agent(name) 查找后代。
# 概念示例:定义层次结构
from google.adk.agents import LlmAgent, BaseAgent

# 定义单个智能体
greeter = LlmAgent(name="Greeter", model="gemini-2.0-flash")
task_doer = BaseAgent(name="TaskExecutor") # 自定义非 LLM 智能体

# 创建父智能体并通过 sub_agents 分配子智能体
coordinator = LlmAgent(
    name="Coordinator",
    model="gemini-2.0-flash",
    description="我负责协调问候和任务。",
    sub_agents=[ # 在这里分配子智能体
        greeter,
        task_doer
    ]
)

# 框架会自动设置:
# assert greeter.parent_agent == coordinator
# assert task_doer.parent_agent == coordinator
// 概念示例:定义层次结构
import com.google.adk.agents.SequentialAgent;
import com.google.adk.agents.LlmAgent;

// 定义单个智能体
LlmAgent greeter = LlmAgent.builder().name("Greeter").model("gemini-2.0-flash").build();
SequentialAgent taskDoer = SequentialAgent.builder().name("TaskExecutor").subAgents(...).build(); // 顺序智能体

// 创建父智能体并分配子智能体
LlmAgent coordinator = LlmAgent.builder()
    .name("Coordinator")
    .model("gemini-2.0-flash")
    .description("我负责协调问候和任务")
    .subAgents(greeter, taskDoer) // 在这里分配子智能体
    .build();

// 框架会自动设置:
// assert greeter.parentAgent().equals(coordinator);
// assert taskDoer.parentAgent().equals(coordinator);
import (
    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
)

// Conceptual Example: Defining Hierarchy
// Define individual agents
greeter, _ := llmagent.New(llmagent.Config{Name: "Greeter", Model: m})
taskDoer, _ := agent.New(agent.Config{Name: "TaskExecutor"}) // Custom non-LLM agent

// Create parent agent and assign children via sub_agents
coordinator, _ := llmagent.New(llmagent.Config{
    Name:        "Coordinator",
    Model:       m,
    Description: "I coordinate greetings and tasks.",
    SubAgents:   []agent.Agent{greeter, taskDoer}, // Assign sub_agents here
})

1.2. 工作流智能体作为编排器

ADK 包含从 BaseAgent 派生的专门智能体,它们本身不执行任务,而是编排其 sub_agents 的执行流程。

  • SequentialAgent 按照列出的顺序依次执行其 sub_agents
    • 上下文: 顺序传递相同的 InvocationContext,使智能体能够通过共享状态轻松传递结果。
# 概念示例:顺序流水线
from google.adk.agents import SequentialAgent, LlmAgent

step1 = LlmAgent(name="Step1_Fetch", output_key="data") # 输出保存到 state['data']
step2 = LlmAgent(name="Step2_Process", instruction="Process data from {data}.")

pipeline = SequentialAgent(name="MyPipeline", sub_agents=[step1, step2])
# 当 pipeline 运行时,Step2 可以访问 Step1 设置的 state['data']。
// 概念示例:顺序流水线
import com.google.adk.agents.SequentialAgent;
import com.google.adk.agents.LlmAgent;

LlmAgent step1 = LlmAgent.builder().name("Step1_Fetch").outputKey("data").build(); // 输出保存到 state.get("data")
LlmAgent step2 = LlmAgent.builder().name("Step2_Process").instruction("Process data from {data}.").build();

SequentialAgent pipeline = SequentialAgent.builder().name("MyPipeline").subAgents(step1, step2).build();
// 当 pipeline 运行时,Step2 可以访问 Step1 设置的 state.get("data")。
import (
    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/agent/workflowagents/sequentialagent"
)

// Conceptual Example: Sequential Pipeline
step1, _ := llmagent.New(llmagent.Config{Name: "Step1_Fetch", OutputKey: "data", Model: m}) // Saves output to state["data"]
step2, _ := llmagent.New(llmagent.Config{Name: "Step2_Process", Instruction: "Process data from {data}.", Model: m})

pipeline, _ := sequentialagent.New(sequentialagent.Config{
    AgentConfig: agent.Config{Name: "MyPipeline", SubAgents: []agent.Agent{step1, step2}},
})
// When pipeline runs, Step2 can access the state["data"] set by Step1.
  • ParallelAgent 并行执行其 sub_agents。来自子智能体的事件可能会交错。
    • 上下文: 为每个子智能体修改 InvocationContext.branch(例如,ParentBranch.ChildName),提供一个不同的上下文路径,这在某些内存实现中对于隔离历史记录可能很有用。
    • 状态: 尽管分支不同,所有并行子智能体都访问相同的共享 session.state,使它们能够读取初始状态和写入结果(使用不同的键以避免竞争条件)。
# 概念示例:并行执行
from google.adk.agents import ParallelAgent, LlmAgent

fetch_weather = LlmAgent(name="WeatherFetcher", output_key="weather")
fetch_news = LlmAgent(name="NewsFetcher", output_key="news")

gatherer = ParallelAgent(name="InfoGatherer", sub_agents=[fetch_weather, fetch_news])
# 当 gatherer 运行时,WeatherFetcher 和 NewsFetcher 并发运行。
# 后续智能体可以读取 state['weather'] 和 state['news']。
// 概念示例:并行执行
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.ParallelAgent;

LlmAgent fetchWeather = LlmAgent.builder()
    .name("WeatherFetcher")
    .outputKey("weather")
    .build();

LlmAgent fetchNews = LlmAgent.builder()
    .name("NewsFetcher")
    .instruction("news")
    .build();

ParallelAgent gatherer = ParallelAgent.builder()
    .name("InfoGatherer")
    .subAgents(fetchWeather, fetchNews)
    .build();

// 当 gatherer 运行时,WeatherFetcher 和 NewsFetcher 并发运行。
// 后续智能体可以读取 state['weather'] 和 state['news']。
import (
    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/agent/workflowagents/parallelagent"
)

// Conceptual Example: Parallel Execution
fetchWeather, _ := llmagent.New(llmagent.Config{Name: "WeatherFetcher", OutputKey: "weather", Model: m})
fetchNews, _ := llmagent.New(llmagent.Config{Name: "NewsFetcher", OutputKey: "news", Model: m})

gatherer, _ := parallelagent.New(parallelagent.Config{
    AgentConfig: agent.Config{Name: "InfoGatherer", SubAgents: []agent.Agent{fetchWeather, fetchNews}},
})
// When gatherer runs, WeatherFetcher and NewsFetcher run concurrently.
// A subsequent agent could read state["weather"] and state["news"].
  • LoopAgent 在循环中顺序执行其 sub_agents
    • 终止: 如果达到可选的 max_iterations 或任何子智能体返回带有 escalate=TrueEvent(在其事件操作中),循环将停止。
    • 上下文和状态: 在每次迭代中传递相同的 InvocationContext,允许状态更改(例如,计数器、标志)在循环间持续存在。
# 概念示例:带条件的循环
from google.adk.agents import LoopAgent, LlmAgent, BaseAgent
from google.adk.events import Event, EventActions
from google.adk.agents.invocation_context import InvocationContext
from typing import AsyncGenerator

class CheckCondition(BaseAgent): # 自定义智能体用于检查状态
    async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
        status = ctx.session.state.get("status", "pending")
        is_done = (status == "completed")
        yield Event(author=self.name, actions=EventActions(escalate=is_done)) # 如果完成则升级

process_step = LlmAgent(name="ProcessingStep") # 可能会更新 state['status'] 的智能体

poller = LoopAgent(
    name="StatusPoller",
    max_iterations=10,
    sub_agents=[process_step, CheckCondition(name="Checker")]
)
# 当 poller 运行时,会反复执行 process_step 和 Checker,
# 直到 Checker 升级(state['status'] == 'completed')或达到 10 次迭代。
// 概念示例:带条件的循环
// 自定义智能体用于检查状态并可能升级
public static class CheckConditionAgent extends BaseAgent {
  public CheckConditionAgent(String name, String description) {
    super(name, description, List.of(), null, null);
  }

  @Override
  protected Flowable<Event> runAsyncImpl(InvocationContext ctx) {
    String status = (String) ctx.session().state().getOrDefault("status", "pending");
    boolean isDone = "completed".equalsIgnoreCase(status);

    // 生成一个事件,如果条件满足则升级(退出循环)。
    // 如果未完成,escalate 标志为 false 或缺失,循环继续。
    Event checkEvent = Event.builder()
            .author(name())
            .id(Event.generateEventId()) // 重要:为事件生成唯一 ID
            .actions(EventActions.builder().escalate(isDone).build()) // 如果完成则升级
            .build();
    return Flowable.just(checkEvent);
  }
}

// 可能会更新 state.put("status") 的智能体
LlmAgent processingStepAgent = LlmAgent.builder().name("ProcessingStep").build();
// 检查条件的自定义智能体实例
CheckConditionAgent conditionCheckerAgent = new CheckConditionAgent(
    "ConditionChecker",
    "检查 status 是否为 'completed'。"
);
LoopAgent poller = LoopAgent.builder().name("StatusPoller").maxIterations(10).subAgents(processingStepAgent, conditionCheckerAgent).build();
// 当 poller 运行时,会反复执行 processingStepAgent 和 conditionCheckerAgent,
// 直到 Checker 升级(state.get("status") == "completed")或达到 10 次迭代。
import (
    "iter"
    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/agent/workflowagents/loopagent"
    "google.golang.org/adk/session"
)

// Conceptual Example: Loop with Condition
// Custom agent to check state
checkCondition, _ := agent.New(agent.Config{
    Name: "Checker",
    Run: func(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
        return func(yield func(*session.Event, error) bool) {
            status, err := ctx.Session().State().Get("status")
            // If "status" is not in the state, default to "pending".
            // This is idiomatic Go for handling a potential error on lookup.
            if err != nil {
                status = "pending"
            }
            isDone := status == "completed"
            yield(&session.Event{Author: "Checker", Actions: session.EventActions{Escalate: isDone}}, nil)
        }
    },
})

processStep, _ := llmagent.New(llmagent.Config{Name: "ProcessingStep", Model: m}) // Agent that might update state["status"]

poller, _ := loopagent.New(loopagent.Config{
    MaxIterations: 10,
    AgentConfig:   agent.Config{Name: "StatusPoller", SubAgents: []agent.Agent{processStep, checkCondition}},
})
// When poller runs, it executes processStep then Checker repeatedly
// until Checker escalates (state["status"] == "completed") or 10 iterations pass.

1.3. 交互与通信机制

系统内的智能体通常需要相互交换数据或触发动作。ADK 通过以下方式实现这一点:

a) 共享会话状态(session.state

在同一调用中运行的智能体(因此通过 InvocationContext 共享相同的 Session 对象)被动通信的最基本方式。

  • 机制: 一个智能体(或其工具/回调)写入一个值(context.state['data_key'] = processed_data),随后的智能体读取它(data = context.state.get('data_key'))。状态变更通过 CallbackContext 跟踪。
  • 便利性: LlmAgent 上的 output_key 属性自动将智能体的最终响应文本(或结构化输出)保存到指定的状态键。
  • 性质: 异步、被动通信。非常适合由 SequentialAgent 编排的管道或跨 LoopAgent 迭代传递数据。
  • 另见: 状态管理

调用上下文和 temp: 状态

当父智能体调用子智能体时,它传递相同的 InvocationContext。这意味着它们共享相同的临时(temp:)状态,这对于传递仅与当前轮次相关的数据是理想的。

# 概念示例:使用 output_key 和读取 state
from google.adk.agents import LlmAgent, SequentialAgent

agent_A = LlmAgent(name="AgentA", instruction="Find the capital of France.", output_key="capital_city")
agent_B = LlmAgent(name="AgentB", instruction="Tell me about the city stored in {capital_city}.")

pipeline = SequentialAgent(name="CityInfo", sub_agents=[agent_A, agent_B])
# AgentA 运行后,将 "Paris" 保存到 state['capital_city']。
# AgentB 运行时,其指令处理器读取 state['capital_city'] 获取 "Paris"。
// 概念示例:使用 outputKey 和读取 state
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.SequentialAgent;

LlmAgent agentA = LlmAgent.builder()
    .name("AgentA")
    .instruction("Find the capital of France.")
    .outputKey("capital_city")
    .build();

LlmAgent agentB = LlmAgent.builder()
    .name("AgentB")
    .instruction("Tell me about the city stored in {capital_city}.")
    .outputKey("capital_city")
    .build();

SequentialAgent pipeline = SequentialAgent.builder().name("CityInfo").subAgents(agentA, agentB).build();
// AgentA 运行后,将 "Paris" 保存到 state('capital_city')。
// AgentB 运行时,其指令处理器读取 state.get("capital_city") 获取 "Paris"。
import (
    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/agent/workflowagents/sequentialagent"
)

// Conceptual Example: Using output_key and reading state
agentA, _ := llmagent.New(llmagent.Config{Name: "AgentA", Instruction: "Find the capital of France.", OutputKey: "capital_city", Model: m})
agentB, _ := llmagent.New(llmagent.Config{Name: "AgentB", Instruction: "Tell me about the city stored in {capital_city}.", Model: m})

pipeline2, _ := sequentialagent.New(sequentialagent.Config{
    AgentConfig: agent.Config{Name: "CityInfo", SubAgents: []agent.Agent{agentA, agentB}},
})
// AgentA runs, saves "Paris" to state["capital_city"].
// AgentB runs, its instruction processor reads state["capital_city"] to get "Paris".

b) LLM 驱动的委托(智能体转移)

利用 LlmAgent 的理解能力来动态将任务路由到层次结构中其他合适的智能体。

  • 机制: 智能体的 LLM 生成特定的函数调用:transfer_to_agent(agent_name='target_agent_name')
  • 处理: 默认情况下,当存在子智能体或未禁止转移时使用的 AutoFlow 会拦截此调用。它使用 root_agent.find_agent() 识别目标智能体并更新 InvocationContext 以切换执行焦点。
  • 要求: 调用的 LlmAgent 需要明确的 instructions,说明何时转移,而潜在的目标智能体需要明确的 description,以便 LLM 做出明智的决定。可以在 LlmAgent 上配置转移范围(父级、子智能体、同级)。
  • 性质: 基于 LLM 解释的动态、灵活路由。
# 概念示例:LLM Transfer
from google.adk.agents import LlmAgent

booking_agent = LlmAgent(name="Booker", description="Handles flight and hotel bookings.")
info_agent = LlmAgent(name="Info", description="Provides general information and answers questions.")

coordinator = LlmAgent(
    name="Coordinator",
    model="gemini-2.0-flash",
    instruction="You are an assistant. Delegate booking tasks to Booker and info requests to Info.",
    description="Main coordinator.",
    # AutoFlow 通常在这里被隐式使用
    sub_agents=[booking_agent, info_agent]
)
# 如果 coordinator 收到 "Book a flight",其 LLM 应生成:
# FunctionCall(name='transfer_to_agent', args={'agent_name': 'Booker'})
# ADK 框架随后将执行路由到 booking_agent。
// 概念示例:LLM Transfer
import com.google.adk.agents.LlmAgent;

LlmAgent bookingAgent = LlmAgent.builder()
    .name("Booker")
    .description("Handles flight and hotel bookings.")
    .build();

LlmAgent infoAgent = LlmAgent.builder()
    .name("Info")
    .description("Provides general information and answers questions.")
    .build();

// 定义协调智能体
LlmAgent coordinator = LlmAgent.builder()
    .name("Coordinator")
    .model("gemini-2.0-flash") // 或你想要的模型
    .instruction("You are an assistant. Delegate booking tasks to Booker and info requests to Info.")
    .description("Main coordinator.")
    // 因为存在 subAgents 且未禁止转移,所以默认(隐式)使用 AutoFlow。
    .subAgents(bookingAgent, infoAgent)
    .build();

// 如果 coordinator 收到 "Book a flight",其 LLM 应生成:
// FunctionCall.builder.name("transferToAgent").args(ImmutableMap.of("agent_name", "Booker")).build()
// ADK 框架随后将执行路由到 bookingAgent。
import (
    "google.golang.org/adk/agent/llmagent"
)

// Conceptual Setup: LLM Transfer
bookingAgent, _ := llmagent.New(llmagent.Config{Name: "Booker", Description: "Handles flight and hotel bookings.", Model: m})
infoAgent, _ := llmagent.New(llmagent.Config{Name: "Info", Description: "Provides general information and answers questions.", Model: m})

coordinator, _ = llmagent.New(llmagent.Config{
    Name:        "Coordinator",
    Model:       m,
    Instruction: "You are an assistant. Delegate booking tasks to Booker and info requests to Info.",
    Description: "Main coordinator.",
    SubAgents:   []agent.Agent{bookingAgent, infoAgent},
})

// If coordinator receives "Book a flight", its LLM should generate:
// FunctionCall{Name: "transfer_to_agent", Args: map[string]any{"agent_name": "Booker"}}
// ADK framework then routes execution to bookingAgent.

c) 显式调用 (AgentTool)

允许 LlmAgent 将另一个 BaseAgent 实例视为可调用函数或工具

  • 机制: 将目标智能体实例包装在 AgentTool 中并将其包含在父 LlmAgenttools 列表中。AgentTool 为 LLM 生成相应的函数声明。
  • 处理: 当父 LLM 生成针对 AgentTool 的函数调用时,框架执行 AgentTool.run_async。此方法运行目标智能体,捕获其最终响应,将任何状态/制品更改转发回父级的上下文,并将响应作为工具的结果返回。
  • 性质: 同步(在父级的流程内),明确、受控的调用,就像任何其他工具一样。
  • (注意: 需要显式导入和使用 AgentTool)。
# 概念示例:Agent as a Tool
from google.adk.agents import LlmAgent, BaseAgent
from google.adk.tools import agent_tool
from pydantic import BaseModel

# 定义目标智能体(可以是 LlmAgent 或自定义 BaseAgent)
class ImageGeneratorAgent(BaseAgent): # 示例自定义智能体
    name: str = "ImageGen"
    description: str = "Generates an image based on a prompt."
    # ... 内部逻辑 ...
    async def _run_async_impl(self, ctx): # 简化的运行逻辑
        prompt = ctx.session.state.get("image_prompt", "default prompt")
        # ... 生成图片字节 ...
        image_bytes = b"..."
        yield Event(author=self.name, content=types.Content(parts=[types.Part.from_bytes(image_bytes, "image/png")]))

image_agent = ImageGeneratorAgent()
image_tool = agent_tool.AgentTool(agent=image_agent) # 包装该智能体

# 父智能体使用 AgentTool
artist_agent = LlmAgent(
    name="Artist",
    model="gemini-2.0-flash",
    instruction="Create a prompt and use the ImageGen tool to generate the image.",
    tools=[image_tool] # 包含 AgentTool
)
# Artist LLM 生成 prompt,然后调用:
# FunctionCall(name='ImageGen', args={'image_prompt': 'a cat wearing a hat'})
# 框架调用 image_tool.run_async(...),运行 ImageGeneratorAgent。
# 生成的图片 Part 作为工具结果返回给 Artist agent。
// 概念示例:Agent as a Tool
import com.google.adk.agents.BaseAgent;
import com.google.adk.agents.LlmAgent;
import com.google.adk.tools.AgentTool;

// 示例自定义智能体(可以是 LlmAgent 或自定义 BaseAgent)
public class ImageGeneratorAgent extends BaseAgent  {

  public ImageGeneratorAgent(String name, String description) {
    super(name, description, List.of(), null, null);
  }

  // ... 内部逻辑 ...
  @Override
  protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) { // 简化的运行逻辑
    invocationContext.session().state().get("image_prompt");
    // 生成图片字节
    // ...

    Event responseEvent = Event.builder()
        .author(this.name())
        .content(Content.fromParts(Part.fromText("\b...")))
        .build();

    return Flowable.just(responseEvent);
  }

  @Override
  protected Flowable<Event> runLiveImpl(InvocationContext invocationContext) {
    return null;
  }
}

// 使用 AgentTool 包装该智能体
ImageGeneratorAgent imageAgent = new ImageGeneratorAgent("image_agent", "generates images");
AgentTool imageTool = AgentTool.create(imageAgent);

// 父智能体使用 AgentTool
LlmAgent artistAgent = LlmAgent.builder()
        .name("Artist")
        .model("gemini-2.0-flash")
        .instruction(
                "You are an artist. Create a detailed prompt for an image and then " +
                        "use the 'ImageGen' tool to generate the image. " +
                        "The 'ImageGen' tool expects a single string argument named 'request' " +
                        "containing the image prompt. The tool will return a JSON string in its " +
                        "'result' field, containing 'image_base64', 'mime_type', and 'status'."
        )
        .description("An agent that can create images using a generation tool.")
        .tools(imageTool) // 包含 AgentTool
        .build();

// Artist LLM 生成 prompt,然后调用:
// FunctionCall(name='ImageGen', args={'imagePrompt': 'a cat wearing a hat'})
// 框架调用 imageTool.runAsync(...),运行 ImageGeneratorAgent。
// 生成的图片 Part 作为工具结果返回给 Artist agent。
import (
    "fmt"
    "iter"
    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/model"
    "google.golang.org/adk/session"
    "google.golang.org/adk/tool"
    "google.golang.org/adk/tool/agenttool"
    "google.golang.org/genai"
)

// Conceptual Setup: Agent as a Tool
// Define a target agent (could be LlmAgent or custom BaseAgent)
imageAgent, _ := agent.New(agent.Config{
    Name:        "ImageGen",
    Description: "Generates an image based on a prompt.",
    Run: func(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
        return func(yield func(*session.Event, error) bool) {
            prompt, _ := ctx.Session().State().Get("image_prompt")
            fmt.Printf("Generating image for prompt: %v\n", prompt)
            imageBytes := []byte("...") // Simulate image bytes
            yield(&session.Event{
                Author: "ImageGen",
                LLMResponse: model.LLMResponse{
                    Content: &genai.Content{
                        Parts: []*genai.Part{genai.NewPartFromBytes(imageBytes, "image/png")},
                    },
                },
            }, nil)
        }
    },
})

// Wrap the agent
imageTool := agenttool.New(imageAgent, nil)

// Now imageTool can be used as a tool by other agents.

// Parent agent uses the AgentTool
artistAgent, _ := llmagent.New(llmagent.Config{
    Name:        "Artist",
    Model:       m,
    Instruction: "Create a prompt and use the ImageGen tool to generate the image.",
    Tools:       []tool.Tool{imageTool}, // Include the AgentTool
})
// Artist LLM generates a prompt, then calls:
// FunctionCall{Name: "ImageGen", Args: map[string]any{"image_prompt": "a cat wearing a hat"}}
// Framework calls imageTool.Run(...), which runs ImageGeneratorAgent.
// The resulting image Part is returned to the Artist agent as the tool result.

这些原语提供了设计多智能体交互的灵活性,范围从紧密耦合的顺序工作流到动态的、由 LLM 驱动的委托网络。

2. 使用 ADK 原语的常见多智能体模式

通过组合 ADK 的组合原语,你可以实现多智能体协作的各种已建立模式。

协调员/调度员模式

  • 结构: 一个中央 LlmAgent(协调员)管理几个专业 sub_agents
  • 目标: 将传入请求路由到适当的专家智能体。
  • 使用的 ADK 原语:
    • 层次结构: 协调员在 sub_agents 中列出专家。
    • 交互: 主要使用 LLM 驱动的委托(要求子智能体有明确的 description 和协调员有适当的 instruction)或 显式调用(AgentTool(协调员在其 tools 中包含 AgentTool 包装的专家)。
# 概念示例:Coordinator using LLM Transfer
from google.adk.agents import LlmAgent

billing_agent = LlmAgent(name="Billing", description="Handles billing inquiries.")
support_agent = LlmAgent(name="Support", description="Handles technical support requests.")

coordinator = LlmAgent(
    name="HelpDeskCoordinator",
    model="gemini-2.0-flash",
    instruction="Route user requests: Use Billing agent for payment issues, Support agent for technical problems.",
    description="Main help desk router.",
    # 在 AutoFlow 中,allow_transfer=True 对于 sub_agents 通常是隐式的
    sub_agents=[billing_agent, support_agent]
)
# 用户问 "My payment failed" -> Coordinator 的 LLM 应调用 transfer_to_agent(agent_name='Billing')
# 用户问 "I can't log in" -> Coordinator 的 LLM 应调用 transfer_to_agent(agent_name='Support')
// 概念示例:Coordinator using LLM Transfer
import com.google.adk.agents.LlmAgent;

LlmAgent billingAgent = LlmAgent.builder()
    .name("Billing")
    .description("Handles billing inquiries and payment issues.")
    .build();

LlmAgent supportAgent = LlmAgent.builder()
    .name("Support")
    .description("Handles technical support requests and login problems.")
    .build();

LlmAgent coordinator = LlmAgent.builder()
    .name("HelpDeskCoordinator")
    .model("gemini-2.0-flash")
    .instruction("Route user requests: Use Billing agent for payment issues, Support agent for technical problems.")
    .description("Main help desk router.")
    .subAgents(billingAgent, supportAgent)
    // 在 Autoflow 中,对于子智能体,智能体转移是隐式的,除非使用
    // .disallowTransferToParent 或 disallowTransferToPeers 指定
    .build();

// 用户问 "My payment failed" -> Coordinator 的 LLM 应调用
// transferToAgent(agentName='Billing')
// 用户问 "I can't log in" -> Coordinator 的 LLM 应调用
// transferToAgent(agentName='Support')
import (
    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
)

// Conceptual Code: Coordinator using LLM Transfer
billingAgent, _ := llmagent.New(llmagent.Config{Name: "Billing", Description: "Handles billing inquiries.", Model: m})
supportAgent, _ := llmagent.New(llmagent.Config{Name: "Support", Description: "Handles technical support requests.", Model: m})

coordinator, _ := llmagent.New(llmagent.Config{
    Name:        "HelpDeskCoordinator",
    Model:       m,
    Instruction: "Route user requests: Use Billing agent for payment issues, Support agent for technical problems.",
    Description: "Main help desk router.",
    SubAgents:   []agent.Agent{billingAgent, supportAgent},
})
// User asks "My payment failed" -> Coordinator's LLM should call transfer_to_agent(agent_name='Billing')
// User asks "I can't log in" -> Coordinator's LLM should call transfer_to_agent(agent_name='Support')

顺序流水线模式

  • 结构: 一个 SequentialAgent 包含按固定顺序执行的 sub_agents
  • 目标: 实现一个多步骤流程,其中一个步骤的输出作为下一个步骤的输入。
  • 使用的 ADK 原语:
    • 工作流: SequentialAgent 定义顺序。
    • 通信: 主要使用共享会话状态。较早的智能体写入结果(通常通过 output_key),较晚的智能体从 context.state 读取这些结果。
# 概念示例:Sequential Data Pipeline
from google.adk.agents import SequentialAgent, LlmAgent

validator = LlmAgent(name="ValidateInput", instruction="Validate the input.", output_key="validation_status")
processor = LlmAgent(name="ProcessData", instruction="Process data if {validation_status} is 'valid'.", output_key="result")
reporter = LlmAgent(name="ReportResult", instruction="Report the result from {result}.")

data_pipeline = SequentialAgent(
    name="DataPipeline",
    sub_agents=[validator, processor, reporter]
)
# validator runs -> saves to state['validation_status']
# processor runs -> reads state['validation_status'], saves to state['result']
# reporter runs -> reads state['result']
// 概念示例:Sequential Data Pipeline
import com.google.adk.agents.SequentialAgent;

LlmAgent validator = LlmAgent.builder()
    .name("ValidateInput")
    .instruction("Validate the input")
    .outputKey("validation_status") // 将其主要文本输出保存到 session.state["validation_status"]
    .build();

LlmAgent processor = LlmAgent.builder()
    .name("ProcessData")
    .instruction("Process data if {validation_status} is 'valid'")
    .outputKey("result") // 将其主要文本输出保存到 session.state["result"]
    .build();

LlmAgent reporter = LlmAgent.builder()
    .name("ReportResult")
    .instruction("Report the result from {result}")
    .build();

SequentialAgent dataPipeline = SequentialAgent.builder()
    .name("DataPipeline")
    .subAgents(validator, processor, reporter)
    .build();

// validator runs -> saves to state['validation_status']
// processor runs -> reads state['validation_status'], saves to state['result']
// reporter runs -> reads state['result']
import (
    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/agent/workflowagents/sequentialagent"
)

// Conceptual Code: Sequential Data Pipeline
validator, _ := llmagent.New(llmagent.Config{Name: "ValidateInput", Instruction: "Validate the input.", OutputKey: "validation_status", Model: m})
processor, _ := llmagent.New(llmagent.Config{Name: "ProcessData", Instruction: "Process data if {validation_status} is 'valid'.", OutputKey: "result", Model: m})
reporter, _ := llmagent.New(llmagent.Config{Name: "ReportResult", Instruction: "Report the result from {result}.", Model: m})

dataPipeline, _ := sequentialagent.New(sequentialagent.Config{
    AgentConfig: agent.Config{Name: "DataPipeline", SubAgents: []agent.Agent{validator, processor, reporter}},
})
// validator runs -> saves to state["validation_status"]
// processor runs -> reads state["validation_status"], saves to state["result"]
// reporter runs -> reads state["result"]

并行扇出/收集模式

  • 结构: 一个ParallelAgent并发运行多个sub_agents,通常后面跟着一个 (在SequentialAgent中的) 聚合结果的智能体。
  • 目标: 同时执行独立任务以减少延迟,然后合并它们的输出。
  • 使用的 ADK 原语:
    • 工作流: 使用ParallelAgent进行并发执行 (扇出)。通常嵌套在SequentialAgent中以处理后续的聚合步骤 (聚合)。
    • 通信: 子智能体将结果写入共享会话状态中的不同键。随后的"聚合"智能体读取多个状态键。
# 概念示例:Parallel Information Gathering
from google.adk.agents import SequentialAgent, ParallelAgent, LlmAgent

fetch_api1 = LlmAgent(name="API1Fetcher", instruction="Fetch data from API 1.", output_key="api1_data")
fetch_api2 = LlmAgent(name="API2Fetcher", instruction="Fetch data from API 2.", output_key="api2_data")

gather_concurrently = ParallelAgent(
    name="ConcurrentFetch",
    sub_agents=[fetch_api1, fetch_api2]
)

synthesizer = LlmAgent(
    name="Synthesizer",
    instruction="Combine results from {api1_data} and {api2_data}."
)

overall_workflow = SequentialAgent(
    name="FetchAndSynthesize",
    sub_agents=[gather_concurrently, synthesizer] # Run parallel fetch, then synthesize
)
# fetch_api1 and fetch_api2 run concurrently, saving to state.
# synthesizer runs afterwards, reading state['api1_data'] and state['api2_data'].
// 概念示例:Parallel Information Gathering
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.ParallelAgent;
import com.google.adk.agents.SequentialAgent;

LlmAgent fetchApi1 = LlmAgent.builder()
    .name("API1Fetcher")
    .instruction("Fetch data from API 1.")
    .outputKey("api1_data")
    .build();

LlmAgent fetchApi2 = LlmAgent.builder()
    .name("API2Fetcher")
    .instruction("Fetch data from API 2.")
    .outputKey("api2_data")
    .build();

ParallelAgent gatherConcurrently = ParallelAgent.builder()
    .name("ConcurrentFetcher")
    .subAgents(fetchApi2, fetchApi1)
    .build();

LlmAgent synthesizer = LlmAgent.builder()
    .name("Synthesizer")
    .instruction("Combine results from {api1_data} and {api2_data}.")
    .build();

SequentialAgent overallWorfklow = SequentialAgent.builder()
    .name("FetchAndSynthesize") // Run parallel fetch, then synthesize
    .subAgents(gatherConcurrently, synthesizer)
    .build();

// fetch_api1 and fetch_api2 run concurrently, saving to state.
// synthesizer runs afterwards, reading state['api1_data'] and state['api2_data'].
import (
    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/agent/workflowagents/parallelagent"
    "google.golang.org/adk/agent/workflowagents/sequentialagent"
)

// Conceptual Code: Parallel Information Gathering
fetchAPI1, _ := llmagent.New(llmagent.Config{Name: "API1Fetcher", Instruction: "Fetch data from API 1.", OutputKey: "api1_data", Model: m})
fetchAPI2, _ := llmagent.New(llmagent.Config{Name: "API2Fetcher", Instruction: "Fetch data from API 2.", OutputKey: "api2_data", Model: m})

gatherConcurrently, _ := parallelagent.New(parallelagent.Config{
    AgentConfig: agent.Config{Name: "ConcurrentFetch", SubAgents: []agent.Agent{fetchAPI1, fetchAPI2}},
})

synthesizer, _ := llmagent.New(llmagent.Config{Name: "Synthesizer", Instruction: "Combine results from {api1_data} and {api2_data}.", Model: m})

overallWorkflow, _ := sequentialagent.New(sequentialagent.Config{
    AgentConfig: agent.Config{Name: "FetchAndSynthesize", SubAgents: []agent.Agent{gatherConcurrently, synthesizer}},
})
// fetch_api1 and fetch_api2 run concurrently, saving to state.
// synthesizer runs afterwards, reading state["api1_data"] and state["api2_data"].

分层任务分解模式

  • 结构: 一个多层次的智能体树,其中高级智能体分解复杂目标并将子任务委托给低级智能体。
  • 目标: 通过递归地将复杂问题分解为更简单、可执行的步骤来解决问题。
  • 使用的 ADK 原语:
    • 层次结构: 多级 parent_agent/sub_agents 结构。
    • 交互: 主要使用LLM 驱动的委托显式调用(AgentTool,由父智能体用来向子智能体分配任务。结果通过层次结构向上返回(通过工具响应或状态)。
# 概念示例:分层研究任务
from google.adk.agents import LlmAgent
from google.adk.tools import agent_tool

# 底层类工具智能体
web_searcher = LlmAgent(name="WebSearch", description="Performs web searches for facts.")
summarizer = LlmAgent(name="Summarizer", description="Summarizes text.")

# 结合工具的中层智能体
research_assistant = LlmAgent(
    name="ResearchAssistant",
    model="gemini-2.0-flash",
    description="Finds and summarizes information on a topic.",
    tools=[agent_tool.AgentTool(agent=web_searcher), agent_tool.AgentTool(agent=summarizer)]
)

# 委托研究的高层智能体
report_writer = LlmAgent(
    name="ReportWriter",
    model="gemini-2.0-flash",
    instruction="Write a report on topic X. Use the ResearchAssistant to gather information.",
    tools=[agent_tool.AgentTool(agent=research_assistant)]
    # 或者,如果 research_assistant 是一个子智能体,也可以使用 LLM 转移
)
# 用户与 ReportWriter 交互。
# ReportWriter 调用 ResearchAssistant 工具。
# ResearchAssistant 调用 WebSearch 和 Summarizer 工具。
# 结果向上回流。
// 概念示例:分层研究任务
import com.google.adk.agents.LlmAgent;
import com.google.adk.tools.AgentTool;

// 底层类工具智能体
LlmAgent webSearcher = LlmAgent.builder()
    .name("WebSearch")
    .description("Performs web searches for facts.")
    .build();

LlmAgent summarizer = LlmAgent.builder()
    .name("Summarizer")
    .description("Summarizes text.")
    .build();

// 结合工具的中层智能体
LlmAgent researchAssistant = LlmAgent.builder()
    .name("ResearchAssistant")
    .model("gemini-2.0-flash")
    .description("Finds and summarizes information on a topic.")
    .tools(AgentTool.create(webSearcher), AgentTool.create(summarizer))
    .build();

// 委托研究的高层智能体
LlmAgent reportWriter = LlmAgent.builder()
    .name("ReportWriter")
    .model("gemini-2.0-flash")
    .instruction("Write a report on topic X. Use the ResearchAssistant to gather information.")
    .tools(AgentTool.create(researchAssistant))
    // 或者,如果 research_assistant 是一个 subAgent,也可以使用 LLM 转移
    .build();

// 用户与 ReportWriter 交互。
// ReportWriter 调用 ResearchAssistant 工具。
// ResearchAssistant 调用 WebSearch 和 Summarizer 工具。
// 结果向上回流。
import (
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/tool"
    "google.golang.org/adk/tool/agenttool"
)

// Conceptual Code: Hierarchical Research Task
// Low-level tool-like agents
webSearcher, _ := llmagent.New(llmagent.Config{Name: "WebSearch", Description: "Performs web searches for facts.", Model: m})
summarizer, _ := llmagent.New(llmagent.Config{Name: "Summarizer", Description: "Summarizes text.", Model: m})

// Mid-level agent combining tools
webSearcherTool := agenttool.New(webSearcher, nil)
summarizerTool := agenttool.New(summarizer, nil)
researchAssistant, _ := llmagent.New(llmagent.Config{
    Name:        "ResearchAssistant",
    Model:       m,
    Description: "Finds and summarizes information on a topic.",
    Tools:       []tool.Tool{webSearcherTool, summarizerTool},
})

// High-level agent delegating research
researchAssistantTool := agenttool.New(researchAssistant, nil)
reportWriter, _ := llmagent.New(llmagent.Config{
    Name:        "ReportWriter",
    Model:       m,
    Instruction: "Write a report on topic X. Use the ResearchAssistant to gather information.",
    Tools:       []tool.Tool{researchAssistantTool},
})
// User interacts with ReportWriter.
// ReportWriter calls ResearchAssistant tool.
// ResearchAssistant calls WebSearch and Summarizer tools.
// Results flow back up.

Review/Critique 模式(生成器 - 评论器)

  • 结构: 通常在SequentialAgent中包含两个智能体:一个生成者和一个评论者/审核者。
  • 目标: 通过专门的智能体审核来提高生成输出的质量或有效性。
  • 使用的 ADK 原语:
    • 工作流: SequentialAgent确保生成在审核之前发生。
    • 通信: 共享会话状态(生成者使用output_key保存输出;审核者读取该状态键)。审核者可能将其反馈保存到另一个状态键,供后续步骤使用。
# 概念示例:Generator-Critic
from google.adk.agents import SequentialAgent, LlmAgent

generator = LlmAgent(
    name="DraftWriter",
    instruction="Write a short paragraph about subject X.",
    output_key="draft_text"
)

reviewer = LlmAgent(
    name="FactChecker",
    instruction="Review the text in {draft_text} for factual accuracy. Output 'valid' or 'invalid' with reasons.",
    output_key="review_status"
)

# 可选:基于 review_status 的后续步骤

review_pipeline = SequentialAgent(
    name="WriteAndReview",
    sub_agents=[generator, reviewer]
)
# generator runs -> saves draft to state['draft_text']
# reviewer runs -> reads state['draft_text'], saves status to state['review_status']
// 概念示例:Generator-Critic
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.SequentialAgent;

LlmAgent generator = LlmAgent.builder()
    .name("DraftWriter")
    .instruction("Write a short paragraph about subject X.")
    .outputKey("draft_text")
    .build();

LlmAgent reviewer = LlmAgent.builder()
    .name("FactChecker")
    .instruction("Review the text in {draft_text} for factual accuracy. Output 'valid' or 'invalid' with reasons.")
    .outputKey("review_status")
    .build();

// 可选:基于 review_status 的后续步骤

SequentialAgent reviewPipeline = SequentialAgent.builder()
    .name("WriteAndReview")
    .subAgents(generator, reviewer)
    .build();

// generator 运行后,将草稿保存到 state['draft_text']
// reviewer 运行后,读取 state['draft_text'],将状态保存到 state['review_status']
import (
    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/agent/workflowagents/sequentialagent"
)

// Conceptual Code: Generator-Critic
generator, _ := llmagent.New(llmagent.Config{
    Name:        "DraftWriter",
    Instruction: "Write a short paragraph about subject X.",
    OutputKey:   "draft_text",
    Model:       m,
})

reviewer, _ := llmagent.New(llmagent.Config{
    Name:        "FactChecker",
    Instruction: "Review the text in {draft_text} for factual accuracy. Output 'valid' or 'invalid' with reasons.",
    OutputKey:   "review_status",
    Model:       m,
})

reviewPipeline, _ := sequentialagent.New(sequentialagent.Config{
    AgentConfig: agent.Config{Name: "WriteAndReview", SubAgents: []agent.Agent{generator, reviewer}},
})
// generator runs -> saves draft to state["draft_text"]
// reviewer runs -> reads state["draft_text"], saves status to state["review_status"]

迭代优化模式

  • 结构: 使用包含一个或多个智能体的 LoopAgent,这些智能体在多次迭代中处理任务。
  • 目标: 逐步改进存储在会话状态中的结果(如代码、文本、计划),直到达到质量阈值或达到最大迭代次数。
  • 使用的 ADK 原语:
    • 工作流: LoopAgent 管理重复。
    • 通信: 共享会话状态对于智能体读取前一次迭代的输出并保存改进版本至关重要。
    • 终止: 循环通常基于 max_iterations 结束,或专用检查智能体在结果令人满意时在 Event Actions 中设置 escalate=True
# 概念示例:Iterative Code Refinement
from google.adk.agents import LoopAgent, LlmAgent, BaseAgent
from google.adk.events import Event, EventActions
from google.adk.agents.invocation_context import InvocationContext
from typing import AsyncGenerator

# 根据 state['current_code'] 和 state['requirements'] 生成/改进代码的智能体
code_refiner = LlmAgent(
    name="CodeRefiner",
    instruction="Read state['current_code'] (if exists) and state['requirements']. Generate/refine Python code to meet requirements. Save to state['current_code'].",
    output_key="current_code" # 每次覆盖 state 中的代码
)

# 检查代码是否符合质量标准的智能体
quality_checker = LlmAgent(
    name="QualityChecker",
    instruction="Evaluate the code in state['current_code'] against state['requirements']. Output 'pass' or 'fail'.",
    output_key="quality_status"
)

# 自定义智能体检查状态并在 'pass' 时升级
class CheckStatusAndEscalate(BaseAgent):
    async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
        status = ctx.session.state.get("quality_status", "fail")
        should_stop = (status == "pass")
        yield Event(author=self.name, actions=EventActions(escalate=should_stop))

refinement_loop = LoopAgent(
    name="CodeRefinementLoop",
    max_iterations=5,
    sub_agents=[code_refiner, quality_checker, CheckStatusAndEscalate(name="StopChecker")]
)
# 循环运行:Refiner -> Checker -> StopChecker
# 每次迭代更新 state['current_code']。
# 如果 QualityChecker 输出 'pass'(导致 StopChecker 升级)或达到 5 次迭代则循环停止。
// 概念示例:Iterative Code Refinement
import com.google.adk.agents.BaseAgent;
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.LoopAgent;
import com.google.adk.events.Event;
import com.google.adk.events.EventActions;
import com.google.adk.agents.InvocationContext;
import io.reactivex.rxjava3.core.Flowable;
import java.util.List;

// 根据 state['current_code'] 和 state['requirements'] 生成/改进代码的智能体
LlmAgent codeRefiner = LlmAgent.builder()
    .name("CodeRefiner")
    .instruction("Read state['current_code'] (if exists) and state['requirements']. Generate/refine Java code to meet requirements. Save to state['current_code'].")
    .outputKey("current_code") // 每次覆盖 state 中的代码
    .build();

// 检查代码是否符合质量标准的智能体
LlmAgent qualityChecker = LlmAgent.builder()
    .name("QualityChecker")
    .instruction("Evaluate the code in state['current_code'] against state['requirements']. Output 'pass' or 'fail'.")
    .outputKey("quality_status")
    .build();

BaseAgent checkStatusAndEscalate = new BaseAgent(
    "StopChecker","Checks quality_status and escalates if 'pass'.", List.of(), null, null) {

  @Override
  protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) {
    String status = (String) invocationContext.session().state().getOrDefault("quality_status", "fail");
    boolean shouldStop = "pass".equals(status);

    EventActions actions = EventActions.builder().escalate(shouldStop).build();
    Event event = Event.builder()
        .author(this.name())
        .actions(actions)
        .build();
    return Flowable.just(event);
  }
};

LoopAgent refinementLoop = LoopAgent.builder()
    .name("CodeRefinementLoop")
    .maxIterations(5)
    .subAgents(codeRefiner, qualityChecker, checkStatusAndEscalate)
    .build();

// 循环运行:Refiner -> Checker -> StopChecker
// 每次迭代更新 state['current_code']。
// 如果 QualityChecker 输出 'pass'(导致 StopChecker 升级)或达到 5 次迭代则循环停止。
import (
    "iter"
    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/agent/workflowagents/loopagent"
    "google.golang.org/adk/session"
)

// Conceptual Code: Iterative Code Refinement
codeRefiner, _ := llmagent.New(llmagent.Config{
    Name:        "CodeRefiner",
    Instruction: "Read state['current_code'] (if exists) and state['requirements']. Generate/refine Python code to meet requirements. Save to state['current_code'].",
    OutputKey:   "current_code",
    Model:       m,
})

qualityChecker, _ := llmagent.New(llmagent.Config{
    Name:        "QualityChecker",
    Instruction: "Evaluate the code in state['current_code'] against state['requirements']. Output 'pass' or 'fail'.",
    OutputKey:   "quality_status",
    Model:       m,
})

checkStatusAndEscalate, _ := agent.New(agent.Config{
    Name: "StopChecker",
    Run: func(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
        return func(yield func(*session.Event, error) bool) {
            status, _ := ctx.Session().State().Get("quality_status")
            shouldStop := status == "pass"
            yield(&session.Event{Author: "StopChecker", Actions: session.EventActions{Escalate: shouldStop}}, nil)
        }
    },
})

refinementLoop, _ := loopagent.New(loopagent.Config{
    MaxIterations: 5,
    AgentConfig:   agent.Config{Name: "CodeRefinementLoop", SubAgents: []agent.Agent{codeRefiner, qualityChecker, checkStatusAndEscalate}},
})
// Loop runs: Refiner -> Checker -> StopChecker
// State["current_code"] is updated each iteration.
// Loop stops if QualityChecker outputs 'pass' (leading to StopChecker escalating) or after 5 iterations.

人工介入模式

  • 结构: 在智能体工作流中集成人类干预点。
  • 目标: 允许人类监督、批准、纠正或执行 AI 无法完成的任务。
  • 使用的 ADK 原语(概念):
    • 交互: 可以通过自定义工具实现,该工具暂停执行并向外部系统(例如 UI、工单系统)发送请求,等待人类输入。然后该工具将人类的响应返回给智能体。
    • 工作流: 可以使用LLM 驱动的委托transfer_to_agent)指向概念性的"人类智能体"触发外部工作流,或在LlmAgent中使用自定义工具。
    • 状态/回调: 状态可以保存人类任务详情;回调可以管理交互流程。
    • 注意: ADK 没有内置的"人类智能体"类型,所以这需要自定义集成。
# 概念示例:使用工具进行人工审批
from google.adk.agents import LlmAgent, SequentialAgent
from google.adk.tools import FunctionTool

# --- 假设 external_approval_tool 已存在 ---
# 该工具将:
# 1. 接收详情(如 request_id, amount, reason)。
# 2. 将这些详情发送到人工审核系统(如通过 API)。
# 3. 轮询或等待人工响应(approved/rejected)。
# 4. 返回人工决策。
# async def external_approval_tool(amount: float, reason: str) -> str: ...
approval_tool = FunctionTool(func=external_approval_tool)

# 准备请求的智能体
prepare_request = LlmAgent(
    name="PrepareApproval",
    instruction="Prepare the approval request details based on user input. Store amount and reason in state.",
    # ... 可能设置 state['approval_amount'] 和 state['approval_reason'] ...
)

# 调用人工审批工具的智能体
request_approval = LlmAgent(
    name="RequestHumanApproval",
    instruction="Use the external_approval_tool with amount from state['approval_amount'] and reason from state['approval_reason'].",
    tools=[approval_tool],
    output_key="human_decision"
)

# 根据人工决策继续的智能体
process_decision = LlmAgent(
    name="ProcessDecision",
    instruction="Check {human_decision}. If 'approved', proceed. If 'rejected', inform user."
)

approval_workflow = SequentialAgent(
    name="HumanApprovalWorkflow",
    sub_agents=[prepare_request, request_approval, process_decision]
)
// 概念示例:使用工具进行人工审批
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.SequentialAgent;
import com.google.adk.tools.FunctionTool;

// --- 假设 external_approval_tool 已存在 ---
// 该工具将:
// 1. 接收详情(如 request_id, amount, reason)。
// 2. 将这些详情发送到人工审核系统(如通过 API)。
// 3. 轮询或等待人工响应(approved/rejected)。
// 4. 返回人工决策。
// public boolean externalApprovalTool(float amount, String reason) { ... }
FunctionTool approvalTool = FunctionTool.create(externalApprovalTool);

// 准备请求的智能体
LlmAgent prepareRequest = LlmAgent.builder()
    .name("PrepareApproval")
    .instruction("Prepare the approval request details based on user input. Store amount and reason in state.")
    // ... 可能设置 state['approval_amount'] 和 state['approval_reason'] ...
    .build();

// 调用人工审批工具的智能体
LlmAgent requestApproval = LlmAgent.builder()
    .name("RequestHumanApproval")
    .instruction("Use the external_approval_tool with amount from state['approval_amount'] and reason from state['approval_reason'].")
    .tools(approvalTool)
    .outputKey("human_decision")
    .build();

// 根据人工决策继续的智能体
LlmAgent processDecision = LlmAgent.builder()
    .name("ProcessDecision")
    .instruction("Check {human_decision}. If 'approved', proceed. If 'rejected', inform user.")
    .build();

SequentialAgent approvalWorkflow = SequentialAgent.builder()
    .name("HumanApprovalWorkflow")
    .subAgents(prepareRequest, requestApproval, processDecision)
    .build();
import (
    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/agent/workflowagents/sequentialagent"
    "google.golang.org/adk/tool"
)

// Conceptual Code: Using a Tool for Human Approval
// --- Assume externalApprovalTool exists ---
// func externalApprovalTool(amount float64, reason string) (string, error) { ... }
type externalApprovalToolArgs struct {
    Amount float64 `json:"amount" jsonschema:"The amount for which approval is requested."`
    Reason string  `json:"reason" jsonschema:"The reason for the approval request."`
}
var externalApprovalTool func(tool.Context, externalApprovalToolArgs) (string, error)
approvalTool, _ := functiontool.New(
    functiontool.Config{
        Name:        "external_approval_tool",
        Description: "Sends a request for human approval.",
    },
    externalApprovalTool,
)

prepareRequest, _ := llmagent.New(llmagent.Config{
    Name:        "PrepareApproval",
    Instruction: "Prepare the approval request details based on user input. Store amount and reason in state.",
    Model:       m,
})

requestApproval, _ := llmagent.New(llmagent.Config{
    Name:        "RequestHumanApproval",
    Instruction: "Use the external_approval_tool with amount from state['approval_amount'] and reason from state['approval_reason'].",
    Tools:       []tool.Tool{approvalTool},
    OutputKey:   "human_decision",
    Model:       m,
})

processDecision, _ := llmagent.New(llmagent.Config{
    Name:        "ProcessDecision",
    Instruction: "Check {human_decision}. If 'approved', proceed. If 'rejected', inform user.",
    Model:       m,
})

approvalWorkflow, _ := sequentialagent.New(sequentialagent.Config{
    AgentConfig: agent.Config{Name: "HumanApprovalWorkflow", SubAgents: []agent.Agent{prepareRequest, requestApproval, processDecision}},
})

这些模式为构建你的多智能体系统提供了起点。你可以根据需要混合和匹配它们,为你的特定应用程序创建最有效的架构。