diff --git a/examples/README.md b/examples/README.md index b070f35..03a9c35 100644 --- a/examples/README.md +++ b/examples/README.md @@ -9,21 +9,61 @@ This directory contains examples for using the Asgardeo Python SDKs. ## Setup -Before running the examples, make sure to: +### 1. Create a virtual environment + +From the repository root (`sdk/python`): + +```bash +python3 -m venv .venv +source .venv/bin/activate # macOS/Linux +# .venv\Scripts\activate # Windows +``` + +### 2. Install the packages + +Install both packages in editable mode so local source changes are picked up immediately: -1. Install the packages: ```bash -cd asgardeo && poetry install -cd ../asgardeo-ai && poetry install +pip install -e packages/asgardeo -e packages/asgardeo-ai ``` -2. Set up your configuration in each example file with your actual Asgardeo credentials. +### 3. Configure credentials + +Open the example file you want to run and replace the placeholder values with your actual Asgardeo credentials: + +```python +config = AsgardeoConfig( + base_url="https://api.asgardeo.io/t/", + client_id="", + redirect_uri="", + client_secret="" +) +``` + +## Available Examples + +| Example | Description | +|---------|-------------| +| `asgardeo/native_auth.py` | App-native authentication (username/password without browser redirect) | +| `asgardeo-ai/agent_auth.py` | AI agent token acquisition using native auth | +| `asgardeo-ai/obo_flow.py` | On-Behalf-Of (OBO) token flow via authorization code | +| `asgardeo-ai/ciba_obo_flow.py` | On-Behalf-Of (OBO) token flow via CIBA with polling | ## Running Examples -Each example is a standalone Python script that can be run directly: +Make sure the virtual environment is activated, then run any example from the repository root: ```bash -python examples/asgardeo/basic_auth.py +python examples/asgardeo/native_auth.py python examples/asgardeo-ai/agent_auth.py -``` \ No newline at end of file +python examples/asgardeo-ai/obo_flow.py +python examples/asgardeo-ai/ciba_obo_flow.py +``` + +## Asgardeo Prerequisites + +Before running the examples, ensure your application is configured in the Asgardeo Console: + +- **Native auth / Agent auth**: Enable **App-Native Authentication** in the Login Flow tab +- **OBO flow**: Enable **Token Exchange** grant type in the Protocol tab +- **CIBA OBO flow**: Enable **CIBA** grant type in the Protocol tab and configure at least one notification channel (Email, SMS, or External) diff --git a/examples/asgardeo-ai/ciba_obo_flow.py b/examples/asgardeo-ai/ciba_obo_flow.py new file mode 100644 index 0000000..52a9d1b --- /dev/null +++ b/examples/asgardeo-ai/ciba_obo_flow.py @@ -0,0 +1,125 @@ +""" +Copyright (c) 2025, WSO2 LLC. (https://www.wso2.com). +WSO2 LLC. licenses this file to you under the Apache License, +Version 2.0 (the "License"); you may not use this file except +in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +""" + +""" +On-Behalf-Of (OBO) token flow using CIBA. + +This example shows how an AI agent can obtain tokens on behalf of a user +using CIBA. The agent initiates a backchannel authentication request for +the user, and the user authenticates on a separate device (via email, SMS, +or an external link). +""" + +import asyncio +import itertools +import sys + +from asgardeo import AsgardeoConfig, CIBAResponse +from asgardeo_ai import AgentAuthManager, AgentConfig + + +async def _spinner(message: str, stop_event: asyncio.Event) -> None: + """Display a spinning animation until stop_event is set.""" + frames = itertools.cycle(["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]) + while not stop_event.is_set(): + sys.stdout.write(f"\r{next(frames)} {message}") + sys.stdout.flush() + await asyncio.sleep(0.1) + sys.stdout.write(f"\r{' ' * (len(message) + 2)}\r") + sys.stdout.flush() + + +async def main(): + """On-Behalf-Of (OBO) CIBA flow example.""" + + # Asgardeo configuration - Replace with your actual values + config = AsgardeoConfig( + base_url="https://api.asgardeo.io/t/", + client_id="", + redirect_uri="", + client_secret="", + ) + + # AI Agent configuration - Replace with your actual agent credentials + agent_config = AgentConfig( + agent_id="", + agent_secret="" + ) + + try: + async with AgentAuthManager(config, agent_config) as auth_manager: + print("Starting On-Behalf-Of (OBO) CIBA flow...") + + # Step 1: Get agent token + print("\nStep 1: Getting agent token...") + agent_scopes = ["openid", "profile"] + agent_token = await auth_manager.get_agent_token(agent_scopes) + print(f"Agent authenticated: {agent_token.access_token[:20]}...") + + # Step 2: Get OBO token via CIBA + print("\nStep 2: Initiating CIBA request for user...") + user_scopes = ["openid", "profile", "email"] + + stop_spinner = asyncio.Event() + spinner_task = None + + def on_ciba_initiated(ciba_response: CIBAResponse) -> None: + nonlocal spinner_task + print(f"\nCIBA request accepted. auth_req_id: {ciba_response.auth_req_id}") + + if ciba_response.auth_url: + print(f"Open this URL to authenticate: {ciba_response.auth_url}") + else: + print("Notification sent! Check your email/SMS inbox to approve the request.") + + print(f"(expires in {ciba_response.expires_in}s)\n") + + spinner_task = asyncio.ensure_future( + _spinner("Waiting for user to complete authentication...", stop_spinner) + ) + + try: + _, user_token = await auth_manager.get_obo_token_with_ciba( + login_hint="", # Replace with actual username + agent_token=agent_token, + scopes=user_scopes, + binding_message="AI Agent requests access to your account", + on_initiated=on_ciba_initiated, + ) + finally: + stop_spinner.set() + if spinner_task: + await spinner_task + + print("OBO token obtained successfully!") + print(f"User Access Token: {user_token.access_token[:30]}...") + if user_token.id_token: + print(f"User ID Token: {user_token.id_token[:30]}...") + if user_token.refresh_token: + print(f"Refresh Token: {user_token.refresh_token[:30]}...") + print(f"Expires in: {user_token.expires_in}s") + print(f"Scope: {user_token.scope}") + print("\nThe AI agent can now act on behalf of the user.") + + except Exception as e: + print(f"\nCIBA OBO flow failed: {e}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + print("Asgardeo AI On-Behalf-Of (OBO) CIBA Flow Example") + print("=" * 55) + asyncio.run(main()) diff --git a/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py b/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py index 18baf3d..7af0f48 100644 --- a/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py +++ b/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py @@ -15,20 +15,25 @@ """Agent-enhanced OAuth2 authentication manager for Asgardeo AI.""" +import asyncio import logging import base64 import os -from typing import Dict, List, Optional, Tuple, Any +import time +from typing import Callable, Dict, List, Optional, Tuple, Any from urllib.parse import urlencode from dataclasses import dataclass from asgardeo import ( - AsgardeoConfig, - OAuthToken, - FlowStatus, - AsgardeoNativeAuthClient, + AsgardeoConfig, + OAuthToken, + FlowStatus, + AsgardeoNativeAuthClient, AsgardeoTokenClient, AuthenticationError, + CIBAAuthenticationError, + CIBAResponse, + CIBAStatus, TokenError, ValidationError, generate_pkce_pair, @@ -259,6 +264,130 @@ async def get_obo_token( logger.error(f"OBO token exchange failed: {e}") raise TokenError(f"OBO token exchange failed: {e}") + async def _poll_for_token( + self, + ciba_response: CIBAResponse, + scope: Optional[str] = None, + timeout: Optional[int] = None, + ) -> OAuthToken: + """Poll the token endpoint until CIBA authentication completes. + + :param ciba_response: CIBA initiation response with auth_req_id, interval, expires_in + :param scope: Optional scope override + :param timeout: Optional max wait time in seconds (defaults to ciba_response.expires_in) + :return: OAuthToken on successful authentication + :raises CIBAAuthenticationError: If authentication is denied or expires + """ + interval = ciba_response.interval + max_wait = min( + timeout or ciba_response.expires_in, + ciba_response.expires_in, + ) + start_time = time.monotonic() + + while True: + elapsed = time.monotonic() - start_time + if elapsed >= max_wait: + raise CIBAAuthenticationError( + "CIBA authentication timed out: exceeded maximum wait time." + ) + + await asyncio.sleep(interval) + + try: + token = await self.token_client.get_token( + "urn:openid:params:grant-type:ciba", + auth_req_id=ciba_response.auth_req_id, + scope=scope, + ) + return token + except CIBAAuthenticationError as e: + error_msg = str(e) + if CIBAStatus.AUTHORIZATION_PENDING in error_msg: + logger.debug("CIBA authorization pending, continuing to poll...") + continue + elif CIBAStatus.SLOW_DOWN in error_msg: + interval += 5 + logger.debug(f"CIBA slow_down received, increasing interval to {interval}s") + continue + elif CIBAStatus.EXPIRED_TOKEN in error_msg: + raise CIBAAuthenticationError( + "CIBA authentication request expired. The user did not authenticate in time." + ) + elif CIBAStatus.ACCESS_DENIED in error_msg: + raise CIBAAuthenticationError( + "CIBA authentication denied. The user rejected the authentication request." + ) + else: + raise + + async def get_obo_token_with_ciba( + self, + login_hint: str, + agent_token: OAuthToken, + scopes: Optional[List[str]] = None, + binding_message: Optional[str] = None, + notification_channel: Optional[str] = None, + timeout: Optional[int] = None, + on_initiated: Optional[Callable[[CIBAResponse], None]] = None, + ) -> Tuple[CIBAResponse, OAuthToken]: + """Get on-behalf-of token using CIBA flow. + + Initiates a CIBA request for a user identified by login_hint, + then polls until the user authenticates. The actor_token is sent + in the CIBA initiation to establish OBO delegation. + + :param login_hint: Username or identifier of the user to authenticate + :param agent_token: The agent's OAuthToken (used as actor_token for delegation) + :param scopes: List of OAuth scopes to request + :param binding_message: Message displayed to the user during authentication + :param notification_channel: Notification channel (email, sms, external) + :param timeout: Maximum time to wait for authentication in seconds + :param on_initiated: Optional callback invoked with CIBAResponse immediately after + the CIBA request is accepted and before polling begins. Use this to notify the + caller that a push/email/SMS has been sent and polling is starting. + Accepts both sync and async callables. + :return: Tuple of (CIBAResponse, OAuthToken) + """ + if not login_hint: + raise ValidationError("login_hint is required for CIBA OBO token exchange.") + if not agent_token: + raise ValidationError("agent_token is required for CIBA OBO token exchange.") + + scope_str = " ".join(scopes) if scopes else None + + try: + ciba_response = await self.token_client.initiate_ciba( + login_hint=login_hint, + scope=scope_str, + binding_message=binding_message, + notification_channel=notification_channel, + actor_token=agent_token.access_token, + ) + + logger.info( + f"CIBA initiated for user '{login_hint}'. auth_req_id: {ciba_response.auth_req_id}, " + f"expires_in: {ciba_response.expires_in}s" + ) + + if on_initiated is not None: + result = on_initiated(ciba_response) + if asyncio.iscoroutine(result): + await result + + token = await self._poll_for_token( + ciba_response=ciba_response, + scope=scope_str, + timeout=timeout, + ) + return ciba_response, token + + except (CIBAAuthenticationError, ValidationError): + raise + except Exception as e: + logger.error(f"CIBA OBO token exchange failed: {e}") + raise TokenError(f"CIBA OBO token exchange failed: {e}") + async def revoke_token( self, token: str, diff --git a/packages/asgardeo/src/asgardeo/__init__.py b/packages/asgardeo/src/asgardeo/__init__.py index 0231552..611dab7 100644 --- a/packages/asgardeo/src/asgardeo/__init__.py +++ b/packages/asgardeo/src/asgardeo/__init__.py @@ -18,6 +18,9 @@ AsgardeoConfig, AsgardeoError, AuthenticationError, + CIBAAuthenticationError, + CIBAResponse, + CIBAStatus, FlowStatus, NetworkError, OAuthToken, @@ -34,6 +37,9 @@ "AsgardeoNativeAuthClient", "AsgardeoTokenClient", "AuthenticationError", + "CIBAAuthenticationError", + "CIBAResponse", + "CIBAStatus", "FlowStatus", "NetworkError", "OAuthToken", diff --git a/packages/asgardeo/src/asgardeo/auth/client.py b/packages/asgardeo/src/asgardeo/auth/client.py index 98b8025..780c9ad 100644 --- a/packages/asgardeo/src/asgardeo/auth/client.py +++ b/packages/asgardeo/src/asgardeo/auth/client.py @@ -16,6 +16,7 @@ """Async Asgardeo authentication and token clients.""" +import base64 import json import logging from typing import Any @@ -27,6 +28,9 @@ AsgardeoConfig, AsgardeoError, AuthenticationError, + CIBAAuthenticationError, + CIBAResponse, + CIBAStatus, FlowStatus, NetworkError, OAuthToken, @@ -80,8 +84,10 @@ async def _initiate_auth( "response_mode": "direct", } - # Only add client_secret if code_verifier is not in params (PKCE flow) - if not (params and "code_challenge" in params): + # Always authenticate confidential clients even when using PKCE. + # PKCE prevents auth code interception; client_secret authenticates the client. + # Public clients (no client_secret) rely on PKCE alone. + if self.config.client_secret: data["client_secret"] = self.config.client_secret if state: data["state"] = state @@ -286,7 +292,7 @@ async def get_token(self, grant_type: str, **kwargs: Any) -> OAuthToken: url = f"{self.base_url}/oauth2/token" data = {"grant_type": grant_type, "client_id": self.config.client_id} - if self.config.client_secret and "code_verifier" not in kwargs: + if self.config.client_secret: data["client_secret"] = self.config.client_secret if grant_type == "authorization_code": @@ -311,6 +317,16 @@ async def get_token(self, grant_type: str, **kwargs: Any) -> OAuthToken: scope = kwargs.get("scope") if scope: data["scope"] = scope + elif grant_type == "urn:openid:params:grant-type:ciba": + auth_req_id = kwargs.get("auth_req_id") + if not auth_req_id: + raise ValidationError( + "auth_req_id is required for CIBA grant type.", + ) + data["auth_req_id"] = auth_req_id + scope = kwargs.get("scope") + if scope: + data["scope"] = scope else: raise ValidationError(f"Unsupported grant type: {grant_type}") @@ -330,6 +346,19 @@ async def get_token(self, grant_type: str, **kwargs: Any) -> OAuthToken: scope=resp_json.get("scope"), ) except httpx.HTTPStatusError as e: + if grant_type == "urn:openid:params:grant-type:ciba" and e.response.status_code == 400: + try: + error_json = e.response.json() + error_code = error_json.get("error", "") + if error_code in ( + CIBAStatus.AUTHORIZATION_PENDING, + CIBAStatus.SLOW_DOWN, + CIBAStatus.EXPIRED_TOKEN, + CIBAStatus.ACCESS_DENIED, + ): + raise CIBAAuthenticationError(error_code) + except (ValueError, KeyError): + pass raise TokenError( f"Token request failed: {e.response.status_code} {e.response.text}", ) @@ -340,6 +369,83 @@ async def get_token(self, grant_type: str, **kwargs: Any) -> OAuthToken: except Exception as e: raise AsgardeoError(f"Unexpected error during token request: {e!s}") + def _get_basic_auth_header(self) -> str: + """Generate Basic auth header value from client credentials. + + :return: Basic auth header value string + :raises ValidationError: If client_secret is not configured + """ + if not self.config.client_secret: + raise ValidationError("Client secret is required for Basic authentication.") + credentials = f"{self.config.client_id}:{self.config.client_secret}" + encoded = base64.b64encode(credentials.encode("utf-8")).decode("utf-8") + return f"Basic {encoded}" + + async def initiate_ciba( + self, + login_hint: str, + scope: str | None = None, + binding_message: str | None = None, + notification_channel: str | None = None, + actor_token: str | None = None, + ) -> CIBAResponse: + """Initiate a CIBA backchannel authentication request. + + :param login_hint: Username or identifier of the user to authenticate + :param scope: Space-separated scopes to request (defaults to config scope) + :param binding_message: Human-readable message displayed during authentication + :param notification_channel: Notification channel (email, sms, external) + :param actor_token: Optional actor token for OBO delegation + :return: CIBAResponse with auth_req_id, interval, expires_in, optional auth_url + """ + url = f"{self.base_url}/oauth2/ciba" + + headers = {"Content-Type": "application/x-www-form-urlencoded"} + + data = { + "scope": scope or self.config.scope, + "login_hint": login_hint, + } + + if self.config.client_secret: + headers["Authorization"] = self._get_basic_auth_header() + else: + data["client_id"] = self.config.client_id + + if binding_message: + data["binding_message"] = binding_message + + if notification_channel: + data["notification_channel"] = notification_channel + + if actor_token: + data["actor_token"] = actor_token + + try: + response = await self.session.post( + url, + headers=headers, + data=urlencode(data), + ) + response.raise_for_status() + resp_json = response.json() + return CIBAResponse( + auth_req_id=resp_json["auth_req_id"], + interval=resp_json.get("interval", 2), + expires_in=resp_json.get("expires_in", 120), + auth_url=resp_json.get("auth_url"), + ) + except httpx.HTTPStatusError as e: + raise AuthenticationError( + f"CIBA initiation failed: {e.response.status_code} {e.response.text}", + ) + except httpx.RequestError as e: + raise NetworkError(f"Network error during CIBA initiation: {e!s}") + except KeyError as e: + raise AuthenticationError(f"Missing required field in CIBA response: {e!s}") + except Exception as e: + raise AsgardeoError(f"Unexpected error during CIBA initiation: {e!s}") + async def refresh_access_token(self, refresh_token: str) -> OAuthToken: """Simple token refresh method. diff --git a/packages/asgardeo/src/asgardeo/models.py b/packages/asgardeo/src/asgardeo/models.py index 26798f7..1e76c9c 100644 --- a/packages/asgardeo/src/asgardeo/models.py +++ b/packages/asgardeo/src/asgardeo/models.py @@ -63,8 +63,31 @@ class OAuthToken: scope: str | None = None +@dataclass +class CIBAResponse: + """CIBA backchannel authentication response.""" + + auth_req_id: str + interval: int = 2 + expires_in: int = 120 + auth_url: str | None = None + + class FlowStatus: """Authentication flow status constants.""" SUCCESS_COMPLETED = "SUCCESS_COMPLETED" INCOMPLETE = "INCOMPLETE" + + +class CIBAStatus: + """CIBA polling status constants.""" + + AUTHORIZATION_PENDING = "authorization_pending" + SLOW_DOWN = "slow_down" + EXPIRED_TOKEN = "expired_token" + ACCESS_DENIED = "access_denied" + + +class CIBAAuthenticationError(AuthenticationError): + """Raised when CIBA authentication fails (expired, denied, etc.)."""