Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 48 additions & 8 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,61 @@ This directory contains examples for using the Asgardeo Python SDKs.

## Setup

Before running the examples, make sure to:
### 1. Create a virtual environment

From the repository root (`sdk/python`):

```bash
python3 -m venv .venv
source .venv/bin/activate # macOS/Linux
# .venv\Scripts\activate # Windows
```

### 2. Install the packages

Install both packages in editable mode so local source changes are picked up immediately:

1. Install the packages:
```bash
cd asgardeo && poetry install
cd ../asgardeo-ai && poetry install
pip install -e packages/asgardeo -e packages/asgardeo-ai
```

2. Set up your configuration in each example file with your actual Asgardeo credentials.
### 3. Configure credentials

Open the example file you want to run and replace the placeholder values with your actual Asgardeo credentials:

```python
config = AsgardeoConfig(
base_url="https://api.asgardeo.io/t/<your-org>",
client_id="<your-client-id>",
redirect_uri="<your-redirect-uri>",
client_secret="<your-client-secret>"
)
```

## Available Examples

| Example | Description |
|---------|-------------|
| `asgardeo/native_auth.py` | App-native authentication (username/password without browser redirect) |
| `asgardeo-ai/agent_auth.py` | AI agent token acquisition using native auth |
| `asgardeo-ai/obo_flow.py` | On-Behalf-Of (OBO) token flow via authorization code |
| `asgardeo-ai/ciba_obo_flow.py` | On-Behalf-Of (OBO) token flow via CIBA with polling |

## Running Examples

Each example is a standalone Python script that can be run directly:
Make sure the virtual environment is activated, then run any example from the repository root:

```bash
python examples/asgardeo/basic_auth.py
python examples/asgardeo/native_auth.py
python examples/asgardeo-ai/agent_auth.py
```
python examples/asgardeo-ai/obo_flow.py
python examples/asgardeo-ai/ciba_obo_flow.py
```

## Asgardeo Prerequisites

Before running the examples, ensure your application is configured in the Asgardeo Console:

- **Native auth / Agent auth**: Enable **App-Native Authentication** in the Login Flow tab
- **OBO flow**: Enable **Token Exchange** grant type in the Protocol tab
- **CIBA OBO flow**: Enable **CIBA** grant type in the Protocol tab and configure at least one notification channel (Email, SMS, or External)
125 changes: 125 additions & 0 deletions examples/asgardeo-ai/ciba_obo_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""
Copyright (c) 2025, WSO2 LLC. (https://www.wso2.com).
WSO2 LLC. licenses this file to you under the Apache License,
Version 2.0 (the "License"); you may not use this file except
in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
"""

"""
On-Behalf-Of (OBO) token flow using CIBA.

This example shows how an AI agent can obtain tokens on behalf of a user
using CIBA. The agent initiates a backchannel authentication request for
the user, and the user authenticates on a separate device (via email, SMS,
or an external link).
"""

import asyncio
import itertools
import sys

from asgardeo import AsgardeoConfig, CIBAResponse
from asgardeo_ai import AgentAuthManager, AgentConfig


async def _spinner(message: str, stop_event: asyncio.Event) -> None:
"""Display a spinning animation until stop_event is set."""
frames = itertools.cycle(["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"])
while not stop_event.is_set():
sys.stdout.write(f"\r{next(frames)} {message}")
sys.stdout.flush()
await asyncio.sleep(0.1)
sys.stdout.write(f"\r{' ' * (len(message) + 2)}\r")
sys.stdout.flush()


async def main():
"""On-Behalf-Of (OBO) CIBA flow example."""

# Asgardeo configuration - Replace with your actual values
config = AsgardeoConfig(
base_url="https://api.asgardeo.io/t/<tenant>",
client_id="<client_id>",
redirect_uri="<redirect_uri>",
client_secret="<client_secret>",
)

# AI Agent configuration - Replace with your actual agent credentials
agent_config = AgentConfig(
agent_id="<agent_id>",
agent_secret="<agent_secret>"
)

try:
async with AgentAuthManager(config, agent_config) as auth_manager:
print("Starting On-Behalf-Of (OBO) CIBA flow...")

# Step 1: Get agent token
print("\nStep 1: Getting agent token...")
agent_scopes = ["openid", "profile"]
agent_token = await auth_manager.get_agent_token(agent_scopes)
print(f"Agent authenticated: {agent_token.access_token[:20]}...")

# Step 2: Get OBO token via CIBA
print("\nStep 2: Initiating CIBA request for user...")
user_scopes = ["openid", "profile", "email"]

stop_spinner = asyncio.Event()
spinner_task = None

def on_ciba_initiated(ciba_response: CIBAResponse) -> None:
nonlocal spinner_task
print(f"\nCIBA request accepted. auth_req_id: {ciba_response.auth_req_id}")

if ciba_response.auth_url:
print(f"Open this URL to authenticate: {ciba_response.auth_url}")
else:
print("Notification sent! Check your email/SMS inbox to approve the request.")

print(f"(expires in {ciba_response.expires_in}s)\n")

spinner_task = asyncio.ensure_future(
_spinner("Waiting for user to complete authentication...", stop_spinner)
)

try:
_, user_token = await auth_manager.get_obo_token_with_ciba(
login_hint="<username>", # Replace with actual username
agent_token=agent_token,
scopes=user_scopes,
binding_message="AI Agent requests access to your account",
on_initiated=on_ciba_initiated,
)
finally:
stop_spinner.set()
if spinner_task:
await spinner_task

print("OBO token obtained successfully!")
print(f"User Access Token: {user_token.access_token[:30]}...")
if user_token.id_token:
print(f"User ID Token: {user_token.id_token[:30]}...")
if user_token.refresh_token:
print(f"Refresh Token: {user_token.refresh_token[:30]}...")
print(f"Expires in: {user_token.expires_in}s")
print(f"Scope: {user_token.scope}")
print("\nThe AI agent can now act on behalf of the user.")

except Exception as e:
print(f"\nCIBA OBO flow failed: {e}")
import traceback
traceback.print_exc()


if __name__ == "__main__":
print("Asgardeo AI On-Behalf-Of (OBO) CIBA Flow Example")
print("=" * 55)
asyncio.run(main())
139 changes: 134 additions & 5 deletions packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,25 @@

"""Agent-enhanced OAuth2 authentication manager for Asgardeo AI."""

import asyncio
import logging
import base64
import os
from typing import Dict, List, Optional, Tuple, Any
import time
from typing import Callable, Dict, List, Optional, Tuple, Any
from urllib.parse import urlencode
from dataclasses import dataclass

from asgardeo import (
AsgardeoConfig,
OAuthToken,
FlowStatus,
AsgardeoNativeAuthClient,
AsgardeoConfig,
OAuthToken,
FlowStatus,
AsgardeoNativeAuthClient,
AsgardeoTokenClient,
AuthenticationError,
CIBAAuthenticationError,
CIBAResponse,
CIBAStatus,
TokenError,
ValidationError,
generate_pkce_pair,
Expand Down Expand Up @@ -259,6 +264,130 @@ async def get_obo_token(
logger.error(f"OBO token exchange failed: {e}")
raise TokenError(f"OBO token exchange failed: {e}")

async def _poll_for_token(
self,
ciba_response: CIBAResponse,
scope: Optional[str] = None,
timeout: Optional[int] = None,
) -> OAuthToken:
"""Poll the token endpoint until CIBA authentication completes.

:param ciba_response: CIBA initiation response with auth_req_id, interval, expires_in
:param scope: Optional scope override
:param timeout: Optional max wait time in seconds (defaults to ciba_response.expires_in)
:return: OAuthToken on successful authentication
:raises CIBAAuthenticationError: If authentication is denied or expires
"""
interval = ciba_response.interval
max_wait = min(
timeout or ciba_response.expires_in,
ciba_response.expires_in,
)
start_time = time.monotonic()

while True:
elapsed = time.monotonic() - start_time
if elapsed >= max_wait:
raise CIBAAuthenticationError(
"CIBA authentication timed out: exceeded maximum wait time."
)

await asyncio.sleep(interval)

try:
token = await self.token_client.get_token(
"urn:openid:params:grant-type:ciba",
auth_req_id=ciba_response.auth_req_id,
scope=scope,
)
return token
except CIBAAuthenticationError as e:
error_msg = str(e)
if CIBAStatus.AUTHORIZATION_PENDING in error_msg:
logger.debug("CIBA authorization pending, continuing to poll...")
continue
elif CIBAStatus.SLOW_DOWN in error_msg:
interval += 5
logger.debug(f"CIBA slow_down received, increasing interval to {interval}s")
continue
elif CIBAStatus.EXPIRED_TOKEN in error_msg:
raise CIBAAuthenticationError(
"CIBA authentication request expired. The user did not authenticate in time."
)
elif CIBAStatus.ACCESS_DENIED in error_msg:
raise CIBAAuthenticationError(
"CIBA authentication denied. The user rejected the authentication request."
)
else:
raise

async def get_obo_token_with_ciba(
self,
login_hint: str,
agent_token: OAuthToken,
scopes: Optional[List[str]] = None,
binding_message: Optional[str] = None,
notification_channel: Optional[str] = None,
timeout: Optional[int] = None,
on_initiated: Optional[Callable[[CIBAResponse], None]] = None,
) -> Tuple[CIBAResponse, OAuthToken]:
"""Get on-behalf-of token using CIBA flow.

Initiates a CIBA request for a user identified by login_hint,
then polls until the user authenticates. The actor_token is sent
in the CIBA initiation to establish OBO delegation.

:param login_hint: Username or identifier of the user to authenticate
:param agent_token: The agent's OAuthToken (used as actor_token for delegation)
:param scopes: List of OAuth scopes to request
:param binding_message: Message displayed to the user during authentication
:param notification_channel: Notification channel (email, sms, external)
:param timeout: Maximum time to wait for authentication in seconds
:param on_initiated: Optional callback invoked with CIBAResponse immediately after
the CIBA request is accepted and before polling begins. Use this to notify the
caller that a push/email/SMS has been sent and polling is starting.
Accepts both sync and async callables.
:return: Tuple of (CIBAResponse, OAuthToken)
"""
if not login_hint:
raise ValidationError("login_hint is required for CIBA OBO token exchange.")
if not agent_token:
raise ValidationError("agent_token is required for CIBA OBO token exchange.")

scope_str = " ".join(scopes) if scopes else None

try:
ciba_response = await self.token_client.initiate_ciba(
login_hint=login_hint,
scope=scope_str,
binding_message=binding_message,
notification_channel=notification_channel,
actor_token=agent_token.access_token,
)

logger.info(
f"CIBA initiated for user '{login_hint}'. auth_req_id: {ciba_response.auth_req_id}, "
f"expires_in: {ciba_response.expires_in}s"
)

if on_initiated is not None:
result = on_initiated(ciba_response)
if asyncio.iscoroutine(result):
await result

token = await self._poll_for_token(
ciba_response=ciba_response,
scope=scope_str,
timeout=timeout,
)
return ciba_response, token

except (CIBAAuthenticationError, ValidationError):
raise
except Exception as e:
logger.error(f"CIBA OBO token exchange failed: {e}")
raise TokenError(f"CIBA OBO token exchange failed: {e}")

async def revoke_token(
self,
token: str,
Expand Down
6 changes: 6 additions & 0 deletions packages/asgardeo/src/asgardeo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
AsgardeoConfig,
AsgardeoError,
AuthenticationError,
CIBAAuthenticationError,
CIBAResponse,
CIBAStatus,
FlowStatus,
NetworkError,
OAuthToken,
Expand All @@ -34,6 +37,9 @@
"AsgardeoNativeAuthClient",
"AsgardeoTokenClient",
"AuthenticationError",
"CIBAAuthenticationError",
"CIBAResponse",
"CIBAStatus",
"FlowStatus",
"NetworkError",
"OAuthToken",
Expand Down
Loading
Loading