Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 86 additions & 1 deletion msal/authority.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,26 @@
'login-us.microsoftonline.com',
AZURE_US_GOVERNMENT,
])

# Trusted issuer hosts for OIDC issuer validation
# Includes all well-known Microsoft identity provider hosts and national clouds
TRUSTED_ISSUER_HOSTS = frozenset([
# Global/Public cloud
"login.microsoftonline.com",
"login.microsoft.com",
"login.windows.net",
"sts.windows.net",
# China cloud
"login.chinacloudapi.cn",
"login.partner.microsoftonline.cn",
# Germany cloud (legacy)
"login.microsoftonline.de",
# US Government clouds
"login.microsoftonline.us",
"login.usgovcloudapi.net",
"login-us.microsoftonline.com",
])

WELL_KNOWN_B2C_HOSTS = [
"b2clogin.com",
"b2clogin.cn",
Expand Down Expand Up @@ -67,6 +87,7 @@ def __init__(
performed.
"""
self._http_client = http_client
self._oidc_authority_url = oidc_authority_url
if oidc_authority_url:
logger.debug("Initializing with OIDC authority: %s", oidc_authority_url)
tenant_discovery_endpoint = self._initialize_oidc_authority(
Expand Down Expand Up @@ -95,11 +116,22 @@ def __init__(
raise ValueError(error_message)
logger.debug(
'openid_config("%s") = %s', tenant_discovery_endpoint, openid_config)
self._issuer = openid_config.get('issuer')
self.authorization_endpoint = openid_config['authorization_endpoint']
self.token_endpoint = openid_config['token_endpoint']
self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint')
_, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID

# Validate the issuer if using OIDC authority
if self._oidc_authority_url and not self.has_valid_issuer():
raise ValueError((
"The issuer '{iss}' does not match the authority '{auth}' or a known pattern. "
"When using the 'oidc_authority' parameter in ClientApplication, the authority "
"will be validated against the issuer from {auth}/.well-known/openid-configuration ."
"If using a known Entra authority (e.g. login.microsoftonline.com) the "
"'authority' parameter should be used instead of 'oidc_authority'. "
""
).format(iss=self._issuer, auth=oidc_authority_url))
def _initialize_oidc_authority(self, oidc_authority_url):
authority, self.instance, tenant = canonicalize(oidc_authority_url)
self.is_adfs = tenant.lower() == 'adfs' # As a convention
Expand Down Expand Up @@ -174,6 +206,60 @@ def user_realm_discovery(self, username, correlation_id=None, response=None):
self.__class__._domains_without_user_realm_discovery.add(self.instance)
return {} # This can guide the caller to fall back normal ROPC flow

def has_valid_issuer(self):
"""
Returns True if the issuer from OIDC discovery is valid for this authority.

An issuer is valid if one of the following is true:
- It exactly matches the authority URL (with/without trailing slash)
- It has the same scheme and host as the authority (path can be different)
- The issuer host is a well-known Microsoft authority host
- The issuer host is a regional variant of a well-known host (e.g., westus2.login.microsoft.com)
- For CIAM, the issuer follows the pattern of {tenant}.ciamlogin.com
"""
if not self._issuer or not self._oidc_authority_url:
return False

# Case 1: Exact match (most common case, normalized for trailing slashes)
if self._issuer.rstrip("/") == self._oidc_authority_url.rstrip("/"):
return True

issuer_parsed = urlparse(self._issuer)
authority_parsed = urlparse(self._oidc_authority_url)
issuer_host = issuer_parsed.hostname.lower() if issuer_parsed.hostname else None

if not issuer_host:
return False

# Case 2: Issuer is from a trusted Microsoft host - O(1) lookup
if issuer_host in TRUSTED_ISSUER_HOSTS:
return True

# Case 3: Regional variant check - O(1) lookup
# e.g., westus2.login.microsoft.com -> extract "login.microsoft.com"
dot_index = issuer_host.find(".")
if dot_index > 0:
potential_base = issuer_host[dot_index + 1:]
if potential_base in TRUSTED_ISSUER_HOSTS and "." not in issuer_host[:dot_index]:
return True

# Case 4: Same scheme and host (path can differ)
if (authority_parsed.scheme == issuer_parsed.scheme and
authority_parsed.netloc == issuer_parsed.netloc):
return True

# Case 5: CIAM scenario - issuer follows pattern {tenant}.ciamlogin.com
if issuer_host.endswith(_CIAM_DOMAIN_SUFFIX):
authority_host = authority_parsed.hostname.lower() if authority_parsed.hostname else ""
if authority_host.endswith(_CIAM_DOMAIN_SUFFIX):
tenant = authority_host[:-len(_CIAM_DOMAIN_SUFFIX)]
else:
parts = authority_parsed.path.split('/')
tenant = parts[1] if len(parts) >= 2 and parts[1] else None

if tenant and issuer_host == tenant + _CIAM_DOMAIN_SUFFIX:
return True
return False

def canonicalize(authority_or_auth_endpoint):
# Returns (url_parsed_result, hostname_in_lowercase, tenant)
Expand Down Expand Up @@ -222,4 +308,3 @@ def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs):
resp.raise_for_status()
raise RuntimeError( # A fallback here, in case resp.raise_for_status() is no-op
"Unable to complete OIDC Discovery: %d, %s" % (resp.status_code, resp.text))

1 change: 1 addition & 0 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ def test_should_fallback_when_pymsalruntime_failed_to_initialize_broker(self):
@patch("msal.authority.tenant_discovery", new=Mock(return_value={
"authorization_endpoint": "https://contoso.com/placeholder",
"token_endpoint": "https://contoso.com/placeholder",
"issuer": "https://contoso.com/placeholder",
}))
@patch("msal.application._init_broker", new=Mock()) # Pretend pymsalruntime installed and working
class TestBrokerFallbackWithDifferentAuthorities(unittest.TestCase):
Expand Down
Loading
Loading