Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ Unreleased
----------


[4.6.1] - 2025-09-23
--------------------

Added
~~~~~

* Enhanced OAuth2 authentication backend to logout any old session before initiating a new OAuth2 flow. This prevents user association conflicts with the previously logged-in user.

* Added temporary rollout toggle ENABLE_OAUTH_SESSION_CLEANUP to control session cleanup during OAuth start process.

[4.6.0] - 2025-06-18
--------------------

Expand Down
2 changes: 1 addition & 1 deletion auth_backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
These package is designed to be used primarily with Open edX Django projects, but should be compatible with non-edX
projects as well.
"""
__version__ = '4.6.0' # pragma: no cover
__version__ = '4.6.1' # pragma: no cover
69 changes: 68 additions & 1 deletion auth_backends/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,27 @@

For more information visit https://docs.djangoproject.com/en/dev/topics/auth/customizing/.
"""
import logging
import jwt
from django.contrib.auth import logout
from django.dispatch import Signal
from social_core.backends.oauth import BaseOAuth2
from edx_toggles.toggles import SettingToggle
from edx_django_utils.monitoring import set_custom_attribute

logger = logging.getLogger(__name__)

# .. toggle_name: ENABLE_OAUTH_SESSION_CLEANUP
# .. toggle_implementation: SettingToggle
# .. toggle_default: False
# .. toggle_description: Controls whether to perform session cleanup during OAuth start.
# When enabled (True), existing user sessions are cleared before OAuth authentication
# to prevent user association conflicts. When disabled (False), session cleanup is skipped.
# This toggle allows for gradual rollout and quick rollback if issues arise.
# .. toggle_use_cases: temporary
# .. toggle_creation_date: 2025-09-25
# .. toggle_target_removal_date: 2025-11-25
ENABLE_OAUTH_SESSION_CLEANUP = SettingToggle("ENABLE_OAUTH_SESSION_CLEANUP", default=False)

PROFILE_CLAIMS_TO_DETAILS_KEY_MAP = {
'preferred_username': 'username',
Expand All @@ -31,7 +49,6 @@ def _to_language(locale):
return locale.replace('_', '-').lower()


# pylint: disable=abstract-method
class EdXOAuth2(BaseOAuth2):
"""
IMPORTANT: The oauth2 application must have access to the ``user_id`` scope in order
Expand Down Expand Up @@ -70,6 +87,56 @@ def logout_url(self):
else:
return self.end_session_url()

def start(self):
"""Initialize OAuth authentication with optional session cleanup."""

# .. custom_attribute_name: session_cleanup.toggle_enabled
# .. custom_attribute_description: Tracks whether the ENABLE_OAUTH_SESSION_CLEANUP
# toggle is enabled during OAuth start.
set_custom_attribute('session_cleanup.toggle_enabled', ENABLE_OAUTH_SESSION_CLEANUP.is_enabled())

request = self.strategy.request if hasattr(self.strategy, 'request') else None

# .. custom_attribute_name: session_cleanup.has_request
# .. custom_attribute_description: Tracks whether a request object is available
# during OAuth start. True if request exists, False if missing.
set_custom_attribute('session_cleanup.has_request', request is not None)

user_authenticated = (
request is not None and
hasattr(request, 'user') and
request.user.is_authenticated
)

# .. custom_attribute_name: session_cleanup.logout_required
# .. custom_attribute_description: Tracks whether a user was authenticated
# before session cleanup. True if user was logged in, False otherwise.
set_custom_attribute('session_cleanup.logout_required', user_authenticated)

if user_authenticated and ENABLE_OAUTH_SESSION_CLEANUP.is_enabled():
existing_username = getattr(request.user, 'username', 'unknown')

# .. custom_attribute_name: session_cleanup.logged_out_username
# .. custom_attribute_description: Records the username that was logged out
# during session cleanup for tracking and debugging purposes.
set_custom_attribute('session_cleanup.logged_out_username', existing_username)

logger.info(
"OAuth start: Performing session cleanup for user '%s'",
existing_username
)

logout(request)

# .. custom_attribute_name: session_cleanup.logout_performed
# .. custom_attribute_description: Indicates that session cleanup was
# actually performed during OAuth start.
set_custom_attribute('session_cleanup.logout_performed', True)
else:
set_custom_attribute('session_cleanup.logout_performed', False)

return super().start()

def authorization_url(self):
url_root = self.get_public_or_internal_url_root()
return f'{url_root}/oauth2/authorize'
Expand Down
79 changes: 76 additions & 3 deletions auth_backends/tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,25 @@
import datetime
import json
from calendar import timegm
from unittest.mock import patch, call

import ddt
import jwt
import pytest
import responses
import six
from Cryptodome.PublicKey import RSA
from django.contrib.auth import get_user_model
from django.contrib.sessions.middleware import SessionMiddleware
from django.core.cache import cache
from django.test import RequestFactory
from django.test.utils import override_settings
from social_core.tests.backends.oauth import OAuth2Test

User = get_user_model()


@ddt.ddt
class EdXOAuth2Tests(OAuth2Test):
""" Tests for the EdXOAuth2 backend. """

Expand Down Expand Up @@ -37,10 +48,13 @@ def set_social_auth_setting(self, setting_name, value):
# does not rely on Django settings.
self.strategy.set_settings({f'SOCIAL_AUTH_{backend_name}_{setting_name}': value})

def access_token_body(self, request, _url, headers):
def access_token_body(self, request):
""" Generates a response from the provider's access token endpoint. """
# The backend should always request JWT access tokens, not Bearer.
body = six.moves.urllib.parse.parse_qs(request.body.decode('utf8'))
body_content = request.body
if isinstance(body_content, bytes):
body_content = body_content.decode('utf8')
body = six.moves.urllib.parse.parse_qs(body_content)
self.assertEqual(body['token_type'], ['jwt'])

expires_in = 3600
Expand All @@ -51,7 +65,16 @@ def access_token_body(self, request, _url, headers):
'expires_in': expires_in,
'access_token': access_token
})
return 200, headers, body
return (200, {}, body)

def pre_complete_callback(self, start_url):
""" Override to properly set up the access token response with callback. """
responses.add_callback(
responses.POST,
url=self.backend.access_token_url(),
callback=self.access_token_body,
content_type="application/json",
)

def create_jwt_access_token(self, expires_in=3600, issuer=None, key=None, alg='RS512'):
"""
Expand Down Expand Up @@ -103,6 +126,56 @@ def extra_settings(self):
def test_login(self):
self.do_login()

@pytest.mark.django_db
@ddt.data(True, False) # Test session cleanup with both toggle enabled and disabled
@patch('auth_backends.backends.set_custom_attribute')
@patch('auth_backends.backends.logger')
def test_start_with_session_cleanup(self, toggle_enabled, mock_logger, mock_set_attr):
"""Test start method for session cleanup of existing user with toggle variation."""
with override_settings(ENABLE_OAUTH_SESSION_CLEANUP=toggle_enabled):
existing_user = User.objects.create_user(username='existing_user', email='existing@example.com')

request = RequestFactory().get('/auth/login/edx-oauth2/')
request.user = existing_user

middleware = SessionMiddleware(lambda req: None)
middleware.process_request(request)
request.session.save()

initial_session_key = request.session.session_key

self.backend.strategy.request = request

self.do_start()

if toggle_enabled:
self.assertNotEqual(request.session.session_key, initial_session_key)

self.assertTrue(request.user.is_anonymous)

mock_set_attr.assert_has_calls([
call('session_cleanup.toggle_enabled', True),
call('session_cleanup.logout_performed', True),
call('session_cleanup.logged_out_username', 'existing_user')
], any_order=True)

mock_logger.info.assert_called_with(
"OAuth start: Performing session cleanup for user '%s'",
'existing_user'
)
else:
self.assertEqual(request.session.session_key, initial_session_key)

self.assertEqual(request.user, existing_user)
self.assertFalse(request.user.is_anonymous)

mock_set_attr.assert_has_calls([
call('session_cleanup.toggle_enabled', False),
call('session_cleanup.logout_performed', False)
], any_order=True)

mock_logger.info.assert_not_called()

def test_partial_pipeline(self):
self.do_partial_pipeline()

Expand Down
7 changes: 5 additions & 2 deletions requirements/test.in
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
-r base.txt # Core dependencies

coverage
ddt # for running multiple test cases with multiple input
edx-lint
httpretty
pycodestyle
pycryptodomex # used for crypto tests
pycryptodomex # used for crypto tests
pytest-cov
pytest-django
responses # required by ddt
tox
typing_extensions # required by ddt
unittest2
edx-django-release-util # Contains the reserved keyword check
edx-django-release-util # Contains the reserved keyword check
Loading