上下文(Context)¶
在智能体开发套件 (ADK) 中,上下文 (Context) 是指在特定操作期间,智能体及其工具可用的关键信息包。你可以将其视为在有效处理当前任务或对话轮次时,所需的背景知识和资源。
智能体通常需要的不仅仅是最新的用户消息才能表现良好。上下文至关重要,因为它能够:
- 维持状态: 记住对话中多个步骤的详细信息(例如,用户偏好、之前的计算、购物车中的物品)。这主要通过会话状态管理。
- 传递数据: 在一个步骤(如 LLM 调用或工具执行)中发现或生成的信息可以与后续步骤共享。会话状态在这里也是关键。
- 访问服务: 与框架功能交互,如:
- 制品 (Artifacts) 存储: 保存或加载与会话相关的文件或数据块(如 PDF、图像、配置文件)。
- 记忆 (Memory): 从过去的交互或与用户相关的外部知识源中搜索相关信息。
- 认证: 请求并检索工具安全访问外部 API 所需的凭证。
- 身份和跟踪: 知道当前运行的是哪个智能体 (
agent.name),以及唯一标识当前请求-响应周期 (invocation_id) 以进行日志记录和调试。 - 工具特定操作: 启用工具内的专门操作,例如请求认证或搜索记忆 (Memory),这些操作需要访问当前交互的详细信息。
保存单个完整的用户请求到最终响应周期(一次调用 (Invocation))所有信息的核心部分是 InvocationContext。但是,你通常不会直接创建或管理此对象。ADK 框架在调用开始时创建它(例如,通过 runner.run_async),并将相关上下文信息隐式传递给你的智能体代码、回调和工具。
# 框架如何提供上下文
from google.adk import Runner
# 1. 使用智能体和服务初始化运行器 (Runner)
runner = Runner(
app_name="my_app",
agent=my_root_agent,
session_service=my_session_service,
artifact_service=my_artifact_service,
)
# 2. 使用用户输入调用 run_async
# 注意:run_async 是一个异步生成器,会产出事件 (Events)。
# 框架内部会创建一个 InvocationContext 并将其隐式
# 传递给你的智能体代码、回调和工具。
async for event in runner.run_async(
user_id="user123",
session_id="session456",
new_message=user_message
):
print(event.stringify_content())
# 作为开发者,你通过方法参数中提供的上下文对象进行工作。
/* 概念伪代码:框架如何提供上下文(内部逻辑) */
const runner = new InMemoryRunner({ agent: myRootAgent });
const session = await runner.sessionService.createSession({ ... });
const userMessage = createUserContent(...);
// --- 在 runner.runAsync(...) 内部 ---
// 1. 框架为本次运行创建主上下文
const invocationContext = new InvocationContext({
invocationId: "unique-id-for-this-run",
session: session,
userContent: userMessage,
agent: myRootAgent, // 起始智能体
sessionService: runner.sessionService,
pluginManager: runner.pluginManager,
// ... 其他必要字段 ...
});
//
// 2. 框架调用智能体的 run 方法,隐式传递上下文
await myRootAgent.runAsync(invocationContext);
// --- 结束内部逻辑 ---
// 作为开发者,你使用方法参数中提供的上下文对象。
/* 概念伪代码:框架如何提供上下文(内部逻辑) */
sessionService := session.InMemoryService()
r, err := runner.New(runner.Config{
AppName: appName,
Agent: myAgent,
SessionService: sessionService,
})
if err != nil {
log.Fatalf("Failed to create runner: %v", err)
}
s, err := sessionService.Create(ctx, &session.CreateRequest{
AppName: appName,
UserID: userID,
})
if err != nil {
log.Fatalf("FATAL: Failed to create session: %v", err)
}
scanner := bufio.NewScanner(os.Stdin)
for {
fmt.Print("\nYou > ")
if !scanner.Scan() {
break
}
userInput := scanner.Text()
if strings.EqualFold(userInput, "quit") {
break
}
userMsg := genai.NewContentFromText(userInput, genai.RoleUser)
events := r.Run(ctx, s.Session.UserID(), s.Session.ID(), userMsg, agent.RunConfig{
StreamingMode: agent.StreamingModeNone,
})
fmt.Print("\nAgent > ")
for event, err := range events {
if err != nil {
log.Printf("ERROR during agent execution: %v", err)
break
}
fmt.Print(event.Content.Parts[0].Text)
}
}
/* 框架如何提供上下文 */
InMemoryRunner runner = new InMemoryRunner(agent);
Session session = runner
.sessionService()
.createSession(runner.appName(), USER_ID, initialState, SESSION_ID )
.blockingGet();
try (Scanner scanner = new Scanner(System.in, StandardCharsets.UTF_8)) {
while (true) {
System.out.print("\nYou > ");
String userInput = scanner.nextLine();
if ("quit".equalsIgnoreCase(userInput)) {
break;
}
Content userMsg = Content.fromParts(Part.fromText(userInput));
Flowable<Event> events = runner.runAsync(session.userId(), session.id(), userMsg);
System.out.print("\nAgent > ");
events.blockingForEach(event -> System.out.print(event.stringifyContent()));
}
}
不同类型的上下文¶
虽然 InvocationContext 作为全面的内部容器,但 ADK 提供了根据特定情况定制的专门上下文对象。这确保了你拥有适合手头任务的正确工具和权限,而无需在任何地方处理完整内部上下文的全部复杂性。以下是你将遇到的不同“风格”:
-
InvocationContext{: #invocationcontext }- 使用场所: 在智能体的核心实现方法(
_run_async_impl,_run_live_impl)中直接接收为ctx参数。 - 目的: 提供对当前调用整个状态的访问。这是最全面的上下文对象。
- 关键内容: 直接访问
session(包括state和events)、当前agent实例、invocation_id、初始user_content、对已配置服务的引用(artifact_service、memory_service、session_service),以及与实时/流式模式相关的字段。 - 使用场景: 主要在智能体的核心逻辑需要直接访问整体会话或服务时使用,尽管状态和制品 (Artifacts) 交互通常委托给使用自己上下文的回调/工具。也用于控制调用本身(例如,设置
ctx.end_invocation = True)。
# 接收 InvocationContext 的智能体实现 from google.adk.agents import BaseAgent from google.adk.agents.invocation_context import InvocationContext from google.adk.events import Event from typing import AsyncGenerator class MyAgent(BaseAgent): async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]: # 直接访问示例 agent_name = ctx.agent.name session_id = ctx.session.id print(f"智能体 {agent_name} 正在会话 {session_id} 中为调用 {ctx.invocation_id} 运行") # ... 使用 ctx 的智能体逻辑 ... yield # ... 事件 ...// 伪代码:智能体实现接收 InvocationContext import { BaseAgent, InvocationContext, Event } from '@google/adk'; class MyAgent extends BaseAgent { async *runAsyncImpl(ctx: InvocationContext): AsyncGenerator<Event, void, undefined> { // 直接访问示例 const agentName = ctx.agent.name; const sessionId = ctx.session.id; console.log(`智能体 ${agentName} 正在会话 ${sessionId} 中为调用 ${ctx.invocationId} 运行`); // ... 使用 ctx 的智能体逻辑 ... yield; // ... 事件 ... } }import ( "google.golang.org/adk/agent" "google.golang.org/adk/session" ) // Pseudocode: Agent implementation receiving InvocationContext type MyAgent struct { } func (a *MyAgent) Run(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] { return func(yield func(*session.Event, error) bool) { // Direct access example agentName := ctx.Agent().Name() sessionID := ctx.Session().ID() fmt.Printf("Agent %s running in session %s for invocation %s\n", agentName, sessionID, ctx.InvocationID()) // ... agent logic using ctx ... yield(&session.Event{Author: agentName}, nil) } }// 示例:接收 InvocationContext 的智能体实现 import com.google.adk.agents.BaseAgent; import com.google.adk.agents.InvocationContext; import com.google.adk.events.Event; import io.reactivex.rxjava3.core.Flowable; public class MyAgent extends BaseAgent { @Override protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) { // 直接访问示例 String agentName = invocationContext.agent().name(); String sessionId = invocationContext.session().id(); String invocationId = invocationContext.invocationId(); System.out.println("Agent " + agentName + " running in session " + sessionId + " for invocation " + invocationId); // ... 使用 invocationContext 的智能体逻辑 ... return Flowable.empty(); } } - 使用场所: 在智能体的核心实现方法(
-
ReadonlyContext{: #readonlycontext }- 使用场所: 在只需要对基本信息进行只读访问且不允许修改的场景中提供(例如,
InstructionProvider函数)。它也是其他上下文的基类。 - 目的: 提供基本上下文详细信息的安全、只读视图。
- 关键内容:
invocation_id、agent_name以及当前state的只读视图。
# 示例:接收 ReadonlyContext 的指令提供器 from google.adk.agents.readonly_context import ReadonlyContext def my_instruction_provider(context: ReadonlyContext) -> str: # 只读访问示例 # state 属性提供会话状态的只读 MappingProxyType 视图 user_tier = context.state.get("user_tier", "standard") # context.state['new_key'] = 'value' # TypeError: 'mappingproxy' 对象不支持项赋值 return f"Process the request for a {user_tier} user."// 伪代码:指令提供器接收 ReadonlyContext import { ReadonlyContext } from '@google/adk'; function myInstructionProvider(context: ReadonlyContext): string { // 只读访问示例 // state 对象是只读的 const userTier = context.state.get('user_tier') ?? 'standard'; // context.state.set('new_key', 'value'); // 这会失败或报错 return `处理 ${userTier} 用户的请求。`; }import "google.golang.org/adk/agent" // Pseudocode: Instruction provider receiving ReadonlyContext func myInstructionProvider(ctx agent.ReadonlyContext) (string, error) { // Read-only access example userTier, err := ctx.ReadonlyState().Get("user_tier") if err != nil { userTier = "standard" // Default value } // ctx.ReadonlyState() has no Set method since State() is read-only. return fmt.Sprintf("Process the request for a %v user.", userTier), nil }// 示例:接收 ReadonlyContext 的指令提供器 import com.google.adk.agents.ReadonlyContext; public String myInstructionProvider(ReadonlyContext context) { // 只读访问示例 // state() 返回一个不可修改的会话状态视图 String userTier = (String) context.state().getOrDefault("user_tier", "standard"); // context.state().put("new_key", "value"); // UnsupportedOperationException return "Process the request for a " + userTier + " user."; } - 使用场所: 在只需要对基本信息进行只读访问且不允许修改的场景中提供(例如,
-
CallbackContext{: #callbackcontext }- 使用场景: 在智能体生命周期回调(
before_agent_callback、after_agent_callback)和模型交互回调(before_model_callback、after_model_callback)中作为callback_context参数提供。 - 目的: 便于在 回调函数内部 检查和修改状态、与制品 (Artifacts) 交互以及访问调用详情。
- 核心功能(相对于
ReadonlyContext的增强):- 可变
state属性: 允许读取 和写入 会话状态。在此处所做的更改 (callback_context.state['key'] = value) 会被跟踪,并与回调后框架生成的事件相关联。 - 制品 (Artifacts) 方法: 提供
load_artifact(filename)和save_artifact(filename, part)方法,用于与配置的artifact_service进行交互。 - 直接访问
user_content。
- 可变
(注:在 TypeScript 中,
CallbackContext和ToolContext已统一为单个Context类型。)# 示例:接收 Context 的回调(CallbackContext 已统一为 Context) from google.adk.agents.context import Context from google.adk.models import LlmRequest from google.genai import types from typing import Optional def my_before_model_cb(context: Context, request: LlmRequest) -> Optional[types.Content]: # 读/写状态示例 call_count = context.state.get("model_calls", 0) context.state["model_calls"] = call_count + 1 # 修改状态(跟踪增量) # (可选)加载制品 (Artifact) # config_part = context.load_artifact("model_config.json") print(f"正在为调用 {context.invocation_id} 准备模型调用 #{call_count + 1}") return None # 允许模型调用继续// 伪代码:Callback 接收 Context import { Context, LlmRequest } from '@google/adk'; import { Content } from '@google/genai'; function myBeforeModelCb(context: Context, request: LlmRequest): Content | undefined { // 读/写 state 示例 const callCount = (context.state.get('model_calls') as number) || 0; context.state.set('model_calls', callCount + 1); // 修改 state // 可选:加载制品(Artifact) // const configPart = await context.loadArtifact('model_config.json'); console.log(`正在为调用 ${context.invocationId} 准备模型调用 #${callCount + 1}`); return undefined; // 允许模型调用继续 }import ( "google.golang.org/adk/agent" "google.golang.org/adk/model" ) // Pseudocode: Callback receiving CallbackContext func myBeforeModelCb(ctx agent.CallbackContext, req *model.LLMRequest) (*model.LLMResponse, error) { // Read/Write state example callCount, err := ctx.State().Get("model_calls") if err != nil { callCount = 0 // Default value } newCount := callCount.(int) + 1 if err := ctx.State().Set("model_calls", newCount); err != nil { return nil, err } // Optionally load an artifact // configPart, err := ctx.Artifacts().Load("model_config.json") fmt.Printf("Preparing model call #%d for invocation %s\n", newCount, ctx.InvocationID()) return nil, nil // Allow model call to proceed }// 示例:接收 CallbackContext 的回调 import com.google.adk.agents.CallbackContext; import com.google.adk.models.LlmRequest; import com.google.adk.models.LlmResponse; import io.reactivex.rxjava3.core.Maybe; public Maybe<LlmResponse> myBeforeModelCb(CallbackContext callbackContext, LlmRequest request) { // 读/写状态示例 int callCount = (int) callbackContext.state().getOrDefault("model_calls", 0); callbackContext.state().put("model_calls", callCount + 1); // 修改状态(跟踪增量) // 可选:加载制品(Artifact) // Maybe<Part> configPart = callbackContext.loadArtifact("model_config.json"); System.out.println("Preparing model call " + (callCount + 1) + " for invocation " + callbackContext.invocationId()); return Maybe.empty(); // 允许模型调用继续 } - 使用场景: 在智能体生命周期回调(
-
ToolContext{: #toolcontext }- 使用场所: 在支持
FunctionTool的函数以及工具执行回调(before_tool_callback,after_tool_callback)中作为tool_context提供。 - 目的: 提供
CallbackContext的所有功能,外加工具执行必需的专门逻辑,如处理认证、搜索记忆 (Memory) 和列出制品 (Artifacts)。 - 主要功能(相对于
CallbackContext的增强):- 认证方法:
request_credential(auth_config)用于触发认证流程,以及get_auth_response(auth_config)用于检索用户或系统提供的凭证。 - 制品 (Artifacts) 列表:
list_artifacts()用于发现会话中可用的制品 (Artifacts)。 - 记忆 (Memory) 搜索:
search_memory(query)用于查询配置的memory_service。 function_call_id属性: 标识触发此工具执行的 LLM 的特定函数调用,对于将认证请求或响应正确关联非常重要。actions属性: 直接访问此步骤的EventActions对象,允许工具发出状态更改、认证请求等信号。
- 认证方法:
# 示例:接收 ToolContext 的工具函数 from google.adk.tools import ToolContext from typing import Dict, Any # 假设此函数被 FunctionTool 包装 def search_external_api(query: str, tool_context: ToolContext) -> Dict[str, Any]: api_key = tool_context.state.get("api_key") if not api_key: # 定义所需的认证配置 # auth_config = AuthConfig(...) # tool_context.request_credential(auth_config) # 请求凭证 # 使用 'actions' 属性来标记已发起认证请求 # tool_context.actions.requested_auth_configs[tool_context.function_call_id] = auth_config return {"status": "需要认证"} # 使用 API key... print(f"工具正在为查询 '{query}' 执行,使用 API 密钥。调用 ID:{tool_context.invocation_id}") # 可选:搜索记忆或列出制品 # relevant_docs = tool_context.search_memory(f"info related to {query}") # available_files = tool_context.list_artifacts() return {"result": f"Data for {query} fetched."}// 伪代码:工具函数接收 Context import { Context } from '@google/adk'; // __假设此函数被 FunctionTool 包装__ function searchExternalApi(query: string, context: Context): { [key: string]: string } { const apiKey = context.state.get('api_key') as string; if (!apiKey) { // 定义所需的认证配置 // const authConfig = new AuthConfig(...); // context.requestCredential(authConfig); // 请求凭证 // 'actions' 属性现在由 requestCredential 自动更新 return { status: 'Auth Required' }; } // 使用 API 密钥... console.log(`工具正在为查询 '${query}' 执行,使用 API 密钥。调用 ID:${context.invocationId}`); // 可选:搜索记忆或列出制品 // 注意:在 TS 中访问记忆/制品等服务通常是异步的, // 因此如果你复用它们,需要将此函数标记为 'async'。 // context.searchMemory(`与 ${query} 相关的详细信息`).then(...) // context.listArtifacts().then(...) return { result: `已获取 ${query} 的数据。` }; }import "google.golang.org/adk/tool" // Pseudocode: Tool function receiving ToolContext type searchExternalAPIArgs struct { Query string `json:"query" jsonschema:"The query to search for."` } func searchExternalAPI(tc tool.Context, input searchExternalAPIArgs) (string, error) { apiKey, err := tc.State().Get("api_key") if err != nil || apiKey == "" { // In a real scenario, you would define and request credentials here. // This is a conceptual placeholder. return "", fmt.Errorf("auth required") } // Use the API key... fmt.Printf("Tool executing for query '%s' using API key. Invocation: %s\n", input.Query, tc.InvocationID()) // Optionally search memory or list artifacts // relevantDocs, _ := tc.SearchMemory(tc, "info related to %s", input.Query)) // availableFiles, _ := tc.Artifacts().List() return fmt.Sprintf("Data for %s fetched.", input.Query), nil }// Example: Tool function receiving ToolContext import com.google.adk.tools.ToolContext; import java.util.Map; // Assume this function is wrapped by a FunctionTool public Map<String, Object> searchExternalApi(String query, ToolContext toolContext) { String apiKey = (String) toolContext.state().getOrDefault("api_key", ""); if (apiKey.isEmpty()) { // Define required auth config // authConfig = AuthConfig(...); // toolContext.requestCredential(authConfig); // Request credentials // Use the 'actions' property to signal the auth request has been made return Map.of("status", "Auth Required"); } // Use the API key... System.out.println("Tool executing for query " + query + " using API key."); // Optionally list artifacts // Single<List<String>> availableFiles = toolContext.listArtifacts(); return Map.of("result", "Data for " + query + " fetched"); } - 使用场所: 在支持
理解这些不同的上下文对象以及何时使用它们是有效管理状态、访问服务以及控制 ADK 应用程序流程的关键。下一节将详细介绍使用这些上下文可以执行的常见任务。
使用上下文的常见任务¶
现在你已经了解了不同的上下文对象,让我们专注于在构建智能体和工具时如何使用它们来执行常见任务。
访问信息¶
你将经常需要读取存储在上下文中的信息。
-
读取会话状态: 访问之前步骤中保存的数据或用户/应用级别设置。在
state属性上使用类似字典的方式进行访问。# 示例:在工具函数中 from google.adk.tools import ToolContext def my_tool(tool_context: ToolContext, **kwargs): user_pref = tool_context.state.get("user_display_preference", "default_mode") api_endpoint = tool_context.state.get("app:api_endpoint") # 读取应用级状态 if user_pref == "dark_mode": # ... 应用深色模式逻辑 ... pass print(f"使用 API 端点:{api_endpoint}") # ... 工具逻辑的其余部分 ... # 示例:在回调函数中 from google.adk.agents.context import Context def my_callback(context: Context, **kwargs): last_tool_result = context.state.get("temp:last_api_result") # 读取临时状态 if last_tool_result: print(f"发现上一个工具的临时结果:{last_tool_result}") # ... 回调逻辑 ...// 伪代码:在工具函数中 import { Context } from '@google/adk'; async function myTool(context: Context) { const userPref = context.state.get('user_display_preference', 'default_mode'); const apiEndpoint = context.state.get('app:api_endpoint'); // 读取应用级状态 if (userPref === 'dark_mode') { // ... 应用深色模式逻辑 ... } console.log(`使用 API 端点:${apiEndpoint}`); // ... 工具逻辑的其余部分 ... } // 伪代码:在回调函数中 import { Context } from '@google/adk'; function myCallback(context: Context) { const lastToolResult = context.state.get('temp:last_api_result'); // 读取临时状态 if (lastToolResult) { console.log(`发现上一个工具的临时结果:${lastToolResult}`); } // ... 回调逻辑 ... }import ( "google.golang.org/adk/agent" "google.golang.org/adk/session" "google.golang.org/adk/tool" "google.golang.org/genai" ) // Pseudocode: In a Tool function type toolArgs struct { // Define tool-specific arguments here } type toolResults struct { // Define tool-specific results here } // Example tool function demonstrating state access func myTool(tc tool.Context, input toolArgs) (toolResults, error) { userPref, err := tc.State().Get("user_display_preference") if err != nil { userPref = "default_mode" } apiEndpoint, _ := tc.State().Get("app:api_endpoint") // Read app-level state if userPref == "dark_mode" { // ... apply dark mode logic ... } fmt.Printf("Using API endpoint: %v\n", apiEndpoint) // ... rest of tool logic ... return toolResults{}, nil } // Pseudocode: In a Callback function func myCallback(ctx agent.CallbackContext) (*genai.Content, error) { lastToolResult, err := ctx.State().Get("temp:last_api_result") // Read temporary state if err == nil { fmt.Printf("Found temporary result from last tool: %v\n", lastToolResult) } else { fmt.Println("No temporary result found.") } // ... callback logic ... return nil, nil }// 示例:在工具函数中 import com.google.adk.tools.ToolContext; public void myTool(ToolContext toolContext) { String userPref = (String) toolContext.state().getOrDefault("user_display_preference", "default_mode"); String apiEndpoint = (String) toolContext.state().get("app:api_endpoint"); // 读取应用级状态 if ("dark_mode".equals(userPref)) { // ... 应用深色模式逻辑 ... } System.out.println("使用 API 端点:" + apiEndpoint); // ... 工具逻辑的其余部分 ... } // 示例:在回调函数中 import com.google.adk.agents.CallbackContext; public void myCallback(CallbackContext callbackContext) { String lastToolResult = (String) callbackContext.state().get("temp:last_api_result"); // 读取临时状态 if (lastToolResult != null && !lastToolResult.isEmpty()) { System.out.println("发现上一个工具的临时结果:" + lastToolResult); } // ... 回调逻辑 ... } -
获取当前标识符: 对于基于当前操作的日志记录或自定义逻辑很有用。
# 示例:在任何上下文中(以 ToolContext 为例) from google.adk.tools import ToolContext def log_tool_usage(tool_context: ToolContext, **kwargs): agent_name = tool_context.agent_name inv_id = tool_context.invocation_id func_call_id = getattr(tool_context, 'function_call_id', 'N/A') # 特定于 ToolContext print(f"日志:调用 ID={inv_id}, 智能体={agent_name}, 函数调用 ID={func_call_id} - 工具已执行。")// 伪代码:在任何上下文中 import { Context } from '@google/adk'; function logToolUsage(context: Context) { const agentName = context.agentName; const invId = context.invocationId; const functionCallId = context.functionCallId ?? '无'; // 执行工具时可用 console.log(`日志:调用 ID=${invId}, 智能体=${agentName}, 函数调用 ID=${functionCallId} - 工具已执行。`); }import "google.golang.org/adk/tool" // Pseudocode: In any context (ToolContext shown) type logToolUsageArgs struct{} type logToolUsageResult struct { Status string `json:"status"` } func logToolUsage(tc tool.Context, args logToolUsageArgs) (logToolUsageResult, error) { agentName := tc.AgentName() invID := tc.InvocationID() funcCallID := tc.FunctionCallID() fmt.Printf("Log: Invocation=%s, Agent=%s, FunctionCallID=%s - Tool Executed.\n", invID, agentName, funcCallID) return logToolUsageResult{Status: "Logged successfully"}, nil }// 示例:在任何上下文中(以 ToolContext 为例) import com.google.adk.tools.ToolContext; public void logToolUsage(ToolContext toolContext) { String agentName = toolContext.agentName(); String invId = toolContext.invocationId(); String functionCallId = toolContext.functionCallId().orElse("N/A"); // 特定于 ToolContext System.out.println("日志:调用 ID= " + invId + " 智能体= " + agentName + " 函数调用 ID= " + functionCallId); } -
访问初始用户输入: 参考触发当前调用的初始消息。
# 示例:在回调中 from google.adk.agents.context import Context def check_initial_intent(context: Context, **kwargs): initial_text = "无" if context.user_content and context.user_content.parts: initial_text = context.user_content.parts[0].text or "非文本输入" print(f"本次调用以用户输入开始:'{initial_text}'") # 示例:在智能体的 _run_async_impl 中 # async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]: # if ctx.user_content and ctx.user_content.parts: # initial_text = ctx.user_content.parts[0].text # print(f"智能体逻辑记住的初始查询为:{initial_text}") # ...import ( "google.golang.org/adk/agent" "google.golang.org/genai" ) // Pseudocode: In a Callback func logInitialUserInput(ctx agent.CallbackContext) (*genai.Content, error) { userContent := ctx.UserContent() if userContent != nil && len(userContent.Parts) > 0 { if text := userContent.Parts[0].Text; text != "" { fmt.Printf("User's initial input for this turn: '%s'\n", text) } } return nil, nil // No modification }// 示例:在回调中 import com.google.adk.agents.CallbackContext; import com.google.genai.types.Content; public void checkInitialIntent(CallbackContext callbackContext) { String initialText = "N/A"; if (callbackContext.userContent().isPresent() && callbackContext.userContent().get().parts() != null && !callbackContext.userContent().get().parts().get().isEmpty()) { initialText = callbackContext.userContent().get().parts().get().get(0).text().orElse("Non-text input"); // ... System.out.println("This invocation started with user input: " + initialText); } }
管理状态¶
状态对于记忆(Memory)和数据流至关重要。当你使用 CallbackContext 或 ToolContext 修改状态时,框架会自动跟踪并持久化这些更改。
-
在工具之间传递数据
# 示例:工具 1 - 获取用户 ID from google.adk.tools import ToolContext import uuid def get_user_profile(tool_context: ToolContext) -> dict: user_id = str(uuid.uuid4()) # 模拟获取 ID # 将 ID 保存到状态中供下一个工具使用 tool_context.state["temp:current_user_id"] = user_id return {"profile_status": "ID generated"} # 示例:工具 2 - 从状态中使用用户 ID def get_user_orders(tool_context: ToolContext) -> dict: user_id = tool_context.state.get("temp:current_user_id") if not user_id: return {"error": "User ID not found in state"} print(f"正在获取用户 ID 的订单:{user_id}") # ... 使用 user_id 获取订单的逻辑 ... return {"orders": ["order123", "order456"]}// 伪代码:工具 1 - 获取用户 ID import { Context } from '@google/adk'; import { v4 as uuidv4 } from 'uuid'; function getUserProfile(context: Context): Record<string, string> { const userId = uuidv4(); // 模拟获取 ID // 将 ID 保存到状态中供下一个工具使用 context.state.set('temp:current_user_id', userId); return { profile_status: 'ID generated' }; } // 伪代码:工具 2 - 从状态中使用用户 ID function getUserOrders(context: Context): Record<string, string | string[]> { const userId = context.state.get('temp:current_user_id'); if (!userId) { return { error: '状态中未找到用户 ID' }; } console.log(`正在获取用户 ID 的订单:${userId}`); // ... 使用 user_id 获取订单的逻辑 ... return { orders: ['order123', 'order456'] }; }import "google.golang.org/adk/tool" // Pseudocode: Tool 1 - Fetches user ID type GetUserProfileArgs struct { } func getUserProfile(tc tool.Context, input GetUserProfileArgs) (string, error) { // A random user ID for demonstration purposes userID := "random_user_456" // Save the ID to state for the next tool if err := tc.State().Set("temp:current_user_id", userID); err != nil { return "", fmt.Errorf("failed to set user ID in state: %w", err) } return "ID generated", nil } // Pseudocode: Tool 2 - Uses user ID from state type GetUserOrdersArgs struct { } type getUserOrdersResult struct { Orders []string `json:"orders"` } func getUserOrders(tc tool.Context, input GetUserOrdersArgs) (*getUserOrdersResult, error) { userID, err := tc.State().Get("temp:current_user_id") if err != nil { return &getUserOrdersResult{}, fmt.Errorf("user ID not found in state") } fmt.Printf("Fetching orders for user ID: %v\n", userID) // ... logic to fetch orders using user_id ... return &getUserOrdersResult{Orders: []string{"order123", "order456"}}, nil }// 示例:工具 1 - 获取用户 ID import com.google.adk.tools.ToolContext; import java.util.Map; import java.util.UUID; public Map<String, String> getUserProfile(ToolContext toolContext) { String userId = UUID.randomUUID().toString(); // 将 ID 保存到状态中供下一个工具使用 toolContext.state().put("temp:current_user_id", userId); return Map.of("profile_status", "ID generated"); } // 示例:工具 2 - 从状态中使用用户 ID public Map<String, String> getUserOrders(ToolContext toolContext) { String userId = (String) toolContext.state().get("temp:current_user_id"); if (userId == null || userId.isEmpty()) { return Map.of("error", "User ID not found in state"); } System.out.println("正在获取用户 ID 的订单:" + userId); // ... 使用 user_id 获取订单的逻辑 ... return Map.of("orders", "order123"); } -
更新用户偏好:
# 示例:工具或回调识别偏好 from google.adk.tools import ToolContext # 或 Context def set_user_preference(tool_context: ToolContext, preference: str, value: str) -> dict: # 如果使用持久化 SessionService,请使用 'user:' 前缀表示用户级状态 state_key = f"user:{preference}" tool_context.state[state_key] = value print(f"已将用户偏好 '{preference}' 设置为 '{value}'") return {"status": "Preference updated"}// 伪代码:工具或回调识别偏好 import { Context } from '@google/adk'; function setUserPreference(context: Context, preference: string, value: string): Record<string, string> { // 使用 'user:' 前缀表示用户级状态(如果使用持久化 SessionService) const stateKey = `user:${preference}`; context.state.set(stateKey, value); console.log(`已将用户偏好 '${preference}' 设置为 '${value}'`); return { status: 'Preference updated' }; }import "google.golang.org/adk/tool" // Pseudocode: Tool or Callback identifies a preference type setUserPreferenceArgs struct { Preference string `json:"preference" jsonschema:"The name of the preference to set."` Value string `json:"value" jsonschema:"The value to set for the preference."` } type setUserPreferenceResult struct { Status string `json:"status"` } func setUserPreference(tc tool.Context, args setUserPreferenceArgs) (setUserPreferenceResult, error) { // Use 'user:' prefix for user-level state (if using a persistent SessionService) stateKey := fmt.Sprintf("user:%s", args.Preference) if err := tc.State().Set(stateKey, args.Value); err != nil { return setUserPreferenceResult{}, fmt.Errorf("failed to set preference in state: %w", err) } fmt.Printf("Set user preference '%s' to '%s'\n", args.Preference, args.Value) return setUserPreferenceResult{Status: "Preference updated"}, nil }// 示例:工具或回调识别偏好 import com.google.adk.tools.ToolContext; // 或 CallbackContext public Map<String, String> setUserPreference(ToolContext toolContext, String preference, String value) { // 如果使用持久化 SessionService,请使用 'user:' 前缀表示用户级状态 String stateKey = "user:" + preference; toolContext.state().put(stateKey, value); System.out.println("Set user preference '" + preference + "' to '" + value + "'"); return Map.of("status", "Preference updated"); } -
状态前缀: 虽然基本状态是会话特定的,但像
app:和user:这样的前缀可以与持久SessionService实现(如DatabaseSessionService或VertexAiSessionService)一起使用,以指示更广泛的范围(应用范围或跨会话的用户范围)。temp:可以表示仅在当前调用中相关的数据。
使用制品(Artifacts)¶
使用制品(Artifacts)处理与会话相关的文件或大型数据块。常见用例:处理上传的文档。
-
文档摘要生成器示例流程:
-
引入引用(例如,在设置工具或回调中): 将文档的路径或 URI保存为制品(Artifacts),而不是整个内容。
# Example: In a callback or initial tool from google.adk.agents.context import Context # Or ToolContext from google.genai import types def save_document_reference(context: Context, file_path: str) -> None: # Assume file_path is something like "gs://my-bucket/docs/report.pdf" or "/local/path/to/report.pdf" try: # Create a Part containing the path/URI text artifact_part = types.Part.from_text(file_path) version = context.save_artifact("document_to_summarize.txt", artifact_part) print(f"Saved document reference '{file_path}' as artifact version {version}") # 如果其他工具需要,将文件名存储在状态中 context.state["temp:doc_artifact_name"] = "document_to_summarize.txt" except ValueError as e: print(f"Error saving artifact: {e}") # 例如,制品服务未配置 except Exception as e: print(f"Unexpected error saving artifact reference: {e}") # Example usage: # save_document_reference(context, "gs://my-bucket/docs/report.pdf")// Pseudocode: In a callback or initial tool import { Context } from '@google/adk'; import type { Part } from '@google/genai'; async function saveDocumentReference(context: Context, filePath: string) { // Assume filePath is something like "gs://my-bucket/docs/report.pdf" or "/local/path/to/report.pdf" try { // Create a Part containing the path/URI text const artifactPart: Part = { text: filePath }; const version = await context.saveArtifact('document_to_summarize.txt', artifactPart); console.log(`Saved document reference '${filePath}' as artifact version ${version}`); // Store the filename in state if needed by other tools context.state.set('temp:doc_artifact_name', 'document_to_summarize.txt'); } catch (e) { console.error(`Unexpected error saving artifact reference: ${e}`); } } // Example usage: // saveDocumentReference(context, "gs://my-bucket/docs/report.pdf");import ( "google.golang.org/adk/tool" "google.golang.org/genai" ) // Adapt the saveDocumentReference callback into a tool for this example. type saveDocRefArgs struct { FilePath string `json:"file_path" jsonschema:"The path to the file to save."` } type saveDocRefResult struct { Status string `json:"status"` } func saveDocRef(tc tool.Context, args saveDocRefArgs) (saveDocRefResult, error) { artifactPart := genai.NewPartFromText(args.FilePath) _, err := tc.Artifacts().Save(tc, "document_to_summarize.txt", artifactPart) if err != nil { return saveDocRefResult{}, err } fmt.Printf("Saved document reference '%s' as artifact\n", args.FilePath) if err := tc.State().Set("temp:doc_artifact_name", "document_to_summarize.txt"); err != nil { return saveDocRefResult{}, fmt.Errorf("failed to set artifact name in state") } return saveDocRefResult{"Reference saved"}, nil }// Example: In a callback or initial tool import com.google.adk.agents.CallbackContext; import com.google.genai.types.Content; import com.google.genai.types.Part; import java.util.Optional; public void saveDocumentReference(CallbackContext context, String filePath) { // Assume file_path is something like "gs://my-bucket/docs/report.pdf" or "/local/path/to/report.pdf" try { // Create a Part containing the path/URI text Part artifactPart = Part.fromText(filePath); Optional<Integer> version = context.saveArtifact("document_to_summarize.txt", artifactPart); System.out.println("Saved document reference" + filePath + " as artifact version " + version.orElse(-1)); // Store the filename in state if needed by other tools context.state().put("temp:doc_artifact_name", "document_to_summarize.txt"); } catch (Exception e) { System.out.println("Unexpected error saving artifact reference: " + e); } } // 使用示例: // saveDocumentReference(context, "gs://my-bucket/docs/report.pdf") -
摘要工具: 加载制品(Artifacts)以获取路径/URI,使用适当的库读取实际文档内容,总结,并返回结果。
# Example: In the Summarizer tool function from google.adk.tools import ToolContext from google.genai import types # 假设像 google.cloud.storage 或内置 open 等库可用 # 假设存在 'summarize_text' 函数 # from my_summarizer_lib import summarize_text def summarize_document_tool(tool_context: ToolContext) -> dict: artifact_name = tool_context.state.get("temp:doc_artifact_name") if not artifact_name: return {"error": "Document artifact name not found in state."} try: # 1. 加载包含路径/URI 的制品部分 artifact_part = tool_context.load_artifact(artifact_name) if not artifact_part or not artifact_part.text: return {"error": f"Could not load artifact or artifact has no text path: {artifact_name}"} file_path = artifact_part.text print(f"Loaded document reference: {file_path}") # 2. 读取实际文档内容(在 ADK 上下文之外) document_content = "" if file_path.startswith("gs://"): # Example: Use GCS client library to download/read pass # Replace with actual GCS reading logic elif file_path.startswith("/"): # 示例:使用本地文件系统 with open(file_path, 'r', encoding='utf-8') as f: document_content = f.read() else: return {"error": f"Unsupported file path scheme: {file_path}"} # 3. 总结内容 if not document_content: return {"error": "Failed to read document content."} # summary = summarize_text(document_content) # 调用你的总结逻辑 summary = f"Summary of content from {file_path}" # 占位符 return {"summary": summary} except ValueError as e: return {"error": f"Artifact service error: {e}"} except FileNotFoundError: return {"error": f"Local file not found: {file_path}"}// 伪代码:在摘要生成器工具函数中 import { Context } from '@google/adk'; async function summarizeDocumentTool(context: Context): Promise<Record<string, string>> { const artifactName = context.state.get('temp:doc_artifact_name') as string; if (!artifactName) { return { error: '状态中未找到文档制品名称。' }; } try { // 1. 加载包含路径/URI 的制品部分 const artifactPart = await context.loadArtifact(artifactName); if (!artifactPart?.text) { return { error: `无法加载制品或制品没有文本路径:${artifactName}` }; } const filePath = artifactPart.text; console.log(`已加载文档引用:${filePath}`); // 2. 读取实际文档内容(在 ADK 上下文之外) let documentContent = ''; if (filePath.startsWith('gs://')) { // 示例:使用 GCS 客户端库进行下载/读取 // const storage = new Storage(); // const bucket = storage.bucket('my-bucket'); // const file = bucket.file(filePath.replace('gs://my-bucket/', '')); // const [contents] = await file.download(); // documentContent = contents.toString(); } else if (filePath.startsWith('/')) { // 示例:使用本地文件系统 // import { readFile } from 'fs/promises'; // documentContent = await readFile(filePath, 'utf8'); } else { return { error: `不支持的文件路径方案: ${filePath}` }; } // 3. 总结内容 if (!documentContent) { return { error: '读取文档内容失败。' }; } // const summary = summarizeText(documentContent); // 调用你的总结逻辑 const summary = `来自 ${filePath} 的内容摘要`; // 占位符 return { summary }; } catch (e) { return { error: `处理制品时出错: ${e}` }; } }import "google.golang.org/adk/tool" // Pseudocode: In the Summarizer tool function type summarizeDocumentArgs struct{} type summarizeDocumentResult struct { Summary string `json:"summary"` } func summarizeDocumentTool(tc tool.Context, input summarizeDocumentArgs) (summarizeDocumentResult, error) { artifactName, err := tc.State().Get("temp:doc_artifact_name") if err != nil { return summarizeDocumentResult{}, fmt.Errorf("No document artifact name found in state") } // 1. Load the artifact part containing the path/URI artifactPart, err := tc.Artifacts().Load(tc, artifactName.(string)) if err != nil { return summarizeDocumentResult{}, err } if artifactPart.Part.Text == "" { return summarizeDocumentResult{}, fmt.Errorf("Could not load artifact or artifact has no text path.") } filePath := artifactPart.Part.Text fmt.Printf("Loaded document reference: %s\n", filePath) // 2. Read the actual document content (outside ADK context) // In a real implementation, you would use a GCS client or local file reader. documentContent := "This is the fake content of the document at " + filePath _ = documentContent // Avoid unused variable error. // 3. Summarize the content summary := "Summary of content from " + filePath // Placeholder return summarizeDocumentResult{Summary: summary}, nil }// 示例:在摘要生成器工具函数中 import com.google.adk.tools.ToolContext; import com.google.genai.types.Content; import com.google.genai.types.Part; import java.util.Map; import java.util.Optional; import java.io.FileNotFoundException; public Map<String, String> summarizeDocumentTool(ToolContext toolContext) { String artifactName = (String) toolContext.state().get("temp:doc_artifact_name"); if (artifactName == null || artifactName.isEmpty()) { return Map.of("error", "状态中未找到文档制品名称。"); } try { // 1. 加载包含路径/URI 的制品部分 Optional<Part> artifactPart = toolContext.loadArtifact(artifactName); if (!artifactPart.isPresent() || !artifactPart.get().text().isPresent() || artifactPart.get().text().get().isEmpty()) { return Map.of("error", "无法加载制品或制品没有文本路径: " + artifactName); } String filePath = artifactPart.get().text().get(); System.out.println("已加载文档引用: " + filePath); // 2. 读取实际文档内容(在 ADK 上下文之外) String documentContent = ""; if (filePath.startsWith("gs://")) { // 示例:使用 GCS 客户端库进行下载/读取 } else if (filePath.startsWith("/")) { // 示例:使用本地文件系统 } else { return Map.of("error", "不支持的文件路径方案: " + filePath); } // 3. 总结内容 if (documentContent.isEmpty()) { return Map.of("error", "读取文档内容失败。"); } // summary = summarizeText(documentContent) // 调用你的总结逻辑 String summary = "来自 " + filePath + " 的内容摘要"; // 占位符 return Map.of("summary", summary); } catch (IllegalArgumentException e) { return Map.of("error", "制品服务错误 " + e); } catch (Exception e) { return Map.of("error", "读取文档时出错 " + e); } }
-
-
列出制品(Artifacts): 发现哪些文件可用。
// 伪代码:在工具函数中 import { Context } from '@google/adk'; async function checkAvailableDocs(context: Context): Promise<Record<string, string[] | string>> { try { const artifactKeys = await context.listArtifacts(); console.log(`可用制品: ${artifactKeys}`); return { available_docs: artifactKeys }; } catch (e) { return { error: `制品服务错误: ${e}` }; } }import "google.golang.org/adk/tool" // Pseudocode: In a tool function type checkAvailableDocsArgs struct{} type checkAvailableDocsResult struct { AvailableDocs []string `json:"available_docs"` } func checkAvailableDocs(tc tool.Context, args checkAvailableDocsArgs) (checkAvailableDocsResult, error) { artifactKeys, err := tc.Artifacts().List(tc) if err != nil { return checkAvailableDocsResult{}, err } fmt.Printf("Available artifacts: %v\n", artifactKeys) return checkAvailableDocsResult{AvailableDocs: artifactKeys.FileNames}, nil }// 示例:在工具函数中 import com.google.adk.tools.ToolContext; import io.reactivex.rxjava3.core.Single; import java.util.List; import java.util.Map; public Map<String, Object> checkAvailableDocs(ToolContext toolContext) { try { Single<List<String>> artifactKeys = toolContext.listArtifacts(); System.out.println("可用制品: " + artifactKeys.blockingGet().toString()); return Map.of("availableDocs", artifactKeys.blockingGet()); } catch (IllegalArgumentException e) { return Map.of("error", "制品服务错误: " + e); } }
处理工具认证¶
安全管理工具所需的 API 密钥或其他凭证。
# 示例:需要认证的工具
from google.adk.tools import ToolContext
from google.adk.auth import AuthConfig # 假设已定义适当的 AuthConfig
# 定义所需的认证配置(例如 OAuth、API 密钥)
MY_API_AUTH_CONFIG = AuthConfig(...)
AUTH_STATE_KEY = "user:my_api_credential" # 存储检索到的凭证的键
def call_secure_api(tool_context: ToolContext, request_data: str) -> dict:
# 1. 检查状态中是否已存在凭证
credential = tool_context.state.get(AUTH_STATE_KEY)
if not credential:
# 2. 如果不存在,则请求凭证
print("未找到凭证,正在请求...")
try:
tool_context.request_credential(MY_API_AUTH_CONFIG)
# 框架处理生成事件。工具执行在本轮停止。
return {"status": "需要认证,请提供凭证。"}
except ValueError as e:
return {"error": f"认证错误: {e}"} # 例如:function_call_id 缺失
except Exception as e:
return {"error": f"请求凭证失败: {e}"}
# 3. 如果凭证存在(可能来自请求后的上一个回合)
# 或者如果在外部完成认证流程后进行后续调用
try:
# 可选:如果需要,重新验证/检索,或直接使用
# 如果外部流程刚刚完成,这可能会检索凭证
auth_credential_obj = tool_context.get_auth_response(MY_API_AUTH_CONFIG)
api_key = auth_credential_obj.api_key # 或 access_token 等。
# 将其存回状态,供会话内的将来调用使用
tool_context.state[AUTH_STATE_KEY] = auth_credential_obj.model_dump() # 持久化检索到的凭证
print(f"正在使用检索到的凭证通过数据调用 API: {request_data}")
# ... 使用 api_key 进行实际的 API 调用 ...
api_result = f"API result for {request_data}"
return {"result": api_result}
except Exception as e:
# 处理检索/使用凭证时的错误
print(f"使用凭证时出错: {e}")
# 如果凭证无效,可以选择清除状态键
# tool_context.state[AUTH_STATE_KEY] = None
return {"error": "使用凭证失败"}
// 伪代码:需要认证的工具
import { Context } from '@google/adk'; // AuthConfig 来自 ADK 或自定义
// 定义一个本地 AuthConfig 接口,因为 ADK 未公开导出它
interface AuthConfig {
credentialKey: string;
authScheme: { type: string }; // 示例的最小化表示
}
// 定义所需的认证配置(例如 OAuth、API 密钥)
const MY_API_AUTH_CONFIG: AuthConfig = {
credentialKey: 'my-api-key', // 示例密钥
authScheme: { type: 'api-key' }, // 示例方案类型
};
const AUTH_STATE_KEY = 'user:my_api_credential'; // 存储检索到的凭证的键
async function callSecureApi(context: Context, requestData: string): Promise<Record<string, string>> {
// 1. 检查状态中是否已存在凭证
const credential = context.state.get(AUTH_STATE_KEY);
if (!credential) {
// 2. 如果不存在,则请求凭证
console.log('未找到凭证,正在请求...');
try {
context.requestCredential(MY_API_AUTH_CONFIG);
// 框架处理生成的事件。工具执行在本轮停止。
return { status: '需要认证,请提供凭证。' };
} catch (e) {
return { error: `认证或凭证请求错误: ${e}` };
}
}
// 3. 如果凭证已存在(可能来自请求后的上一个轮次)
// 或者是在外部完成认证流程后进行的后续调用
try {
// (可选)如果需要,重新验证/检索,或直接使用
// 如果外部流程刚刚完成,这可能会检索凭证
const authCredentialObj = context.getAuthResponse(MY_API_AUTH_CONFIG);
const apiKey = authCredentialObj?.apiKey; // 或 accessToken 等
// 将其存回状态,供会话内的将来调用使用
// 注意:在严格的 TypeScript 中,可能需要对 authCredentialObj 进行转换或序列化
context.state.set(AUTH_STATE_KEY, JSON.stringify(authCredentialObj));
console.log(`正在使用检索到的凭证调用 API,数据为: ${requestData}`);
// ... 使用 apiKey 进行实际的 API 调用 ...
const apiResult = `${requestData} 的 API 结果`;
return { result: apiResult };
} catch (e) {
// 处理检索/使用凭证时的错误
console.error(`使用凭证时出错: ${e}`);
// 如果凭证无效,可以选择清除状态键
// toolContext.state.set(AUTH_STATE_KEY, null);
return { error: '使用凭证失败' };
}
}
// 示例:需要认证的工具
import com.google.adk.tools.ToolContext;
import java.util.Map;
// 注意:AuthConfig、requestCredential 和 getAuthResponse 尚未
// 在 Java ADK 公共 API 中完全实现。
// 本示例依赖于将会话状态中的外部认证信息进行填充。
public class SecureApiTool {
private static final String AUTH_STATE_KEY = "user:my_api_credential";
public Map<String, String> callSecureApi(ToolContext context, String requestData) {
// 1. 检查状态中是否已存在凭证
Object credential = context.state().get(AUTH_STATE_KEY);
if (credential == null) {
// 2. 如果不存在,则请求凭证
System.out.println("未找到凭证,正在请求...");
try {
// context.requestCredential(MY_API_AUTH_CONFIG); # Java ADK 尚未实现
// 框架处理生成的事件。工具执行在本轮停止。
return Map.of("status", "需要认证,请提供凭证。");
} catch (Exception e) {
return Map.of("error", "认证或凭证请求错误: " + e.getMessage());
}
}
// 3. 如果凭证已存在(可能来自请求后的上一个轮次)
// 或者是在外部完成认证流程后进行的后续调用
try {
// (可选)如果需要,重新验证/检索,或直接使用
// String apiKey = context.getAuthResponse(MY_API_AUTH_CONFIG).getApiKey();
String apiKey = credential.toString(); // 示例简化逻辑
// 将其存回状态,供会话内的将来调用使用
context.state().put(AUTH_STATE_KEY, apiKey);
System.out.println("正在使用检索到的凭证调用 API,数据为: " + requestData);
// ... 使用 apiKey 进行实际的 API 调用 ...
String apiResult = "API result for " + requestData;
return Map.of("result", apiResult);
} catch (Exception e) {
// 处理检索/使用凭证时的错误
System.err.println("使用凭证时出错: " + e.getMessage());
return Map.of("error", "使用凭证失败");
}
}
}
请记住:request_credential 会暂停工具并发出认证需求信号。用户/系统提供凭证后,在后续调用中,get_auth_response(或再次检查状态)允许工具继续执行。 框架会隐式使用 tool_context.function_call_id 来关联请求和响应。
利用记忆(Memory)¶
访问来自过去或外部来源的相关信息。
# Example: Tool using memory search
from google.adk.tools import ToolContext
def find_related_info(tool_context: ToolContext, topic: str) -> dict:
try:
search_results = tool_context.search_memory(f"关于 {topic} 的信息")
if search_results.results:
print(f"为 '{topic}' 找到了 {len(search_results.results)} 条记忆结果")
# 处理 search_results.results (类型为 SearchMemoryResponseEntry)
top_result_text = search_results.results[0].text
return {"memory_snippet": top_result_text}
else:
return {"message": "未找到相关记忆。"}
except ValueError as e:
return {"error": f"记忆服务错误:{e}"} # 例如:服务未配置
except Exception as e:
return {"error": f"搜索记忆时发生意外错误:{e}"}
// 伪代码:使用记忆搜索的工具
import { Context } from '@google/adk';
async function findRelatedInfo(context: Context, topic: string): Promise<Record<string, string>> {
try {
const searchResults = await context.searchMemory(`关于 ${topic} 的信息`);
if (searchResults.results?.length) {
console.log(`为 '${topic}' 找到了 ${searchResults.results.length} 条记忆结果`);
// 处理 searchResults.results
const topResultText = searchResults.results[0].text;
return { memory_snippet: topResultText };
} else {
return { message: '未找到相关记忆。' };
}
} catch (e) {
return { error: `记忆服务错误:${e}` }; // 例如:服务未配置
}
}
// Example: Tool using memory search
import com.google.adk.tools.ToolContext;
import com.google.adk.memory.SearchMemoryResponse;
import io.reactivex.rxjava3.core.Single;
import java.util.Map;
public class MemorySearchTool {
public Single<Map<String, String>> findRelatedInfo(ToolContext context, String topic) {
return context.searchMemory("Information about " + topic)
.map(searchResults -> {
if (searchResults != null && searchResults.results() != null && !searchResults.results().isEmpty()) {
System.out.println("Found " + searchResults.results().size() + " memory results for '" + topic + "'");
// Process searchResults.results
String topResultText = searchResults.results().get(0).text();
return Map.of("memory_snippet", topResultText);
} else {
return Map.of("message", "No relevant memories found.");
}
})
.onErrorReturnItem(Map.of("error", "Memory service error"));
}
}
Advanced: Direct InvocationContext Usage¶
虽然大多数交互通过CallbackContext或ToolContext进行,但有时智能体的核心逻辑(_run_async_impl/_run_live_impl)需要直接访问。
# Example: Inside agent's _run_async_impl
from google.adk.agents import BaseAgent
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from typing import AsyncGenerator
class MyControllingAgent(BaseAgent):
async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
# Example: Check if a specific service is available
if not ctx.memory_service:
print("Memory service is not available for this invocation.")
# Potentially change agent behavior
# Example: Early termination based on some condition
if ctx.session.state.get("critical_error_flag"):
print("Critical error detected, ending invocation.")
ctx.end_invocation = True # Signal framework to stop processing
yield Event(author=self.name, invocation_id=ctx.invocation_id, content="Stopping due to critical error.")
return # Stop this agent's execution
# ... Normal agent processing ...
yield # ... event ...
// 伪代码:在智能体的 runAsyncImpl 内部
import { BaseAgent, InvocationContext } from '@google/adk';
import type { Event } from '@google/adk';
class MyControllingAgent extends BaseAgent {
async *runAsyncImpl(ctx: InvocationContext): AsyncGenerator<Event, void, undefined> {
// 示例:检查特定服务是否可用
if (!ctx.memoryService) {
console.log('本次调用记忆服务不可用。');
// 潜在地改变智能体行为
}
// 示例:基于某些条件提前终止
// 通过 ctx.session.state 直接访问状态,或者如果已包装则通过 ctx.session.state 属性访问
if ((ctx.session.state as { 'critical_error_flag': boolean })['critical_error_flag']) {
console.log('检测到严重错误,正在结束调用。');
ctx.endInvocation = true; // 发送信号给框架以停止处理
yield {
author: this.name,
invocationId: ctx.invocationId,
content: { parts: [{ text: '由于严重错误而停止。' }] }
} as Event;
return; // 停止此智能体的执行
}
// ... 正常的智能体处理 ...
yield; // ... 事件 ...
}
}
// Example: Inside agent's runAsyncImpl
import com.google.adk.agents.BaseAgent;
import com.google.adk.agents.InvocationContext;
import com.google.adk.events.Event;
import com.google.genai.types.Content;
import com.google.genai.types.Part;
import io.reactivex.rxjava3.core.Flowable;
import java.util.List;
public class MyControllingAgent extends BaseAgent {
@Override
protected Flowable<Event> runAsyncImpl(InvocationContext ctx) {
// Example: Check if a specific service is available
if (ctx.memoryService() == null) {
System.out.println("Memory service is not available for this invocation.");
// Potentially change agent behavior
}
// Example: Early termination based on some condition
Boolean criticalError = (Boolean) ctx.session().state().getOrDefault("critical_error_flag", false);
if (criticalError != null && criticalError) {
System.out.println("Critical error detected, ending invocation.");
ctx.setEndInvocation(true); // Signal framework to stop processing
Event errorEvent = Event.builder()
.author(name())
.invocationId(ctx.invocationId())
.content(Content.builder().parts(List.of(Part.builder().text("Stopping due to critical error.").build())).build())
.build();
return Flowable.just(errorEvent); // Stop this agent's execution
}
// ... Normal agent processing ...
// return Flowable.just(normalEvent);
return Flowable.empty();
}
}
Setting ctx.end_invocation = True is a way to gracefully stop the entire request-response cycle from within the agent or its callbacks/tools (via their respective context objects which also have access to modify the underlying InvocationContext's flag).
关键要点和最佳实践¶
- 使用合适的上下文: 始终使用提供的最具体的上下文对象(工具/工具回调中的
ToolContext,智能体/模型回调中的CallbackContext,适用情况下的ReadonlyContext)。仅在必要时直接在_run_async_impl/_run_live_impl中使用完整的InvocationContext(ctx)。 - 用于数据流的状态:
context.state是在调用内部共享数据、记住偏好和管理对话记忆(Memory)的主要方式。使用持久存储时,要深思熟虑地使用前缀(app:、user:、temp:)。 - 用于文件的制品(Artifacts): 使用
context.save_artifact和context.load_artifact来管理文件引用(如路径或 URI)或更大的数据块。存储引用,按需加载内容。 - 跟踪更改: 通过上下文方法对状态或制品(Artifacts)所做的修改会自动链接到当前步骤的
EventActions并由SessionService处理。 - 从简单开始: 首先专注于
state和基本制品(Artifacts)用法。随着需求变得更加复杂,再探索认证、记忆(Memory)和高级InvocationContext字段(如用于实时流式处理的字段)。
通过理解并有效使用这些上下文对象,你可以使用 ADK 构建更复杂、状态化且功能强大的智能体。