From 88c8dc2144d8f387eab0559bbb7a6273de930d79 Mon Sep 17 00:00:00 2001 From: Subhankar Maiti Date: Mon, 2 Feb 2026 13:35:24 +0530 Subject: [PATCH 01/10] feat(auth): Add custom token exchange types and error classes --- .../auth_types/__init__.py | 107 +++++++++++++++++- src/auth0_server_python/error/__init__.py | 19 ++++ 2 files changed, 125 insertions(+), 1 deletion(-) diff --git a/src/auth0_server_python/auth_types/__init__.py b/src/auth0_server_python/auth_types/__init__.py index 677a7da..cd470e0 100644 --- a/src/auth0_server_python/auth_types/__init__.py +++ b/src/auth0_server_python/auth_types/__init__.py @@ -5,7 +5,7 @@ from typing import Any, Optional -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, field_validator, model_validator class UserClaims(BaseModel): @@ -252,3 +252,108 @@ class CompleteConnectAccountResponse(BaseModel): created_at: str expires_at: Optional[str] = None app_state: Optional[Any] = None + + +class CustomTokenExchangeOptions(BaseModel): + """ + Options for custom token exchange (RFC 8693). + + Args: + subject_token: The security token being exchanged + subject_token_type: Identifier indicating the token format + audience: Logical name of target service (optional) + scope: Space-delimited list of scopes (optional) + actor_token: Security token representing the acting party (optional) + actor_token_type: Type of actor token (required if actor_token present) + authorization_params: Additional OAuth parameters (optional) + """ + subject_token: str = Field(..., min_length=1) + subject_token_type: str = Field(..., min_length=1) + audience: Optional[str] = None + scope: Optional[str] = None + actor_token: Optional[str] = None + actor_token_type: Optional[str] = None + authorization_params: Optional[dict[str, Any]] = None + + @field_validator('subject_token', 'actor_token') + @classmethod + def validate_token_format(cls, v: Optional[str]) -> Optional[str]: + """Validate token doesn't have Bearer prefix and isn't whitespace-only.""" + if v is not None: + if not v.strip(): + raise ValueError("Token cannot be empty or whitespace-only") + if v.strip().startswith("Bearer "): + raise ValueError("Token should not include 'Bearer ' prefix") + return v + + @model_validator(mode='after') + def validate_actor_token_type(self) -> 'CustomTokenExchangeOptions': + """Ensure actor_token_type is provided if actor_token is present.""" + if self.actor_token and not self.actor_token_type: + raise ValueError("actor_token_type is required when actor_token is provided") + return self + + +class TokenExchangeResponse(BaseModel): + """ + Response from token exchange operation. + + Attributes: + access_token: The issued access token + token_type: Token type (typically "Bearer") + expires_in: Token lifetime in seconds + scope: Granted scopes (if different from requested) + issued_token_type: Format of issued token + id_token: OpenID Connect ID token (optional) + refresh_token: Refresh token (optional) + """ + access_token: str + token_type: str = "Bearer" + expires_in: int + scope: Optional[str] = None + issued_token_type: Optional[str] = None + id_token: Optional[str] = None + refresh_token: Optional[str] = None + + +class LoginWithCustomTokenExchangeOptions(BaseModel): + """ + Options for logging in via custom token exchange. + + Combines token exchange parameters with session management. + """ + subject_token: str = Field(..., min_length=1) + subject_token_type: str = Field(..., min_length=1) + audience: Optional[str] = None + scope: Optional[str] = None + actor_token: Optional[str] = None + actor_token_type: Optional[str] = None + authorization_params: Optional[dict[str, Any]] = None + + @field_validator('subject_token', 'actor_token') + @classmethod + def validate_token_format(cls, v: Optional[str]) -> Optional[str]: + """Validate token doesn't have Bearer prefix and isn't whitespace-only.""" + if v is not None: + if not v.strip(): + raise ValueError("Token cannot be empty or whitespace-only") + if v.strip().startswith("Bearer "): + raise ValueError("Token should not include 'Bearer ' prefix") + return v + + @model_validator(mode='after') + def validate_actor_token_type(self) -> 'LoginWithCustomTokenExchangeOptions': + """Ensure actor_token_type is provided if actor_token is present.""" + if self.actor_token and not self.actor_token_type: + raise ValueError("actor_token_type is required when actor_token is provided") + return self + + +class LoginWithCustomTokenExchangeResult(BaseModel): + """ + Result from login with custom token exchange. + + Contains session data established after token exchange. + """ + state_data: dict[str, Any] + authorization_details: Optional[list[AuthorizationDetails]] = None diff --git a/src/auth0_server_python/error/__init__.py b/src/auth0_server_python/error/__init__.py index ef181ce..5cc4080 100644 --- a/src/auth0_server_python/error/__init__.py +++ b/src/auth0_server_python/error/__init__.py @@ -153,3 +153,22 @@ class AccessTokenForConnectionErrorCode: FAILED_TO_RETRIEVE = "failed_to_retrieve" API_ERROR = "api_error" FETCH_ERROR = "retrieval_error" + + +class CustomTokenExchangeError(Auth0Error): + """ + Error raised during custom token exchange operations. + """ + def __init__(self, code: str, message: str, cause=None): + super().__init__(message) + self.code = code + self.name = "CustomTokenExchangeError" + self.cause = cause + + +class CustomTokenExchangeErrorCode: + """Error codes for custom token exchange operations.""" + INVALID_TOKEN_FORMAT = "invalid_token_format" + MISSING_ACTOR_TOKEN_TYPE = "missing_actor_token_type" + TOKEN_EXCHANGE_FAILED = "token_exchange_failed" + INVALID_RESPONSE = "invalid_response" From 6c9d01ed088b86382db73f49adbf6d8cf361db23 Mon Sep 17 00:00:00 2001 From: Subhankar Maiti Date: Mon, 2 Feb 2026 13:35:56 +0530 Subject: [PATCH 02/10] feat(auth): Implement custom token exchange methods --- .../auth_server/server_client.py | 226 ++++++++++++++++++ 1 file changed, 226 insertions(+) diff --git a/src/auth0_server_python/auth_server/server_client.py b/src/auth0_server_python/auth_server/server_client.py index c968120..041e818 100644 --- a/src/auth0_server_python/auth_server/server_client.py +++ b/src/auth0_server_python/auth_server/server_client.py @@ -17,10 +17,14 @@ CompleteConnectAccountResponse, ConnectAccountOptions, ConnectAccountRequest, + CustomTokenExchangeOptions, + LoginWithCustomTokenExchangeOptions, + LoginWithCustomTokenExchangeResult, LogoutOptions, LogoutTokenClaims, StartInteractiveLoginOptions, StateData, + TokenExchangeResponse, TokenSet, TransactionData, UserClaims, @@ -32,6 +36,8 @@ AccessTokenForConnectionErrorCode, ApiError, BackchannelLogoutError, + CustomTokenExchangeError, + CustomTokenExchangeErrorCode, MissingRequiredArgumentError, MissingTransactionError, PollingApiError, @@ -1471,3 +1477,223 @@ async def complete_connect_account( await self._transaction_store.delete(transaction_identifier, options=store_options) return response + + async def custom_token_exchange( + self, + options: CustomTokenExchangeOptions + ) -> TokenExchangeResponse: + """ + Exchanges a custom token for Auth0 tokens using RFC 8693. + + This method implements the OAuth 2.0 Token Exchange specification, + allowing you to exchange external custom tokens for Auth0 access tokens. + + Args: + options: Configuration for the token exchange + + Returns: + TokenExchangeResponse containing access_token and metadata + + Raises: + CustomTokenExchangeError: If token exchange fails + MissingRequiredArgumentError: If required parameters are missing + + Example: + ```python + response = await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="custom-token-value", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com", + scope="read:data write:data" + ) + ) + print(response.access_token) + ``` + + See: + https://datatracker.ietf.org/doc/html/rfc8693 + """ + try: + # Validate options (Pydantic handles this automatically) + if not isinstance(options, CustomTokenExchangeOptions): + options = CustomTokenExchangeOptions(**options) + + # Ensure we have OIDC metadata + if not hasattr(self._oauth, "metadata") or not self._oauth.metadata: + self._oauth.metadata = await self._fetch_oidc_metadata(self._domain) + + token_endpoint = self._oauth.metadata.get("token_endpoint") + if not token_endpoint: + raise ApiError("configuration_error", "Token endpoint missing in OIDC metadata") + + # Prepare token exchange parameters per RFC 8693 + params = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "client_id": self._client_id, + "subject_token": options.subject_token, + "subject_token_type": options.subject_token_type, + } + + # Add optional parameters + if options.audience: + params["audience"] = options.audience + + if options.scope: + params["scope"] = options.scope + + if options.actor_token: + params["actor_token"] = options.actor_token + params["actor_token_type"] = options.actor_token_type + + # Merge additional authorization params + if options.authorization_params: + # Prevent override of critical parameters + forbidden_params = {"grant_type", "client_id", "subject_token", "subject_token_type"} + for key, value in options.authorization_params.items(): + if key not in forbidden_params: + params[key] = value + + # Make the token exchange request + async with httpx.AsyncClient() as client: + response = await client.post( + token_endpoint, + data=params, + auth=(self._client_id, self._client_secret) + ) + + if response.status_code != 200: + error_data = response.json() if response.headers.get( + "content-type", "").startswith("application/json") else {} + raise CustomTokenExchangeError( + error_data.get("error", CustomTokenExchangeErrorCode.TOKEN_EXCHANGE_FAILED), + error_data.get("error_description", f"Token exchange failed: {response.status_code}") + ) + + try: + token_data = response.json() + except json.JSONDecodeError: + raise CustomTokenExchangeError( + CustomTokenExchangeErrorCode.INVALID_RESPONSE, + "Failed to parse token response as JSON" + ) + + # Validate and return response + return TokenExchangeResponse(**token_data) + + except ValidationError as e: + raise CustomTokenExchangeError( + CustomTokenExchangeErrorCode.INVALID_TOKEN_FORMAT, + f"Token validation failed: {str(e)}" + ) + except Exception as e: + if isinstance(e, (CustomTokenExchangeError, ApiError)): + raise + raise CustomTokenExchangeError( + CustomTokenExchangeErrorCode.TOKEN_EXCHANGE_FAILED, + f"Token exchange failed: {str(e)}", + e + ) + + async def login_with_custom_token_exchange( + self, + options: LoginWithCustomTokenExchangeOptions, + store_options: Optional[dict[str, Any]] = None + ) -> LoginWithCustomTokenExchangeResult: + """ + Performs token exchange and establishes a user session. + + This method combines custom_token_exchange() with session management, + exchanging a custom token for Auth0 tokens and storing the session state. + + Args: + options: Configuration for token exchange and login + store_options: Optional options for state store (e.g., request/response for cookies) + + Returns: + LoginWithCustomTokenExchangeResult containing session state + + Raises: + CustomTokenExchangeError: If token exchange fails + ApiError: If session management fails + + Example: + ```python + result = await client.login_with_custom_token_exchange( + LoginWithCustomTokenExchangeOptions( + subject_token="custom-token-value", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com" + ), + store_options={"request": request, "response": response} + ) + print(result.state_data["user"]) + ``` + + See: + https://datatracker.ietf.org/doc/html/rfc8693 + """ + try: + # Perform token exchange + exchange_options = CustomTokenExchangeOptions( + subject_token=options.subject_token, + subject_token_type=options.subject_token_type, + audience=options.audience, + scope=options.scope, + actor_token=options.actor_token, + actor_token_type=options.actor_token_type, + authorization_params=options.authorization_params + ) + + token_response = await self.custom_token_exchange(exchange_options) + + # Extract user claims from ID token if present + user_claims = None + sid = PKCE.generate_random_string(32) # Default sid + if token_response.id_token: + claims = jwt.decode(token_response.id_token, options={"verify_signature": False}) + user_claims = UserClaims.parse_obj(claims) + # Extract sid from token if available + sid = claims.get("sid", sid) + + # Determine audience for token set + audience = options.audience or self.DEFAULT_AUDIENCE_STATE_KEY + + # Build token set + token_set = TokenSet( + audience=audience, + access_token=token_response.access_token, + scope=token_response.scope or options.scope or "", + expires_at=int(time.time()) + token_response.expires_in + ) + + # Construct state data + state_data = StateData( + user=user_claims, + id_token=token_response.id_token, + refresh_token=token_response.refresh_token, + token_sets=[token_set], + internal={ + "sid": sid, + "created_at": int(time.time()) + } + ) + + # Store session + await self._state_store.set(self._state_identifier, state_data, options=store_options) + + # Build result + result = LoginWithCustomTokenExchangeResult( + state_data=state_data.dict() + ) + + return result + + except Exception as e: + if isinstance(e, (CustomTokenExchangeError, ApiError)): + raise + raise CustomTokenExchangeError( + CustomTokenExchangeErrorCode.TOKEN_EXCHANGE_FAILED, + f"Login with custom token exchange failed: {str(e)}", + e + ) From 73cdd95014b30731618fdb1b70bcc4f9261b59ef Mon Sep 17 00:00:00 2001 From: Subhankar Maiti Date: Mon, 2 Feb 2026 13:36:23 +0530 Subject: [PATCH 03/10] test(auth): Add comprehensive tests for custom token exchange --- .../tests/test_server_client.py | 628 ++++++++++++++++++ 1 file changed, 628 insertions(+) diff --git a/src/auth0_server_python/tests/test_server_client.py b/src/auth0_server_python/tests/test_server_client.py index 9f4f2cd..764540c 100644 --- a/src/auth0_server_python/tests/test_server_client.py +++ b/src/auth0_server_python/tests/test_server_client.py @@ -12,6 +12,8 @@ ConnectAccountRequest, ConnectAccountResponse, ConnectParams, + CustomTokenExchangeOptions, + LoginWithCustomTokenExchangeOptions, LogoutOptions, TransactionData, ) @@ -19,6 +21,8 @@ AccessTokenForConnectionError, ApiError, BackchannelLogoutError, + CustomTokenExchangeError, + CustomTokenExchangeErrorCode, MissingRequiredArgumentError, MissingTransactionError, PollingApiError, @@ -1932,3 +1936,627 @@ async def test_complete_connect_account_no_transactions(mocker): # Assert assert "transaction" in str(exc.value) mock_my_account_client.complete_connect_account.assert_not_awaited() + + +# ============================================================================= +# Custom Token Exchange Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_custom_token_exchange_success(mocker): + """Test successful token exchange with basic parameters.""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + # Mock OIDC metadata + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + # Mock httpx response + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "access_token": "exchanged_access_token", + "token_type": "Bearer", + "expires_in": 3600, + "scope": "read:data", + "issued_token_type": "urn:ietf:params:oauth:token-type:access_token" + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act + options = CustomTokenExchangeOptions( + subject_token="custom-token-123", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com", + scope="read:data" + ) + result = await client.custom_token_exchange(options) + + # Assert + assert result.access_token == "exchanged_access_token" + assert result.token_type == "Bearer" + assert result.expires_in == 3600 + assert result.scope == "read:data" + assert result.issued_token_type == "urn:ietf:params:oauth:token-type:access_token" + + # Verify the request was made correctly + mock_httpx_client.post.assert_called_once() + call_args = mock_httpx_client.post.call_args + assert call_args[0][0] == "https://auth0.local/oauth/token" + assert call_args[1]["data"]["grant_type"] == "urn:ietf:params:oauth:grant-type:token-exchange" + assert call_args[1]["data"]["subject_token"] == "custom-token-123" + assert call_args[1]["data"]["subject_token_type"] == "urn:acme:mcp-token" + assert call_args[1]["data"]["audience"] == "https://api.example.com" + assert call_args[1]["data"]["scope"] == "read:data" + + +@pytest.mark.asyncio +async def test_custom_token_exchange_with_actor_token(mocker): + """Test token exchange with actor token (delegation scenario).""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "access_token": "delegated_token", + "token_type": "Bearer", + "expires_in": 1800 + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act + options = CustomTokenExchangeOptions( + subject_token="user-token", + subject_token_type="urn:ietf:params:oauth:token-type:access_token", + actor_token="service-token", + actor_token_type="urn:ietf:params:oauth:token-type:access_token" + ) + result = await client.custom_token_exchange(options) + + # Assert + assert result.access_token == "delegated_token" + + # Verify actor params were sent + call_args = mock_httpx_client.post.call_args + assert call_args[1]["data"]["actor_token"] == "service-token" + assert call_args[1]["data"]["actor_token_type"] == "urn:ietf:params:oauth:token-type:access_token" + + +@pytest.mark.asyncio +async def test_custom_token_exchange_empty_token(): + """Test that empty/whitespace tokens are rejected.""" + # Setup + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="some-secret" + ) + + # Act & Assert - empty token + with pytest.raises(CustomTokenExchangeError) as exc: + await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token=" ", + subject_token_type="urn:acme:mcp-token" + ) + ) + assert exc.value.code == CustomTokenExchangeErrorCode.INVALID_TOKEN_FORMAT + assert "empty or whitespace" in str(exc.value).lower() + + +@pytest.mark.asyncio +async def test_custom_token_exchange_bearer_prefix(): + """Test that tokens with 'Bearer ' prefix are rejected.""" + # Setup + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="some-secret" + ) + + # Act & Assert + with pytest.raises(CustomTokenExchangeError) as exc: + await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="Bearer abc123", + subject_token_type="urn:ietf:params:oauth:token-type:access_token" + ) + ) + assert exc.value.code == CustomTokenExchangeErrorCode.INVALID_TOKEN_FORMAT + assert "Bearer" in str(exc.value) + + +@pytest.mark.asyncio +async def test_custom_token_exchange_missing_actor_token_type(): + """Test that actor_token_type is required when actor_token is provided.""" + # Setup + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="some-secret" + ) + + # Act & Assert + with pytest.raises(CustomTokenExchangeError) as exc: + await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="token", + subject_token_type="urn:acme:token", + actor_token="actor-token", + actor_token_type=None + ) + ) + assert exc.value.code == CustomTokenExchangeErrorCode.INVALID_TOKEN_FORMAT + assert "actor_token_type" in str(exc.value).lower() + + +@pytest.mark.asyncio +async def test_custom_token_exchange_api_error_400(mocker): + """Test handling of 400 error from Auth0.""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + # Mock 400 error response + mock_response = MagicMock() + mock_response.status_code = 400 + mock_response.json.return_value = { + "error": "invalid_grant", + "error_description": "Subject token is invalid" + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act & Assert + with pytest.raises(CustomTokenExchangeError) as exc: + await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="invalid-token", + subject_token_type="urn:acme:mcp-token" + ) + ) + assert exc.value.code == "invalid_grant" + assert "Subject token is invalid" in str(exc.value) + + +@pytest.mark.asyncio +async def test_custom_token_exchange_invalid_json_response(mocker): + """Test handling of non-JSON response from token endpoint.""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + # Mock response with invalid JSON + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.side_effect = json.JSONDecodeError("msg", "doc", 0) + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act & Assert + with pytest.raises(CustomTokenExchangeError) as exc: + await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="token", + subject_token_type="urn:acme:mcp-token" + ) + ) + assert exc.value.code == CustomTokenExchangeErrorCode.INVALID_RESPONSE + assert "parse" in str(exc.value).lower() + + +@pytest.mark.asyncio +async def test_custom_token_exchange_missing_token_endpoint(mocker): + """Test error when token endpoint is missing from OIDC metadata.""" + # Setup + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="some-secret" + ) + + # Mock metadata without token_endpoint + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"authorization_endpoint": "https://auth0.local/authorize"} + ) + + # Act & Assert + with pytest.raises(ApiError) as exc: + await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="token", + subject_token_type="urn:acme:mcp-token" + ) + ) + assert exc.value.code == "configuration_error" + assert "token endpoint" in str(exc.value).lower() + + +@pytest.mark.asyncio +async def test_custom_token_exchange_with_authorization_params(mocker): + """Test that additional authorization_params are passed through.""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "access_token": "token", + "token_type": "Bearer", + "expires_in": 3600 + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act + await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="token", + subject_token_type="urn:acme:mcp-token", + authorization_params={"custom_param": "custom_value"} + ) + ) + + # Assert + call_args = mock_httpx_client.post.call_args + assert call_args[1]["data"]["custom_param"] == "custom_value" + + +@pytest.mark.asyncio +async def test_custom_token_exchange_forbidden_params_filtered(mocker): + """Test that forbidden params cannot be overridden.""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "access_token": "token", + "token_type": "Bearer", + "expires_in": 3600 + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act + await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="token", + subject_token_type="urn:acme:mcp-token", + authorization_params={ + "grant_type": "malicious_grant", # Should be filtered + "client_id": "malicious_client", # Should be filtered + "allowed_param": "value" # Should be allowed + } + ) + ) + + # Assert + call_args = mock_httpx_client.post.call_args + assert call_args[1]["data"]["grant_type"] == "urn:ietf:params:oauth:grant-type:token-exchange" + assert call_args[1]["data"]["client_id"] == "" + assert call_args[1]["data"]["allowed_param"] == "value" + + +# ============================================================================= +# Login with Custom Token Exchange Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_login_with_custom_token_exchange_success(mocker): + """Test successful login with custom token exchange.""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + # Mock token exchange response with ID token + id_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyMTIzIiwibmFtZSI6IkpvaG4gRG9lIiwic2lkIjoic2Vzc2lvbjEyMyJ9.fake" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "access_token": "exchanged_token", + "token_type": "Bearer", + "expires_in": 3600, + "id_token": id_token, + "refresh_token": "refresh_token_123" + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Mock JWT decode + mocker.patch("jwt.decode", return_value={ + "sub": "user123", + "name": "John Doe", + "sid": "session123" + }) + + # Act + result = await client.login_with_custom_token_exchange( + LoginWithCustomTokenExchangeOptions( + subject_token="custom-token", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com" + ) + ) + + # Assert + assert "state_data" in result.state_data + assert result.state_data["user"]["sub"] == "user123" + assert result.state_data["user"]["name"] == "John Doe" + assert result.state_data["id_token"] == id_token + assert result.state_data["refresh_token"] == "refresh_token_123" + assert len(result.state_data["token_sets"]) == 1 + assert result.state_data["token_sets"][0]["access_token"] == "exchanged_token" + assert result.state_data["internal"]["sid"] == "session123" + + # Verify state was stored + mock_state_store.set.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_login_with_custom_token_exchange_no_id_token(mocker): + """Test login when no ID token is returned.""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + # Mock token exchange response without ID token + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "access_token": "exchanged_token", + "token_type": "Bearer", + "expires_in": 3600 + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act + result = await client.login_with_custom_token_exchange( + LoginWithCustomTokenExchangeOptions( + subject_token="custom-token", + subject_token_type="urn:acme:mcp-token" + ) + ) + + # Assert - user should be None, but session should be created + assert result.state_data["user"] is None + assert result.state_data["id_token"] is None + assert len(result.state_data["token_sets"]) == 1 + assert "sid" in result.state_data["internal"] + + # Verify state was stored + mock_state_store.set.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_login_with_custom_token_exchange_failure_propagates(mocker): + """Test that token exchange failures are propagated.""" + # Setup + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + # Mock 401 error + mock_response = MagicMock() + mock_response.status_code = 401 + mock_response.json.return_value = { + "error": "unauthorized", + "error_description": "Invalid credentials" + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act & Assert + with pytest.raises(CustomTokenExchangeError) as exc: + await client.login_with_custom_token_exchange( + LoginWithCustomTokenExchangeOptions( + subject_token="invalid-token", + subject_token_type="urn:acme:mcp-token" + ) + ) + assert exc.value.code == "unauthorized" From 8fcf8177df57263fbe19451eed45b00e05ca3d48 Mon Sep 17 00:00:00 2001 From: Subhankar Maiti Date: Mon, 2 Feb 2026 13:37:23 +0530 Subject: [PATCH 04/10] docs(auth): Add examples for custom token exchange in README and new documentation file --- README.md | 41 ++++ examples/CustomTokenExchange.md | 378 ++++++++++++++++++++++++++++++++ 2 files changed, 419 insertions(+) create mode 100644 examples/CustomTokenExchange.md diff --git a/README.md b/README.md index 9d26263..f81b537 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,47 @@ async def callback(request: Request): return RedirectResponse(url="/") ``` +### 4. Login with Custom Token Exchange + +If you're migrating from a legacy authentication system or integrating with a custom identity provider, you can exchange external tokens for Auth0 tokens using the OAuth 2.0 Token Exchange specification (RFC 8693): + +```python +from auth0_server_python.auth_types import LoginWithCustomTokenExchangeOptions + +# Exchange a custom token and establish a session +result = await auth0.login_with_custom_token_exchange( + LoginWithCustomTokenExchangeOptions( + subject_token="your-custom-token", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com" + ), + store_options={"request": request, "response": response} +) + +# Access the user session +user = result.state_data["user"] +``` + +For advanced token exchange scenarios (without creating a session), use `custom_token_exchange()` directly: + +```python +from auth0_server_python.auth_types import CustomTokenExchangeOptions + +# Exchange a custom token for Auth0 tokens +response = await auth0.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="your-custom-token", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com", + scope="read:data write:data" + ) +) + +print(response.access_token) +``` + +For more details and examples, see [examples/CustomTokenExchange.md](examples/CustomTokenExchange.md). + ## Feedback ### Contributing diff --git a/examples/CustomTokenExchange.md b/examples/CustomTokenExchange.md new file mode 100644 index 0000000..c15f670 --- /dev/null +++ b/examples/CustomTokenExchange.md @@ -0,0 +1,378 @@ +# Custom Token Exchange + +Custom token exchange allows you to exchange tokens from external identity providers or legacy authentication systems for Auth0 tokens. This is implemented according to **OAuth 2.0 Token Exchange** ([RFC 8693](https://datatracker.ietf.org/doc/html/rfc8693)). + +## Table of Contents + +- [Basic Token Exchange](#basic-token-exchange) +- [Login with Token Exchange](#login-with-token-exchange) +- [Advanced Scenarios](#advanced-scenarios) + - [Actor Tokens (Delegation)](#actor-tokens-delegation) + - [Custom Authorization Parameters](#custom-authorization-parameters) +- [Error Handling](#error-handling) +- [Security Considerations](#security-considerations) + +--- + +## Basic Token Exchange + +The `custom_token_exchange()` method exchanges a custom token for Auth0 tokens without creating a user session. This is useful when you need to obtain Auth0 tokens programmatically. + +```python +from auth0_server_python.auth_server.server_client import ServerClient +from auth0_server_python.auth_types import CustomTokenExchangeOptions + +# Initialize the client +auth0 = ServerClient( + domain="", + client_id="", + client_secret="", + secret="" +) + +# Exchange a custom token +response = await auth0.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="custom-token-from-external-system", + subject_token_type="urn:acme:mcp-token", # Your custom token type + audience="https://api.example.com", + scope="read:data write:data" + ) +) + +# Use the exchanged Auth0 tokens +print(f"Access Token: {response.access_token}") +print(f"Expires In: {response.expires_in} seconds") +print(f"Scope: {response.scope}") + +# If an ID token is returned +if response.id_token: + print(f"ID Token: {response.id_token}") + +# If a refresh token is returned +if response.refresh_token: + print(f"Refresh Token: {response.refresh_token}") +``` + +### Parameters + +- **`subject_token`** (required): The token being exchanged from your external system +- **`subject_token_type`** (required): A URN identifying the token format (e.g., `urn:acme:mcp-token`, `urn:ietf:params:oauth:token-type:jwt`) +- **`audience`** (optional): Target API or service identifier +- **`scope`** (optional): Space-delimited list of OAuth scopes +- **`actor_token`** (optional): Token representing the acting party (for delegation scenarios) +- **`actor_token_type`** (optional): Type of actor token (required if `actor_token` is provided) +- **`authorization_params`** (optional): Additional OAuth parameters + +--- + +## Login with Token Exchange + +The `login_with_custom_token_exchange()` method combines token exchange with session management, establishing a logged-in user session after exchanging the token. + +```python +from auth0_server_python.auth_types import LoginWithCustomTokenExchangeOptions +from fastapi import FastAPI, Request, Response +from starlette.responses import RedirectResponse + +app = FastAPI() + +@app.get("/auth/exchange-login") +async def exchange_login(request: Request, response: Response): + # Exchange custom token and create user session + result = await auth0.login_with_custom_token_exchange( + LoginWithCustomTokenExchangeOptions( + subject_token="custom-token-from-external-system", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com" + ), + store_options={"request": request, "response": response} + ) + + # Session is now established + user = result.state_data["user"] + print(f"User logged in: {user['sub']}") + + # Redirect to dashboard + return RedirectResponse(url="/dashboard") +``` + +### Use Cases + +- **Migration from Legacy Systems**: Exchange tokens during migration to Auth0 +- **Partner SSO**: Accept tokens from partner identity providers +- **Custom Authentication Flows**: Integrate proprietary authentication mechanisms + +--- + +## Advanced Scenarios + +### Actor Tokens (Delegation) + +Actor tokens enable delegation scenarios where one service acts on behalf of a user. + +```python +from auth0_server_python.auth_types import CustomTokenExchangeOptions + +# Service-to-service delegation +response = await auth0.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="user-access-token", + subject_token_type="urn:ietf:params:oauth:token-type:access_token", + actor_token="service-access-token", + actor_token_type="urn:ietf:params:oauth:token-type:access_token", + audience="https://api.example.com" + ) +) + +# The returned token represents the service acting on behalf of the user +``` + +### Custom Authorization Parameters + +You can pass additional OAuth parameters using `authorization_params`: + +```python +response = await auth0.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="custom-token", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com", + authorization_params={ + "custom_field": "custom_value", + "organization": "org_123" + } + ) +) +``` + +> **Note**: Critical parameters (`grant_type`, `client_id`, `subject_token`, `subject_token_type`) cannot be overridden via `authorization_params` for security reasons. + +--- + +## Error Handling + +The SDK provides specific error types for token exchange failures: + +```python +from auth0_server_python.error import ( + CustomTokenExchangeError, + CustomTokenExchangeErrorCode +) + +try: + response = await auth0.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="invalid-token", + subject_token_type="urn:acme:mcp-token" + ) + ) +except CustomTokenExchangeError as e: + if e.code == CustomTokenExchangeErrorCode.INVALID_TOKEN_FORMAT: + print(f"Token format error: {e.message}") + elif e.code == CustomTokenExchangeErrorCode.TOKEN_EXCHANGE_FAILED: + print(f"Exchange failed: {e.message}") + elif e.code == CustomTokenExchangeErrorCode.INVALID_RESPONSE: + print(f"Invalid response from Auth0: {e.message}") + else: + print(f"Error: {e.code} - {e.message}") +``` + +### Common Error Codes + +| Error Code | Description | +|------------|-------------| +| `INVALID_TOKEN_FORMAT` | Token is empty, whitespace-only, or has "Bearer " prefix | +| `MISSING_ACTOR_TOKEN_TYPE` | `actor_token` provided without `actor_token_type` | +| `TOKEN_EXCHANGE_FAILED` | General token exchange failure (check `message` for details) | +| `INVALID_RESPONSE` | Auth0 returned a non-JSON response | + +### Validation Errors + +The SDK validates tokens before sending them to Auth0: + +```python +# ❌ These will fail validation: +CustomTokenExchangeOptions( + subject_token=" ", # Empty/whitespace + subject_token_type="urn:acme:token" +) + +CustomTokenExchangeOptions( + subject_token="Bearer abc123", # Has "Bearer " prefix + subject_token_type="urn:ietf:params:oauth:token-type:access_token" +) + +CustomTokenExchangeOptions( + subject_token="token", + subject_token_type="urn:acme:token", + actor_token="actor-token", + actor_token_type=None # Missing actor_token_type +) +``` + +--- + +## Security Considerations + +### 1. Token Format + +- **DO NOT** include the "Bearer " prefix in tokens +- Tokens should be sent exactly as received from the external system +- The SDK will validate token format before sending to Auth0 + +### 2. Subject Token Types + +Use standard URNs when possible: +- `urn:ietf:params:oauth:token-type:jwt` - JWT tokens +- `urn:ietf:params:oauth:token-type:access_token` - OAuth access tokens +- `urn:ietf:params:oauth:token-type:refresh_token` - OAuth refresh tokens +- `urn:ietf:params:oauth:token-type:id_token` - OpenID Connect ID tokens +- `urn:ietf:params:oauth:token-type:saml2` - SAML 2.0 assertions + +For custom token formats, use your own namespace: +- `urn:acme:mcp-token` +- `urn:example:legacy-token` + +### 3. Scope Limitation + +Request only the minimum required scopes: + +```python +# ✅ Good - minimal scopes +response = await auth0.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="token", + subject_token_type="urn:acme:mcp-token", + scope="read:profile" # Only what's needed + ) +) + +# ⚠️ Avoid - overly broad scopes +response = await auth0.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="token", + subject_token_type="urn:acme:mcp-token", + scope="openid profile email offline_access admin:all" # Too much + ) +) +``` + +### 4. HTTPS/TLS + +Token exchange MUST occur over HTTPS. The SDK enforces this by: +- Using the Auth0 tenant domain (always HTTPS) +- Validating OIDC metadata endpoints + +### 5. Client Authentication + +The SDK uses **client_secret_post** authentication method: +- Client credentials are sent in the POST body +- Credentials are protected by TLS encryption +- Never expose `client_secret` in client-side code + +--- + +## Complete Example + +Here's a complete FastAPI example integrating custom token exchange: + +```python +from fastapi import FastAPI, Request, Response, HTTPException +from starlette.responses import RedirectResponse +from auth0_server_python.auth_server.server_client import ServerClient +from auth0_server_python.auth_types import ( + LoginWithCustomTokenExchangeOptions, + CustomTokenExchangeOptions +) +from auth0_server_python.error import CustomTokenExchangeError + +app = FastAPI() + +# Initialize Auth0 client +auth0 = ServerClient( + domain="your-tenant.auth0.com", + client_id="your-client-id", + client_secret="your-client-secret", + secret="your-encryption-secret" +) + +@app.post("/auth/migrate-user") +async def migrate_user(request: Request, response: Response): + """ + Migrate a user from legacy system to Auth0 using token exchange. + """ + # Get legacy token from request + legacy_token = request.headers.get("X-Legacy-Token") + if not legacy_token: + raise HTTPException(status_code=400, detail="Missing legacy token") + + try: + # Exchange legacy token for Auth0 tokens and create session + result = await auth0.login_with_custom_token_exchange( + LoginWithCustomTokenExchangeOptions( + subject_token=legacy_token, + subject_token_type="urn:company:legacy-token", + audience="https://api.company.com" + ), + store_options={"request": request, "response": response} + ) + + # User is now logged in with Auth0 + user = result.state_data["user"] + return { + "status": "success", + "user_id": user["sub"], + "message": "User migrated successfully" + } + + except CustomTokenExchangeError as e: + raise HTTPException( + status_code=400, + detail=f"Token exchange failed: {e.message}" + ) + +@app.get("/api/service-call") +async def service_call(request: Request): + """ + Service-to-service call using actor tokens (delegation). + """ + # Get user's access token + user_token = request.headers.get("Authorization", "").replace("Bearer ", "") + + # Get service's access token + service_token = "" # From config/environment + + try: + # Exchange with delegation + response = await auth0.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token=user_token, + subject_token_type="urn:ietf:params:oauth:token-type:access_token", + actor_token=service_token, + actor_token_type="urn:ietf:params:oauth:token-type:access_token", + audience="https://downstream-api.company.com" + ) + ) + + # Use delegated token for downstream API call + return { + "delegated_token": response.access_token, + "expires_in": response.expires_in + } + + except CustomTokenExchangeError as e: + raise HTTPException( + status_code=500, + detail=f"Delegation failed: {e.message}" + ) +``` + +--- + +## Additional Resources + +- [RFC 8693: OAuth 2.0 Token Exchange](https://datatracker.ietf.org/doc/html/rfc8693) +- [Auth0 Documentation](https://auth0.com/docs) +- [Interactive Login Examples](./InteractiveLogin.md) +- [Connected Accounts](./ConnectedAccounts.md) From 5db9b01d383f4a44ff6aa270a701387da97e2c8d Mon Sep 17 00:00:00 2001 From: Subhankar Maiti Date: Mon, 2 Feb 2026 13:53:30 +0530 Subject: [PATCH 05/10] docs(auth): Update Custom Token Exchange documentation for clarity and structure --- examples/CustomTokenExchange.md | 334 +++++--------------------------- 1 file changed, 50 insertions(+), 284 deletions(-) diff --git a/examples/CustomTokenExchange.md b/examples/CustomTokenExchange.md index c15f670..8a345a7 100644 --- a/examples/CustomTokenExchange.md +++ b/examples/CustomTokenExchange.md @@ -1,22 +1,12 @@ # Custom Token Exchange -Custom token exchange allows you to exchange tokens from external identity providers or legacy authentication systems for Auth0 tokens. This is implemented according to **OAuth 2.0 Token Exchange** ([RFC 8693](https://datatracker.ietf.org/doc/html/rfc8693)). +Custom Token Exchange allows you to exchange tokens from external identity providers or legacy authentication systems for Auth0 tokens without browser redirects. This implements **OAuth 2.0 Token Exchange** ([RFC 8693](https://datatracker.ietf.org/doc/html/rfc8693)). -## Table of Contents +> **NOTE**: For complete documentation on Custom Token Exchange, configuration requirements, and detailed use cases, see the [official Auth0 documentation](https://auth0.com/docs/authenticate/custom-token-exchange). -- [Basic Token Exchange](#basic-token-exchange) -- [Login with Token Exchange](#login-with-token-exchange) -- [Advanced Scenarios](#advanced-scenarios) - - [Actor Tokens (Delegation)](#actor-tokens-delegation) - - [Custom Authorization Parameters](#custom-authorization-parameters) -- [Error Handling](#error-handling) -- [Security Considerations](#security-considerations) +## 1. Basic Token Exchange ---- - -## Basic Token Exchange - -The `custom_token_exchange()` method exchanges a custom token for Auth0 tokens without creating a user session. This is useful when you need to obtain Auth0 tokens programmatically. +Exchange a custom token for Auth0 tokens without creating a user session. ```python from auth0_server_python.auth_server.server_client import ServerClient @@ -34,87 +24,50 @@ auth0 = ServerClient( response = await auth0.custom_token_exchange( CustomTokenExchangeOptions( subject_token="custom-token-from-external-system", - subject_token_type="urn:acme:mcp-token", # Your custom token type + subject_token_type="urn:acme:mcp-token", audience="https://api.example.com", scope="read:data write:data" ) ) -# Use the exchanged Auth0 tokens +# Access the exchanged tokens print(f"Access Token: {response.access_token}") print(f"Expires In: {response.expires_in} seconds") -print(f"Scope: {response.scope}") - -# If an ID token is returned if response.id_token: print(f"ID Token: {response.id_token}") - -# If a refresh token is returned -if response.refresh_token: - print(f"Refresh Token: {response.refresh_token}") ``` -### Parameters - -- **`subject_token`** (required): The token being exchanged from your external system -- **`subject_token_type`** (required): A URN identifying the token format (e.g., `urn:acme:mcp-token`, `urn:ietf:params:oauth:token-type:jwt`) -- **`audience`** (optional): Target API or service identifier -- **`scope`** (optional): Space-delimited list of OAuth scopes -- **`actor_token`** (optional): Token representing the acting party (for delegation scenarios) -- **`actor_token_type`** (optional): Type of actor token (required if `actor_token` is provided) -- **`authorization_params`** (optional): Additional OAuth parameters +## 2. Login with Token Exchange ---- - -## Login with Token Exchange - -The `login_with_custom_token_exchange()` method combines token exchange with session management, establishing a logged-in user session after exchanging the token. +Exchange a custom token AND establish a user session. ```python from auth0_server_python.auth_types import LoginWithCustomTokenExchangeOptions -from fastapi import FastAPI, Request, Response -from starlette.responses import RedirectResponse - -app = FastAPI() - -@app.get("/auth/exchange-login") -async def exchange_login(request: Request, response: Response): - # Exchange custom token and create user session - result = await auth0.login_with_custom_token_exchange( - LoginWithCustomTokenExchangeOptions( - subject_token="custom-token-from-external-system", - subject_token_type="urn:acme:mcp-token", - audience="https://api.example.com" - ), - store_options={"request": request, "response": response} - ) +from fastapi import Request, Response - # Session is now established - user = result.state_data["user"] - print(f"User logged in: {user['sub']}") +# Exchange token and create session +result = await auth0.login_with_custom_token_exchange( + LoginWithCustomTokenExchangeOptions( + subject_token="custom-token-from-external-system", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com" + ), + store_options={"request": request, "response": response} +) - # Redirect to dashboard - return RedirectResponse(url="/dashboard") +# User is now logged in +user = result.state_data["user"] +print(f"User logged in: {user['sub']}") ``` -### Use Cases - -- **Migration from Legacy Systems**: Exchange tokens during migration to Auth0 -- **Partner SSO**: Accept tokens from partner identity providers -- **Custom Authentication Flows**: Integrate proprietary authentication mechanisms - ---- - -## Advanced Scenarios +> **TIP**: Use `login_with_custom_token_exchange()` when you need both token exchange and session management (e.g., user migration flows). Use `custom_token_exchange()` for pure token exchange scenarios (e.g., service-to-service authentication). -### Actor Tokens (Delegation) +## 3. Actor Tokens (Delegation) -Actor tokens enable delegation scenarios where one service acts on behalf of a user. +Enable delegation scenarios where one service acts on behalf of a user. ```python -from auth0_server_python.auth_types import CustomTokenExchangeOptions - -# Service-to-service delegation +# Service acting on behalf of a user response = await auth0.custom_token_exchange( CustomTokenExchangeOptions( subject_token="user-access-token", @@ -124,13 +77,11 @@ response = await auth0.custom_token_exchange( audience="https://api.example.com" ) ) - -# The returned token represents the service acting on behalf of the user ``` -### Custom Authorization Parameters +## 4. Custom Authorization Parameters -You can pass additional OAuth parameters using `authorization_params`: +Pass additional parameters to the token endpoint. ```python response = await auth0.custom_token_exchange( @@ -139,240 +90,55 @@ response = await auth0.custom_token_exchange( subject_token_type="urn:acme:mcp-token", audience="https://api.example.com", authorization_params={ - "custom_field": "custom_value", - "organization": "org_123" + "organization": "org_123", + "custom_field": "custom_value" } ) ) ``` -> **Note**: Critical parameters (`grant_type`, `client_id`, `subject_token`, `subject_token_type`) cannot be overridden via `authorization_params` for security reasons. - ---- - -## Error Handling +> **NOTE**: Critical parameters (`grant_type`, `client_id`, `subject_token`, `subject_token_type`) cannot be overridden via `authorization_params` for security reasons. -The SDK provides specific error types for token exchange failures: +## 5. Error Handling ```python -from auth0_server_python.error import ( - CustomTokenExchangeError, - CustomTokenExchangeErrorCode -) +from auth0_server_python.error import CustomTokenExchangeError try: response = await auth0.custom_token_exchange( CustomTokenExchangeOptions( - subject_token="invalid-token", + subject_token="token", subject_token_type="urn:acme:mcp-token" ) ) except CustomTokenExchangeError as e: - if e.code == CustomTokenExchangeErrorCode.INVALID_TOKEN_FORMAT: - print(f"Token format error: {e.message}") - elif e.code == CustomTokenExchangeErrorCode.TOKEN_EXCHANGE_FAILED: - print(f"Exchange failed: {e.message}") - elif e.code == CustomTokenExchangeErrorCode.INVALID_RESPONSE: - print(f"Invalid response from Auth0: {e.message}") - else: - print(f"Error: {e.code} - {e.message}") + print(f"Exchange failed: {e.code} - {e.message}") ``` ### Common Error Codes -| Error Code | Description | -|------------|-------------| -| `INVALID_TOKEN_FORMAT` | Token is empty, whitespace-only, or has "Bearer " prefix | -| `MISSING_ACTOR_TOKEN_TYPE` | `actor_token` provided without `actor_token_type` | -| `TOKEN_EXCHANGE_FAILED` | General token exchange failure (check `message` for details) | -| `INVALID_RESPONSE` | Auth0 returned a non-JSON response | - -### Validation Errors - -The SDK validates tokens before sending them to Auth0: - -```python -# ❌ These will fail validation: -CustomTokenExchangeOptions( - subject_token=" ", # Empty/whitespace - subject_token_type="urn:acme:token" -) - -CustomTokenExchangeOptions( - subject_token="Bearer abc123", # Has "Bearer " prefix - subject_token_type="urn:ietf:params:oauth:token-type:access_token" -) - -CustomTokenExchangeOptions( - subject_token="token", - subject_token_type="urn:acme:token", - actor_token="actor-token", - actor_token_type=None # Missing actor_token_type -) -``` - ---- - -## Security Considerations - -### 1. Token Format +- `INVALID_TOKEN_FORMAT`: Token is empty, whitespace-only, or has "Bearer " prefix +- `MISSING_ACTOR_TOKEN_TYPE`: `actor_token` provided without `actor_token_type` +- `TOKEN_EXCHANGE_FAILED`: General token exchange failure +- `INVALID_RESPONSE`: Auth0 returned a non-JSON response -- **DO NOT** include the "Bearer " prefix in tokens -- Tokens should be sent exactly as received from the external system -- The SDK will validate token format before sending to Auth0 - -### 2. Subject Token Types +## 6. Token Type URIs Use standard URNs when possible: -- `urn:ietf:params:oauth:token-type:jwt` - JWT tokens -- `urn:ietf:params:oauth:token-type:access_token` - OAuth access tokens -- `urn:ietf:params:oauth:token-type:refresh_token` - OAuth refresh tokens -- `urn:ietf:params:oauth:token-type:id_token` - OpenID Connect ID tokens -- `urn:ietf:params:oauth:token-type:saml2` - SAML 2.0 assertions - -For custom token formats, use your own namespace: -- `urn:acme:mcp-token` -- `urn:example:legacy-token` - -### 3. Scope Limitation - -Request only the minimum required scopes: ```python -# ✅ Good - minimal scopes -response = await auth0.custom_token_exchange( - CustomTokenExchangeOptions( - subject_token="token", - subject_token_type="urn:acme:mcp-token", - scope="read:profile" # Only what's needed - ) -) - -# ⚠️ Avoid - overly broad scopes -response = await auth0.custom_token_exchange( - CustomTokenExchangeOptions( - subject_token="token", - subject_token_type="urn:acme:mcp-token", - scope="openid profile email offline_access admin:all" # Too much - ) -) -``` - -### 4. HTTPS/TLS - -Token exchange MUST occur over HTTPS. The SDK enforces this by: -- Using the Auth0 tenant domain (always HTTPS) -- Validating OIDC metadata endpoints - -### 5. Client Authentication - -The SDK uses **client_secret_post** authentication method: -- Client credentials are sent in the POST body -- Credentials are protected by TLS encryption -- Never expose `client_secret` in client-side code - ---- - -## Complete Example - -Here's a complete FastAPI example integrating custom token exchange: - -```python -from fastapi import FastAPI, Request, Response, HTTPException -from starlette.responses import RedirectResponse -from auth0_server_python.auth_server.server_client import ServerClient -from auth0_server_python.auth_types import ( - LoginWithCustomTokenExchangeOptions, - CustomTokenExchangeOptions -) -from auth0_server_python.error import CustomTokenExchangeError - -app = FastAPI() - -# Initialize Auth0 client -auth0 = ServerClient( - domain="your-tenant.auth0.com", - client_id="your-client-id", - client_secret="your-client-secret", - secret="your-encryption-secret" -) - -@app.post("/auth/migrate-user") -async def migrate_user(request: Request, response: Response): - """ - Migrate a user from legacy system to Auth0 using token exchange. - """ - # Get legacy token from request - legacy_token = request.headers.get("X-Legacy-Token") - if not legacy_token: - raise HTTPException(status_code=400, detail="Missing legacy token") - - try: - # Exchange legacy token for Auth0 tokens and create session - result = await auth0.login_with_custom_token_exchange( - LoginWithCustomTokenExchangeOptions( - subject_token=legacy_token, - subject_token_type="urn:company:legacy-token", - audience="https://api.company.com" - ), - store_options={"request": request, "response": response} - ) - - # User is now logged in with Auth0 - user = result.state_data["user"] - return { - "status": "success", - "user_id": user["sub"], - "message": "User migrated successfully" - } - - except CustomTokenExchangeError as e: - raise HTTPException( - status_code=400, - detail=f"Token exchange failed: {e.message}" - ) - -@app.get("/api/service-call") -async def service_call(request: Request): - """ - Service-to-service call using actor tokens (delegation). - """ - # Get user's access token - user_token = request.headers.get("Authorization", "").replace("Bearer ", "") - - # Get service's access token - service_token = "" # From config/environment - - try: - # Exchange with delegation - response = await auth0.custom_token_exchange( - CustomTokenExchangeOptions( - subject_token=user_token, - subject_token_type="urn:ietf:params:oauth:token-type:access_token", - actor_token=service_token, - actor_token_type="urn:ietf:params:oauth:token-type:access_token", - audience="https://downstream-api.company.com" - ) - ) - - # Use delegated token for downstream API call - return { - "delegated_token": response.access_token, - "expires_in": response.expires_in - } - - except CustomTokenExchangeError as e: - raise HTTPException( - status_code=500, - detail=f"Delegation failed: {e.message}" - ) +# Standard token types +"urn:ietf:params:oauth:token-type:jwt" # JWT tokens +"urn:ietf:params:oauth:token-type:access_token" # OAuth access tokens +"urn:ietf:params:oauth:token-type:id_token" # OpenID Connect ID tokens +"urn:ietf:params:oauth:token-type:refresh_token" # OAuth refresh tokens + +# Custom token types (use your own namespace) +"urn:acme:mcp-token" +"urn:company:legacy-token" ``` ---- - ## Additional Resources -- [RFC 8693: OAuth 2.0 Token Exchange](https://datatracker.ietf.org/doc/html/rfc8693) -- [Auth0 Documentation](https://auth0.com/docs) -- [Interactive Login Examples](./InteractiveLogin.md) -- [Connected Accounts](./ConnectedAccounts.md) +- [Auth0 Custom Token Exchange Documentation](https://auth0.com/docs/authenticate/custom-token-exchange) +- [RFC 8693 - OAuth 2.0 Token Exchange](https://datatracker.ietf.org/doc/html/rfc8693) From bda9a1e4acaf49c073dc356c5daf54a0ea8c70ca Mon Sep 17 00:00:00 2001 From: Subhankar Maiti Date: Mon, 2 Feb 2026 16:09:13 +0530 Subject: [PATCH 06/10] feat(auth): Add organization support to custom token exchange options and update related documentation --- examples/CustomTokenExchange.md | 20 +++++-- .../auth_server/server_client.py | 3 ++ .../auth_types/__init__.py | 3 ++ .../tests/test_server_client.py | 54 +++++++++++++++++++ 4 files changed, 77 insertions(+), 3 deletions(-) diff --git a/examples/CustomTokenExchange.md b/examples/CustomTokenExchange.md index 8a345a7..c49dced 100644 --- a/examples/CustomTokenExchange.md +++ b/examples/CustomTokenExchange.md @@ -90,7 +90,6 @@ response = await auth0.custom_token_exchange( subject_token_type="urn:acme:mcp-token", audience="https://api.example.com", authorization_params={ - "organization": "org_123", "custom_field": "custom_value" } ) @@ -99,7 +98,22 @@ response = await auth0.custom_token_exchange( > **NOTE**: Critical parameters (`grant_type`, `client_id`, `subject_token`, `subject_token_type`) cannot be overridden via `authorization_params` for security reasons. -## 5. Error Handling +## 5. Organization Support + +Specify an organization when exchanging tokens. + +```python +response = await auth0.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="custom-token", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com", + organization="org_abc1234" + ) +) +``` + +## 6. Error Handling ```python from auth0_server_python.error import CustomTokenExchangeError @@ -122,7 +136,7 @@ except CustomTokenExchangeError as e: - `TOKEN_EXCHANGE_FAILED`: General token exchange failure - `INVALID_RESPONSE`: Auth0 returned a non-JSON response -## 6. Token Type URIs +## 7. Token Type URIs Use standard URNs when possible: diff --git a/src/auth0_server_python/auth_server/server_client.py b/src/auth0_server_python/auth_server/server_client.py index 041e818..406d991 100644 --- a/src/auth0_server_python/auth_server/server_client.py +++ b/src/auth0_server_python/auth_server/server_client.py @@ -1546,6 +1546,9 @@ async def custom_token_exchange( params["actor_token"] = options.actor_token params["actor_token_type"] = options.actor_token_type + if options.organization: + params["organization"] = options.organization + # Merge additional authorization params if options.authorization_params: # Prevent override of critical parameters diff --git a/src/auth0_server_python/auth_types/__init__.py b/src/auth0_server_python/auth_types/__init__.py index cd470e0..3823fcd 100644 --- a/src/auth0_server_python/auth_types/__init__.py +++ b/src/auth0_server_python/auth_types/__init__.py @@ -265,6 +265,7 @@ class CustomTokenExchangeOptions(BaseModel): scope: Space-delimited list of scopes (optional) actor_token: Security token representing the acting party (optional) actor_token_type: Type of actor token (required if actor_token present) + organization: Organization identifier for the token exchange (optional) authorization_params: Additional OAuth parameters (optional) """ subject_token: str = Field(..., min_length=1) @@ -273,6 +274,7 @@ class CustomTokenExchangeOptions(BaseModel): scope: Optional[str] = None actor_token: Optional[str] = None actor_token_type: Optional[str] = None + organization: Optional[str] = None authorization_params: Optional[dict[str, Any]] = None @field_validator('subject_token', 'actor_token') @@ -328,6 +330,7 @@ class LoginWithCustomTokenExchangeOptions(BaseModel): scope: Optional[str] = None actor_token: Optional[str] = None actor_token_type: Optional[str] = None + organization: Optional[str] = None authorization_params: Optional[dict[str, Any]] = None @field_validator('subject_token', 'actor_token') diff --git a/src/auth0_server_python/tests/test_server_client.py b/src/auth0_server_python/tests/test_server_client.py index 764540c..4008eea 100644 --- a/src/auth0_server_python/tests/test_server_client.py +++ b/src/auth0_server_python/tests/test_server_client.py @@ -2067,6 +2067,60 @@ async def test_custom_token_exchange_with_actor_token(mocker): assert call_args[1]["data"]["actor_token_type"] == "urn:ietf:params:oauth:token-type:access_token" +@pytest.mark.asyncio +async def test_custom_token_exchange_with_organization(mocker): + """Test token exchange with organization parameter.""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "access_token": "org_scoped_token", + "token_type": "Bearer", + "expires_in": 3600 + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act + options = CustomTokenExchangeOptions( + subject_token="custom-token", + subject_token_type="urn:acme:mcp-token", + organization="org_abc1234" + ) + result = await client.custom_token_exchange(options) + + # Assert + assert result.access_token == "org_scoped_token" + + # Verify organization param was sent + call_args = mock_httpx_client.post.call_args + assert call_args[1]["data"]["organization"] == "org_abc1234" + + @pytest.mark.asyncio async def test_custom_token_exchange_empty_token(): """Test that empty/whitespace tokens are rejected.""" From 2c81c5abb8cb70bf61c3f125704dfa983c6742bf Mon Sep 17 00:00:00 2001 From: Subhankar Maiti Date: Mon, 2 Feb 2026 16:16:30 +0530 Subject: [PATCH 07/10] test(auth): Update custom token exchange tests to raise ValidationError for invalid tokens --- src/auth0_server_python/tests/test_server_client.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/auth0_server_python/tests/test_server_client.py b/src/auth0_server_python/tests/test_server_client.py index 4008eea..885de8a 100644 --- a/src/auth0_server_python/tests/test_server_client.py +++ b/src/auth0_server_python/tests/test_server_client.py @@ -4,6 +4,7 @@ from urllib.parse import parse_qs, urlparse import pytest +from pydantic_core import ValidationError from auth0_server_python.auth_server.my_account_client import MyAccountClient from auth0_server_python.auth_server.server_client import ServerClient from auth0_server_python.auth_types import ( @@ -2135,14 +2136,13 @@ async def test_custom_token_exchange_empty_token(): ) # Act & Assert - empty token - with pytest.raises(CustomTokenExchangeError) as exc: + with pytest.raises(ValidationError) as exc: await client.custom_token_exchange( CustomTokenExchangeOptions( subject_token=" ", subject_token_type="urn:acme:mcp-token" ) ) - assert exc.value.code == CustomTokenExchangeErrorCode.INVALID_TOKEN_FORMAT assert "empty or whitespace" in str(exc.value).lower() @@ -2160,14 +2160,13 @@ async def test_custom_token_exchange_bearer_prefix(): ) # Act & Assert - with pytest.raises(CustomTokenExchangeError) as exc: + with pytest.raises(ValidationError) as exc: await client.custom_token_exchange( CustomTokenExchangeOptions( subject_token="Bearer abc123", subject_token_type="urn:ietf:params:oauth:token-type:access_token" ) ) - assert exc.value.code == CustomTokenExchangeErrorCode.INVALID_TOKEN_FORMAT assert "Bearer" in str(exc.value) @@ -2185,7 +2184,7 @@ async def test_custom_token_exchange_missing_actor_token_type(): ) # Act & Assert - with pytest.raises(CustomTokenExchangeError) as exc: + with pytest.raises(ValidationError) as exc: await client.custom_token_exchange( CustomTokenExchangeOptions( subject_token="token", @@ -2194,7 +2193,6 @@ async def test_custom_token_exchange_missing_actor_token_type(): actor_token_type=None ) ) - assert exc.value.code == CustomTokenExchangeErrorCode.INVALID_TOKEN_FORMAT assert "actor_token_type" in str(exc.value).lower() @@ -2500,7 +2498,7 @@ async def test_login_with_custom_token_exchange_success(mocker): ) # Assert - assert "state_data" in result.state_data + assert result.state_data is not None assert result.state_data["user"]["sub"] == "user123" assert result.state_data["user"]["name"] == "John Doe" assert result.state_data["id_token"] == id_token From ad960ed41c61c214931f8c1d678a68e864219b0a Mon Sep 17 00:00:00 2001 From: Subhankar Maiti Date: Tue, 3 Feb 2026 12:44:05 +0530 Subject: [PATCH 08/10] feat(auth): Add organization parameter to ServerClient for custom token exchange --- src/auth0_server_python/auth_server/server_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/auth0_server_python/auth_server/server_client.py b/src/auth0_server_python/auth_server/server_client.py index 406d991..cf53bee 100644 --- a/src/auth0_server_python/auth_server/server_client.py +++ b/src/auth0_server_python/auth_server/server_client.py @@ -1645,6 +1645,7 @@ async def login_with_custom_token_exchange( scope=options.scope, actor_token=options.actor_token, actor_token_type=options.actor_token_type, + organization=options.organization, authorization_params=options.authorization_params ) From 9065993e62ec82280f76cf093cc5c8b98111f2d5 Mon Sep 17 00:00:00 2001 From: Subhankar Maiti Date: Tue, 3 Feb 2026 13:02:43 +0530 Subject: [PATCH 09/10] fix: import order issue --- src/auth0_server_python/tests/test_server_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/auth0_server_python/tests/test_server_client.py b/src/auth0_server_python/tests/test_server_client.py index 885de8a..24cf5dc 100644 --- a/src/auth0_server_python/tests/test_server_client.py +++ b/src/auth0_server_python/tests/test_server_client.py @@ -4,7 +4,6 @@ from urllib.parse import parse_qs, urlparse import pytest -from pydantic_core import ValidationError from auth0_server_python.auth_server.my_account_client import MyAccountClient from auth0_server_python.auth_server.server_client import ServerClient from auth0_server_python.auth_types import ( @@ -30,6 +29,7 @@ StartLinkUserError, ) from auth0_server_python.utils import PKCE +from pydantic_core import ValidationError @pytest.mark.asyncio From 72a4b28a71d98083ab84c9a5384c108704525b14 Mon Sep 17 00:00:00 2001 From: Snehil Kishore Date: Fri, 6 Feb 2026 16:58:59 +0530 Subject: [PATCH 10/10] fix: reorder import statements for consistency --- .snyk | 4 ++++ src/auth0_server_python/auth_server/server_client.py | 4 ++-- src/auth0_server_python/tests/test_server_client.py | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/.snyk b/.snyk index 4eaa56f..7d0fc1c 100644 --- a/.snyk +++ b/.snyk @@ -21,4 +21,8 @@ ignore: - '*': reason: "Accepting the Unknown license for now" expires: "2030-12-31T23:59:59Z" + "snyk:lic:pip:cryptography:Unknown": + - '*': + reason: "Accepting the Unknown license for now" + expires: "2030-12-31T23:59:59Z" patch: {} diff --git a/src/auth0_server_python/auth_server/server_client.py b/src/auth0_server_python/auth_server/server_client.py index b403c84..57452f0 100644 --- a/src/auth0_server_python/auth_server/server_client.py +++ b/src/auth0_server_python/auth_server/server_client.py @@ -18,10 +18,10 @@ ConnectAccountOptions, ConnectAccountRequest, CustomTokenExchangeOptions, - LoginWithCustomTokenExchangeOptions, - LoginWithCustomTokenExchangeResult, ListConnectedAccountConnectionsResponse, ListConnectedAccountsResponse, + LoginWithCustomTokenExchangeOptions, + LoginWithCustomTokenExchangeResult, LogoutOptions, LogoutTokenClaims, StartInteractiveLoginOptions, diff --git a/src/auth0_server_python/tests/test_server_client.py b/src/auth0_server_python/tests/test_server_client.py index f51aab8..4f1b90b 100644 --- a/src/auth0_server_python/tests/test_server_client.py +++ b/src/auth0_server_python/tests/test_server_client.py @@ -15,9 +15,9 @@ ConnectedAccountConnection, ConnectParams, CustomTokenExchangeOptions, - LoginWithCustomTokenExchangeOptions, ListConnectedAccountConnectionsResponse, ListConnectedAccountsResponse, + LoginWithCustomTokenExchangeOptions, LogoutOptions, TransactionData, )