diff --git a/.testing/python/keycloak-auth/test.sh b/.testing/python/keycloak-auth/test.sh new file mode 100755 index 0000000..b8e9996 --- /dev/null +++ b/.testing/python/keycloak-auth/test.sh @@ -0,0 +1,243 @@ +#!/bin/bash +# +# E2E test for python/keycloak-auth +# +# Spins up a real Keycloak instance in Docker, creates a realm + client + user, +# gets a real token, starts the function, and calls it with that token. +# +# This tests the one thing unit tests can't: the actual JWKS fetch from a +# real Keycloak server and end-to-end token validation. +# +# Expects these vars from run-e2e.sh: +# REPO_ROOT, LANGUAGE, TEMPLATE, FUNC_BIN, TEMPLATE_REPO, VERBOSE +# +# Prerequisites: +# - docker (or podman) +# - func CLI installed +# - curl, jq + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +WORKDIR=$(mktemp -d) +LISTEN_ADDRESS="${LISTEN_ADDRESS:-127.0.0.1:8080}" +KC_PORT="${KC_PORT:-18080}" +KC_CONTAINER="keycloak-e2e-$$" #keycloak-e2e-54321 (shell's PID, unique per run) +RUN_PID="" + +# Keycloak test config +KC_ADMIN_USER="admin" +KC_ADMIN_PASS="admin" +KC_REALM="e2e-test" +KC_CLIENT_ID="func-client" +KC_TEST_USER="testuser" +KC_TEST_PASS="testpassword" + +if [[ "$VERBOSE" == "true" ]]; then OUT=/dev/stdout; else OUT=/dev/null; fi + +# Container runtime choice +if command -v docker &>/dev/null; then + CONTAINER_RT=docker +elif command -v podman &>/dev/null; then + CONTAINER_RT=podman +else + echo "FAIL: docker or podman required" + exit 1 +fi + +cleanup() { + if [[ -n "$RUN_PID" ]] && ps -p "$RUN_PID" > /dev/null 2>&1; then + kill "$RUN_PID" 2>/dev/null || true + wait "$RUN_PID" 2>/dev/null || true + fi + lsof -ti ":${LISTEN_ADDRESS##*:}" 2>/dev/null | xargs kill 2>/dev/null || true + $CONTAINER_RT rm -f "$KC_CONTAINER" &>/dev/null || true + rm -rf "$WORKDIR" +} +trap cleanup EXIT + +KC_URL="http://localhost:$KC_PORT" + +# ─── 1. Start Keycloak ────────────────────────────────────── +echo "[1/6] Starting Keycloak..." + +$CONTAINER_RT run -d --name "$KC_CONTAINER" \ + -p "$KC_PORT:8080" \ + -e KC_BOOTSTRAP_ADMIN_USERNAME="$KC_ADMIN_USER" \ + -e KC_BOOTSTRAP_ADMIN_PASSWORD="$KC_ADMIN_PASS" \ + quay.io/keycloak/keycloak:latest start-dev >"$OUT" + +# Wait for Keycloak to be ready (can take 30-60s) +for i in $(seq 1 60); do + if curl -sf "$KC_URL/realms/master" &>/dev/null; then + break + fi + if [[ $i -eq 60 ]]; then + echo "FAIL: Keycloak did not start within 120s" + exit 1 + fi + sleep 2 +done + +echo " OK (Keycloak ready at $KC_URL)" + +# ─── 2. Configure Keycloak (realm, client, user) ──────────── +echo "[2/6] Configuring Keycloak realm..." + +# Get admin token +ADMIN_TOKEN=$(curl -sf -X POST \ + "$KC_URL/realms/master/protocol/openid-connect/token" \ + -d "grant_type=password" \ + -d "client_id=admin-cli" \ + -d "username=$KC_ADMIN_USER" \ + -d "password=$KC_ADMIN_PASS" | jq -r '.access_token') + +if [[ -z "$ADMIN_TOKEN" || "$ADMIN_TOKEN" == "null" ]]; then + echo "FAIL: Could not get admin token" + exit 1 +fi + +# Create realm +curl -sf -X POST "$KC_URL/admin/realms" \ + -H "Authorization: Bearer $ADMIN_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{\"realm\": \"$KC_REALM\", \"enabled\": true}" >"$OUT" + +# Create public client (no client secret needed for password grant) +curl -sf -X POST "$KC_URL/admin/realms/$KC_REALM/clients" \ + -H "Authorization: Bearer $ADMIN_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"clientId\": \"$KC_CLIENT_ID\", + \"enabled\": true, + \"publicClient\": true, + \"directAccessGrantsEnabled\": true + }" >"$OUT" + +# Create test user +curl -sf -X POST "$KC_URL/admin/realms/$KC_REALM/users" \ + -H "Authorization: Bearer $ADMIN_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"username\": \"$KC_TEST_USER\", + \"firstName\": \"Test\", + \"lastName\": \"User\", + \"email\": \"testuser@example.com\", + \"emailVerified\": true, + \"enabled\": true, + \"requiredActions\": [], + \"credentials\": [{ + \"type\": \"password\", + \"value\": \"$KC_TEST_PASS\", + \"temporary\": false + }] + }" >"$OUT" + +echo " OK (realm=$KC_REALM, client=$KC_CLIENT_ID, user=$KC_TEST_USER)" + +# ─── 3. Get a real user token ─────────────────────────────── +echo "[3/6] Getting user token from Keycloak..." + +TOKEN_RESPONSE=$(curl -s -X POST \ + "$KC_URL/realms/$KC_REALM/protocol/openid-connect/token" \ + -d "grant_type=password" \ + -d "client_id=$KC_CLIENT_ID" \ + -d "username=$KC_TEST_USER" \ + -d "password=$KC_TEST_PASS") + +USER_TOKEN=$(echo "$TOKEN_RESPONSE" | jq -r '.access_token') + +if [[ -z "$USER_TOKEN" || "$USER_TOKEN" == "null" ]]; then + echo "FAIL: Could not get user token" + echo " Response: $TOKEN_RESPONSE" + exit 1 +fi + +echo " OK (got token, $(echo "$USER_TOKEN" | wc -c | tr -d ' ') bytes)" + +# ─── 4. Create function from template ─────────────────────── +echo "[4/6] Creating function from template..." + +cd "$WORKDIR" +$FUNC_BIN create e2e-test -r "$TEMPLATE_REPO" -l "$LANGUAGE" -t "$TEMPLATE" >"$OUT" +cd e2e-test + +echo " OK" + +# ─── 5. Start function server ─────────────────────────────── +echo "[5/6] Starting function server..." + +if lsof -ti ":${LISTEN_ADDRESS##*:}" &>/dev/null; then + echo "FAIL: Port ${LISTEN_ADDRESS##*:} is already in use" + exit 1 +fi + +LISTEN_ADDRESS="$LISTEN_ADDRESS" \ + $FUNC_BIN run --builder=host \ + -e "KEYCLOAK_URL=$KC_URL" \ + -e "KEYCLOAK_REALM=$KC_REALM" >"$OUT" 2>&1 & +RUN_PID=$! + +for i in $(seq 1 30); do + if curl -sf "http://$LISTEN_ADDRESS/" &>/dev/null; then + break + fi + if ! ps -p "$RUN_PID" > /dev/null 2>&1; then + echo "FAIL: func run process died" + exit 1 + fi + sleep 2 +done + +if ! curl -sf "http://$LISTEN_ADDRESS/" &>/dev/null; then + echo "FAIL: Function server did not start within 60s" + exit 1 +fi + +echo " OK" + +# ─── 6. Test the function ─────────────────────────────────── +echo "[6/6] Running tests..." + +BASE="http://$LISTEN_ADDRESS" + +# 6a. Public endpoint (no auth) +echo " [a] GET / (public) ..." +RESP=$(curl -sf "$BASE/") +echo "$RESP" | jq -e '.name == "keycloak-auth"' >"$OUT" +echo " OK → 200" + +# 6b. Auth with valid token +echo " [b] GET /auth/whoami (valid token) ..." +RESP=$(curl -sf -H "Authorization: Bearer $USER_TOKEN" "$BASE/auth/whoami") +echo "$RESP" | jq -e '.authenticated == true' >"$OUT" +echo "$RESP" | jq -e '.claims.preferred_username == "testuser"' >"$OUT" +echo " OK → 200, username=testuser" + +# 6c. No token +echo " [c] GET /auth/whoami (no token) ..." +HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" "$BASE/auth/whoami") +if [[ "$HTTP_CODE" != "401" ]]; then + echo " FAIL: expected 401, got $HTTP_CODE" + exit 1 +fi +echo " OK → 401" + +# 6d. Garbage token +echo " [d] GET /auth/whoami (garbage token) ..." +HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" \ + -H "Authorization: Bearer this.is.garbage" "$BASE/auth/whoami") +if [[ "$HTTP_CODE" != "403" ]]; then + echo " FAIL: expected 403, got $HTTP_CODE" + exit 1 +fi +echo " OK → 403" + +# 6e. Valid token again (function still works after bad requests) +echo " [e] GET /auth/whoami (valid token again) ..." +RESP=$(curl -sf -H "Authorization: Bearer $USER_TOKEN" "$BASE/auth/whoami") +echo "$RESP" | jq -e '.authenticated == true' >"$OUT" +echo " OK → 200" + +echo "" +echo "=== PASS ===" diff --git a/AGENTS.md b/AGENTS.md index e67595e..73a37cf 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -50,6 +50,7 @@ Python-specific: | `python/mcp-ollama` | Exposes Ollama LLM as MCP tools (list/pull/call models). Needs Ollama. | | `python/mcp-ollama-rag` | RAG via MCP — combines Ollama with Chroma vector DB for document Q&A. Needs Ollama. | | `python/ollama-client` | HTTP wrapper that forwards prompts to a local Ollama server. Needs Ollama. | +| `python/keycloak-auth` | Validates Keycloak JWT Bearer tokens via OIDC/JWKS. Protects endpoints with auth. | | `python/llamacpp` | Loads a Granite code model via llama.cpp for local text generation. | For contributing to this repo, see [CONTRIBUTING.md](CONTRIBUTING.md). diff --git a/python/keycloak-auth/README.md b/python/keycloak-auth/README.md new file mode 100644 index 0000000..83c4987 --- /dev/null +++ b/python/keycloak-auth/README.md @@ -0,0 +1,215 @@ +# Python HTTP Function - Keycloak Auth + +A Knative Function that validates JWT Bearer tokens issued by a Keycloak +realm. Demonstrates how to protect HTTP endpoints with OpenID Connect (OIDC) +authentication in a serverless function. + +## How It Works + +``` +User logs into Keycloak + -> receives a JWT (signed with Keycloak's private key) + -> sends request to this function with Bearer + -> function validates the JWT signature using Keycloak's public keys (JWKS) + -> returns the user's identity claims +``` + +No shared secrets needed. The function fetches Keycloak's public signing keys +from the standard JWKS endpoint and verifies tokens locally (no per-request +calls back to Keycloak). + +## Prerequisites + +- A Keycloak instance (any reachable endpoint) +- `func` CLI ([github](https://github.com/knative/func/blob/main/README.md)) +- `curl` and `jq` for testing + +> **No Keycloak yet?** The Quick Start below runs one locally via Docker. + +## Quick Start + +### 1. Create the function + +```bash +func create myfunc \ + -r https://github.com/functions-dev/templates \ + -l python -t keycloak-auth +cd myfunc +``` + +### 2. Run Keycloak locally + +```bash +docker run -d --name keycloak \ + -p 18080:8080 \ + -e KC_BOOTSTRAP_ADMIN_USERNAME=admin \ + -e KC_BOOTSTRAP_ADMIN_PASSWORD=admin \ + quay.io/keycloak/keycloak:latest start-dev +``` + +Wait for it to start (~30-60 seconds): + +```bash +until curl -sf http://localhost:18080/realms/master >/dev/null; do sleep 2; done +echo "Keycloak ready" +``` + +### 3. Set up a realm, client, and user + +Get an admin token and create everything via the Keycloak Admin REST API: + +```bash +# Admin token +ADMIN_TOKEN=$(curl -s -X POST \ + "http://localhost:18080/realms/master/protocol/openid-connect/token" \ + -d "grant_type=password" \ + -d "client_id=admin-cli" \ + -d "username=admin" \ + -d "password=admin" | jq -r '.access_token') + +# Create realm +curl -sf -X POST "http://localhost:18080/admin/realms" \ + -H "Authorization: Bearer $ADMIN_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"realm": "myrealm", "enabled": true}' + +# Create client (public, with password grant enabled) +curl -sf -X POST "http://localhost:18080/admin/realms/myrealm/clients" \ + -H "Authorization: Bearer $ADMIN_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{ + "clientId": "my-func-client", + "enabled": true, + "publicClient": true, + "directAccessGrantsEnabled": true + }' + +# Create user +curl -sf -X POST "http://localhost:18080/admin/realms/myrealm/users" \ + -H "Authorization: Bearer $ADMIN_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{ + "username": "testuser", + "firstName": "Test", + "lastName": "User", + "email": "testuser@example.com", + "emailVerified": true, + "enabled": true, + "requiredActions": [], + "credentials": [{ + "type": "password", + "value": "testpassword", + "temporary": false + }] + }' +``` + +### 4. Run the function + +```bash +func run --builder=host \ + -e "KEYCLOAK_URL=http://localhost:18080" \ + -e "KEYCLOAK_REALM=myrealm" +``` + +This blocks the terminal. **Open a second terminal** for the next steps. + +### 5. Get a token and test + +In your second terminal, get a token and call the function: + +```bash +# Get a token +TOKEN=$(curl -s -X POST \ + "http://localhost:18080/realms/myrealm/protocol/openid-connect/token" \ + -d "grant_type=password" \ + -d "client_id=my-func-client" \ + -d "username=testuser" \ + -d "password=testpassword" | jq -r '.access_token') + +echo "$TOKEN" | cut -c1-50 # should print a long base64 string + +# Public endpoint (no auth needed) +curl -s http://localhost:8080/ | jq . + +# See your identity (auth required) +curl -s -H "Authorization: Bearer $TOKEN" \ + http://localhost:8080/auth/whoami | jq . + +# No token -> 401 +curl -s -o /dev/null -w "%{http_code}\n" http://localhost:8080/auth/whoami + +# Bad token -> 403 +curl -s -o /dev/null -w "%{http_code}\n" \ + -H "Authorization: Bearer fake.token.here" \ + http://localhost:8080/auth/whoami +``` + +> **Tip:** Keycloak access tokens expire after **5 minutes** by default. +> If you get a 401 "token expired" response, re-run the `TOKEN=...` +> command above to get a fresh one. + +## Configuration + +Set via environment variables or `func run -e`: + +| Variable | Required | Description | Example | +|---|---|---|---| +| `KEYCLOAK_URL` | Yes | Base URL of Keycloak | `http://localhost:18080` | +| `KEYCLOAK_REALM` | Yes | Keycloak realm name | `myrealm` | +| `KEYCLOAK_AUDIENCE` | No | Expected `aud` claim | `my-func-client` | + +## Endpoints + +| Method | Path | Auth | Description | +|---|---|---|---| +| GET | `/` | No | Function info and available endpoints | +| GET | `/auth/whoami` | Yes | Returns authenticated user's token claims | +| POST | `/auth/verify` | Yes | Validates the token and returns claims | + +## Response Examples + +### GET /auth/whoami (valid token) +```json +{ + "authenticated": true, + "claims": { + "sub": "user-uuid", + "preferred_username": "testuser", + "email": "testuser@example.com", + "realm_access": {"roles": ["default-roles-myrealm"]}, + "iss": "http://localhost:18080/realms/myrealm", + "exp": 1712345678 + } +} +``` + +### Missing token (401) +```json +{"error": "authentication required", "detail": "No Authorization header found"} +``` + +### Invalid token (403) +```json +{"error": "invalid token", "detail": "Token could not be decoded: ..."} +``` + +### Keycloak not configured (503) +```json +{"error": "Keycloak not configured", "hint": "Set KEYCLOAK_URL and KEYCLOAK_REALM environment variables"} +``` + +## Cleanup + +```bash +docker rm -f keycloak +``` + +## Development + +```bash +pip install -e '.[dev]' +pytest tests/ +``` + +For more, see [the complete documentation](https://github.com/knative/func/tree/main/docs) diff --git a/python/keycloak-auth/function/__init__.py b/python/keycloak-auth/function/__init__.py new file mode 100644 index 0000000..c16dbac --- /dev/null +++ b/python/keycloak-auth/function/__init__.py @@ -0,0 +1 @@ +from .func import new diff --git a/python/keycloak-auth/function/func.py b/python/keycloak-auth/function/func.py new file mode 100644 index 0000000..0bd7469 --- /dev/null +++ b/python/keycloak-auth/function/func.py @@ -0,0 +1,256 @@ +"""Keycloak-authenticated HTTP function. + +A Knative Function that validates JWT Bearer tokens issued by a Keycloak +realm. Demonstrates how to protect HTTP endpoints with OIDC authentication +in a serverless function. + +Endpoints: + GET / → public info about this function (no auth) + GET /auth/whoami → returns the authenticated user's token claims + POST /auth/verify → validates a token and returns its claims + +All /auth/* endpoints require a valid "Authorization: Bearer " header. + +Configuration (environment variables): + KEYCLOAK_URL → base URL of Keycloak (e.g. https://keycloak.example.com) + KEYCLOAK_REALM → realm name (e.g. myrealm) + KEYCLOAK_AUDIENCE → optional: expected 'aud' claim in the token +""" + +import json +import logging + +from .keycloak_auth import ( + KeycloakAuthenticator, + TokenExpired, + TokenInvalid, + TokenMissing, +) + + +def new(): + """Entry point — called once by the Knative Functions runtime.""" + return Function() + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +async def send_json(send, body: dict, status: int = 200) -> None: + """Send a JSON response through the ASGI interface. + + ASGI (Asynchronous Server Gateway Interface) is the protocol between + the web server and your Python function. Every response is two messages: + 1. http.response.start → status code + headers + 2. http.response.body → the actual bytes + """ + payload = json.dumps(body).encode() + await send({ + "type": "http.response.start", + "status": status, + "headers": [ + [b"content-type", b"application/json"], + ], + }) + await send({ + "type": "http.response.body", + "body": payload, + }) + + +def extract_bearer_token(scope: dict) -> str: + """Extract the Bearer token from the ASGI scope's headers. + + ASGI headers are a list of [name, value] pairs, both as bytes. + We're looking for: [b"authorization", b"Bearer eyJhbG..."] + + Args: + scope: The ASGI scope dict (contains method, path, headers, etc.) + + Returns: + The raw JWT string (without the "Bearer " prefix). + + Raises: + TokenMissing: No Authorization header, or not a Bearer token. + """ + # ASGI stores headers as a list of 2-element byte lists: + # [[b"host", b"example.com"], [b"authorization", b"Bearer xyz"], ...] + headers = scope.get("headers", []) + + for name, value in headers: + if name.lower() == b"authorization": + decoded = value.decode() + + # Must start with "Bearer " (case-sensitive per RFC 6750) + if decoded.startswith("Bearer "): + token = decoded[7:] # skip "Bearer " + if token: + return token + + raise TokenMissing( + "Authorization header must use Bearer scheme" + ) + + raise TokenMissing("No Authorization header found") + + +def get_path(scope: dict) -> str: + """Extract the request path from the ASGI scope.""" + return scope.get("path", "/") + + +# --------------------------------------------------------------------------- +# The Function +# --------------------------------------------------------------------------- + +class Function: + """Keycloak-authenticated HTTP function. + + Lifecycle (managed by the Knative Functions runtime): + 1. new() → creates this instance + 2. start(cfg) → called with env vars — we set up the authenticator + 3. handle(...) → called on every HTTP request + 4. stop() → called on shutdown + """ + + def __init__(self): + self.auth = None # Set up in start() with config + self._configured = False # Track whether Keycloak config was provided + + async def handle(self, scope, receive, send) -> None: + """Handle an HTTP request. + + This is the ASGI handler — the runtime calls it for every request. + + Args: + scope: Dict with request metadata (method, path, headers, etc.) + receive: Async callable to read the request body + send: Async callable to send the response + """ + method = scope.get("method", "GET") + path = get_path(scope) + + # --- Public endpoint: function info (no auth required) --- + if path == "/": + return await send_json(send, { + "name": "keycloak-auth", + "description": "Keycloak-authenticated HTTP function", + "endpoints": { + "GET /": "This info (public)", + "GET /auth/whoami": "Your token claims (auth required)", + "POST /auth/verify": "Validate a token (auth required)", + }, + "configured": self._configured, + }) + + # --- All /auth/* endpoints require authentication --- + if not path.startswith("/auth"): + return await send_json(send, {"error": "Not found"}, status=404) + + # Check that Keycloak is configured + if not self._configured or self.auth is None: + return await send_json(send, { + "error": "Keycloak not configured", + "hint": "Set KEYCLOAK_URL and KEYCLOAK_REALM environment variables", + }, status=503) + + # --- Extract and validate the Bearer token --- + try: + token = extract_bearer_token(scope) + claims = self.auth.validate_token(token) + + except TokenMissing as e: + return await send_json(send, { + "error": "authentication required", + "detail": str(e), + }, status=401) + + except TokenExpired as e: + return await send_json(send, { + "error": "token expired", + "detail": str(e), + }, status=401) + + except TokenInvalid as e: + return await send_json(send, { + "error": "invalid token", + "detail": str(e), + }, status=403) + + # --- Token is valid — dispatch to the right endpoint --- + + if path == "/auth/whoami" and method == "GET": + # Return the user's claims from the token. + # Typical Keycloak claims include: + # sub → user UUID + # preferred_username → human-readable username + # email → user's email + # realm_access → {roles: ["user", "admin", ...]} + # iss → issuer URL + # exp → expiry timestamp + return await send_json(send, { + "authenticated": True, + "claims": claims, + }) + + elif path == "/auth/verify" and method == "POST": + # Just confirm the token is valid and return the claims + return await send_json(send, { + "valid": True, + "claims": claims, + }) + + else: + return await send_json(send, {"error": "Not found"}, status=404) + + def start(self, cfg) -> None: + """Called when the function starts — configure the authenticator. + + Args: + cfg: Dict of environment variables. The runtime passes this + (usually a copy of os.environ) so you don't access + os.environ directly. + """ + keycloak_url = cfg.get("KEYCLOAK_URL", "") + realm = cfg.get("KEYCLOAK_REALM", "") + + if not keycloak_url or not realm: + logging.warning( + "KEYCLOAK_URL and/or KEYCLOAK_REALM not set. " + "Auth endpoints will return 503." + ) + self._configured = False + return + + audience = cfg.get("KEYCLOAK_AUDIENCE", "") + + self.auth = KeycloakAuthenticator( + keycloak_url=keycloak_url, + realm=realm, + audience=audience, + ) + self._configured = True + + logging.info( + "Keycloak auth configured: realm=%s url=%s audience=%s", + realm, keycloak_url, audience or "(any)", + ) + + def stop(self) -> None: + """Called when the function stops.""" + logging.info("Function stopping") + + def alive(self) -> tuple: + """Liveness check — is the process healthy?""" + return True, "Alive" + + def ready(self) -> tuple: + """Readiness check — can we serve traffic? + + We report not-ready if Keycloak isn't configured, so the + platform won't route traffic to us until config is provided. + """ + if not self._configured: + return False, "Keycloak not configured" + return True, "Ready" diff --git a/python/keycloak-auth/function/keycloak_auth.py b/python/keycloak-auth/function/keycloak_auth.py new file mode 100644 index 0000000..41e98d1 --- /dev/null +++ b/python/keycloak-auth/function/keycloak_auth.py @@ -0,0 +1,175 @@ +"""Keycloak JWT validation via OIDC public keys (JWKS). + +This module validates Bearer tokens issued by a Keycloak realm without +needing client credentials. It fetches the realm's public signing keys +from the standard JWKS endpoint and verifies JWT signatures locally. + +Flow: + 1. On startup, configure with KEYCLOAK_URL + KEYCLOAK_REALM. + 2. On each request, call validate_token(token_string). + 3. PyJWKClient fetches (and caches) the realm's public keys. + 4. PyJWT verifies the signature, expiry, and issuer. + 5. Returns the decoded claims dict or raises an error. + +No network call to Keycloak per-request — keys are cached and only +refreshed when an unknown key-id (kid) appears (e.g. after key rotation). +""" + +import jwt # PyJWT — the JWT decode/verify library +from jwt import PyJWKClient # Fetches JWKS and caches public keys + + +# --------------------------------------------------------------------------- +# Custom exceptions — so callers can distinguish *why* auth failed +# --------------------------------------------------------------------------- + +class AuthError(Exception): + """Base class for authentication errors.""" + pass + + +class TokenMissing(AuthError): + """No token was provided in the request.""" + pass + + +class TokenExpired(AuthError): + """The token's 'exp' claim is in the past.""" + pass + + +class TokenInvalid(AuthError): + """The token is malformed, has a bad signature, wrong issuer, etc.""" + pass + + +class AuthNotConfigured(AuthError): + """Keycloak URL/realm not set — can't validate anything.""" + pass + + +# --------------------------------------------------------------------------- +# The authenticator +# --------------------------------------------------------------------------- + +class KeycloakAuthenticator: + """Validates JWTs issued by a Keycloak realm. + + Usage: + auth = KeycloakAuthenticator( + keycloak_url="https://keycloak.example.com", + realm="myrealm", + ) + claims = auth.validate_token("eyJhbG...") + print(claims["preferred_username"]) # → "john" + + How JWKS caching works (handled by PyJWKClient): + - First call fetches keys from the JWKS endpoint and caches them. + - Subsequent calls use the cache (no network hit). + - If a token arrives with an unknown 'kid' (key ID), PyJWKClient + automatically re-fetches the JWKS — this handles key rotation. + - Cache lifetime is ~5 minutes by default. + """ + + def __init__(self, keycloak_url: str, realm: str, audience: str = ""): + # Strip trailing slash for clean URL construction + self.keycloak_url = keycloak_url.rstrip("/") + self.realm = realm + self.audience = audience + + # The issuer claim ('iss') Keycloak puts in every token. + # It's always: {base_url}/realms/{realm} + self.issuer = f"{self.keycloak_url}/realms/{self.realm}" + + # JWKS endpoint — where Keycloak publishes its public signing keys. + # This is a standard OIDC endpoint; every OIDC provider has one. + jwks_url = f"{self.issuer}/protocol/openid-connect/certs" + + # PyJWKClient will: + # 1. GET the JWKS URL + # 2. Parse the JSON → extract RSA/EC public keys + # 3. Cache them (keyed by 'kid') + # 4. Auto-refresh if it sees an unknown 'kid' + self.jwks_client = PyJWKClient(jwks_url) + + def validate_token(self, token: str) -> dict: + """Validate a JWT string and return the decoded claims. + + Args: + token: The raw JWT string (without "Bearer " prefix). + + Returns: + dict: The decoded token claims, e.g.: + { + "sub": "user-uuid", + "preferred_username": "john", + "email": "john@example.com", + "realm_access": {"roles": ["user"]}, + "iss": "https://keycloak.example.com/realms/myrealm", + "exp": 1712345678, + ... + } + + Raises: + TokenExpired: Token's 'exp' claim is in the past. + TokenInvalid: Bad signature, wrong issuer, malformed, etc. + """ + try: + # Step 1: Get the signing key that matches this token's 'kid'. + # + # The JWT header contains a 'kid' (key ID) field that says + # "I was signed with key XYZ". PyJWKClient looks up that kid + # in its cached JWKS and returns the matching public key. + signing_key = self.jwks_client.get_signing_key_from_jwt(token) + + # Step 2: Decode and verify the token. + # + # jwt.decode() does ALL of these checks: + # - Verifies the signature using the public key + # - Checks 'exp' (expiration) — rejects expired tokens + # - Checks 'iss' (issuer) — must match our Keycloak realm + # - Checks 'aud' (audience) — if we configured one + # - Checks 'iat' (issued at) and 'nbf' (not before) + # + # algorithms=["RS256"] — Keycloak signs with RSA by default. + # We explicitly list allowed algorithms to prevent algorithm + # confusion attacks (where an attacker tricks you into using + # a weaker algorithm like HS256 with the public key as secret). + decode_options = { + "verify_aud": bool(self.audience), + } + + claims = jwt.decode( + token, + signing_key.key, + algorithms=["RS256"], + issuer=self.issuer, + audience=self.audience if self.audience else None, + options=decode_options, + ) + + return claims + + except jwt.ExpiredSignatureError: + raise TokenExpired("Token has expired") + + except jwt.InvalidIssuerError: + raise TokenInvalid( + f"Token issuer does not match. Expected: {self.issuer}" + ) + + except jwt.InvalidAudienceError: + raise TokenInvalid( + f"Token audience does not match. Expected: {self.audience}" + ) + + except jwt.DecodeError as e: + raise TokenInvalid(f"Token could not be decoded: {e}") + + except jwt.InvalidTokenError as e: + # Catch-all for any other JWT validation error + raise TokenInvalid(f"Token is invalid: {e}") + + except Exception as e: + # Network errors fetching JWKS, etc. + raise TokenInvalid(f"Token validation failed: {e}") diff --git a/python/keycloak-auth/pyproject.toml b/python/keycloak-auth/pyproject.toml new file mode 100644 index 0000000..fb627de --- /dev/null +++ b/python/keycloak-auth/pyproject.toml @@ -0,0 +1,28 @@ +[project] +name = "function" +description = "Keycloak-authenticated HTTP function" +version = "0.1.0" +requires-python = ">=3.9" +readme = "README.md" +license = "MIT" +dependencies = [ + "PyJWT[crypto]", +] +authors = [ + { name = "Your Name", email = "you@example.com" }, +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "cryptography", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.pytest.ini_options] +asyncio_mode = "strict" +asyncio_default_fixture_loop_scope = "function" diff --git a/python/keycloak-auth/tests/test_func.py b/python/keycloak-auth/tests/test_func.py new file mode 100644 index 0000000..fd35725 --- /dev/null +++ b/python/keycloak-auth/tests/test_func.py @@ -0,0 +1,190 @@ +"""Tests for the keycloak-auth function. + +Tests run WITHOUT a real Keycloak server — we generate our own RSA key pair, +sign JWTs with the private key (playing Keycloak), and mock PyJWKClient to +return our public key. + +Tests are structured as scenarios that mirror real usage: one function +instance handling a sequence of requests, just like production. +""" + +import time +from unittest.mock import MagicMock, patch + +import jwt as pyjwt +import pytest +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives import serialization + +from function import new + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +TEST_PRIVATE_KEY = rsa.generate_private_key(public_exponent=65537, key_size=2048) +TEST_PUBLIC_KEY = TEST_PRIVATE_KEY.public_key() +TEST_KEYCLOAK_URL = "https://keycloak.example.com" +TEST_REALM = "testrealm" +TEST_ISSUER = f"{TEST_KEYCLOAK_URL}/realms/{TEST_REALM}" + + +def make_jwt(claims: dict, private_key=None) -> str: + """Sign a JWT with the given key (default: our test key).""" + if private_key is None: + private_key = TEST_PRIVATE_KEY + pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + return pyjwt.encode(claims, pem, algorithm="RS256", headers={"kid": "test-key-id"}) + + +def make_claims(**overrides) -> dict: + """Build a valid Keycloak-like claims dict.""" + now = int(time.time()) + claims = { + "sub": "user-uuid-1234", + "preferred_username": "testuser", + "email": "testuser@example.com", + "iss": TEST_ISSUER, + "iat": now, + "exp": now + 3600, + "realm_access": {"roles": ["user"]}, + } + claims.update(overrides) + return claims + + +def make_scope(path: str = "/", method: str = "GET", token: str = "") -> dict: + """Build an ASGI scope dict.""" + headers = [] + if token: + headers.append([b"authorization", f"Bearer {token}".encode()]) + return {"method": method, "path": path, "headers": headers, "query_string": b""} + + +class ResponseCapture: + """Captures ASGI send() calls.""" + + def __init__(self): + self.status = None + self.body = b"" + + async def __call__(self, message): + if message["type"] == "http.response.start": + self.status = message["status"] + elif message["type"] == "http.response.body": + self.body += message.get("body", b"") + + @property + def json(self) -> dict: + import json + return json.loads(self.body) + + +def make_configured_function(): + """Create a Function with mocked JWKS (no network calls).""" + f = new() + with patch("function.keycloak_auth.PyJWKClient") as mock_cls: + mock_key = MagicMock() + mock_key.key = TEST_PUBLIC_KEY + mock_cls.return_value.get_signing_key_from_jwt.return_value = mock_key + f.start({"KEYCLOAK_URL": TEST_KEYCLOAK_URL, "KEYCLOAK_REALM": TEST_REALM}) + return f + + +async def call(f, path="/", method="GET", token=""): + """Helper: call the function and return the captured response.""" + resp = ResponseCapture() + await f.handle(make_scope(path, method, token), None, resp) + return resp + + +# --------------------------------------------------------------------------- +# Scenario 1: Deployed without config → everything fails gracefully +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio(loop_scope="function") +async def test_unconfigured_deployment(): + """Operator deploys the function but forgets KEYCLOAK_URL/REALM.""" + f = new() + f.start({}) + + # ready() should say no — platform won't route traffic yet + ok, msg = f.ready() + assert ok is False + + # but if a request sneaks through, auth endpoints return 503 not a crash + resp = await call(f, "/auth/whoami") + assert resp.status == 503 + + # the public root endpoint still works though + resp = await call(f, "/") + assert resp.status == 200 + + +# --------------------------------------------------------------------------- +# Scenario 2: Legitimate user flow +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio(loop_scope="function") +async def test_legitimate_user_flow(): + """User gets a token from Keycloak and calls protected endpoints.""" + f = make_configured_function() + + # function is ready + ok, _ = f.ready() + assert ok is True + + # user has a valid token + token = make_jwt(make_claims()) + + # GET /auth/whoami → see my identity + resp = await call(f, "/auth/whoami", token=token) + assert resp.status == 200 + assert resp.json["claims"]["preferred_username"] == "testuser" + assert resp.json["claims"]["email"] == "testuser@example.com" + + # POST /auth/verify → confirm token is valid + resp = await call(f, "/auth/verify", method="POST", token=token) + assert resp.status == 200 + assert resp.json["valid"] is True + + +# --------------------------------------------------------------------------- +# Scenario 3: Attacker tries various bad tokens +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio(loop_scope="function") +async def test_rejected_tokens(): + """Various invalid tokens are all rejected by the same function instance.""" + f = make_configured_function() + + # no token at all → 401 + resp = await call(f, "/auth/whoami") + assert resp.status == 401 + + # expired token (exp 1 hour ago) → 401 + expired = make_jwt(make_claims(exp=int(time.time()) - 3600)) + resp = await call(f, "/auth/whoami", token=expired) + assert resp.status == 401 + assert "expired" in resp.json["error"] + + # token signed with a completely different RSA key → 403 + attacker_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + forged = make_jwt(make_claims(), private_key=attacker_key) + resp = await call(f, "/auth/whoami", token=forged) + assert resp.status == 403 + + # garbage string as token → 403 + resp = await call(f, "/auth/whoami", token="not.a.jwt") + assert resp.status == 403 + + # and after all those attacks, a valid token still works + good_token = make_jwt(make_claims()) + resp = await call(f, "/auth/whoami", token=good_token) + assert resp.status == 200 + assert resp.json["authenticated"] is True