多智能体工作流模式¶
Supported in ADKPython v0.1.0Typescript v0.2.0Go v0.1.0Java v0.1.0Kotlin v0.1.0
本指南提供了多种你可以使用 Agent Development Kit(ADK)实现的智能体模式,包括代码示例。这些模式适用于广泛的应用程序,在完全实现之前,你应根据项目需求对其进行评估和测试。
协调者与分发器¶
- 结构: 一个中央
LlmAgent(协调者)管理多个专门的sub_agents。 - 目标: 将传入的请求路由到适当的专业智能体。
- 使用的 ADK 原语:
- 层次结构: 协调者在
sub_agents中列出专业智能体。 - 交互: 主要使用 LLM 驱动的委派(需要在子智能体上有清晰的
description,以及在协调者上有适当的instruction)或显式调用(AgentTool)(协调者在其tools中包含AgentTool包装的专业智能体)。
- 层次结构: 协调者在
# 概念代码:使用 LLM 转移的协调器
from google.adk.agents import LlmAgent
billing_agent = LlmAgent(name="Billing", description="Handles billing inquiries.")
support_agent = LlmAgent(name="Support", description="Handles technical support requests.")
coordinator = LlmAgent(
name="HelpDeskCoordinator",
model="gemini-flash-latest",
instruction="路由用户请求:对于支付问题使用 Billing 智能体,对于技术问题使用 Support 智能体。",
description="主服务台路由。 ",
# 在 AutoFlow 中,对于子智能体,允许转移通常是隐式的
sub_agents=[billing_agent, support_agent]
)
# 用户问 "My payment failed" -> 协调器的 LLM 应调用 transfer_to_agent(agent_name='Billing')
# 用户问 "I can't log in" -> 协调器的 LLM 应调用 transfer_to_agent(agent_name='Support')
// 概念代码:使用 LLM 转移的协调器
import { LlmAgent } from '@google/adk';
const billingAgent = new LlmAgent({name: 'Billing', description: '处理账单查询。'});
const supportAgent = new LlmAgent({name: 'Support', description: '处理技术支持请求。'});
const coordinator = new LlmAgent({
name: 'HelpDeskCoordinator',
model: 'gemini-flash-latest',
instruction: '路由用户请求:对于支付问题使用 Billing 智能体,对于技术问题使用 Support 智能体。',
description: '主服务台路由。 ',
// 在 AutoFlow 中,对于子智能体,允许转移通常是隐式的
subAgents: [billingAgent, supportAgent]
});
// 用户问 "My payment failed" -> 协调器的 LLM 应调用 {functionCall: {name: 'transfer_to_agent', args: {agent_name: 'Billing'}}}
// 用户问 "I can't log in" -> 协调器的 LLM 应调用 {functionCall: {name: 'transfer_to_agent', args: {agent_name: 'Support'}}}
import (
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
)
// Conceptual Code: Coordinator using LLM Transfer
billingAgent, _ := llmagent.New(llmagent.Config{Name: "Billing", Description: "Handles billing inquiries.", Model: m})
supportAgent, _ := llmagent.New(llmagent.Config{Name: "Support", Description: "Handles technical support requests.", Model: m})
coordinator, _ := llmagent.New(llmagent.Config{
Name: "HelpDeskCoordinator",
Model: m,
Instruction: "Route user requests: Use Billing agent for payment issues, Support agent for technical problems.",
Description: "Main help desk router.",
SubAgents: []agent.Agent{billingAgent, supportAgent},
})
// User asks "My payment failed" -> Coordinator's LLM should call transfer_to_agent(agent_name='Billing')
// User asks "I can't log in" -> Coordinator's LLM should call transfer_to_agent(agent_name='Support')
// 概念代码:使用 LLM 转移的协调器
import com.google.adk.agents.LlmAgent;
LlmAgent billingAgent = LlmAgent.builder()
.name("Billing")
.description("处理账单查询和支付问题。")
.build();
LlmAgent supportAgent = LlmAgent.builder()
.name("Support")
.description("处理技术支持请求和登录问题。")
.build();
LlmAgent coordinator = LlmAgent.builder()
.name("HelpDeskCoordinator")
.model("gemini-flash-latest")
.instruction("路由用户请求:对于支付问题使用 Billing 智能体,对于技术问题使用 Support 智能体。")
.description("主服务台路由。 ")
.subAgents(billingAgent, supportAgent)
// 在 Autoflow 中,对于子智能体,智能体转移是隐式的,除非使用
// .disallowTransferToParent 或 disallowTransferToPeers 指定
.build();
// 用户问 "My payment failed" -> 协调器的 LLM 应调用
// transferToAgent(agentName='Billing')
// 用户问 "I can't log in" -> 协调器的 LLM 应调用
// transferToAgent(agentName='Support')
val billingAgent =
LlmAgent(name = "Billing", model = model, description = "Handles billing inquiries.")
val supportAgent =
LlmAgent(
name = "Support",
model = model,
description = "Handles technical support requests.",
)
val helpDesk =
LlmAgent(
name = "HelpDeskCoordinator",
model = model,
instruction =
Instruction(
"Route user requests: Use Billing agent for payment issues, Support agent for technical problems.",
),
description = "Main help desk router.",
subAgents = listOf(billingAgent, supportAgent),
)
顺序流水线¶
- 结构: 一个
SequentialAgent包含按固定顺序执行的sub_agents。 - 目标: 实现一个多步骤流程,其中一步的输出作为下一步的输入。
- 使用的 ADK 原语:
- 工作流:
SequentialAgent定义顺序。 - 通信: 主要使用共享会话状态。较早的智能体写入结果(通常通过
output_key),较晚的智能体从context.state中读取这些结果。
- 工作流:
# 概念代码:顺序数据流水线
from google.adk.agents import SequentialAgent, LlmAgent
validator = LlmAgent(name="ValidateInput", instruction="验证输入。", output_key="validation_status")
processor = LlmAgent(name="ProcessData", instruction="如果 {validation_status} 为 'valid',则处理数据。", output_key="result")
reporter = LlmAgent(name="ReportResult", instruction="报告来自 {result} 的结果。")
data_pipeline = SequentialAgent(
name="DataPipeline",
sub_agents=[validator, processor, reporter]
)
# validator 运行 -> 保存到 state['validation_status']
# processor 运行 -> 读取 state['validation_status'],保存到 state['result']
# reporter 运行 -> 读取 state['result']
// 概念代码:顺序数据流水线
import { SequentialAgent, LlmAgent } from '@google/adk';
const validator = new LlmAgent({name: 'ValidateInput', instruction: '验证输入。', outputKey: 'validation_status'});
const processor = new LlmAgent({name: 'ProcessData', instruction: '如果 {validation_status} 为 "valid",则处理数据。', outputKey: 'result'});
const reporter = new LlmAgent({name: 'ReportResult', instruction: '报告来自 {result} 的结果。'});
const dataPipeline = new SequentialAgent({
name: 'DataPipeline',
subAgents: [validator, processor, reporter]
});
// validator 运行 -> 保存到 state['validation_status']
// processor 运行 -> 读取 state['validation_status'],保存到 state['result']
// reporter 运行 -> 读取 state['result']
import (
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/agent/workflowagents/sequentialagent"
)
// Conceptual Code: Sequential Data Pipeline
validator, _ := llmagent.New(llmagent.Config{Name: "ValidateInput", Instruction: "Validate the input.", OutputKey: "validation_status", Model: m})
processor, _ := llmagent.New(llmagent.Config{Name: "ProcessData", Instruction: "Process data if {validation_status} is 'valid'.", OutputKey: "result", Model: m})
reporter, _ := llmagent.New(llmagent.Config{Name: "ReportResult", Instruction: "Report the result from {result}.", Model: m})
dataPipeline, _ := sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{Name: "DataPipeline", SubAgents: []agent.Agent{validator, processor, reporter}},
})
// validator runs -> saves to state["validation_status"]
// processor runs -> reads state["validation_status"], saves to state["result"]
// reporter runs -> reads state["result"]
// 概念代码:顺序数据流水线
import com.google.adk.agents.SequentialAgent;
LlmAgent validator = LlmAgent.builder()
.name("ValidateInput")
.instruction("验证输入")
.outputKey("validation_status") // 将其主要文本输出保存到 session.state["validation_status"]
.build();
LlmAgent processor = LlmAgent.builder()
.name("ProcessData")
.instruction("如果 {validation_status} 为 'valid',则处理数据")
.outputKey("result") // 将其主要文本输出保存到 session.state["result"]
.build();
LlmAgent reporter = LlmAgent.builder()
.name("ReportResult")
.instruction("报告来自 {result} 的结果")
.build();
SequentialAgent dataPipeline = SequentialAgent.builder()
.name("DataPipeline")
.subAgents(validator, processor, reporter)
.build();
// validator 运行 -> 保存到 state['validation_status']
// processor 运行 -> 读取 state['validation_status'],保存到 state['result']
// reporter 运行 -> 读取 state['result']
val validator =
LlmAgent(
name = "ValidateInput",
model = model,
instruction = Instruction("Validate the input."),
)
val processor =
LlmAgent(
name = "ProcessData",
model = model,
instruction = Instruction("Process data if validation is successful."),
)
val reporter =
LlmAgent(
name = "ReportResult",
model = model,
instruction = Instruction("Report the result."),
)
val dataPipeline =
SequentialAgent(
name = "DataPipeline",
subAgents = listOf(validator, processor, reporter),
)
并行分发与汇总¶
- 结构: 一个
ParallelAgent并发运行多个sub_agents,通常后面跟一个(在SequentialAgent中的)智能体来汇总结果。 - 目标: 同时执行独立任务以降低延迟,然后合并它们的输出。
- 使用的 ADK 原语:
- 工作流:
ParallelAgent用于并发执行(分发)。通常嵌套在SequentialAgent中,以处理后续的汇总步骤(汇总)。 - 通信: 子智能体将结果写入共享会话状态中的不同键。后续的"汇总"智能体读取多个状态键。
- 工作流:
# 概念代码:并行信息收集
from google.adk.agents import SequentialAgent, ParallelAgent, LlmAgent
fetch_api1 = LlmAgent(name="API1Fetcher", instruction="从 API 1 获取数据。", output_key="api1_data")
fetch_api2 = LlmAgent(name="API2Fetcher", instruction="从 API 2 获取数据。", output_key="api2_data")
gather_concurrently = ParallelAgent(
name="ConcurrentFetch",
sub_agents=[fetch_api1, fetch_api2]
)
synthesizer = LlmAgent(
name="Synthesizer",
instruction="合并来自 {api1_data} 和 {api2_data} 的结果。"
)
overall_workflow = SequentialAgent(
name="FetchAndSynthesize",
sub_agents=[gather_concurrently, synthesizer] # 运行并行获取,然后合成
)
# fetch_api1 和 fetch_api2 并发运行,保存到 state。
# synthesizer 随后运行,读取 state['api1_data'] 和 state['api2_data']。
// 概念代码:并行信息收集
import { SequentialAgent, ParallelAgent, LlmAgent } from '@google/adk';
const fetchApi1 = new LlmAgent({name: 'API1Fetcher', instruction: '从 API 1 获取数据。', outputKey: 'api1_data'});
const fetchApi2 = new LlmAgent({name: 'API2Fetcher', instruction: '从 API 2 获取数据。', outputKey: 'api2_data'});
const gatherConcurrently = new ParallelAgent({
name: 'ConcurrentFetch',
subAgents: [fetchApi1, fetchApi2]
});
const synthesizer = new LlmAgent({
name: 'Synthesizer',
instruction: '合并来自 {api1_data} 和 {api2_data} 的结果。'
});
const overallWorkflow = new SequentialAgent({
name: 'FetchAndSynthesize',
subAgents: [gatherConcurrently, synthesizer] // 运行并行获取,然后合成
});
// fetchApi1 和 fetchApi2 并发运行,保存到 state。
// synthesizer 随后运行,读取 state['api1_data'] 和 state['api2_data']。
import (
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/agent/workflowagents/parallelagent"
"google.golang.org/adk/agent/workflowagents/sequentialagent"
)
// Conceptual Code: Parallel Information Gathering
fetchAPI1, _ := llmagent.New(llmagent.Config{Name: "API1Fetcher", Instruction: "Fetch data from API 1.", OutputKey: "api1_data", Model: m})
fetchAPI2, _ := llmagent.New(llmagent.Config{Name: "API2Fetcher", Instruction: "Fetch data from API 2.", OutputKey: "api2_data", Model: m})
gatherConcurrently, _ := parallelagent.New(parallelagent.Config{
AgentConfig: agent.Config{Name: "ConcurrentFetch", SubAgents: []agent.Agent{fetchAPI1, fetchAPI2}},
})
synthesizer, _ := llmagent.New(llmagent.Config{Name: "Synthesizer", Instruction: "Combine results from {api1_data} and {api2_data}.", Model: m})
overallWorkflow, _ := sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{Name: "FetchAndSynthesize", SubAgents: []agent.Agent{gatherConcurrently, synthesizer}},
})
// fetch_api1 and fetch_api2 run concurrently, saving to state.
// synthesizer runs afterwards, reading state["api1_data"] and state["api2_data"].
// 概念代码:并行信息收集
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.ParallelAgent;
import com.google.adk.agents.SequentialAgent;
LlmAgent fetchApi1 = LlmAgent.builder()
.name("API1Fetcher")
.instruction("从 API 1 获取数据。")
.outputKey("api1_data")
.build();
LlmAgent fetchApi2 = LlmAgent.builder()
.name("API2Fetcher")
.instruction("从 API 2 获取数据。")
.outputKey("api2_data")
.build();
ParallelAgent gatherConcurrently = ParallelAgent.builder()
.name("ConcurrentFetcher")
.subAgents(fetchApi2, fetchApi1)
.build();
LlmAgent synthesizer = LlmAgent.builder()
.name("Synthesizer")
.instruction("合并来自 {api1_data} 和 {api2_data} 的结果。")
.build();
SequentialAgent overallWorfklow = SequentialAgent.builder()
.name("FetchAndSynthesize") // 运行并行获取,然后合成
.subAgents(gatherConcurrently, synthesizer)
.build();
// fetch_api1 和 fetch_api2 并发运行,保存到 state。
// synthesizer 随后运行,读取 state['api1_data'] 和 state['api2_data']。
val fetchApi1 =
LlmAgent(
name = "API1Fetcher",
model = model,
instruction = Instruction("Fetch data from API 1."),
)
val fetchApi2 =
LlmAgent(
name = "API2Fetcher",
model = model,
instruction = Instruction("Fetch data from API 2."),
)
val gatherConcurrently =
ParallelAgent(
name = "ConcurrentFetch",
subAgents = listOf(fetchApi1, fetchApi2),
)
val synthesizer =
LlmAgent(
name = "Synthesizer",
model = model,
instruction = Instruction("Combine results from state."),
)
val overallWorkflow =
SequentialAgent(
name = "FetchAndSynthesize",
subAgents = listOf(gatherConcurrently, synthesizer),
)
分层任务分解¶
- 结构: 一个多层次的智能体树,其中高层智能体分解复杂目标并将子任务委派给低层智能体。
- 目标: 通过递归地将复杂问题分解为更简单、可执行的步骤来解决它们。
- 使用的 ADK 原语:
- 层次结构: 多层次
parent_agent/sub_agents结构。 - 交互: 主要是父智能体使用 LLM 驱动委派或 显式调用 (
AgentTool) 将任务分配给子智能体。结果通过工具响应或状态向上返回层次结构。
- 层次结构: 多层次
# 概念代码:分层研究任务
from google.adk.agents import LlmAgent
from google.adk.tools import agent_tool
# 低层类工具智能体
web_searcher = LlmAgent(name="WebSearch", description="执行网页搜索以获取事实。")
summarizer = LlmAgent(name="Summarizer", description="摘要文本。")
# 中层智能体组合工具
research_assistant = LlmAgent(
name="ResearchAssistant",
model="gemini-flash-latest",
description="查找并摘要关于某个主题的信息。",
tools=[agent_tool.AgentTool(agent=web_searcher), agent_tool.AgentTool(agent=summarizer)]
)
# 高层智能体委派研究
report_writer = LlmAgent(
name="ReportWriter",
model="gemini-flash-latest",
instruction="撰写关于主题 X 的报告。使用 ResearchAssistant 收集信息。",
tools=[agent_tool.AgentTool(agent=research_assistant)]
# 或者,如果 research_assistant 是 sub_agent,可以使用 LLM 转移
)
# 用户与 ReportWriter 交互。
# ReportWriter 调用 ResearchAssistant 工具。
# ResearchAssistant 调用 WebSearch 和 Summarizer 工具。
# 结果向上流动。
// 概念代码:分层研究任务
import { LlmAgent, AgentTool } from '@google/adk';
// 低层类工具智能体
const webSearcher = new LlmAgent({name: 'WebSearch', description: '执行网页搜索以获取事实。'});
const summarizer = new LlmAgent({name: 'Summarizer', description: '摘要文本。'});
// 中层智能体组合工具
const researchAssistant = new LlmAgent({
name: 'ResearchAssistant',
model: 'gemini-flash-latest',
description: '查找并摘要关于某个主题的信息。',
tools: [new AgentTool({agent: webSearcher}), new AgentTool({agent: summarizer})]
});
// 高层智能体委派研究
const reportWriter = new LlmAgent({
name: 'ReportWriter',
model: 'gemini-flash-latest',
instruction: '撰写关于主题 X 的报告。使用 ResearchAssistant 收集信息。',
tools: [new AgentTool({agent: researchAssistant})]
// 或者,如果 researchAssistant 是 subAgent,可以使用 LLM 转移
});
// 用户与 ReportWriter 交互。
// ReportWriter 调用 ResearchAssistant 工具。
// ResearchAssistant 调用 WebSearch 和 Summarizer 工具。
// 结果向上流动。
import (
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/tool"
"google.golang.org/adk/tool/agenttool"
)
// Conceptual Code: Hierarchical Research Task
// Low-level tool-like agents
webSearcher, _ := llmagent.New(llmagent.Config{Name: "WebSearch", Description: "Performs web searches for facts.", Model: m})
summarizer, _ := llmagent.New(llmagent.Config{Name: "Summarizer", Description: "Summarizes text.", Model: m})
// Mid-level agent combining tools
webSearcherTool := agenttool.New(webSearcher, nil)
summarizerTool := agenttool.New(summarizer, nil)
researchAssistant, _ := llmagent.New(llmagent.Config{
Name: "ResearchAssistant",
Model: m,
Description: "Finds and summarizes information on a topic.",
Tools: []tool.Tool{webSearcherTool, summarizerTool},
})
// High-level agent delegating research
researchAssistantTool := agenttool.New(researchAssistant, nil)
reportWriter, _ := llmagent.New(llmagent.Config{
Name: "ReportWriter",
Model: m,
Instruction: "Write a report on topic X. Use the ResearchAssistant to gather information.",
Tools: []tool.Tool{researchAssistantTool},
})
// User interacts with ReportWriter.
// ReportWriter calls ResearchAssistant tool.
// ResearchAssistant calls WebSearch and Summarizer tools.
// Results flow back up.
// 概念代码:分层研究任务
import com.google.adk.agents.LlmAgent;
import com.google.adk.tools.AgentTool;
// 低层类工具智能体
LlmAgent webSearcher = LlmAgent.builder()
.name("WebSearch")
.description("执行网页搜索以获取事实。")
.build();
LlmAgent summarizer = LlmAgent.builder()
.name("Summarizer")
.description("摘要文本。")
.build();
// 中层智能体组合工具
LlmAgent researchAssistant = LlmAgent.builder()
.name("ResearchAssistant")
.model("gemini-flash-latest")
.description("查找并摘要关于某个主题的信息。")
.tools(AgentTool.create(webSearcher), AgentTool.create(summarizer))
.build();
// 高层智能体委派研究
LlmAgent reportWriter = LlmAgent.builder()
.name("ReportWriter")
.model("gemini-flash-latest")
.instruction("撰写关于主题 X 的报告。使用 ResearchAssistant 收集信息。")
.tools(AgentTool.create(researchAssistant))
// 或者,如果 research_assistant 是 subAgent,可以使用 LLM 转移
.build();
// 用户与 ReportWriter 交互。
// ReportWriter 调用 ResearchAssistant 工具。
// ResearchAssistant 调用 WebSearch 和 Summarizer 工具。
// 结果向上流动。
val webSearcher =
LlmAgent(
name = "WebSearch",
model = model,
description = "Performs web searches for facts.",
)
val summarizer = LlmAgent(name = "Summarizer", model = model, description = "Summarizes text.")
val researchAssistant =
LlmAgent(
name = "ResearchAssistant",
model = model,
description = "Finds and summarizes information on a topic.",
subAgents = listOf(webSearcher, summarizer),
)
val reportWriter =
LlmAgent(
name = "ReportWriter",
model = model,
instruction =
Instruction(
"Write a report on topic X. Use the ResearchAssistant to gather information.",
),
subAgents = listOf(researchAssistant),
)
生成与审查模式¶
- 结构: 通常涉及
SequentialAgent内的两个智能体:一个生成器智能体和一个批评者审查智能体。 - 目标: 通过让专门的智能体审查生成的输出,提高其质量或有效性。
- 使用的 ADK 原语:
- 工作流:
SequentialAgent确保生成在审查之前发生。 - 通信: 共享会话状态(生成器使用
output_key保存输出;审查者读取该状态键)。审查者可能将其反馈保存到另一个状态键,供后续步骤使用。
- 工作流:
# 概念示例:Generator-Critic
from google.adk.agents import SequentialAgent, LlmAgent
generator = LlmAgent(
name="DraftWriter",
instruction="撰写关于主题 X 的简短段落。",
output_key="draft_text"
)
reviewer = LlmAgent(
name="FactChecker",
instruction="审查 {draft_text} 中的文本以确保事实准确性。输出 'valid' 或 'invalid' 并说明理由。",
output_key="review_status"
)
# 可选:根据 review_status 进行进一步操作
review_pipeline = SequentialAgent(
name="WriteAndReview",
sub_agents=[generator, reviewer]
)
# generator 运行 -> 将草稿保存到 state['draft_text']
# reviewer 运行 -> 读取 state['draft_text'], 将状态保存到 state['review_status']
// 概念代码:生成器-批评者
import { SequentialAgent, LlmAgent } from '@google/adk';
const generator = new LlmAgent({
name: 'DraftWriter',
instruction: '撰写关于主题 X 的简短段落。',
outputKey: 'draft_text'
});
const reviewer = new LlmAgent({
name: 'FactChecker',
instruction: '审查 {draft_text} 中的文本以确保事实准确性。输出 "valid" 或 "invalid" 并说明理由。',
outputKey: 'review_status'
});
// 可选:根据 review_status 进行进一步操作
const reviewPipeline = new SequentialAgent({
name: 'WriteAndReview',
subAgents: [generator, reviewer]
});
// generator 运行 -> 将草稿保存到 state['draft_text']
// reviewer 运行 -> 读取 state['draft_text'], 将状态保存到 state['review_status']
import (
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/agent/workflowagents/sequentialagent"
)
// Conceptual Code: Generator-Critic
generator, _ := llmagent.New(llmagent.Config{
Name: "DraftWriter",
Instruction: "Write a short paragraph about subject X.",
OutputKey: "draft_text",
Model: m,
})
reviewer, _ := llmagent.New(llmagent.Config{
Name: "FactChecker",
Instruction: "Review the text in {draft_text} for factual accuracy. Output 'valid' or 'invalid' with reasons.",
OutputKey: "review_status",
Model: m,
})
reviewPipeline, _ := sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{Name: "WriteAndReview", SubAgents: []agent.Agent{generator, reviewer}},
})
// generator runs -> saves draft to state["draft_text"]
// reviewer runs -> reads state["draft_text"], saves status to state["review_status"]
// 概念示例:Generator-Critic
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.SequentialAgent;
LlmAgent generator = LlmAgent.builder()
.name("DraftWriter")
.instruction("撰写关于主题 X 的简短段落。")
.outputKey("draft_text")
.build();
LlmAgent reviewer = LlmAgent.builder()
.name("FactChecker")
.instruction("审查 {draft_text} 中的文本以确保事实准确性。输出 'valid' 或 'invalid' 并说明理由。")
.outputKey("review_status")
.build();
// 可选:根据 review_status 进行进一步操作
SequentialAgent reviewPipeline = SequentialAgent.builder()
.name("WriteAndReview")
.subAgents(generator, reviewer)
.build();
// generator 运行 -> 将草稿保存到 state['draft_text']
// reviewer 运行 -> 读取 state['draft_text'], 将状态保存到 state['review_status']
val generator =
LlmAgent(
name = "DraftWriter",
model = model,
instruction = Instruction("Write a short paragraph about subject X."),
)
val reviewer =
LlmAgent(
name = "FactChecker",
model = model,
instruction =
Instruction(
"Review the generated text for factual accuracy. Output 'valid' or 'invalid' with reasons.",
),
)
val reviewPipeline =
SequentialAgent(
name = "WriteAndReview",
subAgents = listOf(generator, reviewer),
)
迭代优化¶
- 结构: 使用包含一个或多个智能体的
LoopAgent,这些智能体在多次迭代中处理任务。 - 目标: 逐步改进存储在会话状态中的结果(例如,代码、文本、计划),直到达到质量阈值或达到最大迭代次数。
- 使用的 ADK 原语:
- 工作流:
LoopAgent管理重复。 - 通信: 共享会话状态对于智能体读取前一次迭代的输出并保存优化后的版本至关重要。
- 终止: 循环通常基于
max_iterations结束,或者当结果令人满意时,由专门的检查智能体在Event Actions中设置escalate=True来结束。
- 工作流:
# 概念示例:Iterative Code Refinement
from google.adk.agents import LoopAgent, LlmAgent, BaseAgent
from google.adk.events import Event, EventActions
from google.adk.agents.invocation_context import InvocationContext
from typing import AsyncGenerator
# 根据 state['current_code'] 和 state['requirements'] 生成/优化代码的智能体
code_refiner = LlmAgent(
name="CodeRefiner",
instruction="读取 state['current_code'](如果存在)和 state['requirements']。生成/优化 Python 代码以满足要求。保存到 state['current_code']。",
output_key="current_code" # 每次覆盖 state 中的代码
)
# 检查代码是否符合质量标准的智能体
quality_checker = LlmAgent(
name="QualityChecker",
instruction="根据 state['requirements'] 评估 state['current_code'] 中的代码。输出 'pass' 或 'fail'。",
output_key="quality_status"
)
# 用于检查状态并在通过时升级的自定义智能体
class CheckStatusAndEscalate(BaseAgent):
async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
status = ctx.session.state.get("quality_status", "fail")
should_stop = (status == "pass")
yield Event(author=self.name, actions=EventActions(escalate=should_stop))
refinement_loop = LoopAgent(
name="CodeRefinementLoop",
max_iterations=5,
sub_agents=[code_refiner, quality_checker, CheckStatusAndEscalate(name="StopChecker")]
)
# 循环运行:优化器 -> 检查器 -> 停止检查器
# state['current_code'] 在每次迭代中更新。
# 如果 QualityChecker 输出 'pass'(导致 StopChecker 升级)或在 5 次迭代后,循环停止。
// 概念代码:迭代代码优化
import { LoopAgent, LlmAgent, BaseAgent, InvocationContext } from '@google/adk';
import type { Event, createEvent, createEventActions } from '@google/genai';
// 根据 state['current_code'] 和 state['requirements'] 生成/优化代码的智能体
const codeRefiner = new LlmAgent({
name: 'CodeRefiner',
instruction: '读取 state["current_code"](如果存在)和 state["requirements"]。生成/优化 TypeScript 代码以满足要求。保存到 state["current_code"]。',
outputKey: 'current_code' // 覆盖 state 中之前的代码
});
// 检查代码是否符合质量标准的智能体
const qualityChecker = new LlmAgent({
name: 'QualityChecker',
instruction: '根据 state["requirements"] 评估 state["current_code"] 中的代码。输出 "pass" 或 "fail"。',
outputKey: 'quality_status'
});
// 用于检查状态并在通过时升级的自定义智能体
class CheckStatusAndEscalate extends BaseAgent {
async *runAsyncImpl(ctx: InvocationContext): AsyncGenerator<Event> {
const status = ctx.session.state.quality_status;
const shouldStop = status === 'pass';
if (shouldStop) {
yield createEvent({
author: 'StopChecker',
actions: createEventActions(),
});
}
}
async *runLiveImpl(ctx: InvocationContext): AsyncGenerator<Event> {
// 此智能体没有实时实现
yield createEvent({ author: 'StopChecker' });
}
}
// 循环运行:优化器 -> 检查器 -> 停止检查器
// state['current_code'] 在每次迭代中更新。
// 如果 QualityChecker 输出 'pass'(导致 StopChecker 升级)或在 5 次迭代后,循环停止。
const refinementLoop = new LoopAgent({
name: 'CodeRefinementLoop',
maxIterations: 5,
subAgents: [codeRefiner, qualityChecker, new CheckStatusAndEscalate({name: 'StopChecker'})]
});
import (
"iter"
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/agent/workflowagents/loopagent"
"google.golang.org/adk/session"
)
// Conceptual Code: Iterative Code Refinement
codeRefiner, _ := llmagent.New(llmagent.Config{
Name: "CodeRefiner",
Instruction: "Read state['current_code'] (if exists) and state['requirements']. Generate/refine Python code to meet requirements. Save to state['current_code'].",
OutputKey: "current_code",
Model: m,
})
qualityChecker, _ := llmagent.New(llmagent.Config{
Name: "QualityChecker",
Instruction: "Evaluate the code in state['current_code'] against state['requirements']. Output 'pass' or 'fail'.",
OutputKey: "quality_status",
Model: m,
})
checkStatusAndEscalate, _ := agent.New(agent.Config{
Name: "StopChecker",
Run: func(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
return func(yield func(*session.Event, error) bool) {
status, _ := ctx.Session().State().Get("quality_status")
shouldStop := status == "pass"
yield(&session.Event{Author: "StopChecker", Actions: session.EventActions{Escalate: shouldStop}}, nil)
}
},
})
refinementLoop, _ := loopagent.New(loopagent.Config{
MaxIterations: 5,
AgentConfig: agent.Config{Name: "CodeRefinementLoop", SubAgents: []agent.Agent{codeRefiner, qualityChecker, checkStatusAndEscalate}},
})
// Loop runs: Refiner -> Checker -> StopChecker
// State["current_code"] is updated each iteration.
// Loop stops if QualityChecker outputs 'pass' (leading to StopChecker escalating) or after 5 iterations.
// 概念示例:Iterative Code Refinement
import com.google.adk.agents.BaseAgent;
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.LoopAgent;
import com.google.adk.events.Event;
import com.google.adk.events.EventActions;
import com.google.adk.agents.InvocationContext;
import io.reactivex.rxjava3.core.Flowable;
import java.util.List;
// 根据 state['current_code'] 和 state['requirements'] 生成/优化代码的智能体
LlmAgent codeRefiner = LlmAgent.builder()
.name("CodeRefiner")
.instruction("读取 state['current_code'](如果存在)和 state['requirements']。生成/优化 Java 代码以满足要求。保存到 state['current_code']。")
.outputKey("current_code") // 每次覆盖 state 中的代码
.build();
// 检查代码是否符合质量标准的智能体
LlmAgent qualityChecker = LlmAgent.builder()
.name("QualityChecker")
.instruction("根据 state['requirements'] 评估 state['current_code'] 中的代码。输出 'pass' 或 'fail'。")
.outputKey("quality_status")
.build();
BaseAgent checkStatusAndEscalate = new BaseAgent(
"StopChecker","Checks quality_status and escalates if 'pass'.", List.of(), null, null) {
@Override
protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) {
String status = (String) invocationContext.session().state().getOrDefault("quality_status", "fail");
boolean shouldStop = "pass".equals(status);
EventActions actions = EventActions.builder().escalate(shouldStop).build();
Event event = Event.builder()
.author(this.name())
.actions(actions)
.build();
return Flowable.just(event);
}
};
LoopAgent refinementLoop = LoopAgent.builder()
.name("CodeRefinementLoop")
.maxIterations(5)
.subAgents(codeRefiner, qualityChecker, checkStatusAndEscalate)
.build();
// 循环运行:优化器 -> 检查器 -> 停止检查器
// state['current_code'] 在每次迭代中更新。
// 如果 QualityChecker 输出 'pass'(导致 StopChecker 升级)或在 5 次迭代后停止。
val codeRefiner =
LlmAgent(
name = "CodeRefiner",
model = model,
instruction =
Instruction(
"Read current code (if exists) and requirements from state. Generate/refine Kotlin code to meet requirements.",
),
)
val qualityChecker =
LlmAgent(
name = "QualityChecker",
model = model,
instruction =
Instruction(
"Evaluate the code in state against requirements. Output 'pass' or 'fail'.",
),
)
val stopChecker = CheckConditionAgent(name = "StopChecker") // Checks quality_status
val refinementLoop =
LoopAgent(
name = "CodeRefinementLoop",
maxIterations = 5,
subAgents = listOf(codeRefiner, qualityChecker, stopChecker),
)
Human-in-the-loop¶
- 结构: 在智能体工作流中集成人类干预点。
- 目标: 允许人类监督、审批、纠正或执行 AI 无法完成的任务。
- 使用的 ADK 原语 (概念):
- 交互: 可以使用自定义工具实现,该工具暂停执行并向外部系统 (例如,UI、工单系统) 发送请求,等待人类输入。然后工具将人类的响应返回给智能体。
- 工作流: 可以使用 LLM 驱动委派(
transfer_to_agent) 针对概念上的“人类智能体”来触发外部工作流,或在LlmAgent内使用自定义工具。 - 状态/回调: 状态可以保存人类的任务详情;回调可以管理交互流程。
- 注意: ADK 没有内置的“人类智能体”类型,因此这需要自定义集成。
# 概念示例:使用工具进行人工审批
from google.adk.agents import LlmAgent, SequentialAgent
from google.adk.tools import FunctionTool
# --- 假设 external_approval_tool 已存在 ---
# 此工具将:
# 1. 获取详细信息(例如 request_id、金额、原因)。
# 2. 将这些详情发送到人工评审系统(例如通过 API)。
# 3. 轮询或等待人工响应(已批准/已拒绝)。
# 4. 返回人工决策。
# async def external_approval_tool(amount: float, reason: str) -> str: ...
approval_tool = FunctionTool(func=external_approval_tool)
# 准备请求的智能体
prepare_request = LlmAgent(
name="PrepareApproval",
instruction="根据用户输入准备审批请求详情。将金额和原因存储在状态中。",
# ... 可能设置 state['approval_amount'] 和 state['approval_reason'] ...
)
# 调用人工审批工具的智能体
request_approval = LlmAgent(
name="RequestHumanApproval",
instruction="使用 external_approval_tool,金额来自 state['approval_amount'],原因来自 state['approval_reason']。",
tools=[approval_tool],
output_key="human_decision"
)
# 根据人工决策继续进行的智能体
process_decision = LlmAgent(
name="ProcessDecision",
instruction="检查 {human_decision}。如果是 'approved',继续。如果是 'rejected',通知用户。"
)
approval_workflow = SequentialAgent(
name="HumanApprovalWorkflow",
sub_agents=[prepare_request, request_approval, process_decision]
)
// 概念代码:使用人工审批工具
import { LlmAgent, SequentialAgent, FunctionTool } from '@google/adk';
import { z } from 'zod';
// --- 假设 externalApprovalTool 已存在 ---
// 此工具将:
// 1. 获取详细信息(例如 request_id、金额、原因)。
// 2. 将这些详情发送到人工评审系统(例如通过 API)。
// 3. 轮询或等待人工响应(已批准/已拒绝)。
// 4. 返回人工决策。
async function externalApprovalTool(params: {amount: number, reason: string}): Promise<{decision: string}> {
// ... 调用外部系统的实现
return {decision: 'approved'}; // 或 'rejected'
}
const approvalTool = new FunctionTool({
name: 'external_approval_tool',
description: 'Sends a request for human approval.',
parameters: z.object({
amount: z.number(),
reason: z.string(),
}),
execute: externalApprovalTool,
});
// 准备请求的智能体
const prepareRequest = new LlmAgent({
name: 'PrepareApproval',
instruction: '根据用户输入准备审批请求详情。将金额和原因存储在状态中。',
// ... 可能设置 state['approval_amount'] 和 state['approval_reason'] ...
});
// 调用人工审批工具的智能体
const requestApproval = new LlmAgent({
name: 'RequestHumanApproval',
instruction: '使用 external_approval_tool,金额来自 state["approval_amount"],原因来自 state["approval_reason"]。',
tools: [approvalTool],
outputKey: 'human_decision'
});
// 根据人工决策继续进行的智能体
const processDecision = new LlmAgent({
name: 'ProcessDecision',
instruction: '检查 {human_decision}。如果是 "approved",继续。如果是 "rejected",通知用户。'
});
const approvalWorkflow = new SequentialAgent({
name: 'HumanApprovalWorkflow',
subAgents: [prepareRequest, requestApproval, processDecision]
});
import (
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/agent/workflowagents/sequentialagent"
"google.golang.org/adk/tool"
)
// Conceptual Code: Using a Tool for Human Approval
// --- Assume externalApprovalTool exists ---
// func externalApprovalTool(amount float64, reason string) (string, error) { ... }
type externalApprovalToolArgs struct {
Amount float64 `json:"amount" jsonschema:"The amount for which approval is requested."`
Reason string `json:"reason" jsonschema:"The reason for the approval request."`
}
var externalApprovalTool func(tool.Context, externalApprovalToolArgs) (string, error)
approvalTool, _ := functiontool.New(
functiontool.Config{
Name: "external_approval_tool",
Description: "Sends a request for human approval.",
},
externalApprovalTool,
)
prepareRequest, _ := llmagent.New(llmagent.Config{
Name: "PrepareApproval",
Instruction: "Prepare the approval request details based on user input. Store amount and reason in state.",
Model: m,
})
requestApproval, _ := llmagent.New(llmagent.Config{
Name: "RequestHumanApproval",
Instruction: "Use the external_approval_tool with amount from state['approval_amount'] and reason from state['approval_reason'].",
Tools: []tool.Tool{approvalTool},
OutputKey: "human_decision",
Model: m,
})
processDecision, _ := llmagent.New(llmagent.Config{
Name: "ProcessDecision",
Instruction: "Check {human_decision}. If 'approved', proceed. If 'rejected', inform user.",
Model: m,
})
approvalWorkflow, _ := sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{Name: "HumanApprovalWorkflow", SubAgents: []agent.Agent{prepareRequest, requestApproval, processDecision}},
})
// 概念示例:使用工具进行人工审批
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.SequentialAgent;
import com.google.adk.tools.FunctionTool;
// --- 假设 external_approval_tool 存在 ---
// 此工具将:
// 1. 接收详细信息(例如,request_id、amount、reason)。
// 2. 将这些详细信息发送到人工审核系统(例如,通过 API)。
// 3. 轮询或等待人工响应(批准/拒绝)。
// 4. 返回人工的决定。
// public boolean externalApprovalTool(float amount, String reason) { ... }
FunctionTool approvalTool = FunctionTool.create(externalApprovalTool);
// 准备请求的智能体
LlmAgent prepareRequest = LlmAgent.builder()
.name("PrepareApproval")
.instruction("根据用户输入准备审批请求详细信息。将金额和原因存储在状态中。")
// ... 可能设置 state['approval_amount'] 和 state['approval_reason'] ...
.build();
// 调用人工审批工具的智能体
LlmAgent requestApproval = LlmAgent.builder()
.name("RequestHumanApproval")
.instruction("使用 external_approval_tool,从 state['approval_amount'] 获取金额,从 state['approval_reason'] 获取原因。")
.tools(approvalTool)
.outputKey("human_decision")
.build();
// 根据人工决定继续的智能体
LlmAgent processDecision = LlmAgent.builder()
.name("ProcessDecision")
.instruction("检查 {human_decision}。如果是 'approved',则继续。如果是 'rejected',则通知用户。")
.build();
SequentialAgent approvalWorkflow = SequentialAgent.builder()
.name("HumanApprovalWorkflow")
.subAgents(prepareRequest, requestApproval, processDecision)
.build();
class ExternalApprovalTool : BaseTool(
"external_approval_tool",
"Sends a request for human approval.",
) {
override fun declaration(): FunctionDeclaration =
FunctionDeclaration(
"external_approval_tool",
"Sends a request for human approval.",
)
override suspend fun run(
context: ToolContext,
args: Map<String, Any>,
): Any {
// Simulate calling external system (e.g., UI, ticketing system)
// In a real app, this might poll for a result or wait for a webhook.
return mapOf("decision" to "approved")
}
}
结合策略的人机协作¶
实现人机协作的一种更高级和结构化的方法是使用 PolicyEngine。这种方法允许你定义策略,可以在执行工具之前触发用户的确认步骤。SecurityPlugin 拦截工具调用,咨询 PolicyEngine,如果策略规定,它将自动请求用户确认。这种模式对于执行治理和安全规则更加稳健。
工作原理如下:
SecurityPlugin:你将此插件添加到你的Runner。它充当所有工具调用的拦截器。BasePolicyEngine:你创建一个实现此接口的自定义类。其evaluate()方法包含你的逻辑,用于决定工具调用是否需要确认。PolicyOutcome.CONFIRM:当你的evaluate()方法返回此结果时,SecurityPlugin暂停工具执行并使用getAskUserConfirmationFunctionCalls生成一个特殊的FunctionCall。- 应用程序处理:你的应用程序代码接收此特殊函数调用并向用户呈现确认请求。
- 用户确认:一旦用户确认,你的应用程序将
FunctionResponse发送回智能体,这允许SecurityPlugin继续执行原始工具。
TypeScript 推荐模式
基于策略的模式是在 TypeScript 中实现人机协作工作流的推荐方法。其他 ADK 语言的支持计划在未来版本中提供。
下面显示了使用 CustomPolicyEngine 在执行任何工具之前要求用户确认的概念示例。
const rootAgent = new LlmAgent({
name: 'weather_time_agent',
model: 'gemini-flash-latest',
description:
'回答有关城市时间和天气问题的智能体。',
instruction:
'你是一个有用的智能体,可以回答用户关于城市时间和天气的问题。',
tools: [getWeatherTool],
});
class CustomPolicyEngine implements BasePolicyEngine {
async evaluate(_context: ToolCallPolicyContext): Promise<PolicyCheckResult> {
// 默认宽松实现
return Promise.resolve({
outcome: PolicyOutcome.CONFIRM,
reason: '需要确认工具调用',
});
}
}
const runner = new InMemoryRunner({
agent: rootAgent,
appName,
plugins: [new SecurityPlugin({policyEngine: new CustomPolicyEngine()})]
});
你可以在此处找到完整的代码示例:此处。