Parallel agents¶
ParallelAgent¶
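ParallelAgent is a workflow agent that executes its sub-agents concurrently. This can substantially speed up workflows whose tasks are independent of one another.
When to use ParallelAgent: in scenarios that prioritize speed and involve independent, resource-intensive tasks, ParallelAgent enables efficient parallel execution. When sub-agents have no dependencies on each other, their tasks can run concurrently, which can significantly reduce overall processing time.
Like the other workflow agents, ParallelAgent is not driven by an LLM, so the way it executes is deterministic. That is, a workflow agent is concerned only with how its sub-agents are executed (here, in parallel), not with their internal logic; the tools or sub-agents of a workflow agent may or may not use LLMs.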
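Example¶
This approach is particularly beneficial for operations such as multi-source data retrieval or heavy computation, where parallelization yields substantial performance gains. Importantly, this strategy assumes that the concurrently executing agents do not need to share state or exchange information directly.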
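The benefit for independent tasks can be illustrated with a minimal sketch in plain asyncio. It does not use ADK: `fetch_source`, `JOBS`, and the 0.2-second delays are hypothetical stand-ins for independent data-retrieval tasks, chosen only to make the timing difference visible.

```python
import asyncio
import time


async def fetch_source(name: str, delay: float) -> str:
    """Stand-in for an independent, slow task such as a remote lookup."""
    await asyncio.sleep(delay)
    return f"{name}: done"


async def run_sequentially(jobs: list[tuple[str, float]]) -> list[str]:
    # Each task starts only after the previous one has finished.
    return [await fetch_source(name, delay) for name, delay in jobs]


async def run_concurrently(jobs: list[tuple[str, float]]) -> list[str]:
    # All tasks start together; gather returns results in argument order.
    return list(await asyncio.gather(*(fetch_source(n, d) for n, d in jobs)))


def timed(coro) -> tuple[list[str], float]:
    """Run a coroutine to completion and report its wall-clock duration."""
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


# Hypothetical workload: three independent sources, 0.2 s each.
JOBS = [("source_a", 0.2), ("source_b", 0.2), ("source_c", 0.2)]

if __name__ == "__main__":
    _, sequential_seconds = timed(run_sequentially(JOBS))
    _, concurrent_seconds = timed(run_concurrently(JOBS))
    print(f"sequential: {sequential_seconds:.2f}s, concurrent: {concurrent_seconds:.2f}s")
```

With independent tasks, the concurrent run takes roughly as long as the slowest task, while the sequential run takes roughly the sum of all delays.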
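How it works¶
When the ParallelAgent's run_async() method is called:
- Concurrent execution: It starts the run_async() method of every sub-agent in the sub_agents list concurrently. This means all the agents start running at (approximately) the same time.
- Independent branches: Each sub-agent runs in its own execution branch. During execution, conversation history and state are not automatically shared between these branches.
- Result collection: The ParallelAgent manages the parallel execution and typically provides a way to access the results from each sub-agent after they have completed (for example, through a list of results or events). The order of the results may not be deterministic.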
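The three steps above can be sketched conceptually in plain asyncio. This is not ADK's implementation: `sub_agent`, `run_parallel`, and the event strings are hypothetical, and the sketch only shows how several event streams can be started together and drained as events arrive, which is why ordering across branches is not guaranteed.

```python
import asyncio
from typing import AsyncIterator


async def sub_agent(name: str, delays: list[float]) -> AsyncIterator[str]:
    """Hypothetical sub-agent: yields one event string after each delay."""
    for step, delay in enumerate(delays, start=1):
        await asyncio.sleep(delay)
        yield f"{name}:event{step}"


async def run_parallel(agents: list[AsyncIterator[str]]) -> list[str]:
    """Start every event stream at once and collect events as they arrive."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking the end of one stream

    async def pump(stream: AsyncIterator[str]) -> None:
        # Forward one branch's events into the shared queue.
        try:
            async for event in stream:
                await queue.put(event)
        finally:
            await queue.put(done)

    tasks = [asyncio.create_task(pump(stream)) for stream in agents]
    events: list[str] = []
    remaining = len(tasks)
    while remaining:
        item = await queue.get()
        if item is done:
            remaining -= 1
        else:
            events.append(item)
    await asyncio.gather(*tasks)  # surface any exception raised in a branch
    return events


def demo() -> list[str]:
    return asyncio.run(run_parallel([
        sub_agent("A", [0.05, 0.05]),
        sub_agent("B", [0.01]),
    ]))
```

Events from a single branch keep their relative order, but the interleaving between branches depends on timing, so callers should not rely on it.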
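Independent execution and state management¶
It is crucial to understand that the sub-agents within a ParallelAgent run independently. If you need communication or data sharing between these agents, you must implement it explicitly. Possible approaches include:
- Shared InvocationContext: You could pass a shared InvocationContext object to each sub-agent. This object can act as a shared data store. However, you need to manage concurrent access to this shared context carefully (for example, using locks) to avoid race conditions.
- External state management: Use an external database, message queue, or other mechanism to manage shared state and facilitate communication between agents.
- Post-processing: Collect the results from each branch, and then implement logic to reconcile the data.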
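The locking concern in the first approach can be illustrated with a small sketch. It assumes nothing about InvocationContext itself: `SharedStore` is a hypothetical stand-in for any shared data store, and the `asyncio.sleep(0)` simulates the point where a real read-modify-write would yield control during I/O.

```python
import asyncio


class SharedStore:
    """Hypothetical shared data store guarded by an asyncio.Lock."""

    def __init__(self) -> None:
        self._data: dict[str, int] = {}
        self._lock = asyncio.Lock()

    async def increment(self, key: str) -> None:
        # The lock keeps the read-modify-write below atomic across awaits.
        async with self._lock:
            current = self._data.get(key, 0)
            await asyncio.sleep(0)  # yield control, as real I/O would
            self._data[key] = current + 1

    def snapshot(self) -> dict[str, int]:
        return dict(self._data)


async def worker(store: SharedStore, updates: int) -> None:
    for _ in range(updates):
        await store.increment("processed")


async def main() -> dict[str, int]:
    store = SharedStore()
    # Three concurrent workers update the same key.
    await asyncio.gather(*(worker(store, 100) for _ in range(3)))
    return store.snapshot()
```

Without the lock, two workers could read the same value before either writes it back, and updates would be lost. Giving each branch its own key, and reconciling afterwards, avoids the need for a lock altogether.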
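Full example: parallel web research¶
Imagine researching multiple topics simultaneously:
- Researcher agent 1: An LlmAgent that researches "renewable energy sources."
- Researcher agent 2: An LlmAgent that researches "electric vehicle technology."
- Researcher agent 3: An LlmAgent that researches "carbon capture methods."
These research tasks are independent. Using a ParallelAgent allows them to run concurrently, potentially reducing the total research time significantly compared with running them sequentially. The results from each agent are collected separately after they finish.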
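The data flow of this example (fan out to independent researchers, then merge once all have finished) can be sketched without ADK. Everything here is illustrative: `researcher`, `pipeline`, and `MERGE_TEMPLATE` are hypothetical, the summaries are placeholders rather than real search results, and a plain dict stands in for session state. The key names mirror the output keys used in this example.

```python
import asyncio

# Hypothetical merge template; placeholders are filled from the state dict.
MERGE_TEMPLATE = (
    "Renewable Energy: {renewable_energy_result}\n"
    "Electric Vehicles: {ev_technology_result}\n"
    "Carbon Capture: {carbon_capture_result}"
)


async def researcher(state: dict[str, str], output_key: str, topic: str) -> None:
    """Hypothetical researcher: writes its summary under its own key."""
    await asyncio.sleep(0.01)  # placeholder for search and summarization
    state[output_key] = f"summary of {topic}"


async def pipeline() -> str:
    state: dict[str, str] = {}
    # Step 1: fan out; each task writes to a distinct key, so no lock is needed.
    await asyncio.gather(
        researcher(state, "renewable_energy_result", "renewable energy"),
        researcher(state, "ev_technology_result", "electric vehicles"),
        researcher(state, "carbon_capture_result", "carbon capture"),
    )
    # Step 2: merge only after every researcher has finished.
    return MERGE_TEMPLATE.format(**state)


if __name__ == "__main__":
    print(asyncio.run(pipeline()))
```

Because the merge step runs strictly after the parallel step, it can rely on every key being present, which is the same reason the merger is placed after the parallel stage in a sequential pipeline.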
Full code
# Part of agent.py --> Follow https://google.github.io/adk-docs/get-started/quickstart/ to learn the setup
from google.adk.agents import LlmAgent, ParallelAgent, SequentialAgent
from google.adk.tools import google_search

# Model used by every LlmAgent in this example
GEMINI_MODEL = "gemini-2.0-flash"

# --- 1. Define Researcher Sub-Agents (to run in parallel) ---
# Researcher 1: Renewable Energy
researcher_agent_1 = LlmAgent(
    name="RenewableEnergyResearcher",
    model=GEMINI_MODEL,
    instruction="""You are an AI Research Assistant specializing in energy.
Research the latest advancements in 'renewable energy sources'.
Use the Google Search tool provided.
Summarize your key findings concisely (1-2 sentences).
Output *only* the summary.
""",
    description="Researches renewable energy sources.",
    tools=[google_search],
    # Store result in state for the merger agent
    output_key="renewable_energy_result"
)

# Researcher 2: Electric Vehicles
researcher_agent_2 = LlmAgent(
    name="EVResearcher",
    model=GEMINI_MODEL,
    instruction="""You are an AI Research Assistant specializing in transportation.
Research the latest developments in 'electric vehicle technology'.
Use the Google Search tool provided.
Summarize your key findings concisely (1-2 sentences).
Output *only* the summary.
""",
    description="Researches electric vehicle technology.",
    tools=[google_search],
    # Store result in state for the merger agent
    output_key="ev_technology_result"
)

# Researcher 3: Carbon Capture
researcher_agent_3 = LlmAgent(
    name="CarbonCaptureResearcher",
    model=GEMINI_MODEL,
    instruction="""You are an AI Research Assistant specializing in climate solutions.
Research the current state of 'carbon capture methods'.
Use the Google Search tool provided.
Summarize your key findings concisely (1-2 sentences).
Output *only* the summary.
""",
    description="Researches carbon capture methods.",
    tools=[google_search],
    # Store result in state for the merger agent
    output_key="carbon_capture_result"
)

# --- 2. Create the ParallelAgent (Runs researchers concurrently) ---
# This agent orchestrates the concurrent execution of the researchers.
# It finishes once all researchers have completed and stored their results in state.
parallel_research_agent = ParallelAgent(
    name="ParallelWebResearchAgent",
    sub_agents=[researcher_agent_1, researcher_agent_2, researcher_agent_3],
    description="Runs multiple research agents in parallel to gather information."
)
# --- 3. Define the Merger Agent (Runs *after* the parallel agents) ---
# This agent takes the results stored in the session state by the parallel agents
# and synthesizes them into a single, structured response with attributions.
merger_agent = LlmAgent(
    name="SynthesisAgent",
    model=GEMINI_MODEL,  # Or potentially a more powerful model if needed for synthesis
    instruction="""You are an AI Assistant responsible for combining research findings into a structured report.
Your primary task is to synthesize the following research summaries, clearly attributing findings to their source areas. Structure your response using headings for each topic. Ensure the report is coherent and integrates the key points smoothly.
**Crucially: Your entire response MUST be grounded *exclusively* on the information provided in the 'Input Summaries' below. Do NOT add any external knowledge, facts, or details not present in these specific summaries.**
**Input Summaries:**
* **Renewable Energy:**
{renewable_energy_result}
* **Electric Vehicles:**
{ev_technology_result}
* **Carbon Capture:**
{carbon_capture_result}
**Output Format:**
## Summary of Recent Sustainable Technology Advancements
### Renewable Energy Findings
(Based on RenewableEnergyResearcher's findings)
[Synthesize and elaborate *only* on the renewable energy input summary provided above.]
### Electric Vehicle Findings
(Based on EVResearcher's findings)
[Synthesize and elaborate *only* on the EV input summary provided above.]
### Carbon Capture Findings
(Based on CarbonCaptureResearcher's findings)
[Synthesize and elaborate *only* on the carbon capture input summary provided above.]
### Overall Conclusion
[Provide a brief (1-2 sentence) concluding statement that connects *only* the findings presented above.]
Output *only* the structured report following this format. Do not include introductory or concluding phrases outside this structure, and strictly adhere to using only the provided input summary content.
""",
    description="Combines research findings from parallel agents into a structured, cited report, strictly grounded on provided inputs.",
    # No tools needed for merging
    # No output_key needed here, as its direct response is the final output of the sequence
)

# --- 4. Create the SequentialAgent (Orchestrates the overall flow) ---
# This is the main agent that will be run. It first executes the ParallelAgent
# to populate the state, and then executes the MergerAgent to produce the final output.
sequential_pipeline_agent = SequentialAgent(
    name="ResearchAndSynthesisPipeline",
    # Run parallel research first, then merge
    sub_agents=[parallel_research_agent, merger_agent],
    description="Coordinates parallel research and synthesizes the results."
)

root_agent = sequential_pipeline_agent

// ParallelResearchPipeline.java: Java version of the same pipeline
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.ParallelAgent;
import com.google.adk.agents.SequentialAgent;
import com.google.adk.events.Event;
import com.google.adk.runner.InMemoryRunner;
import com.google.adk.sessions.Session;
import com.google.adk.tools.GoogleSearchTool;
import com.google.genai.types.Content;
import com.google.genai.types.Part;
import io.reactivex.rxjava3.core.Flowable;

public class ParallelResearchPipeline {

  private static final String APP_NAME = "parallel_research_app";
  private static final String USER_ID = "research_user_01";
  private static final String GEMINI_MODEL = "gemini-2.0-flash";

  // googleSearchTool is a shared instance of GoogleSearchTool used by all researchers
  private static final GoogleSearchTool googleSearchTool = new GoogleSearchTool();

  public static void main(String[] args) {
    String query = "Summarize recent sustainable tech advancements.";
    SequentialAgent sequentialPipelineAgent = initAgent();
    runAgent(sequentialPipelineAgent, query);
  }

  public static SequentialAgent initAgent() {
    // --- 1. Define Researcher Sub-Agents (to run in parallel) ---
    // Researcher 1: Renewable Energy
    LlmAgent researcherAgent1 = LlmAgent.builder()
        .name("RenewableEnergyResearcher")
        .model(GEMINI_MODEL)
        .instruction("""
            You are an AI Research Assistant specializing in energy.
            Research the latest advancements in 'renewable energy sources'.
            Use the Google Search tool provided.
            Summarize your key findings concisely (1-2 sentences).
            Output *only* the summary.
            """)
        .description("Researches renewable energy sources.")
        .tools(googleSearchTool)
        .outputKey("renewable_energy_result") // Store result in state
        .build();

    // Researcher 2: Electric Vehicles
    LlmAgent researcherAgent2 = LlmAgent.builder()
        .name("EVResearcher")
        .model(GEMINI_MODEL)
        .instruction("""
            You are an AI Research Assistant specializing in transportation.
            Research the latest developments in 'electric vehicle technology'.
            Use the Google Search tool provided.
            Summarize your key findings concisely (1-2 sentences).
            Output *only* the summary.
            """)
        .description("Researches electric vehicle technology.")
        .tools(googleSearchTool)
        .outputKey("ev_technology_result") // Store result in state
        .build();

    // Researcher 3: Carbon Capture
    LlmAgent researcherAgent3 = LlmAgent.builder()
        .name("CarbonCaptureResearcher")
        .model(GEMINI_MODEL)
        .instruction("""
            You are an AI Research Assistant specializing in climate solutions.
            Research the current state of 'carbon capture methods'.
            Use the Google Search tool provided.
            Summarize your key findings concisely (1-2 sentences).
            Output *only* the summary.
            """)
        .description("Researches carbon capture methods.")
        .tools(googleSearchTool)
        .outputKey("carbon_capture_result") // Store result in state
        .build();

    // --- 2. Create the ParallelAgent (Runs researchers concurrently) ---
    // This agent orchestrates the concurrent execution of the researchers.
    // It finishes once all researchers have completed and stored their results in state.
    ParallelAgent parallelResearchAgent =
        ParallelAgent.builder()
            .name("ParallelWebResearchAgent")
            .subAgents(researcherAgent1, researcherAgent2, researcherAgent3)
            .description("Runs multiple research agents in parallel to gather information.")
            .build();

    // --- 3. Define the Merger Agent (Runs *after* the parallel agents) ---
    // This agent takes the results stored in the session state by the parallel agents
    // and synthesizes them into a single, structured response with attributions.
    LlmAgent mergerAgent =
        LlmAgent.builder()
            .name("SynthesisAgent")
            .model(GEMINI_MODEL)
            .instruction(
                """
You are an AI Assistant responsible for combining research findings into a structured report.
Your primary task is to synthesize the following research summaries, clearly attributing findings to their source areas. Structure your response using headings for each topic. Ensure the report is coherent and integrates the key points smoothly.
**Crucially: Your entire response MUST be grounded *exclusively* on the information provided in the 'Input Summaries' below. Do NOT add any external knowledge, facts, or details not present in these specific summaries.**
**Input Summaries:**
* **Renewable Energy:**
{renewable_energy_result}
* **Electric Vehicles:**
{ev_technology_result}
* **Carbon Capture:**
{carbon_capture_result}
**Output Format:**
## Summary of Recent Sustainable Technology Advancements
### Renewable Energy Findings
(Based on RenewableEnergyResearcher's findings)
[Synthesize and elaborate *only* on the renewable energy input summary provided above.]
### Electric Vehicle Findings
(Based on EVResearcher's findings)
[Synthesize and elaborate *only* on the EV input summary provided above.]
### Carbon Capture Findings
(Based on CarbonCaptureResearcher's findings)
[Synthesize and elaborate *only* on the carbon capture input summary provided above.]
### Overall Conclusion
[Provide a brief (1-2 sentence) concluding statement that connects *only* the findings presented above.]
Output *only* the structured report following this format. Do not include introductory or concluding phrases outside this structure, and strictly adhere to using only the provided input summary content.
                """)
            .description(
                "Combines research findings from parallel agents into a structured, cited report, strictly grounded on provided inputs.")
            // No tools needed for merging
            // No output_key needed here, as its direct response is the final output of the sequence
            .build();

    // --- 4. Create the SequentialAgent (Orchestrates the overall flow) ---
    // This is the main agent that will be run. It first executes the ParallelAgent
    // to populate the state, and then executes the MergerAgent to produce the final output.
    SequentialAgent sequentialPipelineAgent =
        SequentialAgent.builder()
            .name("ResearchAndSynthesisPipeline")
            // Run parallel research first, then merge
            .subAgents(parallelResearchAgent, mergerAgent)
            .description("Coordinates parallel research and synthesizes the results.")
            .build();

    return sequentialPipelineAgent;
  }

  public static void runAgent(SequentialAgent sequentialPipelineAgent, String query) {
    // Create an InMemoryRunner
    InMemoryRunner runner = new InMemoryRunner(sequentialPipelineAgent, APP_NAME);
    // InMemoryRunner automatically creates a session service. Create a session using the service
    Session session = runner.sessionService().createSession(APP_NAME, USER_ID).blockingGet();
    Content userMessage = Content.fromParts(Part.fromText(query));

    // Run the agent
    Flowable<Event> eventStream = runner.runAsync(USER_ID, session.id(), userMessage);

    // Stream event response
    eventStream.blockingForEach(
        event -> {
          if (event.finalResponse()) {
            System.out.printf(
                "Event Author: %s \n Event Response: %s \n\n\n",
                event.author(), event.stringifyContent());
          }
        });
  }
}