diff --git a/msal/authority.py b/msal/authority.py index faf11603..d523e784 100644 --- a/msal/authority.py +++ b/msal/authority.py @@ -21,6 +21,26 @@ 'login-us.microsoftonline.com', AZURE_US_GOVERNMENT, ]) + +# Trusted issuer hosts for OIDC issuer validation +# Includes all well-known Microsoft identity provider hosts and national clouds +TRUSTED_ISSUER_HOSTS = frozenset([ + # Global/Public cloud + "login.microsoftonline.com", + "login.microsoft.com", + "login.windows.net", + "sts.windows.net", + # China cloud + "login.chinacloudapi.cn", + "login.partner.microsoftonline.cn", + # Germany cloud (legacy) + "login.microsoftonline.de", + # US Government clouds + "login.microsoftonline.us", + "login.usgovcloudapi.net", + "login-us.microsoftonline.com", +]) + WELL_KNOWN_B2C_HOSTS = [ "b2clogin.com", "b2clogin.cn", @@ -67,6 +87,7 @@ def __init__( performed. """ self._http_client = http_client + self._oidc_authority_url = oidc_authority_url if oidc_authority_url: logger.debug("Initializing with OIDC authority: %s", oidc_authority_url) tenant_discovery_endpoint = self._initialize_oidc_authority( @@ -95,11 +116,22 @@ def __init__( raise ValueError(error_message) logger.debug( 'openid_config("%s") = %s', tenant_discovery_endpoint, openid_config) + self._issuer = openid_config.get('issuer') self.authorization_endpoint = openid_config['authorization_endpoint'] self.token_endpoint = openid_config['token_endpoint'] self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint') _, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID + # Validate the issuer if using OIDC authority + if self._oidc_authority_url and not self.has_valid_issuer(): + raise ValueError(( + "The issuer '{iss}' does not match the authority '{auth}' or a known pattern. " + "When using the 'oidc_authority' parameter in ClientApplication, the authority " + "will be validated against the issuer from {auth}/.well-known/openid-configuration ." + "If using a known Entra authority (e.g. login.microsoftonline.com) the " + "'authority' parameter should be used instead of 'oidc_authority'. " + "" + ).format(iss=self._issuer, auth=oidc_authority_url)) def _initialize_oidc_authority(self, oidc_authority_url): authority, self.instance, tenant = canonicalize(oidc_authority_url) self.is_adfs = tenant.lower() == 'adfs' # As a convention @@ -174,6 +206,60 @@ def user_realm_discovery(self, username, correlation_id=None, response=None): self.__class__._domains_without_user_realm_discovery.add(self.instance) return {} # This can guide the caller to fall back normal ROPC flow + def has_valid_issuer(self): + """ + Returns True if the issuer from OIDC discovery is valid for this authority. + + An issuer is valid if one of the following is true: + - It exactly matches the authority URL (with/without trailing slash) + - It has the same scheme and host as the authority (path can be different) + - The issuer host is a well-known Microsoft authority host + - The issuer host is a regional variant of a well-known host (e.g., westus2.login.microsoft.com) + - For CIAM, the issuer follows the pattern of {tenant}.ciamlogin.com + """ + if not self._issuer or not self._oidc_authority_url: + return False + + # Case 1: Exact match (most common case, normalized for trailing slashes) + if self._issuer.rstrip("/") == self._oidc_authority_url.rstrip("/"): + return True + + issuer_parsed = urlparse(self._issuer) + authority_parsed = urlparse(self._oidc_authority_url) + issuer_host = issuer_parsed.hostname.lower() if issuer_parsed.hostname else None + + if not issuer_host: + return False + + # Case 2: Issuer is from a trusted Microsoft host - O(1) lookup + if issuer_host in TRUSTED_ISSUER_HOSTS: + return True + + # Case 3: Regional variant check - O(1) lookup + # e.g., westus2.login.microsoft.com -> extract "login.microsoft.com" + dot_index = issuer_host.find(".") + if dot_index > 0: + potential_base = issuer_host[dot_index + 1:] + if potential_base in TRUSTED_ISSUER_HOSTS and "." not in issuer_host[:dot_index]: + return True + + # Case 4: Same scheme and host (path can differ) + if (authority_parsed.scheme == issuer_parsed.scheme and + authority_parsed.netloc == issuer_parsed.netloc): + return True + + # Case 5: CIAM scenario - issuer follows pattern {tenant}.ciamlogin.com + if issuer_host.endswith(_CIAM_DOMAIN_SUFFIX): + authority_host = authority_parsed.hostname.lower() if authority_parsed.hostname else "" + if authority_host.endswith(_CIAM_DOMAIN_SUFFIX): + tenant = authority_host[:-len(_CIAM_DOMAIN_SUFFIX)] + else: + parts = authority_parsed.path.split('/') + tenant = parts[1] if len(parts) >= 2 and parts[1] else None + + if tenant and issuer_host == tenant + _CIAM_DOMAIN_SUFFIX: + return True + return False def canonicalize(authority_or_auth_endpoint): # Returns (url_parsed_result, hostname_in_lowercase, tenant) @@ -222,4 +308,3 @@ def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs): resp.raise_for_status() raise RuntimeError( # A fallback here, in case resp.raise_for_status() is no-op "Unable to complete OIDC Discovery: %d, %s" % (resp.status_code, resp.text)) - diff --git a/tests/test_application.py b/tests/test_application.py index 16e512c4..84e562d9 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -796,6 +796,7 @@ def test_should_fallback_when_pymsalruntime_failed_to_initialize_broker(self): @patch("msal.authority.tenant_discovery", new=Mock(return_value={ "authorization_endpoint": "https://contoso.com/placeholder", "token_endpoint": "https://contoso.com/placeholder", + "issuer": "https://contoso.com/placeholder", })) @patch("msal.application._init_broker", new=Mock()) # Pretend pymsalruntime installed and working class TestBrokerFallbackWithDifferentAuthorities(unittest.TestCase): diff --git a/tests/test_authority.py b/tests/test_authority.py index 3fd1fce1..96b9ceb2 100644 --- a/tests/test_authority.py +++ b/tests/test_authority.py @@ -6,6 +6,7 @@ import msal from msal.authority import * +from msal.authority import _CIAM_DOMAIN_SUFFIX, TRUSTED_ISSUER_HOSTS # Explicitly import private/new constants from tests import unittest from tests.http_client import MinimalHttpClient @@ -100,12 +101,12 @@ def test_authority_with_path_should_be_used_as_is(self, oidc_discovery): @patch("msal.authority._instance_discovery") -@patch("msal.authority.tenant_discovery", return_value={ - "authorization_endpoint": "https://contoso.com/authorize", - "token_endpoint": "https://contoso.com/token", - }) +@patch("msal.authority.tenant_discovery") # Moved return_value out of the decorator class OidcAuthorityTestCase(unittest.TestCase): authority = "https://contoso.com/tenant" + authorization_endpoint = "https://contoso.com/authorize" + token_endpoint = "https://contoso.com/token" + issuer = "https://contoso.com/tenant" # Added as class variable for inheritance def setUp(self): # setUp() gives subclass a dynamic setup based on their authority @@ -115,43 +116,84 @@ def setUp(self): # Here the test is to confirm the OIDC endpoint contains no "/v2.0" self.authority + "/.well-known/openid-configuration") + def setup_tenant_discovery(self, tenant_discovery): + """Configure the tenant_discovery mock with class-specific values""" + tenant_discovery.return_value = { + "authorization_endpoint": self.authorization_endpoint, + "token_endpoint": self.token_endpoint, + "issuer": self.issuer, + } + def test_authority_obj_should_do_oidc_discovery_and_skip_instance_discovery( self, oidc_discovery, instance_discovery): + self.setup_tenant_discovery(oidc_discovery) + c = MinimalHttpClient() a = Authority(None, c, oidc_authority_url=self.authority) instance_discovery.assert_not_called() oidc_discovery.assert_called_once_with(self.oidc_discovery_endpoint, c) - self.assertEqual(a.authorization_endpoint, 'https://contoso.com/authorize') - self.assertEqual(a.token_endpoint, 'https://contoso.com/token') + self.assertEqual(a.authorization_endpoint, self.authorization_endpoint) + self.assertEqual(a.token_endpoint, self.token_endpoint) def test_application_obj_should_do_oidc_discovery_and_skip_instance_discovery( self, oidc_discovery, instance_discovery): + self.setup_tenant_discovery(oidc_discovery) + app = msal.ClientApplication( "id", authority=None, oidc_authority=self.authority) instance_discovery.assert_not_called() oidc_discovery.assert_called_once_with( self.oidc_discovery_endpoint, app.http_client) self.assertEqual( - app.authority.authorization_endpoint, 'https://contoso.com/authorize') - self.assertEqual(app.authority.token_endpoint, 'https://contoso.com/token') - - -class DstsAuthorityTestCase(OidcAuthorityTestCase): - # Inherits OidcAuthority's test cases and run them with a dSTS authority - authority = ( # dSTS is single tenanted with a tenant placeholder - 'https://test-instance1-dsts.dsts.core.azure-test.net/dstsv2/common') - authorization_endpoint = ( - "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/authorize") - token_endpoint = ( - "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/token") - - @patch("msal.authority._instance_discovery") - @patch("msal.authority.tenant_discovery", return_value={ - "authorization_endpoint": authorization_endpoint, - "token_endpoint": token_endpoint, - }) # We need to create new patches (i.e. mocks) for non-inherited test cases + app.authority.authorization_endpoint, self.authorization_endpoint) + self.assertEqual(app.authority.token_endpoint, self.token_endpoint) + + +@patch("msal.authority._instance_discovery") +@patch("msal.authority.tenant_discovery") +class DstsAuthorityTestCase(unittest.TestCase): + # Standalone test class for dSTS authority (not inheriting to avoid decorator stacking) + authority = 'https://test-instance1-dsts.dsts.core.azure-test.net/dstsv2/common' + authorization_endpoint = "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/authorize" + token_endpoint = "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/token" + issuer = "https://test-instance1-dsts.dsts.core.azure-test.net/dstsv2/common" + + def setUp(self): + self.oidc_discovery_endpoint = self.authority + "/.well-known/openid-configuration" + + def setup_tenant_discovery(self, tenant_discovery): + """Configure the tenant_discovery mock with class-specific values""" + tenant_discovery.return_value = { + "authorization_endpoint": self.authorization_endpoint, + "token_endpoint": self.token_endpoint, + "issuer": self.issuer, + } + + def test_authority_obj_should_do_oidc_discovery_and_skip_instance_discovery( + self, oidc_discovery, instance_discovery): + self.setup_tenant_discovery(oidc_discovery) + c = MinimalHttpClient() + a = Authority(None, c, oidc_authority_url=self.authority) + instance_discovery.assert_not_called() + oidc_discovery.assert_called_once_with(self.oidc_discovery_endpoint, c) + self.assertEqual(a.authorization_endpoint, self.authorization_endpoint) + self.assertEqual(a.token_endpoint, self.token_endpoint) + + def test_application_obj_should_do_oidc_discovery_and_skip_instance_discovery( + self, oidc_discovery, instance_discovery): + self.setup_tenant_discovery(oidc_discovery) + app = msal.ClientApplication( + "id", authority=None, oidc_authority=self.authority) + instance_discovery.assert_not_called() + oidc_discovery.assert_called_once_with( + self.oidc_discovery_endpoint, app.http_client) + self.assertEqual( + app.authority.authorization_endpoint, self.authorization_endpoint) + self.assertEqual(app.authority.token_endpoint, self.token_endpoint) + def test_application_obj_should_accept_dsts_url_as_an_authority( self, oidc_discovery, instance_discovery): + self.setup_tenant_discovery(oidc_discovery) app = msal.ClientApplication("id", authority=self.authority) instance_discovery.assert_not_called() oidc_discovery.assert_called_once_with( @@ -274,3 +316,207 @@ def _test_turning_off_instance_discovery_should_skip_authority_validation_and_in app.get_accounts() # This could make an instance metadata call for authority aliases instance_metadata.assert_not_called() + +class TestAuthorityIssuerValidation(unittest.TestCase): + """Test cases for authority.has_valid_issuer method """ + + def setUp(self): + self.http_client = MinimalHttpClient() + + def _create_authority_with_issuer(self, oidc_authority_url, issuer, tenant_discovery_mock): + tenant_discovery_mock.return_value = { + "authorization_endpoint": "https://example.com/oauth2/authorize", + "token_endpoint": "https://example.com/oauth2/token", + "issuer": issuer, + } + authority = Authority( + None, + self.http_client, + oidc_authority_url=oidc_authority_url + ) + return authority + + @patch("msal.authority.tenant_discovery") + def test_exact_match_issuer(self, tenant_discovery_mock): + """Test when issuer exactly matches the OIDC authority URL""" + authority_url = "https://example.com/tenant" + authority = self._create_authority_with_issuer(authority_url, authority_url, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid when it exactly matches the authority URL") + + @patch("msal.authority.tenant_discovery") + def test_no_issuer(self, tenant_discovery_mock): + """Test when no issuer is returned from OIDC discovery""" + authority_url = "https://example.com/tenant" + tenant_discovery_mock.return_value = { + "authorization_endpoint": "https://example.com/oauth2/authorize", + "token_endpoint": "https://example.com/oauth2/token", + # No issuer key + } + # Since initialization now checks for valid issuer, we expect it to raise ValueError + with self.assertRaises(ValueError) as context: + Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertIn("issuer", str(context.exception).lower()) + + @patch("msal.authority.tenant_discovery") + def test_same_scheme_and_host_different_path(self, tenant_discovery_mock): + """Test when issuer has same scheme and host but different path""" + authority_url = "https://example.com/tenant" + issuer = "https://example.com/different/path" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid when it has the same scheme and host") + + @patch("msal.authority.tenant_discovery") + def test_ciam_authority_with_matching_tenant(self, tenant_discovery_mock): + """Test CIAM authority with matching tenant in path""" + authority_url = "https://custom-domain.com/tenant_name" + issuer = f"https://tenant_name{_CIAM_DOMAIN_SUFFIX}" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid for CIAM pattern with matching tenant") + + @patch("msal.authority.tenant_discovery") + def test_ciam_authority_with_host_tenant(self, tenant_discovery_mock): + """Test CIAM authority with tenant in hostname""" + tenant_name = "tenant_name" + authority_url = f"https://{tenant_name}{_CIAM_DOMAIN_SUFFIX}/custom/path" + issuer = f"https://{tenant_name}{_CIAM_DOMAIN_SUFFIX}" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), "Issuer should be valid for CIAM pattern with tenant in hostname") + + @patch("msal.authority.tenant_discovery") + def test_invalid_issuer(self, tenant_discovery_mock): + """Test when issuer is completely different from authority""" + authority_url = "https://example.com/tenant" + issuer = "https://malicious-site.com/tenant" + tenant_discovery_mock.return_value = { + "authorization_endpoint": "https://example.com/oauth2/authorize", + "token_endpoint": "https://example.com/oauth2/token", + "issuer": issuer, + } + # Since initialization now checks for valid issuer, we expect it to raise ValueError + with self.assertRaises(ValueError) as context: + Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertIn("issuer", str(context.exception).lower()) + self.assertIn(issuer, str(context.exception)) + self.assertIn(authority_url, str(context.exception)) + + @patch("msal.authority.tenant_discovery") + def test_custom_authority_with_microsoft_issuer(self, tenant_discovery_mock): + """Test when custom authority is used with a known Microsoft issuer (should succeed)""" + authority_url = "https://custom-domain.com/tenant" + issuer = f"https://{WORLD_WIDE}/tenant" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), + "Issuer from trusted Microsoft host should be valid even with custom authority") + + @patch("msal.authority.tenant_discovery") + def test_known_authority_with_non_matching_issuer(self, tenant_discovery_mock): + """Test when known authority is used with an issuer that doesn't match (should fail)""" + # Known Microsoft authority URLs + authority_url = f"https://{WORLD_WIDE}/tenant" + issuer = "https://custom-domain.com/tenant" + + tenant_discovery_mock.return_value = { + "authorization_endpoint": "https://example.com/oauth2/authorize", + "token_endpoint": "https://example.com/oauth2/token", + "issuer": issuer, + } + + # We expect it to raise ValueError because the paths don't match + # and we're now checking for exact matches + with self.assertRaises(ValueError) as context: + Authority(None, self.http_client, oidc_authority_url=authority_url) + + self.assertIn("issuer", str(context.exception).lower()) + self.assertIn(issuer, str(context.exception)) + self.assertIn(authority_url, str(context.exception)) + + # Regional pattern tests + @patch("msal.authority.tenant_discovery") + def test_regional_issuer_westus2_login_microsoft(self, tenant_discovery_mock): + """Test regional variant: westus2.login.microsoft.com""" + authority_url = "https://custom-authority.com/tenant" + issuer = "https://westus2.login.microsoftonline.com/tenant" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), + "Regional issuer westus2.login.microsoftonline.com should be valid") + + @patch("msal.authority.tenant_discovery") + def test_regional_issuer_eastus_login_microsoftonline(self, tenant_discovery_mock): + """Test regional variant: eastus.login.microsoftonline.com""" + authority_url = "https://custom-authority.com/tenant" + issuer = "https://eastus.login.microsoftonline.com/tenant" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), + "Regional issuer eastus.login.microsoftonline.com should be valid") + + @patch("msal.authority.tenant_discovery") + def test_regional_issuer_for_china_cloud(self, tenant_discovery_mock): + """Test regional variant for China cloud: region.login.chinacloudapi.cn""" + authority_url = "https://custom-authority.com/tenant" + issuer = "https://chinanorth.login.chinacloudapi.cn/tenant" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), + "Regional issuer for China cloud should be valid") + + @patch("msal.authority.tenant_discovery") + def test_regional_issuer_for_us_government(self, tenant_discovery_mock): + """Test regional variant for US Government: region.login.microsoftonline.us""" + authority_url = "https://custom-authority.com/tenant" + issuer = "https://usgovvirginia.login.microsoftonline.us/tenant" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), + "Regional issuer for US Government should be valid") + + @patch("msal.authority.tenant_discovery") + def test_invalid_regional_pattern_with_dots_in_region(self, tenant_discovery_mock): + """Test that region with dots is rejected: west.us.2.login.microsoftonline.com""" + authority_url = "https://custom-authority.com/tenant" + issuer = "https://west.us.2.login.microsoftonline.com/tenant" + tenant_discovery_mock.return_value = { + "authorization_endpoint": "https://example.com/oauth2/authorize", + "token_endpoint": "https://example.com/oauth2/token", + "issuer": issuer, + } + with self.assertRaises(ValueError): + Authority(None, self.http_client, oidc_authority_url=authority_url) + + @patch("msal.authority.tenant_discovery") + def test_invalid_regional_pattern_untrusted_base(self, tenant_discovery_mock): + """Test that regional pattern with untrusted base is rejected""" + authority_url = "https://custom-authority.com/tenant" + issuer = "https://westus2.login.evil.com/tenant" # evil.com is not trusted + tenant_discovery_mock.return_value = { + "authorization_endpoint": "https://example.com/oauth2/authorize", + "token_endpoint": "https://example.com/oauth2/token", + "issuer": issuer, + } + with self.assertRaises(ValueError): + Authority(None, self.http_client, oidc_authority_url=authority_url) + + @patch("msal.authority.tenant_discovery") + def test_well_known_host_issuer_directly(self, tenant_discovery_mock): + """Test issuer from well-known Microsoft host directly (not regional)""" + authority_url = "https://custom-authority.com/tenant" + issuer = f"https://{WORLD_WIDE}/tenant" + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), + "Issuer from well-known Microsoft host should be valid") + + @patch("msal.authority.tenant_discovery") + def test_issuer_with_trailing_slash_match(self, tenant_discovery_mock): + """Test issuer validation handles trailing slashes""" + authority_url = "https://example.com/tenant/" + issuer = "https://example.com/tenant" # No trailing slash + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), + "Trailing slash difference should not affect exact match") + + @patch("msal.authority.tenant_discovery") + def test_issuer_case_sensitivity_host(self, tenant_discovery_mock): + """Test that host comparison is case-insensitive for regional check""" + authority_url = "https://custom-authority.com/tenant" + issuer = "https://WESTUS2.LOGIN.MICROSOFTONLINE.COM/tenant" # Uppercase + authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) + self.assertTrue(authority.has_valid_issuer(), + "Host comparison should be case-insensitive") +