Skip to content

上下文(Context)

Supported in ADKPython v0.1.0TypeScript v0.2.0Go v0.1.0Java v0.1.0

在智能体开发套件 (ADK) 中,上下文 (Context) 是指在特定操作期间,智能体及其工具可用的关键信息包。你可以将其视为在有效处理当前任务或对话轮次时,所需的背景知识和资源。

智能体通常需要的不仅仅是最新的用户消息才能表现良好。上下文至关重要,因为它能够:

  1. 维持状态: 记住对话中多个步骤的详细信息(例如,用户偏好、之前的计算、购物车中的物品)。这主要通过会话状态管理。
  2. 传递数据: 在一个步骤(如 LLM 调用或工具执行)中发现或生成的信息可以与后续步骤共享。会话状态在这里也是关键。
  3. 访问服务: 与框架功能交互,如:
    • 制品 (Artifacts) 存储: 保存或加载与会话相关的文件或数据块(如 PDF、图像、配置文件)。
    • 记忆 (Memory): 从过去的交互或与用户相关的外部知识源中搜索相关信息。
    • 认证: 请求并检索工具安全访问外部 API 所需的凭证。
  4. 身份和跟踪: 知道当前运行的是哪个智能体 (agent.name),以及唯一标识当前请求-响应周期 (invocation_id) 以进行日志记录和调试。
  5. 工具特定操作: 启用工具内的专门操作,例如请求认证或搜索记忆 (Memory),这些操作需要访问当前交互的详细信息。

保存单个完整的用户请求到最终响应周期(一次调用 (Invocation))所有信息的核心部分是 InvocationContext。但是,你通常不会直接创建或管理此对象。ADK 框架在调用开始时创建它(例如,通过 runner.run_async),并将相关上下文信息隐式传递给你的智能体代码、回调和工具。

# 框架如何提供上下文
from google.adk import Runner

# 1. 使用智能体和服务初始化运行器 (Runner)
runner = Runner(
    app_name="my_app",
    agent=my_root_agent,
    session_service=my_session_service,
    artifact_service=my_artifact_service,
)

# 2. 使用用户输入调用 run_async
# 注意:run_async 是一个异步生成器,会产出事件 (Events)。
# 框架内部会创建一个 InvocationContext 并将其隐式
# 传递给你的智能体代码、回调和工具。
async for event in runner.run_async(
    user_id="user123",
    session_id="session456",
    new_message=user_message
):
    print(event.stringify_content())

# 作为开发者,你通过方法参数中提供的上下文对象进行工作。
/* 概念伪代码:框架如何提供上下文(内部逻辑) */

const runner = new InMemoryRunner({ agent: myRootAgent });
const session = await runner.sessionService.createSession({ ... });
const userMessage = createUserContent(...);

// --- 在 runner.runAsync(...) 内部 ---
// 1. 框架为本次运行创建主上下文
const invocationContext = new InvocationContext({
  invocationId: "unique-id-for-this-run",
  session: session,
  userContent: userMessage,
  agent: myRootAgent, // 起始智能体
  sessionService: runner.sessionService,
  pluginManager: runner.pluginManager,
  // ... 其他必要字段 ...
});
//
// 2. 框架调用智能体的 run 方法,隐式传递上下文
await myRootAgent.runAsync(invocationContext);
//   --- 结束内部逻辑 ---

// 作为开发者,你使用方法参数中提供的上下文对象。
/* 概念伪代码:框架如何提供上下文(内部逻辑) */
sessionService := session.InMemoryService()

r, err := runner.New(runner.Config{
    AppName:        appName,
    Agent:          myAgent,
    SessionService: sessionService,
})
if err != nil {
    log.Fatalf("Failed to create runner: %v", err)
}

s, err := sessionService.Create(ctx, &session.CreateRequest{
    AppName: appName,
    UserID:  userID,
})
if err != nil {
    log.Fatalf("FATAL: Failed to create session: %v", err)
}

scanner := bufio.NewScanner(os.Stdin)
for {
    fmt.Print("\nYou > ")
    if !scanner.Scan() {
        break
    }
    userInput := scanner.Text()
    if strings.EqualFold(userInput, "quit") {
        break
    }
    userMsg := genai.NewContentFromText(userInput, genai.RoleUser)
    events := r.Run(ctx, s.Session.UserID(), s.Session.ID(), userMsg, agent.RunConfig{
        StreamingMode: agent.StreamingModeNone,
    })
    fmt.Print("\nAgent > ")
    for event, err := range events {
        if err != nil {
            log.Printf("ERROR during agent execution: %v", err)
            break
        }
        fmt.Print(event.Content.Parts[0].Text)
    }
}
/* 框架如何提供上下文 */
InMemoryRunner runner = new InMemoryRunner(agent);
Session session = runner
    .sessionService()
    .createSession(runner.appName(), USER_ID, initialState, SESSION_ID )
    .blockingGet();

try (Scanner scanner = new Scanner(System.in, StandardCharsets.UTF_8)) {
  while (true) {
    System.out.print("\nYou > ");
    String userInput = scanner.nextLine();
    if ("quit".equalsIgnoreCase(userInput)) {
      break;
    }
    Content userMsg = Content.fromParts(Part.fromText(userInput));
    Flowable<Event> events = runner.runAsync(session.userId(), session.id(), userMsg);
    System.out.print("\nAgent > ");
    events.blockingForEach(event -> System.out.print(event.stringifyContent()));
  }
}

不同类型的上下文

虽然 InvocationContext 作为全面的内部容器,但 ADK 提供了根据特定情况定制的专门上下文对象。这确保了你拥有适合手头任务的正确工具和权限,而无需在任何地方处理完整内部上下文的全部复杂性。以下是你将遇到的不同“风格”:

  1. InvocationContext {: #invocationcontext }

    • 使用场所: 在智能体的核心实现方法(_run_async_impl_run_live_impl)中直接接收为 ctx 参数。
    • 目的: 提供对当前调用整个状态的访问。这是最全面的上下文对象。
    • 关键内容: 直接访问 session(包括 stateevents)、当前 agent 实例、invocation_id、初始 user_content、对已配置服务的引用(artifact_servicememory_servicesession_service),以及与实时/流式模式相关的字段。
    • 使用场景: 主要在智能体的核心逻辑需要直接访问整体会话或服务时使用,尽管状态和制品 (Artifacts) 交互通常委托给使用自己上下文的回调/工具。也用于控制调用本身(例如,设置 ctx.end_invocation = True)。
    # 接收 InvocationContext 的智能体实现
    from google.adk.agents import BaseAgent
    from google.adk.agents.invocation_context import InvocationContext
    from google.adk.events import Event
    from typing import AsyncGenerator
    
    class MyAgent(BaseAgent):
        async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
            # 直接访问示例
            agent_name = ctx.agent.name
            session_id = ctx.session.id
            print(f"智能体 {agent_name} 正在会话 {session_id} 中为调用 {ctx.invocation_id} 运行")
            # ... 使用 ctx 的智能体逻辑 ...
            yield # ... 事件 ...
    
    // 伪代码:智能体实现接收 InvocationContext
    import { BaseAgent, InvocationContext, Event } from '@google/adk';
    
    class MyAgent extends BaseAgent {
      async *runAsyncImpl(ctx: InvocationContext): AsyncGenerator<Event, void, undefined> {
        // 直接访问示例
        const agentName = ctx.agent.name;
        const sessionId = ctx.session.id;
        console.log(`智能体 ${agentName} 正在会话 ${sessionId} 中为调用 ${ctx.invocationId} 运行`);
        // ... 使用 ctx 的智能体逻辑 ...
        yield; // ... 事件 ...
      }
    }
    
    import (
        "google.golang.org/adk/agent"
        "google.golang.org/adk/session"
    )
    
    // Pseudocode: Agent implementation receiving InvocationContext
    type MyAgent struct {
    }
    
    func (a *MyAgent) Run(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
        return func(yield func(*session.Event, error) bool) {
            // Direct access example
            agentName := ctx.Agent().Name()
            sessionID := ctx.Session().ID()
            fmt.Printf("Agent %s running in session %s for invocation %s\n", agentName, sessionID, ctx.InvocationID())
            // ... agent logic using ctx ...
            yield(&session.Event{Author: agentName}, nil)
        }
    }
    
    // 示例:接收 InvocationContext 的智能体实现
    import com.google.adk.agents.BaseAgent;
    import com.google.adk.agents.InvocationContext;
    import com.google.adk.events.Event;
    import io.reactivex.rxjava3.core.Flowable;
    
    public class MyAgent extends BaseAgent {
        @Override
        protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) {
            // 直接访问示例
            String agentName = invocationContext.agent().name();
            String sessionId = invocationContext.session().id();
            String invocationId = invocationContext.invocationId();
            System.out.println("Agent " + agentName + " running in session " + sessionId + " for invocation " + invocationId);
            // ... 使用 invocationContext 的智能体逻辑 ...
            return Flowable.empty();
        }
    }
    
  2. ReadonlyContext {: #readonlycontext }

    • 使用场所: 在只需要对基本信息进行只读访问且不允许修改的场景中提供(例如,InstructionProvider 函数)。它也是其他上下文的基类。
    • 目的: 提供基本上下文详细信息的安全、只读视图。
    • 关键内容: invocation_idagent_name 以及当前 state 的只读视图
    # 示例:接收 ReadonlyContext 的指令提供器
    from google.adk.agents.readonly_context import ReadonlyContext
    
    def my_instruction_provider(context: ReadonlyContext) -> str:
        # 只读访问示例
        # state 属性提供会话状态的只读 MappingProxyType 视图
        user_tier = context.state.get("user_tier", "standard")
        # context.state['new_key'] = 'value' # TypeError: 'mappingproxy' 对象不支持项赋值
        return f"Process the request for a {user_tier} user."
    
    // 伪代码:指令提供器接收 ReadonlyContext
    import { ReadonlyContext } from '@google/adk';
    
    function myInstructionProvider(context: ReadonlyContext): string {
      // 只读访问示例
      // state 对象是只读的
      const userTier = context.state.get('user_tier') ?? 'standard';
      // context.state.set('new_key', 'value'); // 这会失败或报错
      return `处理 ${userTier} 用户的请求。`;
    }
    
    import "google.golang.org/adk/agent"
    
    // Pseudocode: Instruction provider receiving ReadonlyContext
    func myInstructionProvider(ctx agent.ReadonlyContext) (string, error) {
        // Read-only access example
        userTier, err := ctx.ReadonlyState().Get("user_tier")
        if err != nil {
            userTier = "standard" // Default value
        }
        // ctx.ReadonlyState() has no Set method since State() is read-only.
        return fmt.Sprintf("Process the request for a %v user.", userTier), nil
    }
    
    // 示例:接收 ReadonlyContext 的指令提供器
    import com.google.adk.agents.ReadonlyContext;
    
    public String myInstructionProvider(ReadonlyContext context) {
        // 只读访问示例
        // state() 返回一个不可修改的会话状态视图
        String userTier = (String) context.state().getOrDefault("user_tier", "standard");
        // context.state().put("new_key", "value"); // UnsupportedOperationException
        return "Process the request for a " + userTier + " user.";
    }
    
  3. CallbackContext {: #callbackcontext }

    • 使用场景: 在智能体生命周期回调(before_agent_callbackafter_agent_callback)和模型交互回调(before_model_callbackafter_model_callback)中作为 callback_context 参数提供。
    • 目的: 便于在 回调函数内部 检查和修改状态、与制品 (Artifacts) 交互以及访问调用详情。
    • 核心功能(相对于 ReadonlyContext 的增强):
      • 可变 state 属性: 允许读取 和写入 会话状态。在此处所做的更改 (callback_context.state['key'] = value) 会被跟踪,并与回调后框架生成的事件相关联。
      • 制品 (Artifacts) 方法: 提供 load_artifact(filename)save_artifact(filename, part) 方法,用于与配置的 artifact_service 进行交互。
      • 直接访问 user_content

    (注:在 TypeScript 中,CallbackContextToolContext 已统一为单个 Context 类型。)

    # 示例:接收 Context 的回调(CallbackContext 已统一为 Context)
    from google.adk.agents.context import Context
    from google.adk.models import LlmRequest
    from google.genai import types
    from typing import Optional
    
    def my_before_model_cb(context: Context, request: LlmRequest) -> Optional[types.Content]:
        # 读/写状态示例
        call_count = context.state.get("model_calls", 0)
        context.state["model_calls"] = call_count + 1 # 修改状态(跟踪增量)
    
        # (可选)加载制品 (Artifact)
        # config_part = context.load_artifact("model_config.json")
        print(f"正在为调用 {context.invocation_id} 准备模型调用 #{call_count + 1}")
        return None # 允许模型调用继续
    
    // 伪代码:Callback 接收 Context
    import { Context, LlmRequest } from '@google/adk';
    import { Content } from '@google/genai';
    
    function myBeforeModelCb(context: Context, request: LlmRequest): Content | undefined {
      // 读/写 state 示例
      const callCount = (context.state.get('model_calls') as number) || 0;
      context.state.set('model_calls', callCount + 1); // 修改 state
    
      // 可选:加载制品(Artifact)
      // const configPart = await context.loadArtifact('model_config.json');
      console.log(`正在为调用 ${context.invocationId} 准备模型调用 #${callCount + 1}`);
      return undefined; // 允许模型调用继续
    }
    
    import (
        "google.golang.org/adk/agent"
        "google.golang.org/adk/model"
    )
    
    // Pseudocode: Callback receiving CallbackContext
    func myBeforeModelCb(ctx agent.CallbackContext, req *model.LLMRequest) (*model.LLMResponse, error) {
        // Read/Write state example
        callCount, err := ctx.State().Get("model_calls")
        if err != nil {
            callCount = 0 // Default value
        }
        newCount := callCount.(int) + 1
        if err := ctx.State().Set("model_calls", newCount); err != nil {
            return nil, err
        }
    
        // Optionally load an artifact
        // configPart, err := ctx.Artifacts().Load("model_config.json")
        fmt.Printf("Preparing model call #%d for invocation %s\n", newCount, ctx.InvocationID())
        return nil, nil // Allow model call to proceed
    }
    
    // 示例:接收 CallbackContext 的回调
    import com.google.adk.agents.CallbackContext;
    import com.google.adk.models.LlmRequest;
    import com.google.adk.models.LlmResponse;
    import io.reactivex.rxjava3.core.Maybe;
    
    public Maybe<LlmResponse> myBeforeModelCb(CallbackContext callbackContext, LlmRequest request) {
        // 读/写状态示例
        int callCount = (int) callbackContext.state().getOrDefault("model_calls", 0);
        callbackContext.state().put("model_calls", callCount + 1); // 修改状态(跟踪增量)
    
        // 可选:加载制品(Artifact)
        // Maybe<Part> configPart = callbackContext.loadArtifact("model_config.json");
        System.out.println("Preparing model call " + (callCount + 1) + " for invocation " + callbackContext.invocationId());
        return Maybe.empty(); // 允许模型调用继续
    }
    
  4. ToolContext {: #toolcontext }

    • 使用场所: 在支持 FunctionTool 的函数以及工具执行回调(before_tool_callbackafter_tool_callback)中作为 tool_context 提供。
    • 目的: 提供 CallbackContext 的所有功能,外加工具执行必需的专门逻辑,如处理认证、搜索记忆 (Memory) 和列出制品 (Artifacts)。
    • 主要功能(相对于 CallbackContext 的增强):
      • 认证方法: request_credential(auth_config) 用于触发认证流程,以及 get_auth_response(auth_config) 用于检索用户或系统提供的凭证。
      • 制品 (Artifacts) 列表: list_artifacts() 用于发现会话中可用的制品 (Artifacts)。
      • 记忆 (Memory) 搜索: search_memory(query) 用于查询配置的 memory_service
      • function_call_id 属性: 标识触发此工具执行的 LLM 的特定函数调用,对于将认证请求或响应正确关联非常重要。
      • actions 属性: 直接访问此步骤的 EventActions 对象,允许工具发出状态更改、认证请求等信号。
    # 示例:接收 ToolContext 的工具函数
    from google.adk.tools import ToolContext
    from typing import Dict, Any
    
    # 假设此函数被 FunctionTool 包装
    def search_external_api(query: str, tool_context: ToolContext) -> Dict[str, Any]:
        api_key = tool_context.state.get("api_key")
        if not api_key:
            # 定义所需的认证配置
            # auth_config = AuthConfig(...)
            # tool_context.request_credential(auth_config) # 请求凭证
            # 使用 'actions' 属性来标记已发起认证请求
            # tool_context.actions.requested_auth_configs[tool_context.function_call_id] = auth_config
            return {"status": "需要认证"}
    
        # 使用 API key...
        print(f"工具正在为查询 '{query}' 执行,使用 API 密钥。调用 ID:{tool_context.invocation_id}")
    
        # 可选:搜索记忆或列出制品
        # relevant_docs = tool_context.search_memory(f"info related to {query}")
        # available_files = tool_context.list_artifacts()
    
        return {"result": f"Data for {query} fetched."}
    
    // 伪代码:工具函数接收 Context
    import { Context } from '@google/adk';
    
    // __假设此函数被 FunctionTool 包装__
    function searchExternalApi(query: string, context: Context): { [key: string]: string } {
      const apiKey = context.state.get('api_key') as string;
      if (!apiKey) {
         // 定义所需的认证配置
         // const authConfig = new AuthConfig(...);
         // context.requestCredential(authConfig); // 请求凭证
         // 'actions' 属性现在由 requestCredential 自动更新
         return { status: 'Auth Required' };
      }
    
      // 使用 API 密钥...
      console.log(`工具正在为查询 '${query}' 执行,使用 API 密钥。调用 ID:${context.invocationId}`);
    
      // 可选:搜索记忆或列出制品
      // 注意:在 TS 中访问记忆/制品等服务通常是异步的,
      // 因此如果你复用它们,需要将此函数标记为 'async'。
      // context.searchMemory(`与 ${query} 相关的详细信息`).then(...)
      // context.listArtifacts().then(...)
    
      return { result: `已获取 ${query} 的数据。` };
    }
    
    import "google.golang.org/adk/tool"
    
    // Pseudocode: Tool function receiving ToolContext
    type searchExternalAPIArgs struct {
        Query string `json:"query" jsonschema:"The query to search for."`
    }
    
    func searchExternalAPI(tc tool.Context, input searchExternalAPIArgs) (string, error) {
        apiKey, err := tc.State().Get("api_key")
        if err != nil || apiKey == "" {
            // In a real scenario, you would define and request credentials here.
            // This is a conceptual placeholder.
            return "", fmt.Errorf("auth required")
        }
    
        // Use the API key...
        fmt.Printf("Tool executing for query '%s' using API key. Invocation: %s\n", input.Query, tc.InvocationID())
    
        // Optionally search memory or list artifacts
        // relevantDocs, _ := tc.SearchMemory(tc, "info related to %s", input.Query))
        // availableFiles, _ := tc.Artifacts().List()
    
        return fmt.Sprintf("Data for %s fetched.", input.Query), nil
    }
    
    // Example: Tool function receiving ToolContext
    import com.google.adk.tools.ToolContext;
    import java.util.Map;
    
    // Assume this function is wrapped by a FunctionTool
    public Map<String, Object> searchExternalApi(String query, ToolContext toolContext) {
        String apiKey = (String) toolContext.state().getOrDefault("api_key", "");
        if (apiKey.isEmpty()) {
            // Define required auth config
            // authConfig = AuthConfig(...);
            // toolContext.requestCredential(authConfig); // Request credentials
            // Use the 'actions' property to signal the auth request has been made
            return Map.of("status", "Auth Required");
        }
    
        // Use the API key...
        System.out.println("Tool executing for query " + query + " using API key.");
    
        // Optionally list artifacts
        // Single<List<String>> availableFiles = toolContext.listArtifacts();
    
        return Map.of("result", "Data for " + query + " fetched");
    }
    

理解这些不同的上下文对象以及何时使用它们是有效管理状态、访问服务以及控制 ADK 应用程序流程的关键。下一节将详细介绍使用这些上下文可以执行的常见任务。

使用上下文的常见任务

现在你已经了解了不同的上下文对象,让我们专注于在构建智能体和工具时如何使用它们来执行常见任务。

访问信息

你将经常需要读取存储在上下文中的信息。

  • 读取会话状态: 访问之前步骤中保存的数据或用户/应用级别设置。在 state 属性上使用类似字典的方式进行访问。

    # 示例:在工具函数中
    from google.adk.tools import ToolContext
    
    def my_tool(tool_context: ToolContext, **kwargs):
        user_pref = tool_context.state.get("user_display_preference", "default_mode")
        api_endpoint = tool_context.state.get("app:api_endpoint") # 读取应用级状态
    
        if user_pref == "dark_mode":
            # ... 应用深色模式逻辑 ...
            pass
        print(f"使用 API 端点:{api_endpoint}")
        # ... 工具逻辑的其余部分 ...
    
    # 示例:在回调函数中
    from google.adk.agents.context import Context
    
    def my_callback(context: Context, **kwargs):
        last_tool_result = context.state.get("temp:last_api_result") # 读取临时状态
        if last_tool_result:
            print(f"发现上一个工具的临时结果:{last_tool_result}")
        # ... 回调逻辑 ...
    
    // 伪代码:在工具函数中
    import { Context } from '@google/adk';
    
    async function myTool(context: Context) {
      const userPref = context.state.get('user_display_preference', 'default_mode');
      const apiEndpoint = context.state.get('app:api_endpoint'); // 读取应用级状态
    
      if (userPref === 'dark_mode') {
        // ... 应用深色模式逻辑 ...
      }
      console.log(`使用 API 端点:${apiEndpoint}`);
      // ... 工具逻辑的其余部分 ...
    }
    
    // 伪代码:在回调函数中
    import { Context } from '@google/adk';
    
    function myCallback(context: Context) {
      const lastToolResult = context.state.get('temp:last_api_result'); // 读取临时状态
      if (lastToolResult) {
        console.log(`发现上一个工具的临时结果:${lastToolResult}`);
      }
      // ... 回调逻辑 ...
    }
    
    import (
        "google.golang.org/adk/agent"
        "google.golang.org/adk/session"
        "google.golang.org/adk/tool"
        "google.golang.org/genai"
    )
    
    // Pseudocode: In a Tool function
    type toolArgs struct {
        // Define tool-specific arguments here
    }
    
    type toolResults struct {
        // Define tool-specific results here
    }
    
    // Example tool function demonstrating state access
    func myTool(tc tool.Context, input toolArgs) (toolResults, error) {
        userPref, err := tc.State().Get("user_display_preference")
        if err != nil {
            userPref = "default_mode"
        }
        apiEndpoint, _ := tc.State().Get("app:api_endpoint") // Read app-level state
    
        if userPref == "dark_mode" {
            // ... apply dark mode logic ...
        }
        fmt.Printf("Using API endpoint: %v\n", apiEndpoint)
        // ... rest of tool logic ...
        return toolResults{}, nil
    }
    
    
    // Pseudocode: In a Callback function
    func myCallback(ctx agent.CallbackContext) (*genai.Content, error) {
        lastToolResult, err := ctx.State().Get("temp:last_api_result") // Read temporary state
        if err == nil {
            fmt.Printf("Found temporary result from last tool: %v\n", lastToolResult)
        } else {
            fmt.Println("No temporary result found.")
        }
        // ... callback logic ...
        return nil, nil
    }
    
    // 示例:在工具函数中
    import com.google.adk.tools.ToolContext;
    
    public void myTool(ToolContext toolContext) {
        String userPref = (String) toolContext.state().getOrDefault("user_display_preference", "default_mode");
        String apiEndpoint = (String) toolContext.state().get("app:api_endpoint"); // 读取应用级状态
    
        if ("dark_mode".equals(userPref)) {
            // ... 应用深色模式逻辑 ...
        }
        System.out.println("使用 API 端点:" + apiEndpoint);
        // ... 工具逻辑的其余部分 ...
    }
    
    // 示例:在回调函数中
    import com.google.adk.agents.CallbackContext;
    
    public void myCallback(CallbackContext callbackContext) {
        String lastToolResult = (String) callbackContext.state().get("temp:last_api_result"); // 读取临时状态
    
        if (lastToolResult != null && !lastToolResult.isEmpty()) {
            System.out.println("发现上一个工具的临时结果:" + lastToolResult);
        }
        // ... 回调逻辑 ...
    }
    
  • 获取当前标识符: 对于基于当前操作的日志记录或自定义逻辑很有用。

    # 示例:在任何上下文中(以 ToolContext 为例)
    from google.adk.tools import ToolContext
    
    def log_tool_usage(tool_context: ToolContext, **kwargs):
        agent_name = tool_context.agent_name
        inv_id = tool_context.invocation_id
        func_call_id = getattr(tool_context, 'function_call_id', 'N/A') # 特定于 ToolContext
    
        print(f"日志:调用 ID={inv_id}, 智能体={agent_name}, 函数调用 ID={func_call_id} - 工具已执行。")
    
    // 伪代码:在任何上下文中
    import { Context } from '@google/adk';
    
    function logToolUsage(context: Context) {
      const agentName = context.agentName;
      const invId = context.invocationId;
      const functionCallId = context.functionCallId ?? '无'; // 执行工具时可用
    
      console.log(`日志:调用 ID=${invId}, 智能体=${agentName}, 函数调用 ID=${functionCallId} - 工具已执行。`);
    }
    
    import "google.golang.org/adk/tool"
    
    // Pseudocode: In any context (ToolContext shown)
    type logToolUsageArgs struct{}
    type logToolUsageResult struct {
        Status string `json:"status"`
    }
    
    func logToolUsage(tc tool.Context, args logToolUsageArgs) (logToolUsageResult, error) {
        agentName := tc.AgentName()
        invID := tc.InvocationID()
        funcCallID := tc.FunctionCallID()
    
        fmt.Printf("Log: Invocation=%s, Agent=%s, FunctionCallID=%s - Tool Executed.\n", invID, agentName, funcCallID)
        return logToolUsageResult{Status: "Logged successfully"}, nil
    }
    
    // 示例:在任何上下文中(以 ToolContext 为例)
    import com.google.adk.tools.ToolContext;
    
    public void logToolUsage(ToolContext toolContext) {
        String agentName = toolContext.agentName();
        String invId = toolContext.invocationId();
        String functionCallId = toolContext.functionCallId().orElse("N/A"); // 特定于 ToolContext
        System.out.println("日志:调用 ID= " + invId + " 智能体= " + agentName + " 函数调用 ID= " + functionCallId);
    }
    
  • 访问初始用户输入: 参考触发当前调用的初始消息。

    # 示例:在回调中
    from google.adk.agents.context import Context
    
    def check_initial_intent(context: Context, **kwargs):
        initial_text = "无"
        if context.user_content and context.user_content.parts:
            initial_text = context.user_content.parts[0].text or "非文本输入"
    
        print(f"本次调用以用户输入开始:'{initial_text}'")
    
    # 示例:在智能体的 _run_async_impl 中
    # async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
    #     if ctx.user_content and ctx.user_content.parts:
    #         initial_text = ctx.user_content.parts[0].text
    #         print(f"智能体逻辑记住的初始查询为:{initial_text}")
    #     ...
    
    // 伪代码:在回调中
    import { Context } from '@google/adk';
    
    function checkInitialIntent(context: Context) {
      let initialText = '无';
      const userContent = context.userContent;
      if (userContent?.parts?.length) {
        initialText = userContent.parts[0].text ?? '非文本输入';
      }
    
      console.log(`本次调用以用户输入开始:'${initialText}'`);
    }
    
    import (
        "google.golang.org/adk/agent"
        "google.golang.org/genai"
    )
    
    // Pseudocode: In a Callback
    func logInitialUserInput(ctx agent.CallbackContext) (*genai.Content, error) {
        userContent := ctx.UserContent()
        if userContent != nil && len(userContent.Parts) > 0 {
            if text := userContent.Parts[0].Text; text != "" {
                fmt.Printf("User's initial input for this turn: '%s'\n", text)
            }
        }
        return nil, nil // No modification
    }
    
    // 示例:在回调中
    import com.google.adk.agents.CallbackContext;
    import com.google.genai.types.Content;
    
    public void checkInitialIntent(CallbackContext callbackContext) {
        String initialText = "N/A";
        if (callbackContext.userContent().isPresent() && callbackContext.userContent().get().parts() != null && !callbackContext.userContent().get().parts().get().isEmpty()) {
            initialText = callbackContext.userContent().get().parts().get().get(0).text().orElse("Non-text input");
            // ...
            System.out.println("This invocation started with user input: " + initialText);
        }
    }
    

管理状态

状态对于记忆(Memory)和数据流至关重要。当你使用 CallbackContextToolContext 修改状态时,框架会自动跟踪并持久化这些更改。

  • 在工具之间传递数据

    # 示例:工具 1 - 获取用户 ID
    from google.adk.tools import ToolContext
    import uuid
    
    def get_user_profile(tool_context: ToolContext) -> dict:
        user_id = str(uuid.uuid4()) # 模拟获取 ID
        # 将 ID 保存到状态中供下一个工具使用
        tool_context.state["temp:current_user_id"] = user_id
        return {"profile_status": "ID generated"}
    
    # 示例:工具 2 - 从状态中使用用户 ID
    def get_user_orders(tool_context: ToolContext) -> dict:
        user_id = tool_context.state.get("temp:current_user_id")
        if not user_id:
            return {"error": "User ID not found in state"}
    
        print(f"正在获取用户 ID 的订单:{user_id}")
        # ... 使用 user_id 获取订单的逻辑 ...
        return {"orders": ["order123", "order456"]}
    
    // 伪代码:工具 1 - 获取用户 ID
    import { Context } from '@google/adk';
    import { v4 as uuidv4 } from 'uuid';
    
    function getUserProfile(context: Context): Record<string, string> {
      const userId = uuidv4(); // 模拟获取 ID
      // 将 ID 保存到状态中供下一个工具使用
      context.state.set('temp:current_user_id', userId);
      return { profile_status: 'ID generated' };
    }
    
    // 伪代码:工具 2 - 从状态中使用用户 ID
    function getUserOrders(context: Context): Record<string, string | string[]> {
      const userId = context.state.get('temp:current_user_id');
      if (!userId) {
        return { error: '状态中未找到用户 ID' };
      }
    
      console.log(`正在获取用户 ID 的订单:${userId}`);
      // ... 使用 user_id 获取订单的逻辑 ...
      return { orders: ['order123', 'order456'] };
    }
    
    import "google.golang.org/adk/tool"
    
    // Pseudocode: Tool 1 - Fetches user ID
    type GetUserProfileArgs struct {
    }
    
    func getUserProfile(tc tool.Context, input GetUserProfileArgs) (string, error) {
        // A random user ID for demonstration purposes
        userID := "random_user_456"
    
        // Save the ID to state for the next tool
        if err := tc.State().Set("temp:current_user_id", userID); err != nil {
            return "", fmt.Errorf("failed to set user ID in state: %w", err)
        }
        return "ID generated", nil
    }
    
    
    // Pseudocode: Tool 2 - Uses user ID from state
    type GetUserOrdersArgs struct {
    }
    
    type getUserOrdersResult struct {
        Orders []string `json:"orders"`
    }
    
    func getUserOrders(tc tool.Context, input GetUserOrdersArgs) (*getUserOrdersResult, error) {
        userID, err := tc.State().Get("temp:current_user_id")
        if err != nil {
            return &getUserOrdersResult{}, fmt.Errorf("user ID not found in state")
        }
    
        fmt.Printf("Fetching orders for user ID: %v\n", userID)
        // ... logic to fetch orders using user_id ...
        return &getUserOrdersResult{Orders: []string{"order123", "order456"}}, nil
    }
    
    // 示例:工具 1 - 获取用户 ID
    import com.google.adk.tools.ToolContext;
    import java.util.Map;
    import java.util.UUID;
    
    public Map<String, String> getUserProfile(ToolContext toolContext) {
        String userId = UUID.randomUUID().toString();
        // 将 ID 保存到状态中供下一个工具使用
        toolContext.state().put("temp:current_user_id", userId);
        return Map.of("profile_status", "ID generated");
    }
    
    // 示例:工具 2 - 从状态中使用用户 ID
    public Map<String, String> getUserOrders(ToolContext toolContext) {
        String userId = (String) toolContext.state().get("temp:current_user_id");
        if (userId == null || userId.isEmpty()) {
            return Map.of("error", "User ID not found in state");
        }
        System.out.println("正在获取用户 ID 的订单:" + userId);
        // ... 使用 user_id 获取订单的逻辑 ...
        return Map.of("orders", "order123");
    }
    
  • 更新用户偏好:

    # 示例:工具或回调识别偏好
    from google.adk.tools import ToolContext # 或 Context
    
    def set_user_preference(tool_context: ToolContext, preference: str, value: str) -> dict:
        # 如果使用持久化 SessionService,请使用 'user:' 前缀表示用户级状态
        state_key = f"user:{preference}"
        tool_context.state[state_key] = value
        print(f"已将用户偏好 '{preference}' 设置为 '{value}'")
        return {"status": "Preference updated"}
    
    // 伪代码:工具或回调识别偏好
    import { Context } from '@google/adk';
    
    function setUserPreference(context: Context, preference: string, value: string): Record<string, string> {
      // 使用 'user:' 前缀表示用户级状态(如果使用持久化 SessionService)
      const stateKey = `user:${preference}`;
      context.state.set(stateKey, value);
      console.log(`已将用户偏好 '${preference}' 设置为 '${value}'`);
      return { status: 'Preference updated' };
    }
    
    import "google.golang.org/adk/tool"
    
    // Pseudocode: Tool or Callback identifies a preference
    type setUserPreferenceArgs struct {
        Preference string `json:"preference" jsonschema:"The name of the preference to set."`
        Value      string `json:"value" jsonschema:"The value to set for the preference."`
    }
    
    type setUserPreferenceResult struct {
        Status string `json:"status"`
    }
    
    func setUserPreference(tc tool.Context, args setUserPreferenceArgs) (setUserPreferenceResult, error) {
        // Use 'user:' prefix for user-level state (if using a persistent SessionService)
        stateKey := fmt.Sprintf("user:%s", args.Preference)
        if err := tc.State().Set(stateKey, args.Value); err != nil {
            return setUserPreferenceResult{}, fmt.Errorf("failed to set preference in state: %w", err)
        }
        fmt.Printf("Set user preference '%s' to '%s'\n", args.Preference, args.Value)
        return setUserPreferenceResult{Status: "Preference updated"}, nil
    }
    
    // 示例:工具或回调识别偏好
    import com.google.adk.tools.ToolContext; // 或 CallbackContext
    
    public Map<String, String> setUserPreference(ToolContext toolContext, String preference, String value) {
        // 如果使用持久化 SessionService,请使用 'user:' 前缀表示用户级状态
        String stateKey = "user:" + preference;
        toolContext.state().put(stateKey, value);
        System.out.println("Set user preference '" + preference + "' to '" + value + "'");
        return Map.of("status", "Preference updated");
    }
    
  • 状态前缀: 虽然基本状态是会话特定的,但像app:user:这样的前缀可以与持久SessionService实现(如DatabaseSessionServiceVertexAiSessionService)一起使用,以指示更广泛的范围(应用范围或跨会话的用户范围)。temp:可以表示仅在当前调用中相关的数据。

使用制品(Artifacts)

使用制品(Artifacts)处理与会话相关的文件或大型数据块。常见用例:处理上传的文档。

  • 文档摘要生成器示例流程:

    1. 引入引用(例如,在设置工具或回调中): 将文档的路径或 URI保存为制品(Artifacts),而不是整个内容。

      # Example: In a callback or initial tool
      from google.adk.agents.context import Context # Or ToolContext
      from google.genai import types
      
      def save_document_reference(context: Context, file_path: str) -> None:
          # Assume file_path is something like "gs://my-bucket/docs/report.pdf" or "/local/path/to/report.pdf"
          try:
              # Create a Part containing the path/URI text
              artifact_part = types.Part.from_text(file_path)
              version = context.save_artifact("document_to_summarize.txt", artifact_part)
              print(f"Saved document reference '{file_path}' as artifact version {version}")
              # 如果其他工具需要,将文件名存储在状态中
              context.state["temp:doc_artifact_name"] = "document_to_summarize.txt"
          except ValueError as e:
              print(f"Error saving artifact: {e}") # 例如,制品服务未配置
          except Exception as e:
              print(f"Unexpected error saving artifact reference: {e}")
      
      # Example usage:
      # save_document_reference(context, "gs://my-bucket/docs/report.pdf")
      
      // Pseudocode: In a callback or initial tool
      import { Context } from '@google/adk';
      import type { Part } from '@google/genai';
      
      async function saveDocumentReference(context: Context, filePath: string) {
        // Assume filePath is something like "gs://my-bucket/docs/report.pdf" or "/local/path/to/report.pdf"
        try {
          // Create a Part containing the path/URI text
          const artifactPart: Part = { text: filePath };
          const version = await context.saveArtifact('document_to_summarize.txt', artifactPart);
          console.log(`Saved document reference '${filePath}' as artifact version ${version}`);
          // Store the filename in state if needed by other tools
          context.state.set('temp:doc_artifact_name', 'document_to_summarize.txt');
        } catch (e) {
          console.error(`Unexpected error saving artifact reference: ${e}`);
        }
      }
      
      // Example usage:
      // saveDocumentReference(context, "gs://my-bucket/docs/report.pdf");
      
      import (
          "google.golang.org/adk/tool"
          "google.golang.org/genai"
      )
      
      // Adapt the saveDocumentReference callback into a tool for this example.
      type saveDocRefArgs struct {
          FilePath string `json:"file_path" jsonschema:"The path to the file to save."`
      }
      
      type saveDocRefResult struct {
          Status string `json:"status"`
      }
      
      func saveDocRef(tc tool.Context, args saveDocRefArgs) (saveDocRefResult, error) {
          artifactPart := genai.NewPartFromText(args.FilePath)
          _, err := tc.Artifacts().Save(tc, "document_to_summarize.txt", artifactPart)
          if err != nil {
              return saveDocRefResult{}, err
          }
          fmt.Printf("Saved document reference '%s' as artifact\n", args.FilePath)
          if err := tc.State().Set("temp:doc_artifact_name", "document_to_summarize.txt"); err != nil {
              return saveDocRefResult{}, fmt.Errorf("failed to set artifact name in state")
          }
          return saveDocRefResult{"Reference saved"}, nil
      }
      
      // Example: In a callback or initial tool
      import com.google.adk.agents.CallbackContext;
      import com.google.genai.types.Content;
      import com.google.genai.types.Part;
      import java.util.Optional;
      
      public void saveDocumentReference(CallbackContext context, String filePath) {
          // Assume file_path is something like "gs://my-bucket/docs/report.pdf" or "/local/path/to/report.pdf"
          try {
              // Create a Part containing the path/URI text
              Part artifactPart = Part.fromText(filePath);
              Optional<Integer> version = context.saveArtifact("document_to_summarize.txt", artifactPart);
              System.out.println("Saved document reference" + filePath + " as artifact version " + version.orElse(-1));
              // Store the filename in state if needed by other tools
              context.state().put("temp:doc_artifact_name", "document_to_summarize.txt");
          } catch (Exception e) {
              System.out.println("Unexpected error saving artifact reference: " + e);
          }
      }
      
      // 使用示例:
      // saveDocumentReference(context, "gs://my-bucket/docs/report.pdf")
      
    2. 摘要工具: 加载制品(Artifacts)以获取路径/URI,使用适当的库读取实际文档内容,总结,并返回结果。

      # Example: In the Summarizer tool function
      from google.adk.tools import ToolContext
      from google.genai import types
      # 假设像 google.cloud.storage 或内置 open 等库可用
      # 假设存在 'summarize_text' 函数
      # from my_summarizer_lib import summarize_text
      
      def summarize_document_tool(tool_context: ToolContext) -> dict:
          artifact_name = tool_context.state.get("temp:doc_artifact_name")
          if not artifact_name:
              return {"error": "Document artifact name not found in state."}
      
          try:
              # 1. 加载包含路径/URI 的制品部分
              artifact_part = tool_context.load_artifact(artifact_name)
              if not artifact_part or not artifact_part.text:
                  return {"error": f"Could not load artifact or artifact has no text path: {artifact_name}"}
      
              file_path = artifact_part.text
              print(f"Loaded document reference: {file_path}")
      
              # 2. 读取实际文档内容(在 ADK 上下文之外)
              document_content = ""
              if file_path.startswith("gs://"):
                  # Example: Use GCS client library to download/read
                  pass # Replace with actual GCS reading logic
              elif file_path.startswith("/"):
                   # 示例:使用本地文件系统
                   with open(file_path, 'r', encoding='utf-8') as f:
                       document_content = f.read()
              else:
                  return {"error": f"Unsupported file path scheme: {file_path}"}
      
              # 3. 总结内容
              if not document_content:
                   return {"error": "Failed to read document content."}
      
              # summary = summarize_text(document_content) # 调用你的总结逻辑
              summary = f"Summary of content from {file_path}" # 占位符
      
              return {"summary": summary}
      
          except ValueError as e:
               return {"error": f"Artifact service error: {e}"}
          except FileNotFoundError:
               return {"error": f"Local file not found: {file_path}"}
      
      // 伪代码:在摘要生成器工具函数中
      import { Context } from '@google/adk';
      
      async function summarizeDocumentTool(context: Context): Promise<Record<string, string>> {
        const artifactName = context.state.get('temp:doc_artifact_name') as string;
        if (!artifactName) {
          return { error: '状态中未找到文档制品名称。' };
        }
      
        try {
          // 1. 加载包含路径/URI 的制品部分
          const artifactPart = await context.loadArtifact(artifactName);
          if (!artifactPart?.text) {
            return { error: `无法加载制品或制品没有文本路径:${artifactName}` };
          }
      
          const filePath = artifactPart.text;
          console.log(`已加载文档引用:${filePath}`);
      
          // 2. 读取实际文档内容(在 ADK 上下文之外)
          let documentContent = '';
          if (filePath.startsWith('gs://')) {
            // 示例:使用 GCS 客户端库进行下载/读取
            // const storage = new Storage();
            // const bucket = storage.bucket('my-bucket');
            // const file = bucket.file(filePath.replace('gs://my-bucket/', ''));
            // const [contents] = await file.download();
            // documentContent = contents.toString();
          } else if (filePath.startsWith('/')) {
            // 示例:使用本地文件系统
            // import { readFile } from 'fs/promises';
            // documentContent = await readFile(filePath, 'utf8');
          } else {
            return { error: `不支持的文件路径方案: ${filePath}` };
          }
      
          // 3. 总结内容
          if (!documentContent) {
             return { error: '读取文档内容失败。' };
          }
      
          // const summary = summarizeText(documentContent); // 调用你的总结逻辑
          const summary = `来自 ${filePath} 的内容摘要`; // 占位符
      
          return { summary };
      
        } catch (e) {
           return { error: `处理制品时出错: ${e}` };
        }
      }
      
      import "google.golang.org/adk/tool"
      
      // Pseudocode: In the Summarizer tool function
      type summarizeDocumentArgs struct{}
      
      type summarizeDocumentResult struct {
          Summary string `json:"summary"`
      }
      
      func summarizeDocumentTool(tc tool.Context, input summarizeDocumentArgs) (summarizeDocumentResult, error) {
          artifactName, err := tc.State().Get("temp:doc_artifact_name")
          if err != nil {
              return summarizeDocumentResult{}, fmt.Errorf("No document artifact name found in state")
          }
      
          // 1. Load the artifact part containing the path/URI
          artifactPart, err := tc.Artifacts().Load(tc, artifactName.(string))
          if err != nil {
              return summarizeDocumentResult{}, err
          }
      
          if artifactPart.Part.Text == "" {
              return summarizeDocumentResult{}, fmt.Errorf("Could not load artifact or artifact has no text path.")
          }
          filePath := artifactPart.Part.Text
          fmt.Printf("Loaded document reference: %s\n", filePath)
      
          // 2. Read the actual document content (outside ADK context)
          // In a real implementation, you would use a GCS client or local file reader.
          documentContent := "This is the fake content of the document at " + filePath
          _ = documentContent // Avoid unused variable error.
      
          // 3. Summarize the content
          summary := "Summary of content from " + filePath // Placeholder
      
          return summarizeDocumentResult{Summary: summary}, nil
      }
      
      // 示例:在摘要生成器工具函数中
      import com.google.adk.tools.ToolContext;
      import com.google.genai.types.Content;
      import com.google.genai.types.Part;
      import java.util.Map;
      import java.util.Optional;
      import java.io.FileNotFoundException;
      
      public Map<String, String> summarizeDocumentTool(ToolContext toolContext) {
          String artifactName = (String) toolContext.state().get("temp:doc_artifact_name");
          if (artifactName == null || artifactName.isEmpty()) {
              return Map.of("error", "状态中未找到文档制品名称。");
          }
          try {
              // 1. 加载包含路径/URI 的制品部分
              Optional<Part> artifactPart = toolContext.loadArtifact(artifactName);
              if (!artifactPart.isPresent() || !artifactPart.get().text().isPresent() || artifactPart.get().text().get().isEmpty()) {
                  return Map.of("error", "无法加载制品或制品没有文本路径: " + artifactName);
              }
              String filePath = artifactPart.get().text().get();
              System.out.println("已加载文档引用: " + filePath);
      
              // 2. 读取实际文档内容(在 ADK 上下文之外)
              String documentContent = "";
              if (filePath.startsWith("gs://")) {
                  // 示例:使用 GCS 客户端库进行下载/读取
              } else if (filePath.startsWith("/")) {
                  // 示例:使用本地文件系统
              } else {
                  return Map.of("error", "不支持的文件路径方案: " + filePath);
              }
      
              // 3. 总结内容
              if (documentContent.isEmpty()) {
                  return Map.of("error", "读取文档内容失败。");
              }
      
              // summary = summarizeText(documentContent) // 调用你的总结逻辑
              String summary = "来自 " + filePath + " 的内容摘要"; // 占位符
      
              return Map.of("summary", summary);
          } catch (IllegalArgumentException e) {
              return Map.of("error", "制品服务错误 " + e);
          } catch (Exception e) {
              return Map.of("error", "读取文档时出错 " + e);
          }
      }
      
  • 列出制品(Artifacts): 发现哪些文件可用。

    # 示例:在工具函数中
    from google.adk.tools import ToolContext
    
    def check_available_docs(tool_context: ToolContext) -> dict:
        try:
            artifact_keys = tool_context.list_artifacts()
            print(f"可用制品: {artifact_keys}")
            return {"available_docs": artifact_keys}
        except ValueError as e:
            return {"error": f"制品服务错误: {e}"}
    
    // 伪代码:在工具函数中
    import { Context } from '@google/adk';
    
    async function checkAvailableDocs(context: Context): Promise<Record<string, string[] | string>> {
      try {
        const artifactKeys = await context.listArtifacts();
        console.log(`可用制品: ${artifactKeys}`);
        return { available_docs: artifactKeys };
      } catch (e) {
        return { error: `制品服务错误: ${e}` };
      }
    }
    
    import "google.golang.org/adk/tool"
    
    // Pseudocode: In a tool function
    type checkAvailableDocsArgs struct{}
    
    type checkAvailableDocsResult struct {
        AvailableDocs []string `json:"available_docs"`
    }
    
    func checkAvailableDocs(tc tool.Context, args checkAvailableDocsArgs) (checkAvailableDocsResult, error) {
        artifactKeys, err := tc.Artifacts().List(tc)
        if err != nil {
            return checkAvailableDocsResult{}, err
        }
        fmt.Printf("Available artifacts: %v\n", artifactKeys)
        return checkAvailableDocsResult{AvailableDocs: artifactKeys.FileNames}, nil
    }
    
    // 示例:在工具函数中
    import com.google.adk.tools.ToolContext;
    import io.reactivex.rxjava3.core.Single;
    import java.util.List;
    import java.util.Map;
    
    public Map<String, Object> checkAvailableDocs(ToolContext toolContext) {
        try {
            Single<List<String>> artifactKeys = toolContext.listArtifacts();
            System.out.println("可用制品: " + artifactKeys.blockingGet().toString());
            return Map.of("availableDocs", artifactKeys.blockingGet());
        } catch (IllegalArgumentException e) {
            return Map.of("error", "制品服务错误: " + e);
        }
    }
    

处理工具认证

Supported in ADKPython v0.1.0TypeScript v0.2.0Java v0.2.0

安全管理工具所需的 API 密钥或其他凭证。

# 示例:需要认证的工具
from google.adk.tools import ToolContext
from google.adk.auth import AuthConfig # 假设已定义适当的 AuthConfig

# 定义所需的认证配置(例如 OAuth、API 密钥)
MY_API_AUTH_CONFIG = AuthConfig(...)
AUTH_STATE_KEY = "user:my_api_credential" # 存储检索到的凭证的键

def call_secure_api(tool_context: ToolContext, request_data: str) -> dict:
    # 1. 检查状态中是否已存在凭证
    credential = tool_context.state.get(AUTH_STATE_KEY)

    if not credential:
        # 2. 如果不存在,则请求凭证
        print("未找到凭证,正在请求...")
        try:
            tool_context.request_credential(MY_API_AUTH_CONFIG)
            # 框架处理生成事件。工具执行在本轮停止。
            return {"status": "需要认证,请提供凭证。"}
        except ValueError as e:
            return {"error": f"认证错误: {e}"} # 例如:function_call_id 缺失
        except Exception as e:
            return {"error": f"请求凭证失败: {e}"}

    # 3. 如果凭证存在(可能来自请求后的上一个回合)
    #    或者如果在外部完成认证流程后进行后续调用
    try:
        # 可选:如果需要,重新验证/检索,或直接使用
        # 如果外部流程刚刚完成,这可能会检索凭证
        auth_credential_obj = tool_context.get_auth_response(MY_API_AUTH_CONFIG)
        api_key = auth_credential_obj.api_key # 或 access_token 等。

        # 将其存回状态,供会话内的将来调用使用
        tool_context.state[AUTH_STATE_KEY] = auth_credential_obj.model_dump() # 持久化检索到的凭证

        print(f"正在使用检索到的凭证通过数据调用 API: {request_data}")
        # ... 使用 api_key 进行实际的 API 调用 ...
        api_result = f"API result for {request_data}"

        return {"result": api_result}
    except Exception as e:
        # 处理检索/使用凭证时的错误
        print(f"使用凭证时出错: {e}")
        # 如果凭证无效,可以选择清除状态键
        # tool_context.state[AUTH_STATE_KEY] = None
        return {"error": "使用凭证失败"}
// 伪代码:需要认证的工具
import { Context } from '@google/adk'; // AuthConfig 来自 ADK 或自定义

// 定义一个本地 AuthConfig 接口,因为 ADK 未公开导出它
interface AuthConfig {
  credentialKey: string;
  authScheme: { type: string }; // 示例的最小化表示
}

// 定义所需的认证配置(例如 OAuth、API 密钥)
const MY_API_AUTH_CONFIG: AuthConfig = {
  credentialKey: 'my-api-key', // 示例密钥
  authScheme: { type: 'api-key' }, // 示例方案类型
};
const AUTH_STATE_KEY = 'user:my_api_credential'; // 存储检索到的凭证的键

async function callSecureApi(context: Context, requestData: string): Promise<Record<string, string>> {
  // 1. 检查状态中是否已存在凭证
  const credential = context.state.get(AUTH_STATE_KEY);

  if (!credential) {
    // 2. 如果不存在,则请求凭证
    console.log('未找到凭证,正在请求...');
    try {
      context.requestCredential(MY_API_AUTH_CONFIG);
      // 框架处理生成的事件。工具执行在本轮停止。
      return { status: '需要认证,请提供凭证。' };
    } catch (e) {
      return { error: `认证或凭证请求错误: ${e}` };
    }
  }

  // 3. 如果凭证已存在(可能来自请求后的上一个轮次)
  //    或者是在外部完成认证流程后进行的后续调用
  try {
    // (可选)如果需要,重新验证/检索,或直接使用
    // 如果外部流程刚刚完成,这可能会检索凭证
    const authCredentialObj = context.getAuthResponse(MY_API_AUTH_CONFIG);
    const apiKey = authCredentialObj?.apiKey; // 或 accessToken 等

    // 将其存回状态,供会话内的将来调用使用
    // 注意:在严格的 TypeScript 中,可能需要对 authCredentialObj 进行转换或序列化
    context.state.set(AUTH_STATE_KEY, JSON.stringify(authCredentialObj));

    console.log(`正在使用检索到的凭证调用 API,数据为: ${requestData}`);
    // ... 使用 apiKey 进行实际的 API 调用 ...
    const apiResult = `${requestData} 的 API 结果`;

    return { result: apiResult };
  } catch (e) {
    // 处理检索/使用凭证时的错误
    console.error(`使用凭证时出错: ${e}`);
    // 如果凭证无效,可以选择清除状态键
    // toolContext.state.set(AUTH_STATE_KEY, null);
    return { error: '使用凭证失败' };
  }
}
// 示例:需要认证的工具
import com.google.adk.tools.ToolContext;
import java.util.Map;

// 注意:AuthConfig、requestCredential 和 getAuthResponse 尚未 
// 在 Java ADK 公共 API 中完全实现。
// 本示例依赖于将会话状态中的外部认证信息进行填充。

public class SecureApiTool {
  private static final String AUTH_STATE_KEY = "user:my_api_credential";

  public Map<String, String> callSecureApi(ToolContext context, String requestData) {
    // 1. 检查状态中是否已存在凭证
    Object credential = context.state().get(AUTH_STATE_KEY);

    if (credential == null) {
      // 2. 如果不存在,则请求凭证
      System.out.println("未找到凭证,正在请求...");
      try {
        // context.requestCredential(MY_API_AUTH_CONFIG); # Java ADK 尚未实现
        // 框架处理生成的事件。工具执行在本轮停止。
        return Map.of("status", "需要认证,请提供凭证。");
      } catch (Exception e) {
        return Map.of("error", "认证或凭证请求错误: " + e.getMessage());
      }
    }

    // 3. 如果凭证已存在(可能来自请求后的上一个轮次)
    //    或者是在外部完成认证流程后进行的后续调用
    try {
      // (可选)如果需要,重新验证/检索,或直接使用
      // String apiKey = context.getAuthResponse(MY_API_AUTH_CONFIG).getApiKey();
      String apiKey = credential.toString(); // 示例简化逻辑

      // 将其存回状态,供会话内的将来调用使用
      context.state().put(AUTH_STATE_KEY, apiKey);

      System.out.println("正在使用检索到的凭证调用 API,数据为: " + requestData);
      // ... 使用 apiKey 进行实际的 API 调用 ...
      String apiResult = "API result for " + requestData;

      return Map.of("result", apiResult);
    } catch (Exception e) {
      // 处理检索/使用凭证时的错误
      System.err.println("使用凭证时出错: " + e.getMessage());
      return Map.of("error", "使用凭证失败");
    }
  }
}

请记住:request_credential 会暂停工具并发出认证需求信号。用户/系统提供凭证后,在后续调用中,get_auth_response(或再次检查状态)允许工具继续执行。 框架会隐式使用 tool_context.function_call_id 来关联请求和响应。

利用记忆(Memory)

Supported in ADKPython v0.1.0TypeScript v0.2.0Java v0.2.0

python_only

访问来自过去或外部来源的相关信息。

# Example: Tool using memory search
from google.adk.tools import ToolContext

def find_related_info(tool_context: ToolContext, topic: str) -> dict:
    try:
        search_results = tool_context.search_memory(f"关于 {topic} 的信息")
        if search_results.results:
            print(f"为 '{topic}' 找到了 {len(search_results.results)} 条记忆结果")
            # 处理 search_results.results (类型为 SearchMemoryResponseEntry)
            top_result_text = search_results.results[0].text
            return {"memory_snippet": top_result_text}
        else:
            return {"message": "未找到相关记忆。"}
    except ValueError as e:
        return {"error": f"记忆服务错误:{e}"} # 例如:服务未配置
    except Exception as e:
        return {"error": f"搜索记忆时发生意外错误:{e}"}
// 伪代码:使用记忆搜索的工具
import { Context } from '@google/adk';

async function findRelatedInfo(context: Context, topic: string): Promise<Record<string, string>> {
  try {
    const searchResults = await context.searchMemory(`关于 ${topic} 的信息`);
    if (searchResults.results?.length) {
      console.log(`为 '${topic}' 找到了 ${searchResults.results.length} 条记忆结果`);
      // 处理 searchResults.results
      const topResultText = searchResults.results[0].text;
      return { memory_snippet: topResultText };
    } else {
      return { message: '未找到相关记忆。' };
    }
  } catch (e) {
     return { error: `记忆服务错误:${e}` }; // 例如:服务未配置
  }
}
// Example: Tool using memory search
import com.google.adk.tools.ToolContext;
import com.google.adk.memory.SearchMemoryResponse;
import io.reactivex.rxjava3.core.Single;
import java.util.Map;

public class MemorySearchTool {
  public Single<Map<String, String>> findRelatedInfo(ToolContext context, String topic) {
    return context.searchMemory("Information about " + topic)
        .map(searchResults -> {
          if (searchResults != null && searchResults.results() != null && !searchResults.results().isEmpty()) {
            System.out.println("Found " + searchResults.results().size() + " memory results for '" + topic + "'");
            // Process searchResults.results
            String topResultText = searchResults.results().get(0).text();
            return Map.of("memory_snippet", topResultText);
          } else {
            return Map.of("message", "No relevant memories found.");
          }
        })
        .onErrorReturnItem(Map.of("error", "Memory service error"));
  }
}

Advanced: Direct InvocationContext Usage

Supported in ADKPython v0.1.0TypeScript v0.2.0Java v0.2.0

虽然大多数交互通过CallbackContextToolContext进行,但有时智能体的核心逻辑(_run_async_impl/_run_live_impl)需要直接访问。

# Example: Inside agent's _run_async_impl
from google.adk.agents import BaseAgent
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from typing import AsyncGenerator

class MyControllingAgent(BaseAgent):
    async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
        # Example: Check if a specific service is available
        if not ctx.memory_service:
            print("Memory service is not available for this invocation.")
            # Potentially change agent behavior

        # Example: Early termination based on some condition
        if ctx.session.state.get("critical_error_flag"):
            print("Critical error detected, ending invocation.")
            ctx.end_invocation = True # Signal framework to stop processing
            yield Event(author=self.name, invocation_id=ctx.invocation_id, content="Stopping due to critical error.")
            return # Stop this agent's execution

        # ... Normal agent processing ...
        yield # ... event ...
// 伪代码:在智能体的 runAsyncImpl 内部
import { BaseAgent, InvocationContext } from '@google/adk';
import type { Event } from '@google/adk';

class MyControllingAgent extends BaseAgent {
  async *runAsyncImpl(ctx: InvocationContext): AsyncGenerator<Event, void, undefined> {
    // 示例:检查特定服务是否可用
    if (!ctx.memoryService) {
      console.log('本次调用记忆服务不可用。');
      // 潜在地改变智能体行为
    }

    // 示例:基于某些条件提前终止
    // 通过 ctx.session.state 直接访问状态,或者如果已包装则通过 ctx.session.state 属性访问
    if ((ctx.session.state as { 'critical_error_flag': boolean })['critical_error_flag']) {
      console.log('检测到严重错误,正在结束调用。');
      ctx.endInvocation = true; // 发送信号给框架以停止处理
      yield {
        author: this.name,
        invocationId: ctx.invocationId,
        content: { parts: [{ text: '由于严重错误而停止。' }] }
      } as Event;
      return; // 停止此智能体的执行
    }

    // ... 正常的智能体处理 ...
    yield; // ... 事件 ...
  }
}
// Example: Inside agent's runAsyncImpl
import com.google.adk.agents.BaseAgent;
import com.google.adk.agents.InvocationContext;
import com.google.adk.events.Event;
import com.google.genai.types.Content;
import com.google.genai.types.Part;
import io.reactivex.rxjava3.core.Flowable;
import java.util.List;

public class MyControllingAgent extends BaseAgent {

  @Override
  protected Flowable<Event> runAsyncImpl(InvocationContext ctx) {
    // Example: Check if a specific service is available
    if (ctx.memoryService() == null) {
      System.out.println("Memory service is not available for this invocation.");
      // Potentially change agent behavior
    }

    // Example: Early termination based on some condition
    Boolean criticalError = (Boolean) ctx.session().state().getOrDefault("critical_error_flag", false);
    if (criticalError != null && criticalError) {
      System.out.println("Critical error detected, ending invocation.");
      ctx.setEndInvocation(true); // Signal framework to stop processing

      Event errorEvent = Event.builder()
          .author(name())
          .invocationId(ctx.invocationId())
          .content(Content.builder().parts(List.of(Part.builder().text("Stopping due to critical error.").build())).build())
          .build();

      return Flowable.just(errorEvent); // Stop this agent's execution
    }

    // ... Normal agent processing ...
    // return Flowable.just(normalEvent);
    return Flowable.empty();
  }
}

Setting ctx.end_invocation = True is a way to gracefully stop the entire request-response cycle from within the agent or its callbacks/tools (via their respective context objects which also have access to modify the underlying InvocationContext's flag).

关键要点和最佳实践

  • 使用合适的上下文: 始终使用提供的最具体的上下文对象(工具/工具回调中的ToolContext,智能体/模型回调中的CallbackContext,适用情况下的ReadonlyContext)。仅在必要时直接在_run_async_impl / _run_live_impl中使用完整的InvocationContextctx)。
  • 用于数据流的状态: context.state是在调用内部共享数据、记住偏好和管理对话记忆(Memory)的主要方式。使用持久存储时,要深思熟虑地使用前缀(app:user:temp:)。
  • 用于文件的制品(Artifacts): 使用context.save_artifactcontext.load_artifact来管理文件引用(如路径或 URI)或更大的数据块。存储引用,按需加载内容。
  • 跟踪更改: 通过上下文方法对状态或制品(Artifacts)所做的修改会自动链接到当前步骤的EventActions并由SessionService处理。
  • 从简单开始: 首先专注于state和基本制品(Artifacts)用法。随着需求变得更加复杂,再探索认证、记忆(Memory)和高级InvocationContext字段(如用于实时流式处理的字段)。

通过理解并有效使用这些上下文对象,你可以使用 ADK 构建更复杂、状态化且功能强大的智能体。