Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/app/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from oauth2_metadata.views import (
DynamicClientRegistrationView,
OAuthAuthorizeView,
authorization_server_metadata,
)
from users.views import password_reset_redirect
Expand Down Expand Up @@ -56,6 +57,11 @@
"robots.txt",
TemplateView.as_view(template_name="robots.txt", content_type="text/plain"),
),
path(
"api/v1/oauth/authorize/",
OAuthAuthorizeView.as_view(),
name="oauth-authorize",
),
path(
"o/register/",
DynamicClientRegistrationView.as_view(),
Expand Down
12 changes: 12 additions & 0 deletions api/oauth2_metadata/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@

from oauth2_metadata.services import validate_redirect_uri


class OAuthConsentSerializer(serializers.Serializer): # type: ignore[type-arg]
allow = serializers.BooleanField()
client_id = serializers.CharField()
redirect_uri = serializers.CharField()
response_type = serializers.CharField()
scope = serializers.CharField(required=False, default="mcp")
code_challenge = serializers.CharField()
code_challenge_method = serializers.CharField()
state = serializers.CharField(required=False, allow_blank=True, default="")


# Allow ASCII letters, digits, spaces, hyphens, underscores, dots, and parentheses.
# ASCII-only to prevent Unicode homoglyph spoofing on the consent screen.
_CLIENT_NAME_RE = re.compile(r"^[\w\s.\-()]+$", re.ASCII)
Expand Down
100 changes: 97 additions & 3 deletions api/oauth2_metadata/views.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
from typing import Any
from urllib.parse import urlencode, urlparse, urlunparse

from django.conf import settings
from django.http import HttpRequest, JsonResponse
from django.http import HttpRequest, JsonResponse, QueryDict
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_GET
from oauth2_provider.exceptions import OAuthToolkitError
from oauth2_provider.models import get_application_model
from oauth2_provider.scopes import get_scopes_backend
from oauth2_provider.views.mixins import OAuthLibMixin
from rest_framework import status
from rest_framework import status as drf_status
from rest_framework.permissions import AllowAny
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.throttling import ScopedRateThrottle
from rest_framework.views import APIView

from oauth2_metadata.serializers import DCRRequestSerializer
from oauth2_metadata.serializers import DCRRequestSerializer, OAuthConsentSerializer
from oauth2_metadata.services import create_oauth2_application


Expand Down Expand Up @@ -46,6 +52,94 @@
return JsonResponse(metadata)


class OAuthAuthorizeView(OAuthLibMixin, APIView): # type: ignore[misc]
"""Validate an OAuth authorisation request and process consent decisions."""

permission_classes = [IsAuthenticated]

def get(self, request: Request, *args: Any, **kwargs: Any) -> Response:
"""Validate an authorisation request and return application info."""
# Bridge DRF auth to Django request so DOT sees the authenticated user.
request._request.user = request.user # type: ignore[assignment]

Check failure on line 63 in api/oauth2_metadata/views.py

View workflow job for this annotation

GitHub Actions / API Unit Tests (3.11)

Unused "type: ignore" comment

try:
scopes, credentials = self.validate_authorization_request(request._request)
except OAuthToolkitError as e:
oauthlib_error = e.oauthlib_error
return Response(
{
"error": getattr(oauthlib_error, "error", "invalid_request"),
"error_description": getattr(oauthlib_error, "description", str(e)),
},
status=status.HTTP_400_BAD_REQUEST,
)

Application = get_application_model()
application = Application.objects.get(
client_id=credentials["client_id"],
)
all_scopes = get_scopes_backend().get_all_scopes()
scopes_dict: dict[str, str] = {s: all_scopes.get(s, s) for s in scopes}
return Response(
{
"application": {
"name": application.name,
"client_id": application.client_id,
},
"scopes": scopes_dict,
"redirect_uri": credentials.get("redirect_uri", ""),
"is_verified": bool(application.skip_authorization),
}
)

def post(self, request: Request, *args: Any, **kwargs: Any) -> Response:
"""Process a consent decision and return the redirect URI."""
serializer = OAuthConsentSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data: dict[str, Any] = serializer.validated_data
allow: bool = data.pop("allow")

# Bridge DRF auth to Django request so DOT sees the authenticated user.
request._request.user = request.user # type: ignore[assignment]

Check failure on line 103 in api/oauth2_metadata/views.py

View workflow job for this annotation

GitHub Actions / API Unit Tests (3.11)

Unused "type: ignore" comment

# DOT's validate_authorization_request reads OAuth params from GET
# and also from request.get_full_path() which uses META['QUERY_STRING'].
query = QueryDict(mutable=True)
for key, value in data.items():
query[key] = str(value)
request._request.GET = query

Check failure on line 110 in api/oauth2_metadata/views.py

View workflow job for this annotation

GitHub Actions / API Unit Tests (3.11)

Incompatible types in assignment (expression has type "QueryDict", variable has type "_ImmutableQueryDict")
request._request.META["QUERY_STRING"] = query.urlencode()

try:
scopes, credentials = self.validate_authorization_request(request._request)
except OAuthToolkitError as e:
oauthlib_error = e.oauthlib_error
return Response(
{
"error": getattr(oauthlib_error, "error", "invalid_request"),
"error_description": getattr(oauthlib_error, "description", str(e)),
},
status=status.HTTP_400_BAD_REQUEST,
)

try:
scopes_str = " ".join(scopes) if isinstance(scopes, list) else scopes
uri, _headers, _body, _status = self.create_authorization_response(
request._request, scopes_str, credentials, allow
)
except OAuthToolkitError:
# User denied access -- build the error redirect manually.
redirect_uri = credentials.get("redirect_uri", data.get("redirect_uri", ""))
state = credentials.get("state", data.get("state", ""))
error_params: dict[str, str] = {"error": "access_denied"}
if state:
error_params["state"] = state
parsed = urlparse(str(redirect_uri))
uri = urlunparse(parsed._replace(query=urlencode(error_params)))

return Response({"redirect_uri": uri})


class DynamicClientRegistrationView(APIView):
"""RFC 7591 Dynamic Client Registration endpoint."""

Expand Down
5 changes: 3 additions & 2 deletions api/oauth2_test_server.mjs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { createServer } from "node:http";
import { randomBytes, createHash } from "node:crypto";

const CLIENT_ID = "B4wAl37pg9y1PRsIvAXZ14cTp0FpqpNCtMSI7ETC";
const CLIENT_ID = "PVuLryS7ISh5gveydoLafTt02q1jMsCiwwOVoMy6";
const REDIRECT_URI = "http://localhost:3000/oauth/callback";
const FRONTEND_URL = "http://localhost:8080";
const API_URL = "http://localhost:8000";
const PORT = 3000;

Expand All @@ -13,7 +14,7 @@ const codeChallenge = createHash("sha256")
.digest("base64url");

const authorizeUrl =
`${API_URL}/o/authorize/?` +
`${FRONTEND_URL}/oauth/authorize?` +
new URLSearchParams({
response_type: "code",
client_id: CLIENT_ID,
Expand Down
Loading
Loading