使用工具进行身份验证¶
核心概念¶
许多工具需要访问受保护的资源(如 Google Calendar 中的用户数据、Salesforce 记录等),因此需要身份验证。ADK 提供了一个系统来安全地处理各种身份验证方法。
涉及的关键组件有:
AuthScheme
:定义 API 期望身份验证凭据的方式(例如,在标头中作为 API 密钥、OAuth 2.0 Bearer 令牌)。ADK 支持与 OpenAPI 3.0 相同类型的身份验证方案。要了解每种凭据类型的更多信息,请参阅 OpenAPI doc: Authentication。ADK 使用特定的类,如APIKey
、HTTPBearer
、OAuth2
、OpenIdConnectWithConfig
。AuthCredential
:持有开始身份验证过程所需的初始信息(例如,你的应用程序的 OAuth 客户端 ID/密钥、API 密钥值)。它包括一个auth_type
(如API_KEY
、OAUTH2
、SERVICE_ACCOUNT
)指定凭据类型。
一般流程涉及在配置工具时提供这些详细信息。在工具进行 API 调用之前,ADK 会尝试自动将初始凭据交换为可用凭据(如访问令牌)。对于需要用户交互的流程(如 OAuth 同意),会触发涉及智能体客户端应用程序的特定交互过程。
支持的初始凭据类型¶
- API_KEY: 用于简单的键/值身份验证。通常不需要交换。
- HTTP: 可以表示基本身份验证(不推荐/不支持交换)或已获得的 Bearer 令牌。如果是 Bearer 令牌,则不需要交换。
- OAUTH2: 用于标准 OAuth 2.0 流程。需要配置(客户端 ID、密钥、作用域)并且通常会触发用户同意的交互流程。
- OPEN_ID_CONNECT: 基于 OpenID Connect 的身份验证。与 OAuth2 类似,通常需要配置和用户交互。
- SERVICE_ACCOUNT: 用于 Google Cloud 服务账号凭据(JSON 密钥或应用程序默认凭据)。通常交换为 Bearer 令牌。
在工具上配置身份验证¶
你在定义工具时设置身份验证:
-
RestApiTool / OpenAPIToolset:在初始化期间传递
auth_scheme
和auth_credential
-
GoogleApiToolSet 工具:ADK 有内置的第一方工具,如 Google Calendar、BigQuery 等。使用工具集的特定方法。
-
APIHubToolset / ApplicationIntegrationToolset:如果 API Hub 中管理的 API 或 Application Integration 提供的 API 需要身份验证,则在初始化期间传递
auth_scheme
和auth_credential
。
警告
根据你的会话存储后端(SessionService
)和整体应用程序安全状况,将敏感凭据(如访问令牌,尤其是刷新令牌)直接存储在会话状态中可能会带来安全风险。
InMemorySessionService
: 适用于测试和开发,但当进程结束时数据会丢失。由于它是暂时的,风险较小。- 数据库/持久存储: 强烈建议在将令牌数据存储到数据库之前使用强大的加密库(如
cryptography
)对其进行加密,并安全管理加密密钥(例如,使用密钥管理服务)。 - 安全密钥存储: 对于生产环境,将敏感凭据存储在专用的密钥管理器(如 Google Cloud Secret Manager 或 HashiCorp Vault)中是最推荐的方法。你的工具可能只在会话状态中存储短期访问令牌或安全引用(而不是刷新令牌本身),并在需要时从安全存储中获取必要的密钥。
旅程 1:使用经过身份验证的工具构建智能应用程序¶
本节重点介绍在你的智能应用程序中使用需要身份验证的预先存在的工具(如来自 RestApiTool/ OpenAPIToolset
、APIHubToolset
、GoogleApiToolSet
的工具)。你的主要责任是配置这些工具并处理交互式身份验证流程的客户端部分(如果工具需要)。
1. 配置带有身份验证的工具¶
向你的智能体添加经过身份验证的工具时,你需要提供其所需的 AuthScheme
和你的应用程序的初始 AuthCredential
。
A. 使用基于 OpenAPI 的工具集(OpenAPIToolset
、APIHubToolset
等)
在工具集初始化期间传递方案和凭据。工具集将它们应用于所有生成的工具。以下是在 ADK 中创建带有身份验证的工具的几种方法。
创建一个需要 API 密钥的工具。
from google.adk.tools.openapi_tool.auth.auth_helpers import token_to_scheme_credential
from google.adk.tools.apihub_tool.apihub_toolset import APIHubToolset
auth_scheme, auth_credential = token_to_scheme_credential(
"apikey", "query", "apikey", YOUR_API_KEY_STRING
)
sample_api_toolset = APIHubToolset(
name="sample-api-requiring-api-key",
description="A tool using an API protected by API Key",
apihub_resource_name="...",
auth_scheme=auth_scheme,
auth_credential=auth_credential,
)
创建一个需要 OAuth2 的工具。
from google.adk.tools.openapi_tool.openapi_spec_parser.openapi_toolset import OpenAPIToolset
from fastapi.openapi.models import OAuth2
from fastapi.openapi.models import OAuthFlowAuthorizationCode
from fastapi.openapi.models import OAuthFlows
from google.adk.auth import AuthCredential
from google.adk.auth import AuthCredentialTypes
from google.adk.auth import OAuth2Auth
auth_scheme = OAuth2(
flows=OAuthFlows(
authorizationCode=OAuthFlowAuthorizationCode(
authorizationUrl="https://accounts.google.com/o/oauth2/auth",
tokenUrl="https://oauth2.googleapis.com/token",
scopes={
"https://www.googleapis.com/auth/calendar": "calendar scope"
},
)
)
)
auth_credential = AuthCredential(
auth_type=AuthCredentialTypes.OAUTH2,
oauth2=OAuth2Auth(
client_id=YOUR_OAUTH_CLIENT_ID,
client_secret=YOUR_OAUTH_CLIENT_SECRET
),
)
calendar_api_toolset = OpenAPIToolset(
spec_str=google_calendar_openapi_spec_str, # 此处填写 openapi 规范字符串
spec_str_type='yaml',
auth_scheme=auth_scheme,
auth_credential=auth_credential,
)
创建一个需要服务账号的工具。
from google.adk.tools.openapi_tool.auth.auth_helpers import service_account_dict_to_scheme_credential
from google.adk.tools.openapi_tool.openapi_spec_parser.openapi_toolset import OpenAPIToolset
service_account_cred = json.loads(service_account_json_str)
auth_scheme, auth_credential = service_account_dict_to_scheme_credential(
config=service_account_cred,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
sample_toolset = OpenAPIToolset(
spec_str=sa_openapi_spec_str, # 此处填写 openapi 规范字符串
spec_str_type='json',
auth_scheme=auth_scheme,
auth_credential=auth_credential,
)
创建一个需要 OpenID connect 的工具。
from google.adk.auth.auth_schemes import OpenIdConnectWithConfig
from google.adk.auth.auth_credential import AuthCredential, AuthCredentialTypes, OAuth2Auth
from google.adk.tools.openapi_tool.openapi_spec_parser.openapi_toolset import OpenAPIToolset
auth_scheme = OpenIdConnectWithConfig(
authorization_endpoint=OAUTH2_AUTH_ENDPOINT_URL,
token_endpoint=OAUTH2_TOKEN_ENDPOINT_URL,
scopes=['openid', 'YOUR_OAUTH_SCOPES"]
)
auth_credential = AuthCredential(
auth_type=AuthCredentialTypes.OPEN_ID_CONNECT,
oauth2=OAuth2Auth(
client_id="...",
client_secret="...",
)
)
userinfo_toolset = OpenAPIToolset(
spec_str=content, # 此处填写实际规范
spec_str_type='yaml',
auth_scheme=auth_scheme,
auth_credential=auth_credential,
)
B. 使用 Google API 工具集(例如,calendar_tool_set
)
这些工具集通常具有专用的配置方法。
提示:有关如何创建 Google OAuth 客户端 ID 和密钥的指南,请参阅此指南:获取你的 Google API 客户端 ID
# 示例:配置 Google Calendar 工具
from google.adk.tools.google_api_tool import calendar_tool_set
client_id = "YOUR_GOOGLE_OAUTH_CLIENT_ID.apps.googleusercontent.com"
client_secret = "YOUR_GOOGLE_OAUTH_CLIENT_SECRET"
# 使用该工具集类型的专用配置方法
calendar_tool_set.configure_auth(
client_id=oauth_client_id, client_secret=oauth_client_secret
)
# agent = LlmAgent(..., tools=calendar_tool_set.get_tool('calendar_tool_set'))
身份验证请求流程(工具请求身份验证凭据)的序列图如下所示:
2. 处理交互式 OAuth/OIDC 流程(客户端)¶
如果工具需要用户登录/同意(通常是 OAuth 2.0 或 OIDC),ADK 框架会暂停执行并向你的智能体客户端应用程序发出信号。存在两种情况:
- 智能体客户端应用程序在同一进程中直接运行智能体(通过
runner.run_async
)。例如 UI 后端、CLI 应用或 Spark 作业等。 - 智能体客户端应用程序通过
/run
或/run_sse
端点与 ADK 的 fastapi 服务器交互。ADK 的 fastapi 服务器可以设置在与智能体客户端应用程序相同或不同的服务器上。
第二种情况是第一种情况的特殊情况,因为 /run
或 /run_sse
端点也会调用 runner.run_async
。唯一的区别是:
- 是调用 Python 函数来运行智能体(第一种情况)还是调用服务端点来运行智能体(第二种情况)。
- 结果事件是内存中的对象(第一种情况)还是 HTTP 响应中的序列化 JSON 字符串(第二种情况)。
以下部分重点关注第一种情况,你应该能够非常直接地将其映射到第二种情况。如有必要,我们还将描述第二种情况需要处理的一些差异。
以下是你的客户端应用程序的逐步过程:
步骤 1:运行智能体并检测身份验证请求 {: #run-agent-and-detect-auth-request}
- 使用
runner.run_async
启动智能体交互。 - 遍历产生的事件。
- 寻找一个特定的函数调用事件,其函数调用具有特殊名称:
adk_request_credential
。此事件表示需要用户交互。你可以使用辅助函数来识别此事件并提取必要的信息。(对于第二种情况,逻辑类似。你从 HTTP 响应中反序列化事件)。
# runner = Runner(...)
# session = await session_service.create_session(...)
# content = types.Content(...) # 用户的初始查询
print("\n运行智能体...")
events_async = runner.run_async(
session_id=session.id, user_id='user', new_message=content
)
auth_request_function_call_id, auth_config = None, None
async for event in events_async:
# 使用辅助函数检查是否为特定的身份验证请求事件
if (auth_request_function_call := get_auth_request_function_call(event)):
print("--> 智能体需要身份验证。")
# 保存后续响应所需的 ID
if not (auth_request_function_call_id := auth_request_function_call.id):
raise ValueError(f'无法从函数调用中获取 function call id: {auth_request_function_call}')
# 获取包含 auth_uri 等的 AuthConfig
auth_config = get_auth_config(auth_request_function_call)
break # 暂停处理事件,等待用户交互
if not auth_request_function_call_id:
print("\n不需要身份验证或智能体已完成。")
# return # 或处理已收到的最终响应
辅助函数 helpers.py
:
from google.adk.events import Event
from google.adk.auth import AuthConfig # 导入必要类型
from google.genai import types
def get_auth_request_function_call(event: Event) -> types.FunctionCall:
# 从事件中获取特殊的身份验证请求函数调用
if not event.content or event.content.parts:
return
for part in event.content.parts:
if (
part
and part.function_call
and part.function_call.name == 'adk_request_credential'
and event.long_running_tool_ids
and part.function_call.id in event.long_running_tool_ids
):
return part.function_call
def get_auth_config(auth_request_function_call: types.FunctionCall) -> AuthConfig:
# 从身份验证请求函数调用的参数中提取 AuthConfig 对象
if not auth_request_function_call.args or not (auth_config := auth_request_function_call.args.get('auth_config')):
raise ValueError(f'无法从函数调用中获取 auth config: {auth_request_function_call}')
if not isinstance(auth_config, AuthConfig):
raise ValueError(f'获取到的 auth config {auth_config} 不是 AuthConfig 实例。')
return auth_config
步骤 2:重定向用户进行授权
- 从上一步提取的
auth_config
中获取授权 URL (auth_uri
)。 - 至关重要的是,将你的应用程序的 redirect_uri 作为查询参数附加到此
auth_uri
。这个redirect_uri
必须预先注册在你的 OAuth 提供商处(例如,Google Cloud Console,Okta 管理面板)。 - 引导用户到这个完整的 URL(例如,在他们的浏览器中打开它)。
# (在检测到需要身份验证后继续)
if auth_request_function_call_id and auth_config:
# 从 AuthConfig 获取基础授权 URL
base_auth_uri = auth_config.exchanged_auth_credential.oauth2.auth_uri
if base_auth_uri:
redirect_uri = 'http://localhost:8000/callback' # 必须与你的 OAuth 客户端应用配置一致
# 添加 redirect_uri(生产环境请用 urlencode)
auth_request_uri = base_auth_uri + f'&redirect_uri={redirect_uri}'
# 现在你需要将终端用户重定向到此 auth_request_uri,或让他们在浏览器中打开
# 该 auth_request_uri 应由对应的认证提供方服务,终端用户登录并授权你的应用访问其数据
# 然后认证提供方会将终端用户重定向到你提供的 redirect_uri
# 下一步:从用户(或你的 Web 服务器处理器)获取此回调 URL
else:
print("错误:在 auth_config 中未找到授权 URI。")
# 处理错误
步骤 3. 处理重定向回调(客户端): {: #handle-the-redirect-callback-client}
- 你的应用程序必须有一个机制(例如,在
redirect_uri
的 Web 服务器路由)来接收用户在提供商那里授权应用程序后的请求。 - 提供商将用户重定向到你的
redirect_uri
,并将authorization_code
(以及可能的state
、scope
)作为查询参数附加到 URL。 - 从此传入请求中捕获完整的回调 URL。
- (此步骤发生在主智能体执行循环之外,在你的 Web 服务器或等效回调处理程序中。)
步骤 4. 将身份验证结果发送回 ADK(客户端): {: #send-authentication-result-back-to-adk-client}
- 一旦你获得了完整的回调 URL(包含授权码),检索在客户端步骤 1 中保存的
auth_request_function_call_id
和auth_config
对象。 - 将捕获的回调 URL 设置到
exchanged_auth_credential.oauth2.auth_response_uri
字段中。还要确保exchanged_auth_credential.oauth2.redirect_uri
包含你使用的重定向 URI。 - 创建一个
types.Content
对象,包含带有types.FunctionResponse
的types.Part
。- 将
name
设置为"adk_request_credential"
。(注意:这是 ADK 继续身份验证的特殊名称。不要使用其他名称。) - 将
id
设置为你保存的auth_request_function_call_id
。 - 将
response
设置为序列化的(例如,.model_dump()
)更新后的AuthConfig
对象。
- 将
- 为同一会话再次调用
runner.run_async
,将此FunctionResponse
内容作为new_message
传递。
# (用户交互后继续)
# 模拟获取回调 URL(例如,从用户粘贴或 Web 处理程序)
auth_response_uri = await get_user_input(
f'在此处粘贴完整的回调 URL:\n> '
)
auth_response_uri = auth_response_uri.strip() # 清理输入
if not auth_response_uri:
print("未提供回调 URL。中止。")
return
# 使用回调详细信息更新收到的 AuthConfig
auth_config.exchanged_auth_credential.oauth2.auth_response_uri = auth_response_uri
# 也包括使用的 redirect_uri,因为令牌交换可能需要它
auth_config.exchanged_auth_credential.oauth2.redirect_uri = redirect_uri
# 构造 FunctionResponse Content 对象
auth_content = types.Content(
role='user', # 发送 FunctionResponse 时角色可以是 'user'
parts=[
types.Part(
function_response=types.FunctionResponse(
id=auth_request_function_call_id, # 关联原始请求
name='adk_request_credential', # 框架专用函数名
response=auth_config.model_dump() # 返回*更新后的* AuthConfig
)
)
],
)
# --- 恢复执行 ---
print("\n将身份验证详细信息提交回智能体...")
events_async_after_auth = runner.run_async(
session_id=session.id,
user_id='user',
new_message=auth_content, # 发送回 FunctionResponse
)
# --- 处理智能体最终输出 ---
print("\n--- 身份验证后的智能体响应 ---")
async for event in events_async_after_auth:
# 正常处理事件,期望工具调用现在成功
print(event) # 打印完整事件以供检查
步骤 5:ADK 处理令牌交换和工具重试并获取工具结果 {: #adk-handles-token-exchange-and-tool-retry}
- ADK 接收
adk_request_credential
的FunctionResponse
。 - 它使用更新的
AuthConfig
中的信息(包括包含授权码的回调 URL)与提供商的令牌端点执行 OAuth 令牌交换,获取访问令牌(可能还有刷新令牌)。 - ADK 通过在会话状态中设置这些令牌使它们在内部可用。
- ADK 自动重试最初的工具调用(最初因缺少身份验证而失败的那个)。
- 这次,工具通过
tool_context.get_auth_response()
找到有效的令牌并成功执行经过身份验证的 API 调用。 - 智能体从工具接收实际结果并为用户生成最终响应。
身份验证响应流程(智能体客户端发回身份验证响应并且 ADK 重试工具调用)的序列图如下所示:
旅程 2:构建需要身份验证的自定义工具(FunctionTool
)¶
本节重点介绍在创建新的 ADK 工具时在自定义 Python 函数内部实现身份验证逻辑。我们将实现一个 FunctionTool
作为示例。
先决条件¶
你的函数签名必须包括 tool_context: ToolContext
。ADK 自动注入此对象,提供对状态和身份验证机制的访问。
from google.adk.tools import FunctionTool, ToolContext
from typing import Dict
def my_authenticated_tool_function(param1: str, ..., tool_context: ToolContext) -> dict:
# ... 你的逻辑 ...
pass
my_tool = FunctionTool(func=my_authenticated_tool_function)
工具函数内的身份验证逻辑¶
在你的函数内实施以下步骤:
步骤 1:检查缓存和有效凭据: {: #check-cache-and-valid-credentials}
在你的工具函数内,首先检查是否已经在此会话的之前运行中存储了有效的凭据(例如,访问/刷新令牌)。当前会话的凭据应存储在 tool_context.invocation_context.session.state
(状态字典)中。通过检查 tool_context.invocation_context.session.state.get(credential_name, None)
来检查现有凭据的存在。
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
# 在你的工具函数内
TOKEN_CACHE_KEY = "my_tool_tokens" # 选择一个唯一的键
SCOPES = ["scope1", "scope2"] # 定义所需的作用域
creds = None
cached_token_info = tool_context.state.get(TOKEN_CACHE_KEY)
if cached_token_info:
try:
creds = Credentials.from_authorized_user_info(cached_token_info, SCOPES)
if not creds.valid and creds.expired and creds.refresh_token:
creds.refresh(Request())
tool_context.state[TOKEN_CACHE_KEY] = json.loads(creds.to_json()) # 更新缓存
elif not creds.valid:
creds = None # 无效,需要重新认证
tool_context.state[TOKEN_CACHE_KEY] = None
except Exception as e:
print(f"加载/刷新缓存凭据时出错:{e}")
creds = None
tool_context.state[TOKEN_CACHE_KEY] = None
if creds and creds.valid:
# 跳到步骤 5:进行经过身份验证的 API 调用
pass
else:
# 继续步骤 2...
pass
步骤 2:检查来自客户端的身份验证响应 {: #check-auth-response-from-client}
- 如果步骤 1 没有产生有效的凭据,通过调用
exchanged_credential = tool_context.get_auth_response()
检查客户端是否刚刚完成了交互式流程。 - 这将返回由客户端发回的更新后的
exchanged_credential
对象(在auth_response_uri
中包含回调 URL)。
# 使用工具中配置的 auth_scheme 和 auth_credential。
# exchanged_credential: AuthCredential | None
exchanged_credential = tool_context.get_auth_response(AuthConfig(
auth_scheme=auth_scheme,
raw_auth_credential=auth_credential,
))
# 如果 exchanged_credential 不为 None,那么已经有来自身份验证响应的交换凭据。
if exchanged_credential:
# ADK 已经为我们交换了访问令牌
access_token = exchanged_credential.oauth2.access_token
refresh_token = exchanged_credential.oauth2.refresh_token
creds = Credentials(
token=access_token,
refresh_token=refresh_token,
token_uri=auth_scheme.flows.authorizationCode.tokenUrl,
client_id=auth_credential.oauth2.client_id,
client_secret=auth_credential.oauth2.client_secret,
scopes=list(auth_scheme.flows.authorizationCode.scopes.keys()),
)
# 将令牌缓存在会话状态中并调用 API,跳到步骤 5
步骤 3:发起身份验证请求 {: #initiate-auth-request}
如果没有找到有效的凭据(步骤 1)和身份验证响应(步骤 2),工具需要启动 OAuth 流程。定义 AuthScheme 和初始 AuthCredential 并调用 tool_context.request_credential()
。返回表示需要授权的响应。
# 使用在工具中配置的 auth_scheme 和 auth_credential。
tool_context.request_credential(AuthConfig(
auth_scheme=auth_scheme,
raw_auth_credential=auth_credential,
))
return {'pending': true, 'message': '等待用户身份验证。'}
# 通过设置 request_credential,ADK 检测到一个待处理的身份验证事件。它会暂停执行并要求最终用户登录。
步骤 4:将授权码交换为令牌 {: #exchange-auth-code-for-token}
ADK 自动生成 OAuth 授权 URL 并将其呈现给你的智能体客户端应用程序。你的智能体客户端应用程序应按照旅程 1 中描述的相同方式,将用户重定向到授权 URL(附加 redirect_uri
)。一旦用户按照授权 URL 完成登录流程,并且 ADK 从智能体客户端应用程序中提取身份验证回调 URL,自动解析授权码并生成身份验证令牌。在下一次工具调用中,步骤 2 中的 tool_context.get_auth_response
将包含一个可在后续 API 调用中使用的有效凭据。
步骤 5:缓存获得的凭据 {: #cache-obtained-credentials}
在从 ADK 成功获取令牌(步骤 2)或如果令牌仍然有效(步骤 1)后,使用你的缓存键立即存储新的 Credentials
对象到 tool_context.state
(序列化,例如,作为 JSON)。
# 在你的工具函数内,在获取 'creds'(无论是刷新的还是新交换的)之后
# 缓存新的/刷新的令牌
tool_context.state[TOKEN_CACHE_KEY] = json.loads(creds.to_json())
print(f"调试:在键下缓存/更新令牌:{TOKEN_CACHE_KEY}")
# 继续步骤 6(进行 API 调用)
步骤 6:进行经过身份验证的 API 调用 {: #make-authenticated-api-call}
- 一旦你有了有效的
Credentials
对象(来自步骤 1 或步骤 4 的creds
),使用它通过适当的客户端库(例如,googleapiclient
,requests
)对受保护的 API 进行实际调用。传递credentials=creds
参数。 - 包括错误处理,特别是
HttpError
401/403,这可能意味着令牌在调用之间过期或被撤销。如果你得到这样的错误,考虑清除缓存的令牌(tool_context.state.pop(...)
)并可能再次返回auth_required
状态以强制重新认证。
# 在你的工具函数内,使用有效的 'creds' 对象
# 确保 creds 在继续之前是有效的
if not creds or not creds.valid:
return {"status": "error", "error_message": "没有有效的凭据无法继续。"}
try:
service = build("calendar", "v3", credentials=creds) # 示例
api_result = service.events().list(...).execute()
# 继续步骤 7
except Exception as e:
# 处理 API 错误(例如,检查 401/403,可能清除缓存并重新请求认证)
print(f"错误:API 调用失败:{e}")
return {"status": "error", "error_message": f"API 调用失败:{e}"}
步骤 7:返回工具结果 {: #return-tool-results}
- 在成功的 API 调用之后,将结果处理成对 LLM 有用的字典格式。
- 至关重要的是,包括一个状态以及数据。
# 在你的工具函数内,成功 API 调用后
processed_result = [...] # 为 LLM 处理 api_result
return {"status": "success", "data": processed_result}
完整代码
import os
from google.adk.auth.auth_schemes import OpenIdConnectWithConfig
from google.adk.auth.auth_credential import AuthCredential, AuthCredentialTypes, OAuth2Auth
from google.adk.tools.openapi_tool.openapi_spec_parser.openapi_toolset import OpenAPIToolset
from google.adk.agents.llm_agent import LlmAgent
# --- Authentication Configuration ---
# This section configures how the agent will handle authentication using OpenID Connect (OIDC),
# often layered on top of OAuth 2.0.
# Define the Authentication Scheme using OpenID Connect.
# This object tells the ADK *how* to perform the OIDC/OAuth2 flow.
# It requires details specific to your Identity Provider (IDP), like Google OAuth, Okta, Auth0, etc.
# Note: Replace the example Okta URLs and credentials with your actual IDP details.
# All following fields are required, and available from your IDP.
auth_scheme = OpenIdConnectWithConfig(
# The URL of the IDP's authorization endpoint where the user is redirected to log in.
authorization_endpoint="https://your-endpoint.okta.com/oauth2/v1/authorize",
# The URL of the IDP's token endpoint where the authorization code is exchanged for tokens.
token_endpoint="https://your-token-endpoint.okta.com/oauth2/v1/token",
# The scopes (permissions) your application requests from the IDP.
# 'openid' is standard for OIDC. 'profile' and 'email' request user profile info.
scopes=['openid', 'profile', "email"]
)
# Define the Authentication Credentials for your specific application.
# This object holds the client identifier and secret that your application uses
# to identify itself to the IDP during the OAuth2 flow.
# !! SECURITY WARNING: Avoid hardcoding secrets in production code. !!
# !! Use environment variables or a secret management system instead. !!
auth_credential = AuthCredential(
auth_type=AuthCredentialTypes.OPEN_ID_CONNECT,
oauth2=OAuth2Auth(
client_id="CLIENT_ID",
client_secret="CIENT_SECRET",
)
)
# --- Toolset Configuration from OpenAPI Specification ---
# This section defines a sample set of tools the agent can use, configured with Authentication
# from steps above.
# This sample set of tools use endpoints protected by Okta and requires an OpenID Connect flow
# to acquire end user credentials.
with open(os.path.join(os.path.dirname(__file__), 'spec.yaml'), 'r') as f:
spec_content = f.read()
userinfo_toolset = OpenAPIToolset(
spec_str=spec_content,
spec_str_type='yaml',
# ** Crucially, associate the authentication scheme and credentials with these tools. **
# This tells the ADK that the tools require the defined OIDC/OAuth2 flow.
auth_scheme=auth_scheme,
auth_credential=auth_credential,
)
# --- Agent Configuration ---
# Configure and create the main LLM Agent.
root_agent = LlmAgent(
model='gemini-2.0-flash',
name='enterprise_assistant',
instruction='Help user integrate with multiple enterprise systems, including retrieving user information which may require authentication.',
tools=userinfo_toolset.get_tools(),
)
# --- Ready for Use ---
# The `root_agent` is now configured with tools protected by OIDC/OAuth2 authentication.
# When the agent attempts to use one of these tools, the ADK framework will automatically
# trigger the authentication flow defined by `auth_scheme` and `auth_credential`
# if valid credentials are not already available in the session.
# The subsequent interaction flow would guide the user through the login process and handle
# token exchanging, and automatically attach the exchanged token to the endpoint defined in
# the tool.
import asyncio
from dotenv import load_dotenv
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from .helpers import is_pending_auth_event, get_function_call_id, get_function_call_auth_config, get_user_input
from .tools_and_agent import root_agent
load_dotenv()
agent = root_agent
async def async_main():
"""
Main asynchronous function orchestrating the agent interaction and authentication flow.
"""
# --- Step 1: Service Initialization ---
# Use in-memory services for session and artifact storage (suitable for demos/testing).
session_service = InMemorySessionService()
artifacts_service = InMemoryArtifactService()
# Create a new user session to maintain conversation state.
session = session_service.create_session(
state={}, # Optional state dictionary for session-specific data
app_name='my_app', # Application identifier
user_id='user' # User identifier
)
# --- Step 2: Initial User Query ---
# Define the user's initial request.
query = 'Show me my user info'
print(f"user: {query}")
# Format the query into the Content structure expected by the ADK Runner.
content = types.Content(role='user', parts=[types.Part(text=query)])
# Initialize the ADK Runner
runner = Runner(
app_name='my_app',
agent=agent,
artifact_service=artifacts_service,
session_service=session_service,
)
# --- Step 3: Send Query and Handle Potential Auth Request ---
print("\nRunning agent with initial query...")
events_async = runner.run_async(
session_id=session.id, user_id='user', new_message=content
)
# Variables to store details if an authentication request occurs.
auth_request_event_id, auth_config = None, None
# Iterate through the events generated by the first run.
async for event in events_async:
# Check if this event is the specific 'adk_request_credential' function call.
if is_pending_auth_event(event):
print("--> Authentication required by agent.")
auth_request_event_id = get_function_call_id(event)
auth_config = get_function_call_auth_config(event)
# Once the auth request is found and processed, exit this loop.
# We need to pause execution here to get user input for authentication.
break
# If no authentication request was detected after processing all events, exit.
if not auth_request_event_id or not auth_config:
print("\nAuthentication not required for this query or processing finished.")
return # Exit the main function
# --- Step 4: Manual Authentication Step (Simulated OAuth 2.0 Flow) ---
# This section simulates the user interaction part of an OAuth 2.0 flow.
# In a real web application, this would involve browser redirects.
# Define the Redirect URI. This *must* match one of the URIs registered
# with the OAuth provider for your application. The provider sends the user
# back here after they approve the request.
redirect_uri = 'http://localhost:8000/dev-ui' # Example for local development
# Construct the Authorization URL that the user must visit.
# This typically includes the provider's authorization endpoint URL,
# client ID, requested scopes, response type (e.g., 'code'), and the redirect URI.
# Here, we retrieve the base authorization URI from the AuthConfig provided by ADK
# and append the redirect_uri.
# NOTE: A robust implementation would use urlencode and potentially add state, scope, etc.
auth_request_uri = (
auth_config.exchanged_auth_credential.oauth2.auth_uri
+ f'&redirect_uri={redirect_uri}' # Simple concatenation; ensure correct query param format
)
print("\n--- User Action Required ---")
# Prompt the user to visit the authorization URL, log in, grant permissions,
# and then paste the *full* URL they are redirected back to (which contains the auth code).
auth_response_uri = await get_user_input(
f'1. Please open this URL in your browser to log in:\n {auth_request_uri}\n\n'
f'2. After successful login and authorization, your browser will be redirected.\n'
f' Copy the *entire* URL from the browser\'s address bar.\n\n'
f'3. Paste the copied URL here and press Enter:\n\n> '
)
# --- Step 5: Prepare Authentication Response for the Agent ---
# Update the AuthConfig object with the information gathered from the user.
# The ADK framework needs the full response URI (containing the code)
# and the original redirect URI to complete the OAuth token exchange process internally.
auth_config.exchanged_auth_credential.oauth2.auth_response_uri = auth_response_uri
auth_config.exchanged_auth_credential.oauth2.redirect_uri = redirect_uri
# Construct a FunctionResponse Content object to send back to the agent/runner.
# This response explicitly targets the 'adk_request_credential' function call
# identified earlier by its ID.
auth_content = types.Content(
role='user',
parts=[
types.Part(
function_response=types.FunctionResponse(
# Crucially, link this response to the original request using the saved ID.
id=auth_request_event_id,
# The special name of the function call we are responding to.
name='adk_request_credential',
# The payload containing all necessary authentication details.
response=auth_config.model_dump(),
)
)
],
)
# --- Step 6: Resume Execution with Authentication ---
print("\nSubmitting authentication details back to the agent...")
# Run the agent again, this time providing the `auth_content` (FunctionResponse).
# The ADK Runner intercepts this, processes the 'adk_request_credential' response
# (performs token exchange, stores credentials), and then allows the agent
# to retry the original tool call that required authentication, now succeeding with
# a valid access token embedded.
events_async = runner.run_async(
session_id=session.id,
user_id='user',
new_message=auth_content, # Provide the prepared auth response
)
# Process and print the final events from the agent after authentication is complete.
# This stream now contain the actual result from the tool (e.g., the user info).
print("\n--- Agent Response after Authentication ---")
async for event in events_async:
print(event)
if __name__ == '__main__':
asyncio.run(async_main())
from google.adk.auth import AuthConfig
from google.adk.events import Event
import asyncio
# --- Helper Functions ---
async def get_user_input(prompt: str) -> str:
"""
Asynchronously prompts the user for input in the console.
Uses asyncio's event loop and run_in_executor to avoid blocking the main
asynchronous execution thread while waiting for synchronous `input()`.
Args:
prompt: The message to display to the user.
Returns:
The string entered by the user.
"""
loop = asyncio.get_event_loop()
# Run the blocking `input()` function in a separate thread managed by the executor.
return await loop.run_in_executor(None, input, prompt)
def is_pending_auth_event(event: Event) -> bool:
"""
Checks if an ADK Event represents a request for user authentication credentials.
The ADK framework emits a specific function call ('adk_request_credential')
when a tool requires authentication that hasn't been previously satisfied.
Args:
event: The ADK Event object to inspect.
Returns:
True if the event is an 'adk_request_credential' function call, False otherwise.
"""
# Safely checks nested attributes to avoid errors if event structure is incomplete.
return (
event.content
and event.content.parts
and event.content.parts[0] # Assuming the function call is in the first part
and event.content.parts[0].function_call
# The specific function name indicating an auth request from the ADK framework.
and event.content.parts[0].function_call.name == 'adk_request_credential'
)
def get_function_call_id(event: Event) -> str:
"""
Extracts the unique ID of the function call from an ADK Event.
This ID is crucial for correlating a function *response* back to the specific
function *call* that the agent initiated to request for auth credentials.
Args:
event: The ADK Event object containing the function call.
Returns:
The unique identifier string of the function call.
Raises:
ValueError: If the function call ID cannot be found in the event structure.
(Corrected typo from `contents` to `content` below)
"""
# Navigate through the event structure to find the function call ID.
if (
event
and event.content
and event.content.parts
and event.content.parts[0] # Use content, not contents
and event.content.parts[0].function_call
and event.content.parts[0].function_call.id
):
return event.content.parts[0].function_call.id
# If the ID is missing, raise an error indicating an unexpected event format.
raise ValueError(f'Cannot get function call id from event {event}')
def get_function_call_auth_config(event: Event) -> AuthConfig:
"""
Extracts the authentication configuration details from an 'adk_request_credential' event.
Client should use this AuthConfig to necessary authentication details (like OAuth codes and state)
and sent it back to the ADK to continue OAuth token exchanging.
Args:
event: The ADK Event object containing the 'adk_request_credential' call.
Returns:
An AuthConfig object populated with details from the function call arguments.
Raises:
ValueError: If the 'auth_config' argument cannot be found in the event.
(Corrected typo from `contents` to `content` below)
"""
if (
event
and event.content
and event.content.parts
and event.content.parts[0] # Use content, not contents
and event.content.parts[0].function_call
and event.content.parts[0].function_call.args
and event.content.parts[0].function_call.args.get('auth_config')
):
# Reconstruct the AuthConfig object using the dictionary provided in the arguments.
# The ** operator unpacks the dictionary into keyword arguments for the constructor.
return AuthConfig(
**event.content.parts[0].function_call.args.get('auth_config')
)
raise ValueError(f'Cannot get auth config from event {event}')
openapi: 3.0.1
info:
title: Okta User Info API
version: 1.0.0
description: |-
API to retrieve user profile information based on a valid Okta OIDC Access Token.
Authentication is handled via OpenID Connect with Okta.
contact:
name: API Support
email: support@example.com # Replace with actual contact if available
servers:
- url: <substitute with your server name>
description: Production Environment
paths:
/okta-jwt-user-api:
get:
summary: Get Authenticated User Info
description: |-
Fetches profile details for the user
operationId: getUserInfo
tags:
- User Profile
security:
- okta_oidc:
- openid
- email
- profile
responses:
'200':
description: Successfully retrieved user information.
content:
application/json:
schema:
type: object
properties:
sub:
type: string
description: Subject identifier for the user.
example: "abcdefg"
name:
type: string
description: Full name of the user.
example: "Example LastName"
locale:
type: string
description: User's locale, e.g., en-US or en_US.
example: "en_US"
email:
type: string
format: email
description: User's primary email address.
example: "username@example.com"
preferred_username:
type: string
description: Preferred username of the user (often the email).
example: "username@example.com"
given_name:
type: string
description: Given name (first name) of the user.
example: "Example"
family_name:
type: string
description: Family name (last name) of the user.
example: "LastName"
zoneinfo:
type: string
description: User's timezone, e.g., America/Los_Angeles.
example: "America/Los_Angeles"
updated_at:
type: integer
format: int64 # Using int64 for Unix timestamp
description: Timestamp when the user's profile was last updated (Unix epoch time).
example: 1743617719
email_verified:
type: boolean
description: Indicates if the user's email address has been verified.
example: true
required:
- sub
- name
- locale
- email
- preferred_username
- given_name
- family_name
- zoneinfo
- updated_at
- email_verified
'401':
description: Unauthorized. The provided Bearer token is missing, invalid, or expired.
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
'403':
description: Forbidden. The provided token does not have the required scopes or permissions to access this resource.
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
components:
securitySchemes:
okta_oidc:
type: openIdConnect
description: Authentication via Okta using OpenID Connect. Requires a Bearer Access Token.
openIdConnectUrl: https://your-endpoint.okta.com/.well-known/openid-configuration
schemas:
Error:
type: object
properties:
code:
type: string
description: An error code.
message:
type: string
description: A human-readable error message.
required:
- code
- message