Skip to content

上下文(Context)

Supported in ADKPython v0.1.0TypeScript v0.2.0Go v0.1.0Java v0.1.0

在智能体开发工具包(ADK)中,"上下文"指的是在特定操作期间可供你的智能体及其工具使用的关键信息包。可以将其视为有效处理当前任务或对话轮次所需的必要背景知识和资源。

智能体通常需要的不仅仅是最新的用户消息才能表现良好。上下文至关重要,因为它能够:

  1. 维持状态: 记住对话中多个步骤的详细信息(例如,用户偏好、之前的计算、购物车中的物品)。这主要通过会话状态管理。
  2. 传递数据: 在一个步骤(如 LLM 调用或工具执行)中发现或生成的信息与后续步骤共享。会话状态在这里也是关键。
  3. 访问服务: 与框架功能交互,如:
    • 制品(Artifacts)存储: 保存或加载与会话相关的文件或数据块(如 PDF、图像、配置文件)。
    • 记忆(Memory): 从过去的交互或与用户相关的外部知识源中搜索相关信息。
    • 认证: 请求并检索工具安全访问外部 API 所需的凭证。
  4. 身份和跟踪: 知道当前运行的是哪个智能体(agent.name)以及唯一标识当前请求 - 响应周期(invocation_id)以进行日志记录和调试。
  5. 工具特定操作: 启用工具内的专门操作,例如请求认证或搜索记忆(Memory),这些操作需要访问当前交互的详细信息。

保存单个完整的用户请求到最终响应周期(一次调用)所有信息的核心部分是InvocationContext。但是,你通常不会直接创建或管理此对象。ADK 框架在调用开始时创建它(例如,通过runner.run_async),并将相关上下文信息隐式传递给你的智能体代码、回调和工具。

# 概念伪代码:框架如何提供上下文(内部逻辑)

# runner = Runner(agent=my_root_agent, session_service=..., artifact_service=...)
# user_message = types.Content(...)
# session = session_service.get_session(...) # 或创建新会话

# --- 在 runner.run_async(...) 内部 ---
# 1. 框架为本次运行创建主上下文
# invocation_context = InvocationContext(
#     invocation_id="unique-id-for-this-run",
#     session=session,
#     user_content=user_message,
#     agent=my_root_agent, # 起始智能体
#     session_service=session_service,
#     artifact_service=artifact_service,
#     memory_service=memory_service,
#     # ... 其他必要字段 ...
# )
#
# 2. 框架调用智能体的 run 方法,隐式传递上下文
#    (智能体方法签名会接收它,例如 runAsyncImpl(InvocationContext invocationContext))
# await my_root_agent.run_async(invocation_context)
#   --- 结束内部逻辑 ---
#
# 作为开发者,你只需在方法参数中使用框架提供的上下文对象。
/* 概念伪代码:框架如何提供上下文(内部逻辑) */

const runner = new InMemoryRunner({ agent: myRootAgent });
const session = await runner.sessionService.createSession({ ... });
const userMessage = createUserContent(...);

// --- 在 runner.runAsync(...) 内部 ---
// 1. 框架为本次运行创建主上下文
const invocationContext = new InvocationContext({
  invocationId: "unique-id-for-this-run",
  session: session,
  userContent: userMessage,
  agent: myRootAgent, // 起始智能体
  sessionService: runner.sessionService,
  pluginManager: runner.pluginManager,
  // ... 其他必要字段 ...
});
//
// 2. 框架调用智能体的 run 方法,隐式传递上下文
await myRootAgent.runAsync(invocationContext);
//   --- 结束内部逻辑 ---

// 作为开发者,你使用方法参数中提供的上下文对象。
/* Conceptual Pseudocode: How the framework provides context (Internal Logic) */
sessionService := session.InMemoryService()

r, err := runner.New(runner.Config{
    AppName:        appName,
    Agent:          myAgent,
    SessionService: sessionService,
})
if err != nil {
    log.Fatalf("Failed to create runner: %v", err)
}

s, err := sessionService.Create(ctx, &session.CreateRequest{
    AppName: appName,
    UserID:  userID,
})
if err != nil {
    log.Fatalf("FATAL: Failed to create session: %v", err)
}

scanner := bufio.NewScanner(os.Stdin)
for {
    fmt.Print("\nYou > ")
    if !scanner.Scan() {
        break
    }
    userInput := scanner.Text()
    if strings.EqualFold(userInput, "quit") {
        break
    }
    userMsg := genai.NewContentFromText(userInput, genai.RoleUser)
    events := r.Run(ctx, s.Session.UserID(), s.Session.ID(), userMsg, agent.RunConfig{
        StreamingMode: agent.StreamingModeNone,
    })
    fmt.Print("\nAgent > ")
    for event, err := range events {
        if err != nil {
            log.Printf("ERROR during agent execution: %v", err)
            break
        }
        fmt.Print(event.Content.Parts[0].Text)
    }
}
/* 概念伪代码:框架如何提供上下文(内部逻辑) */
InMemoryRunner runner = new InMemoryRunner(agent);
Session session = runner
    .sessionService()
    .createSession(runner.appName(), USER_ID, initialState, SESSION_ID )
    .blockingGet();

try (Scanner scanner = new Scanner(System.in, StandardCharsets.UTF_8)) {
  while (true) {
    System.out.print("\nYou > ");
  }
  String userInput = scanner.nextLine();
  if ("quit".equalsIgnoreCase(userInput)) {
    break;
  }
  Content userMsg = Content.fromParts(Part.fromText(userInput));
  Flowable<Event> events = runner.runAsync(session.userId(), session.id(), userMsg);
  System.out.print("\nAgent > ");
  events.blockingForEach(event -> System.out.print(event.stringifyContent()));
}

不同类型的上下文

虽然InvocationContext作为全面的内部容器,但 ADK 提供了根据特定情况定制的专门上下文对象。这确保了你拥有适合手头任务的正确工具和权限,而无需在任何地方处理完整内部上下文的全部复杂性。以下是你将遇到的不同"风格":

  1. InvocationContext {: #invocationcontext }

    • 使用场所: 在智能体的核心实现方法(_run_async_impl_run_live_impl)中直接接收为ctx参数。
    • 目的: 提供对当前调用整个状态的访问。这是最全面的上下文对象。
    • 关键内容: 直接访问session(包括stateevents)、当前agent实例、invocation_id、初始user_content、对已配置服务的引用(artifact_servicememory_servicesession_service),以及与实时/流式模式相关的字段。
    • 使用场景: 主要在智能体的核心逻辑需要直接访问整体会话或服务时使用,尽管状态和制品(Artifacts)交互通常委托给使用自己上下文的回调/工具。也用于控制调用本身(例如,设置ctx.end_invocation = True)。
    # 伪代码:Agent 实现接收 InvocationContext
    from google.adk.agents import BaseAgent
    from google.adk.agents.invocation_context import InvocationContext
    from google.adk.events import Event
    from typing import AsyncGenerator
    
    class MyAgent(BaseAgent):
        async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
            # 直接访问示例
            agent_name = ctx.agent.name
            session_id = ctx.session.id
            print(f"Agent {agent_name} running in session {session_id} for invocation {ctx.invocation_id}")
            # ... 智能体逻辑使用 ctx ...
            yield # ... 事件 ...
    
    // 伪代码:智能体实现接收 InvocationContext
    import { BaseAgent, InvocationContext, Event } from '@google/adk';
    
    class MyAgent extends BaseAgent {
      async *runAsyncImpl(ctx: InvocationContext): AsyncGenerator<Event, void, undefined> {
        // 直接访问示例
        const agentName = ctx.agent.name;
        const sessionId = ctx.session.id;
        console.log(`智能体 ${agentName} 正在会话 ${sessionId} 中为调用 ${ctx.invocationId} 运行`);
        // ... 使用 ctx 的智能体逻辑 ...
        yield; // ... 事件 ...
      }
    }
    
    import (
        "google.golang.org/adk/agent"
        "google.golang.org/adk/session"
    )
    
    // Pseudocode: Agent implementation receiving InvocationContext
    type MyAgent struct {
    }
    
    func (a *MyAgent) Run(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
        return func(yield func(*session.Event, error) bool) {
            // Direct access example
            agentName := ctx.Agent().Name()
            sessionID := ctx.Session().ID()
            fmt.Printf("Agent %s running in session %s for invocation %s\n", agentName, sessionID, ctx.InvocationID())
            // ... agent logic using ctx ...
            yield(&session.Event{Author: agentName}, nil)
        }
    }
    
    // 伪代码:Agent 实现接收 InvocationContext
    import com.google.adk.agents.BaseAgent;
    import com.google.adk.agents.InvocationContext;
    
        LlmAgent root_agent =
            LlmAgent.builder()
                .model("gemini-***")
                .name("sample_agent")
                .description("Answers user questions.")
                .instruction(
                    """
                    provide instruction for the agent here.
                    """
                )
                .tools(sampleTool)
                .outputKey("YOUR_KEY")
                .build();
    
        ConcurrentMap<String, Object> initialState = new ConcurrentHashMap<>();
        initialState.put("YOUR_KEY", "");
    
        InMemoryRunner runner = new InMemoryRunner(agent);
        Session session =
              runner
                  .sessionService()
                  .createSession(runner.appName(), USER_ID, initialState, SESSION_ID )
                  .blockingGet();
    
       try (Scanner scanner = new Scanner(System.in, StandardCharsets.UTF_8)) {
            while (true) {
              System.out.print("\nYou > ");
              String userInput = scanner.nextLine();
    
              if ("quit".equalsIgnoreCase(userInput)) {
                break;
              }
    
              Content userMsg = Content.fromParts(Part.fromText(userInput));
              Flowable<Event> events =
                      runner.runAsync(session.userId(), session.id(), userMsg);
    
              System.out.print("\nAgent > ");
              events.blockingForEach(event ->
                      System.out.print(event.stringifyContent()));
          }
    
        protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) {
            // 直接访问示例
            String agentName = invocationContext.agent.name
            String sessionId = invocationContext.session.id
            String invocationId = invocationContext.invocationId
            System.out.println("Agent " + agentName + " running in session " + session_id + " for invocation " + invocationId)
            // ... 智能体逻辑使用 ctx ...
        }
    
  2. ReadonlyContext {: #readonlycontext }

    • 使用场所: 在只需要对基本信息进行只读访问且不允许修改的场景中提供(例如,InstructionProvider函数)。它也是其他上下文的基类。
    • 目的: 提供基本上下文详细信息的安全、只读视图。
    • 关键内容: invocation_idagent_name以及当前state的只读视图
    # 伪代码:Instruction provider 接收 ReadonlyContext
    from google.adk.agents.readonly_context import ReadonlyContext
    
    def my_instruction_provider(context: ReadonlyContext) -> str:
        # 只读访问示例
        user_tier = context.state().get("user_tier", "standard") # 可以读取 state
        # context.state['new_key'] = 'value' # 这通常会报错或无效
        return f"处理 {user_tier} 用户的请求。"
    
    // 伪代码:指令提供器接收 ReadonlyContext
    import { ReadonlyContext } from '@google/adk';
    
    function myInstructionProvider(context: ReadonlyContext): string {
      // 只读访问示例
      // state 对象是只读的
      const userTier = context.state.get('user_tier') ?? 'standard';
      // context.state.set('new_key', 'value'); // 这会失败或报错
      return `处理 ${userTier} 用户的请求。`;
    }
    
    import "google.golang.org/adk/agent"
    
    // Pseudocode: Instruction provider receiving ReadonlyContext
    func myInstructionProvider(ctx agent.ReadonlyContext) (string, error) {
        // Read-only access example
        userTier, err := ctx.ReadonlyState().Get("user_tier")
        if err != nil {
            userTier = "standard" // Default value
        }
        // ctx.ReadonlyState() has no Set method since State() is read-only.
        return fmt.Sprintf("Process the request for a %v user.", userTier), nil
    }
    
    // 伪代码:Instruction provider 接收 ReadonlyContext
    import com.google.adk.agents.ReadonlyContext;
    
    public String myInstructionProvider(ReadonlyContext context){
        // 只读访问示例
        String userTier = context.state().get("user_tier", "standard");
        context.state().put('new_key', 'value'); // 这通常会报错
        return "处理 " + userTier + " 用户的请求。";
    }
    
  3. CallbackContext

    • 使用场景: 作为 callback_context 传递给智能体生命周期回调(before_agent_callbackafter_agent_callback)和模型交互回调(before_model_callbackafter_model_callback)。
    • 目的: 便于在 回调函数内部 检查和修改状态、与制品(Artifacts)交互以及访问调用详情。
    • 核心功能(相对于 ReadonlyContext 的增强):
      • 可变 state 属性: 允许读取 和写入 会话状态。在此处所做的更改(callback_context.state['key'] = value)会被跟踪,并与回调后框架生成的事件相关联。
      • 制品(Artifacts)方法: 提供 load_artifact(filename)save_artifact(filename, part) 方法,用于与配置的 artifact_service 进行交互。
      • 直接访问 user_content

    (注:在 TypeScript 中,CallbackContextToolContext 已统一为单个 Context 类型。)

    # 伪代码:Callback 接收 CallbackContext
    from google.adk.agents.callback_context import CallbackContext
    from google.adk.models import LlmRequest
    from google.genai import types
    from typing import Optional
    
    def my_before_model_cb(callback_context: CallbackContext, request: LlmRequest) -> Optional[types.Content]:
        # 读/写 state 示例
        call_count = callback_context.state.get("model_calls", 0)
        callback_context.state["model_calls"] = call_count + 1 # 修改 state
    
        # 可选:加载制品(Artifact)
        # config_part = callback_context.load_artifact("model_config.json")
        print(f"正在为调用 {callback_context.invocation_id} 准备模型调用 #{call_count + 1}")
        return None # 允许模型调用继续
    
    // 伪代码:Callback 接收 Context
    import { Context, LlmRequest } from '@google/adk';
    import { Content } from '@google/genai';
    
    function myBeforeModelCb(context: Context, request: LlmRequest): Content | undefined {
      // 读/写 state 示例
      const callCount = (context.state.get('model_calls') as number) || 0;
      context.state.set('model_calls', callCount + 1); // 修改 state
    
      // 可选:加载制品(Artifact)
      // const configPart = await context.loadArtifact('model_config.json');
      console.log(`正在为调用 ${context.invocationId} 准备模型调用 #${callCount + 1}`);
      return undefined; // 允许模型调用继续
    }
    
    import (
        "google.golang.org/adk/agent"
        "google.golang.org/adk/model"
    )
    
    // Pseudocode: Callback receiving CallbackContext
    func myBeforeModelCb(ctx agent.CallbackContext, req *model.LLMRequest) (*model.LLMResponse, error) {
        // Read/Write state example
        callCount, err := ctx.State().Get("model_calls")
        if err != nil {
            callCount = 0 // Default value
        }
        newCount := callCount.(int) + 1
        if err := ctx.State().Set("model_calls", newCount); err != nil {
            return nil, err
        }
    
        // Optionally load an artifact
        // configPart, err := ctx.Artifacts().Load("model_config.json")
        fmt.Printf("Preparing model call #%d for invocation %s\n", newCount, ctx.InvocationID())
        return nil, nil // Allow model call to proceed
    }
    
    // 伪代码:Callback 接收 CallbackContext
    import com.google.adk.agents.CallbackContext;
    import com.google.adk.models.LlmRequest;
    import com.google.genai.types.Content;
    import java.util.Optional;
    
    public Maybe<LlmResponse> myBeforeModelCb(CallbackContext callbackContext, LlmRequest request){
        // 读/写 state 示例
        callCount = callbackContext.state().get("model_calls", 0)
        callbackContext.state().put("model_calls") = callCount + 1 # 修改状态
    
        // 可选:加载制品(Artifact)
        // Maybe<Part> configPart = callbackContext.loadArtifact("model_config.json");
        System.out.println("正在准备模型调用 " + (callCount + 1));
        return Maybe.empty(); // 允许模型调用继续
    }
    
  4. ToolContext {: #toolcontext }

    • 使用场所: 作为tool_context传递给支持FunctionTool的函数以及工具执行回调(before_tool_callbackafter_tool_callback)。
    • 目的: 提供CallbackContext所有功能,外加工具执行必需的专门方法,如处理认证、搜索记忆(Memory)和列出制品(Artifacts)。
    • 主要功能(添加到CallbackContext):
      • 认证方法: request_credential(auth_config)触发认证流程,以及get_auth_response(auth_config)检索用户/系统提供的凭证。
      • 制品(Artifacts)列表: list_artifacts()用于发现会话中可用的制品(Artifacts)。
      • 记忆(Memory)搜索: search_memory(query)用于查询配置的memory_service
      • function_call_id属性: 标识触发此工具执行的 LLM 的特定函数调用,对于将认证请求或响应正确地链接回来至关重要。
      • actions属性: 直接访问此步骤的EventActions对象,允许工具发出状态更改、认证请求等信号。
    # 伪代码:工具函数接收 ToolContext
    from google.adk.tools import ToolContext
    from typing import Dict, Any
    
    # 假设此函数被 FunctionTool 包装
    def search_external_api(query: str, tool_context: ToolContext) -> Dict[str, Any]:
        api_key = tool_context.state.get("api_key")
        if not api_key:
            # 定义所需的认证配置
            # auth_config = AuthConfig(...)
            # tool_context.request_credential(auth_config) # 请求凭证
            # 使用 'actions' 属性来标记已发起认证请求
            # tool_context.actions.requested_auth_configs[tool_context.function_call_id] = auth_config
            return {"status": "Auth Required"}
    
        # 使用 API key...
        print(f"Tool executing for query '{query}' using API key. Invocation: {tool_context.invocation_id}")
    
        # 可选:搜索记忆或列出制品
        # relevant_docs = tool_context.search_memory(f"info related to {query}")
        # available_files = tool_context.list_artifacts()
    
        return {"result": f"Data for {query} fetched."}
    
    // 伪代码:工具函数接收 Context
    import { Context } from '@google/adk';
    
    // __假设此函数被 FunctionTool 包装__
    function searchExternalApi(query: string, context: Context): { [key: string]: string } {
      const apiKey = context.state.get('api_key') as string;
      if (!apiKey) {
         // 定义所需的认证配置
         // const authConfig = new AuthConfig(...);
         // context.requestCredential(authConfig); // 请求凭证
         // 'actions' 属性现在由 requestCredential 自动更新
         return { status: 'Auth Required' };
      }
    
      // 使用 API 密钥...
      console.log(`工具正在为查询 '${query}' 执行,使用 API 密钥。调用 ID:${context.invocationId}`);
    
      // 可选:搜索记忆或列出制品
      // 注意:在 TS 中访问记忆/制品等服务通常是异步的,
      // 因此如果你复用它们,需要将此函数标记为 'async'。
      // context.searchMemory(`与 ${query} 相关的详细信息`).then(...)
      // context.listArtifacts().then(...)
    
      return { result: `已获取 ${query} 的数据。` };
    }
    
    import "google.golang.org/adk/tool"
    
    // Pseudocode: Tool function receiving ToolContext
    type searchExternalAPIArgs struct {
        Query string `json:"query" jsonschema:"The query to search for."`
    }
    
    func searchExternalAPI(tc tool.Context, input searchExternalAPIArgs) (string, error) {
        apiKey, err := tc.State().Get("api_key")
        if err != nil || apiKey == "" {
            // In a real scenario, you would define and request credentials here.
            // This is a conceptual placeholder.
            return "", fmt.Errorf("auth required")
        }
    
        // Use the API key...
        fmt.Printf("Tool executing for query '%s' using API key. Invocation: %s\n", input.Query, tc.InvocationID())
    
        // Optionally search memory or list artifacts
        // relevantDocs, _ := tc.SearchMemory(tc, "info related to %s", input.Query))
        // availableFiles, _ := tc.Artifacts().List()
    
        return fmt.Sprintf("Data for %s fetched.", input.Query), nil
    }
    
    // 伪代码:工具函数接收 ToolContext
    import com.google.adk.tools.ToolContext;
    import java.util.HashMap;
    import java.util.Map;
    
    // 假设此函数被 FunctionTool 包装
    public Map<String, Object> searchExternalApi(String query, ToolContext toolContext){
        String apiKey = toolContext.state.get("api_key");
        if(apiKey.isEmpty()){
            // 定义所需的认证配置
            // authConfig = AuthConfig(...);
            // toolContext.requestCredential(authConfig); // 请求凭证
            // 使用 'actions' 属性来标记已发起认证请求
            ...
            return Map.of("status", "Auth Required");
        }
    
        // 使用 API key...
        System.out.println("Tool executing for query " + query + " using API key. ");
    
        // 可选:列出制品
        // Single<List<String>> availableFiles = toolContext.listArtifacts();
    
        return Map.of("result", "Data for " + query + " fetched");
    }
    

理解这些不同的上下文对象以及何时使用它们是有效管理状态、访问服务以及控制 ADK 应用程序流程的关键。下一节将详细介绍使用这些上下文可以执行的常见任务。

使用上下文的常见任务

现在你已经了解了不同的上下文对象,让我们专注于在构建智能体和工具时如何使用它们来执行常见任务。

访问信息

你将经常需要读取存储在上下文中的信息。

  • 读取会话状态: 访问之前步骤中保存的数据或用户/应用级别设置。在state属性上使用类似字典的访问。

    # 伪代码:在 Tool 函数中
    from google.adk.tools import ToolContext
    
    def my_tool(tool_context: ToolContext, **kwargs):
        user_pref = tool_context.state.get("user_display_preference", "default_mode")
        api_endpoint = tool_context.state.get("app:api_endpoint") # Read app-level state
    
        if user_pref == "dark_mode":
            # ... apply dark mode logic ...
            pass
        print(f"Using API endpoint: {api_endpoint}")
        # ... rest of tool logic ...
    
    # 伪代码:在 Callback 函数中
    from google.adk.agents.callback_context import CallbackContext
    
    def my_callback(callback_context: CallbackContext, **kwargs):
        last_tool_result = callback_context.state.get("temp:last_api_result") # Read temporary state
        if last_tool_result:
            print(f"Found temporary result from last tool: {last_tool_result}")
        # ... callback logic ...
    
    // 伪代码:在工具函数中
    import { Context } from '@google/adk';
    
    async function myTool(context: Context) {
      const userPref = context.state.get('user_display_preference', 'default_mode');
      const apiEndpoint = context.state.get('app:api_endpoint'); // 读取应用级状态
    
      if (userPref === 'dark_mode') {
        // ... 应用深色模式逻辑 ...
      }
      console.log(`使用 API 端点:${apiEndpoint}`);
      // ... 工具逻辑的其余部分 ...
    }
    
    // 伪代码:在回调函数中
    import { Context } from '@google/adk';
    
    function myCallback(context: Context) {
      const lastToolResult = context.state.get('temp:last_api_result'); // 读取临时状态
      if (lastToolResult) {
        console.log(`发现上一个工具的临时结果:${lastToolResult}`);
      }
      // ... 回调逻辑 ...
    }
    
    import (
        "google.golang.org/adk/agent"
        "google.golang.org/adk/session"
        "google.golang.org/adk/tool"
        "google.golang.org/genai"
    )
    
    // Pseudocode: In a Tool function
    type toolArgs struct {
        // Define tool-specific arguments here
    }
    
    type toolResults struct {
        // Define tool-specific results here
    }
    
    // Example tool function demonstrating state access
    func myTool(tc tool.Context, input toolArgs) (toolResults, error) {
        userPref, err := tc.State().Get("user_display_preference")
        if err != nil {
            userPref = "default_mode"
        }
        apiEndpoint, _ := tc.State().Get("app:api_endpoint") // Read app-level state
    
        if userPref == "dark_mode" {
            // ... apply dark mode logic ...
        }
        fmt.Printf("Using API endpoint: %v\n", apiEndpoint)
        // ... rest of tool logic ...
        return toolResults{}, nil
    }
    
    
    // Pseudocode: In a Callback function
    func myCallback(ctx agent.CallbackContext) (*genai.Content, error) {
        lastToolResult, err := ctx.State().Get("temp:last_api_result") // Read temporary state
        if err == nil {
            fmt.Printf("Found temporary result from last tool: %v\n", lastToolResult)
        } else {
            fmt.Println("No temporary result found.")
        }
        // ... callback logic ...
        return nil, nil
    }
    
    // 伪代码:在 Tool 函数中
    import com.google.adk.tools.ToolContext;
    
    public void myTool(ToolContext toolContext){
       String userPref = toolContext.state().get("user_display_preference");
       String apiEndpoint = toolContext.state().get("app:api_endpoint"); // Read app-level state
       if(userPref.equals("dark_mode")){
            // ... apply dark mode logic ...
            pass
        }
       System.out.println("Using API endpoint: " + apiEndpoint);
       // ... rest of tool logic ...
    }
    
    // 伪代码:在 Callback 函数中
    import com.google.adk.agents.CallbackContext;
    
        public void myCallback(CallbackContext callbackContext){
            String lastToolResult = (String) callbackContext.state().get("temp:last_api_result"); // Read temporary state
        }
        if(!(lastToolResult.isEmpty())){
            System.out.println("Found temporary result from last tool: " + lastToolResult);
        }
        // ... callback logic ...
    
  • 获取当前标识符: 对于基于当前操作的日志记录或自定义逻辑很有用。

    # 伪代码:在任何上下文中(ToolContext 示例)
    from google.adk.tools import ToolContext
    
    def log_tool_usage(tool_context: ToolContext, **kwargs):
        agent_name = tool_context.agent_name
        inv_id = tool_context.invocation_id
        func_call_id = getattr(tool_context, 'function_call_id', 'N/A') # 特定于 ToolContext
    
        print(f"Log: Invocation={inv_id}, Agent={agent_name}, FunctionCallID={func_call_id} - Tool Executed.")
    
    // 伪代码:在任何上下文中
    import { Context } from '@google/adk';
    
    function logToolUsage(context: Context) {
      const agentName = context.agentName;
      const invId = context.invocationId;
      const functionCallId = context.functionCallId ?? '无'; // 执行工具时可用
    
      console.log(`日志:调用=${invId}, 智能体=${agentName}, 函数调用ID=${functionCallId} - 工具已执行。`);
    }
    
    import "google.golang.org/adk/tool"
    
    // Pseudocode: In any context (ToolContext shown)
    type logToolUsageArgs struct{}
    type logToolUsageResult struct {
        Status string `json:"status"`
    }
    
    func logToolUsage(tc tool.Context, args logToolUsageArgs) (logToolUsageResult, error) {
        agentName := tc.AgentName()
        invID := tc.InvocationID()
        funcCallID := tc.FunctionCallID()
    
        fmt.Printf("Log: Invocation=%s, Agent=%s, FunctionCallID=%s - Tool Executed.\n", invID, agentName, funcCallID)
        return logToolUsageResult{Status: "Logged successfully"}, nil
    }
    
    // 伪代码:在任何上下文中(ToolContext 示例)
     import com.google.adk.tools.ToolContext;
    
     public void logToolUsage(ToolContext toolContext){
                String agentName = toolContext.agentName;
                String invId = toolContext.invocationId;
                String functionCallId = toolContext.functionCallId().get(); // Specific to ToolContext
                System.out.println("Log: Invocation= " + invId &+ " Agent= " + agentName);
            }
    
  • 访问初始用户输入: 参考开始当前调用的消息。

    # 伪代码:在 Callback 中
    from google.adk.agents.callback_context import CallbackContext
    
    def check_initial_intent(callback_context: CallbackContext, **kwargs):
        initial_text = "N/A"
        if callback_context.user_content and callback_context.user_content.parts:
            initial_text = callback_context.user_content.parts[0].text or "Non-text input"
    
        print(f"This invocation started with user input: '{initial_text}'")
    
    # 伪代码:在 Agent 的 _run_async_impl 中
    # async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
    #     if ctx.user_content and ctx.user_content.parts:
    #         initial_text = ctx.user_content.parts[0].text
    #         print(f"Agent logic remembering initial query: {initial_text}")
    #     ...
    
    // 伪代码:在回调中
    import { Context } from '@google/adk';
    
    function checkInitialIntent(context: Context) {
      let initialText = '无';
      const userContent = context.userContent;
      if (userContent?.parts?.length) {
        initialText = userContent.parts[0].text ?? '非文本输入';
      }
    
      console.log(`本次调用以用户输入开始:'${initialText}'`);
    }
    
    import (
        "google.golang.org/adk/agent"
        "google.golang.org/genai"
    )
    
    // Pseudocode: In a Callback
    func logInitialUserInput(ctx agent.CallbackContext) (*genai.Content, error) {
        userContent := ctx.UserContent()
        if userContent != nil && len(userContent.Parts) > 0 {
            if text := userContent.Parts[0].Text; text != "" {
                fmt.Printf("User's initial input for this turn: '%s'\n", text)
            }
        }
        return nil, nil // No modification
    }
    
    // 伪代码:在 Callback 中
    import com.google.adk.agents.CallbackContext;
    
    public void checkInitialIntent(CallbackContext callbackContext){
        String initialText = "N/A";
        if((!(callbackContext.userContent().isEmpty())) && (!(callbackContext.userContent().parts.isEmpty()))){
            initialText = cbx.userContent().get().parts().get().get(0).text().get();
            ...
            System.out.println("This invocation started with user input: " + initialText)
        }
    }
    

管理状态

状态对于记忆(Memory)和数据流至关重要。当你使用CallbackContextToolContext修改状态时,框架会自动跟踪并持久化这些更改。

  • 在工具之间传递数据

    # 伪代码:Tool 1 - Fetches user ID
    from google.adk.tools import ToolContext
    import uuid
    
    def get_user_profile(tool_context: ToolContext) -> dict:
        user_id = str(uuid.uuid4()) # Simulate fetching ID
        # Save the ID to state for the next tool
        tool_context.state["temp:current_user_id"] = user_id
        return {"profile_status": "ID generated"}
    
    # 伪代码:Tool 2 - Uses user ID from state
    def get_user_orders(tool_context: ToolContext) -> dict:
        user_id = tool_context.state.get("temp:current_user_id")
        if not user_id:
            return {"error": "User ID not found in state"}
    
        print(f"Fetching orders for user ID: {user_id}")
        # ... logic to fetch orders using user_id ...
        return {"orders": ["order123", "order456"]}
    
    // 伪代码:工具 1 - 获取用户 ID
    import { Context } from '@google/adk';
    import { v4 as uuidv4 } from 'uuid';
    
    function getUserProfile(context: Context): Record<string, string> {
      const userId = uuidv4(); // 模拟获取 ID
      // 将 ID 保存到状态中供下一个工具使用
      context.state.set('temp:current_user_id', userId);
      return { profile_status: 'ID generated' };
    }
    
    // 伪代码:工具 2 - 从状态中使用用户 ID
    function getUserOrders(context: Context): Record<string, string | string[]> {
      const userId = context.state.get('temp:current_user_id');
      if (!userId) {
        return { error: '状态中未找到用户 ID' };
      }
    
      console.log(`正在获取用户 ID 的订单:${userId}`);
      // ... 使用 user_id 获取订单的逻辑 ...
      return { orders: ['order123', 'order456'] };
    }
    
    import "google.golang.org/adk/tool"
    
    // Pseudocode: Tool 1 - Fetches user ID
    type GetUserProfileArgs struct {
    }
    
    func getUserProfile(tc tool.Context, input GetUserProfileArgs) (string, error) {
        // A random user ID for demonstration purposes
        userID := "random_user_456"
    
        // Save the ID to state for the next tool
        if err := tc.State().Set("temp:current_user_id", userID); err != nil {
            return "", fmt.Errorf("failed to set user ID in state: %w", err)
        }
        return "ID generated", nil
    }
    
    
    // Pseudocode: Tool 2 - Uses user ID from state
    type GetUserOrdersArgs struct {
    }
    
    type getUserOrdersResult struct {
        Orders []string `json:"orders"`
    }
    
    func getUserOrders(tc tool.Context, input GetUserOrdersArgs) (*getUserOrdersResult, error) {
        userID, err := tc.State().Get("temp:current_user_id")
        if err != nil {
            return &getUserOrdersResult{}, fmt.Errorf("user ID not found in state")
        }
    
        fmt.Printf("Fetching orders for user ID: %v\n", userID)
        // ... logic to fetch orders using user_id ...
        return &getUserOrdersResult{Orders: []string{"order123", "order456"}}, nil
    }
    
    // 伪代码:工具 1 - 获取用户 ID
    import com.google.adk.tools.ToolContext;
    import java.util.UUID;
    
    public Map<String, String> getUserProfile(ToolContext toolContext){
        String userId = UUID.randomUUID().toString();
        return Map.of("profile_status", "ID generated");
    }
    
    // 伪代码:Tool 2 - Uses user ID from state
    public  Map<String, String> getUserOrders(ToolContext toolContext){
        String userId = toolContext.state().get("temp:current_user_id");
        if(userId.isEmpty()){
            return Map.of("error", "User ID not found in state");
        }
        System.out.println("Fetching orders for user id: " + userId);
         // ... logic to fetch orders using user_id ...
        return Map.of("orders", "order123");
    }
    
  • 更新用户偏好:

    # 伪代码:Tool 或 Callback 识别偏好
    from google.adk.tools import ToolContext # Or CallbackContext
    
    def set_user_preference(tool_context: ToolContext, preference: str, value: str) -> dict:
        # 如果使用持久 SessionService,请使用 'user:' 前缀表示用户级状态
        state_key = f"user:{preference}"
        tool_context.state[state_key] = value
        print(f"Set user preference '{preference}' to '{value}'")
        return {"status": "Preference updated"}
    
    // 伪代码:工具或回调识别偏好
    import { Context } from '@google/adk';
    
    function setUserPreference(context: Context, preference: string, value: string): Record<string, string> {
      // 使用 'user:' 前缀表示用户级状态(如果使用持久化 SessionService)
      const stateKey = `user:${preference}`;
      context.state.set(stateKey, value);
      console.log(`已将用户偏好 '${preference}' 设置为 '${value}'`);
      return { status: 'Preference updated' };
    }
    
    import "google.golang.org/adk/tool"
    
    // Pseudocode: Tool or Callback identifies a preference
    type setUserPreferenceArgs struct {
        Preference string `json:"preference" jsonschema:"The name of the preference to set."`
        Value      string `json:"value" jsonschema:"The value to set for the preference."`
    }
    
    type setUserPreferenceResult struct {
        Status string `json:"status"`
    }
    
    func setUserPreference(tc tool.Context, args setUserPreferenceArgs) (setUserPreferenceResult, error) {
        // Use 'user:' prefix for user-level state (if using a persistent SessionService)
        stateKey := fmt.Sprintf("user:%s", args.Preference)
        if err := tc.State().Set(stateKey, args.Value); err != nil {
            return setUserPreferenceResult{}, fmt.Errorf("failed to set preference in state: %w", err)
        }
        fmt.Printf("Set user preference '%s' to '%s'\n", args.Preference, args.Value)
        return setUserPreferenceResult{Status: "Preference updated"}, nil
    }
    
    // 伪代码:Tool 或 Callback 识别偏好
    import com.google.adk.tools.ToolContext; // Or CallbackContext
    
    public Map<String, String> setUserPreference(ToolContext toolContext, String preference, String value){
        // 如果使用持久 SessionService,请使用 'user:' 前缀表示用户级状态
        String stateKey = "user:" + preference;
        toolContext.state().put(stateKey, value);
        System.out.println("Set user preference '" + preference + "' to '" + value + "'");
        return Map.of("status", "Preference updated");
    }
    
  • 状态前缀: 虽然基本状态是会话特定的,但像app:user:这样的前缀可以与持久SessionService实现(如DatabaseSessionServiceVertexAiSessionService)一起使用,以指示更广泛的范围(应用范围或跨会话的用户范围)。temp:可以表示仅在当前调用中相关的数据。

使用制品(Artifacts)

使用制品(Artifacts)处理与会话相关的文件或大型数据块。常见用例:处理上传的文档。

  • 文档摘要生成器示例流程:

    1. 引入引用(例如,在设置工具或回调中): 将文档的路径或 URI保存为制品(Artifacts),而不是整个内容。

      # 伪代码:在 callback 或 initial tool
      from google.adk.agents.callback_context import CallbackContext # Or ToolContext
      from google.genai import types
      
      def save_document_reference(context: CallbackContext, file_path: str) -> None:
          # 假设 file_path 类似 "gs://my-bucket/docs/report.pdf" 或 "/local/path/to/report.pdf"
          try:
              # 创建包含路径/URI 文本的 Part
              artifact_part = types.Part(text=file_path)
              version = context.save_artifact("document_to_summarize.txt", artifact_part)
              print(f"Saved document reference '{file_path}' as artifact version {version}")
              # 如果其他工具需要,将文件名存储在状态中
              context.state["temp:doc_artifact_name"] = "document_to_summarize.txt"
          except ValueError as e:
              print(f"Error saving artifact: {e}") # 例如,制品服务未配置
          except Exception as e:
              print(f"Unexpected error saving artifact reference: {e}")
      
      # 使用示例:
      # save_document_reference(callback_context, "gs://my-bucket/docs/report.pdf")
      
      // Pseudocode: In a callback or initial tool
      import { Context } from '@google/adk';
      import type { Part } from '@google/genai';
      
      async function saveDocumentReference(context: Context, filePath: string) {
        // Assume filePath is something like "gs://my-bucket/docs/report.pdf" or "/local/path/to/report.pdf"
        try {
          // Create a Part containing the path/URI text
          const artifactPart: Part = { text: filePath };
          const version = await context.saveArtifact('document_to_summarize.txt', artifactPart);
          console.log(`Saved document reference '${filePath}' as artifact version ${version}`);
          // Store the filename in state if needed by other tools
          context.state.set('temp:doc_artifact_name', 'document_to_summarize.txt');
        } catch (e) {
          console.error(`Unexpected error saving artifact reference: ${e}`);
        }
      }
      
      // Example usage:
      // saveDocumentReference(context, "gs://my-bucket/docs/report.pdf");
      
      import (
          "google.golang.org/adk/tool"
          "google.golang.org/genai"
      )
      
      // Adapt the saveDocumentReference callback into a tool for this example.
      type saveDocRefArgs struct {
          FilePath string `json:"file_path" jsonschema:"The path to the file to save."`
      }
      
      type saveDocRefResult struct {
          Status string `json:"status"`
      }
      
      func saveDocRef(tc tool.Context, args saveDocRefArgs) (saveDocRefResult, error) {
          artifactPart := genai.NewPartFromText(args.FilePath)
          _, err := tc.Artifacts().Save(tc, "document_to_summarize.txt", artifactPart)
          if err != nil {
              return saveDocRefResult{}, err
          }
          fmt.Printf("Saved document reference '%s' as artifact\n", args.FilePath)
          if err := tc.State().Set("temp:doc_artifact_name", "document_to_summarize.txt"); err != nil {
              return saveDocRefResult{}, fmt.Errorf("failed to set artifact name in state")
          }
          return saveDocRefResult{"Reference saved"}, nil
      }
      
      // 伪代码:在 callback 或 initial tool
      import com.google.adk.agents.CallbackContext;
      import com.google.genai.types.Content;
      import com.google.genai.types.Part;
      
      
      pubic void saveDocumentReference(CallbackContext context, String filePath){
          // 假设 file_path 类似 "gs://my-bucket/docs/report.pdf" 或 "/local/path/to/report.pdf"
          try{
              // 创建包含路径/URI 文本的 Part
              Part artifactPart = types.Part(filePath)
              Optional<Integer> version = context.saveArtifact("document_to_summarize.txt", artifactPart)
              System.out.println("Saved document reference" + filePath + " as artifact version " + version);
              // 如果其他工具需要,将文件名存储在状态中
              context.state().put("temp:doc_artifact_name", "document_to_summarize.txt");
          } catch(Exception e){
              System.out.println("Unexpected error saving artifact reference: " + e);
          }
      }
      
      // 使用示例:
      // saveDocumentReference(context, "gs://my-bucket/docs/report.pdf")
      
    2. 摘要工具: 加载制品(Artifacts)以获取路径/URI,使用适当的库读取实际文档内容,总结,并返回结果。

      # 伪代码:在 Summarizer 工具函数中
      from google.adk.tools import ToolContext
      from google.genai import types
      # 假设像 google.cloud.storage 或内置 open 等库可用
      # 假设存在 'summarize_text' 函数
      # from my_summarizer_lib import summarize_text
      
      def summarize_document_tool(tool_context: ToolContext) -> dict:
          artifact_name = tool_context.state.get("temp:doc_artifact_name")
          if not artifact_name:
              return {"error": "Document artifact name not found in state."}
      
          try:
              # 1. 加载包含路径/URI 的制品部分
              artifact_part = tool_context.load_artifact(artifact_name)
              if not artifact_part or not artifact_part.text:
                  return {"error": f"Could not load artifact or artifact has no text path: {artifact_name}"}
      
              file_path = artifact_part.text
              print(f"Loaded document reference: {file_path}")
      
              # 2. 读取实际文档内容(在 ADK 上下文之外)
              document_content = ""
              if file_path.startswith("gs://"):
                  # 示例:使用 GCS 客户端库下载/读取
                  # from google.cloud import storage
                  # client = storage.Client()
                  # blob = storage.Blob.from_string(file_path, client=client)
                  # document_content = blob.download_as_text() # 或字节,取决于格式
                  pass # 替换为实际的 GCS 读取逻辑
              elif file_path.startswith("/"):
                   # 示例:使用本地文件系统
                   with open(file_path, 'r', encoding='utf-8') as f:
                       document_content = f.read()
              else:
                  return {"error": f"Unsupported file path scheme: {file_path}"}
      
              # 3. 总结内容
              if not document_content:
                   return {"error": "Failed to read document content."}
      
              # summary = summarize_text(document_content) # 调用你的总结逻辑
              summary = f"Summary of content from {file_path}" # 占位符
      
              return {"summary": summary}
      
          except ValueError as e:
               return {"error": f"Artifact service error: {e}"}
          except FileNotFoundError:
               return {"error": f"Local file not found: {file_path}"}
          # except Exception as e: # 捕获 GCS 等的特定异常
          #      return {"error": f"Error reading document {file_path}: {e}"}
      
      // 伪代码:在摘要生成器工具函数中
      import { Context } from '@google/adk';
      
      async function summarizeDocumentTool(context: Context): Promise<Record<string, string>> {
        const artifactName = context.state.get('temp:doc_artifact_name') as string;
        if (!artifactName) {
          return { error: '状态中未找到文档制品名称。' };
        }
      
        try {
          // 1. 加载包含路径/URI 的制品部分
          const artifactPart = await context.loadArtifact(artifactName);
          if (!artifactPart?.text) {
            return { error: `无法加载制品或制品没有文本路径:${artifactName}` };
          }
      
          const filePath = artifactPart.text;
          console.log(`已加载文档引用:${filePath}`);
      
          // 2. 读取实际文档内容(在 ADK 上下文之外)
          let documentContent = '';
          if (filePath.startsWith('gs://')) {
            // 示例:使用 GCS 客户端库进行下载/读取
            // const storage = new Storage();
            // const bucket = storage.bucket('my-bucket');
            // const file = bucket.file(filePath.replace('gs://my-bucket/', ''));
            // const [contents] = await file.download();
            // documentContent = contents.toString();
          } else if (filePath.startsWith('/')) {
            // 示例:使用本地文件系统
            // import { readFile } from 'fs/promises';
            // documentContent = await readFile(filePath, 'utf8');
          } else {
            return { error: `不支持的文件路径方案:${filePath}` };
          }
      
          // 3. 总结内容
          if (!documentContent) {
             return { error: '读取文档内容失败。' };
          }
      
          // const summary = summarizeText(documentContent); // 调用你的总结逻辑
          const summary = `来自 ${filePath} 的内容摘要`; // 占位符
      
          return { summary };
      
        } catch (e) {
           return { error: `处理制品时出错:${e}` };
        }
      }
      
      import "google.golang.org/adk/tool"
      
      // Pseudocode: In the Summarizer tool function
      type summarizeDocumentArgs struct{}
      
      type summarizeDocumentResult struct {
          Summary string `json:"summary"`
      }
      
      func summarizeDocumentTool(tc tool.Context, input summarizeDocumentArgs) (summarizeDocumentResult, error) {
          artifactName, err := tc.State().Get("temp:doc_artifact_name")
          if err != nil {
              return summarizeDocumentResult{}, fmt.Errorf("No document artifact name found in state")
          }
      
          // 1. Load the artifact part containing the path/URI
          artifactPart, err := tc.Artifacts().Load(tc, artifactName.(string))
          if err != nil {
              return summarizeDocumentResult{}, err
          }
      
          if artifactPart.Part.Text == "" {
              return summarizeDocumentResult{}, fmt.Errorf("Could not load artifact or artifact has no text path.")
          }
          filePath := artifactPart.Part.Text
          fmt.Printf("Loaded document reference: %s\n", filePath)
      
          // 2. Read the actual document content (outside ADK context)
          // In a real implementation, you would use a GCS client or local file reader.
          documentContent := "This is the fake content of the document at " + filePath
          _ = documentContent // Avoid unused variable error.
      
          // 3. Summarize the content
          summary := "Summary of content from " + filePath // Placeholder
      
          return summarizeDocumentResult{Summary: summary}, nil
      }
      
      // 伪代码:在 Summarizer 工具函数中
      import com.google.adk.tools.ToolContext;
      import com.google.genai.types.Content;
      import com.google.genai.types.Part;
      
      public Map<String, String> summarizeDocumentTool(ToolContext toolContext){
          String artifactName = toolContext.state().get("temp:doc_artifact_name");
          if(artifactName.isEmpty()){
              return Map.of("error", "Document artifact name not found in state.");
          }
          try{
              // 1. Load the artifact part containing the path/URI
              Maybe<Part> artifactPart = toolContext.loadArtifact(artifactName);
              if((artifactPart == null) || (artifactPart.text().isEmpty())){
                  return Map.of("error", "Could not load artifact or artifact has no text path: " + artifactName);
              }
              filePath = artifactPart.text();
              System.out.println("Loaded document reference: " + filePath);
      
              // 2. Read the actual document content (outside ADK context)
              String documentContent = "";
              if(filePath.startsWith("gs://")){
                  // Example: Use GCS client library to download/read into documentContent
                  pass; // Replace with actual GCS reading logic
              } else if(){
                  // Example: Use local file system to download/read into documentContent
              } else{
                  return Map.of("error", "Unsupported file path scheme: " + filePath);
              }
      
              // 3. Summarize the content
              if(documentContent.isEmpty()){
                  return Map.of("error", "Failed to read document content.");
              }
      
              // summary = summarizeText(documentContent) // Call your summarization logic
              summary = "Summary of content from " + filePath; // Placeholder
      
              return Map.of("summary", summary);
          } catch(IllegalArgumentException e){
              return Map.of("error", "Artifact service error " + filePath + e);
          } catch(FileNotFoundException e){
              return Map.of("error", "Local file not found " + filePath + e);
          } catch(Exception e){
              return Map.of("error", "Error reading document " + filePath + e);
          }
      }
      
  • 列出制品(Artifacts): 发现哪些文件可用。

    # 伪代码:在 tool 函数中
    from google.adk.tools import ToolContext
    
    def check_available_docs(tool_context: ToolContext) -> dict:
        try:
            artifact_keys = tool_context.list_artifacts()
            print(f"Available artifacts: {artifact_keys}")
            return {"available_docs": artifact_keys}
        except ValueError as e:
            return {"error": f"Artifact service error: {e}"}
    
    // 伪代码:在工具函数中
    import { Context } from '@google/adk';
    
    async function checkAvailableDocs(context: Context): Promise<Record<string, string[] | string>> {
      try {
        const artifactKeys = await context.listArtifacts();
        console.log(`可用制品:${artifactKeys}`);
        return { available_docs: artifactKeys };
      } catch (e) {
        return { error: `制品服务错误:${e}` };
      }
    }
    
    import "google.golang.org/adk/tool"
    
    // Pseudocode: In a tool function
    type checkAvailableDocsArgs struct{}
    
    type checkAvailableDocsResult struct {
        AvailableDocs []string `json:"available_docs"`
    }
    
    func checkAvailableDocs(tc tool.Context, args checkAvailableDocsArgs) (checkAvailableDocsResult, error) {
        artifactKeys, err := tc.Artifacts().List(tc)
        if err != nil {
            return checkAvailableDocsResult{}, err
        }
        fmt.Printf("Available artifacts: %v\n", artifactKeys)
        return checkAvailableDocsResult{AvailableDocs: artifactKeys.FileNames}, nil
    }
    
    // 伪代码:在 tool 函数中
    import com.google.adk.tools.ToolContext;
    
    public Map<String, String> checkAvailableDocs(ToolContext toolContext){
        try{
            Single<List<String>> artifactKeys = toolContext.listArtifacts();
            System.out.println("Available artifacts" + artifactKeys.tostring());
            return Map.of("availableDocs", "artifactKeys");
        } catch(IllegalArgumentException e){
            return Map.of("error", "Artifact service error: " + e);
        }
    }
    

处理工具认证

Supported in ADKPython v0.1.0TypeScript v0.2.0

安全管理工具所需的 API 密钥或其他凭证。

# 伪代码:需要认证的工具
from google.adk.tools import ToolContext
from google.adk.auth import AuthConfig # 假设已定义适当的 AuthConfig

# 定义所需的认证配置(例如 OAuth、API 密钥)
MY_API_AUTH_CONFIG = AuthConfig(...)
AUTH_STATE_KEY = "user:my_api_credential" # 存储检索到的凭证的键

def call_secure_api(tool_context: ToolContext, request_data: str) -> dict:
    # 1. 检查状态中是否已存在凭证
    credential = tool_context.state.get(AUTH_STATE_KEY)

    if not credential:
        # 2. 如果不存在,则请求凭证
        print("未找到凭证,正在请求...")
        try:
            tool_context.request_credential(MY_API_AUTH_CONFIG)
            # 框架处理生成事件。工具执行在本轮停止。
            return {"status": "Authentication required. Please provide credentials."}
        except ValueError as e:
            return {"error": f"认证错误:{e}"} # 例如:function_call_id 缺失
        except Exception as e:
            return {"error": f"请求凭证失败:{e}"}

    # 3. 如果凭证存在(可能来自请求后的上一个回合)
    #    或者如果在外部完成认证流程后进行后续调用
    try:
        # 可选:如果需要,重新验证/检索,或直接使用
        # 如果外部流程刚刚完成,这可能会检索凭证
        auth_credential_obj = tool_context.get_auth_response(MY_API_AUTH_CONFIG)
        api_key = auth_credential_obj.api_key # 或 access_token 等。

        # 将其存回状态,供会话内的将来调用使用
        tool_context.state[AUTH_STATE_KEY] = auth_credential_obj.model_dump() # 持久化检索到的凭证

        print(f"正在使用检索到的凭证通过数据调用 API:{request_data}")
        # ... 使用 api_key 进行实际的 API 调用 ...
        api_result = f"API result for {request_data}"

        return {"result": api_result}
    except Exception as e:
        # 处理检索/使用凭证时的错误
        print(f"使用凭证时出错:{e}")
        # 如果凭证无效,是否清除状态键?
        # tool_context.state[AUTH_STATE_KEY] = None
        return {"error": "使用凭证失败"}
// 伪代码:需要认证的工具
import { Context } from '@google/adk'; // AuthConfig 来自 ADK 或自定义

// 定义一个本地 AuthConfig 接口,因为 ADK 未公开导出它
interface AuthConfig {
  credentialKey: string;
  authScheme: { type: string }; // 示例的最小化表示
  // 如果与示例相关,请添加其他属性
}

// 定义所需的认证配置(例如 OAuth、API 密钥)
const MY_API_AUTH_CONFIG: AuthConfig = {
  credentialKey: 'my-api-key', // 示例密钥
  authScheme: { type: 'api-key' }, // 示例方案类型
};
const AUTH_STATE_KEY = 'user:my_api_credential'; // 存储检索到的凭证的键

async function callSecureApi(context: Context, requestData: string): Promise<Record<string, string>> {
  // 1. 检查状态中是否已存在凭证
  const credential = context.state.get(AUTH_STATE_KEY);

  if (!credential) {
    // 2. 如果不存在,则请求凭证
    console.log('未找到凭证,正在请求...');
    try {
      context.requestCredential(MY_API_AUTH_CONFIG);
      // 框架处理生成事件。工具执行在本轮停止。
      return { status: 'Authentication required. Please provide credentials.' };
    } catch (e) {
      return { error: `认证或凭证请求错误:${e}` };
    }
  }

  // 3. 如果凭证存在(可能来自请求后的上一个回合)
  //    或者如果在外部完成认证流程后进行后续调用
  try {
    // 可选:如果需要,重新验证/检索,或直接使用
    // 如果外部流程刚刚完成,这可能会检索凭证
    const authCredentialObj = context.getAuthResponse(MY_API_AUTH_CONFIG);
    const apiKey = authCredentialObj?.apiKey; // 或 accessToken 等。

    // 将其存回状态,供会话内的将来调用使用
    // 注意:在严格的 TS 中,可能需要对 authCredentialObj 进行转换或序列化
    context.state.set(AUTH_STATE_KEY, JSON.stringify(authCredentialObj));

    console.log(`正在使用检索到的凭证通过数据调用 API:${requestData}`);
    // ... 使用 apiKey 进行实际的 API 调用 ...
    const apiResult = `${requestData} 的 API 结果`;

    return { result: apiResult };
  } catch (e) {
    // 处理检索/使用凭证时的错误
    console.error(`使用凭证时出错:${e}`);
    // 如果凭证无效,是否清除状态键?
    // toolContext.state.set(AUTH_STATE_KEY, null);
    return { error: '使用凭证失败' };
  }
}

记住:request_credential 会暂停工具并发出需要认证的信号。用户/系统提供凭证,在后续调用中,get_auth_response(或再次检查状态)允许工具继续执行。 框架隐式使用 tool_context.function_call_id 来链接请求和响应。

利用记忆(Memory)

Supported in ADKPython v0.1.0TypeScript v0.2.0

python_only

访问来自过去或外部来源的相关信息。

# 伪代码:使用记忆搜索的工具
from google.adk.tools import ToolContext

def find_related_info(tool_context: ToolContext, topic: str) -> dict:
    try:
        search_results = tool_context.search_memory(f"关于 {topic} 的信息")
        if search_results.results:
            print(f"为 '{topic}' 找到了 {len(search_results.results)} 条记忆结果")
            # 处理 search_results.results (类型为 SearchMemoryResponseEntry)
            top_result_text = search_results.results[0].text
            return {"memory_snippet": top_result_text}
        else:
            return {"message": "未找到相关记忆。"}
    except ValueError as e:
        return {"error": f"记忆服务错误:{e}"} # 例如:服务未配置
    except Exception as e:
        return {"error": f"搜索记忆时发生意外错误:{e}"}
// 伪代码:使用记忆搜索的工具
import { Context } from '@google/adk';

async function findRelatedInfo(context: Context, topic: string): Promise<Record<string, string>> {
  try {
    const searchResults = await context.searchMemory(`关于 ${topic} 的信息`);
    if (searchResults.results?.length) {
      console.log(`为 '${topic}' 找到了 ${searchResults.results.length} 条记忆结果`);
      // 处理 searchResults.results
      const topResultText = searchResults.results[0].text;
      return { memory_snippet: topResultText };
    } else {
      return { message: '未找到相关记忆。' };
    }
  } catch (e) {
     return { error: `记忆服务错误:${e}` }; // 例如:服务未配置
  }
}

高级用法:直接使用 InvocationContext

Supported in ADKPython v0.1.0TypeScript v0.2.0

虽然大多数交互通过CallbackContextToolContext进行,但有时智能体的核心逻辑(_run_async_impl/_run_live_impl)需要直接访问。

# Pseudocode: Inside agent's _run_async_impl
from google.adk.agents import BaseAgent
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from typing import AsyncGenerator

class MyControllingAgent(BaseAgent):
    async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
        # Example: Check if a specific service is available
        if not ctx.memory_service:
            print("Memory service is not available for this invocation.")
            # Potentially change agent behavior

        # Example: Early termination based on some condition
        if ctx.session.state.get("critical_error_flag"):
            print("Critical error detected, ending invocation.")
            ctx.end_invocation = True # Signal framework to stop processing
            yield Event(author=self.name, invocation_id=ctx.invocation_id, content="Stopping due to critical error.")
            return # Stop this agent's execution

        # ... Normal agent processing ...
        yield # ... event ...
// 伪代码:在智能体的 runAsyncImpl 内部
import { BaseAgent, InvocationContext } from '@google/adk';
import type { Event } from '@google/adk';

class MyControllingAgent extends BaseAgent {
  async *runAsyncImpl(ctx: InvocationContext): AsyncGenerator<Event, void, undefined> {
    // 示例:检查特定服务是否可用
    if (!ctx.memoryService) {
      console.log('本次调用记忆服务不可用。');
      // 潜在地改变智能体行为
    }

    // 示例:基于某些条件提前终止
    // 通过 ctx.session.state 直接访问状态,或者如果已包装则通过 ctx.session.state 属性访问
    if ((ctx.session.state as { 'critical_error_flag': boolean })['critical_error_flag']) {
      console.log('检测到严重错误,正在结束调用。');
      ctx.endInvocation = true; // 发送信号给框架以停止处理
      yield {
        author: this.name,
        invocationId: ctx.invocationId,
        content: { parts: [{ text: '由于严重错误而停止。' }] }
      } as Event;
      return; // 停止此智能体的执行
    }

    // ... 正常的智能体处理 ...
    yield; // ... 事件 ...
  }
}

设置ctx.end_invocation = True是从智能体内部或其回调/工具(通过它们各自的上下文对象,这些对象也可以访问并修改底层InvocationContext的标志)平滑地停止整个请求 - 响应周期的方法。

关键要点和最佳实践

  • 使用合适的上下文: 始终使用提供的最具体的上下文对象(工具/工具回调中的ToolContext,智能体/模型回调中的CallbackContext,适用情况下的ReadonlyContext)。仅在必要时直接在_run_async_impl / _run_live_impl中使用完整的InvocationContextctx)。
  • 用于数据流的状态: context.state是在调用内部共享数据、记住偏好和管理对话记忆(Memory)的主要方式。使用持久存储时,要深思熟虑地使用前缀(app:user:temp:)。
  • 用于文件的制品(Artifacts): 使用context.save_artifactcontext.load_artifact来管理文件引用(如路径或 URI)或更大的数据块。存储引用,按需加载内容。
  • 跟踪更改: 通过上下文方法对状态或制品(Artifacts)所做的修改会自动链接到当前步骤的EventActions并由SessionService处理。
  • 从简单开始: 首先专注于state和基本制品(Artifacts)用法。随着需求变得更加复杂,再探索认证、记忆(Memory)和高级InvocationContext字段(如用于实时流式处理的字段)。

通过理解并有效使用这些上下文对象,你可以使用 ADK 构建更复杂、状态化且功能强大的智能体。