Skip to content

使用工具进行身份验证

核心概念

许多工具需要访问受保护的资源(如 Google Calendar 中的用户数据、Salesforce 记录等),因此需要身份验证。ADK 提供了一个系统来安全地处理各种身份验证方法。

涉及的关键组件有:

  1. AuthScheme:定义 API 期望身份验证凭据的方式(例如,在标头中作为 API 密钥、OAuth 2.0 Bearer 令牌)。ADK 支持与 OpenAPI 3.0 相同类型的身份验证方案。要了解每种凭据类型的更多信息,请参阅 OpenAPI doc: Authentication。ADK 使用特定的类,如 APIKeyHTTPBearerOAuth2OpenIdConnectWithConfig
  2. AuthCredential:持有开始身份验证过程所需的初始信息(例如,你的应用程序的 OAuth 客户端 ID/密钥、API 密钥值)。它包括一个 auth_type(如 API_KEYOAUTH2SERVICE_ACCOUNT)指定凭据类型。

一般流程涉及在配置工具时提供这些详细信息。在工具进行 API 调用之前,ADK 会尝试自动将初始凭据交换为可用凭据(如访问令牌)。对于需要用户交互的流程(如 OAuth 同意),会触发涉及智能体客户端应用程序的特定交互过程。

支持的初始凭据类型

  • API_KEY: 用于简单的键/值身份验证。通常不需要交换。
  • HTTP: 可以表示基本身份验证(不推荐/不支持交换)或已获得的 Bearer 令牌。如果是 Bearer 令牌,则不需要交换。
  • OAUTH2: 用于标准 OAuth 2.0 流程。需要配置(客户端 ID、密钥、作用域)并且通常会触发用户同意的交互流程。
  • OPEN_ID_CONNECT: 基于 OpenID Connect 的身份验证。与 OAuth2 类似,通常需要配置和用户交互。
  • SERVICE_ACCOUNT: 用于 Google Cloud 服务账号凭据(JSON 密钥或应用程序默认凭据)。通常交换为 Bearer 令牌。

在工具上配置身份验证

你在定义工具时设置身份验证:

  • RestApiTool / OpenAPIToolset:在初始化期间传递 auth_schemeauth_credential

  • GoogleApiToolSet 工具:ADK 有内置的第一方工具,如 Google Calendar、BigQuery 等。使用工具集的特定方法。

  • APIHubToolset / ApplicationIntegrationToolset:如果 API Hub 中管理的 API 或 Application Integration 提供的 API 需要身份验证,则在初始化期间传递 auth_schemeauth_credential

警告

根据你的会话存储后端(SessionService)和整体应用程序安全状况,将敏感凭据(如访问令牌,尤其是刷新令牌)直接存储在会话状态中可能会带来安全风险。

  • InMemorySessionService 适用于测试和开发,但当进程结束时数据会丢失。由于它是暂时的,风险较小。
  • 数据库/持久存储: 强烈建议在将令牌数据存储到数据库之前使用强大的加密库(如 cryptography)对其进行加密,并安全管理加密密钥(例如,使用密钥管理服务)。
  • 安全密钥存储: 对于生产环境,将敏感凭据存储在专用的密钥管理器(如 Google Cloud Secret Manager 或 HashiCorp Vault)中是最推荐的方法。你的工具可能只在会话状态中存储短期访问令牌或安全引用(而不是刷新令牌本身),并在需要时从安全存储中获取必要的密钥。

旅程 1:使用经过身份验证的工具构建智能应用程序

本节重点关注在你的智能应用程序中使用需要身份验证的预先存在的工具(如来自 RestApiTool/ OpenAPIToolsetAPIHubToolsetGoogleApiToolSet 或自定义 FunctionTools)。你的主要责任是配置工具并处理交互式身份验证流程的客户端部分(如果工具需要)。

身份验证

1. 配置带有身份验证的工具

向你的智能体添加经过身份验证的工具时,你需要提供其所需的 AuthScheme 和你的应用程序的初始 AuthCredential

A. 使用基于 OpenAPI 的工具集(OpenAPIToolsetAPIHubToolset 等)

在工具集初始化期间传递方案和凭据。工具集将它们应用于所有生成的工具。以下是在 ADK 中创建带有身份验证的工具的几种方法。

创建一个需要 API 密钥的工具。

from google.adk.tools.openapi_tool.auth.auth_helpers import token_to_scheme_credential
from google.adk.tools.apihub_tool.apihub_toolset import APIHubToolset
auth_scheme, auth_credential = token_to_scheme_credential(
   "apikey", "query", "apikey", YOUR_API_KEY_STRING
)
sample_api_toolset = APIHubToolset(
   name="sample-api-requiring-api-key",
   description="A tool using an API protected by API Key",
   apihub_resource_name="...",
   auth_scheme=auth_scheme,
   auth_credential=auth_credential,
)

创建一个需要 OAuth2 的工具。

from google.adk.tools.openapi_tool.openapi_spec_parser.openapi_toolset import OpenAPIToolset
from fastapi.openapi.models import OAuth2
from fastapi.openapi.models import OAuthFlowAuthorizationCode
from fastapi.openapi.models import OAuthFlows
from google.adk.auth import AuthCredential
from google.adk.auth import AuthCredentialTypes
from google.adk.auth import OAuth2Auth

auth_scheme = OAuth2(
   flows=OAuthFlows(
      authorizationCode=OAuthFlowAuthorizationCode(
            authorizationUrl="https://accounts.google.com/o/oauth2/auth",
            tokenUrl="https://oauth2.googleapis.com/token",
            scopes={
               "https://www.googleapis.com/auth/calendar": "calendar scope"
            },
      )
   )
)
auth_credential = AuthCredential(
   auth_type=AuthCredentialTypes.OAUTH2,
   oauth2=OAuth2Auth(
      client_id=YOUR_OAUTH_CLIENT_ID, 
      client_secret=YOUR_OAUTH_CLIENT_SECRET
   ),
)

calendar_api_toolset = OpenAPIToolset(
   spec_str=google_calendar_openapi_spec_str, # 用 openapi 规范填充此项
   spec_str_type='yaml',
   auth_scheme=auth_scheme,
   auth_credential=auth_credential,
)

创建一个需要服务账号的工具。

from google.adk.tools.openapi_tool.auth.auth_helpers import service_account_dict_to_scheme_credential
from google.adk.tools.openapi_tool.openapi_spec_parser.openapi_toolset import OpenAPIToolset

service_account_cred = json.loads(service_account_json_str)auth_scheme, auth_credential = service_account_dict_to_scheme_credential(
   config=service_account_cred,
   scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
sample_toolset = OpenAPIToolset(
   spec_str=sa_openapi_spec_str, # 用 openapi 规范填充此项
   spec_str_type='json',
   auth_scheme=auth_scheme,
   auth_credential=auth_credential,
)

创建一个需要 OpenID connect 的工具。

from google.adk.auth.auth_schemes import OpenIdConnectWithConfig
from google.adk.auth.auth_credential import AuthCredential, AuthCredentialTypes, OAuth2Auth
from google.adk.tools.openapi_tool.openapi_spec_parser.openapi_toolset import OpenAPIToolset

auth_scheme = OpenIdConnectWithConfig(
   authorization_endpoint=OAUTH2_AUTH_ENDPOINT_URL,
   token_endpoint=OAUTH2_TOKEN_ENDPOINT_URL,
   scopes=['openid', 'YOUR_OAUTH_SCOPES"]
)
auth_credential = AuthCredential(
auth_type=AuthCredentialTypes.OPEN_ID_CONNECT,
oauth2=OAuth2Auth(
   client_id="...",
   client_secret="...",
)
)

userinfo_toolset = OpenAPIToolset(
   spec_str=content, # 填充实际规范
   spec_str_type='yaml',
   auth_scheme=auth_scheme,
   auth_credential=auth_credential,
)

B. 使用 Google API 工具集(例如,calendar_tool_set

这些工具集通常具有专用的配置方法。

提示:有关如何创建 Google OAuth 客户端 ID 和密钥的指南,请参阅此指南:获取你的 Google API 客户端 ID

# 示例:配置 Google Calendar 工具
from google.adk.tools.google_api_tool import calendar_tool_set

client_id = "YOUR_GOOGLE_OAUTH_CLIENT_ID.apps.googleusercontent.com"
client_secret = "YOUR_GOOGLE_OAUTH_CLIENT_SECRET"

calendar_tools = calendar_tool_set.get_tools()
for tool in calendar_tools:
    # 使用此工具类型的特定配置方法
    tool.configure_auth(client_id=client_id, client_secret=client_secret)

# agent = LlmAgent(..., tools=calendar_tools)

2. 处理交互式 OAuth/OIDC 流程(客户端)

如果工具需要用户登录/同意(通常是 OAuth 2.0 或 OIDC),ADK 框架会暂停执行并向你的智能体客户端应用程序(调用 runner.run_async 的代码,如你的 UI 后端、CLI 应用程序或 Spark 作业)发出信号,以处理用户交互。

以下是你的客户端应用程序的逐步过程:

步骤 1:运行智能体并检测身份验证请求

  • 使用 runner.run_async 启动智能体交互。
  • 遍历生成的事件。
  • 查找智能体调用特殊函数 adk_request_credential 的特定事件。此事件表明需要用户交互。使用辅助函数来识别此事件并提取必要信息。
# runner = Runner(...)
# session = session_service.create_session(...)
# content = types.Content(...) # 用户的初始查询

print("\n运行智能体...")
events_async = runner.run_async(
    session_id=session.id, user_id='user', new_message=content
)

auth_request_event_id, auth_config = None, None

async for event in events_async:
    # 使用辅助函数检查特定的身份验证请求事件
    if is_pending_auth_event(event):
        print("--> 智能体需要身份验证。")
        # 存储稍后需要响应的 ID
        auth_request_event_id = get_function_call_id(event)
        # 获取包含 auth_uri 等的 AuthConfig
        auth_config = get_function_call_auth_config(event)
        break # 暂停处理事件,需要用户交互

if not auth_request_event_id:
    print("\n不需要身份验证或智能体已完成。")
    # return # 或者处理收到的最终响应

辅助函数 helpers.py

from google.adk.events import Event
from google.adk.auth import AuthConfig # 导入必要的类型

def is_pending_auth_event(event: Event) -> bool:
  # 检查事件是否是特殊的身份验证请求函数调用
  return (
      event.content and event.content.parts and event.content.parts[0]
      and event.content.parts[0].function_call
      and event.content.parts[0].function_call.name == 'adk_request_credential'
      # 检查是否标记为长时间运行(可选但是好的做法)
      and event.long_running_tool_ids
      and event.content.parts[0].function_call.id in event.long_running_tool_ids
  )

def get_function_call_id(event: Event) -> str:
  # 提取函数调用的 ID(适用于任何调用,包括身份验证)
  if ( event and event.content and event.content.parts and event.content.parts[0]
      and event.content.parts[0].function_call and event.content.parts[0].function_call.id ):
    return event.content.parts[0].function_call.id
  raise ValueError(f'无法从事件 {event} 获取函数调用 id')

def get_function_call_auth_config(event: Event) -> AuthConfig:
    # 从身份验证请求事件的参数中提取 AuthConfig 对象
    auth_config_dict = None
    try:
        auth_config_dict = event.content.parts[0].function_call.args.get('auth_config')
        if auth_config_dict and isinstance(auth_config_dict, dict):
            # 重构 AuthConfig 对象
            return AuthConfig.model_validate(auth_config_dict)
        else:
            raise ValueError("事件参数中缺少 auth_config 或不是字典")
    except (AttributeError, IndexError, KeyError, TypeError, ValueError) as e:
        raise ValueError(f'无法从事件 {event} 获取身份验证配置') from e

步骤 2:重定向用户进行授权

  • 从上一步提取的 auth_config 中获取授权 URL (auth_uri)。
  • 至关重要的是,将你的应用程序的 redirect_uri 作为查询参数附加到此 auth_uri。这个 redirect_uri 必须预先注册在你的 OAuth 提供商处(例如,Google Cloud ConsoleOkta 管理面板)。
  • 引导用户到这个完整的 URL(例如,在他们的浏览器中打开它)。
# (在检测到需要身份验证后继续)

if auth_request_event_id and auth_config:
    # 从 AuthConfig 获取基本授权 URL
    base_auth_uri = auth_config.exchanged_auth_credential.oauth2.auth_uri

    if base_auth_uri:
        redirect_uri = 'http://localhost:8000/callback' # 必须匹配你的 OAuth 客户端配置
        # 附加 redirect_uri(在生产环境中使用 urlencode)
        auth_request_uri = base_auth_uri + f'&redirect_uri={redirect_uri}'

        print("\n--- 需要用户操作 ---")
        print(f'1. 请在浏览器中打开此 URL:\n   {auth_request_uri}\n')
        print(f'2. 登录并授予请求的权限。')
        print(f'3. 授权后,你将被重定向到:{redirect_uri}')
        print(f'   从浏览器的地址栏复制完整的 URL(它包含一个 `code=...`)。')
        # 下一步:从用户(或你的 Web 服务器处理程序)获取此回调 URL
    else:
         print("错误:在 auth_config 中未找到授权 URI。")
         # 处理错误

身份验证

步骤 3. 处理重定向回调(客户端):

  • 你的应用程序必须有一个机制(例如,在 redirect_uri 的 Web 服务器路由)来接收用户在提供商那里授权应用程序后的请求。
  • 提供商将用户重定向到你的 redirect_uri,并将 authorization_code(以及可能的 statescope)作为查询参数附加到 URL。
  • 从此传入请求中捕获完整的回调 URL
  • (此步骤发生在主智能体执行循环之外,在你的 Web 服务器或等效回调处理程序中。)

步骤 4. 将身份验证结果发送回 ADK(客户端):

  • 一旦你有了完整的回调 URL(包含授权码),检索在客户端步骤 1 中保存的 auth_request_event_idAuthConfig 对象。
  • 更新 将捕获的回调 URL 设置到 exchanged_auth_credential.oauth2.auth_response_uri 字段。还要确保 exchanged_auth_credential.oauth2.redirect_uri 包含你使用的重定向 URI。
  • 构造一个 创建一个包含 types.FunctionResponsetypes.Parttypes.Content 对象。
    • name 设置为 "adk_request_credential"。(注意:这是 ADK 继续身份验证的特殊名称。不要使用其他名称。)
    • id 设置为你保存的 auth_request_event_id
    • response 设置为序列化(例如,.model_dump())更新后的 AuthConfig 对象。
  • 再次调用 runner.run_async 用于同一会话,将此 FunctionResponse 内容作为 new_message 传递。
# (用户交互后继续)

    # 模拟获取回调 URL(例如,从用户粘贴或 Web 处理程序)
    auth_response_uri = await get_user_input(
        f'在此处粘贴完整的回调 URL:\n> '
    )
    auth_response_uri = auth_response_uri.strip() # 清理输入

    if not auth_response_uri:
        print("未提供回调 URL。中止。")
        return

    # 使用回调详细信息更新收到的 AuthConfig
    auth_config.exchanged_auth_credential.oauth2.auth_response_uri = auth_response_uri
    # 也包括使用的 redirect_uri,因为令牌交换可能需要它
    auth_config.exchanged_auth_credential.oauth2.redirect_uri = redirect_uri

    # 构造 FunctionResponse Content 对象
    auth_content = types.Content(
        role='user', # 发送 FunctionResponse 时角色可以是 'user'
        parts=[
            types.Part(
                function_response=types.FunctionResponse(
                    id=auth_request_event_id,       # 链接到原始请求
                    name='adk_request_credential', # 特殊框架函数名称
                    response=auth_config.model_dump() # 发送回*更新后的* AuthConfig
                )
            )
        ],
    )

    # --- 恢复执行 ---
    print("\n将身份验证详细信息提交回智能体...")
    events_async_after_auth = runner.run_async(
        session_id=session.id,
        user_id='user',
        new_message=auth_content, # 发送回 FunctionResponse
    )

    # --- 处理智能体最终输出 ---
    print("\n--- 身份验证后的智能体响应 ---")
    async for event in events_async_after_auth:
        # 正常处理事件,期望工具调用现在成功
        print(event) # 打印完整事件以供检查

步骤 5:ADK 处理令牌交换和工具重试并获取工具结果

  • ADK 接收 adk_request_credentialFunctionResponse
  • 它使用更新后的 AuthConfig(包括包含代码的回调 URL)中的信息与提供商的令牌端点执行 OAuth 令牌交换,获取访问令牌(可能还有刷新令牌)。
  • ADK 内部使这些令牌可用(通常通过 tool_context.get_auth_response() 或通过更新会话状态)。
  • ADK 自动重试原始工具调用(最初由于缺少身份验证而失败的那个)。
  • 这次,工具找到有效的令牌并成功执行经过身份验证的 API 调用。
  • 智能体从工具接收实际结果并生成其对用户的最终响应。

旅程 2:构建需要身份验证的自定义工具(FunctionTool

本节重点介绍在创建新的 ADK 工具时在自定义 Python 函数内部实现身份验证逻辑。我们将实现一个 FunctionTool 作为示例。

先决条件

你的函数签名必须包括 tool_context: ToolContext。ADK 自动注入此对象,提供对状态和身份验证机制的访问。

from google.adk.tools import FunctionTool, ToolContext
from typing import Dict

def my_authenticated_tool_function(param1: str, ..., tool_context: ToolContext) -> dict:
    # ... 你的逻辑 ...
    pass

my_tool = FunctionTool(func=my_authenticated_tool_function)

工具函数内的身份验证逻辑

在你的函数内实施以下步骤:

步骤 1:检查缓存和有效凭据:

在你的工具函数内,首先检查是否已经在此会话的之前运行中存储了有效的凭据(例如,访问/刷新令牌)。当前会话的凭据应存储在 tool_context.invocation_context.session.state(状态字典)中。通过检查 tool_context.invocation_context.session.state.get(credential_name, None) 来检查现有凭据的存在。

# 在你的工具函数内
TOKEN_CACHE_KEY = "my_tool_tokens" # 选择一个唯一的键
SCOPES = ["scope1", "scope2"] # 定义所需的作用域

creds = None
cached_token_info = tool_context.state.get(TOKEN_CACHE_KEY)
if cached_token_info:
    try:
        creds = Credentials.from_authorized_user_info(cached_token_info, SCOPES)
        if not creds.valid and creds.expired and creds.refresh_token:
            creds.refresh(Request())
            tool_context.state[TOKEN_CACHE_KEY] = json.loads(creds.to_json()) # 更新缓存
        elif not creds.valid:
            creds = None # 无效,需要重新认证
            tool_context.state.pop(TOKEN_CACHE_KEY, None)
    except Exception as e:
        print(f"加载/刷新缓存凭据时出错:{e}")
        creds = None
        tool_context.state.pop(TOKEN_CACHE_KEY, None)

if creds and creds.valid:
    # 跳到步骤 5:进行经过身份验证的 API 调用
    pass
else:
    # 继续步骤 2...
    pass

步骤 2:检查来自客户端的身份验证响应

  • 如果步骤 1 没有产生有效的凭据,通过调用 auth_response_config = tool_context.get_auth_response() 检查客户端是否刚刚完成了交互式流程。
  • 这将返回由客户端发送回的更新后的 AuthConfig 对象(在 auth_response_uri 中包含回调 URL)。
# 使用在工具中配置的 auth_scheme 和 auth_credential。
# exchanged_credential: AuthCredential|None

exchanged_credential = tool_context.get_auth_response(AuthConfig(
  auth_scheme=auth_scheme,
  raw_auth_credential=auth_credential,
))
# 如果 exchanged_credential 不是 None,那么已经从身份验证响应中有了交换后的凭据。使用它,并跳到步骤 5

步骤 3:发起身份验证请求

如果找不到有效的凭据(步骤 1.)和身份验证响应(步骤 2.),则工具需要启动 OAuth 流程。定义 AuthScheme 和初始 AuthCredential 并调用 tool_context.request_credential()。返回一个状态,表明需要授权。

# 使用在工具中配置的 auth_scheme 和 auth_credential。

  tool_context.request_credential(AuthConfig(
    auth_scheme=auth_scheme,
    raw_auth_credential=auth_credential,
  ))
  return {'pending': true, 'message': '等待用户身份验证。'}

# 通过设置 request_credential,ADK 检测到一个待处理的身份验证事件。它会暂停执行并要求最终用户登录。

步骤 4:将授权码交换为令牌

ADK 自动生成 oauth 授权 URL 并将其呈现给你的智能体客户端应用程序。一旦用户按照授权 URL 完成登录流程,ADK 就会从智能体客户端应用程序中提取身份验证回调 url,自动解析身份验证代码,并生成身份验证令牌。在下一次工具调用时,步骤 2 中的 tool_context.get_auth_response 将包含一个在后续 API 调用中使用的有效凭据。

步骤 5:缓存获得的凭据

在成功从 ADK(步骤 2)获取令牌或者如果令牌仍然有效(步骤 1)之后,立即存储新的 Credentials 对象在 tool_context.state(序列化,例如,为 JSON)中使用你的缓存键。

# 在你的工具函数内,在获取 'creds'(无论是刷新的还是新交换的)之后
# 缓存新的/刷新的令牌
tool_context.state[TOKEN_CACHE_KEY] = json.loads(creds.to_json())
print(f"调试:在键下缓存/更新令牌:{TOKEN_CACHE_KEY}")
# 继续步骤 6(进行 API 调用)

步骤 6:进行经过身份验证的 API 调用

  • 一旦你有了有效的 Credentials 对象(来自步骤 1 或步骤 4 的 creds),使用它通过适当的客户端库(例如,googleapiclientrequests)对受保护的 API 进行实际调用。传递 credentials=creds 参数。
  • 包括错误处理,特别是 HttpError 401/403,这可能意味着令牌在调用之间过期或被撤销。如果你得到这样的错误,考虑清除缓存的令牌(tool_context.state.pop(...))并可能再次返回 auth_required 状态以强制重新认证。
# 在你的工具函数内,使用有效的 'creds' 对象
# 确保 creds 在继续之前是有效的
if not creds or not creds.valid:
   return {"status": "error", "error_message": "没有有效的凭据无法继续。"}

try:
   service = build("calendar", "v3", credentials=creds) # 示例
   api_result = service.events().list(...).execute()
   # 继续步骤 7
except Exception as e:
   # 处理 API 错误(例如,检查 401/403,可能清除缓存并重新请求认证)
   print(f"错误:API 调用失败:{e}")
   return {"status": "error", "error_message": f"API 调用失败:{e}"}

步骤 7:返回工具结果

  • 在成功的 API 调用之后,将结果处理成对 LLM 有用的字典格式。
  • 至关重要的是,包括一个状态以及数据。
# 在你的工具函数内,成功 API 调用后
    processed_result = [...] # 为 LLM 处理 api_result
    return {"status": "success", "data": processed_result}
完整代码
tools_and_agent.py
import asyncio
from dotenv import load_dotenv
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

from .helpers import is_pending_auth_event, get_function_call_id, get_function_call_auth_config, get_user_input
from .tools_and_agent import root_agent

load_dotenv()

agent = root_agent

async def async_main():
  """
  Main asynchronous function orchestrating the agent interaction and authentication flow.
  """
  # --- Step 1: Service Initialization ---
  # Use in-memory services for session and artifact storage (suitable for demos/testing).
  session_service = InMemorySessionService()
  artifacts_service = InMemoryArtifactService()

  # Create a new user session to maintain conversation state.
  session = session_service.create_session(
      state={},  # Optional state dictionary for session-specific data
      app_name='my_app', # Application identifier
      user_id='user' # User identifier
  )

  # --- Step 2: Initial User Query ---
  # Define the user's initial request.
  query = 'Show me my user info'
  print(f"user: {query}")

  # Format the query into the Content structure expected by the ADK Runner.
  content = types.Content(role='user', parts=[types.Part(text=query)])

  # Initialize the ADK Runner
  runner = Runner(
      app_name='my_app',
      agent=agent,
      artifact_service=artifacts_service,
      session_service=session_service,
  )

  # --- Step 3: Send Query and Handle Potential Auth Request ---
  print("\nRunning agent with initial query...")
  events_async = runner.run_async(
      session_id=session.id, user_id='user', new_message=content
  )

  # Variables to store details if an authentication request occurs.
  auth_request_event_id, auth_config = None, None

  # Iterate through the events generated by the first run.
  async for event in events_async:
    # Check if this event is the specific 'adk_request_credential' function call.
    if is_pending_auth_event(event):
      print("--> Authentication required by agent.")
      auth_request_event_id = get_function_call_id(event)
      auth_config = get_function_call_auth_config(event)
      # Once the auth request is found and processed, exit this loop.
      # We need to pause execution here to get user input for authentication.
      break


  # If no authentication request was detected after processing all events, exit.
  if not auth_request_event_id or not auth_config:
      print("\nAuthentication not required for this query or processing finished.")
      return # Exit the main function

  # --- Step 4: Manual Authentication Step (Simulated OAuth 2.0 Flow) ---
  # This section simulates the user interaction part of an OAuth 2.0 flow.
  # In a real web application, this would involve browser redirects.

  # Define the Redirect URI. This *must* match one of the URIs registered
  # with the OAuth provider for your application. The provider sends the user
  # back here after they approve the request.
  redirect_uri = 'http://localhost:8000/dev-ui' # Example for local development

  # Construct the Authorization URL that the user must visit.
  # This typically includes the provider's authorization endpoint URL,
  # client ID, requested scopes, response type (e.g., 'code'), and the redirect URI.
  # Here, we retrieve the base authorization URI from the AuthConfig provided by ADK
  # and append the redirect_uri.
  # NOTE: A robust implementation would use urlencode and potentially add state, scope, etc.
  auth_request_uri = (
      auth_config.exchanged_auth_credential.oauth2.auth_uri
      + f'&redirect_uri={redirect_uri}' # Simple concatenation; ensure correct query param format
  )

  print("\n--- User Action Required ---")
  # Prompt the user to visit the authorization URL, log in, grant permissions,
  # and then paste the *full* URL they are redirected back to (which contains the auth code).
  auth_response_uri = await get_user_input(
      f'1. Please open this URL in your browser to log in:\n   {auth_request_uri}\n\n'
      f'2. After successful login and authorization, your browser will be redirected.\n'
      f'   Copy the *entire* URL from the browser\'s address bar.\n\n'
      f'3. Paste the copied URL here and press Enter:\n\n> '
  )

  # --- Step 5: Prepare Authentication Response for the Agent ---
  # Update the AuthConfig object with the information gathered from the user.
  # The ADK framework needs the full response URI (containing the code)
  # and the original redirect URI to complete the OAuth token exchange process internally.
  auth_config.exchanged_auth_credential.oauth2.auth_response_uri = auth_response_uri
  auth_config.exchanged_auth_credential.oauth2.redirect_uri = redirect_uri

  # Construct a FunctionResponse Content object to send back to the agent/runner.
  # This response explicitly targets the 'adk_request_credential' function call
  # identified earlier by its ID.
  auth_content = types.Content(
      role='user',
      parts=[
          types.Part(
              function_response=types.FunctionResponse(
                  # Crucially, link this response to the original request using the saved ID.
                  id=auth_request_event_id,
                  # The special name of the function call we are responding to.
                  name='adk_request_credential',
                  # The payload containing all necessary authentication details.
                  response=auth_config.model_dump(),
              )
          )
      ],
  )

  # --- Step 6: Resume Execution with Authentication ---
  print("\nSubmitting authentication details back to the agent...")
  # Run the agent again, this time providing the `auth_content` (FunctionResponse).
  # The ADK Runner intercepts this, processes the 'adk_request_credential' response
  # (performs token exchange, stores credentials), and then allows the agent
  # to retry the original tool call that required authentication, now succeeding with
  # a valid access token embedded.
  events_async = runner.run_async(
      session_id=session.id,
      user_id='user',
      new_message=auth_content, # Provide the prepared auth response
  )

  # Process and print the final events from the agent after authentication is complete.
  # This stream now contain the actual result from the tool (e.g., the user info).
  print("\n--- Agent Response after Authentication ---")
  async for event in events_async:
    print(event)


if __name__ == '__main__':
  asyncio.run(async_main())
agent_cli.py
import asyncio
from dotenv import load_dotenv
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

from .helpers import is_pending_auth_event, get_function_call_id, get_function_call_auth_config, get_user_input
from .tools_and_agent import root_agent

load_dotenv()

agent = root_agent

async def async_main():
  """
  Main asynchronous function orchestrating the agent interaction and authentication flow.
  """
  # --- Step 1: Service Initialization ---
  # Use in-memory services for session and artifact storage (suitable for demos/testing).
  session_service = InMemorySessionService()
  artifacts_service = InMemoryArtifactService()

  # Create a new user session to maintain conversation state.
  session = session_service.create_session(
      state={},  # Optional state dictionary for session-specific data
      app_name='my_app', # Application identifier
      user_id='user' # User identifier
  )

  # --- Step 2: Initial User Query ---
  # Define the user's initial request.
  query = 'Show me my user info'
  print(f"user: {query}")

  # Format the query into the Content structure expected by the ADK Runner.
  content = types.Content(role='user', parts=[types.Part(text=query)])

  # Initialize the ADK Runner
  runner = Runner(
      app_name='my_app',
      agent=agent,
      artifact_service=artifacts_service,
      session_service=session_service,
  )

  # --- Step 3: Send Query and Handle Potential Auth Request ---
  print("\nRunning agent with initial query...")
  events_async = runner.run_async(
      session_id=session.id, user_id='user', new_message=content
  )

  # Variables to store details if an authentication request occurs.
  auth_request_event_id, auth_config = None, None

  # Iterate through the events generated by the first run.
  async for event in events_async:
    # Check if this event is the specific 'adk_request_credential' function call.
    if is_pending_auth_event(event):
      print("--> Authentication required by agent.")
      auth_request_event_id = get_function_call_id(event)
      auth_config = get_function_call_auth_config(event)
      # Once the auth request is found and processed, exit this loop.
      # We need to pause execution here to get user input for authentication.
      break


  # If no authentication request was detected after processing all events, exit.
  if not auth_request_event_id or not auth_config:
      print("\nAuthentication not required for this query or processing finished.")
      return # Exit the main function

  # --- Step 4: Manual Authentication Step (Simulated OAuth 2.0 Flow) ---
  # This section simulates the user interaction part of an OAuth 2.0 flow.
  # In a real web application, this would involve browser redirects.

  # Define the Redirect URI. This *must* match one of the URIs registered
  # with the OAuth provider for your application. The provider sends the user
  # back here after they approve the request.
  redirect_uri = 'http://localhost:8000/dev-ui' # Example for local development

  # Construct the Authorization URL that the user must visit.
  # This typically includes the provider's authorization endpoint URL,
  # client ID, requested scopes, response type (e.g., 'code'), and the redirect URI.
  # Here, we retrieve the base authorization URI from the AuthConfig provided by ADK
  # and append the redirect_uri.
  # NOTE: A robust implementation would use urlencode and potentially add state, scope, etc.
  auth_request_uri = (
      auth_config.exchanged_auth_credential.oauth2.auth_uri
      + f'&redirect_uri={redirect_uri}' # Simple concatenation; ensure correct query param format
  )

  print("\n--- User Action Required ---")
  # Prompt the user to visit the authorization URL, log in, grant permissions,
  # and then paste the *full* URL they are redirected back to (which contains the auth code).
  auth_response_uri = await get_user_input(
      f'1. Please open this URL in your browser to log in:\n   {auth_request_uri}\n\n'
      f'2. After successful login and authorization, your browser will be redirected.\n'
      f'   Copy the *entire* URL from the browser\'s address bar.\n\n'
      f'3. Paste the copied URL here and press Enter:\n\n> '
  )

  # --- Step 5: Prepare Authentication Response for the Agent ---
  # Update the AuthConfig object with the information gathered from the user.
  # The ADK framework needs the full response URI (containing the code)
  # and the original redirect URI to complete the OAuth token exchange process internally.
  auth_config.exchanged_auth_credential.oauth2.auth_response_uri = auth_response_uri
  auth_config.exchanged_auth_credential.oauth2.redirect_uri = redirect_uri

  # Construct a FunctionResponse Content object to send back to the agent/runner.
  # This response explicitly targets the 'adk_request_credential' function call
  # identified earlier by its ID.
  auth_content = types.Content(
      role='user',
      parts=[
          types.Part(
              function_response=types.FunctionResponse(
                  # Crucially, link this response to the original request using the saved ID.
                  id=auth_request_event_id,
                  # The special name of the function call we are responding to.
                  name='adk_request_credential',
                  # The payload containing all necessary authentication details.
                  response=auth_config.model_dump(),
              )
          )
      ],
  )

  # --- Step 6: Resume Execution with Authentication ---
  print("\nSubmitting authentication details back to the agent...")
  # Run the agent again, this time providing the `auth_content` (FunctionResponse).
  # The ADK Runner intercepts this, processes the 'adk_request_credential' response
  # (performs token exchange, stores credentials), and then allows the agent
  # to retry the original tool call that required authentication, now succeeding with
  # a valid access token embedded.
  events_async = runner.run_async(
      session_id=session.id,
      user_id='user',
      new_message=auth_content, # Provide the prepared auth response
  )

  # Process and print the final events from the agent after authentication is complete.
  # This stream now contain the actual result from the tool (e.g., the user info).
  print("\n--- Agent Response after Authentication ---")
  async for event in events_async:
    print(event)


if __name__ == '__main__':
  asyncio.run(async_main())
helpers.py
from google.adk.auth import AuthConfig
from google.adk.events import Event
import asyncio

# --- Helper Functions ---
async def get_user_input(prompt: str) -> str:
  """
  Asynchronously prompts the user for input in the console.

  Uses asyncio's event loop and run_in_executor to avoid blocking the main
  asynchronous execution thread while waiting for synchronous `input()`.

  Args:
    prompt: The message to display to the user.

  Returns:
    The string entered by the user.
  """
  loop = asyncio.get_event_loop()
  # Run the blocking `input()` function in a separate thread managed by the executor.
  return await loop.run_in_executor(None, input, prompt)


def is_pending_auth_event(event: Event) -> bool:
  """
  Checks if an ADK Event represents a request for user authentication credentials.

  The ADK framework emits a specific function call ('adk_request_credential')
  when a tool requires authentication that hasn't been previously satisfied.

  Args:
    event: The ADK Event object to inspect.

  Returns:
    True if the event is an 'adk_request_credential' function call, False otherwise.
  """
  # Safely checks nested attributes to avoid errors if event structure is incomplete.
  return (
      event.content
      and event.content.parts
      and event.content.parts[0] # Assuming the function call is in the first part
      and event.content.parts[0].function_call
      # The specific function name indicating an auth request from the ADK framework.
      and event.content.parts[0].function_call.name == 'adk_request_credential'
  )


def get_function_call_id(event: Event) -> str:
  """
  Extracts the unique ID of the function call from an ADK Event.

  This ID is crucial for correlating a function *response* back to the specific
  function *call* that the agent initiated to request for auth credentials.

  Args:
    event: The ADK Event object containing the function call.

  Returns:
    The unique identifier string of the function call.

  Raises:
    ValueError: If the function call ID cannot be found in the event structure.
                (Corrected typo from `contents` to `content` below)
  """
  # Navigate through the event structure to find the function call ID.
  if (
      event
      and event.content
      and event.content.parts
      and event.content.parts[0] # Use content, not contents
      and event.content.parts[0].function_call
      and event.content.parts[0].function_call.id
  ):
    return event.content.parts[0].function_call.id
  # If the ID is missing, raise an error indicating an unexpected event format.
  raise ValueError(f'Cannot get function call id from event {event}')


def get_function_call_auth_config(event: Event) -> AuthConfig:
  """
  Extracts the authentication configuration details from an 'adk_request_credential' event.

  Client should use this AuthConfig to necessary authentication details (like OAuth codes and state)
  and sent it back to the ADK to continue OAuth token exchanging.

  Args:
    event: The ADK Event object containing the 'adk_request_credential' call.

  Returns:
    An AuthConfig object populated with details from the function call arguments.

  Raises:
    ValueError: If the 'auth_config' argument cannot be found in the event.
                (Corrected typo from `contents` to `content` below)
  """
  if (
      event
      and event.content
      and event.content.parts
      and event.content.parts[0] # Use content, not contents
      and event.content.parts[0].function_call
      and event.content.parts[0].function_call.args
      and event.content.parts[0].function_call.args.get('auth_config')
  ):
    # Reconstruct the AuthConfig object using the dictionary provided in the arguments.
    # The ** operator unpacks the dictionary into keyword arguments for the constructor.
    return AuthConfig(
          **event.content.parts[0].function_call.args.get('auth_config')
      )
  raise ValueError(f'Cannot get auth config from event {event}')
openapi: 3.0.1
info:
title: Okta User Info API
version: 1.0.0
description: |-
   API to retrieve user profile information based on a valid Okta OIDC Access Token.
   Authentication is handled via OpenID Connect with Okta.
contact:
   name: API Support
   email: support@example.com # Replace with actual contact if available
servers:
- url: <substitute with your server name>
   description: Production Environment
paths:
/okta-jwt-user-api:
   get:
      summary: Get Authenticated User Info
      description: |-
      Fetches profile details for the user
      operationId: getUserInfo
      tags:
      - User Profile
      security:
      - okta_oidc:
            - openid
            - email
            - profile
      responses:
      '200':
         description: Successfully retrieved user information.
         content:
            application/json:
            schema:
               type: object
               properties:
                  sub:
                  type: string
                  description: Subject identifier for the user.
                  example: "abcdefg"
                  name:
                  type: string
                  description: Full name of the user.
                  example: "Example LastName"
                  locale:
                  type: string
                  description: User's locale, e.g., en-US or en_US.
                  example: "en_US"
                  email:
                  type: string
                  format: email
                  description: User's primary email address.
                  example: "username@example.com"
                  preferred_username:
                  type: string
                  description: Preferred username of the user (often the email).
                  example: "username@example.com"
                  given_name:
                  type: string
                  description: Given name (first name) of the user.
                  example: "Example"
                  family_name:
                  type: string
                  description: Family name (last name) of the user.
                  example: "LastName"
                  zoneinfo:
                  type: string
                  description: User's timezone, e.g., America/Los_Angeles.
                  example: "America/Los_Angeles"
                  updated_at:
                  type: integer
                  format: int64 # Using int64 for Unix timestamp
                  description: Timestamp when the user's profile was last updated (Unix epoch time).
                  example: 1743617719
                  email_verified:
                  type: boolean
                  description: Indicates if the user's email address has been verified.
                  example: true
               required:
                  - sub
                  - name
                  - locale
                  - email
                  - preferred_username
                  - given_name
                  - family_name
                  - zoneinfo
                  - updated_at
                  - email_verified
      '401':
         description: Unauthorized. The provided Bearer token is missing, invalid, or expired.
         content:
            application/json:
            schema:
               $ref: '#/components/schemas/Error'
      '403':
         description: Forbidden. The provided token does not have the required scopes or permissions to access this resource.
         content:
            application/json:
            schema:
               $ref: '#/components/schemas/Error'
components:
securitySchemes:
   okta_oidc:
      type: openIdConnect
      description: Authentication via Okta using OpenID Connect. Requires a Bearer Access Token.
      openIdConnectUrl: https://your-endpoint.okta.com/.well-known/openid-configuration
schemas:
   Error:
      type: object
      properties:
      code:
         type: string
         description: An error code.
      message:
         type: string
         description: A human-readable error message.
      required:
         - code
         - message