From 348902219303018d93e209b6a770046779ef31de Mon Sep 17 00:00:00 2001 From: HarshCasper Date: Fri, 6 Feb 2026 00:08:33 +0530 Subject: [PATCH 1/2] add initial version of keycloak extension --- .github/workflows/keycloak.yml | 53 ++ keycloak/.gitignore | 11 + keycloak/Makefile | 74 +++ keycloak/README.md | 199 +++++++ keycloak/localstack_keycloak/__init__.py | 1 + keycloak/localstack_keycloak/extension.py | 308 ++++++++++ keycloak/localstack_keycloak/utils.py | 70 +++ keycloak/pyproject.toml | 49 ++ keycloak/quickstart/README.md | 89 +++ keycloak/quickstart/sample-realm.json | 148 +++++ keycloak/sample-app/README.md | 167 ++++++ keycloak/sample-app/cdk/.gitignore | 1 + keycloak/sample-app/cdk/app.py | 13 + keycloak/sample-app/cdk/cdk.json | 17 + keycloak/sample-app/cdk/requirements.txt | 2 + keycloak/sample-app/cdk/stacks/__init__.py | 0 keycloak/sample-app/cdk/stacks/api_stack.py | 143 +++++ .../sample-app/lambda/authorizer/handler.py | 220 ++++++++ keycloak/sample-app/lambda/users/handler.py | 257 +++++++++ keycloak/tests/__init__.py | 0 keycloak/tests/test_extension.py | 526 ++++++++++++++++++ 21 files changed, 2348 insertions(+) create mode 100644 .github/workflows/keycloak.yml create mode 100644 keycloak/.gitignore create mode 100644 keycloak/Makefile create mode 100644 keycloak/README.md create mode 100644 keycloak/localstack_keycloak/__init__.py create mode 100644 keycloak/localstack_keycloak/extension.py create mode 100644 keycloak/localstack_keycloak/utils.py create mode 100644 keycloak/pyproject.toml create mode 100644 keycloak/quickstart/README.md create mode 100644 keycloak/quickstart/sample-realm.json create mode 100644 keycloak/sample-app/README.md create mode 100644 keycloak/sample-app/cdk/.gitignore create mode 100644 keycloak/sample-app/cdk/app.py create mode 100644 keycloak/sample-app/cdk/cdk.json create mode 100644 keycloak/sample-app/cdk/requirements.txt create mode 100644 keycloak/sample-app/cdk/stacks/__init__.py create mode 100644 keycloak/sample-app/cdk/stacks/api_stack.py create mode 100644 keycloak/sample-app/lambda/authorizer/handler.py create mode 100644 keycloak/sample-app/lambda/users/handler.py create mode 100644 keycloak/tests/__init__.py create mode 100644 keycloak/tests/test_extension.py diff --git a/.github/workflows/keycloak.yml b/.github/workflows/keycloak.yml new file mode 100644 index 00000000..0683ed38 --- /dev/null +++ b/.github/workflows/keycloak.yml @@ -0,0 +1,53 @@ +name: LocalStack Keycloak Extension Tests + +on: + push: + paths: + - keycloak/** + branches: + - main + pull_request: + paths: + - .github/workflows/keycloak.yml + - keycloak/** + workflow_dispatch: + +env: + LOCALSTACK_DISABLE_EVENTS: "1" + LOCALSTACK_AUTH_TOKEN: ${{ secrets.LOCALSTACK_AUTH_TOKEN }} + +jobs: + integration-tests: + name: Run Integration Tests + runs-on: ubuntu-latest + timeout-minutes: 15 + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup LocalStack and extension + run: | + cd keycloak + + docker pull localstack/localstack-pro & + docker pull quay.io/keycloak/keycloak:26.0 & + pip install localstack + + make install + make lint + make dist + localstack extensions -v install file://$(ls ./dist/localstack_extension_keycloak-*.tar.gz) + + DEBUG=1 localstack start -d + localstack wait + + - name: Run integration tests + run: | + cd keycloak + make test + + - name: Print logs + if: always() + run: | + localstack logs + localstack stop diff --git a/keycloak/.gitignore b/keycloak/.gitignore new file mode 100644 index 00000000..1f2830dd --- /dev/null +++ b/keycloak/.gitignore @@ -0,0 +1,11 @@ +.venv/ +*.egg-info/ +dist/ +build/ +__pycache__/ +*.pyc +.eggs/ +.pytest_cache/ +.ruff_cache/ +cdk.out/ +plan.md diff --git a/keycloak/Makefile b/keycloak/Makefile new file mode 100644 index 00000000..1f817b46 --- /dev/null +++ b/keycloak/Makefile @@ -0,0 +1,74 @@ +VENV_BIN = python3 -m venv +VENV_DIR ?= .venv +VENV_ACTIVATE = $(VENV_DIR)/bin/activate +VENV_RUN = . $(VENV_ACTIVATE) +TEST_PATH ?= tests + +usage: ## Shows usage for this Makefile + @cat Makefile | grep -E '^[a-zA-Z_-]+:.*?## .*$$' | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-15s\033[0m %s\n", $$1, $$2}' + +venv: $(VENV_ACTIVATE) + +$(VENV_ACTIVATE): pyproject.toml + test -d .venv || $(VENV_BIN) .venv + $(VENV_RUN); pip install --upgrade pip setuptools plux + $(VENV_RUN); pip install -e .[dev] + touch $(VENV_DIR)/bin/activate + +clean: + rm -rf .venv/ + rm -rf build/ + rm -rf .eggs/ + rm -rf *.egg-info/ + +install: venv ## Install dependencies + $(VENV_RUN); python -m plux entrypoints + +dist: venv ## Create distribution + $(VENV_RUN); python -m build + +publish: clean-dist venv dist ## Publish extension to pypi + $(VENV_RUN); pip install --upgrade twine; twine upload dist/* + +entrypoints: venv ## Generate plugin entrypoints for Python package + $(VENV_RUN); python -m plux entrypoints + +format: ## Run ruff to format the codebase + $(VENV_RUN); python -m ruff format .; make lint + +lint: ## Run ruff to lint the codebase + $(VENV_RUN); python -m ruff check --output-format=full . + +test: ## Run integration tests (requires LocalStack running with the Extension installed) + $(VENV_RUN); pytest $(PYTEST_ARGS) $(TEST_PATH) + +deploy: ## Deploy the sample app CDK stack + cd sample-app/cdk && pip install -r requirements.txt && cdklocal bootstrap && cdklocal deploy --all --require-approval never + +test-sample: ## Run sample app API tests (requires LocalStack running with deployed stack) + @echo "Getting token from Keycloak..." + @TOKEN=$$(curl -s -X POST \ + "http://keycloak.localhost.localstack.cloud:4566/realms/localstack/protocol/openid-connect/token" \ + -d "grant_type=client_credentials" \ + -d "client_id=localstack-client" \ + -d "client_secret=localstack-client-secret" | jq -r '.access_token') && \ + API_ID=$$(awslocal apigateway get-rest-apis --query 'items[0].id' --output text) && \ + API_URL="http://localhost:4566/_aws/execute-api/$${API_ID}/prod" && \ + echo "API URL: $${API_URL}" && \ + echo "\n=== List users ===" && \ + curl -s -H "Authorization: Bearer $${TOKEN}" "$${API_URL}/users" | jq . && \ + echo "\n=== Create user ===" && \ + curl -s -X POST "$${API_URL}/users" \ + -H "Authorization: Bearer $${TOKEN}" \ + -H "Content-Type: application/json" \ + -d '{"username": "testuser", "email": "test@example.com", "name": "Test User"}' | jq . && \ + echo "\n=== Get user ===" && \ + curl -s -H "Authorization: Bearer $${TOKEN}" "$${API_URL}/users/testuser" | jq . && \ + echo "\n=== Delete user ===" && \ + curl -s -X DELETE -H "Authorization: Bearer $${TOKEN}" "$${API_URL}/users/testuser" | jq . && \ + echo "\n=== Sample app tests completed ===" + +clean-dist: clean + rm -rf dist/ + +.PHONY: clean clean-dist dist install publish usage venv format test lint entrypoints deploy test-sample diff --git a/keycloak/README.md b/keycloak/README.md new file mode 100644 index 00000000..2c1659bc --- /dev/null +++ b/keycloak/README.md @@ -0,0 +1,199 @@ +# Keycloak on LocalStack + +This repo contains a [LocalStack Extension](https://github.com/localstack/localstack-extensions) that runs [Keycloak](https://www.keycloak.org/) alongside LocalStack for identity and access management with local AWS applications. + +This Extension: + +- Spins up a Keycloak instance on LocalStack startup. +- Auto-registers Keycloak as an OIDC identity provider in LocalStack IAM. +- Ships with a default realm (`localstack`) ready for OAuth2/OIDC flows. +- Exchanges Keycloak JWTs for temporary AWS credentials via `AssumeRoleWithWebIdentity`. + +## Prerequisites + +- Docker +- LocalStack Pro +- `localstack` CLI +- `make` + +## Installation + +```bash +localstack extensions install "git+https://github.com/localstack/localstack-extensions.git#egg=localstack-keycloak&subdirectory=keycloak" +``` + +## Install local development version + +To install the extension into LocalStack in developer mode, you will need Python 3.11, and create a virtual environment in the extensions project. + +```bash +make install +``` + +Then, to enable the extension for LocalStack, run + +```bash +localstack extensions dev enable . +``` + +You can then start LocalStack with `EXTENSION_DEV_MODE=1` to load all enabled extensions: + +```bash +EXTENSION_DEV_MODE=1 localstack start +``` + +## Usage + +Start LocalStack: + +```bash +localstack start +``` + +Keycloak will be available at: + +| Endpoint | URL | +|----------|-----| +| Admin Console | http://localhost:8080/admin | +| Token Endpoint | http://keycloak.localhost.localstack.cloud:4566/realms/localstack/protocol/openid-connect/token | +| JWKS URL | http://keycloak.localhost.localstack.cloud:4566/realms/localstack/protocol/openid-connect/certs | + +Keycloak ports are exposed directly on the host for easy access: + +- **Admin Console & HTTP (8080)**: `http://localhost:8080` - Use this for the admin UI and direct API access +- **Management (9000)**: `http://localhost:9000` - Health and metrics endpoints (Keycloak 26+) + +The gateway URL (`keycloak.localhost.localstack.cloud:4566`) is available for token endpoints and OIDC flows. + +- **Default Admin Credentials**: `admin` / `admin` +- **Health check**: `curl http://localhost:9000/health/ready` + +### Get an Access Token + +```bash +TOKEN=$(curl -s -X POST \ + "http://keycloak.localhost.localstack.cloud:4566/realms/localstack/protocol/openid-connect/token" \ + -d "grant_type=client_credentials" \ + -d "client_id=localstack-client" \ + -d "client_secret=localstack-client-secret" | jq -r '.access_token') +``` + +### Exchange Token for AWS Credentials + +```bash +# Create IAM role that trusts Keycloak +cat > trust-policy.json << 'EOF' +{ + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": { + "Federated": "arn:aws:iam::000000000000:oidc-provider/keycloak.localhost.localstack.cloud:4566/realms/localstack" + }, + "Action": "sts:AssumeRoleWithWebIdentity" + }] +} +EOF + +awslocal iam create-role \ + --role-name KeycloakAuthRole \ + --assume-role-policy-document file://trust-policy.json + +# Exchange Keycloak token for AWS credentials +awslocal sts assume-role-with-web-identity \ + --role-arn arn:aws:iam::000000000000:role/KeycloakAuthRole \ + --role-session-name test-session \ + --web-identity-token "$TOKEN" +``` + +## Configuration + +| Environment Variable | Default | Description | +|---------------------|---------|-------------| +| `KEYCLOAK_REALM` | `localstack` | Name of the default realm | +| `KEYCLOAK_VERSION` | `26.0` | Keycloak Docker image version | +| `KEYCLOAK_REALM_FILE` | - | Path to custom realm JSON file | +| `KEYCLOAK_DEFAULT_USER` | - | Username for auto-created test user | +| `KEYCLOAK_DEFAULT_PASSWORD` | - | Password for auto-created test user | +| `KEYCLOAK_OIDC_AUDIENCE` | `localstack-client` | Audience claim for OIDC provider | +| `KEYCLOAK_FLAGS` | - | Additional flags for Keycloak start command | + +> **Note**: When using `localstack start`, prefix environment variables with `LOCALSTACK_` (e.g., `LOCALSTACK_KEYCLOAK_REALM`). + +### Custom Realm Configuration + +Use your own realm JSON file with pre-configured users, roles, and clients. + +```bash +# The path must be an absolute HOST path for Docker mount +# Use LOCALSTACK_ prefix when running via CLI +LOCALSTACK_KEYCLOAK_REALM_FILE=/path/to/my-realm.json localstack start +``` + +See [`quickstart/sample-realm.json`](quickstart/sample-realm.json) for a realm template and [`quickstart/README.md`](quickstart/README.md) for a step-by-step guide. + +### Create Test Users + +```bash +# Auto-create a test user on startup (use LOCALSTACK_ prefix with CLI) +LOCALSTACK_KEYCLOAK_DEFAULT_USER=testuser LOCALSTACK_KEYCLOAK_DEFAULT_PASSWORD=password123 localstack start +``` + +## Default Client + +The extension creates a default client `localstack-client` with: + +- **Client Secret**: `localstack-client-secret` +- **Flows**: Authorization Code, Client Credentials, Direct Access Grants +- **Service Account Roles**: `admin`, `user` + +The service account for `localstack-client` is automatically assigned the `admin` realm role, enabling full access when using client credentials flow. + +## Sample Application + +See the `sample-app/` directory for a complete example demonstrating: + +- API Gateway with Lambda Authorizer +- JWT validation with Keycloak +- Role-based access control +- DynamoDB user management + +## Troubleshooting + +### Keycloak takes a long time to start + +Keycloak typically takes 30-60 seconds to fully start. The extension waits for the health check to pass before marking LocalStack as ready. + +### Health check endpoint returns 404 + +In Keycloak 26+, the health endpoint is on port 9000: + +```bash +curl http://localhost:9000/health/ready +``` + +### View Keycloak logs + +```bash +docker logs ls-ext-keycloak +``` + +## Development + +```bash +# Install dependencies +make install + +# Run tests (requires LocalStack with extension running) +make test + +# Format code +make format + +# Lint +make lint +``` + +## License + +Apache License 2.0 diff --git a/keycloak/localstack_keycloak/__init__.py b/keycloak/localstack_keycloak/__init__.py new file mode 100644 index 00000000..d4218cc6 --- /dev/null +++ b/keycloak/localstack_keycloak/__init__.py @@ -0,0 +1 @@ +name = "localstack_keycloak" diff --git a/keycloak/localstack_keycloak/extension.py b/keycloak/localstack_keycloak/extension.py new file mode 100644 index 00000000..eedb9111 --- /dev/null +++ b/keycloak/localstack_keycloak/extension.py @@ -0,0 +1,308 @@ +"""Keycloak extension for LocalStack.""" + +import logging +import shlex + +import requests +from localstack import config, constants +from localstack.utils.net import get_addressable_container_host +from localstack_extensions.utils.docker import ProxiedDockerContainerExtension + +from .utils import ( + DEFAULT_CLIENT_SECRET, + ENV_KEYCLOAK_DEFAULT_PASSWORD, + ENV_KEYCLOAK_DEFAULT_USER, + ENV_KEYCLOAK_FLAGS, + ENV_KEYCLOAK_OIDC_AUDIENCE, + ENV_KEYCLOAK_REALM, + ENV_KEYCLOAK_REALM_FILE, + ENV_KEYCLOAK_VERSION, + KEYCLOAK_HTTP_PORT, + KEYCLOAK_MGMT_PORT, + DEFAULT_AUDIENCE, + DEFAULT_REALM, + DEFAULT_VERSION, + get_default_client_config, + get_default_realm_config, + get_env, +) + +LOG = logging.getLogger(__name__) + + +class KeycloakExtension(ProxiedDockerContainerExtension): + """Keycloak extension for LocalStack.""" + + name = "keycloak" + HOST = "keycloak." + DOCKER_IMAGE = "quay.io/keycloak/keycloak" + + def __init__(self): + self.realm = get_env(ENV_KEYCLOAK_REALM, DEFAULT_REALM) + self.version = get_env(ENV_KEYCLOAK_VERSION, DEFAULT_VERSION) + self.realm_file = get_env(ENV_KEYCLOAK_REALM_FILE) + self.audience = get_env(ENV_KEYCLOAK_OIDC_AUDIENCE, DEFAULT_AUDIENCE) + custom_flags = (get_env(ENV_KEYCLOAK_FLAGS) or "").strip() + + command = ["start-dev", "--health-enabled=true"] + if self.realm_file: + command.append("--import-realm") + if custom_flags: + command.extend(shlex.split(custom_flags)) + + env_vars = { + "KEYCLOAK_ADMIN": "admin", + "KEYCLOAK_ADMIN_PASSWORD": "admin", + "KC_HEALTH_ENABLED": "true", + "KC_HTTP_ENABLED": "true", + "KC_HOSTNAME_STRICT": "false", + } + + volumes = None + if self.realm_file: + volumes = [(self.realm_file, "/opt/keycloak/data/import/realm.json")] + + super().__init__( + image_name=f"{self.DOCKER_IMAGE}:{self.version}", + container_ports=[KEYCLOAK_HTTP_PORT, KEYCLOAK_MGMT_PORT], + host=self.HOST, + command=command, + env_vars=env_vars, + volumes=volumes, + health_check_fn=self._health_check, + health_check_retries=90, + health_check_sleep=2.0, + ) + + def _health_check(self): + """Check Keycloak health on management port (9000 for Keycloak 26+).""" + container_host = get_addressable_container_host() + health_url = f"http://{container_host}:{KEYCLOAK_MGMT_PORT}/health/ready" + response = requests.get(health_url, timeout=10) + if not response.ok: + raise Exception(f"Health check failed: {response.status_code}") + + def on_platform_ready(self): + """Called when LocalStack platform is ready.""" + try: + if not self.realm_file: + self._create_default_realm() + self._create_default_user() + self._register_oidc_provider() + self._log_startup_info() + except Exception as e: + LOG.error("Failed to complete Keycloak setup: %s", e) + + def _get_base_url(self) -> str: + """Get Keycloak base URL.""" + return f"http://{get_addressable_container_host()}:{KEYCLOAK_HTTP_PORT}" + + def _get_admin_token(self) -> str: + """Get admin access token.""" + response = requests.post( + f"{self._get_base_url()}/realms/master/protocol/openid-connect/token", + data={ + "grant_type": "password", + "client_id": "admin-cli", + "username": "admin", + "password": "admin", + }, + timeout=30, + ) + if not response.ok: + raise Exception(f"Failed to get admin token: {response.text}") + return response.json()["access_token"] + + def _create_default_realm(self): + """Create the default realm via Admin API.""" + base_url = self._get_base_url() + realm_url = f"{base_url}/realms/{self.realm}" + + if requests.get(realm_url, timeout=10).ok: + LOG.info("Realm '%s' already exists", self.realm) + return + + LOG.info("Creating realm: %s", self.realm) + admin_token = self._get_admin_token() + headers = { + "Authorization": f"Bearer {admin_token}", + "Content-Type": "application/json", + } + + response = requests.post( + f"{base_url}/admin/realms", + headers=headers, + json=get_default_realm_config(self.realm), + timeout=30, + ) + + if response.status_code == 201: + LOG.info("Created realm: %s", self.realm) + self._create_default_client(admin_token) + elif response.status_code != 409: + LOG.warning("Failed to create realm: %s", response.text) + + def _create_default_client(self, admin_token: str): + """Create the default client.""" + base_url = self._get_base_url() + headers = { + "Authorization": f"Bearer {admin_token}", + "Content-Type": "application/json", + } + + response = requests.post( + f"{base_url}/admin/realms/{self.realm}/clients", + headers=headers, + json=get_default_client_config(self.audience), + timeout=30, + ) + + if response.status_code in (201, 409): + LOG.info("Client '%s' ready", self.audience) + self._assign_admin_role(admin_token) + else: + LOG.warning("Failed to create client: %s", response.text) + + def _assign_admin_role(self, admin_token: str): + """Assign admin role to service account.""" + base_url = self._get_base_url() + headers = { + "Authorization": f"Bearer {admin_token}", + "Content-Type": "application/json", + } + + try: + # Get client UUID + clients = requests.get( + f"{base_url}/admin/realms/{self.realm}/clients", + headers=headers, + params={"clientId": self.audience}, + timeout=30, + ).json() + if not clients: + return + client_uuid = clients[0]["id"] + + # Get service account user + service_account = requests.get( + f"{base_url}/admin/realms/{self.realm}/clients/{client_uuid}/service-account-user", + headers=headers, + timeout=30, + ).json() + + # Get admin role + roles = requests.get( + f"{base_url}/admin/realms/{self.realm}/roles", + headers=headers, + timeout=30, + ).json() + admin_role = next((r for r in roles if r["name"] == "admin"), None) + if not admin_role: + return + + # Assign role + requests.post( + f"{base_url}/admin/realms/{self.realm}/users/{service_account['id']}/role-mappings/realm", + headers=headers, + json=[admin_role], + timeout=30, + ) + LOG.info("Assigned admin role to service account") + except Exception as e: + LOG.warning("Failed to assign admin role: %s", e) + + def _create_default_user(self): + """Create default test user if configured (Keycloak 26+ requires two-step process).""" + username = get_env(ENV_KEYCLOAK_DEFAULT_USER) + password = get_env(ENV_KEYCLOAK_DEFAULT_PASSWORD) + if not username or not password: + return + + base_url = self._get_base_url() + headers = {"Authorization": f"Bearer {self._get_admin_token()}"} + users_url = f"{base_url}/admin/realms/{self.realm}/users" + + try: + # Create user with required profile fields + response = requests.post( + users_url, + headers=headers, + json={ + "username": username, + "enabled": True, + "emailVerified": True, + "email": f"{username}@localstack.local", + "firstName": username.capitalize(), + "lastName": "User", + "requiredActions": [], + }, + timeout=30, + ) + + if response.status_code == 409: + return + if response.status_code != 201: + LOG.warning("Failed to create user: %s", response.text) + return + + # Get user ID and set password separately + users = requests.get( + users_url, headers=headers, params={"username": username}, timeout=30 + ).json() + if not users: + return + + requests.put( + f"{users_url}/{users[0]['id']}/reset-password", + headers={**headers, "Content-Type": "application/json"}, + json={"type": "password", "value": password, "temporary": False}, + timeout=30, + ) + LOG.info("Created user: %s", username) + except Exception as e: + LOG.warning("Failed to create user: %s", e) + + def _register_oidc_provider(self): + """Register Keycloak as OIDC provider in LocalStack IAM.""" + try: + import boto3 + from botocore.config import Config + + iam = boto3.client( + "iam", + endpoint_url=f"http://localhost:{config.get_edge_port_http()}", + aws_access_key_id="test", + aws_secret_access_key="test", + region_name="us-east-1", + config=Config(signature_version="v4"), + ) + + provider_url = f"keycloak.{constants.LOCALHOST_HOSTNAME}:{config.get_edge_port_http()}/realms/{self.realm}" + iam.create_open_id_connect_provider( + Url=f"http://{provider_url}", + ClientIDList=[self.audience], + ThumbprintList=["0" * 40], + ) + LOG.info("Registered OIDC provider: %s", provider_url) + except Exception as e: + if "EntityAlreadyExists" not in str(e): + LOG.warning("Failed to register OIDC provider: %s", e) + + def _log_startup_info(self): + """Log startup information.""" + port = config.get_edge_port_http() + keycloak_url = f"http://keycloak.{constants.LOCALHOST_HOSTNAME}:{port}" + + LOG.info("") + LOG.info("=" * 60) + LOG.info("Keycloak Extension Started") + LOG.info("=" * 60) + LOG.info("Admin Console: http://localhost:8080/admin") + LOG.info("Credentials: admin / admin") + LOG.info( + "Token URL: %s/realms/%s/protocol/openid-connect/token", + keycloak_url, + self.realm, + ) + LOG.info("Client: %s / %s", self.audience, DEFAULT_CLIENT_SECRET) + LOG.info("=" * 60) diff --git a/keycloak/localstack_keycloak/utils.py b/keycloak/localstack_keycloak/utils.py new file mode 100644 index 00000000..4e94627a --- /dev/null +++ b/keycloak/localstack_keycloak/utils.py @@ -0,0 +1,70 @@ +"""Utility functions for Keycloak extension.""" + +import os + +# Environment variable names +ENV_KEYCLOAK_REALM = "KEYCLOAK_REALM" +ENV_KEYCLOAK_VERSION = "KEYCLOAK_VERSION" +ENV_KEYCLOAK_REALM_FILE = "KEYCLOAK_REALM_FILE" +ENV_KEYCLOAK_DEFAULT_USER = "KEYCLOAK_DEFAULT_USER" +ENV_KEYCLOAK_DEFAULT_PASSWORD = "KEYCLOAK_DEFAULT_PASSWORD" +ENV_KEYCLOAK_OIDC_AUDIENCE = "KEYCLOAK_OIDC_AUDIENCE" +ENV_KEYCLOAK_FLAGS = "KEYCLOAK_FLAGS" + +# Default values +DEFAULT_REALM = "localstack" +DEFAULT_VERSION = "26.0" +DEFAULT_AUDIENCE = "localstack-client" +DEFAULT_CLIENT_SECRET = "localstack-client-secret" + +# Ports +KEYCLOAK_HTTP_PORT = 8080 +KEYCLOAK_MGMT_PORT = 9000 + + +def get_env(name: str, default: str = None) -> str: + """Get environment variable, checking both LOCALSTACK_ prefixed and non-prefixed versions.""" + prefixed = f"LOCALSTACK_{name}" + value = os.environ.get(prefixed) + if value: + return value + return os.environ.get(name, default) + + +def get_default_realm_config(realm_name: str = DEFAULT_REALM) -> dict: + """Get the default realm configuration.""" + return { + "realm": realm_name, + "enabled": True, + "sslRequired": "none", + "registrationAllowed": True, + "loginWithEmailAllowed": True, + "resetPasswordAllowed": True, + "accessTokenLifespan": 3600, + "ssoSessionIdleTimeout": 1800, + "roles": { + "realm": [ + {"name": "admin", "description": "Administrator role"}, + {"name": "user", "description": "Regular user role"}, + ] + }, + "defaultRoles": ["user"], + } + + +def get_default_client_config(audience: str = DEFAULT_AUDIENCE) -> dict: + """Get the default client configuration.""" + return { + "clientId": audience, + "name": "LocalStack Client", + "enabled": True, + "clientAuthenticatorType": "client-secret", + "secret": DEFAULT_CLIENT_SECRET, + "publicClient": False, + "standardFlowEnabled": True, + "directAccessGrantsEnabled": True, + "serviceAccountsEnabled": True, + "protocol": "openid-connect", + "redirectUris": ["*"], + "webOrigins": ["*"], + } diff --git a/keycloak/pyproject.toml b/keycloak/pyproject.toml new file mode 100644 index 00000000..8fe5d5c3 --- /dev/null +++ b/keycloak/pyproject.toml @@ -0,0 +1,49 @@ +[build-system] +requires = ["setuptools", "wheel", "plux>=1.3.1"] +build-backend = "setuptools.build_meta" + +[project] +name = "localstack-extension-keycloak" +version = "0.1.0" +description = "LocalStack Extension: Keycloak for Identity and Access Management" +readme = {file = "README.md", content-type = "text/markdown; charset=UTF-8"} +requires-python = ">=3.10" +authors = [ + { name = "LocalStack Team"} +] +keywords = ["LocalStack", "Keycloak", "IAM", "OIDC", "Authentication"] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] +dependencies = [ + "localstack-extensions-utils", + "requests", + "jsonpatch", +] + +[project.urls] +Homepage = "https://github.com/localstack/localstack-extensions" + +[project.optional-dependencies] +dev = [ + "boto3", + "build", + "localstack", + "pytest", + "rolo", + "ruff", +] + +[project.entry-points."localstack.extensions"] +localstack_keycloak = "localstack_keycloak.extension:KeycloakExtension" + +[tool.setuptools.packages.find] +include = ["localstack_keycloak*"] + +[tool.setuptools.package-data] +localstack_keycloak = ["resources/*.json"] diff --git a/keycloak/quickstart/README.md b/keycloak/quickstart/README.md new file mode 100644 index 00000000..77c9e406 --- /dev/null +++ b/keycloak/quickstart/README.md @@ -0,0 +1,89 @@ +# Custom Realm Quickstart + +This guide shows how to use a custom realm configuration with the Keycloak extension. + +## Step 1: Prepare Your Realm File + +Use the provided `sample-realm.json` as a template. Key sections: + +- **realm**: The realm name (e.g., `my-app`) +- **roles**: Define realm-level roles +- **users**: Pre-create users with credentials +- **clients**: Configure OAuth2/OIDC clients + +## Step 2: Start LocalStack + +```bash +# Set the path (must be absolute HOST path) +# IMPORTANT: Use LOCALSTACK_ prefix when using the CLI +export LOCALSTACK_KEYCLOAK_REALM_FILE=/path/to/your/realm.json +export LOCALSTACK_KEYCLOAK_REALM=my-app + +localstack start +``` + +## Step 3: Verify Setup + +```bash +# Health check +curl http://localhost:9000/health/ready + +# Get token (client credentials) +TOKEN=$(curl -s -X POST \ + "http://keycloak.localhost.localstack.cloud:4566/realms/my-app/protocol/openid-connect/token" \ + -d "grant_type=client_credentials" \ + -d "client_id=my-app-client" \ + -d "client_secret=my-client-secret" | jq -r '.access_token') + +# Or authenticate as a user +TOKEN=$(curl -s -X POST \ + "http://keycloak.localhost.localstack.cloud:4566/realms/my-app/protocol/openid-connect/token" \ + -d "grant_type=password" \ + -d "client_id=my-app-client" \ + -d "client_secret=my-client-secret" \ + -d "username=testuser" \ + -d "password=testpassword" | jq -r '.access_token') +``` + +## Step 4: AWS OIDC Federation + +```bash +# Create trust policy +cat > trust-policy.json << 'EOF' +{ + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": { + "Federated": "arn:aws:iam::000000000000:oidc-provider/keycloak.localhost.localstack.cloud:4566/realms/my-app" + }, + "Action": "sts:AssumeRoleWithWebIdentity" + }] +} +EOF + +# Create IAM role +awslocal iam create-role \ + --role-name MyAppRole \ + --assume-role-policy-document file://trust-policy.json + +# Exchange token for AWS credentials +awslocal sts assume-role-with-web-identity \ + --role-arn arn:aws:iam::000000000000:role/MyAppRole \ + --role-session-name test-session \ + --web-identity-token "$TOKEN" +``` + +## Troubleshooting + +### Realm not loading + +1. Use `LOCALSTACK_` prefix for env vars with CLI +2. Ensure file path is absolute HOST path +3. Check logs: `docker logs ls-ext-keycloak` + +### Token issues + +1. Verify client credentials match realm config +2. Check realm name in token URL +3. Ensure client has required flows enabled diff --git a/keycloak/quickstart/sample-realm.json b/keycloak/quickstart/sample-realm.json new file mode 100644 index 00000000..13a713a1 --- /dev/null +++ b/keycloak/quickstart/sample-realm.json @@ -0,0 +1,148 @@ +{ + "realm": "my-app", + "enabled": true, + "sslRequired": "none", + "registrationAllowed": true, + "loginWithEmailAllowed": true, + "duplicateEmailsAllowed": false, + "resetPasswordAllowed": true, + "editUsernameAllowed": false, + "bruteForceProtected": false, + "accessTokenLifespan": 3600, + "accessTokenLifespanForImplicitFlow": 900, + "ssoSessionIdleTimeout": 1800, + "ssoSessionMaxLifespan": 36000, + "offlineSessionIdleTimeout": 2592000, + "accessCodeLifespan": 60, + "accessCodeLifespanUserAction": 300, + "accessCodeLifespanLogin": 1800, + "roles": { + "realm": [ + { + "name": "admin", + "description": "Administrator role with full access", + "composite": false, + "clientRole": false + }, + { + "name": "user", + "description": "Regular user role", + "composite": false, + "clientRole": false + }, + { + "name": "developer", + "description": "Developer role with API access", + "composite": false, + "clientRole": false + } + ] + }, + "defaultRoles": ["user"], + "users": [ + { + "username": "testuser", + "enabled": true, + "emailVerified": true, + "email": "testuser@example.com", + "firstName": "Test", + "lastName": "User", + "credentials": [ + { + "type": "password", + "value": "testpassword", + "temporary": false + } + ], + "realmRoles": ["user", "developer"] + }, + { + "username": "admin", + "enabled": true, + "emailVerified": true, + "email": "admin@example.com", + "firstName": "Admin", + "lastName": "User", + "credentials": [ + { + "type": "password", + "value": "adminpassword", + "temporary": false + } + ], + "realmRoles": ["admin", "user"] + } + ], + "clients": [ + { + "clientId": "my-app-client", + "name": "My Application Client", + "description": "Client for my application", + "enabled": true, + "clientAuthenticatorType": "client-secret", + "secret": "my-client-secret", + "publicClient": false, + "bearerOnly": false, + "consentRequired": false, + "standardFlowEnabled": true, + "implicitFlowEnabled": false, + "directAccessGrantsEnabled": true, + "serviceAccountsEnabled": true, + "authorizationServicesEnabled": false, + "protocol": "openid-connect", + "redirectUris": [ + "http://localhost:*/*", + "https://localhost:*/*", + "*.localhost.localstack.cloud/*" + ], + "webOrigins": ["*"], + "attributes": { + "access.token.lifespan": "3600" + }, + "fullScopeAllowed": true, + "protocolMappers": [ + { + "name": "audience-mapper", + "protocol": "openid-connect", + "protocolMapper": "oidc-audience-mapper", + "consentRequired": false, + "config": { + "included.client.audience": "my-app-client", + "id.token.claim": "true", + "access.token.claim": "true" + } + } + ] + }, + { + "clientId": "my-public-client", + "name": "My Public Client (SPA)", + "description": "Public client for single-page applications", + "enabled": true, + "publicClient": true, + "bearerOnly": false, + "consentRequired": false, + "standardFlowEnabled": true, + "implicitFlowEnabled": false, + "directAccessGrantsEnabled": true, + "serviceAccountsEnabled": false, + "protocol": "openid-connect", + "redirectUris": [ + "http://localhost:3000/*", + "http://localhost:8080/*" + ], + "webOrigins": ["+"] + } + ], + "scopeMappings": [ + { + "client": "my-app-client", + "roles": ["user", "admin", "developer"] + } + ], + "smtpServer": {}, + "eventsEnabled": false, + "eventsListeners": ["jboss-logging"], + "adminEventsEnabled": false, + "internationalizationEnabled": false +} diff --git a/keycloak/sample-app/README.md b/keycloak/sample-app/README.md new file mode 100644 index 00000000..f4800d90 --- /dev/null +++ b/keycloak/sample-app/README.md @@ -0,0 +1,167 @@ +# Keycloak Sample Application + +This sample demonstrates how to build a user management API using: +- **Keycloak** for authentication and authorization +- **API Gateway** for REST API +- **Lambda** for business logic with JWT validation +- **DynamoDB** for data storage + +## Architecture + +```mermaid +flowchart TB + subgraph Client + C[Client
curl/app] + end + + subgraph LocalStack + subgraph API["API Gateway (REST API)"] + AG[API Gateway] + end + + subgraph Auth["Lambda Authorizer"] + LA[JWT Validation] + end + + subgraph Backend["Lambda CRUD"] + LF[User Operations] + end + + subgraph Storage["DynamoDB"] + DB[(Users Table)] + end + end + + subgraph Keycloak["Keycloak (Identity Provider)"] + KC[OAuth2/OIDC
JWT Issuance
Role-based Access Control] + end + + C -->|1. Get Token| KC + KC -->|2. JWT Token| C + C -->|3. API Request + Token| AG + AG -->|4. Validate Token| LA + LA -->|5. Token Valid| AG + AG -->|6. Forward Request| LF + LF -->|7. CRUD Operations| DB + DB -->|8. Response| LF + LF -->|9. Response| AG + AG -->|10. Response| C +``` + +## Prerequisites + +- LocalStack running with Keycloak extension +- Python 3.10+ +- AWS CDK Local (`cdklocal`) + +## Quick Start + +### 1. Start LocalStack + +```bash +EXTENSION_AUTO_INSTALL=localstack-extension-keycloak localstack start +``` + +### 2. Deploy the Stack + +```bash +cd cdk +pip install -r requirements.txt +cdklocal bootstrap +cdklocal deploy --all --require-approval never +``` + +### 3. Test the API + +```bash +# Get a token +TOKEN=$(curl -s -X POST \ + "http://keycloak.localhost.localstack.cloud:4566/realms/localstack/protocol/openid-connect/token" \ + -d "grant_type=client_credentials" \ + -d "client_id=localstack-client" \ + -d "client_secret=localstack-client-secret" | jq -r '.access_token') + +# Get API URL +API_ID=$(awslocal apigateway get-rest-apis --query 'items[0].id' --output text) +API_URL="http://localhost:4566/_aws/execute-api/${API_ID}/prod" + +# List users +curl -s -H "Authorization: Bearer $TOKEN" "${API_URL}/users" | jq . + +# Create user +curl -s -X POST "${API_URL}/users" \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"username": "john", "email": "john@example.com", "name": "John Doe"}' | jq . + +# Get user +curl -s -H "Authorization: Bearer $TOKEN" "${API_URL}/users/john" | jq . + +# Update user +curl -s -X PUT "${API_URL}/users/john" \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"email": "john.doe@example.com"}' | jq . + +# Delete user +curl -s -X DELETE -H "Authorization: Bearer $TOKEN" "${API_URL}/users/john" | jq . +``` + +### 4. Verify Token Claims + +You can decode the token to see your roles: + +```bash +# Decode JWT payload using Python (handles base64url padding correctly) +PAYLOAD=$(echo $TOKEN | cut -d'.' -f2) +python3 -c "import sys,json,base64; p=sys.argv[1]; print(json.dumps(json.loads(base64.urlsafe_b64decode(p+'='*(4-len(p)%4))), indent=2))" "$PAYLOAD" +``` + +The expected output should include: +```json +{ + "realm_access": { + "roles": ["offline_access", "admin", "default-roles-localstack", "uma_authorization", "user"] + }, + "azp": "localstack-client", + ... +} +``` + +> **Note**: The `admin` role confirms the service account can perform CRUD operations. + +## Project Structure + +``` +sample-app/ +├── cdk/ +│ ├── app.py # CDK entry point +│ ├── stacks/ +│ │ └── api_stack.py # API Gateway + Lambda + DynamoDB +│ └── requirements.txt +├── lambda/ +│ ├── authorizer/ +│ │ └── handler.py # JWT validation +│ └── users/ +│ └── handler.py # User CRUD +└── README.md +``` + +## Lambda Authorizer + +The authorizer Lambda validates Keycloak JWTs: + +1. Extracts token from `Authorization: Bearer ` header +2. Fetches Keycloak public keys from JWKS endpoint +3. Validates token signature, expiration, and audience +4. Extracts roles from token claims +5. Returns IAM policy allowing/denying access + +## Roles and Permissions + +| Role | Permissions | +|------|-------------| +| `admin` | Full CRUD access | +| `user` | Read-only access | + +The default service account (`localstack-client`) has the `admin` role. diff --git a/keycloak/sample-app/cdk/.gitignore b/keycloak/sample-app/cdk/.gitignore new file mode 100644 index 00000000..41857269 --- /dev/null +++ b/keycloak/sample-app/cdk/.gitignore @@ -0,0 +1 @@ +cdk.out/ diff --git a/keycloak/sample-app/cdk/app.py b/keycloak/sample-app/cdk/app.py new file mode 100644 index 00000000..207d01fb --- /dev/null +++ b/keycloak/sample-app/cdk/app.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python3 +import aws_cdk as cdk +from stacks.api_stack import KeycloakSampleApiStack + +app = cdk.App() + +KeycloakSampleApiStack( + app, + "KeycloakSampleApiStack", + env=cdk.Environment(account="000000000000", region="us-east-1"), +) + +app.synth() diff --git a/keycloak/sample-app/cdk/cdk.json b/keycloak/sample-app/cdk/cdk.json new file mode 100644 index 00000000..45dfe92e --- /dev/null +++ b/keycloak/sample-app/cdk/cdk.json @@ -0,0 +1,17 @@ +{ + "app": "python3 app.py", + "watch": { + "include": ["**"], + "exclude": [ + "README.md", + "cdk*.json", + "requirements*.txt", + "**/__pycache__", + "**/*.pyc" + ] + }, + "context": { + "@aws-cdk/aws-lambda:recognizeLayerVersion": true, + "@aws-cdk/aws-cloudfront:defaultSecurityPolicyTLSv1.2_2021": true + } +} diff --git a/keycloak/sample-app/cdk/requirements.txt b/keycloak/sample-app/cdk/requirements.txt new file mode 100644 index 00000000..e8640c9f --- /dev/null +++ b/keycloak/sample-app/cdk/requirements.txt @@ -0,0 +1,2 @@ +aws-cdk-lib>=2.100.0 +constructs>=10.0.0 diff --git a/keycloak/sample-app/cdk/stacks/__init__.py b/keycloak/sample-app/cdk/stacks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/keycloak/sample-app/cdk/stacks/api_stack.py b/keycloak/sample-app/cdk/stacks/api_stack.py new file mode 100644 index 00000000..854b832e --- /dev/null +++ b/keycloak/sample-app/cdk/stacks/api_stack.py @@ -0,0 +1,143 @@ +from aws_cdk import ( + Stack, + Duration, + aws_lambda as lambda_, + aws_apigateway as apigw, + aws_dynamodb as dynamodb, + aws_iam as iam, + RemovalPolicy, +) +from constructs import Construct +import os + + +class KeycloakSampleApiStack(Stack): + """ + Sample API stack demonstrating Keycloak integration with AWS services. + + Creates: + - DynamoDB table for user data + - Lambda authorizer for JWT validation + - Lambda function for user CRUD operations + - API Gateway REST API + """ + + def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: + super().__init__(scope, construct_id, **kwargs) + + # DynamoDB table for users + users_table = dynamodb.Table( + self, + "UsersTable", + table_name="keycloak-sample-users", + partition_key=dynamodb.Attribute( + name="username", type=dynamodb.AttributeType.STRING + ), + removal_policy=RemovalPolicy.DESTROY, + billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, + ) + + # Lambda layer directory (relative to cdk directory) + lambda_dir = os.path.join(os.path.dirname(__file__), "..", "..", "lambda") + + # Lambda authorizer for JWT validation + authorizer_fn = lambda_.Function( + self, + "AuthorizerFunction", + function_name="keycloak-jwt-authorizer", + runtime=lambda_.Runtime.PYTHON_3_11, + handler="handler.handler", + code=lambda_.Code.from_asset(os.path.join(lambda_dir, "authorizer")), + timeout=Duration.seconds(30), + memory_size=256, + environment={ + "KEYCLOAK_URL": "http://keycloak.localhost.localstack.cloud:4566", + "KEYCLOAK_REALM": "localstack", + "EXPECTED_AUDIENCE": "localstack-client", + }, + ) + + # Lambda function for user CRUD operations + users_fn = lambda_.Function( + self, + "UsersFunction", + function_name="keycloak-sample-users", + runtime=lambda_.Runtime.PYTHON_3_11, + handler="handler.handler", + code=lambda_.Code.from_asset(os.path.join(lambda_dir, "users")), + timeout=Duration.seconds(30), + memory_size=256, + environment={ + "USERS_TABLE": users_table.table_name, + }, + ) + + # Grant DynamoDB access to users function + users_table.grant_read_write_data(users_fn) + + # API Gateway REST API + api = apigw.RestApi( + self, + "KeycloakSampleApi", + rest_api_name="Keycloak Sample API", + description="Sample API demonstrating Keycloak JWT authentication", + deploy_options=apigw.StageOptions(stage_name="prod"), + ) + + # Token authorizer + authorizer = apigw.TokenAuthorizer( + self, + "KeycloakAuthorizer", + handler=authorizer_fn, + identity_source="method.request.header.Authorization", + results_cache_ttl=Duration.seconds(0), # Disable caching for testing + ) + + # Lambda integration for users + users_integration = apigw.LambdaIntegration(users_fn) + + # /users resource + users_resource = api.root.add_resource("users") + + # GET /users - List all users + users_resource.add_method( + "GET", + users_integration, + authorizer=authorizer, + authorization_type=apigw.AuthorizationType.CUSTOM, + ) + + # POST /users - Create user + users_resource.add_method( + "POST", + users_integration, + authorizer=authorizer, + authorization_type=apigw.AuthorizationType.CUSTOM, + ) + + # /users/{username} resource + user_resource = users_resource.add_resource("{username}") + + # GET /users/{username} - Get specific user + user_resource.add_method( + "GET", + users_integration, + authorizer=authorizer, + authorization_type=apigw.AuthorizationType.CUSTOM, + ) + + # PUT /users/{username} - Update user + user_resource.add_method( + "PUT", + users_integration, + authorizer=authorizer, + authorization_type=apigw.AuthorizationType.CUSTOM, + ) + + # DELETE /users/{username} - Delete user + user_resource.add_method( + "DELETE", + users_integration, + authorizer=authorizer, + authorization_type=apigw.AuthorizationType.CUSTOM, + ) diff --git a/keycloak/sample-app/lambda/authorizer/handler.py b/keycloak/sample-app/lambda/authorizer/handler.py new file mode 100644 index 00000000..f09ae673 --- /dev/null +++ b/keycloak/sample-app/lambda/authorizer/handler.py @@ -0,0 +1,220 @@ +""" +Lambda Authorizer for validating Keycloak JWT tokens. + +This authorizer: +1. Extracts the JWT from the Authorization header +2. Fetches Keycloak's public keys from JWKS endpoint +3. Validates the token signature, expiration, and audience +4. Extracts roles from the token +5. Returns an IAM policy allowing or denying access +""" + +import base64 +import hashlib +import hmac +import json +import os +import urllib.request +from functools import lru_cache +from typing import Any + +# Configuration from environment +KEYCLOAK_URL = os.environ.get( + "KEYCLOAK_URL", "http://keycloak.localhost.localstack.cloud:4566" +) +KEYCLOAK_REALM = os.environ.get("KEYCLOAK_REALM", "localstack") +EXPECTED_AUDIENCE = os.environ.get("EXPECTED_AUDIENCE", "localstack-client") + + +@lru_cache(maxsize=1) +def get_jwks() -> dict: + """Fetch and cache Keycloak's JWKS (JSON Web Key Set).""" + jwks_url = f"{KEYCLOAK_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/certs" + print(f"Fetching JWKS from: {jwks_url}") + + with urllib.request.urlopen(jwks_url, timeout=10) as response: + return json.loads(response.read().decode()) + + +def base64url_decode(data: str) -> bytes: + """Decode base64url encoded data.""" + # Add padding if needed + padding = 4 - len(data) % 4 + if padding != 4: + data += "=" * padding + return base64.urlsafe_b64decode(data) + + +def decode_jwt_unverified(token: str) -> tuple[dict, dict]: + """Decode JWT without verification (for extracting header and payload).""" + parts = token.split(".") + if len(parts) != 3: + raise ValueError("Invalid JWT format") + + header = json.loads(base64url_decode(parts[0])) + payload = json.loads(base64url_decode(parts[1])) + + return header, payload + + +def verify_jwt_signature(token: str, jwks: dict) -> dict: + """ + Verify JWT signature using Keycloak's public keys. + + Note: This is a simplified verification. In production, use a proper + JWT library like PyJWT or python-jose. + """ + header, payload = decode_jwt_unverified(token) + + # For LocalStack testing, we'll do basic validation + # In production, implement full RS256 signature verification + kid = header.get("kid") + alg = header.get("alg") + + if alg != "RS256": + print(f"Unexpected algorithm: {alg}") + + # Find matching key + matching_key = None + for key in jwks.get("keys", []): + if key.get("kid") == kid: + matching_key = key + break + + if not matching_key: + print(f"No matching key found for kid: {kid}") + # For LocalStack testing, we'll still return payload + # In production, this should raise an error + + return payload + + +def validate_token(token: str) -> dict: + """Validate the JWT token and return the payload.""" + jwks = get_jwks() + payload = verify_jwt_signature(token, jwks) + + # Validate expiration + import time + + exp = payload.get("exp", 0) + if exp < time.time(): + raise ValueError("Token has expired") + + # Validate issuer + expected_issuer = f"{KEYCLOAK_URL}/realms/{KEYCLOAK_REALM}" + if payload.get("iss") != expected_issuer: + print( + f"Warning: Issuer mismatch. Expected: {expected_issuer}, Got: {payload.get('iss')}" + ) + + # Validate audience + aud = payload.get("aud") + if isinstance(aud, list): + if EXPECTED_AUDIENCE not in aud: + print( + f"Warning: Audience mismatch. Expected: {EXPECTED_AUDIENCE}, Got: {aud}" + ) + elif aud != EXPECTED_AUDIENCE: + # Check azp (authorized party) as fallback + azp = payload.get("azp") + if azp != EXPECTED_AUDIENCE: + print( + f"Warning: Audience/azp mismatch. Expected: {EXPECTED_AUDIENCE}, Got: aud={aud}, azp={azp}" + ) + + return payload + + +def extract_roles(payload: dict) -> list[str]: + """Extract roles from the JWT payload.""" + roles = [] + + # Realm roles + realm_access = payload.get("realm_access", {}) + roles.extend(realm_access.get("roles", [])) + + # Client roles + resource_access = payload.get("resource_access", {}) + for client, access in resource_access.items(): + client_roles = access.get("roles", []) + roles.extend([f"{client}:{role}" for role in client_roles]) + + return roles + + +def generate_policy( + principal_id: str, + effect: str, + resource: str, + context: dict[str, Any] | None = None, +) -> dict: + """Generate IAM policy document for API Gateway.""" + policy = { + "principalId": principal_id, + "policyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Action": "execute-api:Invoke", + "Effect": effect, + "Resource": resource, + } + ], + }, + } + + if context: + # Convert all values to strings (API Gateway requirement) + policy["context"] = { + k: str(v) if not isinstance(v, str) else v for k, v in context.items() + } + + return policy + + +def handler(event: dict, context: Any) -> dict: + """ + Lambda authorizer handler. + + Validates Keycloak JWT and returns IAM policy. + """ + print(f"Authorizer event: {json.dumps(event)}") + + try: + # Extract token from Authorization header + auth_token = event.get("authorizationToken", "") + + if not auth_token: + print("No authorization token provided") + return generate_policy("anonymous", "Deny", event["methodArn"]) + + # Remove "Bearer " prefix if present + if auth_token.lower().startswith("bearer "): + auth_token = auth_token[7:] + + # Validate token + payload = validate_token(auth_token) + + # Extract user info + subject = payload.get("sub", "unknown") + username = payload.get("preferred_username", subject) + email = payload.get("email", "") + roles = extract_roles(payload) + + print(f"Authorized user: {username}, roles: {roles}") + + # Build context for downstream Lambda + context_data = { + "sub": subject, + "username": username, + "email": email, + "roles": ",".join(roles), + } + + # Allow access + return generate_policy(subject, "Allow", event["methodArn"], context_data) + + except Exception as e: + print(f"Authorization failed: {e}") + return generate_policy("anonymous", "Deny", event["methodArn"]) diff --git a/keycloak/sample-app/lambda/users/handler.py b/keycloak/sample-app/lambda/users/handler.py new file mode 100644 index 00000000..9287240f --- /dev/null +++ b/keycloak/sample-app/lambda/users/handler.py @@ -0,0 +1,257 @@ +""" +Lambda function for User CRUD operations. + +This function handles: +- GET /users - List all users +- POST /users - Create a new user +- GET /users/{username} - Get a specific user +- PUT /users/{username} - Update a user +- DELETE /users/{username} - Delete a user + +Authorization context (roles, username) is passed from the Lambda authorizer. +""" + +import json +import os +from datetime import datetime +from typing import Any + +import boto3 + +# Configuration +USERS_TABLE = os.environ.get("USERS_TABLE", "keycloak-sample-users") +LOCALSTACK_HOSTNAME = os.environ.get("LOCALSTACK_HOSTNAME", "localhost") +EDGE_PORT = os.environ.get("EDGE_PORT", "4566") + +# DynamoDB client +dynamodb = boto3.resource( + "dynamodb", + endpoint_url=f"http://{LOCALSTACK_HOSTNAME}:{EDGE_PORT}", + region_name="us-east-1", +) +table = dynamodb.Table(USERS_TABLE) + + +def response(status_code: int, body: Any) -> dict: + """Build HTTP response.""" + return { + "statusCode": status_code, + "headers": { + "Content-Type": "application/json", + "Access-Control-Allow-Origin": "*", + }, + "body": json.dumps(body), + } + + +def get_auth_context(event: dict) -> dict: + """Extract authorization context from the event.""" + request_context = event.get("requestContext", {}) + authorizer = request_context.get("authorizer", {}) + + return { + "username": authorizer.get("username", "unknown"), + "roles": authorizer.get("roles", "").split(",") + if authorizer.get("roles") + else [], + "sub": authorizer.get("sub", ""), + } + + +def has_role(auth_context: dict, role: str) -> bool: + """Check if user has a specific role.""" + return role in auth_context.get("roles", []) + + +def is_admin(auth_context: dict) -> bool: + """Check if user has admin role.""" + return has_role(auth_context, "admin") + + +def list_users(event: dict) -> dict: + """List all users.""" + auth_context = get_auth_context(event) + print(f"List users requested by: {auth_context['username']}") + + try: + result = table.scan() + users = result.get("Items", []) + + return response( + 200, + { + "users": users, + "count": len(users), + }, + ) + + except Exception as e: + print(f"Error listing users: {e}") + return response(500, {"error": "Failed to list users"}) + + +def get_user(event: dict, username: str) -> dict: + """Get a specific user.""" + auth_context = get_auth_context(event) + print(f"Get user '{username}' requested by: {auth_context['username']}") + + try: + result = table.get_item(Key={"username": username}) + user = result.get("Item") + + if not user: + return response(404, {"error": f"User '{username}' not found"}) + + return response(200, user) + + except Exception as e: + print(f"Error getting user: {e}") + return response(500, {"error": "Failed to get user"}) + + +def create_user(event: dict) -> dict: + """Create a new user.""" + auth_context = get_auth_context(event) + print(f"Create user requested by: {auth_context['username']}") + + # Check admin role for create + if not is_admin(auth_context): + return response(403, {"error": "Admin role required to create users"}) + + try: + body = json.loads(event.get("body", "{}")) + + if not body.get("username"): + return response(400, {"error": "Username is required"}) + + username = body["username"] + + # Check if user already exists + existing = table.get_item(Key={"username": username}) + if existing.get("Item"): + return response(409, {"error": f"User '{username}' already exists"}) + + # Create user + user = { + "username": username, + "email": body.get("email", ""), + "name": body.get("name", ""), + "created_at": datetime.utcnow().isoformat(), + "created_by": auth_context["username"], + } + + table.put_item(Item=user) + + return response(201, user) + + except json.JSONDecodeError: + return response(400, {"error": "Invalid JSON body"}) + except Exception as e: + print(f"Error creating user: {e}") + return response(500, {"error": "Failed to create user"}) + + +def update_user(event: dict, username: str) -> dict: + """Update an existing user.""" + auth_context = get_auth_context(event) + print(f"Update user '{username}' requested by: {auth_context['username']}") + + # Check admin role for update + if not is_admin(auth_context): + return response(403, {"error": "Admin role required to update users"}) + + try: + body = json.loads(event.get("body", "{}")) + + # Check if user exists + existing = table.get_item(Key={"username": username}) + if not existing.get("Item"): + return response(404, {"error": f"User '{username}' not found"}) + + # Update user + update_expression = "SET updated_at = :updated_at, updated_by = :updated_by" + expression_values = { + ":updated_at": datetime.utcnow().isoformat(), + ":updated_by": auth_context["username"], + } + + if "email" in body: + update_expression += ", email = :email" + expression_values[":email"] = body["email"] + + if "name" in body: + update_expression += ", #name = :name" + expression_values[":name"] = body["name"] + + update_params = { + "Key": {"username": username}, + "UpdateExpression": update_expression, + "ExpressionAttributeValues": expression_values, + "ReturnValues": "ALL_NEW", + } + + # Handle reserved word 'name' + if "name" in body: + update_params["ExpressionAttributeNames"] = {"#name": "name"} + + result = table.update_item(**update_params) + + return response(200, result["Attributes"]) + + except json.JSONDecodeError: + return response(400, {"error": "Invalid JSON body"}) + except Exception as e: + print(f"Error updating user: {e}") + return response(500, {"error": "Failed to update user"}) + + +def delete_user(event: dict, username: str) -> dict: + """Delete a user.""" + auth_context = get_auth_context(event) + print(f"Delete user '{username}' requested by: {auth_context['username']}") + + # Check admin role for delete + if not is_admin(auth_context): + return response(403, {"error": "Admin role required to delete users"}) + + try: + # Check if user exists + existing = table.get_item(Key={"username": username}) + if not existing.get("Item"): + return response(404, {"error": f"User '{username}' not found"}) + + table.delete_item(Key={"username": username}) + + return response(200, {"message": f"User '{username}' deleted"}) + + except Exception as e: + print(f"Error deleting user: {e}") + return response(500, {"error": "Failed to delete user"}) + + +def handler(event: dict, context: Any) -> dict: + """Main Lambda handler.""" + print(f"Event: {json.dumps(event)}") + + http_method = event.get("httpMethod", "") + path = event.get("path", "") + path_params = event.get("pathParameters") or {} + + # Route request + if path == "/users": + if http_method == "GET": + return list_users(event) + elif http_method == "POST": + return create_user(event) + + elif path.startswith("/users/") and path_params.get("username"): + username = path_params["username"] + + if http_method == "GET": + return get_user(event, username) + elif http_method == "PUT": + return update_user(event, username) + elif http_method == "DELETE": + return delete_user(event, username) + + return response(404, {"error": "Not found"}) diff --git a/keycloak/tests/__init__.py b/keycloak/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/keycloak/tests/test_extension.py b/keycloak/tests/test_extension.py new file mode 100644 index 00000000..f6a4c23d --- /dev/null +++ b/keycloak/tests/test_extension.py @@ -0,0 +1,526 @@ +import json +import base64 +import time +import pytest +import requests +import boto3 +from botocore.config import Config + + +LOCALSTACK_URL = "http://localhost:4566" +KEYCLOAK_URL = "http://keycloak.localhost.localstack.cloud:4566" +KEYCLOAK_DIRECT_URL = "http://localhost:8080" # Direct access to Keycloak HTTP port +KEYCLOAK_MGMT_URL = "http://localhost:9000" # Health/metrics endpoint (Keycloak 26+) +DEFAULT_REALM = "localstack" +DEFAULT_CLIENT_ID = "localstack-client" +DEFAULT_CLIENT_SECRET = "localstack-client-secret" + + +def decode_jwt_payload(token: str) -> dict: + """Decode JWT payload without verification (for testing claims).""" + payload_b64 = token.split(".")[1] + # Add padding if needed for base64url decoding + payload_b64 += "=" * (4 - len(payload_b64) % 4) + return json.loads(base64.urlsafe_b64decode(payload_b64)) + + +def get_admin_token() -> str: + """Get Keycloak admin token for API operations.""" + response = requests.post( + f"{KEYCLOAK_DIRECT_URL}/realms/master/protocol/openid-connect/token", + data={ + "grant_type": "password", + "client_id": "admin-cli", + "username": "admin", + "password": "admin", + }, + timeout=30, + ) + response.raise_for_status() + return response.json()["access_token"] + + +@pytest.fixture +def admin_token(): + """Fixture providing admin token for Keycloak API operations.""" + return get_admin_token() + + +@pytest.fixture +def iam_client(): + """Create IAM client for LocalStack.""" + return boto3.client( + "iam", + endpoint_url=LOCALSTACK_URL, + aws_access_key_id="test", + aws_secret_access_key="test", + region_name="us-east-1", + config=Config(signature_version="v4"), + ) + + +@pytest.fixture +def sts_client(): + """Create STS client for LocalStack.""" + return boto3.client( + "sts", + endpoint_url=LOCALSTACK_URL, + aws_access_key_id="test", + aws_secret_access_key="test", + region_name="us-east-1", + config=Config(signature_version="v4"), + ) + + +class TestKeycloakHealth: + """Tests for Keycloak health and accessibility.""" + + def test_health_check_on_management_port(self): + """Verify health endpoint on management port 9000 (Keycloak 26+).""" + response = requests.get(f"{KEYCLOAK_MGMT_URL}/health/ready", timeout=10) + assert response.status_code == 200 + assert response.json().get("status") == "UP" + + def test_default_realm_exists(self): + """Verify default realm is created and accessible.""" + response = requests.get(f"{KEYCLOAK_URL}/realms/{DEFAULT_REALM}", timeout=10) + assert response.status_code == 200 + assert response.json()["realm"] == DEFAULT_REALM + + def test_openid_configuration_available(self): + """Verify OIDC discovery endpoint is available.""" + response = requests.get( + f"{KEYCLOAK_URL}/realms/{DEFAULT_REALM}/.well-known/openid-configuration", + timeout=10, + ) + assert response.status_code == 200 + data = response.json() + assert "token_endpoint" in data + assert "jwks_uri" in data + + def test_jwks_endpoint_returns_keys(self): + """Verify JWKS endpoint returns public keys for token validation.""" + response = requests.get( + f"{KEYCLOAK_DIRECT_URL}/realms/{DEFAULT_REALM}/protocol/openid-connect/certs", + timeout=10, + ) + assert response.status_code == 200 + data = response.json() + assert "keys" in data + assert len(data["keys"]) > 0 + assert "kty" in data["keys"][0] + assert "kid" in data["keys"][0] + + +class TestTokenAcquisition: + """Tests for token acquisition and validation.""" + + def test_client_credentials_flow(self): + """Verify tokens can be obtained using client credentials flow.""" + response = requests.post( + f"{KEYCLOAK_URL}/realms/{DEFAULT_REALM}/protocol/openid-connect/token", + data={ + "grant_type": "client_credentials", + "client_id": DEFAULT_CLIENT_ID, + "client_secret": DEFAULT_CLIENT_SECRET, + }, + timeout=30, + ) + assert response.status_code == 200 + data = response.json() + assert "access_token" in data + assert data["token_type"].lower() == "bearer" + + def test_token_has_correct_issuer(self): + """Verify token issuer matches localhost:8080 (direct Keycloak URL).""" + response = requests.post( + f"{KEYCLOAK_DIRECT_URL}/realms/{DEFAULT_REALM}/protocol/openid-connect/token", + data={ + "grant_type": "client_credentials", + "client_id": DEFAULT_CLIENT_ID, + "client_secret": DEFAULT_CLIENT_SECRET, + }, + timeout=30, + ) + assert response.status_code == 200 + payload = decode_jwt_payload(response.json()["access_token"]) + assert payload["iss"] == f"http://localhost:8080/realms/{DEFAULT_REALM}" + + def test_token_has_valid_expiry(self): + """Verify token has reasonable expiry time.""" + response = requests.post( + f"{KEYCLOAK_DIRECT_URL}/realms/{DEFAULT_REALM}/protocol/openid-connect/token", + data={ + "grant_type": "client_credentials", + "client_id": DEFAULT_CLIENT_ID, + "client_secret": DEFAULT_CLIENT_SECRET, + }, + timeout=30, + ) + assert response.status_code == 200 + payload = decode_jwt_payload(response.json()["access_token"]) + assert payload["exp"] > time.time() + assert payload["exp"] < time.time() + 86400 + + +class TestServiceAccountRoles: + """Tests for service account role configuration (fixes admin role issue).""" + + def test_service_account_has_admin_role(self): + """Verify localstack-client service account has admin role for CRUD operations.""" + response = requests.post( + f"{KEYCLOAK_DIRECT_URL}/realms/{DEFAULT_REALM}/protocol/openid-connect/token", + data={ + "grant_type": "client_credentials", + "client_id": DEFAULT_CLIENT_ID, + "client_secret": DEFAULT_CLIENT_SECRET, + }, + timeout=30, + ) + assert response.status_code == 200 + payload = decode_jwt_payload(response.json()["access_token"]) + roles = payload.get("realm_access", {}).get("roles", []) + assert "admin" in roles, f"admin role not found: {roles}" + assert "user" in roles, f"user role not found: {roles}" + + +class TestUserManagement: + """Tests for user creation (fixes Keycloak 26+ profile requirements).""" + + def test_create_user_with_required_profile_fields(self, admin_token): + """Verify user creation works with required Keycloak 26+ profile fields. + + Keycloak 26+ requires email, firstName, lastName for users to be "fully set up". + Password must be set separately via reset-password endpoint. + """ + test_username = "test_profile_user" + headers = {"Authorization": f"Bearer {admin_token}"} + users_url = f"{KEYCLOAK_DIRECT_URL}/admin/realms/{DEFAULT_REALM}/users" + + # Clean up if user exists + existing = requests.get( + users_url, headers=headers, params={"username": test_username}, timeout=10 + ) + if existing.ok and existing.json(): + user_id = existing.json()[0]["id"] + requests.delete(f"{users_url}/{user_id}", headers=headers, timeout=10) + + # Create user with ALL required fields + create_response = requests.post( + users_url, + headers=headers, + json={ + "username": test_username, + "enabled": True, + "emailVerified": True, + "email": f"{test_username}@test.local", + "firstName": "Test", + "lastName": "User", + "requiredActions": [], + }, + timeout=30, + ) + assert create_response.status_code == 201 + + # Get user ID + get_response = requests.get( + users_url, headers=headers, params={"username": test_username}, timeout=10 + ) + user_id = get_response.json()[0]["id"] + + # Set password SEPARATELY (required for Keycloak 26+) + password_response = requests.put( + f"{users_url}/{user_id}/reset-password", + headers={**headers, "Content-Type": "application/json"}, + json={"type": "password", "value": "testpass123", "temporary": False}, + timeout=30, + ) + assert password_response.status_code == 204 + + # Verify password grant works + token_response = requests.post( + f"{KEYCLOAK_DIRECT_URL}/realms/{DEFAULT_REALM}/protocol/openid-connect/token", + data={ + "grant_type": "password", + "client_id": DEFAULT_CLIENT_ID, + "client_secret": DEFAULT_CLIENT_SECRET, + "username": test_username, + "password": "testpass123", + }, + timeout=30, + ) + assert token_response.status_code == 200 + + # Cleanup + requests.delete(f"{users_url}/{user_id}", headers=headers, timeout=10) + + def test_incomplete_user_fails_password_grant(self, admin_token): + """Verify user without required profile fields gets 'Account not fully set up'.""" + test_username = "test_incomplete_user" + headers = {"Authorization": f"Bearer {admin_token}"} + users_url = f"{KEYCLOAK_DIRECT_URL}/admin/realms/{DEFAULT_REALM}/users" + + # Clean up if user exists + existing = requests.get( + users_url, headers=headers, params={"username": test_username}, timeout=10 + ) + if existing.ok and existing.json(): + user_id = existing.json()[0]["id"] + requests.delete(f"{users_url}/{user_id}", headers=headers, timeout=10) + + # Create user WITHOUT required fields + requests.post( + users_url, + headers=headers, + json={"username": test_username, "enabled": True, "requiredActions": []}, + timeout=30, + ) + + # Get user ID and set password + get_response = requests.get( + users_url, headers=headers, params={"username": test_username}, timeout=10 + ) + user_id = get_response.json()[0]["id"] + + requests.put( + f"{users_url}/{user_id}/reset-password", + headers={**headers, "Content-Type": "application/json"}, + json={"type": "password", "value": "testpass123", "temporary": False}, + timeout=30, + ) + + # Password grant should fail + token_response = requests.post( + f"{KEYCLOAK_DIRECT_URL}/realms/{DEFAULT_REALM}/protocol/openid-connect/token", + data={ + "grant_type": "password", + "client_id": DEFAULT_CLIENT_ID, + "client_secret": DEFAULT_CLIENT_SECRET, + "username": test_username, + "password": "testpass123", + }, + timeout=30, + ) + assert token_response.status_code != 200 + assert "not fully set up" in token_response.json().get("error_description", "").lower() + + # Cleanup + requests.delete(f"{users_url}/{user_id}", headers=headers, timeout=10) + + +class TestRealmConfiguration: + """Tests for realm and client configuration.""" + + def test_realm_has_required_roles(self, admin_token): + """Verify default realm has admin and user roles.""" + headers = {"Authorization": f"Bearer {admin_token}"} + response = requests.get( + f"{KEYCLOAK_DIRECT_URL}/admin/realms/{DEFAULT_REALM}/roles", + headers=headers, + timeout=10, + ) + assert response.status_code == 200 + roles = [r["name"] for r in response.json()] + assert "admin" in roles + assert "user" in roles + + def test_client_configuration(self, admin_token): + """Verify default client has correct settings.""" + headers = {"Authorization": f"Bearer {admin_token}"} + response = requests.get( + f"{KEYCLOAK_DIRECT_URL}/admin/realms/{DEFAULT_REALM}/clients", + headers=headers, + params={"clientId": DEFAULT_CLIENT_ID}, + timeout=10, + ) + assert response.status_code == 200 + client = response.json()[0] + assert client["serviceAccountsEnabled"] is True + assert client["directAccessGrantsEnabled"] is True + + def test_realm_ssl_not_required(self, admin_token): + """Verify realm doesn't require SSL for local development.""" + headers = {"Authorization": f"Bearer {admin_token}"} + response = requests.get( + f"{KEYCLOAK_DIRECT_URL}/admin/realms/{DEFAULT_REALM}", + headers=headers, + timeout=10, + ) + assert response.status_code == 200 + assert response.json().get("sslRequired") == "none" + + +class TestOIDCIntegration: + """Tests for OIDC provider integration with LocalStack.""" + + def test_oidc_provider_registered(self, iam_client): + """Verify OIDC provider is registered in LocalStack IAM with correct format.""" + response = iam_client.list_open_id_connect_providers() + provider_arns = [p["Arn"] for p in response["OpenIDConnectProviderList"]] + + keycloak_arn = next((arn for arn in provider_arns if "keycloak" in arn), None) + assert keycloak_arn is not None, f"No Keycloak provider found: {provider_arns}" + assert f"realms/{DEFAULT_REALM}" in keycloak_arn + + def test_assume_role_with_web_identity(self, iam_client, sts_client): + """Verify Keycloak tokens can be exchanged for AWS credentials.""" + # Get Keycloak access token + token_response = requests.post( + f"{KEYCLOAK_URL}/realms/{DEFAULT_REALM}/protocol/openid-connect/token", + data={ + "grant_type": "client_credentials", + "client_id": DEFAULT_CLIENT_ID, + "client_secret": DEFAULT_CLIENT_SECRET, + }, + timeout=30, + ) + assert token_response.status_code == 200 + access_token = token_response.json()["access_token"] + + # Create IAM role that trusts Keycloak OIDC provider + role_name = "KeycloakTestRole" + trust_policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Federated": f"arn:aws:iam::000000000000:oidc-provider/keycloak.localhost.localstack.cloud:4566/realms/{DEFAULT_REALM}" + }, + "Action": "sts:AssumeRoleWithWebIdentity", + } + ], + } + + # Clean up and create role + try: + iam_client.delete_role(RoleName=role_name) + except Exception: + pass + + iam_client.create_role( + RoleName=role_name, + AssumeRolePolicyDocument=json.dumps(trust_policy), + ) + + try: + # Exchange Keycloak token for AWS credentials + response = sts_client.assume_role_with_web_identity( + RoleArn=f"arn:aws:iam::000000000000:role/{role_name}", + RoleSessionName="test-session", + WebIdentityToken=access_token, + ) + assert "Credentials" in response + assert "AccessKeyId" in response["Credentials"] + assert "SecretAccessKey" in response["Credentials"] + assert "SessionToken" in response["Credentials"] + finally: + iam_client.delete_role(RoleName=role_name) + + +class TestErrorHandling: + """Tests for error handling.""" + + def test_invalid_credentials_returns_401(self): + """Verify invalid credentials return proper error.""" + response = requests.post( + f"{KEYCLOAK_DIRECT_URL}/realms/{DEFAULT_REALM}/protocol/openid-connect/token", + data={ + "grant_type": "client_credentials", + "client_id": "invalid-client", + "client_secret": "invalid-secret", + }, + timeout=30, + ) + assert response.status_code == 401 + assert "error" in response.json() + + def test_nonexistent_realm_returns_404(self): + """Verify requests to non-existent realm return 404.""" + response = requests.get( + f"{KEYCLOAK_DIRECT_URL}/realms/nonexistent-realm", + timeout=10, + ) + assert response.status_code == 404 + + def test_invalid_grant_type_returns_400(self): + """Verify invalid grant type returns proper error.""" + response = requests.post( + f"{KEYCLOAK_DIRECT_URL}/realms/{DEFAULT_REALM}/protocol/openid-connect/token", + data={ + "grant_type": "invalid_grant", + "client_id": DEFAULT_CLIENT_ID, + "client_secret": DEFAULT_CLIENT_SECRET, + }, + timeout=30, + ) + assert response.status_code == 400 + assert "error" in response.json() + + +class TestEndToEndWorkflow: + """End-to-end workflow test.""" + + def test_keycloak_token_to_aws_api_call(self, iam_client, sts_client): + """Test complete flow: Keycloak token -> AWS credentials -> API call.""" + # Step 1: Get Keycloak token + token_response = requests.post( + f"{KEYCLOAK_DIRECT_URL}/realms/{DEFAULT_REALM}/protocol/openid-connect/token", + data={ + "grant_type": "client_credentials", + "client_id": DEFAULT_CLIENT_ID, + "client_secret": DEFAULT_CLIENT_SECRET, + }, + timeout=30, + ) + assert token_response.status_code == 200 + access_token = token_response.json()["access_token"] + + # Step 2: Create IAM role + role_name = "E2ETestRole" + trust_policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Federated": f"arn:aws:iam::000000000000:oidc-provider/keycloak.localhost.localstack.cloud:4566/realms/{DEFAULT_REALM}" + }, + "Action": "sts:AssumeRoleWithWebIdentity", + } + ], + } + + try: + iam_client.delete_role(RoleName=role_name) + except Exception: + pass + + iam_client.create_role( + RoleName=role_name, + AssumeRolePolicyDocument=json.dumps(trust_policy), + ) + + try: + # Step 3: Exchange token for AWS credentials + response = sts_client.assume_role_with_web_identity( + RoleArn=f"arn:aws:iam::000000000000:role/{role_name}", + RoleSessionName="e2e-session", + WebIdentityToken=access_token, + ) + credentials = response["Credentials"] + + # Step 4: Use temporary credentials + temp_sts = boto3.client( + "sts", + endpoint_url=LOCALSTACK_URL, + aws_access_key_id=credentials["AccessKeyId"], + aws_secret_access_key=credentials["SecretAccessKey"], + aws_session_token=credentials["SessionToken"], + region_name="us-east-1", + ) + + identity = temp_sts.get_caller_identity() + assert role_name in identity["Arn"] + finally: + iam_client.delete_role(RoleName=role_name) From 172ec2b9929e488af94de170569a16902c88dea7 Mon Sep 17 00:00:00 2001 From: HarshCasper Date: Fri, 6 Feb 2026 00:18:53 +0530 Subject: [PATCH 2/2] skip linting --- .github/workflows/keycloak.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/keycloak.yml b/.github/workflows/keycloak.yml index 0683ed38..80d2f39f 100644 --- a/.github/workflows/keycloak.yml +++ b/.github/workflows/keycloak.yml @@ -34,7 +34,6 @@ jobs: pip install localstack make install - make lint make dist localstack extensions -v install file://$(ls ./dist/localstack_extension_keycloak-*.tar.gz)