From 7af382835a5a7bc438f8fce4e3c24aa77c4345f2 Mon Sep 17 00:00:00 2001 From: avdunn Date: Fri, 18 Jul 2025 14:26:37 -0700 Subject: [PATCH 1/4] Add OIDC issuer validation and related tests --- msal/authority.py | 57 ++++++++++++++++- tests/test_application.py | 1 + tests/test_authority.py | 127 ++++++++++++++++++++++++++++++++++---- 3 files changed, 172 insertions(+), 13 deletions(-) diff --git a/msal/authority.py b/msal/authority.py index faf11603..201c55ca 100644 --- a/msal/authority.py +++ b/msal/authority.py @@ -67,6 +67,7 @@ def __init__( performed. """ self._http_client = http_client + self._oidc_authority_url = oidc_authority_url if oidc_authority_url: logger.debug("Initializing with OIDC authority: %s", oidc_authority_url) tenant_discovery_endpoint = self._initialize_oidc_authority( @@ -95,11 +96,19 @@ def __init__( raise ValueError(error_message) logger.debug( 'openid_config("%s") = %s', tenant_discovery_endpoint, openid_config) + self._issuer = openid_config.get('issuer') self.authorization_endpoint = openid_config['authorization_endpoint'] self.token_endpoint = openid_config['token_endpoint'] self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint') _, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID + # Validate the issuer if using OIDC authority + if self._oidc_authority_url and not self.has_valid_issuer(): + raise ValueError(( + "The issuer '{iss}' does not match the authority '{auth}' or a known pattern. " + "When using the 'oidc_authority' parameter in ClientApplication, the authority " + "will be validated against the issuer from {auth}/.well-known/openid-configuration ." + ).format(iss=self._issuer, auth=oidc_authority_url)) def _initialize_oidc_authority(self, oidc_authority_url): authority, self.instance, tenant = canonicalize(oidc_authority_url) self.is_adfs = tenant.lower() == 'adfs' # As a convention @@ -174,6 +183,53 @@ def user_realm_discovery(self, username, correlation_id=None, response=None): self.__class__._domains_without_user_realm_discovery.add(self.instance) return {} # This can guide the caller to fall back normal ROPC flow + def has_valid_issuer(self) -> bool: + """ + Returns True if the issuer from OIDC discovery is valid for this authority. + + An issuer is valid if one of the following is true: + - It exactly matches the authority URL + - It has a known Microsoft host (e.g., login.microsoftonline.com) + - It has the same scheme and host as the authority (path can be different) + - For CIAM, the issuer follows the pattern of {tenant}.ciamlogin.com (tenant comes from the authority) + """ + if self._oidc_authority_url == self._issuer: + # The issuer matches the authority URL exactly + return True + + issuer = urlparse(self._issuer) if self._issuer else None + if not issuer: + return False + + # Check if issuer has a known Microsoft host + if issuer.hostname in WELL_KNOWN_AUTHORITY_HOSTS: + return True + + # Check if issuer has the same scheme and host as the authority + authority = urlparse(self._oidc_authority_url) + if authority.scheme == issuer.scheme and authority.netloc == issuer.netloc: + return True + + # Check CIAM scenario: issuer follows the pattern {tenant}.ciamlogin.com + # Extract tenant from authority URL - could be in the host or path + tenant = None + if authority.hostname.endswith(_CIAM_DOMAIN_SUFFIX): + # Normal CIAM host: {tenant}.ciamlogin.com + tenant = authority.hostname.rsplit(_CIAM_DOMAIN_SUFFIX, 1)[0] + else: + # Custom CIAM host: extract tenant from path + parts = authority.path.split('/') + if len(parts) >= 2 and parts[1]: + tenant = parts[1] # First path segment after the initial '/' + + if tenant and issuer.hostname.endswith(_CIAM_DOMAIN_SUFFIX): + # Check if issuer follows the pattern {tenant}.ciamlogin.com + expected_issuer_host = f"{tenant}{_CIAM_DOMAIN_SUFFIX}" + if issuer.hostname == expected_issuer_host: + return True + + # None of the conditions matched + return False def canonicalize(authority_or_auth_endpoint): # Returns (url_parsed_result, hostname_in_lowercase, tenant) @@ -222,4 +278,3 @@ def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs): resp.raise_for_status() raise RuntimeError( # A fallback here, in case resp.raise_for_status() is no-op "Unable to complete OIDC Discovery: %d, %s" % (resp.status_code, resp.text)) - diff --git a/tests/test_application.py b/tests/test_application.py index 16e512c4..84e562d9 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -796,6 +796,7 @@ def test_should_fallback_when_pymsalruntime_failed_to_initialize_broker(self): @patch("msal.authority.tenant_discovery", new=Mock(return_value={ "authorization_endpoint": "https://contoso.com/placeholder", "token_endpoint": "https://contoso.com/placeholder", + "issuer": "https://contoso.com/placeholder", })) @patch("msal.application._init_broker", new=Mock()) # Pretend pymsalruntime installed and working class TestBrokerFallbackWithDifferentAuthorities(unittest.TestCase): diff --git a/tests/test_authority.py b/tests/test_authority.py index 3fd1fce1..1925bd1a 100644 --- a/tests/test_authority.py +++ b/tests/test_authority.py @@ -6,6 +6,7 @@ import msal from msal.authority import * +from msal.authority import _CIAM_DOMAIN_SUFFIX # Explicitly import the private constant from tests import unittest from tests.http_client import MinimalHttpClient @@ -100,12 +101,12 @@ def test_authority_with_path_should_be_used_as_is(self, oidc_discovery): @patch("msal.authority._instance_discovery") -@patch("msal.authority.tenant_discovery", return_value={ - "authorization_endpoint": "https://contoso.com/authorize", - "token_endpoint": "https://contoso.com/token", - }) +@patch("msal.authority.tenant_discovery") # Moved return_value out of the decorator class OidcAuthorityTestCase(unittest.TestCase): authority = "https://contoso.com/tenant" + authorization_endpoint = "https://contoso.com/authorize" + token_endpoint = "https://contoso.com/token" + issuer = "https://contoso.com/tenant" # Added as class variable for inheritance def setUp(self): # setUp() gives subclass a dynamic setup based on their authority @@ -115,25 +116,37 @@ def setUp(self): # Here the test is to confirm the OIDC endpoint contains no "/v2.0" self.authority + "/.well-known/openid-configuration") + def setup_tenant_discovery(self, tenant_discovery): + """Configure the tenant_discovery mock with class-specific values""" + tenant_discovery.return_value = { + "authorization_endpoint": self.authorization_endpoint, + "token_endpoint": self.token_endpoint, + "issuer": self.issuer, + } + def test_authority_obj_should_do_oidc_discovery_and_skip_instance_discovery( self, oidc_discovery, instance_discovery): + self.setup_tenant_discovery(oidc_discovery) + c = MinimalHttpClient() a = Authority(None, c, oidc_authority_url=self.authority) instance_discovery.assert_not_called() oidc_discovery.assert_called_once_with(self.oidc_discovery_endpoint, c) - self.assertEqual(a.authorization_endpoint, 'https://contoso.com/authorize') - self.assertEqual(a.token_endpoint, 'https://contoso.com/token') + self.assertEqual(a.authorization_endpoint, self.authorization_endpoint) + self.assertEqual(a.token_endpoint, self.token_endpoint) def test_application_obj_should_do_oidc_discovery_and_skip_instance_discovery( self, oidc_discovery, instance_discovery): + self.setup_tenant_discovery(oidc_discovery) + app = msal.ClientApplication( "id", authority=None, oidc_authority=self.authority) instance_discovery.assert_not_called() oidc_discovery.assert_called_once_with( self.oidc_discovery_endpoint, app.http_client) self.assertEqual( - app.authority.authorization_endpoint, 'https://contoso.com/authorize') - self.assertEqual(app.authority.token_endpoint, 'https://contoso.com/token') + app.authority.authorization_endpoint, self.authorization_endpoint) + self.assertEqual(app.authority.token_endpoint, self.token_endpoint) class DstsAuthorityTestCase(OidcAuthorityTestCase): @@ -144,14 +157,14 @@ class DstsAuthorityTestCase(OidcAuthorityTestCase): "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/authorize") token_endpoint = ( "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/token") + issuer = "https://test-instance1-dsts.dsts.core.azure-test.net/dstsv2/common" @patch("msal.authority._instance_discovery") - @patch("msal.authority.tenant_discovery", return_value={ - "authorization_endpoint": authorization_endpoint, - "token_endpoint": token_endpoint, - }) # We need to create new patches (i.e. mocks) for non-inherited test cases + @patch("msal.authority.tenant_discovery") # Remove the hard-coded return_value def test_application_obj_should_accept_dsts_url_as_an_authority( self, oidc_discovery, instance_discovery): + self.setup_tenant_discovery(oidc_discovery) + app = msal.ClientApplication("id", authority=self.authority) instance_discovery.assert_not_called() oidc_discovery.assert_called_once_with( @@ -274,3 +287,93 @@ def _test_turning_off_instance_discovery_should_skip_authority_validation_and_in app.get_accounts() # This could make an instance metadata call for authority aliases instance_metadata.assert_not_called() + +class TestAuthorityIssuerValidation(unittest.TestCase): + """Test cases for authority.has_valid_issuer method """ + + def setUp(self): + self.http_client = MinimalHttpClient() + + def _create_authority_with_issuer(self, oidc_authority_url, issuer, tenant_discovery_mock): + tenant_discovery_mock.return_value = { + "authorization_endpoint": "https://example.com/oauth2/authorize", + "token_endpoint": "https://example.com/oauth2/token", + "issuer": issuer, + } + authority = Authority( + None, + self.http_client, + oidc_authority_url=oidc_authority_url + ) + return authority + + @patch("msal.authority.tenant_discovery") + def test_exact_match_issuer(self, tenant_discovery_mock): + """Test when issuer exactly matches the OIDC authority URL""" + authority_url = "https://example.com/tenant" + authority = self._create_authority_with_issuer(authority_url, authority_url, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid when it exactly matches the authority URL") + + @patch("msal.authority.tenant_discovery") + def test_no_issuer(self, tenant_discovery_mock): + """Test when no issuer is returned from OIDC discovery""" + authority_url = "https://example.com/tenant" + tenant_discovery_mock.return_value = { + "authorization_endpoint": "https://example.com/oauth2/authorize", + "token_endpoint": "https://example.com/oauth2/token", + # No issuer key + } + # Since initialization now checks for valid issuer, we expect it to raise ValueError + with self.assertRaises(ValueError) as context: + Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertIn("issuer", str(context.exception).lower()) + + @patch("msal.authority.tenant_discovery") + def test_microsoft_host_issuer(self, tenant_discovery_mock): + """Test when issuer has a known Microsoft host""" + authority_url = "https://custom-domain.com/tenant" + issuer = f"https://{WORLD_WIDE}/tenant" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid when it has a known Microsoft host") + + @patch("msal.authority.tenant_discovery") + def test_same_scheme_and_host_different_path(self, tenant_discovery_mock): + """Test when issuer has same scheme and host but different path""" + authority_url = "https://example.com/tenant" + issuer = "https://example.com/different/path" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid when it has the same scheme and host") + + @patch("msal.authority.tenant_discovery") + def test_ciam_authority_with_matching_tenant(self, tenant_discovery_mock): + """Test CIAM authority with matching tenant in path""" + authority_url = "https://custom-domain.com/tenant_name" + issuer = f"https://tenant_name{_CIAM_DOMAIN_SUFFIX}" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid for CIAM pattern with matching tenant") + + @patch("msal.authority.tenant_discovery") + def test_ciam_authority_with_host_tenant(self, tenant_discovery_mock): + """Test CIAM authority with tenant in hostname""" + tenant_name = "tenant_name" + authority_url = f"https://{tenant_name}{_CIAM_DOMAIN_SUFFIX}/custom/path" + issuer = f"https://{tenant_name}{_CIAM_DOMAIN_SUFFIX}" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid for CIAM pattern with tenant in hostname") + + @patch("msal.authority.tenant_discovery") + def test_invalid_issuer(self, tenant_discovery_mock): + """Test when issuer is completely different from authority""" + authority_url = "https://example.com/tenant" + issuer = "https://malicious-site.com/tenant" + tenant_discovery_mock.return_value = { + "authorization_endpoint": "https://example.com/oauth2/authorize", + "token_endpoint": "https://example.com/oauth2/token", + "issuer": issuer, + } + # Since initialization now checks for valid issuer, we expect it to raise ValueError + with self.assertRaises(ValueError) as context: + Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertIn("issuer", str(context.exception).lower()) + self.assertIn(issuer, str(context.exception)) + self.assertIn(authority_url, str(context.exception)) From 2b697fcfb39ce1fa03ef8383634f2569aa7ff207 Mon Sep 17 00:00:00 2001 From: avdunn Date: Thu, 24 Jul 2025 14:31:20 -0700 Subject: [PATCH 2/4] Remove check on known hosts and adjust error message towards regular authority API --- msal/authority.py | 8 +++---- tests/test_authority.py | 51 ++++++++++++++++++++++++++++++++++------- 2 files changed, 46 insertions(+), 13 deletions(-) diff --git a/msal/authority.py b/msal/authority.py index 201c55ca..dbc91399 100644 --- a/msal/authority.py +++ b/msal/authority.py @@ -108,6 +108,9 @@ def __init__( "The issuer '{iss}' does not match the authority '{auth}' or a known pattern. " "When using the 'oidc_authority' parameter in ClientApplication, the authority " "will be validated against the issuer from {auth}/.well-known/openid-configuration ." + "If using a known Entra authority (e.g. login.microsoftonline.com) the " + "'authority' parameter should be used instead of 'oidc_authority'. " + "" ).format(iss=self._issuer, auth=oidc_authority_url)) def _initialize_oidc_authority(self, oidc_authority_url): authority, self.instance, tenant = canonicalize(oidc_authority_url) @@ -189,7 +192,6 @@ def has_valid_issuer(self) -> bool: An issuer is valid if one of the following is true: - It exactly matches the authority URL - - It has a known Microsoft host (e.g., login.microsoftonline.com) - It has the same scheme and host as the authority (path can be different) - For CIAM, the issuer follows the pattern of {tenant}.ciamlogin.com (tenant comes from the authority) """ @@ -201,10 +203,6 @@ def has_valid_issuer(self) -> bool: if not issuer: return False - # Check if issuer has a known Microsoft host - if issuer.hostname in WELL_KNOWN_AUTHORITY_HOSTS: - return True - # Check if issuer has the same scheme and host as the authority authority = urlparse(self._oidc_authority_url) if authority.scheme == issuer.scheme and authority.netloc == issuer.netloc: diff --git a/tests/test_authority.py b/tests/test_authority.py index 1925bd1a..4b30f3b5 100644 --- a/tests/test_authority.py +++ b/tests/test_authority.py @@ -328,14 +328,6 @@ def test_no_issuer(self, tenant_discovery_mock): Authority(None, self.http_client, oidc_authority_url=authority_url) self.assertIn("issuer", str(context.exception).lower()) - @patch("msal.authority.tenant_discovery") - def test_microsoft_host_issuer(self, tenant_discovery_mock): - """Test when issuer has a known Microsoft host""" - authority_url = "https://custom-domain.com/tenant" - issuer = f"https://{WORLD_WIDE}/tenant" - authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) - self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid when it has a known Microsoft host") - @patch("msal.authority.tenant_discovery") def test_same_scheme_and_host_different_path(self, tenant_discovery_mock): """Test when issuer has same scheme and host but different path""" @@ -377,3 +369,46 @@ def test_invalid_issuer(self, tenant_discovery_mock): self.assertIn("issuer", str(context.exception).lower()) self.assertIn(issuer, str(context.exception)) self.assertIn(authority_url, str(context.exception)) + + @patch("msal.authority.tenant_discovery") + def test_custom_authority_with_microsoft_issuer(self, tenant_discovery_mock): + """Test when custom authority is used with a known Microsoft issuer (should fail)""" + authority_url = "https://custom-domain.com/tenant" + issuer = f"https://{WORLD_WIDE}/tenant" + + tenant_discovery_mock.return_value = { + "authorization_endpoint": "https://example.com/oauth2/authorize", + "token_endpoint": "https://example.com/oauth2/token", + "issuer": issuer, + } + + # Since initialization now checks for valid issuer and we removed the check for known hosts, + # we expect it to raise ValueError because the hosts don't match + with self.assertRaises(ValueError) as context: + Authority(None, self.http_client, oidc_authority_url=authority_url) + + self.assertIn("issuer", str(context.exception).lower()) + self.assertIn(issuer, str(context.exception)) + self.assertIn(authority_url, str(context.exception)) + + @patch("msal.authority.tenant_discovery") + def test_known_authority_with_non_matching_issuer(self, tenant_discovery_mock): + """Test when known authority is used with an issuer that doesn't match (should fail)""" + # Known Microsoft authority URLs + authority_url = f"https://{WORLD_WIDE}/tenant" + issuer = "https://custom-domain.com/tenant" + + tenant_discovery_mock.return_value = { + "authorization_endpoint": "https://example.com/oauth2/authorize", + "token_endpoint": "https://example.com/oauth2/token", + "issuer": issuer, + } + + # We expect it to raise ValueError because the paths don't match + # and we're now checking for exact matches + with self.assertRaises(ValueError) as context: + Authority(None, self.http_client, oidc_authority_url=authority_url) + + self.assertIn("issuer", str(context.exception).lower()) + self.assertIn(issuer, str(context.exception)) + self.assertIn(authority_url, str(context.exception)) From 6844506efd4aed06f4d65d1b9a2936ef3edb275c Mon Sep 17 00:00:00 2001 From: Nilesh Choudhary Date: Thu, 18 Dec 2025 16:42:09 +0000 Subject: [PATCH 3/4] Updated the trusted host list and added region check logic --- msal/authority.py | 96 +++++++++++++++++++++++------------ tests/test_authority.py | 109 ++++++++++++++++++++++++++++++++++------ 2 files changed, 158 insertions(+), 47 deletions(-) diff --git a/msal/authority.py b/msal/authority.py index dbc91399..d523e784 100644 --- a/msal/authority.py +++ b/msal/authority.py @@ -21,6 +21,26 @@ 'login-us.microsoftonline.com', AZURE_US_GOVERNMENT, ]) + +# Trusted issuer hosts for OIDC issuer validation +# Includes all well-known Microsoft identity provider hosts and national clouds +TRUSTED_ISSUER_HOSTS = frozenset([ + # Global/Public cloud + "login.microsoftonline.com", + "login.microsoft.com", + "login.windows.net", + "sts.windows.net", + # China cloud + "login.chinacloudapi.cn", + "login.partner.microsoftonline.cn", + # Germany cloud (legacy) + "login.microsoftonline.de", + # US Government clouds + "login.microsoftonline.us", + "login.usgovcloudapi.net", + "login-us.microsoftonline.com", +]) + WELL_KNOWN_B2C_HOSTS = [ "b2clogin.com", "b2clogin.cn", @@ -186,47 +206,59 @@ def user_realm_discovery(self, username, correlation_id=None, response=None): self.__class__._domains_without_user_realm_discovery.add(self.instance) return {} # This can guide the caller to fall back normal ROPC flow - def has_valid_issuer(self) -> bool: + def has_valid_issuer(self): """ - Returns True if the issuer from OIDC discovery is valid for this authority. - - An issuer is valid if one of the following is true: - - It exactly matches the authority URL - - It has the same scheme and host as the authority (path can be different) - - For CIAM, the issuer follows the pattern of {tenant}.ciamlogin.com (tenant comes from the authority) - """ - if self._oidc_authority_url == self._issuer: - # The issuer matches the authority URL exactly - return True + Returns True if the issuer from OIDC discovery is valid for this authority. - issuer = urlparse(self._issuer) if self._issuer else None - if not issuer: + An issuer is valid if one of the following is true: + - It exactly matches the authority URL (with/without trailing slash) + - It has the same scheme and host as the authority (path can be different) + - The issuer host is a well-known Microsoft authority host + - The issuer host is a regional variant of a well-known host (e.g., westus2.login.microsoft.com) + - For CIAM, the issuer follows the pattern of {tenant}.ciamlogin.com + """ + if not self._issuer or not self._oidc_authority_url: return False - # Check if issuer has the same scheme and host as the authority - authority = urlparse(self._oidc_authority_url) - if authority.scheme == issuer.scheme and authority.netloc == issuer.netloc: + # Case 1: Exact match (most common case, normalized for trailing slashes) + if self._issuer.rstrip("/") == self._oidc_authority_url.rstrip("/"): return True - # Check CIAM scenario: issuer follows the pattern {tenant}.ciamlogin.com - # Extract tenant from authority URL - could be in the host or path - tenant = None - if authority.hostname.endswith(_CIAM_DOMAIN_SUFFIX): - # Normal CIAM host: {tenant}.ciamlogin.com - tenant = authority.hostname.rsplit(_CIAM_DOMAIN_SUFFIX, 1)[0] - else: - # Custom CIAM host: extract tenant from path - parts = authority.path.split('/') - if len(parts) >= 2 and parts[1]: - tenant = parts[1] # First path segment after the initial '/' + issuer_parsed = urlparse(self._issuer) + authority_parsed = urlparse(self._oidc_authority_url) + issuer_host = issuer_parsed.hostname.lower() if issuer_parsed.hostname else None - if tenant and issuer.hostname.endswith(_CIAM_DOMAIN_SUFFIX): - # Check if issuer follows the pattern {tenant}.ciamlogin.com - expected_issuer_host = f"{tenant}{_CIAM_DOMAIN_SUFFIX}" - if issuer.hostname == expected_issuer_host: + if not issuer_host: + return False + + # Case 2: Issuer is from a trusted Microsoft host - O(1) lookup + if issuer_host in TRUSTED_ISSUER_HOSTS: + return True + + # Case 3: Regional variant check - O(1) lookup + # e.g., westus2.login.microsoft.com -> extract "login.microsoft.com" + dot_index = issuer_host.find(".") + if dot_index > 0: + potential_base = issuer_host[dot_index + 1:] + if potential_base in TRUSTED_ISSUER_HOSTS and "." not in issuer_host[:dot_index]: return True - # None of the conditions matched + # Case 4: Same scheme and host (path can differ) + if (authority_parsed.scheme == issuer_parsed.scheme and + authority_parsed.netloc == issuer_parsed.netloc): + return True + + # Case 5: CIAM scenario - issuer follows pattern {tenant}.ciamlogin.com + if issuer_host.endswith(_CIAM_DOMAIN_SUFFIX): + authority_host = authority_parsed.hostname.lower() if authority_parsed.hostname else "" + if authority_host.endswith(_CIAM_DOMAIN_SUFFIX): + tenant = authority_host[:-len(_CIAM_DOMAIN_SUFFIX)] + else: + parts = authority_parsed.path.split('/') + tenant = parts[1] if len(parts) >= 2 and parts[1] else None + + if tenant and issuer_host == tenant + _CIAM_DOMAIN_SUFFIX: + return True return False def canonicalize(authority_or_auth_endpoint): diff --git a/tests/test_authority.py b/tests/test_authority.py index 4b30f3b5..13bd2dcb 100644 --- a/tests/test_authority.py +++ b/tests/test_authority.py @@ -6,7 +6,7 @@ import msal from msal.authority import * -from msal.authority import _CIAM_DOMAIN_SUFFIX # Explicitly import the private constant +from msal.authority import _CIAM_DOMAIN_SUFFIX, TRUSTED_ISSUER_HOSTS # Explicitly import private/new constants from tests import unittest from tests.http_client import MinimalHttpClient @@ -372,9 +372,19 @@ def test_invalid_issuer(self, tenant_discovery_mock): @patch("msal.authority.tenant_discovery") def test_custom_authority_with_microsoft_issuer(self, tenant_discovery_mock): - """Test when custom authority is used with a known Microsoft issuer (should fail)""" + """Test when custom authority is used with a known Microsoft issuer (should succeed)""" authority_url = "https://custom-domain.com/tenant" issuer = f"https://{WORLD_WIDE}/tenant" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), + "Issuer from trusted Microsoft host should be valid even with custom authority") + + @patch("msal.authority.tenant_discovery") + def test_known_authority_with_non_matching_issuer(self, tenant_discovery_mock): + """Test when known authority is used with an issuer that doesn't match (should fail)""" + # Known Microsoft authority URLs + authority_url = f"https://{WORLD_WIDE}/tenant" + issuer = "https://custom-domain.com/tenant" tenant_discovery_mock.return_value = { "authorization_endpoint": "https://example.com/oauth2/authorize", @@ -382,8 +392,8 @@ def test_custom_authority_with_microsoft_issuer(self, tenant_discovery_mock): "issuer": issuer, } - # Since initialization now checks for valid issuer and we removed the check for known hosts, - # we expect it to raise ValueError because the hosts don't match + # We expect it to raise ValueError because the paths don't match + # and we're now checking for exact matches with self.assertRaises(ValueError) as context: Authority(None, self.http_client, oidc_authority_url=authority_url) @@ -391,24 +401,93 @@ def test_custom_authority_with_microsoft_issuer(self, tenant_discovery_mock): self.assertIn(issuer, str(context.exception)) self.assertIn(authority_url, str(context.exception)) + # Regional pattern tests @patch("msal.authority.tenant_discovery") - def test_known_authority_with_non_matching_issuer(self, tenant_discovery_mock): - """Test when known authority is used with an issuer that doesn't match (should fail)""" - # Known Microsoft authority URLs - authority_url = f"https://{WORLD_WIDE}/tenant" - issuer = "https://custom-domain.com/tenant" + def test_regional_issuer_westus2_login_microsoft(self, tenant_discovery_mock): + """Test regional variant: westus2.login.microsoft.com""" + authority_url = "https://custom-authority.com/tenant" + issuer = "https://westus2.login.microsoftonline.com/tenant" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), + "Regional issuer westus2.login.microsoftonline.com should be valid") + @patch("msal.authority.tenant_discovery") + def test_regional_issuer_eastus_login_microsoftonline(self, tenant_discovery_mock): + """Test regional variant: eastus.login.microsoftonline.com""" + authority_url = "https://custom-authority.com/tenant" + issuer = "https://eastus.login.microsoftonline.com/tenant" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), + "Regional issuer eastus.login.microsoftonline.com should be valid") + + @patch("msal.authority.tenant_discovery") + def test_regional_issuer_for_china_cloud(self, tenant_discovery_mock): + """Test regional variant for China cloud: region.login.chinacloudapi.cn""" + authority_url = "https://custom-authority.com/tenant" + issuer = "https://chinanorth.login.chinacloudapi.cn/tenant" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), + "Regional issuer for China cloud should be valid") + + @patch("msal.authority.tenant_discovery") + def test_regional_issuer_for_us_government(self, tenant_discovery_mock): + """Test regional variant for US Government: region.login.microsoftonline.us""" + authority_url = "https://custom-authority.com/tenant" + issuer = "https://usgovvirginia.login.microsoftonline.us/tenant" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), + "Regional issuer for US Government should be valid") + + @patch("msal.authority.tenant_discovery") + def test_invalid_regional_pattern_with_dots_in_region(self, tenant_discovery_mock): + """Test that region with dots is rejected: west.us.2.login.microsoftonline.com""" + authority_url = "https://custom-authority.com/tenant" + issuer = "https://west.us.2.login.microsoftonline.com/tenant" tenant_discovery_mock.return_value = { "authorization_endpoint": "https://example.com/oauth2/authorize", "token_endpoint": "https://example.com/oauth2/token", "issuer": issuer, } + with self.assertRaises(ValueError): + Authority(None, self.http_client, oidc_authority_url=authority_url) - # We expect it to raise ValueError because the paths don't match - # and we're now checking for exact matches - with self.assertRaises(ValueError) as context: + @patch("msal.authority.tenant_discovery") + def test_invalid_regional_pattern_untrusted_base(self, tenant_discovery_mock): + """Test that regional pattern with untrusted base is rejected""" + authority_url = "https://custom-authority.com/tenant" + issuer = "https://westus2.login.evil.com/tenant" # evil.com is not trusted + tenant_discovery_mock.return_value = { + "authorization_endpoint": "https://example.com/oauth2/authorize", + "token_endpoint": "https://example.com/oauth2/token", + "issuer": issuer, + } + with self.assertRaises(ValueError): Authority(None, self.http_client, oidc_authority_url=authority_url) - self.assertIn("issuer", str(context.exception).lower()) - self.assertIn(issuer, str(context.exception)) - self.assertIn(authority_url, str(context.exception)) + @patch("msal.authority.tenant_discovery") + def test_well_known_host_issuer_directly(self, tenant_discovery_mock): + """Test issuer from well-known Microsoft host directly (not regional)""" + authority_url = "https://custom-authority.com/tenant" + issuer = f"https://{WORLD_WIDE}/tenant" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), + "Issuer from well-known Microsoft host should be valid") + + @patch("msal.authority.tenant_discovery") + def test_issuer_with_trailing_slash_match(self, tenant_discovery_mock): + """Test issuer validation handles trailing slashes""" + authority_url = "https://example.com/tenant/" + issuer = "https://example.com/tenant" # No trailing slash + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), + "Trailing slash difference should not affect exact match") + + @patch("msal.authority.tenant_discovery") + def test_issuer_case_sensitivity_host(self, tenant_discovery_mock): + """Test that host comparison is case-insensitive for regional check""" + authority_url = "https://custom-authority.com/tenant" + issuer = "https://WESTUS2.LOGIN.MICROSOFTONLINE.COM/tenant" # Uppercase + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), + "Host comparison should be case-insensitive") + From aeeaf3e586152a8af793f8d41fa98bfe1434f955 Mon Sep 17 00:00:00 2001 From: Nilesh Choudhary Date: Thu, 18 Dec 2025 17:11:33 +0000 Subject: [PATCH 4/4] Update test_authority.py --- tests/test_authority.py | 51 ++++++++++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/tests/test_authority.py b/tests/test_authority.py index 13bd2dcb..96b9ceb2 100644 --- a/tests/test_authority.py +++ b/tests/test_authority.py @@ -149,22 +149,51 @@ def test_application_obj_should_do_oidc_discovery_and_skip_instance_discovery( self.assertEqual(app.authority.token_endpoint, self.token_endpoint) -class DstsAuthorityTestCase(OidcAuthorityTestCase): - # Inherits OidcAuthority's test cases and run them with a dSTS authority - authority = ( # dSTS is single tenanted with a tenant placeholder - 'https://test-instance1-dsts.dsts.core.azure-test.net/dstsv2/common') - authorization_endpoint = ( - "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/authorize") - token_endpoint = ( - "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/token") +@patch("msal.authority._instance_discovery") +@patch("msal.authority.tenant_discovery") +class DstsAuthorityTestCase(unittest.TestCase): + # Standalone test class for dSTS authority (not inheriting to avoid decorator stacking) + authority = 'https://test-instance1-dsts.dsts.core.azure-test.net/dstsv2/common' + authorization_endpoint = "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/authorize" + token_endpoint = "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/token" issuer = "https://test-instance1-dsts.dsts.core.azure-test.net/dstsv2/common" - @patch("msal.authority._instance_discovery") - @patch("msal.authority.tenant_discovery") # Remove the hard-coded return_value - def test_application_obj_should_accept_dsts_url_as_an_authority( + def setUp(self): + self.oidc_discovery_endpoint = self.authority + "/.well-known/openid-configuration" + + def setup_tenant_discovery(self, tenant_discovery): + """Configure the tenant_discovery mock with class-specific values""" + tenant_discovery.return_value = { + "authorization_endpoint": self.authorization_endpoint, + "token_endpoint": self.token_endpoint, + "issuer": self.issuer, + } + + def test_authority_obj_should_do_oidc_discovery_and_skip_instance_discovery( + self, oidc_discovery, instance_discovery): + self.setup_tenant_discovery(oidc_discovery) + c = MinimalHttpClient() + a = Authority(None, c, oidc_authority_url=self.authority) + instance_discovery.assert_not_called() + oidc_discovery.assert_called_once_with(self.oidc_discovery_endpoint, c) + self.assertEqual(a.authorization_endpoint, self.authorization_endpoint) + self.assertEqual(a.token_endpoint, self.token_endpoint) + + def test_application_obj_should_do_oidc_discovery_and_skip_instance_discovery( self, oidc_discovery, instance_discovery): self.setup_tenant_discovery(oidc_discovery) + app = msal.ClientApplication( + "id", authority=None, oidc_authority=self.authority) + instance_discovery.assert_not_called() + oidc_discovery.assert_called_once_with( + self.oidc_discovery_endpoint, app.http_client) + self.assertEqual( + app.authority.authorization_endpoint, self.authorization_endpoint) + self.assertEqual(app.authority.token_endpoint, self.token_endpoint) + def test_application_obj_should_accept_dsts_url_as_an_authority( + self, oidc_discovery, instance_discovery): + self.setup_tenant_discovery(oidc_discovery) app = msal.ClientApplication("id", authority=self.authority) instance_discovery.assert_not_called() oidc_discovery.assert_called_once_with(