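!!! warning "Advanced Concept" {: #advanced-concept}
    Building custom agents by directly implementing `_run_async_impl` (or its equivalent in other languages) gives you powerful control, but it is more complex than using the predefined `LlmAgent` or standard `WorkflowAgent` types. We recommend understanding those foundational agent types before tackling custom orchestration logic.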
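Custom agents
Custom agents provide the greatest flexibility in ADK: by inheriting directly from `BaseAgent` and implementing your own control flow, you can define arbitrary orchestration logic. This goes beyond the predefined patterns of `SequentialAgent`, `LoopAgent`, and `ParallelAgent`, letting you build highly specific and complex agent workflows.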
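Introduction: Beyond predefined workflows
What is a custom agent?
A custom agent is essentially any class you create that inherits from `google.adk.agents.BaseAgent` and implements its main execution logic in the core asynchronous method `_run_async_impl`. You have full control over how that method calls other agents (sub-agents), manages state, and handles events.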
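!!! note
    The name of the method that implements an agent's core asynchronous logic may vary slightly by SDK language (for example, `runAsyncImpl` in Java and `_run_async_impl` in Python). See the API documentation for your language for details.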
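Why use them?
The standard workflow agents (`SequentialAgent`, `LoopAgent`, `ParallelAgent`) cover common orchestration patterns, but you will need a custom agent when your requirements include any of the following:
- Conditional logic: Running different sub-agents or taking different paths based on runtime conditions or the results of earlier steps.
- Complex state management: Implementing logic for maintaining and updating state throughout the workflow that goes beyond simple sequential hand-off.
- External integrations: Calling external APIs, databases, or custom libraries directly within the orchestration flow.
- Dynamic agent selection: Choosing which sub-agent to run next based on a dynamic evaluation of the current situation or input.
- Unique workflow patterns: Implementing orchestration logic that does not fit the standard sequential, parallel, or loop structures.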
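Implementing custom logic
The core of a custom agent is the method in which you define its unique asynchronous behavior. This method lets you orchestrate sub-agents and manage the flow of execution.
In Python, the core of any custom agent is the `_run_async_impl` method. This is where you define its unique behavior.
- Signature: `async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:`
- Asynchronous generator: It must be an `async def` function that returns an `AsyncGenerator`. This lets it `yield` events produced by sub-agents or by its own logic back to the runner.
- `ctx` (InvocationContext): Provides key runtime information, most importantly `ctx.session.state`, which is the primary way to share data between the steps your custom agent orchestrates.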
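The async-generator contract described above can be sketched without the ADK installed. The snippet below is a minimal illustration, not the ADK API: `Event`, `FakeSession`, `FakeContext`, `run_impl`, and `collect` are hypothetical stand-ins that only mimic the shape of `_run_async_impl` and `ctx.session.state`.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncGenerator, List


@dataclass
class Event:
    """Hypothetical stand-in for an ADK event: an author plus some text."""
    author: str
    text: str


@dataclass
class FakeSession:
    """Hypothetical stand-in for the session; only holds the shared state dict."""
    state: dict = field(default_factory=dict)


@dataclass
class FakeContext:
    """Hypothetical stand-in for InvocationContext, exposing ctx.session.state."""
    session: FakeSession = field(default_factory=FakeSession)


async def run_impl(ctx: FakeContext) -> AsyncGenerator[Event, None]:
    """Has the same shape as _run_async_impl: an async generator of events."""
    yield Event(author="orchestrator", text="starting")
    ctx.session.state["step"] = "done"  # share data with later steps
    yield Event(author="orchestrator", text="finished")


async def collect(ctx: FakeContext) -> List[Event]:
    """Plays the runner's role: consumes events in the order they are yielded."""
    return [event async for event in run_impl(ctx)]


ctx = FakeContext()
events = asyncio.run(collect(ctx))
print([e.text for e in events], ctx.session.state)
```

Because the function is a generator, the consumer receives each event as soon as it is yielded, and any state written between two `yield` statements is visible to whatever runs next.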
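In Java, the core of any custom agent is the `runAsyncImpl` method, which you override.
- Signature: `protected Flowable<Event> runAsyncImpl(InvocationContext ctx)`
- Reactive stream (`Flowable`): It must return an `io.reactivex.rxjava3.core.Flowable<Event>`. This `Flowable` represents the stream of events that the custom agent's logic will produce, often by combining or transforming multiple `Flowable`s from sub-agents.
- `ctx` (InvocationContext): Provides key runtime information, most importantly `ctx.session().state()`, which is a `java.util.concurrent.ConcurrentMap<String, Object>`. This is the primary way to share data between the steps your custom agent orchestrates.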
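Key capabilities within the core asynchronous method:
In Python:
- Calling sub-agents: You invoke sub-agents (usually stored as instance attributes such as `self.my_llm_agent`) through their `run_async` method and `yield` their events.
- Managing state: Read from and write to the session state dictionary (`ctx.session.state`) to pass data between sub-agent calls or to make decisions.
- Implementing control flow: Use standard Python constructs (`if`/`elif`/`else`, `for`/`while` loops, `try`/`except`) to create complex, conditional, or iterative workflows involving sub-agents.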
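In Java:
- Calling sub-agents: You invoke sub-agents (usually stored as instance fields or objects) through their asynchronous run method and return their event streams. You typically chain the sub-agents' `Flowable`s with RxJava operators such as `concatWith`, `flatMapPublisher`, or `concatArray`:

      // Example: run a single sub-agent
      // return someSubAgent.runAsync(ctx);

      // Example: run several sub-agents in sequence
      Flowable<Event> firstAgentEvents = someSubAgent1.runAsync(ctx)
          .doOnNext(event -> System.out.println("Event from agent 1: " + event.id()));

      Flowable<Event> secondAgentEvents = Flowable.defer(() ->
          someSubAgent2.runAsync(ctx)
              .doOnNext(event -> System.out.println("Event from agent 2: " + event.id()))
      );

      return firstAgentEvents.concatWith(secondAgentEvents);

  `Flowable.defer()` is commonly used when a later stage depends on the completion or state of an earlier one.
- Managing state: Read from and write to the session state to pass data between sub-agent calls or to make decisions. The session state is a `java.util.concurrent.ConcurrentMap<String, Object>` obtained through `ctx.session().state()`.

      // Read data set by a previous agent
      Object previousResult = ctx.session().state().get("some_key");

      // Make a decision based on state
      if ("some_value".equals(previousResult)) {
          // ... logic that includes a specific sub-agent's Flowable ...
      } else {
          // ... logic that includes a different sub-agent's Flowable ...
      }

      // Store a result for a later step (often done via a sub-agent's output_key)
      // ctx.session().state().put("my_custom_result", "calculated_value");

- Implementing control flow: Combine reactive operators (RxJava) with standard language constructs (`if`/`else`, loops, `try`/`catch`) to create complex workflows.
    - Conditional branching: Use `Flowable.defer()` to choose which `Flowable` to subscribe to based on a condition, or `filter()` to filter events within a stream.
    - Iteration: Use operators such as `repeat()` and `retry()`, or structure the `Flowable` chain so that parts of it are invoked again under a condition (often managed with `flatMapPublisher` or `concatMap`).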
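Managing sub-agents and state
Typically, a custom agent orchestrates other agents (such as `LlmAgent`, `LoopAgent`, and so on).
- Initialization: You usually pass instances of these sub-agents into your custom agent's constructor and store them as instance fields or attributes (for example, `this.story_generator = story_generator_instance` or `self.story_generator = story_generator_instance`). This makes them accessible within the custom agent's core asynchronous execution logic (such as the `_run_async_impl` method).
- Sub-agents list: When constructing the `BaseAgent` with `super()`, you should pass a `sub_agents` list. This list tells the ADK framework which agents are part of this custom agent's immediate hierarchy. It matters for framework features such as lifecycle management, introspection, and potential future routing capabilities, even if your core execution logic (`_run_async_impl`) calls the agents directly through `self.xxx_agent`. Include the top-level agents that your custom logic invokes directly.
- State: As mentioned, `ctx.session.state` is the standard way for sub-agents (especially `LlmAgent`s that use `output_key`) to pass results back to the orchestrator, and for the orchestrator to pass necessary inputs down.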
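Design pattern example: `StoryFlowAgent`
Let's illustrate the power of custom agents with an example pattern: a multi-stage content generation workflow with conditional logic.
Goal: Create a system that generates a story, iteratively refines it through critique and revision, performs final checks, and, crucially, regenerates the story if the final tone check fails.
Why custom? The core requirement that calls for a custom agent is the conditional regeneration based on the tone check result. Standard workflow agents have no built-in conditional branching on the outcome of a sub-agent's task, so we need custom logic in the orchestrator (such as `if tone == "negative": ...`).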
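Part 1: Simplified custom agent initialization
In Python, we define `StoryFlowAgent`, which inherits from `BaseAgent`. In `__init__`, we store the necessary sub-agents (passed in as arguments) as instance attributes and tell the `BaseAgent` framework which top-level sub-agents this custom agent will orchestrate directly.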
class StoryFlowAgent(BaseAgent):
    """
    Custom agent for a story generation and refinement workflow.
    This agent orchestrates a sequence of LLM agents to generate a story,
    critique it, revise it, check grammar and tone, and potentially
    regenerate the story if the tone is negative.
    """
    # --- Field Declarations for Pydantic ---
    # Declare the agents passed during initialization as class attributes with type hints
    story_generator: LlmAgent
    critic: LlmAgent
    reviser: LlmAgent
    grammar_check: LlmAgent
    tone_check: LlmAgent
    loop_agent: LoopAgent
    sequential_agent: SequentialAgent
    # model_config allows setting Pydantic configurations if needed, e.g., arbitrary_types_allowed
    model_config = {"arbitrary_types_allowed": True}
    def __init__(
        self,
        name: str,
        story_generator: LlmAgent,
        critic: LlmAgent,
        reviser: LlmAgent,
        grammar_check: LlmAgent,
        tone_check: LlmAgent,
    ):
        """
        Initializes the StoryFlowAgent.
        Args:
            name: The name of the agent.
            story_generator: An LlmAgent to generate the initial story.
            critic: An LlmAgent to critique the story.
            reviser: An LlmAgent to revise the story based on criticism.
            grammar_check: An LlmAgent to check the grammar.
            tone_check: An LlmAgent to analyze the tone.
        """
        # Create internal agents *before* calling super().__init__
        loop_agent = LoopAgent(
            name="CriticReviserLoop", sub_agents=[critic, reviser], max_iterations=2
        )
        sequential_agent = SequentialAgent(
            name="PostProcessing", sub_agents=[grammar_check, tone_check]
        )
        # Define the sub_agents list for the framework
        sub_agents_list = [
            story_generator,
            loop_agent,
            sequential_agent,
        ]
        # Pydantic will validate and assign them based on the class annotations.
        super().__init__(
            name=name,
            story_generator=story_generator,
            critic=critic,
            reviser=reviser,
            grammar_check=grammar_check,
            tone_check=tone_check,
            loop_agent=loop_agent,
            sequential_agent=sequential_agent,
            sub_agents=sub_agents_list,  # Pass the sub_agents list directly
        )
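In Java, we define `StoryFlowAgentExample` by extending `BaseAgent`. In its constructor, we store the necessary sub-agent instances (passed in as parameters) as instance fields. These top-level sub-agents are also passed as a list to the `super` constructor of `BaseAgent`.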
private final LlmAgent storyGenerator;
private final LoopAgent loopAgent;
private final SequentialAgent sequentialAgent;
public StoryFlowAgentExample(
String name, LlmAgent storyGenerator, LoopAgent loopAgent, SequentialAgent sequentialAgent) {
super(
name,
"Orchestrates story generation, critique, revision, and checks.",
List.of(storyGenerator, loopAgent, sequentialAgent),
null,
null);
this.storyGenerator = storyGenerator;
this.loopAgent = loopAgent;
this.sequentialAgent = sequentialAgent;
}
Part 2: Defining the custom execution logic
In Python, this method orchestrates the sub-agents using standard Python async/await and control flow.
@override
async def _run_async_impl(
    self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
    """
    Implements the custom orchestration logic for the story workflow.
    Uses the instance attributes assigned by Pydantic (e.g., self.story_generator).
    """
    logger.info(f"[{self.name}] Starting story generation workflow.")
    # 1. Initial Story Generation
    logger.info(f"[{self.name}] Running StoryGenerator...")
    async for event in self.story_generator.run_async(ctx):
        logger.info(f"[{self.name}] Event from StoryGenerator: {event.model_dump_json(indent=2, exclude_none=True)}")
        yield event
    # Check if story was generated before proceeding
    if "current_story" not in ctx.session.state or not ctx.session.state["current_story"]:
        logger.error(f"[{self.name}] Failed to generate initial story. Aborting workflow.")
        return  # Stop processing if initial story failed
    logger.info(f"[{self.name}] Story state after generator: {ctx.session.state.get('current_story')}")
    # 2. Critic-Reviser Loop
    logger.info(f"[{self.name}] Running CriticReviserLoop...")
    # Use the loop_agent instance attribute assigned during init
    async for event in self.loop_agent.run_async(ctx):
        logger.info(f"[{self.name}] Event from CriticReviserLoop: {event.model_dump_json(indent=2, exclude_none=True)}")
        yield event
    logger.info(f"[{self.name}] Story state after loop: {ctx.session.state.get('current_story')}")
    # 3. Sequential Post-Processing (Grammar and Tone Check)
    logger.info(f"[{self.name}] Running PostProcessing...")
    # Use the sequential_agent instance attribute assigned during init
    async for event in self.sequential_agent.run_async(ctx):
        logger.info(f"[{self.name}] Event from PostProcessing: {event.model_dump_json(indent=2, exclude_none=True)}")
        yield event
    # 4. Tone-Based Conditional Logic
    tone_check_result = ctx.session.state.get("tone_check_result")
    logger.info(f"[{self.name}] Tone check result: {tone_check_result}")
    if tone_check_result == "negative":
        logger.info(f"[{self.name}] Tone is negative. Regenerating story...")
        async for event in self.story_generator.run_async(ctx):
            logger.info(f"[{self.name}] Event from StoryGenerator (Regen): {event.model_dump_json(indent=2, exclude_none=True)}")
            yield event
    else:
        logger.info(f"[{self.name}] Tone is not negative. Keeping current story.")
        pass
    logger.info(f"[{self.name}] Workflow finished.")
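- First, `story_generator` runs. Its output is expected to be stored in `ctx.session.state["current_story"]`.
- Next, `loop_agent` runs. Internally it calls `critic` and `reviser` in sequence for `max_iterations` iterations. They read and write `current_story` and `criticism` in state.
- Then `sequential_agent` runs, calling `grammar_check` and then `tone_check`. They read `current_story` and write `grammar_suggestions` and `tone_check_result` to state.
- Custom part: The `if` statement checks `tone_check_result` in state. If it is "negative", `story_generator` is called again, overwriting `current_story` in state. Otherwise, the flow ends.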
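The branching behaviour described in the steps above can be exercised without any model calls. The sketch below is an assumption-laden simulation, not the ADK implementation: `StubAgent`, `story_flow`, and `run` are hypothetical, events are plain strings, and each stub receives the state dictionary directly instead of an `InvocationContext`. It only demonstrates that a "negative" tone result triggers a fourth stage that overwrites `current_story`.

```python
import asyncio
from typing import AsyncGenerator, Callable, Dict, List, Tuple


class StubAgent:
    """Hypothetical sub-agent: derives a value from state and stores it under output_key."""

    def __init__(self, name: str, output_key: str, produce: Callable[[Dict], str]) -> None:
        self.name, self.output_key, self.produce = name, output_key, produce

    async def run_async(self, state: Dict) -> AsyncGenerator[str, None]:
        state[self.output_key] = self.produce(state)
        yield f"{self.name} -> {self.output_key}"


async def story_flow(state, generator, refine, post_process) -> AsyncGenerator[str, None]:
    """Same stage order as the workflow above, using stubs instead of LLM agents."""
    async for event in generator.run_async(state):
        yield event
    if not state.get("current_story"):
        return  # abort when no story was produced
    async for event in refine.run_async(state):
        yield event
    async for event in post_process.run_async(state):
        yield event
    if state.get("tone_check_result") == "negative":
        # Conditional regeneration: the custom part of the workflow.
        async for event in generator.run_async(state):
            yield event


def run(tone: str) -> Tuple[List[str], Dict]:
    """Runs the simulated flow with a fixed tone verdict; returns events and final state."""
    state: Dict = {"topic": "a cat"}
    generator = StubAgent("StoryGenerator", "current_story", lambda s: f"story about {s['topic']}")
    refine = StubAgent("CriticReviserLoop", "current_story", lambda s: s["current_story"] + " (revised)")
    post = StubAgent("PostProcessing", "tone_check_result", lambda s: tone)

    async def collect() -> List[str]:
        return [event async for event in story_flow(state, generator, refine, post)]

    return asyncio.run(collect()), state


events_negative, state_negative = run("negative")
events_positive, state_positive = run("positive")
print(len(events_negative), len(events_positive))
```

In the negative case the regenerated story replaces the revised one, so the revision work is discarded. Whether that is desirable depends on the workflow; the original example accepts it.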
In Java, the `runAsyncImpl` method uses RxJava's `Flowable` streams and operators to implement asynchronous control flow and orchestrate the sub-agents.
@Override
protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) {
// Implements the custom orchestration logic for the story workflow.
// Uses the sub-agent fields assigned in the constructor (e.g., storyGenerator).
logger.log(Level.INFO, () -> String.format("[%s] Starting story generation workflow.", name()));
// Stage 1. Initial Story Generation
Flowable<Event> storyGenFlow = runStage(storyGenerator, invocationContext, "StoryGenerator");
// Stage 2: Critic-Reviser Loop (runs after story generation completes)
Flowable<Event> criticReviserFlow = Flowable.defer(() -> {
if (!isStoryGenerated(invocationContext)) {
logger.log(Level.SEVERE,() ->
String.format("[%s] Failed to generate initial story. Aborting after StoryGenerator.",
name()));
return Flowable.empty(); // Stop further processing if no story
}
logger.log(Level.INFO, () ->
String.format("[%s] Story state after generator: %s",
name(), invocationContext.session().state().get("current_story")));
return runStage(loopAgent, invocationContext, "CriticReviserLoop");
});
// Stage 3: Post-Processing (runs after critic-reviser loop completes)
Flowable<Event> postProcessingFlow = Flowable.defer(() -> {
logger.log(Level.INFO, () ->
String.format("[%s] Story state after loop: %s",
name(), invocationContext.session().state().get("current_story")));
return runStage(sequentialAgent, invocationContext, "PostProcessing");
});
// Stage 4: Conditional Regeneration (runs after post-processing completes)
Flowable<Event> conditionalRegenFlow = Flowable.defer(() -> {
String toneCheckResult = (String) invocationContext.session().state().get("tone_check_result");
logger.log(Level.INFO, () -> String.format("[%s] Tone check result: %s", name(), toneCheckResult));
if ("negative".equalsIgnoreCase(toneCheckResult)) {
logger.log(Level.INFO, () ->
String.format("[%s] Tone is negative. Regenerating story...", name()));
return runStage(storyGenerator, invocationContext, "StoryGenerator (Regen)");
} else {
logger.log(Level.INFO, () ->
String.format("[%s] Tone is not negative. Keeping current story.", name()));
return Flowable.empty(); // No regeneration needed
}
});
return Flowable.concatArray(storyGenFlow, criticReviserFlow, postProcessingFlow, conditionalRegenFlow)
.doOnComplete(() -> logger.log(Level.INFO, () -> String.format("[%s] Workflow finished.", name())));
}
// Helper method for a single agent run stage with logging
private Flowable<Event> runStage(BaseAgent agentToRun, InvocationContext ctx, String stageName) {
logger.log(Level.INFO, () -> String.format("[%s] Running %s...", name(), stageName));
return agentToRun
.runAsync(ctx)
.doOnNext(event ->
logger.log(Level.INFO,() ->
String.format("[%s] Event from %s: %s", name(), stageName, event.toJson())))
.doOnError(err ->
logger.log(Level.SEVERE,
String.format("[%s] Error in %s", name(), stageName), err))
.doOnComplete(() ->
logger.log(Level.INFO, () ->
String.format("[%s] %s finished.", name(), stageName)));
}
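- First, the `Flowable` returned by `storyGenerator.runAsync(invocationContext)` executes. Its output is expected to be available through `invocationContext.session().state().get("current_story")`.
- Next, the `loopAgent`'s `Flowable` runs (sequenced by `Flowable.concatArray` and `Flowable.defer`). The `LoopAgent` internally calls `critic` and `reviser` in sequence for up to `maxIterations` iterations. They read and write `current_story` and `criticism` in state.
- Then the `sequentialAgent`'s `Flowable` executes, calling `grammar_check` and then `tone_check`. They read `current_story` and write `grammar_suggestions` and `tone_check_result` to state.
- Custom part: After `sequentialAgent` completes, the logic inside `Flowable.defer` checks "tone_check_result" in `invocationContext.session().state()`. If it is "negative", the `storyGenerator`'s `Flowable` is conditionally concatenated and executed again, overwriting "current_story". Otherwise an empty `Flowable` is used and the overall workflow completes.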
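Part 3: Defining the LLM sub-agents
These are standard `LlmAgent` definitions, each responsible for a specific task. Their `output_key` parameter is crucial for placing results into `session.state`, where other agents or the custom orchestrator can read them.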
GEMINI_2_FLASH = "gemini-2.0-flash"  # Define the model constant
# --- Define the individual LLM agents ---
story_generator = LlmAgent(
name="StoryGenerator",
model=GEMINI_2_FLASH,
instruction="""You are a story writer. Write a short story (around 100 words) about a cat,
based on the topic provided in session state with key 'topic'""",
input_schema=None,
output_key="current_story", # Key for storing output in session state
)
critic = LlmAgent(
name="Critic",
model=GEMINI_2_FLASH,
instruction="""You are a story critic. Review the story provided in
session state with key 'current_story'. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.""",
input_schema=None,
output_key="criticism", # Key for storing criticism in session state
)
reviser = LlmAgent(
name="Reviser",
model=GEMINI_2_FLASH,
instruction="""You are a story reviser. Revise the story provided in
session state with key 'current_story', based on the criticism in
session state with key 'criticism'. Output only the revised story.""",
input_schema=None,
output_key="current_story", # Overwrites the original story
)
grammar_check = LlmAgent(
name="GrammarCheck",
model=GEMINI_2_FLASH,
instruction="""You are a grammar checker. Check the grammar of the story
provided in session state with key 'current_story'. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.""",
input_schema=None,
output_key="grammar_suggestions",
)
tone_check = LlmAgent(
name="ToneCheck",
model=GEMINI_2_FLASH,
instruction="""You are a tone analyzer. Analyze the tone of the story
provided in session state with key 'current_story'. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.""",
input_schema=None,
output_key="tone_check_result", # This agent's output determines the conditional flow
)
// --- Define the individual LLM agents ---
LlmAgent storyGenerator =
LlmAgent.builder()
.name("StoryGenerator")
.model(MODEL_NAME)
.description("Generates the initial story.")
.instruction(
"""
You are a story writer. Write a short story (around 100 words) about a cat,
based on the topic provided in session state with key 'topic'
""")
.inputSchema(null)
.outputKey("current_story") // Key for storing output in session state
.build();
LlmAgent critic =
LlmAgent.builder()
.name("Critic")
.model(MODEL_NAME)
.description("Critiques the story.")
.instruction(
"""
You are a story critic. Review the story provided in
session state with key 'current_story'. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.
""")
.inputSchema(null)
.outputKey("criticism") // Key for storing criticism in session state
.build();
LlmAgent reviser =
LlmAgent.builder()
.name("Reviser")
.model(MODEL_NAME)
.description("Revises the story based on criticism.")
.instruction(
"""
You are a story reviser. Revise the story provided in
session state with key 'current_story', based on the criticism in
session state with key 'criticism'. Output only the revised story.
""")
.inputSchema(null)
.outputKey("current_story") // Overwrites the original story
.build();
LlmAgent grammarCheck =
LlmAgent.builder()
.name("GrammarCheck")
.model(MODEL_NAME)
.description("Checks grammar and suggests corrections.")
.instruction(
"""
You are a grammar checker. Check the grammar of the story
provided in session state with key 'current_story'. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.
""")
.outputKey("grammar_suggestions")
.build();
LlmAgent toneCheck =
LlmAgent.builder()
.name("ToneCheck")
.model(MODEL_NAME)
.description("Analyzes the tone of the story.")
.instruction(
"""
You are a tone analyzer. Analyze the tone of the story
provided in session state with key 'current_story'. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.
""")
.outputKey("tone_check_result") // This agent's output determines the conditional flow
.build();
LoopAgent loopAgent =
LoopAgent.builder()
.name("CriticReviserLoop")
.description("Iteratively critiques and revises the story.")
.subAgents(critic, reviser)
.maxIterations(2)
.build();
SequentialAgent sequentialAgent =
SequentialAgent.builder()
.name("PostProcessing")
.description("Performs grammar and tone checks sequentially.")
.subAgents(grammarCheck, toneCheck)
.build();
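The agent definitions above communicate only through session state keys, so it can help to trace that hand-off in isolation. The following is a simplified sketch, assuming that an `output_key` causes an agent's reply to be written to state under that key: `StubLlmAgent` and its `respond` callback are hypothetical replacements for a real model call, and the state is a plain dictionary.

```python
from typing import Callable, Dict, Optional


class StubLlmAgent:
    """Hypothetical agent: computes a reply from state, then stores it under output_key."""

    def __init__(
        self,
        name: str,
        respond: Callable[[Dict], str],
        output_key: Optional[str] = None,
    ) -> None:
        self.name = name
        self.respond = respond
        self.output_key = output_key

    def run(self, state: Dict) -> str:
        reply = self.respond(state)
        if self.output_key is not None:
            state[self.output_key] = reply  # make the reply visible to later agents
        return reply


# The critic reads 'current_story' and writes 'criticism'.
critic = StubLlmAgent(
    "Critic",
    respond=lambda s: f"tighten: {s['current_story']}",
    output_key="criticism",
)
# The reviser reads 'current_story' and overwrites it with its revision.
reviser = StubLlmAgent(
    "Reviser",
    respond=lambda s: s["current_story"].upper(),
    output_key="current_story",
)

state = {"current_story": "a cat naps"}
critic.run(state)
reviser.run(state)
print(state)
```

Reusing the same key for the generator and the reviser is what lets later stages always read the latest draft from `current_story` without knowing which agent produced it.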
Part 4: Instantiating and running the custom agent
Finally, you instantiate your `StoryFlowAgent` and use the `Runner` as usual.
# --- Create the custom agent instance ---
story_flow_agent = StoryFlowAgent(
name="StoryFlowAgent",
story_generator=story_generator,
critic=critic,
reviser=reviser,
grammar_check=grammar_check,
tone_check=tone_check,
)
# --- Setup Runner and Session ---
session_service = InMemorySessionService()
initial_state = {"topic": "a brave kitten exploring a haunted house"}
session = session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID,
session_id=SESSION_ID,
state=initial_state # Pass initial state here
)
logger.info(f"Initial session state: {session.state}")
runner = Runner(
agent=story_flow_agent, # Pass the custom orchestrator agent
app_name=APP_NAME,
session_service=session_service
)
# --- Function to Interact with the Agent ---
def call_agent(user_input_topic: str):
    """
    Sends a new topic to the agent (overwriting the initial one if needed)
    and runs the workflow.
    """
    current_session = session_service.get_session(app_name=APP_NAME,
                                                  user_id=USER_ID,
                                                  session_id=SESSION_ID)
    if not current_session:
        logger.error("Session not found!")
        return
    current_session.state["topic"] = user_input_topic
    logger.info(f"Updated session state topic to: {user_input_topic}")
    content = types.Content(role='user', parts=[types.Part(text=f"Generate a story about: {user_input_topic}")])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)
    final_response = "No final response captured."
    for event in events:
        if event.is_final_response() and event.content and event.content.parts:
            logger.info(f"Potential final response from [{event.author}]: {event.content.parts[0].text}")
            final_response = event.content.parts[0].text
    print("\n--- Agent Interaction Result ---")
    print("Agent Final Response: ", final_response)
    final_session = session_service.get_session(app_name=APP_NAME,
                                                user_id=USER_ID,
                                                session_id=SESSION_ID)
    print("Final Session State:")
    import json
    print(json.dumps(final_session.state, indent=2))
    print("-------------------------------\n")
# --- Run the Agent ---
call_agent("a lonely robot finding a friend in a junkyard")
// --- Function to Interact with the Agent ---
// Sends a new topic to the agent (overwriting the initial one if needed)
// and runs the workflow.
public static void runAgent(StoryFlowAgentExample agent, String userTopic) {
// --- Setup Runner and Session ---
InMemoryRunner runner = new InMemoryRunner(agent);
Map<String, Object> initialState = new HashMap<>();
initialState.put("topic", "a brave kitten exploring a haunted house");
Session session =
runner
.sessionService()
.createSession(APP_NAME, USER_ID, new ConcurrentHashMap<>(initialState), SESSION_ID)
.blockingGet();
logger.log(Level.INFO, () -> String.format("Initial session state: %s", session.state()));
session.state().put("topic", userTopic); // Update the state in the retrieved session
logger.log(Level.INFO, () -> String.format("Updated session state topic to: %s", userTopic));
Content userMessage = Content.fromParts(Part.fromText("Generate a story about: " + userTopic));
// Use the modified session object for the run
Flowable<Event> eventStream = runner.runAsync(USER_ID, session.id(), userMessage);
final String[] finalResponse = {"No final response captured."};
eventStream.blockingForEach(
event -> {
if (event.finalResponse() && event.content().isPresent()) {
String author = event.author() != null ? event.author() : "UNKNOWN_AUTHOR";
Optional<String> textOpt =
event
.content()
.flatMap(Content::parts)
.filter(parts -> !parts.isEmpty())
.map(parts -> parts.get(0).text().orElse(""));
logger.log(Level.INFO, () ->
String.format("Potential final response from [%s]: %s", author, textOpt.orElse("N/A")));
textOpt.ifPresent(text -> finalResponse[0] = text);
}
});
System.out.println("\n--- Agent Interaction Result ---");
System.out.println("Agent Final Response: " + finalResponse[0]);
// Retrieve session again to see the final state after the run
Session finalSession =
runner
.sessionService()
.getSession(APP_NAME, USER_ID, SESSION_ID, Optional.empty())
.blockingGet();
assert finalSession != null;
System.out.println("Final Session State:" + finalSession.state());
System.out.println("-------------------------------\n");
}
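(Note: The complete runnable code, including imports and execution logic, can be found linked below.)
Full code example
Storyflow Agent
# Full runnable code for the StoryFlowAgent example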
import logging
from typing import AsyncGenerator
from typing_extensions import override
from google.adk.agents import LlmAgent, BaseAgent, LoopAgent, SequentialAgent
from google.adk.agents.invocation_context import InvocationContext
from google.genai import types
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event
from pydantic import BaseModel, Field
# --- Constants ---
APP_NAME = "story_app"
USER_ID = "12345"
SESSION_ID = "123344"
GEMINI_2_FLASH = "gemini-2.0-flash"
# --- Configure Logging ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Custom Orchestrator Agent ---
class StoryFlowAgent(BaseAgent):
    """
    Custom agent for a story generation and refinement workflow.
    This agent orchestrates a sequence of LLM agents to generate a story,
    critique it, revise it, check grammar and tone, and potentially
    regenerate the story if the tone is negative.
    """
    # --- Field Declarations for Pydantic ---
    # Declare the agents passed during initialization as class attributes with type hints
    story_generator: LlmAgent
    critic: LlmAgent
    reviser: LlmAgent
    grammar_check: LlmAgent
    tone_check: LlmAgent
    loop_agent: LoopAgent
    sequential_agent: SequentialAgent
    # model_config allows setting Pydantic configurations if needed, e.g., arbitrary_types_allowed
    model_config = {"arbitrary_types_allowed": True}
    def __init__(
        self,
        name: str,
        story_generator: LlmAgent,
        critic: LlmAgent,
        reviser: LlmAgent,
        grammar_check: LlmAgent,
        tone_check: LlmAgent,
    ):
        """
        Initializes the StoryFlowAgent.
        Args:
            name: The name of the agent.
            story_generator: An LlmAgent to generate the initial story.
            critic: An LlmAgent to critique the story.
            reviser: An LlmAgent to revise the story based on criticism.
            grammar_check: An LlmAgent to check the grammar.
            tone_check: An LlmAgent to analyze the tone.
        """
        # Create internal agents *before* calling super().__init__
        loop_agent = LoopAgent(
            name="CriticReviserLoop", sub_agents=[critic, reviser], max_iterations=2
        )
        sequential_agent = SequentialAgent(
            name="PostProcessing", sub_agents=[grammar_check, tone_check]
        )
        # Define the sub_agents list for the framework
        sub_agents_list = [
            story_generator,
            loop_agent,
            sequential_agent,
        ]
        # Pydantic will validate and assign them based on the class annotations.
        super().__init__(
            name=name,
            story_generator=story_generator,
            critic=critic,
            reviser=reviser,
            grammar_check=grammar_check,
            tone_check=tone_check,
            loop_agent=loop_agent,
            sequential_agent=sequential_agent,
            sub_agents=sub_agents_list,  # Pass the sub_agents list directly
        )
    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        """
        Implements the custom orchestration logic for the story workflow.
        Uses the instance attributes assigned by Pydantic (e.g., self.story_generator).
        """
        logger.info(f"[{self.name}] Starting story generation workflow.")
        # 1. Initial Story Generation
        logger.info(f"[{self.name}] Running StoryGenerator...")
        async for event in self.story_generator.run_async(ctx):
            logger.info(f"[{self.name}] Event from StoryGenerator: {event.model_dump_json(indent=2, exclude_none=True)}")
            yield event
        # Check if story was generated before proceeding
        if "current_story" not in ctx.session.state or not ctx.session.state["current_story"]:
            logger.error(f"[{self.name}] Failed to generate initial story. Aborting workflow.")
            return  # Stop processing if initial story failed
        logger.info(f"[{self.name}] Story state after generator: {ctx.session.state.get('current_story')}")
        # 2. Critic-Reviser Loop
        logger.info(f"[{self.name}] Running CriticReviserLoop...")
        # Use the loop_agent instance attribute assigned during init
        async for event in self.loop_agent.run_async(ctx):
            logger.info(f"[{self.name}] Event from CriticReviserLoop: {event.model_dump_json(indent=2, exclude_none=True)}")
            yield event
        logger.info(f"[{self.name}] Story state after loop: {ctx.session.state.get('current_story')}")
        # 3. Sequential Post-Processing (Grammar and Tone Check)
        logger.info(f"[{self.name}] Running PostProcessing...")
        # Use the sequential_agent instance attribute assigned during init
        async for event in self.sequential_agent.run_async(ctx):
            logger.info(f"[{self.name}] Event from PostProcessing: {event.model_dump_json(indent=2, exclude_none=True)}")
            yield event
        # 4. Tone-Based Conditional Logic
        tone_check_result = ctx.session.state.get("tone_check_result")
        logger.info(f"[{self.name}] Tone check result: {tone_check_result}")
        if tone_check_result == "negative":
            logger.info(f"[{self.name}] Tone is negative. Regenerating story...")
            async for event in self.story_generator.run_async(ctx):
                logger.info(f"[{self.name}] Event from StoryGenerator (Regen): {event.model_dump_json(indent=2, exclude_none=True)}")
                yield event
        else:
            logger.info(f"[{self.name}] Tone is not negative. Keeping current story.")
            pass
        logger.info(f"[{self.name}] Workflow finished.")
# --- Define the individual LLM agents ---
story_generator = LlmAgent(
name="StoryGenerator",
model=GEMINI_2_FLASH,
instruction="""You are a story writer. Write a short story (around 100 words) about a cat,
based on the topic provided in session state with key 'topic'""",
input_schema=None,
output_key="current_story", # Key for storing output in session state
)
critic = LlmAgent(
name="Critic",
model=GEMINI_2_FLASH,
instruction="""You are a story critic. Review the story provided in
session state with key 'current_story'. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.""",
input_schema=None,
output_key="criticism", # Key for storing criticism in session state
)
reviser = LlmAgent(
name="Reviser",
model=GEMINI_2_FLASH,
instruction="""You are a story reviser. Revise the story provided in
session state with key 'current_story', based on the criticism in
session state with key 'criticism'. Output only the revised story.""",
input_schema=None,
output_key="current_story", # Overwrites the original story
)
grammar_check = LlmAgent(
name="GrammarCheck",
model=GEMINI_2_FLASH,
instruction="""You are a grammar checker. Check the grammar of the story
provided in session state with key 'current_story'. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.""",
input_schema=None,
output_key="grammar_suggestions",
)
tone_check = LlmAgent(
name="ToneCheck",
model=GEMINI_2_FLASH,
instruction="""You are a tone analyzer. Analyze the tone of the story
provided in session state with key 'current_story'. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.""",
input_schema=None,
output_key="tone_check_result", # This agent's output determines the conditional flow
)
# --- Create the custom agent instance ---
story_flow_agent = StoryFlowAgent(
name="StoryFlowAgent",
story_generator=story_generator,
critic=critic,
reviser=reviser,
grammar_check=grammar_check,
tone_check=tone_check,
)
# --- Setup Runner and Session ---
session_service = InMemorySessionService()
initial_state = {"topic": "a brave kitten exploring a haunted house"}
session = session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID,
session_id=SESSION_ID,
state=initial_state # Pass initial state here
)
logger.info(f"Initial session state: {session.state}")
runner = Runner(
agent=story_flow_agent, # Pass the custom orchestrator agent
app_name=APP_NAME,
session_service=session_service
)
# --- Function to Interact with the Agent ---
def call_agent(user_input_topic: str):
    """
    Sends a new topic to the agent (overwriting the initial one if needed)
    and runs the workflow.
    """
    current_session = session_service.get_session(app_name=APP_NAME,
                                                  user_id=USER_ID,
                                                  session_id=SESSION_ID)
    if not current_session:
        logger.error("Session not found!")
        return
    current_session.state["topic"] = user_input_topic
    logger.info(f"Updated session state topic to: {user_input_topic}")
    content = types.Content(role='user', parts=[types.Part(text=f"Generate a story about: {user_input_topic}")])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)
    final_response = "No final response captured."
    for event in events:
        if event.is_final_response() and event.content and event.content.parts:
            logger.info(f"Potential final response from [{event.author}]: {event.content.parts[0].text}")
            final_response = event.content.parts[0].text
    print("\n--- Agent Interaction Result ---")
    print("Agent Final Response: ", final_response)
    final_session = session_service.get_session(app_name=APP_NAME,
                                                user_id=USER_ID,
                                                session_id=SESSION_ID)
    print("Final Session State:")
    import json
    print(json.dumps(final_session.state, indent=2))
    print("-------------------------------\n")
# --- Run the Agent ---
call_agent("a lonely robot finding a friend in a junkyard")
// Full runnable code for the StoryFlowAgent example
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.BaseAgent;
import com.google.adk.agents.InvocationContext;
import com.google.adk.agents.LoopAgent;
import com.google.adk.agents.SequentialAgent;
import com.google.adk.events.Event;
import com.google.adk.runner.InMemoryRunner;
import com.google.adk.sessions.Session;
import com.google.genai.types.Content;
import com.google.genai.types.Part;
import io.reactivex.rxjava3.core.Flowable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
public class StoryFlowAgentExample extends BaseAgent {
// --- Constants ---
private static final String APP_NAME = "story_app";
private static final String USER_ID = "user_12345";
private static final String SESSION_ID = "session_123344";
private static final String MODEL_NAME = "gemini-2.0-flash"; // Ensure this model is available
private static final Logger logger = Logger.getLogger(StoryFlowAgentExample.class.getName());
private final LlmAgent storyGenerator;
private final LoopAgent loopAgent;
private final SequentialAgent sequentialAgent;
public StoryFlowAgentExample(
String name, LlmAgent storyGenerator, LoopAgent loopAgent, SequentialAgent sequentialAgent) {
super(
name,
"Orchestrates story generation, critique, revision, and checks.",
List.of(storyGenerator, loopAgent, sequentialAgent),
null,
null);
this.storyGenerator = storyGenerator;
this.loopAgent = loopAgent;
this.sequentialAgent = sequentialAgent;
}
public static void main(String[] args) {
// --- Define the individual LLM agents ---
LlmAgent storyGenerator =
LlmAgent.builder()
.name("StoryGenerator")
.model(MODEL_NAME)
.description("Generates the initial story.")
.instruction(
"""
You are a story writer. Write a short story (around 100 words) about a cat,
based on the topic provided in session state with key 'topic'
""")
.inputSchema(null)
.outputKey("current_story") // Key for storing output in session state
.build();
LlmAgent critic =
LlmAgent.builder()
.name("Critic")
.model(MODEL_NAME)
.description("Critiques the story.")
.instruction(
"""
You are a story critic. Review the story provided in
session state with key 'current_story'. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.
""")
.inputSchema(null)
.outputKey("criticism") // Key for storing criticism in session state
.build();
LlmAgent reviser =
LlmAgent.builder()
.name("Reviser")
.model(MODEL_NAME)
.description("Revises the story based on criticism.")
.instruction(
"""
You are a story reviser. Revise the story provided in
session state with key 'current_story', based on the criticism in
session state with key 'criticism'. Output only the revised story.
""")
.inputSchema(null)
.outputKey("current_story") // Overwrites the original story
.build();
LlmAgent grammarCheck =
LlmAgent.builder()
.name("GrammarCheck")
.model(MODEL_NAME)
.description("Checks grammar and suggests corrections.")
.instruction(
"""
You are a grammar checker. Check the grammar of the story
provided in session state with key 'current_story'. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.
""")
.outputKey("grammar_suggestions")
.build();
LlmAgent toneCheck =
LlmAgent.builder()
.name("ToneCheck")
.model(MODEL_NAME)
.description("Analyzes the tone of the story.")
.instruction(
"""
You are a tone analyzer. Analyze the tone of the story
provided in session state with key 'current_story'. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.
""")
.outputKey("tone_check_result") // This agent's output determines the conditional flow
.build();
LoopAgent loopAgent =
LoopAgent.builder()
.name("CriticReviserLoop")
.description("Iteratively critiques and revises the story.")
.subAgents(critic, reviser)
.maxIterations(2)
.build();
SequentialAgent sequentialAgent =
SequentialAgent.builder()
.name("PostProcessing")
.description("Performs grammar and tone checks sequentially.")
.subAgents(grammarCheck, toneCheck)
.build();
StoryFlowAgentExample storyFlowAgentExample =
new StoryFlowAgentExample(APP_NAME, storyGenerator, loopAgent, sequentialAgent);
// --- Run the Agent ---
runAgent(storyFlowAgentExample, "a lonely robot finding a friend in a junkyard");
}
// --- Function to Interact with the Agent ---
// Sends a new topic to the agent (overwriting the initial one if needed)
// and runs the workflow.
public static void runAgent(StoryFlowAgentExample agent, String userTopic) {
// --- Setup Runner and Session ---
InMemoryRunner runner = new InMemoryRunner(agent);
Map<String, Object> initialState = new HashMap<>();
initialState.put("topic", "a brave kitten exploring a haunted house");
Session session =
runner
.sessionService()
.createSession(APP_NAME, USER_ID, new ConcurrentHashMap<>(initialState), SESSION_ID)
.blockingGet();
logger.log(Level.INFO, () -> String.format("Initial session state: %s", session.state()));
session.state().put("topic", userTopic); // Update the state in the retrieved session
logger.log(Level.INFO, () -> String.format("Updated session state topic to: %s", userTopic));
Content userMessage = Content.fromParts(Part.fromText("Generate a story about: " + userTopic));
// Use the modified session object for the run
Flowable<Event> eventStream = runner.runAsync(USER_ID, session.id(), userMessage);
final String[] finalResponse = {"No final response captured."};
eventStream.blockingForEach(
event -> {
if (event.finalResponse() && event.content().isPresent()) {
String author = event.author() != null ? event.author() : "UNKNOWN_AUTHOR";
Optional<String> textOpt =
event
.content()
.flatMap(Content::parts)
.filter(parts -> !parts.isEmpty())
.map(parts -> parts.get(0).text().orElse(""));
logger.log(Level.INFO, () ->
String.format("Potential final response from [%s]: %s", author, textOpt.orElse("N/A")));
textOpt.ifPresent(text -> finalResponse[0] = text);
}
});
System.out.println("\n--- Agent Interaction Result ---");
System.out.println("Agent Final Response: " + finalResponse[0]);
// Retrieve session again to see the final state after the run
Session finalSession =
runner
.sessionService()
.getSession(APP_NAME, USER_ID, SESSION_ID, Optional.empty())
.blockingGet();
assert finalSession != null;
System.out.println("Final Session State: " + finalSession.state());
System.out.println("-------------------------------\n");
}
private boolean isStoryGenerated(InvocationContext ctx) {
Object currentStoryObj = ctx.session().state().get("current_story");
return currentStoryObj != null && !String.valueOf(currentStoryObj).isEmpty();
}
@Override
protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) {
// Implements the custom orchestration logic for the story workflow.
// Uses the sub-agent fields assigned in the constructor (e.g., storyGenerator).
logger.log(Level.INFO, () -> String.format("[%s] Starting story generation workflow.", name()));
// Stage 1: Initial Story Generation
Flowable<Event> storyGenFlow = runStage(storyGenerator, invocationContext, "StoryGenerator");
// Stage 2: Critic-Reviser Loop (runs after story generation completes)
Flowable<Event> criticReviserFlow = Flowable.defer(() -> {
if (!isStoryGenerated(invocationContext)) {
logger.log(Level.SEVERE,() ->
String.format("[%s] Failed to generate initial story. Aborting after StoryGenerator.",
name()));
return Flowable.empty(); // Stop further processing if no story
}
logger.log(Level.INFO, () ->
String.format("[%s] Story state after generator: %s",
name(), invocationContext.session().state().get("current_story")));
return runStage(loopAgent, invocationContext, "CriticReviserLoop");
});
// Stage 3: Post-Processing (runs after critic-reviser loop completes)
Flowable<Event> postProcessingFlow = Flowable.defer(() -> {
logger.log(Level.INFO, () ->
String.format("[%s] Story state after loop: %s",
name(), invocationContext.session().state().get("current_story")));
return runStage(sequentialAgent, invocationContext, "PostProcessing");
});
// Stage 4: Conditional Regeneration (runs after post-processing completes)
Flowable<Event> conditionalRegenFlow = Flowable.defer(() -> {
String toneCheckResult = (String) invocationContext.session().state().get("tone_check_result");
logger.log(Level.INFO, () -> String.format("[%s] Tone check result: %s", name(), toneCheckResult));
if ("negative".equalsIgnoreCase(toneCheckResult)) {
logger.log(Level.INFO, () ->
String.format("[%s] Tone is negative. Regenerating story...", name()));
return runStage(storyGenerator, invocationContext, "StoryGenerator (Regen)");
} else {
logger.log(Level.INFO, () ->
String.format("[%s] Tone is not negative. Keeping current story.", name()));
return Flowable.empty(); // No regeneration needed
}
});
return Flowable.concatArray(storyGenFlow, criticReviserFlow, postProcessingFlow, conditionalRegenFlow)
.doOnComplete(() -> logger.log(Level.INFO, () -> String.format("[%s] Workflow finished.", name())));
}
// Helper method for a single agent run stage with logging
private Flowable<Event> runStage(BaseAgent agentToRun, InvocationContext ctx, String stageName) {
logger.log(Level.INFO, () -> String.format("[%s] Running %s...", name(), stageName));
return agentToRun
.runAsync(ctx)
.doOnNext(event ->
logger.log(Level.INFO,() ->
String.format("[%s] Event from %s: %s", name(), stageName, event.toJson())))
.doOnError(err ->
logger.log(Level.SEVERE,
String.format("[%s] Error in %s", name(), stageName), err))
.doOnComplete(() ->
logger.log(Level.INFO, () ->
String.format("[%s] %s finished.", name(), stageName)));
}
@Override
protected Flowable<Event> runLiveImpl(InvocationContext invocationContext) {
return Flowable.error(new UnsupportedOperationException("runLive not implemented."));
}
}
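In the Java listing, stages 2 through 4 are wrapped in `Flowable.defer`, so the code that reads session state runs only when the stage is subscribed, and `Flowable.concatArray` subscribes to each stage only after the previous one completes. The sketch below illustrates why that matters, using plain Python callables as an analogy. It is not a translation of the RxJava code, and `set_tone`, `tone_stage`, `run_eager`, and `run_deferred` are hypothetical names introduced for the illustration.

```python
from typing import Callable, Dict, List

State = Dict[str, str]


def set_tone(state: State) -> List[str]:
    """Hypothetical earlier stage: records a tone result in shared state."""
    state["tone_check_result"] = "negative"
    return ["tone checked"]


def tone_stage(state: State) -> List[str]:
    """Hypothetical later stage: decides what to emit from state at call time."""
    if state.get("tone_check_result") == "negative":
        return ["regenerate"]
    return []


def run_eager(state: State) -> List[str]:
    # The decision is computed while the pipeline is being assembled,
    # before the earlier stage has written anything to state.
    decision = tone_stage(state)
    events = set_tone(state)
    return events + decision


def run_deferred(state: State) -> List[str]:
    # Each stage is wrapped in a zero-argument callable and invoked in order,
    # so the later stage sees what the earlier stage wrote.
    stages: List[Callable[[], List[str]]] = [
        lambda: set_tone(state),
        lambda: tone_stage(state),
    ]
    events: List[str] = []
    for stage in stages:
        events.extend(stage())
    return events
```

With an empty starting state, `run_eager` misses the regeneration step because it inspects state too early, whereas `run_deferred` picks it up. Deferring the state lookup until the stage actually runs is the property the Java example relies on.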