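Custom agent template workflows
Custom agents and agent-based workflows let you define arbitrary orchestration logic by inheriting directly from BaseAgent and implementing your own control flow. This approach lets you create new execution patterns similar to SequentialAgent, LoopAgent, and ParallelAgent, so you can build highly specific and complex agent workflows.
Alternative: graph-based workflows
Starting with ADK 2.0, agent-based workflows built on BaseAgent have been superseded by more flexible workflow structures, including graph-based workflows and dynamic workflows. Before building a custom agent for a target workflow, evaluate what these workflow mechanisms can already do.
Advanced concept
Building a custom agent by implementing _run_async_impl directly (or its equivalent in other languages) gives you powerful control, but it is more complex than using the predefined LlmAgent or WorkflowAgent types. We recommend understanding those foundational agent types before attempting custom orchestration logic.
Overview
A custom agent is essentially any class you create that inherits from google.adk.agents.BaseAgent and implements its core execution logic in the asynchronous _run_async_impl method. You have complete control over how this method calls other sub-agents, manages state, and handles events.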

Note
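The exact name of the method that implements an agent's core asynchronous logic varies slightly by SDK language: for example, runAsyncImpl in Java, _run_async_impl in Python, and runAsyncImpl in TypeScript. See the language-specific API documentation for details.
Why build a custom agent?
After reviewing ADK's existing agent-workflow approaches and architectures, consider building a custom workflow agent if you find that those mechanisms cannot meet one or more of the following requirements of your project: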
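Implementing custom logic
The heart of a custom agent is the method in which you define its unique asynchronous behavior. This method lets you orchestrate sub-agents and manage the flow of execution.
The heart of any custom agent is the _run_async_impl method. This is where you define its unique behavior.
- Signature: async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
- Asynchronous generator: it must be an async def function that returns an AsyncGenerator. This lets you yield events produced by sub-agents or by your own logic back to the runner.
- ctx (InvocationContext): provides access to crucial runtime information, most importantly ctx.session.state, which is the primary way to share data between the steps your custom agent orchestrates.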
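As a rough illustration of this contract, the following framework-free sketch uses plain asyncio. Event, InvocationContext, EchoAgent, Orchestrator, and collect are hypothetical stand-ins, not ADK classes; they only mimic the shape of _run_async_impl: an async generator that drives sub-agents, re-yields their events, and shares data through a state dict.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncGenerator, List


@dataclass
class Event:
    """Hypothetical stand-in for an ADK event: who produced it and what it says."""
    author: str
    text: str


@dataclass
class InvocationContext:
    """Hypothetical stand-in that only carries the shared session state."""
    state: dict = field(default_factory=dict)


class EchoAgent:
    """Toy sub-agent: records that it ran, then yields a single event."""

    def __init__(self, name: str):
        self.name = name

    async def run_async(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
        ctx.state[f"{self.name}_ran"] = True
        yield Event(author=self.name, text=f"hello from {self.name}")


class Orchestrator:
    """Toy custom agent whose _run_async_impl drives two sub-agents in order."""

    def __init__(self, name: str, first: EchoAgent, second: EchoAgent):
        self.name = name
        self.first = first
        self.second = second

    async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
        async for event in self.first.run_async(ctx):
            yield event  # forward the sub-agent's event upward unchanged
        async for event in self.second.run_async(ctx):
            yield event
        yield Event(author=self.name, text="done")  # the orchestrator's own event


async def collect(agent: Orchestrator, ctx: InvocationContext) -> List[Event]:
    """Plays the runner's role: iterate the generator and gather every event."""
    return [event async for event in agent._run_async_impl(ctx)]


if __name__ == "__main__":
    events = asyncio.run(collect(Orchestrator("Root", EchoAgent("A"), EchoAgent("B")), InvocationContext()))
    print([e.author for e in events])  # ['A', 'B', 'Root']
```

The runner sees the sub-agents' events first, in the order the orchestrator forwarded them, followed by the orchestrator's own event.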
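The heart of any custom agent is the runAsyncImpl method. This is where you define its unique behavior.
- Signature: async* runAsyncImpl(ctx: InvocationContext): AsyncGenerator<Event, void, undefined>
- Asynchronous generator: it must be an async generator function (async*).
- ctx (InvocationContext): provides access to crucial runtime information, most importantly ctx.session.state, which is the primary way to share data between the steps your custom agent orchestrates.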
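In Go, you implement the Run method as part of a struct that satisfies the agent.Agent interface. The actual logic is typically a method on your custom agent struct.
- Signature: Run(ctx agent.InvocationContext) iter.Seq2[*session.Event, error]
- Iterator: the Run method returns an iterator (iter.Seq2) that yields events and errors. This is the standard way to handle streaming results from agent execution.
- ctx (InvocationContext): agent.InvocationContext provides access to the session, including its state, and to other crucial runtime information.
- Session state: you can access session state through ctx.Session().State().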
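The heart of any custom agent is the runAsyncImpl method, which you override from BaseAgent.
- Signature: protected Flowable<Event> runAsyncImpl(InvocationContext ctx)
- Reactive stream (Flowable): it must return an io.reactivex.rxjava3.core.Flowable<Event>. This Flowable represents the stream of events your custom agent's logic produces, often by combining or transforming multiple Flowables from sub-agents.
- ctx (InvocationContext): provides access to crucial runtime information, most importantly ctx.session().state(), which is a java.util.concurrent.ConcurrentMap<String, Object>. This is the primary way to share data between the steps your custom agent orchestrates.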
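Key capabilities within the core asynchronous method
- Calling sub-agents: you invoke sub-agents (typically stored as instance attributes, such as self.my_llm_agent) through their run_async method and yield their events.
- Managing state: read from and write to the session state dictionary (ctx.session.state) to pass data between sub-agent calls or to make decisions.
- Implementing control flow: use standard Python constructs (if/elif/else, for/while loops, try/except) to create complex, conditional, or iterative workflows involving your sub-agents.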
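The three capabilities above can be combined in a single orchestration body. The sketch below is framework-free: it assumes toy sub-agents (built by the hypothetical make_agent helper) that take a plain state dict instead of an InvocationContext, and router_impl stands in for _run_async_impl. None of these names are ADK APIs.

```python
import asyncio
from typing import AsyncGenerator, Callable, List, Tuple

# Hypothetical stand-ins (not ADK types): an event is an (author, text) tuple
# and the session state is a plain dict shared by every step.
Event = Tuple[str, str]
SubAgent = Callable[[dict], AsyncGenerator[Event, None]]


def make_agent(name: str, output_key: str, value: str, fail: bool = False) -> SubAgent:
    """Build a toy sub-agent that writes value to state[output_key] and yields one event."""

    async def run_async(state: dict) -> AsyncGenerator[Event, None]:
        if fail:
            raise RuntimeError(f"{name} failed")
        state[output_key] = value
        yield (name, value)

    return run_async


async def router_impl(state: dict, primary: SubAgent, fallback: SubAgent,
                      alt: SubAgent) -> AsyncGenerator[Event, None]:
    """Toy orchestration body: branch on state, call sub-agents, recover from failure."""
    if state.get("mode") == "alt":  # if/else: branch on a value in shared state
        async for event in alt(state):
            yield event
        return
    try:
        async for event in primary(state):  # call a sub-agent and re-yield its events
            yield event
    except RuntimeError:  # try/except: fall back when the primary sub-agent fails
        async for event in fallback(state):
            yield event


async def collect(gen: AsyncGenerator[Event, None]) -> List[Event]:
    """Drain an async generator the way a runner would."""
    return [event async for event in gen]
```

Because the branch decision reads shared state, an earlier step can steer which sub-agent runs next without the two being wired together directly.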
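- Calling sub-agents: you invoke sub-agents (typically stored as instance attributes, such as this.myLlmAgent) through their run method and yield their events.
- Managing state: read from and write to the session state object (ctx.session.state) to pass data between sub-agent calls or to make decisions.
- Implementing control flow: use standard TypeScript/JavaScript constructs (if/else, for/while loops, try/catch) to create complex, conditional, or iterative workflows involving your sub-agents.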
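- Calling sub-agents: you invoke sub-agents by calling their Run method.
- Managing state: read from and write to session state to pass data between sub-agent calls or to make decisions.
// ctx (agent.InvocationContext) is passed directly to your agent's Run function.
// Read data set by a previous agent.
previousResult, err := ctx.Session().State().Get("some_key")
if err != nil {
	// Handle the case where the key may not exist yet.
}
// Make a decision based on state.
if val, ok := previousResult.(string); ok && val == "some_value" {
	// ... invoke one specific sub-agent ...
} else {
	// ... invoke another sub-agent ...
}
// Store a result for a later step.
if err := ctx.Session().State().Set("my_custom_result", "calculated_value"); err != nil {
	// Handle the error.
}
- Implementing control flow: use standard Go constructs (if/else and switch statements, for loops, goroutines, channels) to create complex, conditional, or iterative workflows involving your sub-agents.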
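- Calling sub-agents: you invoke sub-agents (typically stored as instance attributes or objects) through their asynchronous run method (runAsync) and return their event streams.
  You typically chain sub-agent Flowables with RxJava operators such as concatWith, flatMapPublisher, or concatArray. When a later stage's execution depends on the completion or state of an earlier stage, Flowable.defer() is commonly used.
// Example: run a single sub-agent
// return someSubAgent.runAsync(ctx);
// Example: run several sub-agents in sequence
Flowable<Event> firstAgentEvents = someSubAgent1.runAsync(ctx)
    .doOnNext(event -> System.out.println("Event from agent 1: " + event.id()));
Flowable<Event> secondAgentEvents = Flowable.defer(() ->
    someSubAgent2.runAsync(ctx)
        .doOnNext(event -> System.out.println("Event from agent 2: " + event.id()))
);
return firstAgentEvents.concatWith(secondAgentEvents);
- Managing state: read from and write to session state to pass data between sub-agent calls or to make decisions. Session state is the java.util.concurrent.ConcurrentMap<String, Object> returned by ctx.session().state().
// Read data set by a previous agent
Object previousResult = ctx.session().state().get("some_key");
// Make a decision based on state
if ("some_value".equals(previousResult)) {
    // ... logic that includes a specific sub-agent's Flowable ...
} else {
    // ... logic that includes another sub-agent's Flowable ...
}
// Store a result for a later step (usually done via a sub-agent's output_key)
// ctx.session().state().put("my_custom_result", "calculated_value");
- Implementing control flow: combine reactive operators (RxJava) with standard language constructs (if/else, loops, try/catch) to create complex workflows.
  - Conditional branching: use Flowable.defer() to choose which Flowable to subscribe to based on a condition, or filter() to filter events within a stream.
  - Iteration: use operators such as repeat() and retry(), or structure your Flowable chain to recursively invoke parts of itself under a condition (often managed with flatMapPublisher or concatMap).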
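Managing sub-agents and state
Typically, a custom agent orchestrates other agents (such as LlmAgent, LoopAgent, and so on).
- Initialization: you typically pass instances of these sub-agents into your custom agent's constructor and store them as instance fields/attributes (for example, this.story_generator = story_generator_instance or self.story_generator = story_generator_instance). This makes them accessible within the custom agent's core asynchronous execution logic (such as the _run_async_impl method).
- Sub-agents list: when constructing BaseAgent with super(), you should pass a sub_agents list. This list tells the ADK framework which agents are the immediate children this custom agent orchestrates. It matters for framework lifecycle management, introspection, and potential future routing capabilities, even if your core execution logic (_run_async_impl) calls the agents directly via self.xxx_agent. Include the top-level agents that your custom logic calls directly.
- State: as noted earlier, ctx.session.state is the standard way for sub-agents (especially LlmAgents that use output_key) to pass results back to the orchestrator, and for the orchestrator to pass necessary inputs down.
Agent-based workflow primitives
The following sections describe the core ADK primitives, such as the agent hierarchy, workflow agents, and interaction mechanisms. These are the building blocks that let you construct and manage the interactions in multi-agent systems effectively.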
Note
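The exact parameter or method names of these primitives may vary slightly by SDK language, for example sub_agents in Python and subAgents in Java. See the language-specific API documentation for details.
Agent hierarchy: parent agents and sub-agents
The foundation for structuring multi-agent systems is the parent-child relationship defined in BaseAgent.
- Establishing the hierarchy: you create a tree structure by passing a list of agent instances to the sub_agents parameter when initializing a parent agent. ADK automatically sets the parent_agent attribute on each child agent during initialization.
- Single-parent rule: an agent instance can be added as a sub-agent only once. Attempting to assign a second parent results in a ValueError.
- Importance: this hierarchy defines the scope for workflow agents and influences the potential targets for LLM-driven delegation. You can navigate the hierarchy with agent.parent_agent or find descendants with agent.find_agent(name).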
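To make the single-parent rule and the lookup concrete, here is a minimal sketch of the bookkeeping described above. Node is a hypothetical stand-in, not ADK's BaseAgent, and it assumes find_agent performs a depth-first search that starts at the agent itself.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass(eq=False)  # identity equality: two agents are never "equal" by value
class Node:
    """Hypothetical stand-in for BaseAgent's parent/child bookkeeping, not the ADK class."""

    name: str
    sub_agents: List["Node"] = field(default_factory=list)
    parent_agent: Optional["Node"] = field(default=None, repr=False)

    def __post_init__(self) -> None:
        for child in self.sub_agents:
            if child.parent_agent is not None:  # single-parent rule
                raise ValueError(
                    f"{child.name} already has parent {child.parent_agent.name}"
                )
            child.parent_agent = self  # set automatically during initialization

    def find_agent(self, name: str) -> Optional["Node"]:
        """Depth-first search over this node and all of its descendants."""
        if self.name == name:
            return self
        for child in self.sub_agents:
            found = child.find_agent(name)
            if found is not None:
                return found
        return None
```

Reusing an already-parented node in a second sub_agents list raises ValueError before the second parent is assigned.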
# Conceptual example: defining a hierarchy
from google.adk.agents import LlmAgent, BaseAgent
# Define individual agents
greeter = LlmAgent(name="Greeter", model="gemini-flash-latest")
task_doer = BaseAgent(name="TaskExecutor") # Custom non-LLM agent (in practice, a BaseAgent subclass)
# Create the parent agent and assign children via sub_agents
coordinator = LlmAgent(
    name="Coordinator",
    model="gemini-flash-latest",
    description="I coordinate greetings and tasks.",
    sub_agents=[ # Assign sub_agents here
        greeter,
        task_doer
    ]
)
# The framework automatically sets:
# assert greeter.parent_agent == coordinator
# assert task_doer.parent_agent == coordinator
// Conceptual example: defining a hierarchy
import { LlmAgent, BaseAgent, InvocationContext, createEventActions } from '@google/adk';
import type { Event } from '@google/adk';
class TaskExecutorAgent extends BaseAgent {
  async *runAsyncImpl(context: InvocationContext): AsyncGenerator<Event, void, void> {
    yield {
      id: 'event-1',
      invocationId: context.invocationId,
      author: this.name,
      content: { parts: [{ text: 'Task completed!' }] },
      actions: createEventActions(),
      timestamp: Date.now(),
    };
  }
  async *runLiveImpl(context: InvocationContext): AsyncGenerator<Event, void, void> {
    yield* this.runAsyncImpl(context); // Delegate to the async implementation
  }
}
// Define individual agents
const greeter = new LlmAgent({name: 'Greeter', model: 'gemini-flash-latest'});
const taskDoer = new TaskExecutorAgent({name: 'TaskExecutor'}); // Custom non-LLM agent
// Create parent agent and assign children via subAgents
const coordinator = new LlmAgent({
name: 'Coordinator',
model: 'gemini-flash-latest',
description: 'I coordinate greetings and tasks.',
subAgents: [ // Assign subAgents here
greeter,
taskDoer
],
});
// The framework automatically sets:
// console.assert(greeter.parentAgent === coordinator);
// console.assert(taskDoer.parentAgent === coordinator);
import (
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
)
// Conceptual Example: Defining Hierarchy
// Define individual agents
greeter, _ := llmagent.New(llmagent.Config{Name: "Greeter", Model: m})
taskDoer, _ := agent.New(agent.Config{Name: "TaskExecutor"}) // Custom non-LLM agent
// Create parent agent and assign children via sub_agents
coordinator, _ := llmagent.New(llmagent.Config{
Name: "Coordinator",
Model: m,
Description: "I coordinate greetings and tasks.",
SubAgents: []agent.Agent{greeter, taskDoer}, // Assign sub_agents here
})
// Conceptual example: defining a hierarchy
import com.google.adk.agents.SequentialAgent;
import com.google.adk.agents.LlmAgent;
// Define individual agents
LlmAgent greeter = LlmAgent.builder().name("Greeter").model("gemini-flash-latest").build();
SequentialAgent taskDoer = SequentialAgent.builder().name("TaskExecutor").subAgents(...).build(); // Sequential Agent
// Create parent agent and assign sub_agents
LlmAgent coordinator = LlmAgent.builder()
.name("Coordinator")
.model("gemini-flash-latest")
.description("I coordinate greetings and tasks")
.subAgents(greeter, taskDoer) // Assign sub_agents here
.build();
// The framework automatically sets:
// assert greeter.parentAgent().equals(coordinator);
// assert taskDoer.parentAgent().equals(coordinator);
class TaskExecutorAgent : BaseAgent(name = "TaskExecutor") {
override fun runAsyncImpl(context: InvocationContext): Flow<Event> {
return flowOf(
Event(
author = name,
content = Content(parts = listOf(Part(text = "Task completed!"))),
),
)
}
}
val greeter = LlmAgent(name = "Greeter", model = model)
val taskDoer = TaskExecutorAgent()
val coordinator =
LlmAgent(
name = "Coordinator",
model = model,
description = "I coordinate greetings and tasks.",
subAgents = listOf(greeter, taskDoer),
)
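Workflow agents as orchestrators
ADK includes specialized agents derived from BaseAgent that do not perform tasks themselves but instead orchestrate the execution flow of their sub_agents.
SequentialAgent: executes its sub_agents one after another, in the order they are listed.
- Context: passes the same InvocationContext along in sequence, allowing agents to pass results easily through shared state.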
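The following sketch models a sequential pipeline without ADK. ToyLlmAgent is a hypothetical stand-in whose respond callable replaces the model call; output_key copies the reply into shared state, and {key} placeholders in the instruction are filled from that state. This is only a rough model of how the examples below pass data from Step1 to Step2.

```python
import re
from typing import Callable, List, Optional


class ToyLlmAgent:
    """Hypothetical stand-in for LlmAgent; `respond` replaces the real model call."""

    def __init__(self, name: str, instruction: str = "",
                 output_key: Optional[str] = None,
                 respond: Callable[[str], str] = lambda prompt: prompt):
        self.name = name
        self.instruction = instruction
        self.output_key = output_key
        self.respond = respond

    def run(self, state: dict) -> str:
        # Fill {key} placeholders from shared state (a rough model of instruction templating).
        prompt = re.sub(r"\{(\w+)\}", lambda m: str(state[m.group(1)]), self.instruction)
        reply = self.respond(prompt)
        if self.output_key is not None:  # mimic output_key: store the final reply in state
            state[self.output_key] = reply
        return reply


def run_sequential(agents: List[ToyLlmAgent], state: dict) -> List[str]:
    """Run each agent in list order against the SAME state dict, like a shared context."""
    return [agent.run(state) for agent in agents]
```

Because every agent receives the same dict, a value written under an output key is visible to every agent that runs later in the list.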
# Conceptual example: sequential pipeline
from google.adk.agents import SequentialAgent, LlmAgent
step1 = LlmAgent(name="Step1_Fetch", output_key="data") # Saves output to state['data']
step2 = LlmAgent(name="Step2_Process", instruction="Process data from {data}.")
pipeline = SequentialAgent(name="MyPipeline", sub_agents=[step1, step2])
# When the pipeline runs, Step2 can access the state['data'] set by Step1.
// Conceptual example: sequential pipeline
import { SequentialAgent, LlmAgent } from '@google/adk';
const step1 = new LlmAgent({name: 'Step1_Fetch', outputKey: 'data'}); // Saves output to state['data']
const step2 = new LlmAgent({name: 'Step2_Process', instruction: 'Process data from {data}.'});
const pipeline = new SequentialAgent({name: 'MyPipeline', subAgents: [step1, step2]});
// When the pipeline runs, Step2 can access the state['data'] set by Step1.
import (
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/agent/workflowagents/sequentialagent"
)
// Conceptual Example: Sequential Pipeline
step1, _ := llmagent.New(llmagent.Config{Name: "Step1_Fetch", OutputKey: "data", Model: m}) // Saves output to state["data"]
step2, _ := llmagent.New(llmagent.Config{Name: "Step2_Process", Instruction: "Process data from {data}.", Model: m})
pipeline, _ := sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{Name: "MyPipeline", SubAgents: []agent.Agent{step1, step2}},
})
// When pipeline runs, Step2 can access the state["data"] set by Step1.
// Conceptual example: sequential pipeline
import com.google.adk.agents.SequentialAgent;
import com.google.adk.agents.LlmAgent;
LlmAgent step1 = LlmAgent.builder().name("Step1_Fetch").outputKey("data").build(); // Saves output to state under the key "data"
LlmAgent step2 = LlmAgent.builder().name("Step2_Process").instruction("Process data from {data}.").build();
SequentialAgent pipeline = SequentialAgent.builder().name("MyPipeline").subAgents(step1, step2).build();
// When the pipeline runs, Step2 can access state.get("data") set by Step1.
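ParallelAgent: executes its sub_agents in parallel. Events from the sub-agents may be interleaved.
- Context: modifies InvocationContext.branch for each child agent (for example, ParentBranch.ChildName), giving each a distinct contextual path, which can be useful for isolating history in some memory implementations.
- State: despite the different branches, all parallel children access the same shared session.state, so they can read the initial state and write their results (use distinct keys to avoid race conditions).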
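The concurrency and branch naming can be sketched with asyncio.gather. toy_fetcher and run_parallel are hypothetical stand-ins, not ADK APIs; the sleep durations exist only to force a visible interleaving, and each child writes to its own state key.

```python
import asyncio
from typing import List, Tuple


async def toy_fetcher(branch: str, output_key: str, value: str, delay: float,
                      state: dict, log: list) -> None:
    """Toy sub-agent: logs a start event, 'works' for `delay` seconds, then writes its key."""
    log.append((branch, "start"))
    await asyncio.sleep(delay)  # yields control, so sibling agents interleave here
    state[output_key] = value   # each child uses its own key to avoid clobbering
    log.append((branch, "end"))


async def run_parallel(parent: str, children: List[Tuple[str, str, str, float]],
                       state: dict) -> list:
    """Run all children concurrently on one shared state; each branch is 'Parent.Child'."""
    log: list = []
    await asyncio.gather(*(
        toy_fetcher(f"{parent}.{name}", key, value, delay, state, log)
        for name, key, value, delay in children
    ))
    return log
```

Both children start before either finishes, and the faster one finishes first, so the event log interleaves the two branches while the shared state still ends up with both keys.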
# Conceptual example: parallel execution
from google.adk.agents import ParallelAgent, LlmAgent
fetch_weather = LlmAgent(name="WeatherFetcher", output_key="weather")
fetch_news = LlmAgent(name="NewsFetcher", output_key="news")
gatherer = ParallelAgent(name="InfoGatherer", sub_agents=[fetch_weather, fetch_news])
# When gatherer runs, WeatherFetcher and NewsFetcher run concurrently.
# A subsequent agent could read state['weather'] and state['news'].
// Conceptual example: parallel execution
import { ParallelAgent, LlmAgent } from '@google/adk';
const fetchWeather = new LlmAgent({name: 'WeatherFetcher', outputKey: 'weather'});
const fetchNews = new LlmAgent({name: 'NewsFetcher', outputKey: 'news'});
const gatherer = new ParallelAgent({name: 'InfoGatherer', subAgents: [fetchWeather, fetchNews]});
// When gatherer runs, WeatherFetcher and NewsFetcher run concurrently.
// A subsequent agent could read state['weather'] and state['news'].
import (
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/agent/workflowagents/parallelagent"
)
// Conceptual Example: Parallel Execution
fetchWeather, _ := llmagent.New(llmagent.Config{Name: "WeatherFetcher", OutputKey: "weather", Model: m})
fetchNews, _ := llmagent.New(llmagent.Config{Name: "NewsFetcher", OutputKey: "news", Model: m})
gatherer, _ := parallelagent.New(parallelagent.Config{
AgentConfig: agent.Config{Name: "InfoGatherer", SubAgents: []agent.Agent{fetchWeather, fetchNews}},
})
// When gatherer runs, WeatherFetcher and NewsFetcher run concurrently.
// A subsequent agent could read state["weather"] and state["news"].
// Conceptual example: parallel execution
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.ParallelAgent;
LlmAgent fetchWeather = LlmAgent.builder()
    .name("WeatherFetcher")
    .outputKey("weather")
    .build();
LlmAgent fetchNews = LlmAgent.builder()
    .name("NewsFetcher")
    .outputKey("news")
    .build();
ParallelAgent gatherer = ParallelAgent.builder()
    .name("InfoGatherer")
    .subAgents(fetchWeather, fetchNews)
    .build();
// When gatherer runs, WeatherFetcher and NewsFetcher run concurrently.
// A subsequent agent could read state.get("weather") and state.get("news").
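LoopAgent: executes its sub_agents sequentially in a loop.
- Termination: the loop stops when the optional max_iterations is reached, or when any sub-agent returns an Event with escalate=True in its event actions.
- Context and state: passes the same InvocationContext in every iteration, so state changes (for example counters or flags) persist across loops.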
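A synchronous sketch of the termination rule follows. It assumes each toy step returns True to signal escalate=True; run_loop, process_step, and check_condition are hypothetical stand-ins for LoopAgent and its sub-agents, not ADK code.

```python
from typing import Callable, List, Optional

# A toy step reads/writes shared state and returns True to "escalate".
Step = Callable[[dict], bool]


def run_loop(steps: List[Step], state: dict, max_iterations: Optional[int] = None) -> int:
    """Sketch of LoopAgent-style termination; returns the number of passes started."""
    iteration = 0
    while max_iterations is None or iteration < max_iterations:
        iteration += 1
        for step in steps:   # sub-agents run sequentially within each pass
            if step(state):  # escalation ends the loop immediately
                return iteration
    return iteration         # max_iterations reached


def process_step(state: dict) -> bool:
    """Toy ProcessingStep: counts passes and marks the job completed on the third."""
    state["count"] = state.get("count", 0) + 1
    if state["count"] >= 3:
        state["status"] = "completed"
    return False


def check_condition(state: dict) -> bool:
    """Toy Checker: escalate once status is 'completed' (default 'pending')."""
    return state.get("status", "pending") == "completed"
```

With max_iterations=10 the loop ends on the third pass because the checker escalates; with max_iterations=2 it stops at the cap before the status ever becomes "completed". The counter survives between passes because every pass sees the same state dict.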
# Conceptual example: loop with a condition
from google.adk.agents import LoopAgent, LlmAgent, BaseAgent
from google.adk.events import Event, EventActions
from google.adk.agents.invocation_context import InvocationContext
from typing import AsyncGenerator
class CheckCondition(BaseAgent): # Custom agent that checks state
    async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
        status = ctx.session.state.get("status", "pending")
        is_done = (status == "completed")
        yield Event(author=self.name, actions=EventActions(escalate=is_done)) # Escalate if done
process_step = LlmAgent(name="ProcessingStep") # Agent that might update state['status']
poller = LoopAgent(
    name="StatusPoller",
    max_iterations=10,
    sub_agents=[process_step, CheckCondition(name="Checker")]
)
# When poller runs, it executes process_step then Checker repeatedly
# until Checker escalates (state['status'] == 'completed') or 10 iterations pass.
// Conceptual example: loop with a condition
import { LoopAgent, LlmAgent, BaseAgent, InvocationContext, createEvent, createEventActions } from '@google/adk';
import type { Event } from '@google/adk';
class CheckConditionAgent extends BaseAgent { // Custom agent that checks state
  async *runAsyncImpl(ctx: InvocationContext): AsyncGenerator<Event> {
    const status = ctx.session.state['status'] || 'pending';
    const isDone = status === 'completed';
    yield createEvent({ author: this.name, actions: createEventActions({ escalate: isDone }) });
  }
  async *runLiveImpl(ctx: InvocationContext): AsyncGenerator<Event> {
    // This method is not implemented.
  }
}
const processStep = new LlmAgent({name: 'ProcessingStep'}); // Agent that might update state['status']
const poller = new LoopAgent({
  name: 'StatusPoller',
  maxIterations: 10,
  // Executes its sub-agents sequentially in a loop
  subAgents: [processStep, new CheckConditionAgent({name: 'Checker'})]
});
// When poller runs, it executes processStep then Checker repeatedly
// until Checker escalates (state['status'] === 'completed') or 10 iterations pass.
import (
"iter"
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/agent/workflowagents/loopagent"
"google.golang.org/adk/session"
)
// Conceptual Example: Loop with Condition
// Custom agent to check state
checkCondition, _ := agent.New(agent.Config{
Name: "Checker",
Run: func(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
return func(yield func(*session.Event, error) bool) {
status, err := ctx.Session().State().Get("status")
// If "status" is not in the state, default to "pending".
// This is idiomatic Go for handling a potential error on lookup.
if err != nil {
status = "pending"
}
isDone := status == "completed"
yield(&session.Event{Author: "Checker", Actions: session.EventActions{Escalate: isDone}}, nil)
}
},
})
processStep, _ := llmagent.New(llmagent.Config{Name: "ProcessingStep", Model: m}) // Agent that might update state["status"]
poller, _ := loopagent.New(loopagent.Config{
MaxIterations: 10,
AgentConfig: agent.Config{Name: "StatusPoller", SubAgents: []agent.Agent{processStep, checkCondition}},
})
// When poller runs, it executes processStep then Checker repeatedly
// until Checker escalates (state["status"] == "completed") or 10 iterations pass.
// Conceptual example: loop with a condition
// Custom agent that checks state and may escalate
public static class CheckConditionAgent extends BaseAgent {
  public CheckConditionAgent(String name, String description) {
    super(name, description, List.of(), null, null);
  }
  @Override
  protected Flowable<Event> runAsyncImpl(InvocationContext ctx) {
    String status = (String) ctx.session().state().getOrDefault("status", "pending");
    boolean isDone = "completed".equalsIgnoreCase(status);
    // If the condition is met, signal escalation (exit the loop).
    // If not done, the escalate flag is false or absent, and the loop continues.
    Event checkEvent = Event.builder()
        .author(name())
        .id(Event.generateEventId()) // Giving each event a unique ID is important
        .actions(EventActions.builder().escalate(isDone).build()) // Escalate if done
        .build();
    return Flowable.just(checkEvent);
  }
  @Override
  protected Flowable<Event> runLiveImpl(InvocationContext ctx) {
    return Flowable.empty(); // Live mode is not supported by this agent
  }
}
// Agent that might update the "status" key in state
LlmAgent processingStepAgent = LlmAgent.builder().name("ProcessingStep").build();
// Instance of the custom condition-checking agent
CheckConditionAgent conditionCheckerAgent = new CheckConditionAgent(
    "ConditionChecker",
    "Checks whether the status is 'completed'."
);
LoopAgent poller = LoopAgent.builder().name("StatusPoller").maxIterations(10).subAgents(processingStepAgent, conditionCheckerAgent).build();
// When poller runs, it executes processingStepAgent then conditionCheckerAgent repeatedly
// until conditionCheckerAgent escalates (state.get("status") == "completed") or 10 iterations pass.
class CheckConditionAgent(name: String) : BaseAgent(name = name) {
override fun runAsyncImpl(context: InvocationContext): Flow<Event> {
val status = context.session.state["status"] as? String ?: "pending"
val isDone = status == "completed"
return flowOf(
Event(
author = name,
actions = EventActions(escalate = isDone),
),
)
}
}
val processStep = LlmAgent(name = "ProcessingStep", model = model)
val checker = CheckConditionAgent(name = "Checker")
val poller =
LoopAgent(
name = "StatusPoller",
maxIterations = 10,
subAgents = listOf(processStep, checker),
)
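Interaction and communication mechanisms
Agents within a system often need to exchange data or trigger actions in one another. ADK facilitates this through the following mechanisms.
Shared session state
Shared session state is the most fundamental way for agents operating within the same invocation (and therefore sharing the same Session object through the InvocationContext) to communicate passively.
- Mechanism: one agent (or its tool or callback) writes a value (context.state['data_key'] = processed_data), and a subsequent agent reads it (data = context.state.get('data_key')). State changes are tracked via CallbackContext.
- Convenience: the output_key property on LlmAgent automatically saves the agent's final response text (or structured output) to the specified state key.
- Nature: asynchronous, passive communication. Ideal for pipelines orchestrated by SequentialAgent or for passing data across LoopAgent iterations.
- See also: State management
Invocation context and temp: state
When a parent agent invokes a sub-agent, it passes the same InvocationContext. This means they share the same temporary (temp:) state, which is ideal for passing data that is relevant only to the current turn.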
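The following sketch illustrates why passing one context object also shares temp: state. ToyInvocationContext, parent_agent, child_agent, and persist are hypothetical; persist assumes that temp: keys are discarded rather than saved at the end of the invocation, so check the state management documentation for the exact persistence rules.

```python
from dataclasses import dataclass, field

TEMP_PREFIX = "temp:"


@dataclass
class ToyInvocationContext:
    """Hypothetical stand-in: one object shared by parent and child during a turn."""

    state: dict = field(default_factory=dict)


def child_agent(ctx: ToyInvocationContext) -> str:
    # Reads turn-scoped data that the parent left in temp: state.
    return f"handling {ctx.state['temp:user_query']}"


def parent_agent(ctx: ToyInvocationContext, query: str) -> str:
    ctx.state["temp:user_query"] = query  # relevant to this turn only
    ctx.state["last_query"] = query       # ordinary key, intended to outlive the turn
    return child_agent(ctx)               # the SAME context object is passed down


def persist(state: dict) -> dict:
    """Assumed end-of-invocation step: drop temp: keys, keep everything else."""
    return {k: v for k, v in state.items() if not k.startswith(TEMP_PREFIX)}
```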
# Conceptual example: using output_key and reading state
from google.adk.agents import LlmAgent, SequentialAgent
agent_A = LlmAgent(name="AgentA", instruction="Find the capital of France.", output_key="capital_city")
agent_B = LlmAgent(name="AgentB", instruction="Tell me about the city stored in {capital_city}.")
pipeline = SequentialAgent(name="CityInfo", sub_agents=[agent_A, agent_B])
# AgentA runs and saves "Paris" to state['capital_city'].
# AgentB runs; its instruction processor reads state['capital_city'] to get "Paris".
// Conceptual example: using outputKey and reading state
import { LlmAgent, SequentialAgent } from '@google/adk';
const agentA = new LlmAgent({name: 'AgentA', instruction: 'Find the capital of France.', outputKey: 'capital_city'});
const agentB = new LlmAgent({name: 'AgentB', instruction: 'Tell me about the city stored in {capital_city}.'});
const pipeline = new SequentialAgent({name: 'CityInfo', subAgents: [agentA, agentB]});
// AgentA runs and saves "Paris" to state['capital_city'].
// AgentB runs; its instruction processor reads state['capital_city'] to get "Paris".
import (
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/agent/workflowagents/sequentialagent"
)
// Conceptual Example: Using output_key and reading state
agentA, _ := llmagent.New(llmagent.Config{Name: "AgentA", Instruction: "Find the capital of France.", OutputKey: "capital_city", Model: m})
agentB, _ := llmagent.New(llmagent.Config{Name: "AgentB", Instruction: "Tell me about the city stored in {capital_city}.", Model: m})
pipeline2, _ := sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{Name: "CityInfo", SubAgents: []agent.Agent{agentA, agentB}},
})
// AgentA runs, saves "Paris" to state["capital_city"].
// AgentB runs, its instruction processor reads state["capital_city"] to get "Paris".
// Conceptual example: using outputKey and reading state
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.SequentialAgent;
LlmAgent agentA = LlmAgent.builder()
    .name("AgentA")
    .instruction("Find the capital of France.")
    .outputKey("capital_city")
    .build();
LlmAgent agentB = LlmAgent.builder()
    .name("AgentB")
    .instruction("Tell me about the city stored in {capital_city}.")
    .build();
SequentialAgent pipeline = SequentialAgent.builder().name("CityInfo").subAgents(agentA, agentB).build();
// AgentA runs and saves "Paris" to state under the key "capital_city".
// AgentB runs; its instruction processor reads state.get("capital_city") to get "Paris".
val agentA =
LlmAgent(
name = "AgentA",
model = model,
instruction = Instruction("Find the capital of France."),
)
val agentB =
LlmAgent(
name = "AgentB",
model = model,
instruction = Instruction("Tell me about the city stored in state."),
)
val cityPipeline = SequentialAgent(name = "CityInfo", subAgents = listOf(agentA, agentB))
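LLM delegation and agent transfer
LLM-driven delegation leverages an LlmAgent's understanding to dynamically route tasks to other suitable agents within the hierarchy.
- Mechanism: the agent's LLM generates a specific function call: transfer_to_agent(agent_name='target_agent_name').
- Handling: the AutoFlow, which is used by default when sub-agents are present or transfer is not disallowed, intercepts this call. It identifies the target agent using root_agent.find_agent() and updates the InvocationContext to switch execution focus.
- Requirements: the calling LlmAgent needs clear instructions on when to transfer, and potential target agents need distinct descriptions so the LLM can make informed decisions. The transfer scope (parent, sub-agents, siblings) can be configured on the LlmAgent.
- Nature: dynamic, flexible routing based on the LLM's interpretation.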
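A minimal sketch of the interception step, assuming the function call arrives as a plain dict. ToyAgent and handle_function_call are hypothetical stand-ins for the framework's handling, not ADK code; in ADK the framework, not your code, performs this lookup.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass(eq=False)
class ToyAgent:
    """Hypothetical stand-in: just a name, a description, and children."""

    name: str
    description: str = ""
    sub_agents: List["ToyAgent"] = field(default_factory=list)

    def find_agent(self, name: str) -> Optional["ToyAgent"]:
        if self.name == name:
            return self
        for child in self.sub_agents:
            hit = child.find_agent(name)
            if hit is not None:
                return hit
        return None


def handle_function_call(root: ToyAgent, call: dict) -> Optional[ToyAgent]:
    """Resolve a transfer_to_agent call via the root agent.

    Returns the agent that should take over, or None if the call is not a transfer.
    """
    if call.get("name") != "transfer_to_agent":
        return None  # an ordinary tool call, handled elsewhere
    agent_name = call["args"]["agent_name"]
    target = root.find_agent(agent_name)
    if target is None:
        raise ValueError(f"Agent {agent_name!r} not found in the hierarchy")
    return target  # the real framework would now switch execution focus to target
```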
# Conceptual setup: LLM transfer
from google.adk.agents import LlmAgent
booking_agent = LlmAgent(name="Booker", description="Handles flight and hotel bookings.")
info_agent = LlmAgent(name="Info", description="Provides general information and answers questions.")
coordinator = LlmAgent(
    name="Coordinator",
    model="gemini-flash-latest",
    instruction="You are an assistant. Delegate booking tasks to Booker and info requests to Info.",
    description="Main coordinator.",
    # AutoFlow is typically used implicitly here
    sub_agents=[booking_agent, info_agent]
)
# If coordinator receives "Book a flight", its LLM should generate:
# FunctionCall(name='transfer_to_agent', args={'agent_name': 'Booker'})
# The ADK framework then routes execution to booking_agent.
// Conceptual setup: LLM transfer
import { LlmAgent } from '@google/adk';
const bookingAgent = new LlmAgent({name: 'Booker', description: 'Handles flight and hotel bookings.'});
const infoAgent = new LlmAgent({name: 'Info', description: 'Provides general information and answers questions.'});
const coordinator = new LlmAgent({
  name: 'Coordinator',
  model: 'gemini-flash-latest',
  instruction: 'You are an assistant. Delegate booking tasks to Booker and info requests to Info.',
  description: 'Main coordinator.',
  // AutoFlow is typically used implicitly here
  subAgents: [bookingAgent, infoAgent]
});
// If coordinator receives "Book a flight", its LLM should generate:
// {functionCall: {name: 'transfer_to_agent', args: {agent_name: 'Booker'}}}
// The ADK framework then routes execution to bookingAgent.
import (
	"google.golang.org/adk/agent"
	"google.golang.org/adk/agent/llmagent"
)
// Conceptual Setup: LLM Transfer
bookingAgent, _ := llmagent.New(llmagent.Config{Name: "Booker", Description: "Handles flight and hotel bookings.", Model: m})
infoAgent, _ := llmagent.New(llmagent.Config{Name: "Info", Description: "Provides general information and answers questions.", Model: m})
coordinator, _ = llmagent.New(llmagent.Config{
Name: "Coordinator",
Model: m,
Instruction: "You are an assistant. Delegate booking tasks to Booker and info requests to Info.",
Description: "Main coordinator.",
SubAgents: []agent.Agent{bookingAgent, infoAgent},
})
// If coordinator receives "Book a flight", its LLM should generate:
// FunctionCall{Name: "transfer_to_agent", Args: map[string]any{"agent_name": "Booker"}}
// ADK framework then routes execution to bookingAgent.
// Conceptual setup: LLM transfer
import com.google.adk.agents.LlmAgent;
LlmAgent bookingAgent = LlmAgent.builder()
    .name("Booker")
    .description("Handles flight and hotel bookings.")
    .build();
LlmAgent infoAgent = LlmAgent.builder()
    .name("Info")
    .description("Provides general information and answers questions.")
    .build();
// Define the coordinator agent
LlmAgent coordinator = LlmAgent.builder()
    .name("Coordinator")
    .model("gemini-flash-latest") // Or your preferred model
    .instruction("You are an assistant. Delegate booking tasks to Booker and info requests to Info.")
    .description("Main coordinator.")
    // AutoFlow is used (implicitly) by default because sub-agents are present
    // and transfer is not disallowed.
    .subAgents(bookingAgent, infoAgent)
    .build();
// If coordinator receives "Book a flight", its LLM should generate:
// FunctionCall.builder().name("transferToAgent").args(ImmutableMap.of("agent_name", "Booker")).build()
// The ADK framework then routes execution to bookingAgent.
val bookingAgent =
LlmAgent(
name = "Booker",
model = model,
description = "Handles flight and hotel bookings.",
)
val infoAgent =
LlmAgent(
name = "Info",
model = model,
description = "Provides general information and answers questions.",
)
val transferCoordinator =
LlmAgent(
name = "Coordinator",
model = model,
instruction =
Instruction(
"You are an assistant. Delegate booking tasks to Booker and info requests to Info.",
),
description = "Main coordinator.",
subAgents = listOf(bookingAgent, infoAgent),
)
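Explicit invocation with AgentTool
AgentTool lets an LlmAgent treat another BaseAgent instance as a callable function or tool.
- Mechanism: wrap the target agent instance in an AgentTool and include it in the parent LlmAgent's tools list. AgentTool generates a corresponding function declaration for the LLM.
- Handling: when the parent LLM generates a function call targeting the AgentTool, the framework executes AgentTool.run_async. This method runs the target agent, captures its final response, forwards any state/artifact changes back to the parent context, and returns the response as the tool's result.
- Nature: synchronous (within the parent's flow), explicit, controlled invocation, like any other tool.
- (Note: AgentTool must be imported and used explicitly.)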
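The handling steps can be sketched as a small wrapper class. ToyAgentTool, ToyContext, and image_gen are hypothetical, the request argument name is an assumption borrowed from the Java example's instruction, and the state merge is a simplification (deleted keys are not propagated back to the parent).

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class ToyContext:
    """Hypothetical stand-in for a tool context: session state plus artifacts."""

    state: dict = field(default_factory=dict)
    artifacts: dict = field(default_factory=dict)


# A wrapped "agent" takes the request text and its own context, returns response texts.
WrappedAgent = Callable[[str, ToyContext], List[str]]


class ToyAgentTool:
    """Sketch of the AgentTool behaviour described above; not the ADK implementation."""

    def __init__(self, name: str, agent: WrappedAgent):
        self.name = name  # the tool is exposed to the LLM under the agent's name
        self.agent = agent

    def run(self, args: dict, parent: ToyContext) -> dict:
        child = ToyContext(state=dict(parent.state))  # child starts from a copy of the state
        responses = self.agent(args["request"], child)
        parent.state.update(child.state)          # forward state changes to the parent
        parent.artifacts.update(child.artifacts)  # forward artifact changes too
        final = responses[-1] if responses else ""
        return {"result": final}                  # only the final response is returned


def image_gen(request: str, ctx: ToyContext) -> List[str]:
    """Toy target agent standing in for ImageGen."""
    ctx.state["last_prompt"] = request
    ctx.artifacts["image.png"] = b"\x89PNG placeholder"
    return ["drafting...", f"image for: {request}"]
```

Intermediate responses ("drafting...") never reach the parent LLM; it sees only the final response as the tool result, while state and artifact changes flow back through the parent context.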
# Conceptual setup: agent as a tool
from google.adk.agents import LlmAgent, BaseAgent
from google.adk.events import Event
from google.adk.tools import agent_tool
from google.genai import types
# Define a target agent (could be an LlmAgent or a custom BaseAgent)
class ImageGeneratorAgent(BaseAgent): # Example custom agent
    name: str = "ImageGen"
    description: str = "Generates an image based on a prompt."
    # ... internal logic ...
    async def _run_async_impl(self, ctx): # Simplified run logic
        prompt = ctx.session.state.get("image_prompt", "default prompt")
        # ... generate image bytes from the prompt ...
        image_bytes = b"..."
        yield Event(author=self.name, content=types.Content(parts=[types.Part.from_bytes(data=image_bytes, mime_type="image/png")]))
image_agent = ImageGeneratorAgent()
image_tool = agent_tool.AgentTool(agent=image_agent) # Wrap the agent
# The parent agent uses the AgentTool
artist_agent = LlmAgent(
    name="Artist",
    model="gemini-flash-latest",
    instruction="Create a prompt and use the ImageGen tool to generate the image.",
    tools=[image_tool] # Include the AgentTool
)
# The Artist LLM generates a prompt, then calls:
# FunctionCall(name='ImageGen', args={'image_prompt': 'a cat wearing a hat'})
# The framework calls image_tool.run_async(...), which runs ImageGeneratorAgent.
# The resulting image Part is returned to the Artist agent as the tool result.
// Conceptual setup: agent as a tool
import { LlmAgent, BaseAgent, AgentTool, InvocationContext, createEvent } from '@google/adk';
import type { Event } from '@google/adk';
import type { Part } from '@google/genai';
// Define a target agent (could be an LlmAgent or a custom BaseAgent)
class ImageGeneratorAgent extends BaseAgent { // Example custom agent
  constructor() {
    super({name: 'ImageGen', description: 'Generates an image based on a prompt.'});
  }
  // ... internal logic ...
  async *runAsyncImpl(ctx: InvocationContext): AsyncGenerator<Event> { // Simplified run logic
    const prompt = ctx.session.state['image_prompt'] || 'default prompt';
    // ... generate image bytes from the prompt ...
    const imageBytes = new Uint8Array(); // Placeholder
    const imagePart: Part = {inlineData: {data: Buffer.from(imageBytes).toString('base64'), mimeType: 'image/png'}};
    yield createEvent({author: this.name, content: {parts: [imagePart]}});
  }
  async *runLiveImpl(ctx: InvocationContext): AsyncGenerator<Event, void, void> {
    // This agent does not implement live mode.
  }
}
const imageAgent = new ImageGeneratorAgent();
const imageTool = new AgentTool({agent: imageAgent}); // Wrap the agent
// The parent agent uses the AgentTool
const artistAgent = new LlmAgent({
  name: 'Artist',
  model: 'gemini-flash-latest',
  instruction: 'Create a prompt and use the ImageGen tool to generate the image.',
  tools: [imageTool] // Include the AgentTool
});
// The Artist LLM generates a prompt, then calls:
// {functionCall: {name: 'ImageGen', args: {image_prompt: 'a cat wearing a hat'}}}
// The framework calls imageTool.runAsync(...), which runs ImageGeneratorAgent.
// The resulting image Part is returned to the Artist agent as the tool result.
import (
"fmt"
"iter"
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/model"
"google.golang.org/adk/session"
"google.golang.org/adk/tool"
"google.golang.org/adk/tool/agenttool"
"google.golang.org/genai"
)
// Conceptual Setup: Agent as a Tool
// Define a target agent (could be LlmAgent or custom BaseAgent)
imageAgent, _ := agent.New(agent.Config{
Name: "ImageGen",
Description: "Generates an image based on a prompt.",
Run: func(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
return func(yield func(*session.Event, error) bool) {
prompt, _ := ctx.Session().State().Get("image_prompt")
fmt.Printf("Generating image for prompt: %v\n", prompt)
imageBytes := []byte("...") // Simulate image bytes
yield(&session.Event{
Author: "ImageGen",
LLMResponse: model.LLMResponse{
Content: &genai.Content{
Parts: []*genai.Part{genai.NewPartFromBytes(imageBytes, "image/png")},
},
},
}, nil)
}
},
})
// Wrap the agent
imageTool := agenttool.New(imageAgent, nil)
// Now imageTool can be used as a tool by other agents.
// Parent agent uses the AgentTool
artistAgent, _ := llmagent.New(llmagent.Config{
Name: "Artist",
Model: m,
Instruction: "Create a prompt and use the ImageGen tool to generate the image.",
Tools: []tool.Tool{imageTool}, // Include the AgentTool
})
// Artist LLM generates a prompt, then calls:
// FunctionCall{Name: "ImageGen", Args: map[string]any{"image_prompt": "a cat wearing a hat"}}
// Framework calls imageTool.Run(...), which runs ImageGeneratorAgent.
// The resulting image Part is returned to the Artist agent as the tool result.
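// Conceptual setup: agent as a tool
import com.google.adk.agents.BaseAgent;
import com.google.adk.agents.InvocationContext;
import com.google.adk.agents.LlmAgent;
import com.google.adk.events.Event;
import com.google.adk.tools.AgentTool;
import com.google.genai.types.Content;
import com.google.genai.types.Part;
import io.reactivex.rxjava3.core.Flowable;
import java.util.List;
// Example custom agent (the target could be an LlmAgent or a custom BaseAgent)
public class ImageGeneratorAgent extends BaseAgent {
  public ImageGeneratorAgent(String name, String description) {
    super(name, description, List.of(), null, null);
  }
  // ... internal logic ...
  @Override
  protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) { // Simplified run logic
    Object prompt = invocationContext.session().state().get("image_prompt");
    // Generate image bytes from the prompt
    // ...
    Event responseEvent = Event.builder()
        .author(this.name())
        .content(Content.fromParts(Part.fromText("...")))
        .build();
    return Flowable.just(responseEvent);
  }
  @Override
  protected Flowable<Event> runLiveImpl(InvocationContext invocationContext) {
    return Flowable.empty(); // Live mode is not supported by this agent
  }
}
// Wrap the agent with AgentTool; the tool takes the agent's name, "ImageGen"
ImageGeneratorAgent imageAgent = new ImageGeneratorAgent("ImageGen", "Generates an image.");
AgentTool imageTool = AgentTool.create(imageAgent);
// The parent agent uses the AgentTool
LlmAgent artistAgent = LlmAgent.builder()
    .name("Artist")
    .model("gemini-flash-latest")
    .instruction(
        "You are an artist. Create a detailed prompt for an image, then " +
        "use the 'ImageGen' tool to generate the image. " +
        "The 'ImageGen' tool expects a single string argument named 'request' " +
        "containing the image prompt. The tool will return a JSON string in its 'result' field, " +
        "containing 'image_base64', 'mime_type', and 'status'."
    )
    .description("An agent that can create images using a generation tool.")
    .tools(imageTool) // Include the AgentTool
    .build();
// The Artist LLM generates a prompt, then calls:
// FunctionCall(name='ImageGen', args={'request': 'a cat wearing a hat'})
// The framework calls imageTool.runAsync(...), which runs ImageGeneratorAgent.
// The resulting image Part is returned to the Artist agent as the tool result.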
val imageAgent =
LlmAgent(
name = "ImageGen",
model = model,
description = "Generates an image based on a prompt.",
)
val imageTool = AgentTool(agent = imageAgent)
val artistAgent =
LlmAgent(
name = "Artist",
model = model,
instruction =
Instruction(
"Create a prompt and use the ImageGen tool to generate the image.",
),
tools = listOf(imageTool),
)
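These primitives give you the flexibility to design multi-agent interactions ranging from tightly coupled sequential workflows to dynamic, LLM-driven delegation networks.
Design pattern example: StoryFlow agent
Let's illustrate the power of custom agents with an example pattern: a multi-stage content-generation workflow with conditional logic.
Goal: create a system that generates a story, iteratively refines it through critique and revision, performs final checks, and, crucially, regenerates the story if the final tone check fails.
Why custom? The core requirement that calls for a custom agent is conditional regeneration based on the tone check. Standard workflow agents have no built-in conditional branching based on the result of a sub-agent's task, so we need custom logic in the orchestrator (such as if tone == "negative": ...).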
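Before looking at the ADK implementation, the branch itself can be sketched with plain functions. story_flow, make_generator, critique_loop, and post_process are hypothetical stand-ins for the sub-agents; the state keys current_story and tone_check_result are assumed names, and this sketch regenerates at most once.

```python
from typing import Callable, Iterable, List

# Each toy "agent" is a plain function that reads and writes the shared state dict.
Step = Callable[[dict], None]


def story_flow(state: dict, generate: Step, critique_loop: Step,
               post_process: Step) -> List[str]:
    """Sketch of StoryFlowAgent's control flow; returns the steps it ran, in order."""
    trace: List[str] = []
    generate(state)
    trace.append("generate")
    critique_loop(state)  # stands in for the critic/reviser LoopAgent
    trace.append("critique_loop")
    post_process(state)   # stands in for the grammar + tone SequentialAgent
    trace.append("post_process")
    if state.get("tone_check_result") == "negative":  # the branch workflow agents lack
        generate(state)
        trace.append("regenerate")
    return trace


def make_generator(stories: Iterable[str]) -> Step:
    """Toy story generator that writes the next canned story on each call."""
    it = iter(stories)

    def generate(state: dict) -> None:
        state["current_story"] = next(it)

    return generate


def critique_loop(state: dict) -> None:
    state["current_story"] += " (revised)"


def post_process(state: dict) -> None:
    story = state["current_story"]
    state["tone_check_result"] = "negative" if "gloomy" in story else "positive"
```

The if statement after post_process is exactly the part a fixed SequentialAgent or LoopAgent cannot express on its own.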
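Part 1: Simplified custom agent initialization
We define StoryFlowAgent, which inherits from BaseAgent. In the __init__ method, we store the necessary sub-agents (passed in as arguments) as instance attributes and tell the BaseAgent framework about the top-level sub-agents this custom agent will orchestrate directly.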
class StoryFlowAgent(BaseAgent):
    """
    Custom agent for a story generation and refinement workflow.
    This agent orchestrates a sequence of LLM agents to generate a story,
    critique it, revise it, check grammar and tone, and potentially
    regenerate the story if the tone is negative.
    """
    # --- Field Declarations for Pydantic ---
    # Declare the agents passed during initialization as class attributes with type hints
    story_generator: LlmAgent
    critic: LlmAgent
    reviser: LlmAgent
    grammar_check: LlmAgent
    tone_check: LlmAgent
    loop_agent: LoopAgent
    sequential_agent: SequentialAgent
    # model_config allows setting Pydantic configurations if needed, e.g., arbitrary_types_allowed
    model_config = {"arbitrary_types_allowed": True}
    def __init__(
        self,
        name: str,
        story_generator: LlmAgent,
        critic: LlmAgent,
        reviser: LlmAgent,
        grammar_check: LlmAgent,
        tone_check: LlmAgent,
    ):
        """
        Initializes the StoryFlowAgent.
        Args:
            name: The name of the agent.
            story_generator: An LlmAgent to generate the initial story.
            critic: An LlmAgent to critique the story.
            reviser: An LlmAgent to revise the story based on criticism.
            grammar_check: An LlmAgent to check the grammar.
            tone_check: An LlmAgent to analyze the tone.
        """
        # Create internal agents *before* calling super().__init__
        loop_agent = LoopAgent(
            name="CriticReviserLoop", sub_agents=[critic, reviser], max_iterations=2
        )
        sequential_agent = SequentialAgent(
            name="PostProcessing", sub_agents=[grammar_check, tone_check]
        )
        # Define the sub_agents list for the framework
        sub_agents_list = [
            story_generator,
            loop_agent,
            sequential_agent,
        ]
        # Pydantic will validate and assign them based on the class annotations.
        super().__init__(
            name=name,
            story_generator=story_generator,
            critic=critic,
            reviser=reviser,
            grammar_check=grammar_check,
            tone_check=tone_check,
            loop_agent=loop_agent,
            sequential_agent=sequential_agent,
            sub_agents=sub_agents_list, # Pass the sub_agents list directly
        )
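We define StoryFlowAgent by extending BaseAgent. In its constructor, we:
1. Create any internal composite agents (such as LoopAgent or SequentialAgent).
2. Pass the list of all top-level sub-agents to the super() constructor.
3. Store the sub-agents (passed in as arguments or created internally) as instance properties (for example, this.storyGenerator) so that the custom runAsyncImpl logic can access them.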
class StoryFlowAgent extends BaseAgent {
// --- Property Declarations for TypeScript ---
private storyGenerator: LlmAgent;
private critic: LlmAgent;
private reviser: LlmAgent;
private grammarCheck: LlmAgent;
private toneCheck: LlmAgent;
private loopAgent: LoopAgent;
private sequentialAgent: SequentialAgent;
constructor(
name: string,
storyGenerator: LlmAgent,
critic: LlmAgent,
reviser: LlmAgent,
grammarCheck: LlmAgent,
toneCheck: LlmAgent
) {
// Create internal composite agents
const loopAgent = new LoopAgent({
name: "CriticReviserLoop",
subAgents: [critic, reviser],
maxIterations: 2,
});
const sequentialAgent = new SequentialAgent({
name: "PostProcessing",
subAgents: [grammarCheck, toneCheck],
});
// Define the sub-agents for the framework to know about
const subAgentsList = [
storyGenerator,
loopAgent,
sequentialAgent,
];
// Call the parent constructor
super({
name,
subAgents: subAgentsList,
});
// Assign agents to class properties for use in the custom run logic
this.storyGenerator = storyGenerator;
this.critic = critic;
this.reviser = reviser;
this.grammarCheck = grammarCheck;
this.toneCheck = toneCheck;
this.loopAgent = loopAgent;
this.sequentialAgent = sequentialAgent;
}
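We define the StoryFlowAgent struct and a constructor function, NewStoryFlowAgent. The constructor creates the internal loop and sequential agents, stores the agents that the Run method needs in the struct, and passes the top-level agents to agent.New through agent.Config.SubAgents so that the framework knows which agents this custom agent orchestrates directly.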
// StoryFlowAgent is a custom agent that orchestrates a story generation workflow.
// It encapsulates the logic of running sub-agents in a specific sequence.
type StoryFlowAgent struct {
storyGenerator agent.Agent
revisionLoopAgent agent.Agent
postProcessorAgent agent.Agent
}
// NewStoryFlowAgent creates and configures the entire custom agent workflow.
// It takes individual LLM agents as input and internally creates the necessary
// workflow agents (loop, sequential), returning the final orchestrator agent.
func NewStoryFlowAgent(
storyGenerator,
critic,
reviser,
grammarCheck,
toneCheck agent.Agent,
) (agent.Agent, error) {
loopAgent, err := loopagent.New(loopagent.Config{
MaxIterations: 2,
AgentConfig: agent.Config{
Name: "CriticReviserLoop",
SubAgents: []agent.Agent{critic, reviser},
},
})
if err != nil {
return nil, fmt.Errorf("failed to create loop agent: %w", err)
}
sequentialAgent, err := sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{
Name: "PostProcessing",
SubAgents: []agent.Agent{grammarCheck, toneCheck},
},
})
if err != nil {
return nil, fmt.Errorf("failed to create sequential agent: %w", err)
}
// The StoryFlowAgent struct holds the agents needed for the Run method.
orchestrator := &StoryFlowAgent{
storyGenerator: storyGenerator,
revisionLoopAgent: loopAgent,
postProcessorAgent: sequentialAgent,
}
// agent.New creates the final agent, wiring up the Run method.
return agent.New(agent.Config{
Name: "StoryFlowAgent",
Description: "Orchestrates story generation, critique, revision, and checks.",
SubAgents: []agent.Agent{storyGenerator, loopAgent, sequentialAgent},
Run: orchestrator.Run,
})
}
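We define StoryFlowAgentExample by extending BaseAgent. In its constructor, we store the required sub-agent instances (passed in as arguments) as instance fields. The same top-level sub-agents are also passed as a list to the BaseAgent super constructor.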
private final LlmAgent storyGenerator;
private final LoopAgent loopAgent;
private final SequentialAgent sequentialAgent;
public StoryFlowAgentExample(
String name, LlmAgent storyGenerator, LoopAgent loopAgent, SequentialAgent sequentialAgent) {
super(
name,
"Orchestrates story generation, critique, revision, and checks.",
List.of(storyGenerator, loopAgent, sequentialAgent),
null,
null);
this.storyGenerator = storyGenerator;
this.loopAgent = loopAgent;
this.sequentialAgent = sequentialAgent;
}
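In every language above, the top-level list holds only the story generator, the loop agent and the sequential agent; critic, reviser, grammar check and tone check are attached under the composite agents instead. One reason is that an agent instance can have only one parent agent (Python ADK raises a ValueError when you add an agent that already has a parent). The framework-free sketch below models that single-parent rule with a hypothetical Node class; it is not ADK code.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import List, Optional


@dataclass(eq=False)
class Node:
    """Hypothetical stand-in for an agent in the agent tree."""
    name: str
    sub_agents: List[Node] = field(default_factory=list)
    parent: Optional[Node] = field(default=None, repr=False)

    def __post_init__(self) -> None:
        # Mirror the single-parent rule: a child that already has a parent is rejected.
        for child in self.sub_agents:
            if child.parent is not None:
                raise ValueError(
                    f"{child.name} already has parent {child.parent.name}; "
                    f"cannot also add it to {self.name}"
                )
            child.parent = self


def tree_names(node: Node, depth: int = 0) -> List[str]:
    # Depth-first listing of the tree, indented two spaces per level.
    lines = ["  " * depth + node.name]
    for child in node.sub_agents:
        lines.extend(tree_names(child, depth + 1))
    return lines


story_generator = Node("StoryGenerator")
critic, reviser = Node("Critic"), Node("Reviser")
grammar_check, tone_check = Node("GrammarCheck"), Node("ToneCheck")
loop_agent = Node("CriticReviserLoop", [critic, reviser])
sequential_agent = Node("PostProcessing", [grammar_check, tone_check])
# Only the top-level children go into the orchestrator's own sub_agents list.
story_flow = Node("StoryFlowAgent", [story_generator, loop_agent, sequential_agent])
print("\n".join(tree_names(story_flow)))
```

In this sketch, also passing critic directly to the orchestrator raises ValueError, because critic already belongs to the loop agent; the orchestrator reaches it only by running loop_agent.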
Part 2: Defining the custom execution logic¶
This method orchestrates the sub-agents using standard Python async/await and control flow.
@override
async def _run_async_impl(
    self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
    """
    Implements the custom orchestration logic for the story workflow.
    Uses the instance attributes assigned by Pydantic (e.g., self.story_generator).
    """
    logger.info(f"[{self.name}] Starting story generation workflow.")

    # 1. Initial Story Generation
    logger.info(f"[{self.name}] Running StoryGenerator...")
    async for event in self.story_generator.run_async(ctx):
        logger.info(f"[{self.name}] Event from StoryGenerator: {event.model_dump_json(indent=2, exclude_none=True)}")
        yield event

    # Check if story was generated before proceeding
    if not ctx.session.state.get("current_story"):
        logger.error(f"[{self.name}] Failed to generate initial story. Aborting workflow.")
        return  # Stop processing if initial story failed

    logger.info(f"[{self.name}] Story state after generator: {ctx.session.state.get('current_story')}")

    # 2. Critic-Reviser Loop
    logger.info(f"[{self.name}] Running CriticReviserLoop...")
    # Use the loop_agent instance attribute assigned during init
    async for event in self.loop_agent.run_async(ctx):
        logger.info(f"[{self.name}] Event from CriticReviserLoop: {event.model_dump_json(indent=2, exclude_none=True)}")
        yield event

    logger.info(f"[{self.name}] Story state after loop: {ctx.session.state.get('current_story')}")

    # 3. Sequential Post-Processing (Grammar and Tone Check)
    logger.info(f"[{self.name}] Running PostProcessing...")
    # Use the sequential_agent instance attribute assigned during init
    async for event in self.sequential_agent.run_async(ctx):
        logger.info(f"[{self.name}] Event from PostProcessing: {event.model_dump_json(indent=2, exclude_none=True)}")
        yield event

    # 4. Tone-Based Conditional Logic
    tone_check_result = ctx.session.state.get("tone_check_result")
    logger.info(f"[{self.name}] Tone check result: {tone_check_result}")

    if tone_check_result == "negative":
        logger.info(f"[{self.name}] Tone is negative. Regenerating story...")
        async for event in self.story_generator.run_async(ctx):
            logger.info(f"[{self.name}] Event from StoryGenerator (Regen): {event.model_dump_json(indent=2, exclude_none=True)}")
            yield event
    else:
        logger.info(f"[{self.name}] Tone is not negative. Keeping current story.")

    logger.info(f"[{self.name}] Workflow finished.")
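- First, story_generator runs. Its output should be stored in ctx.session.state["current_story"].
- Next, loop_agent runs. Internally it calls critic and reviser in sequence, for up to max_iterations iterations; they read and write current_story and criticism in state.
- Then sequential_agent runs, calling grammar_check and then tone_check, which read current_story and write grammar_suggestions and tone_check_result to state.
- Custom part: the if statement checks tone_check_result in state. If it is "negative", story_generator is called again, overwriting current_story in state. Otherwise, the workflow ends.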
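To see this control flow in isolation, here is a framework-free simulation built from plain asyncio async generators. The fake agents, the dict used as session state and the string events are hypothetical stand-ins for LlmAgent, ctx.session.state and Event; only the orchestration shape (generate, loop, post-process, conditionally regenerate) follows the example above.

```python
import asyncio
from typing import AsyncGenerator, Callable, Dict, List, Tuple

State = Dict[str, str]
Agent = Callable[[State], AsyncGenerator[str, None]]


def fake_agent(name: str, update: Callable[[State], None]) -> Agent:
    """Hypothetical agent: mutates the shared state, then yields one event."""
    async def run(state: State) -> AsyncGenerator[str, None]:
        update(state)
        yield f"event from {name}"
    return run


async def run_loop(agents: List[Agent], state: State, max_iterations: int):
    # Like LoopAgent (ignoring early exit): run the sub-agents in order, max_iterations times.
    for _ in range(max_iterations):
        for agent in agents:
            async for event in agent(state):
                yield event


async def run_sequence(agents: List[Agent], state: State):
    # Like SequentialAgent: run each sub-agent once, in order.
    for agent in agents:
        async for event in agent(state):
            yield event


async def story_flow(state: State, generator: Agent, critic: Agent, reviser: Agent,
                     grammar: Agent, tone: Agent):
    async for event in generator(state):
        yield event
    if not state.get("current_story"):
        return  # Abort if no initial story was produced.
    async for event in run_loop([critic, reviser], state, max_iterations=2):
        yield event
    async for event in run_sequence([grammar, tone], state):
        yield event
    # The custom part: branch on a sub-agent's result stored in state.
    if state.get("tone_check_result") == "negative":
        async for event in generator(state):
            yield event


async def simulate(tone: str) -> Tuple[List[str], State, int]:
    calls = {"generate": 0}

    def generate(s: State) -> None:
        calls["generate"] += 1
        s["current_story"] = f"story v{calls['generate']}"

    agents = (
        fake_agent("StoryGenerator", generate),
        fake_agent("Critic", lambda s: s.update(criticism="add a twist")),
        fake_agent("Reviser", lambda s: s.update(current_story=s["current_story"] + " (revised)")),
        fake_agent("GrammarCheck", lambda s: s.update(grammar_suggestions="Grammar is good!")),
        fake_agent("ToneCheck", lambda s: s.update(tone_check_result=tone)),
    )
    state: State = {"topic": "a lonely robot"}
    events = [event async for event in story_flow(state, *agents)]
    return events, state, calls["generate"]


events, state, generated = asyncio.run(simulate("negative"))
print(len(events), generated, state["current_story"])  # 8 2 story v2
```

With a "positive" tone the same run yields 7 events, the generator runs once, and the twice-revised story is kept.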
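The runAsyncImpl method orchestrates the sub-agents using standard TypeScript async/await and control flow. runLiveImpl is also implemented, delegating to runAsyncImpl, to handle live streaming scenarios.
// Live mode: delegates to the same orchestration logic as runAsyncImpl.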
async* runLiveImpl(ctx: InvocationContext): AsyncGenerator<Event, void, undefined> {
yield* this.runAsyncImpl(ctx);
}
// Implements the custom orchestration logic for the story workflow.
async* runAsyncImpl(ctx: InvocationContext): AsyncGenerator<Event, void, undefined> {
console.log(`[${this.name}] Starting story generation workflow.`);
// 1. Initial Story Generation
console.log(`[${this.name}] Running StoryGenerator...`);
for await (const event of this.storyGenerator.runAsync(ctx)) {
console.log(`[${this.name}] Event from StoryGenerator: ${JSON.stringify(event, null, 2)}`);
yield event;
}
// Check if the story was generated before proceeding
if (!ctx.session.state["current_story"]) {
console.error(`[${this.name}] Failed to generate initial story. Aborting workflow.`);
return; // Stop processing
}
console.log(`[${this.name}] Story state after generator: ${ctx.session.state['current_story']}`);
// 2. Critic-Reviser Loop
console.log(`[${this.name}] Running CriticReviserLoop...`);
for await (const event of this.loopAgent.runAsync(ctx)) {
console.log(`[${this.name}] Event from CriticReviserLoop: ${JSON.stringify(event, null, 2)}`);
yield event;
}
console.log(`[${this.name}] Story state after loop: ${ctx.session.state['current_story']}`);
// 3. Sequential Post-Processing (Grammar and Tone Check)
console.log(`[${this.name}] Running PostProcessing...`);
for await (const event of this.sequentialAgent.runAsync(ctx)) {
console.log(`[${this.name}] Event from PostProcessing: ${JSON.stringify(event, null, 2)}`);
yield event;
}
// 4. Tone-Based Conditional Logic
const toneCheckResult = ctx.session.state["tone_check_result"] as string;
console.log(`[${this.name}] Tone check result: ${toneCheckResult}`);
if (toneCheckResult === "negative") {
console.log(`[${this.name}] Tone is negative. Regenerating story...`);
for await (const event of this.storyGenerator.runAsync(ctx)) {
console.log(`[${this.name}] Event from StoryGenerator (Regen): ${JSON.stringify(event, null, 2)}`);
yield event;
}
} else {
console.log(`[${this.name}] Tone is not negative. Keeping current story.`);
}
console.log(`[${this.name}] Workflow finished.`);
}
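- First, storyGenerator runs. Its output should be in ctx.session.state['current_story'].
- Next, loopAgent runs. Internally it calls critic and reviser in sequence, for up to maxIterations iterations; they read and write current_story and criticism in state.
- Then sequentialAgent runs, calling grammarCheck and then toneCheck, which read current_story and write grammar_suggestions and tone_check_result to state.
- Custom part: the if statement checks tone_check_result in state. If it is "negative", storyGenerator is called again, overwriting current_story in state. Otherwise, the workflow ends.
The Run method orchestrates the sub-agents by ranging over each sub-agent's Run iterator and yielding its events. If the consumer stops iterating (yield returns false), Run returns immediately.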
// Run defines the custom execution logic for the StoryFlowAgent.
func (s *StoryFlowAgent) Run(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
return func(yield func(*session.Event, error) bool) {
// Stage 1: Initial Story Generation
for event, err := range s.storyGenerator.Run(ctx) {
if err != nil {
yield(nil, fmt.Errorf("story generator failed: %w", err))
return
}
if !yield(event, nil) {
return
}
}
// Check if story was generated before proceeding
currentStory, err := ctx.Session().State().Get("current_story")
if err != nil || currentStory == "" {
log.Println("Failed to generate initial story. Aborting workflow.")
return
}
// Stage 2: Critic-Reviser Loop
for event, err := range s.revisionLoopAgent.Run(ctx) {
if err != nil {
yield(nil, fmt.Errorf("loop agent failed: %w", err))
return
}
if !yield(event, nil) {
return
}
}
// Stage 3: Post-Processing
for event, err := range s.postProcessorAgent.Run(ctx) {
if err != nil {
yield(nil, fmt.Errorf("sequential agent failed: %w", err))
return
}
if !yield(event, nil) {
return
}
}
// Stage 4: Conditional Regeneration
toneResult, err := ctx.Session().State().Get("tone_check_result")
if err != nil {
log.Printf("Could not read tone_check_result from state: %v. Assuming tone is not negative.", err)
return
}
if tone, ok := toneResult.(string); ok && tone == "negative" {
log.Println("Tone is negative. Regenerating story...")
for event, err := range s.storyGenerator.Run(ctx) {
if err != nil {
yield(nil, fmt.Errorf("story regeneration failed: %w", err))
return
}
if !yield(event, nil) {
return
}
}
} else {
log.Println("Tone is not negative. Keeping current story.")
}
}
}
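- First, storyGenerator runs. Its output should be in session state under the "current_story" key.
- Next, revisionLoopAgent runs. Internally it calls critic and reviser in sequence, for up to MaxIterations iterations; they read and write current_story and criticism in state.
- Then postProcessorAgent runs, calling grammarCheck and then toneCheck, which read current_story and write grammar_suggestions and tone_check_result to state.
- Custom part: the code checks tone_check_result in state. If it is "negative", storyGenerator is called again, overwriting current_story in state. Otherwise, the workflow ends.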
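Each Go loop checks the return value of yield and returns as soon as it is false, so a consumer that stops reading also stops the orchestrator before later stages start. Python async generators behave the same way once they are closed. The sketch below (hypothetical stage names, no ADK imports) lets a consumer take three events and shows that the third stage never starts.

```python
import asyncio
from typing import AsyncGenerator, List

started: List[str] = []  # names of the stages that began running


async def stage(name: str, n_events: int) -> AsyncGenerator[str, None]:
    # Hypothetical sub-agent: records that it started, then yields events.
    started.append(name)
    for i in range(n_events):
        yield f"{name}#{i}"


async def orchestrator() -> AsyncGenerator[str, None]:
    # Runs the stages in order and forwards every event, like the Go Run loops.
    for name in ("StoryGenerator", "CriticReviserLoop", "PostProcessing"):
        async for event in stage(name, 2):
            yield event


async def consume_first(n: int) -> List[str]:
    gen = orchestrator()
    taken: List[str] = []
    try:
        async for event in gen:
            taken.append(event)
            if len(taken) == n:
                break  # The consumer stops early, like yield(...) returning false in Go.
    finally:
        await gen.aclose()  # Close the generator so it never resumes.
    return taken


taken = asyncio.run(consume_first(3))
print(taken)    # ['StoryGenerator#0', 'StoryGenerator#1', 'CriticReviserLoop#0']
print(started)  # ['StoryGenerator', 'CriticReviserLoop']: PostProcessing never started
```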
The runAsyncImpl method orchestrates the sub-agents with RxJava Flowable streams and operators, which express the asynchronous control flow.
@Override
protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) {
// Implements the custom orchestration logic for the story workflow.
// Uses the agent instances stored as fields by the constructor (e.g., storyGenerator).
logger.log(Level.INFO, () -> String.format("[%s] Starting story generation workflow.", name()));
// Stage 1. Initial Story Generation
Flowable<Event> storyGenFlow = runStage(storyGenerator, invocationContext, "StoryGenerator");
// Stage 2: Critic-Reviser Loop (runs after story generation completes)
Flowable<Event> criticReviserFlow = Flowable.defer(() -> {
if (!isStoryGenerated(invocationContext)) {
logger.log(Level.SEVERE,() ->
String.format("[%s] Failed to generate initial story. Aborting after StoryGenerator.",
name()));
return Flowable.empty(); // Stop further processing if no story
}
logger.log(Level.INFO, () ->
String.format("[%s] Story state after generator: %s",
name(), invocationContext.session().state().get("current_story")));
return runStage(loopAgent, invocationContext, "CriticReviserLoop");
});
// Stage 3: Post-Processing (runs after critic-reviser loop completes)
Flowable<Event> postProcessingFlow = Flowable.defer(() -> {
logger.log(Level.INFO, () ->
String.format("[%s] Story state after loop: %s",
name(), invocationContext.session().state().get("current_story")));
return runStage(sequentialAgent, invocationContext, "PostProcessing");
});
// Stage 4: Conditional Regeneration (runs after post-processing completes)
Flowable<Event> conditionalRegenFlow = Flowable.defer(() -> {
String toneCheckResult = (String) invocationContext.session().state().get("tone_check_result");
logger.log(Level.INFO, () -> String.format("[%s] Tone check result: %s", name(), toneCheckResult));
if ("negative".equalsIgnoreCase(toneCheckResult)) {
logger.log(Level.INFO, () ->
String.format("[%s] Tone is negative. Regenerating story...", name()));
return runStage(storyGenerator, invocationContext, "StoryGenerator (Regen)");
} else {
logger.log(Level.INFO, () ->
String.format("[%s] Tone is not negative. Keeping current story.", name()));
return Flowable.empty(); // No regeneration needed
}
});
return Flowable.concatArray(storyGenFlow, criticReviserFlow, postProcessingFlow, conditionalRegenFlow)
.doOnComplete(() -> logger.log(Level.INFO, () -> String.format("[%s] Workflow finished.", name())));
}
// Helper method for a single agent run stage with logging
private Flowable<Event> runStage(BaseAgent agentToRun, InvocationContext ctx, String stageName) {
logger.log(Level.INFO, () -> String.format("[%s] Running %s...", name(), stageName));
return agentToRun
.runAsync(ctx)
.doOnNext(event ->
logger.log(Level.INFO,() ->
String.format("[%s] Event from %s: %s", name(), stageName, event.toJson())))
.doOnError(err ->
logger.log(Level.SEVERE,
String.format("[%s] Error in %s", name(), stageName), err))
.doOnComplete(() ->
logger.log(Level.INFO, () ->
String.format("[%s] %s finished.", name(), stageName)));
}
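- First, the Flowable from storyGenerator.runAsync(invocationContext) runs. Its output should be stored in invocationContext.session().state() under "current_story".
- Next, the loopAgent Flowable runs. Flowable.concatArray subscribes to each stage only after the previous one completes, and Flowable.defer delays the code that builds a stage (including its state checks) until that subscription. Internally, LoopAgent calls critic and reviser in sequence, for up to maxIterations iterations; they read and write current_story and criticism in state.
- Then the sequentialAgent Flowable runs, calling grammarCheck and then toneCheck, which read current_story and write grammar_suggestions and tone_check_result to state.
- Custom part: after sequentialAgent completes, the logic inside Flowable.defer checks "tone_check_result" in invocationContext.session().state(). If it is "negative" (compared case-insensitively), the storyGenerator Flowable is appended and runs again, overwriting "current_story". Otherwise an empty Flowable is returned and the workflow ends.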
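Flowable.defer matters because the assembly code in runAsyncImpl runs before any stage executes: reading tone_check_result while building the pipeline would see the state as it was before post-processing. This Python sketch reproduces the difference with plain callables standing in for Flowables; the stage functions and the eager/deferred factories are hypothetical.

```python
from typing import Callable, Dict, List

state: Dict[str, str] = {}
log: List[str] = []


def post_processing() -> None:
    # Stage that writes the value the next stage branches on.
    log.append("PostProcessing")
    state["tone_check_result"] = "negative"


def regenerate() -> None:
    log.append("StoryGenerator (Regen)")


def eager_regen_stage() -> Callable[[], None]:
    # Decides NOW, at pipeline-assembly time (like building the stage without defer).
    if state.get("tone_check_result") == "negative":
        return regenerate
    return lambda: None


def deferred_regen_stage() -> Callable[[], None]:
    # Decides only when the stage actually runs (like Flowable.defer).
    def run() -> None:
        if state.get("tone_check_result") == "negative":
            regenerate()
    return run


def run_pipeline(regen_stage_factory: Callable[[], Callable[[], None]]) -> List[str]:
    state.clear()
    log.clear()
    # Assemble the whole pipeline first, then execute it in order (like concatArray).
    stages = [post_processing, regen_stage_factory()]
    for stage in stages:
        stage()
    return list(log)


print(run_pipeline(eager_regen_stage))     # ['PostProcessing']: the state was read too early
print(run_pipeline(deferred_regen_stage))  # ['PostProcessing', 'StoryGenerator (Regen)']
```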
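Part 3: Defining the LLM sub-agents¶
These are standard LlmAgent definitions, each responsible for one specific task. Their output_key parameter is essential: it stores each agent's final response in session.state, where other agents or the custom orchestrator can read it.
Direct state injection in instructions
Note the story_generator instruction. The {var} syntax is a placeholder: before the instruction is sent to the LLM, the ADK framework automatically replaces it with the matching session state value (for example, {topic} becomes the value of session.state['topic']). Templating in the instruction is the recommended way to give an agent context. See the State documentation for details.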
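The two mechanisms this part relies on, {var} placeholders filled from session state and output_key saving an agent's final text into state, can be mimicked in a few lines. This is a simplified, hypothetical sketch: inject_state and save_output are not ADK functions, and ADK's real templating handles more cases. It only shows how one agent's output flows into the next agent's instruction.

```python
import re
from typing import Any, Dict

# Matches {identifier}; a simplified stand-in for instruction templating.
PLACEHOLDER = re.compile(r"\{(\w+)\}")


def inject_state(template: str, state: Dict[str, Any]) -> str:
    """Replace each {key} with str(state[key]); a missing key raises KeyError."""
    return PLACEHOLDER.sub(lambda m: str(state[m.group(1)]), template)


def save_output(state: Dict[str, Any], output_key: str, final_text: str) -> None:
    # Conceptually what output_key does: store the agent's final reply in state.
    state[output_key] = final_text


state: Dict[str, Any] = {"topic": "a brave kitten exploring a haunted house"}
writer_prompt = inject_state("Write a short story on the following topic: {topic}", state)
save_output(state, "current_story", "Whiskers crept past the creaking door.")
critic_prompt = inject_state("Review the story provided: {current_story}", state)
print(writer_prompt)
print(critic_prompt)
```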
GEMINI_2_FLASH = "gemini-flash-latest"  # Define the model constant

# --- Define the individual LLM agents ---
story_generator = LlmAgent(
    name="StoryGenerator",
    model=GEMINI_2_FLASH,
    instruction="""You are a story writer. Write a short story (around 100 words), on the following topic: {topic}""",
    input_schema=None,
    output_key="current_story",  # Key for storing output in session state
)

critic = LlmAgent(
    name="Critic",
    model=GEMINI_2_FLASH,
    instruction="""You are a story critic. Review the story provided: {current_story}. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.""",
    input_schema=None,
    output_key="criticism",  # Key for storing criticism in session state
)

reviser = LlmAgent(
    name="Reviser",
    model=GEMINI_2_FLASH,
    instruction="""You are a story reviser. Revise the story provided: {current_story}, based on the criticism in
{criticism}. Output only the revised story.""",
    input_schema=None,
    output_key="current_story",  # Overwrites the original story
)

grammar_check = LlmAgent(
    name="GrammarCheck",
    model=GEMINI_2_FLASH,
    instruction="""You are a grammar checker. Check the grammar of the story provided: {current_story}. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.""",
    input_schema=None,
    output_key="grammar_suggestions",
)

tone_check = LlmAgent(
    name="ToneCheck",
    model=GEMINI_2_FLASH,
    instruction="""You are a tone analyzer. Analyze the tone of the story provided: {current_story}. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.""",
    input_schema=None,
    output_key="tone_check_result",  # This agent's output determines the conditional flow
)
// --- Define the individual LLM agents ---
const storyGenerator = new LlmAgent({
name: "StoryGenerator",
model: GEMINI_MODEL,
instruction: `You are a story writer. Write a short story (around 100 words), on the following topic: {topic}`,
outputKey: "current_story",
});
const critic = new LlmAgent({
name: "Critic",
model: GEMINI_MODEL,
instruction: `You are a story critic. Review the story provided: {current_story}. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.`,
outputKey: "criticism",
});
const reviser = new LlmAgent({
name: "Reviser",
model: GEMINI_MODEL,
instruction: `You are a story reviser. Revise the story provided: {current_story}, based on the criticism in
{criticism}. Output only the revised story.`,
outputKey: "current_story", // Overwrites the original story
});
const grammarCheck = new LlmAgent({
name: "GrammarCheck",
model: GEMINI_MODEL,
instruction: `You are a grammar checker. Check the grammar of the story provided: {current_story}. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.`,
outputKey: "grammar_suggestions",
});
const toneCheck = new LlmAgent({
name: "ToneCheck",
model: GEMINI_MODEL,
instruction: `You are a tone analyzer. Analyze the tone of the story provided: {current_story}. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.`,
outputKey: "tone_check_result",
});
// --- Define the individual LLM agents ---
storyGenerator, err := llmagent.New(llmagent.Config{
Name: "StoryGenerator",
Model: model,
Description: "Generates the initial story.",
Instruction: "You are a story writer. Write a short story (around 100 words) about a cat, based on the topic: {topic}",
OutputKey: "current_story",
})
if err != nil {
log.Fatalf("Failed to create StoryGenerator agent: %v", err)
}
critic, err := llmagent.New(llmagent.Config{
Name: "Critic",
Model: model,
Description: "Critiques the story.",
Instruction: "You are a story critic. Review the story: {current_story}. Provide 1-2 sentences of constructive criticism on how to improve it. Focus on plot or character.",
OutputKey: "criticism",
})
if err != nil {
log.Fatalf("Failed to create Critic agent: %v", err)
}
reviser, err := llmagent.New(llmagent.Config{
Name: "Reviser",
Model: model,
Description: "Revises the story based on criticism.",
Instruction: "You are a story reviser. Revise the story: {current_story}, based on the criticism: {criticism}. Output only the revised story.",
OutputKey: "current_story",
})
if err != nil {
log.Fatalf("Failed to create Reviser agent: %v", err)
}
grammarCheck, err := llmagent.New(llmagent.Config{
Name: "GrammarCheck",
Model: model,
Description: "Checks grammar and suggests corrections.",
Instruction: "You are a grammar checker. Check the grammar of the story: {current_story}. Output only the suggested corrections as a list, or output 'Grammar is good!' if there are no errors.",
OutputKey: "grammar_suggestions",
})
if err != nil {
log.Fatalf("Failed to create GrammarCheck agent: %v", err)
}
toneCheck, err := llmagent.New(llmagent.Config{
Name: "ToneCheck",
Model: model,
Description: "Analyzes the tone of the story.",
Instruction: "You are a tone analyzer. Analyze the tone of the story: {current_story}. Output only one word: 'positive' if the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral' otherwise.",
OutputKey: "tone_check_result",
})
if err != nil {
log.Fatalf("Failed to create ToneCheck agent: %v", err)
}
// --- Define the individual LLM agents ---
LlmAgent storyGenerator =
LlmAgent.builder()
.name("StoryGenerator")
.model(MODEL_NAME)
.description("Generates the initial story.")
.instruction(
"""
You are a story writer. Write a short story (around 100 words) about a cat,
based on the topic: {topic}
""")
.inputSchema(null)
.outputKey("current_story") // Key for storing output in session state
.build();
LlmAgent critic =
LlmAgent.builder()
.name("Critic")
.model(MODEL_NAME)
.description("Critiques the story.")
.instruction(
"""
You are a story critic. Review the story: {current_story}. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.
""")
.inputSchema(null)
.outputKey("criticism") // Key for storing criticism in session state
.build();
LlmAgent reviser =
LlmAgent.builder()
.name("Reviser")
.model(MODEL_NAME)
.description("Revises the story based on criticism.")
.instruction(
"""
You are a story reviser. Revise the story: {current_story}, based on the criticism: {criticism}. Output only the revised story.
""")
.inputSchema(null)
.outputKey("current_story") // Overwrites the original story
.build();
LlmAgent grammarCheck =
LlmAgent.builder()
.name("GrammarCheck")
.model(MODEL_NAME)
.description("Checks grammar and suggests corrections.")
.instruction(
"""
You are a grammar checker. Check the grammar of the story: {current_story}. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.
""")
.outputKey("grammar_suggestions")
.build();
LlmAgent toneCheck =
LlmAgent.builder()
.name("ToneCheck")
.model(MODEL_NAME)
.description("Analyzes the tone of the story.")
.instruction(
"""
You are a tone analyzer. Analyze the tone of the story: {current_story}. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.
""")
.outputKey("tone_check_result") // This agent's output determines the conditional flow
.build();
LoopAgent loopAgent =
LoopAgent.builder()
.name("CriticReviserLoop")
.description("Iteratively critiques and revises the story.")
.subAgents(critic, reviser)
.maxIterations(2)
.build();
SequentialAgent sequentialAgent =
SequentialAgent.builder()
.name("PostProcessing")
.description("Performs grammar and tone checks sequentially.")
.subAgents(grammarCheck, toneCheck)
.build();
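Because these agents communicate only through state keys, it can help to check the read/write contract before running anything. The sketch below encodes each step of the StoryFlow pipeline as hypothetical (name, reads, writes) tuples, taken from the instructions and output keys above, and verifies that every key a step reads was written earlier or seeded in the initial state.

```python
from typing import List, Set, Tuple

Step = Tuple[str, Set[str], Set[str]]  # (agent name, keys read, keys written)

STORY_FLOW: List[Step] = [
    ("StoryGenerator", {"topic"}, {"current_story"}),
    ("Critic", {"current_story"}, {"criticism"}),
    ("Reviser", {"current_story", "criticism"}, {"current_story"}),
    ("GrammarCheck", {"current_story"}, {"grammar_suggestions"}),
    ("ToneCheck", {"current_story"}, {"tone_check_result"}),
]


def missing_inputs(steps: List[Step], initial_keys: Set[str]) -> List[Tuple[str, str]]:
    """Return (agent, key) pairs where an agent reads a key nobody has written yet."""
    available = set(initial_keys)
    problems = []
    for name, reads, writes in steps:
        for key in sorted(reads - available):
            problems.append((name, key))
        available |= writes
    return problems


print(missing_inputs(STORY_FLOW, {"topic"}))  # []
print(missing_inputs(STORY_FLOW, set()))      # [('StoryGenerator', 'topic')]
```

The second call shows why the runner examples seed topic in the initial session state.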
Part 4: Instantiating and running the custom agent¶
Finally, instantiate your StoryFlowAgent and use a Runner as usual.
# --- Create the custom agent instance ---
story_flow_agent = StoryFlowAgent(
    name="StoryFlowAgent",
    story_generator=story_generator,
    critic=critic,
    reviser=reviser,
    grammar_check=grammar_check,
    tone_check=tone_check,
)

INITIAL_STATE = {"topic": "a brave kitten exploring a haunted house"}

# --- Setup Runner and Session ---
async def setup_session_and_runner():
    session_service = InMemorySessionService()
    session = await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID, state=INITIAL_STATE)
    logger.info(f"Initial session state: {session.state}")
    runner = Runner(
        agent=story_flow_agent,  # Pass the custom orchestrator agent
        app_name=APP_NAME,
        session_service=session_service,
    )
    return session_service, runner

# --- Function to Interact with the Agent ---
async def call_agent_async(user_input_topic: str):
    """
    Sends a new topic to the agent (overwriting the initial one if needed)
    and runs the workflow.
    """
    session_service, runner = await setup_session_and_runner()

    # Note: this reaches into InMemorySessionService's internal storage to update
    # the stored session directly; it only works with the in-memory service.
    current_session = session_service.sessions[APP_NAME][USER_ID][SESSION_ID]
    current_session.state["topic"] = user_input_topic
    logger.info(f"Updated session state topic to: {user_input_topic}")

    content = types.Content(role='user', parts=[types.Part(text="Generate a story about the preset topic.")])
    events = runner.run_async(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

    final_response = "No final response captured."
    async for event in events:
        if event.is_final_response() and event.content and event.content.parts:
            logger.info(f"Potential final response from [{event.author}]: {event.content.parts[0].text}")
            final_response = event.content.parts[0].text

    print("\n--- Agent Interaction Result ---")
    print("Agent Final Response: ", final_response)

    final_session = await session_service.get_session(app_name=APP_NAME,
                                                      user_id=USER_ID,
                                                      session_id=SESSION_ID)
    print("Final Session State:")
    import json
    print(json.dumps(final_session.state, indent=2))
    print("-------------------------------\n")

# --- Run the Agent ---
# Note: In Colab, you can directly use 'await' at the top level.
# If running this code as a standalone Python script, you'll need to use asyncio.run() or manage the event loop.
await call_agent_async("a lonely robot finding a friend in a junkyard")
// --- Create the custom agent instance ---
const storyFlowAgent = new StoryFlowAgent(
"StoryFlowAgent",
storyGenerator,
critic,
reviser,
grammarCheck,
toneCheck
);
const INITIAL_STATE = { "topic": "a brave kitten exploring a haunted house" };
// --- Setup Runner and Session ---
async function setupRunnerAndSession() {
const runner = new InMemoryRunner({
agent: storyFlowAgent,
appName: APP_NAME,
});
const session = await runner.sessionService.createSession({
appName: APP_NAME,
userId: USER_ID,
sessionId: SESSION_ID,
state: INITIAL_STATE,
});
console.log(`Initial session state: ${JSON.stringify(session.state, null, 2)}`);
return runner;
}
// --- Function to Interact with the Agent ---
async function callAgent(runner: InMemoryRunner, userInputTopic: string) {
const currentSession = await runner.sessionService.getSession({
appName: APP_NAME,
userId: USER_ID,
sessionId: SESSION_ID
});
if (!currentSession) {
return;
}
// Update the state with the new topic for this run
currentSession.state["topic"] = userInputTopic;
console.log(`Updated session state topic to: ${userInputTopic}`);
let finalResponse = "No final response captured.";
for await (const event of runner.runAsync({
userId: USER_ID,
sessionId: SESSION_ID,
newMessage: createUserContent(`Generate a story about: ${userInputTopic}`)
})) {
if (isFinalResponse(event) && event.content?.parts?.length) {
console.log(`Potential final response from [${event.author}]: ${event.content.parts.map(part => part.text ?? '').join('')}`);
finalResponse = event.content.parts.map(part => part.text ?? '').join('');
}
}
const finalSession = await runner.sessionService.getSession({
appName: APP_NAME,
userId: USER_ID,
sessionId: SESSION_ID
});
console.log("\n--- Agent Interaction Result ---");
console.log("Agent Final Response: ", finalResponse);
console.log("Final Session State:");
console.log(JSON.stringify(finalSession?.state, null, 2));
console.log("-------------------------------\n");
}
// --- Run the Agent ---
async function main() {
const runner = await setupRunnerAndSession();
await callAgent(runner, "a lonely robot finding a friend in a junkyard");
}
main();
// Instantiate the custom agent, which encapsulates the workflow agents.
storyFlowAgent, err := NewStoryFlowAgent(
storyGenerator,
critic,
reviser,
grammarCheck,
toneCheck,
)
if err != nil {
log.Fatalf("Failed to create story flow agent: %v", err)
}
// --- Run the Agent ---
sessionService := session.InMemoryService()
initialState := map[string]any{
"topic": "a brave kitten exploring a haunted house",
}
sessionInstance, err := sessionService.Create(ctx, &session.CreateRequest{
AppName: appName,
UserID: userID,
State: initialState,
})
if err != nil {
log.Fatalf("Failed to create session: %v", err)
}
userTopic := "a lonely robot finding a friend in a junkyard"
r, err := runner.New(runner.Config{
AppName: appName,
Agent: storyFlowAgent,
SessionService: sessionService,
})
if err != nil {
log.Fatalf("Failed to create runner: %v", err)
}
input := genai.NewContentFromText("Generate a story about: "+userTopic, genai.RoleUser)
events := r.Run(ctx, userID, sessionInstance.Session.ID(), input, agent.RunConfig{
StreamingMode: agent.StreamingModeSSE,
})
var finalResponse string
for event, err := range events {
if err != nil {
log.Fatalf("An error occurred during agent execution: %v", err)
}
if event.Content == nil {
continue // Some events carry no content.
}
for _, part := range event.Content.Parts {
// Note: this concatenates text from every event, not only the final response.
finalResponse += part.Text
}
}
fmt.Println("\n--- Agent Interaction Result ---")
fmt.Println("Agent Final Response: " + finalResponse)
finalSession, err := sessionService.Get(ctx, &session.GetRequest{
UserID: userID,
AppName: appName,
SessionID: sessionInstance.Session.ID(),
})
if err != nil {
log.Fatalf("Failed to retrieve final session: %v", err)
}
fmt.Println("Final Session State:", finalSession.Session.State())
}
// --- Function to Interact with the Agent ---
// Sends a new topic to the agent (overwriting the initial one if needed)
// and runs the workflow.
public static void runAgent(StoryFlowAgentExample agent, String userTopic) {
// --- Setup Runner and Session ---
InMemoryRunner runner = new InMemoryRunner(agent);
Map<String, Object> initialState = new HashMap<>();
initialState.put("topic", "a brave kitten exploring a haunted house");
Session session =
runner
.sessionService()
.createSession(APP_NAME, USER_ID, new ConcurrentHashMap<>(initialState), SESSION_ID)
.blockingGet();
logger.log(Level.INFO, () -> String.format("Initial session state: %s", session.state()));
session.state().put("topic", userTopic); // Update the state in the retrieved session
logger.log(Level.INFO, () -> String.format("Updated session state topic to: %s", userTopic));
Content userMessage = Content.fromParts(Part.fromText("Generate a story about: " + userTopic));
// Use the modified session object for the run
Flowable<Event> eventStream = runner.runAsync(USER_ID, session.id(), userMessage);
final String[] finalResponse = {"No final response captured."};
eventStream.blockingForEach(
event -> {
if (event.finalResponse() && event.content().isPresent()) {
String author = event.author() != null ? event.author() : "UNKNOWN_AUTHOR";
Optional<String> textOpt =
event
.content()
.flatMap(Content::parts)
.filter(parts -> !parts.isEmpty())
.map(parts -> parts.get(0).text().orElse(""));
logger.log(Level.INFO, () ->
String.format("Potential final response from [%s]: %s", author, textOpt.orElse("N/A")));
textOpt.ifPresent(text -> finalResponse[0] = text);
}
});
System.out.println("\n--- Agent Interaction Result ---");
System.out.println("Agent Final Response: " + finalResponse[0]);
// Retrieve session again to see the final state after the run
Session finalSession =
runner
.sessionService()
.getSession(APP_NAME, USER_ID, SESSION_ID, Optional.empty())
.blockingGet();
assert finalSession != null;
System.out.println("Final Session State:" + finalSession.state());
System.out.println("-------------------------------\n");
}
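The `runAgent` helper above keeps only the text of the last event flagged as a final response: every final response that has content overwrites the previous one. The minimal sketch below shows that selection rule in plain Python, independent of any ADK runtime. `FakeEvent` and `last_final_text` are hypothetical names used for illustration; `FakeEvent` is a stand-in, not ADK's real `Event` type.

```python
from dataclasses import dataclass, field
from typing import Iterable, List, Optional


@dataclass
class FakeEvent:
    """Hypothetical stand-in for an ADK event (not the real ADK Event class)."""
    author: str
    parts: List[str] = field(default_factory=list)
    is_final: bool = False


def last_final_text(events: Iterable[FakeEvent],
                    default: str = "No final response captured.") -> str:
    """Return the first part's text of the last final-response event with content."""
    final_text: Optional[str] = None
    for event in events:
        # Only final responses with non-empty content count, and a later
        # final response overwrites an earlier one, as in runAgent.
        if event.is_final and event.parts:
            final_text = event.parts[0]
    return final_text if final_text is not None else default


events = [
    FakeEvent("StoryGenerator", ["Once upon"], is_final=False),  # partial: ignored
    FakeEvent("StoryGenerator", ["Once upon a time..."], is_final=True),
    FakeEvent("Critic", ["Add more conflict."], is_final=True),
    FakeEvent("ToneCheck", [], is_final=True),  # no content: ignored
    FakeEvent("ToneCheck", ["positive"], is_final=True),
]
print(last_final_text(events))  # prints "positive"
```

Because the last sub-agent in this workflow is usually `ToneCheck` (or the regenerating `StoryGenerator`), the "final response" reported by the helper is not necessarily the story itself; the story lives in the `current_story` state key.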
(Note: The complete runnable code, including imports and execution logic, is provided below.)
Full Code for the StoryFlow Agent
StoryFlow Agent
# Full runnable code for the StoryFlowAgent example
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import AsyncGenerator
from typing_extensions import override
from google.adk.agents import LlmAgent, BaseAgent, LoopAgent, SequentialAgent
from google.adk.agents.invocation_context import InvocationContext
from google.genai import types
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event
from pydantic import BaseModel, Field
# --- Constants ---
APP_NAME = "story_app"
USER_ID = "12345"
SESSION_ID = "123344"
GEMINI_2_FLASH = "gemini-2.0-flash"
# --- Configure Logging ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Custom Orchestrator Agent ---
class StoryFlowAgent(BaseAgent):
    """
    Custom agent for a story generation and refinement workflow.
    This agent orchestrates a sequence of LLM agents to generate a story,
    critique it, revise it, check grammar and tone, and potentially
    regenerate the story if the tone is negative.
    """
    # --- Field Declarations for Pydantic ---
    # Declare the agents passed during initialization as class attributes with type hints
    story_generator: LlmAgent
    critic: LlmAgent
    reviser: LlmAgent
    grammar_check: LlmAgent
    tone_check: LlmAgent
    loop_agent: LoopAgent
    sequential_agent: SequentialAgent
    # model_config allows setting Pydantic configurations if needed, e.g., arbitrary_types_allowed
    model_config = {"arbitrary_types_allowed": True}
    def __init__(
        self,
        name: str,
        story_generator: LlmAgent,
        critic: LlmAgent,
        reviser: LlmAgent,
        grammar_check: LlmAgent,
        tone_check: LlmAgent,
    ):
        """
        Initializes the StoryFlowAgent.
        Args:
            name: The name of the agent.
            story_generator: An LlmAgent to generate the initial story.
            critic: An LlmAgent to critique the story.
            reviser: An LlmAgent to revise the story based on criticism.
            grammar_check: An LlmAgent to check the grammar.
            tone_check: An LlmAgent to analyze the tone.
        """
        # Create internal agents *before* calling super().__init__
        loop_agent = LoopAgent(
            name="CriticReviserLoop", sub_agents=[critic, reviser], max_iterations=2
        )
        sequential_agent = SequentialAgent(
            name="PostProcessing", sub_agents=[grammar_check, tone_check]
        )
        # Define the sub_agents list for the framework
        sub_agents_list = [
            story_generator,
            loop_agent,
            sequential_agent,
        ]
        # Pydantic will validate and assign them based on the class annotations.
        super().__init__(
            name=name,
            story_generator=story_generator,
            critic=critic,
            reviser=reviser,
            grammar_check=grammar_check,
            tone_check=tone_check,
            loop_agent=loop_agent,
            sequential_agent=sequential_agent,
            sub_agents=sub_agents_list,  # Pass the sub_agents list directly
        )
    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        """
        Implements the custom orchestration logic for the story workflow.
        Uses the instance attributes assigned by Pydantic (e.g., self.story_generator).
        """
        logger.info(f"[{self.name}] Starting story generation workflow.")
        # 1. Initial Story Generation
        logger.info(f"[{self.name}] Running StoryGenerator...")
        async for event in self.story_generator.run_async(ctx):
            logger.info(f"[{self.name}] Event from StoryGenerator: {event.model_dump_json(indent=2, exclude_none=True)}")
            yield event
        # Check if story was generated before proceeding
        if "current_story" not in ctx.session.state or not ctx.session.state["current_story"]:
            logger.error(f"[{self.name}] Failed to generate initial story. Aborting workflow.")
            return  # Stop processing if initial story failed
        logger.info(f"[{self.name}] Story state after generator: {ctx.session.state.get('current_story')}")
        # 2. Critic-Reviser Loop
        logger.info(f"[{self.name}] Running CriticReviserLoop...")
        # Use the loop_agent instance attribute assigned during init
        async for event in self.loop_agent.run_async(ctx):
            logger.info(f"[{self.name}] Event from CriticReviserLoop: {event.model_dump_json(indent=2, exclude_none=True)}")
            yield event
        logger.info(f"[{self.name}] Story state after loop: {ctx.session.state.get('current_story')}")
        # 3. Sequential Post-Processing (Grammar and Tone Check)
        logger.info(f"[{self.name}] Running PostProcessing...")
        # Use the sequential_agent instance attribute assigned during init
        async for event in self.sequential_agent.run_async(ctx):
            logger.info(f"[{self.name}] Event from PostProcessing: {event.model_dump_json(indent=2, exclude_none=True)}")
            yield event
        # 4. Tone-Based Conditional Logic
        tone_check_result = ctx.session.state.get("tone_check_result")
        logger.info(f"[{self.name}] Tone check result: {tone_check_result}")
        if tone_check_result == "negative":
            logger.info(f"[{self.name}] Tone is negative. Regenerating story...")
            async for event in self.story_generator.run_async(ctx):
                logger.info(f"[{self.name}] Event from StoryGenerator (Regen): {event.model_dump_json(indent=2, exclude_none=True)}")
                yield event
        else:
            logger.info(f"[{self.name}] Tone is not negative. Keeping current story.")
        logger.info(f"[{self.name}] Workflow finished.")
# --- Define the individual LLM agents ---
story_generator = LlmAgent(
    name="StoryGenerator",
    model=GEMINI_2_FLASH,
    instruction="""You are a story writer. Write a short story (around 100 words), on the following topic: {topic}""",
    input_schema=None,
    output_key="current_story",  # Key for storing output in session state
)
critic = LlmAgent(
    name="Critic",
    model=GEMINI_2_FLASH,
    instruction="""You are a story critic. Review the story provided: {current_story}. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.""",
    input_schema=None,
    output_key="criticism",  # Key for storing criticism in session state
)
reviser = LlmAgent(
    name="Reviser",
    model=GEMINI_2_FLASH,
    instruction="""You are a story reviser. Revise the story provided: {current_story}, based on the criticism in
{criticism}. Output only the revised story.""",
    input_schema=None,
    output_key="current_story",  # Overwrites the original story
)
grammar_check = LlmAgent(
    name="GrammarCheck",
    model=GEMINI_2_FLASH,
    instruction="""You are a grammar checker. Check the grammar of the story provided: {current_story}. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.""",
    input_schema=None,
    output_key="grammar_suggestions",
)
tone_check = LlmAgent(
    name="ToneCheck",
    model=GEMINI_2_FLASH,
    instruction="""You are a tone analyzer. Analyze the tone of the story provided: {current_story}. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.""",
    input_schema=None,
    output_key="tone_check_result",  # This agent's output determines the conditional flow
)
# --- Create the custom agent instance ---
story_flow_agent = StoryFlowAgent(
    name="StoryFlowAgent",
    story_generator=story_generator,
    critic=critic,
    reviser=reviser,
    grammar_check=grammar_check,
    tone_check=tone_check,
)
INITIAL_STATE = {"topic": "a brave kitten exploring a haunted house"}
# --- Setup Runner and Session ---
async def setup_session_and_runner():
    session_service = InMemorySessionService()
    session = await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID, state=INITIAL_STATE)
    logger.info(f"Initial session state: {session.state}")
    runner = Runner(
        agent=story_flow_agent,  # Pass the custom orchestrator agent
        app_name=APP_NAME,
        session_service=session_service
    )
    return session_service, runner
# --- Function to Interact with the Agent ---
async def call_agent_async(user_input_topic: str):
    """
    Sends a new topic to the agent (overwriting the initial one if needed)
    and runs the workflow.
    """
    session_service, runner = await setup_session_and_runner()
    # This reaches into InMemorySessionService's internal storage so that the
    # updated topic is seen by the runner; get_session() may return a copy,
    # and changes made to a copy would not be persisted.
    current_session = session_service.sessions[APP_NAME][USER_ID][SESSION_ID]
    current_session.state["topic"] = user_input_topic
    logger.info(f"Updated session state topic to: {user_input_topic}")
    content = types.Content(role='user', parts=[types.Part(text="Generate a story about the preset topic.")])
    events = runner.run_async(user_id=USER_ID, session_id=SESSION_ID, new_message=content)
    final_response = "No final response captured."
    async for event in events:
        if event.is_final_response() and event.content and event.content.parts:
            logger.info(f"Potential final response from [{event.author}]: {event.content.parts[0].text}")
            final_response = event.content.parts[0].text
    print("\n--- Agent Interaction Result ---")
    print("Agent Final Response: ", final_response)
    final_session = await session_service.get_session(app_name=APP_NAME,
                                                      user_id=USER_ID,
                                                      session_id=SESSION_ID)
    print("Final Session State:")
    import json
    print(json.dumps(final_session.state, indent=2))
    print("-------------------------------\n")
# --- Run the Agent ---
# Note: In Colab, you can directly use 'await' at the top level.
# If running this code as a standalone Python script, you'll need to use asyncio.run() or manage the event loop.
await call_agent_async("a lonely robot finding a friend in a junkyard")
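To see the control flow of `_run_async_impl` without calling a model, the workflow can be simulated with stub agents that write canned values into a plain `dict` standing in for `ctx.session.state`. This is a minimal sketch under stated assumptions, not how ADK executes agents: `StubAgent`, `run_loop`, `run_story_flow`, and the canned outputs are all hypothetical. It only mirrors the ordering of the listing above: generate, a critic/reviser loop repeated `max_iterations` times, grammar and tone checks, then a regeneration when the tone is `"negative"`.

```python
import asyncio
from typing import AsyncGenerator, Callable, Dict, List

State = Dict[str, str]  # stands in for ctx.session.state


class StubAgent:
    """Hypothetical agent: writes one value to state and yields a log line."""

    def __init__(self, name: str, output_key: str, produce: Callable[[State], str]):
        self.name = name
        self.output_key = output_key
        self.produce = produce

    async def run_async(self, state: State) -> AsyncGenerator[str, None]:
        state[self.output_key] = self.produce(state)  # like output_key
        yield f"{self.name} -> {self.output_key}"


async def run_loop(agents: List[StubAgent], state: State,
                   max_iterations: int) -> AsyncGenerator[str, None]:
    # Runs the sub-agents in order, max_iterations times. This stub ignores
    # the early exit a real loop agent can take when a sub-agent escalates.
    for _ in range(max_iterations):
        for agent in agents:
            async for event in agent.run_async(state):
                yield event


async def run_story_flow(state: State, tone: str) -> List[str]:
    generator = StubAgent("StoryGenerator", "current_story",
                          lambda s: f"story about {s['topic']}")
    critic = StubAgent("Critic", "criticism", lambda s: "more conflict")
    reviser = StubAgent("Reviser", "current_story",
                        lambda s: s["current_story"] + " (revised)")
    grammar = StubAgent("GrammarCheck", "grammar_suggestions",
                        lambda s: "Grammar is good!")
    tone_check = StubAgent("ToneCheck", "tone_check_result", lambda s: tone)

    log: List[str] = []
    async for e in generator.run_async(state):
        log.append(e)
    if not state.get("current_story"):
        return log  # abort, as the listing does when no story was generated
    async for e in run_loop([critic, reviser], state, max_iterations=2):
        log.append(e)
    for agent in (grammar, tone_check):  # sequential post-processing
        async for e in agent.run_async(state):
            log.append(e)
    if state.get("tone_check_result") == "negative":
        async for e in generator.run_async(state):  # conditional regeneration
            log.append(e)
    return log


state: State = {"topic": "a lonely robot"}
events = asyncio.run(run_story_flow(state, tone="negative"))
print(len(events), state["current_story"])  # prints "8 story about a lonely robot"
```

With a negative tone, the regeneration overwrites the twice-revised story, which is also what happens in the real workflow: `StoryGenerator` and `Reviser` share the `current_story` key, so whichever runs last wins.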
// Full runnable code for the StoryFlowAgent example
/**
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { LlmAgent, BaseAgent, LoopAgent, SequentialAgent, InMemoryRunner, InvocationContext, Event, isFinalResponse } from '@google/adk';
import { createUserContent } from "@google/genai";
// --- Constants ---
const APP_NAME = "story_app_ts";
const USER_ID = "12345";
const SESSION_ID = "123344_ts";
const GEMINI_MODEL = "gemini-2.5-flash";
// --- Custom Orchestrator Agent ---
class StoryFlowAgent extends BaseAgent {
// --- Property Declarations for TypeScript ---
private storyGenerator: LlmAgent;
private critic: LlmAgent;
private reviser: LlmAgent;
private grammarCheck: LlmAgent;
private toneCheck: LlmAgent;
private loopAgent: LoopAgent;
private sequentialAgent: SequentialAgent;
constructor(
name: string,
storyGenerator: LlmAgent,
critic: LlmAgent,
reviser: LlmAgent,
grammarCheck: LlmAgent,
toneCheck: LlmAgent
) {
// Create internal composite agents
const loopAgent = new LoopAgent({
name: "CriticReviserLoop",
subAgents: [critic, reviser],
maxIterations: 2,
});
const sequentialAgent = new SequentialAgent({
name: "PostProcessing",
subAgents: [grammarCheck, toneCheck],
});
// Define the sub-agents for the framework to know about
const subAgentsList = [
storyGenerator,
loopAgent,
sequentialAgent,
];
// Call the parent constructor
super({
name,
subAgents: subAgentsList,
});
// Assign agents to class properties for use in the custom run logic
this.storyGenerator = storyGenerator;
this.critic = critic;
this.reviser = reviser;
this.grammarCheck = grammarCheck;
this.toneCheck = toneCheck;
this.loopAgent = loopAgent;
this.sequentialAgent = sequentialAgent;
}
// Live mode reuses the same orchestration logic as runAsyncImpl.
async* runLiveImpl(ctx: InvocationContext): AsyncGenerator<Event, void, undefined> {
yield* this.runAsyncImpl(ctx);
}
// Implements the custom orchestration logic for the story workflow.
async* runAsyncImpl(ctx: InvocationContext): AsyncGenerator<Event, void, undefined> {
console.log(`[${this.name}] Starting story generation workflow.`);
// 1. Initial Story Generation
console.log(`[${this.name}] Running StoryGenerator...`);
for await (const event of this.storyGenerator.runAsync(ctx)) {
console.log(`[${this.name}] Event from StoryGenerator: ${JSON.stringify(event, null, 2)}`);
yield event;
}
// Check if the story was generated before proceeding
if (!ctx.session.state["current_story"]) {
console.error(`[${this.name}] Failed to generate initial story. Aborting workflow.`);
return; // Stop processing
}
console.log(`[${this.name}] Story state after generator: ${ctx.session.state['current_story']}`);
// 2. Critic-Reviser Loop
console.log(`[${this.name}] Running CriticReviserLoop...`);
for await (const event of this.loopAgent.runAsync(ctx)) {
console.log(`[${this.name}] Event from CriticReviserLoop: ${JSON.stringify(event, null, 2)}`);
yield event;
}
console.log(`[${this.name}] Story state after loop: ${ctx.session.state['current_story']}`);
// 3. Sequential Post-Processing (Grammar and Tone Check)
console.log(`[${this.name}] Running PostProcessing...`);
for await (const event of this.sequentialAgent.runAsync(ctx)) {
console.log(`[${this.name}] Event from PostProcessing: ${JSON.stringify(event, null, 2)}`);
yield event;
}
// 4. Tone-Based Conditional Logic
const toneCheckResult = ctx.session.state["tone_check_result"] as string;
console.log(`[${this.name}] Tone check result: ${toneCheckResult}`);
if (toneCheckResult === "negative") {
console.log(`[${this.name}] Tone is negative. Regenerating story...`);
for await (const event of this.storyGenerator.runAsync(ctx)) {
console.log(`[${this.name}] Event from StoryGenerator (Regen): ${JSON.stringify(event, null, 2)}`);
yield event;
}
} else {
console.log(`[${this.name}] Tone is not negative. Keeping current story.`);
}
console.log(`[${this.name}] Workflow finished.`);
}
}
// --- Define the individual LLM agents ---
const storyGenerator = new LlmAgent({
name: "StoryGenerator",
model: GEMINI_MODEL,
instruction: `You are a story writer. Write a short story (around 100 words), on the following topic: {topic}`,
outputKey: "current_story",
});
const critic = new LlmAgent({
name: "Critic",
model: GEMINI_MODEL,
instruction: `You are a story critic. Review the story provided: {current_story}. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.`,
outputKey: "criticism",
});
const reviser = new LlmAgent({
name: "Reviser",
model: GEMINI_MODEL,
instruction: `You are a story reviser. Revise the story provided: {current_story}, based on the criticism in
{criticism}. Output only the revised story.`,
outputKey: "current_story", // Overwrites the original story
});
const grammarCheck = new LlmAgent({
name: "GrammarCheck",
model: GEMINI_MODEL,
instruction: `You are a grammar checker. Check the grammar of the story provided: {current_story}. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.`,
outputKey: "grammar_suggestions",
});
const toneCheck = new LlmAgent({
name: "ToneCheck",
model: GEMINI_MODEL,
instruction: `You are a tone analyzer. Analyze the tone of the story provided: {current_story}. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.`,
outputKey: "tone_check_result",
});
// --- Create the custom agent instance ---
const storyFlowAgent = new StoryFlowAgent(
"StoryFlowAgent",
storyGenerator,
critic,
reviser,
grammarCheck,
toneCheck
);
const INITIAL_STATE = { "topic": "a brave kitten exploring a haunted house" };
// --- Setup Runner and Session ---
async function setupRunnerAndSession() {
const runner = new InMemoryRunner({
agent: storyFlowAgent,
appName: APP_NAME,
});
const session = await runner.sessionService.createSession({
appName: APP_NAME,
userId: USER_ID,
sessionId: SESSION_ID,
state: INITIAL_STATE,
});
console.log(`Initial session state: ${JSON.stringify(session.state, null, 2)}`);
return runner;
}
// --- Function to Interact with the Agent ---
async function callAgent(runner: InMemoryRunner, userInputTopic: string) {
const currentSession = await runner.sessionService.getSession({
appName: APP_NAME,
userId: USER_ID,
sessionId: SESSION_ID
});
if (!currentSession) {
return;
}
// Update the state with the new topic for this run
currentSession.state["topic"] = userInputTopic;
console.log(`Updated session state topic to: ${userInputTopic}`);
let finalResponse = "No final response captured.";
for await (const event of runner.runAsync({
userId: USER_ID,
sessionId: SESSION_ID,
newMessage: createUserContent(`Generate a story about: ${userInputTopic}`)
})) {
if (isFinalResponse(event) && event.content?.parts?.length) {
console.log(`Potential final response from [${event.author}]: ${event.content.parts.map(part => part.text ?? '').join('')}`);
finalResponse = event.content.parts.map(part => part.text ?? '').join('');
}
}
const finalSession = await runner.sessionService.getSession({
appName: APP_NAME,
userId: USER_ID,
sessionId: SESSION_ID
});
console.log("\n--- Agent Interaction Result ---");
console.log("Agent Final Response: ", finalResponse);
console.log("Final Session State:");
console.log(JSON.stringify(finalSession?.state, null, 2));
console.log("-------------------------------\n");
}
// --- Run the Agent ---
async function main() {
const runner = await setupRunnerAndSession();
await callAgent(runner, "a lonely robot finding a friend in a junkyard");
}
main();
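All of the listings pass data between agents the same way: one agent writes its output to a state key (`output_key` / `outputKey` / `OutputKey`), and a later agent reads it through a `{key}` placeholder in its instruction, such as `{topic}` or `{current_story}`. The sketch below illustrates that substitution idea with a regular expression. It is a hypothetical helper (`fill_instruction`), not ADK's templating code; the real implementation may handle additional cases, such as optional keys, that are not modeled here.

```python
import re
from typing import Mapping

# Hypothetical helper: approximates how a "{key}" placeholder in an agent
# instruction is filled from session state. Not ADK's implementation.
_PLACEHOLDER = re.compile(r"\{([A-Za-z_][A-Za-z0-9_]*)\}")


def fill_instruction(template: str, state: Mapping[str, object]) -> str:
    def replace(match: re.Match) -> str:
        key = match.group(1)
        if key not in state:
            # Fail loudly instead of sending an unfilled placeholder to the model.
            raise KeyError(f"state has no key {key!r} for the instruction")
        return str(state[key])

    return _PLACEHOLDER.sub(replace, template)


state = {"topic": "a lonely robot", "current_story": "Beep. A robot met a dog."}
print(fill_instruction("Write a short story on the following topic: {topic}", state))
# prints: Write a short story on the following topic: a lonely robot
```

This is why the order of sub-agents matters: `Critic` can only be given `{current_story}` after `StoryGenerator` has written that key.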
// Full runnable code for the StoryFlowAgent example
package main
import (
"context"
"fmt"
"iter"
"log"
"google.golang.org/adk/agent/workflowagents/loopagent"
"google.golang.org/adk/agent/workflowagents/sequentialagent"
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/model/gemini"
"google.golang.org/adk/runner"
"google.golang.org/adk/session"
"google.golang.org/genai"
)
// StoryFlowAgent is a custom agent that orchestrates a story generation workflow.
// It encapsulates the logic of running sub-agents in a specific sequence.
type StoryFlowAgent struct {
storyGenerator agent.Agent
revisionLoopAgent agent.Agent
postProcessorAgent agent.Agent
}
// NewStoryFlowAgent creates and configures the entire custom agent workflow.
// It takes individual LLM agents as input and internally creates the necessary
// workflow agents (loop, sequential), returning the final orchestrator agent.
func NewStoryFlowAgent(
storyGenerator,
critic,
reviser,
grammarCheck,
toneCheck agent.Agent,
) (agent.Agent, error) {
loopAgent, err := loopagent.New(loopagent.Config{
MaxIterations: 2,
AgentConfig: agent.Config{
Name: "CriticReviserLoop",
SubAgents: []agent.Agent{critic, reviser},
},
})
if err != nil {
return nil, fmt.Errorf("failed to create loop agent: %w", err)
}
sequentialAgent, err := sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{
Name: "PostProcessing",
SubAgents: []agent.Agent{grammarCheck, toneCheck},
},
})
if err != nil {
return nil, fmt.Errorf("failed to create sequential agent: %w", err)
}
// The StoryFlowAgent struct holds the agents needed for the Run method.
orchestrator := &StoryFlowAgent{
storyGenerator: storyGenerator,
revisionLoopAgent: loopAgent,
postProcessorAgent: sequentialAgent,
}
// agent.New creates the final agent, wiring up the Run method.
return agent.New(agent.Config{
Name: "StoryFlowAgent",
Description: "Orchestrates story generation, critique, revision, and checks.",
SubAgents: []agent.Agent{storyGenerator, loopAgent, sequentialAgent},
Run: orchestrator.Run,
})
}
// Run defines the custom execution logic for the StoryFlowAgent.
func (s *StoryFlowAgent) Run(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
return func(yield func(*session.Event, error) bool) {
// Stage 1: Initial Story Generation
for event, err := range s.storyGenerator.Run(ctx) {
if err != nil {
yield(nil, fmt.Errorf("story generator failed: %w", err))
return
}
if !yield(event, nil) {
return
}
}
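// Check if story was generated before proceeding.
// The type assertion also treats a missing (nil) or non-string value as "no story".
currentStory, err := ctx.Session().State().Get("current_story")
if story, ok := currentStory.(string); err != nil || !ok || story == "" {
log.Println("Failed to generate initial story. Aborting workflow.")
return
}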
// Stage 2: Critic-Reviser Loop
for event, err := range s.revisionLoopAgent.Run(ctx) {
if err != nil {
yield(nil, fmt.Errorf("loop agent failed: %w", err))
return
}
if !yield(event, nil) {
return
}
}
// Stage 3: Post-Processing
for event, err := range s.postProcessorAgent.Run(ctx) {
if err != nil {
yield(nil, fmt.Errorf("sequential agent failed: %w", err))
return
}
if !yield(event, nil) {
return
}
}
// Stage 4: Conditional Regeneration
toneResult, err := ctx.Session().State().Get("tone_check_result")
if err != nil {
log.Printf("Could not read tone_check_result from state: %v. Assuming tone is not negative.", err)
return
}
if tone, ok := toneResult.(string); ok && tone == "negative" {
log.Println("Tone is negative. Regenerating story...")
for event, err := range s.storyGenerator.Run(ctx) {
if err != nil {
yield(nil, fmt.Errorf("story regeneration failed: %w", err))
return
}
if !yield(event, nil) {
return
}
}
} else {
log.Println("Tone is not negative. Keeping current story.")
}
}
}
const (
modelName = "gemini-2.0-flash"
appName = "story_app"
userID = "user_12345"
)
func main() {
ctx := context.Background()
model, err := gemini.NewModel(ctx, modelName, &genai.ClientConfig{})
if err != nil {
log.Fatalf("Failed to create model: %v", err)
}
// --- Define the individual LLM agents ---
storyGenerator, err := llmagent.New(llmagent.Config{
Name: "StoryGenerator",
Model: model,
Description: "Generates the initial story.",
Instruction: "You are a story writer. Write a short story (around 100 words) based on the topic: {topic}",
OutputKey: "current_story",
})
if err != nil {
log.Fatalf("Failed to create StoryGenerator agent: %v", err)
}
critic, err := llmagent.New(llmagent.Config{
Name: "Critic",
Model: model,
Description: "Critiques the story.",
Instruction: "You are a story critic. Review the story: {current_story}. Provide 1-2 sentences of constructive criticism on how to improve it. Focus on plot or character.",
OutputKey: "criticism",
})
if err != nil {
log.Fatalf("Failed to create Critic agent: %v", err)
}
reviser, err := llmagent.New(llmagent.Config{
Name: "Reviser",
Model: model,
Description: "Revises the story based on criticism.",
Instruction: "You are a story reviser. Revise the story: {current_story}, based on the criticism: {criticism}. Output only the revised story.",
OutputKey: "current_story",
})
if err != nil {
log.Fatalf("Failed to create Reviser agent: %v", err)
}
grammarCheck, err := llmagent.New(llmagent.Config{
Name: "GrammarCheck",
Model: model,
Description: "Checks grammar and suggests corrections.",
Instruction: "You are a grammar checker. Check the grammar of the story: {current_story}. Output only the suggested corrections as a list, or output 'Grammar is good!' if there are no errors.",
OutputKey: "grammar_suggestions",
})
if err != nil {
log.Fatalf("Failed to create GrammarCheck agent: %v", err)
}
toneCheck, err := llmagent.New(llmagent.Config{
Name: "ToneCheck",
Model: model,
Description: "Analyzes the tone of the story.",
Instruction: "You are a tone analyzer. Analyze the tone of the story: {current_story}. Output only one word: 'positive' if the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral' otherwise.",
OutputKey: "tone_check_result",
})
if err != nil {
log.Fatalf("Failed to create ToneCheck agent: %v", err)
}
// Instantiate the custom agent, which encapsulates the workflow agents.
storyFlowAgent, err := NewStoryFlowAgent(
storyGenerator,
critic,
reviser,
grammarCheck,
toneCheck,
)
if err != nil {
log.Fatalf("Failed to create story flow agent: %v", err)
}
// --- Run the Agent ---
sessionService := session.InMemoryService()
initialState := map[string]any{
"topic": "a brave kitten exploring a haunted house",
}
sessionInstance, err := sessionService.Create(ctx, &session.CreateRequest{
AppName: appName,
UserID: userID,
State: initialState,
})
if err != nil {
log.Fatalf("Failed to create session: %v", err)
}
userTopic := "a lonely robot finding a friend in a junkyard"
r, err := runner.New(runner.Config{
AppName: appName,
Agent: storyFlowAgent,
SessionService: sessionService,
})
if err != nil {
log.Fatalf("Failed to create runner: %v", err)
}
input := genai.NewContentFromText("Generate a story about: "+userTopic, genai.RoleUser)
events := r.Run(ctx, userID, sessionInstance.Session.ID(), input, agent.RunConfig{
StreamingMode: agent.StreamingModeSSE,
})
var finalResponse string
for event, err := range events {
if err != nil {
log.Fatalf("An error occurred during agent execution: %v", err)
}
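// Skip events without content to avoid a nil pointer dereference.
if event.Content == nil {
continue
}
for _, part := range event.Content.Parts {
// Accumulate the text of every content part emitted during the run.
finalResponse += part.Text
}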
}
fmt.Println("\n--- Agent Interaction Result ---")
fmt.Println("Agent Final Response: " + finalResponse)
finalSession, err := sessionService.Get(ctx, &session.GetRequest{
UserID: userID,
AppName: appName,
SessionID: sessionInstance.Session.ID(),
})
if err != nil {
log.Fatalf("Failed to retrieve final session: %v", err)
}
fmt.Println("Final Session State:", finalSession.Session.State())
}
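In every listing, the tone-based branch compares the stored `tone_check_result` with the exact string `"negative"`. If the model's reply carries extra whitespace, a trailing period, quotes, or different casing, that exact comparison may not match, and the workflow silently keeps the current story. The sketch below shows one hypothetical, more tolerant check (`is_negative_tone` is an illustrative name, not part of ADK); whether such normalization is desirable depends on how strictly the `ToneCheck` instruction is followed in practice.

```python
def is_negative_tone(raw: object) -> bool:
    """Hypothetical helper: tolerant check of the ToneCheck agent's output.

    Accepts surrounding whitespace, quotes, a trailing period, and any case.
    Non-string values (e.g. a missing key read as None) count as "not negative",
    matching the listings' fallback of keeping the current story.
    """
    if not isinstance(raw, str):
        return False
    return raw.strip().strip("'\".").strip().lower() == "negative"


for value in ["negative", " Negative.\n", "'negative'", "neutral", None]:
    print(repr(value), is_negative_tone(value))
```

The Go listing's `toneResult.(string)` assertion plays the same role as the `isinstance` check here: a value of the wrong type falls through to the "keep the current story" branch.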
// Full runnable code for the StoryFlowAgent example
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.BaseAgent;
import com.google.adk.agents.InvocationContext;
import com.google.adk.agents.LoopAgent;
import com.google.adk.agents.SequentialAgent;
import com.google.adk.events.Event;
import com.google.adk.runner.InMemoryRunner;
import com.google.adk.sessions.Session;
import com.google.genai.types.Content;
import com.google.genai.types.Part;
import io.reactivex.rxjava3.core.Flowable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
public class StoryFlowAgentExample extends BaseAgent {
// --- Constants ---
private static final String APP_NAME = "story_app";
private static final String USER_ID = "user_12345";
private static final String SESSION_ID = "session_123344";
private static final String MODEL_NAME = "gemini-2.0-flash"; // Ensure this model is available
private static final Logger logger = Logger.getLogger(StoryFlowAgentExample.class.getName());
private final LlmAgent storyGenerator;
private final LoopAgent loopAgent;
private final SequentialAgent sequentialAgent;
public StoryFlowAgentExample(
String name, LlmAgent storyGenerator, LoopAgent loopAgent, SequentialAgent sequentialAgent) {
super(
name,
"Orchestrates story generation, critique, revision, and checks.",
List.of(storyGenerator, loopAgent, sequentialAgent),
null,
null);
this.storyGenerator = storyGenerator;
this.loopAgent = loopAgent;
this.sequentialAgent = sequentialAgent;
}
public static void main(String[] args) {
// --- Define the individual LLM agents ---
LlmAgent storyGenerator =
LlmAgent.builder()
.name("StoryGenerator")
.model(MODEL_NAME)
.description("Generates the initial story.")
.instruction(
"""
You are a story writer. Write a short story (around 100 words),
based on the topic: {topic}
""")
.inputSchema(null)
.outputKey("current_story") // Key for storing output in session state
.build();
LlmAgent critic =
LlmAgent.builder()
.name("Critic")
.model(MODEL_NAME)
.description("Critiques the story.")
.instruction(
"""
You are a story critic. Review the story: {current_story}. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.
""")
.inputSchema(null)
.outputKey("criticism") // Key for storing criticism in session state
.build();
LlmAgent reviser =
LlmAgent.builder()
.name("Reviser")
.model(MODEL_NAME)
.description("Revises the story based on criticism.")
.instruction(
"""
You are a story reviser. Revise the story: {current_story}, based on the criticism: {criticism}. Output only the revised story.
""")
.inputSchema(null)
.outputKey("current_story") // Overwrites the original story
.build();
LlmAgent grammarCheck =
LlmAgent.builder()
.name("GrammarCheck")
.model(MODEL_NAME)
.description("Checks grammar and suggests corrections.")
.instruction(
"""
You are a grammar checker. Check the grammar of the story: {current_story}. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.
""")
.outputKey("grammar_suggestions")
.build();
LlmAgent toneCheck =
LlmAgent.builder()
.name("ToneCheck")
.model(MODEL_NAME)
.description("Analyzes the tone of the story.")
.instruction(
"""
You are a tone analyzer. Analyze the tone of the story: {current_story}. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.
""")
.outputKey("tone_check_result") // This agent's output determines the conditional flow
.build();
LoopAgent loopAgent =
LoopAgent.builder()
.name("CriticReviserLoop")
.description("Iteratively critiques and revises the story.")
.subAgents(critic, reviser)
.maxIterations(2)
.build();
SequentialAgent sequentialAgent =
SequentialAgent.builder()
.name("PostProcessing")
.description("Performs grammar and tone checks sequentially.")
.subAgents(grammarCheck, toneCheck)
.build();
StoryFlowAgentExample storyFlowAgentExample =
new StoryFlowAgentExample(APP_NAME, storyGenerator, loopAgent, sequentialAgent);
// --- Run the Agent ---
runAgent(storyFlowAgentExample, "a lonely robot finding a friend in a junkyard");
}
// --- Function to Interact with the Agent ---
// Sends a new topic to the agent (overwriting the initial one if needed)
// and runs the workflow.
public static void runAgent(StoryFlowAgentExample agent, String userTopic) {
// --- Setup Runner and Session ---
InMemoryRunner runner = new InMemoryRunner(agent);
Map<String, Object> initialState = new HashMap<>();
initialState.put("topic", "a brave kitten exploring a haunted house");
Session session =
runner
.sessionService()
.createSession(APP_NAME, USER_ID, new ConcurrentHashMap<>(initialState), SESSION_ID)
.blockingGet();
logger.log(Level.INFO, () -> String.format("Initial session state: %s", session.state()));
session.state().put("topic", userTopic); // Update the state in the retrieved session
logger.log(Level.INFO, () -> String.format("Updated session state topic to: %s", userTopic));
Content userMessage = Content.fromParts(Part.fromText("Generate a story about: " + userTopic));
// Use the modified session object for the run
Flowable<Event> eventStream = runner.runAsync(USER_ID, session.id(), userMessage);
final String[] finalResponse = {"No final response captured."};
eventStream.blockingForEach(
event -> {
if (event.finalResponse() && event.content().isPresent()) {
String author = event.author() != null ? event.author() : "UNKNOWN_AUTHOR";
Optional<String> textOpt =
event
.content()
.flatMap(Content::parts)
.filter(parts -> !parts.isEmpty())
.map(parts -> parts.get(0).text().orElse(""));
logger.log(Level.INFO, () ->
String.format("Potential final response from [%s]: %s", author, textOpt.orElse("N/A")));
textOpt.ifPresent(text -> finalResponse[0] = text);
}
});
System.out.println("\n--- Agent Interaction Result ---");
System.out.println("Agent Final Response: " + finalResponse[0]);
// Retrieve session again to see the final state after the run
Session finalSession =
runner
.sessionService()
.getSession(APP_NAME, USER_ID, SESSION_ID, Optional.empty())
.blockingGet();
assert finalSession != null;
System.out.println("Final Session State: " + finalSession.state());
System.out.println("-------------------------------\n");
}
private boolean isStoryGenerated(InvocationContext ctx) {
Object currentStoryObj = ctx.session().state().get("current_story");
return currentStoryObj != null && !String.valueOf(currentStoryObj).isEmpty();
}
@Override
protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) {
// Implements the custom orchestration logic for the story workflow.
// Uses the sub-agent fields assigned in the constructor (e.g., storyGenerator, loopAgent).
logger.log(Level.INFO, () -> String.format("[%s] Starting story generation workflow.", name()));
// Stage 1: Initial Story Generation
Flowable<Event> storyGenFlow = runStage(storyGenerator, invocationContext, "StoryGenerator");
// Stage 2: Critic-Reviser Loop (runs after story generation completes)
Flowable<Event> criticReviserFlow = Flowable.defer(() -> {
if (!isStoryGenerated(invocationContext)) {
logger.log(Level.SEVERE, () ->
String.format("[%s] Failed to generate initial story. Aborting after StoryGenerator.",
name()));
return Flowable.empty(); // Stop further processing if no story
}
logger.log(Level.INFO, () ->
String.format("[%s] Story state after generator: %s",
name(), invocationContext.session().state().get("current_story")));
return runStage(loopAgent, invocationContext, "CriticReviserLoop");
});
// Stage 3: Post-Processing (runs after critic-reviser loop completes)
Flowable<Event> postProcessingFlow = Flowable.defer(() -> {
logger.log(Level.INFO, () ->
String.format("[%s] Story state after loop: %s",
name(), invocationContext.session().state().get("current_story")));
return runStage(sequentialAgent, invocationContext, "PostProcessing");
});
// Stage 4: Conditional Regeneration (runs after post-processing completes)
Flowable<Event> conditionalRegenFlow = Flowable.defer(() -> {
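// Read the value defensively: a direct (String) cast would throw if the value is not a String
Object toneObj = invocationContext.session().state().get("tone_check_result");
String toneCheckResult = toneObj == null ? null : String.valueOf(toneObj);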
logger.log(Level.INFO, () -> String.format("[%s] Tone check result: %s", name(), toneCheckResult));
if ("negative".equalsIgnoreCase(toneCheckResult)) {
logger.log(Level.INFO, () ->
String.format("[%s] Tone is negative. Regenerating story...", name()));
return runStage(storyGenerator, invocationContext, "StoryGenerator (Regen)");
} else {
logger.log(Level.INFO, () ->
String.format("[%s] Tone is not negative. Keeping current story.", name()));
return Flowable.empty(); // No regeneration needed
}
});
return Flowable.concatArray(storyGenFlow, criticReviserFlow, postProcessingFlow, conditionalRegenFlow)
.doOnComplete(() -> logger.log(Level.INFO, () -> String.format("[%s] Workflow finished.", name())));
}
// Helper method for a single agent run stage with logging
private Flowable<Event> runStage(BaseAgent agentToRun, InvocationContext ctx, String stageName) {
logger.log(Level.INFO, () -> String.format("[%s] Running %s...", name(), stageName));
return agentToRun
.runAsync(ctx)
.doOnNext(event ->
logger.log(Level.INFO, () ->
String.format("[%s] Event from %s: %s", name(), stageName, event.toJson())))
.doOnError(err ->
logger.log(Level.SEVERE,
String.format("[%s] Error in %s", name(), stageName), err))
.doOnComplete(() ->
logger.log(Level.INFO, () ->
String.format("[%s] %s finished.", name(), stageName)));
}
@Override
protected Flowable<Event> runLiveImpl(InvocationContext invocationContext) {
return Flowable.error(new UnsupportedOperationException("runLive not implemented."));
}
}
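The stage ordering in `runAsyncImpl` depends on `Flowable.defer(...)`. Each stage's condition reads session state only when `Flowable.concatArray` subscribes to that stage, which happens after the previous stage has completed. The dependency-free sketch below mimics that behavior using plain `java.util.function.Supplier`s instead of RxJava.

The following names are hypothetical stand-ins for illustration only and are not ADK APIs:

- the class and method names (`DeferredStagesSketch`, `concatDeferred`, `runWorkflow`)
- the string "events"
- the hard-coded story text

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Hypothetical, dependency-free sketch of the staged orchestration in runAsyncImpl.
 * Each stage is a Supplier invoked only after the previous stage has finished,
 * mirroring Flowable.defer(...) combined with Flowable.concatArray(...).
 * "Events" are plain strings here, not ADK Event objects.
 */
final class DeferredStagesSketch {

  /** Runs the stages strictly in order; each supplier sees the state left by earlier ones. */
  static List<String> concatDeferred(List<Supplier<List<String>>> stages) {
    List<String> events = new ArrayList<>();
    for (Supplier<List<String>> stage : stages) {
      // Any condition inside the supplier is evaluated only at this point.
      events.addAll(stage.get());
    }
    return events;
  }

  static List<String> runWorkflow(Map<String, Object> state, String toneAfterPostProcessing) {
    Supplier<List<String>> storyGen = () -> {
      state.put("current_story", "Once upon a time...");
      return List.of("StoryGenerator: story written");
    };
    Supplier<List<String>> criticLoop = () -> {
      Object story = state.get("current_story");
      if (story == null || String.valueOf(story).isEmpty()) {
        return List.of(); // Abort this stage, like returning Flowable.empty()
      }
      return List.of("CriticReviserLoop: story revised");
    };
    Supplier<List<String>> postProcessing = () -> {
      state.put("tone_check_result", toneAfterPostProcessing);
      return List.of("PostProcessing: tone checked");
    };
    Supplier<List<String>> conditionalRegen = () -> {
      Object tone = state.get("tone_check_result");
      return "negative".equalsIgnoreCase(String.valueOf(tone))
          ? List.of("StoryGenerator (Regen): story regenerated")
          : List.of();
    };
    return concatDeferred(List.of(storyGen, criticLoop, postProcessing, conditionalRegen));
  }

  public static void main(String[] args) {
    System.out.println(runWorkflow(new HashMap<>(), "negative"));
    System.out.println(runWorkflow(new HashMap<>(), "positive"));
  }
}
```

With a `"negative"` tone the sketch produces four events, and with `"positive"` it produces three, because the regeneration stage is skipped. If the checks were evaluated while the pipeline was being assembled, they would see the state as it was before `StoryGenerator` ran. That would happen, for example, if `isStoryGenerated` were called before `concatArray` rather than inside `defer`.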