diff --git a/requirements.txt b/requirements.txt index 7252b96e..2256d5ca 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,7 @@ python-dotenv>=0.21,<2 pytest-benchmark>=4,<5 perf_baseline>=0.1,<0.2 +# For lab_config.py to fetch test secrets from Key Vault +azure-identity>=1.12,<2 +azure-keyvault-secrets>=4.6,<5 + diff --git a/tests/lab_config.py b/tests/lab_config.py new file mode 100644 index 00000000..181ac035 --- /dev/null +++ b/tests/lab_config.py @@ -0,0 +1,420 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +""" +Lab configuration helpers for integration tests. + +This module provides access to test user and app configuration stored in +Azure Key Vault. Configuration is retrieved as JSON and parsed into +dataclasses for type-safe access. + +Usage:: + + from tests.lab_config import ( + get_user_config, get_app_config, get_user_password, + UserSecrets, AppSecrets, + ) + + user = get_user_config(UserSecrets.PUBLIC_CLOUD) + password = get_user_password(user) + app = get_app_config(AppSecrets.PCA_CLIENT) + +Environment Variables: + LAB_APP_CLIENT_ID: Client ID for Key Vault authentication (required) + LAB_APP_CLIENT_CERT_PFX_PATH: Path to .pfx certificate file (preferred) + LAB_APP_CLIENT_SECRET: Client secret (alternative to certificate) +""" + +import json +import logging +import os +from dataclasses import dataclass +from typing import Dict, Optional + +from azure.identity import CertificateCredential, ClientSecretCredential +from azure.keyvault.secrets import SecretClient + +logger = logging.getLogger(__name__) + +__all__ = [ + # Constants + "UserSecrets", + "AppSecrets", + # Data classes + "UserConfig", + "AppConfig", + # Functions + "get_secret", + "get_user_config", + "get_app_config", + "get_user_password", + "get_client_certificate", +] + +# ============================================================================= +# Key Vault URLs +# ============================================================================= + +_MSID_LAB_VAULT = "https://msidlabs.vault.azure.net" +_MSAL_TEAM_VAULT = "https://id4skeyvault.vault.azure.net" + +# ============================================================================= +# Secret Name Constants +# ============================================================================= + +class UserSecrets: + """ + Secret names for test user configuration in Key Vault. + + Each constant maps to a JSON blob containing user details like UPN, + tenant ID, and the lab name (used to retrieve the user's password). + """ + PUBLIC_CLOUD = "User-PublicCloud-Config" + FEDERATED = "User-Federated-Config" + B2C = "MSAL-USER-B2C-JSON" + ARLINGTON = "MSAL-USER-Arlington-JSON" + CIAM = "MSAL-USER-CIAM-JSON" + +class AppSecrets: + """ + Secret names for test app configuration in Key Vault. + + Each constant maps to a JSON blob containing app registration details + like client ID, authority, redirect URI, and client secret reference. + """ + PCA_CLIENT = "App-PCAClient-Config" + S2S_CLIENT = "App-S2S-Config" + WEB_API_CLIENT = "App-WebAPI-Config" + WEB_APP_CLIENT = "App-WebAPP-Config" + B2C_CLIENT = "MSAL-App-B2C-JSON" + CIAM_CLIENT = "MSAL-App-CIAM-JSON" + ARLINGTON_CLIENT = "MSAL-App-Arlington-JSON" + +# ============================================================================= +# Data Classes +# ============================================================================= + +@dataclass +class UserConfig: + """ + Test user configuration retrieved from Key Vault. + + Attributes: + upn: User principal name (email format). + tenant_id: Azure AD tenant ID. + lab_name: Key Vault secret name for the user's password. + home_upn: UPN in the user's home tenant (for federated scenarios). + b2c_provider: Identity provider for B2C users (e.g., 'local', 'google'). + federation_provider: Federation provider type (e.g., 'ADFSv4'). + """ + app_id: Optional[str] = None + object_id: Optional[str] = None + user_type: Optional[str] = None + display_name: Optional[str] = None + licenses: Optional[str] = None + upn: Optional[str] = None + mfa: Optional[str] = None + protection_policy: Optional[str] = None + home_domain: Optional[str] = None + home_upn: Optional[str] = None + b2c_provider: Optional[str] = None + lab_name: Optional[str] = None + last_updated_by: Optional[str] = None + last_updated_date: Optional[str] = None + tenant_id: Optional[str] = None + federation_provider: Optional[str] = None + + @property + def authority(self) -> str: + """Construct the Azure AD authority URL from tenant_id.""" + if self.tenant_id: + return f"https://login.microsoftonline.com/{self.tenant_id}" + return "https://login.microsoftonline.com/common" + + +@dataclass +class AppConfig: + """ + Test app registration configuration retrieved from Key Vault. + + Attributes: + app_id: Application (client) ID. + authority: Azure AD authority URL for the app's tenant. + redirect_uri: OAuth redirect URI configured for the app. + defaultscopes: Space-separated default scopes for the app. + client_secret: Key Vault secret name containing the app's client secret. + secret_name: Alternative field for the client secret reference. + """ + app_type: Optional[str] = None + app_name: Optional[str] = None + app_id: Optional[str] = None + defaultscopes: Optional[str] = None + redirect_uri: Optional[str] = None + authority: Optional[str] = None + lab_name: Optional[str] = None + client_secret: Optional[str] = None + secret_name: Optional[str] = None + + +# ============================================================================= +# Key Vault Client Setup +# ============================================================================= + +# Module-level client cache (lazy initialized) +_msid_lab_client: Optional[SecretClient] = None +_msal_team_client: Optional[SecretClient] = None + + +def _get_credential(): + """ + Create an Azure credential for Key Vault access. + + Reads authentication details from environment variables. Prefers + certificate-based authentication if LAB_APP_CLIENT_CERT_PFX_PATH is set, + otherwise falls back to client secret. + + Returns: + A credential object suitable for Azure SDK clients. + + Raises: + EnvironmentError: If required environment variables are not set. + """ + client_id = os.getenv("LAB_APP_CLIENT_ID") + client_secret = os.getenv("LAB_APP_CLIENT_SECRET") + cert_path = os.getenv("LAB_APP_CLIENT_CERT_PFX_PATH") + tenant_id = "72f988bf-86f1-41af-91ab-2d7cd011db47" # Microsoft tenant + + if not client_id: + raise EnvironmentError( + "LAB_APP_CLIENT_ID environment variable is required for Key Vault access") + + if cert_path: + logger.debug("Using certificate credential for Key Vault access") + return CertificateCredential( + tenant_id=tenant_id, + client_id=client_id, + certificate_path=cert_path, + send_certificate_chain=True, + ) + elif client_secret: + logger.debug("Using client secret credential for Key Vault access") + return ClientSecretCredential( + tenant_id=tenant_id, + client_id=client_id, + client_secret=client_secret, + ) + else: + raise EnvironmentError( + "Either LAB_APP_CLIENT_SECRET or LAB_APP_CLIENT_CERT_PFX_PATH is required") + + +def _get_msid_lab_client() -> SecretClient: + """Return the MSID Lab Key Vault client, creating it if needed.""" + global _msid_lab_client + if _msid_lab_client is None: + logger.debug("Initializing MSID Lab Key Vault client") + _msid_lab_client = SecretClient( + vault_url=_MSID_LAB_VAULT, + credential=_get_credential(), + ) + return _msid_lab_client + + +def _get_msal_team_client() -> SecretClient: + """Return the MSAL Team Key Vault client, creating it if needed.""" + global _msal_team_client + if _msal_team_client is None: + logger.debug("Initializing MSAL Team Key Vault client") + _msal_team_client = SecretClient( + vault_url=_MSAL_TEAM_VAULT, + credential=_get_credential(), + ) + return _msal_team_client + + +# ============================================================================= +# Secret Retrieval Functions +# ============================================================================= + +# Module-level caches for config objects (keyed by secret_name) +_user_config_cache: Dict[str, UserConfig] = {} +_app_config_cache: Dict[str, AppConfig] = {} + + +def _lowercase_keys(d: dict) -> dict: + """Recursively convert all dictionary keys to lowercase for case-insensitive access.""" + return {k.lower(): (_lowercase_keys(v) if isinstance(v, dict) else v) for k, v in d.items()} + + +def get_secret(secret_name: str, vault: str = "msid_lab") -> str: + """ + Retrieve a raw secret value from Key Vault. + + Args: + secret_name: The name of the secret in Key Vault. + vault: Which vault to use - ``"msid_lab"`` (default, for passwords) + or ``"msal_team"`` (for configuration JSON blobs). + + Returns: + The secret value as a string. + + Raises: + ValueError: If the vault name is unknown or the secret is empty. + """ + logger.debug("Retrieving secret '%s' from %s vault", secret_name, vault) + + if vault == "msid_lab": + client = _get_msid_lab_client() + elif vault == "msal_team": + client = _get_msal_team_client() + else: + raise ValueError(f"Unknown vault: {vault}. Use 'msid_lab' or 'msal_team'") + + secret = client.get_secret(secret_name) + + if not secret.value: + raise ValueError(f"Secret '{secret_name}' is empty in Key Vault") + + logger.debug("Successfully retrieved secret '%s'", secret_name) + return secret.value + + +def get_user_config(secret_name: str) -> UserConfig: + """ + Retrieve and parse a test user configuration from Key Vault. + + Results are cached for subsequent calls with the same secret name. + + Args: + secret_name: The secret name (use ``UserSecrets`` constants). + + Returns: + A ``UserConfig`` instance populated from the JSON. + """ + # Check cache first + if secret_name in _user_config_cache: + return _user_config_cache[secret_name] + + logger.info("Retrieving user config from secret '%s'", secret_name) + + raw = get_secret(secret_name, vault="msal_team") + data = _lowercase_keys(json.loads(raw)) + + # The JSON has a "user" wrapper object + user_data = data.get("user", data) + + config = UserConfig( + app_id=user_data.get("appid"), + object_id=user_data.get("objectid"), + user_type=user_data.get("usertype"), + display_name=user_data.get("displayname"), + licenses=user_data.get("licenses"), + upn=user_data.get("upn"), + mfa=user_data.get("mfa"), + protection_policy=user_data.get("protectionpolicy"), + home_domain=user_data.get("homedomain"), + home_upn=user_data.get("homeupn"), + b2c_provider=user_data.get("b2cprovider"), + lab_name=user_data.get("labname"), + last_updated_by=user_data.get("lastupdatedby"), + last_updated_date=user_data.get("lastupdateddate"), + tenant_id=user_data.get("tenantid"), + federation_provider=user_data.get("federationprovider"), + ) + + _user_config_cache[secret_name] = config + return config + + +def get_app_config(secret_name: str) -> AppConfig: + """ + Retrieve and parse an app registration configuration from Key Vault. + + Results are cached for subsequent calls with the same secret name. + + Args: + secret_name: The secret name (use ``AppSecrets`` constants). + + Returns: + An ``AppConfig`` instance populated from the JSON. + """ + # Check cache first + if secret_name in _app_config_cache: + return _app_config_cache[secret_name] + + logger.info("Retrieving app config from Key Vault app configuration secret") + + raw = get_secret(secret_name, vault="msal_team") + data = _lowercase_keys(json.loads(raw)) + + # The JSON has an "app" wrapper object + app_data = data.get("app", data) + + config = AppConfig( + app_type=app_data.get("apptype"), + app_name=app_data.get("appname"), + app_id=app_data.get("appid"), + defaultscopes=app_data.get("defaultscopes"), + redirect_uri=app_data.get("redirecturi"), + authority=app_data.get("authority"), + lab_name=app_data.get("labname"), + client_secret=app_data.get("clientsecret"), + secret_name=app_data.get("secretname") + ) + + _app_config_cache[secret_name] = config + return config + + +def get_user_password(user_config: UserConfig) -> str: + """ + Retrieve a test user's password from Key Vault. + + The password is stored in the MSID Lab vault under the secret name + specified by the user's ``lab_name`` field. + + Args: + user_config: A UserConfig instance with lab_name set. + + Returns: + The user's password as a string. + + Raises: + ValueError: If the user_config has no lab_name. + """ + if not user_config.lab_name: + raise ValueError("UserConfig has no lab_name configured") + return get_secret(user_config.lab_name.lower(), vault="msid_lab") + + +def get_client_certificate() -> Dict[str, object]: + """ + Get the client certificate credential for ConfidentialClientApplication. + + Reads the certificate path from the LAB_APP_CLIENT_CERT_PFX_PATH + environment variable and returns a dict configured for Subject Name/Issuer + (SNI) authentication. + + Returns: + A dict suitable for MSAL's ``client_credential`` parameter:: + + { + "private_key_pfx_path": "/path/to/cert.pfx", + "public_certificate": True + } + + Raises: + EnvironmentError: If LAB_APP_CLIENT_CERT_PFX_PATH is not set. + """ + cert_path = os.getenv("LAB_APP_CLIENT_CERT_PFX_PATH") + if not cert_path: + raise EnvironmentError( + "LAB_APP_CLIENT_CERT_PFX_PATH environment variable is required " + "for certificate authentication" + ) + + logger.debug("Using client certificate from: %s", cert_path) + return { + "private_key_pfx_path": cert_path, + "public_certificate": True, # Enable SNI (send certificate chain) + } \ No newline at end of file diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 65b90aa2..77af940b 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -30,6 +30,10 @@ from msal.oauth2cli import AuthCodeReceiver from msal.oauth2cli.oidc import decode_part from tests.broker_util import is_pymsalruntime_installed +from tests.lab_config import ( + get_user_config, get_app_config, get_user_password, get_secret, + UserSecrets, AppSecrets, +) logger = logging.getLogger(__name__) @@ -533,46 +537,6 @@ def setUpClass(cls): def tearDownClass(cls): cls.session.close() - @classmethod - def get_lab_app_object(cls, client_id=None, **query): # https://msidlab.com/swagger/index.html - url = "https://msidlab.com/api/app/{}".format(client_id or "") - resp = cls.session.get(url, params=query) - result = resp.json()[0] - result["scopes"] = [ # Raw data has extra space, such as "s1, s2" - s.strip() for s in result["defaultScopes"].split(',')] - return result - - @classmethod - def get_lab_user_secret(cls, lab_name="msidlab4"): - lab_name = lab_name.lower() - if lab_name not in cls._secrets: - logger.info("Querying lab user password for %s", lab_name) - url = "https://msidlab.com/api/LabSecret?secret=%s" % lab_name - resp = cls.session.get(url) - cls._secrets[lab_name] = resp.json()["value"] - return cls._secrets[lab_name] - - @classmethod - def get_lab_user(cls, **query): # https://docs.msidlab.com/labapi/userapi.html - resp = cls.session.get("https://msidlab.com/api/user", params=query) - result = resp.json()[0] - assert result.get("upn"), "Found no test user but {}".format( - json.dumps(result, indent=2)) - _env = query.get("azureenvironment", "").lower() - authority_base = { - "azureusgovernment": "https://login.microsoftonline.us/" - }.get(_env, "https://login.microsoftonline.com/") - scope = { - "azureusgovernment": ["https://graph.microsoft.us/.default"], - }.get(_env, ["https://graph.microsoft.com/.default"]) - return { # Mapping lab API response to our simplified configuration format - "authority": authority_base + result["tenantID"], - "client_id": result["appId"], - "username": result["upn"], - "lab_name": result["labName"], - "scope": scope, - } - @unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented") def _test_acquire_token_by_auth_code( self, client_id=None, authority=None, port=None, scope=None, @@ -729,10 +693,52 @@ def _test_acquire_token_by_client_secret( self.assertIsNotNone(result.get("access_token"), "Got %s instead" % result) self.assertCacheWorksForApp(result, scope) + def _test_acquire_token_by_client_cert( + self, client_id=None, authority=None, scope=None, + oidc_authority=None, + **ignored): + """Test client credentials flow with certificate. + + Uses the lab certificate from LAB_APP_CLIENT_CERT_PFX_PATH environment + variable for authentication. + """ + from tests.lab_config import get_client_certificate + + assert client_id and scope and (authority or oidc_authority) + client_credential = get_client_certificate() + self.app = msal.ConfidentialClientApplication( + client_id, client_credential=client_credential, authority=authority, + oidc_authority=oidc_authority, + http_client=MinimalHttpClient()) + result = self.app.acquire_token_for_client(scope) + self.assertIsNotNone(result.get("access_token"), "Got %s instead" % result) + self.assertCacheWorksForApp(result, scope) + class PopWithExternalKeyTestCase(LabBasedTestCase): + """Base class for POP/SSH-cert tests with external key. + + Uses the S2S app configuration from Key Vault for service principal tests + and the public cloud user for user account tests. + """ + def _test_service_principal(self): - app = get_lab_app() # Any SP can obtain an ssh-cert. Here we use the lab app. + """Test acquiring POP/SSH-cert token for a service principal. + + Uses the S2S app configuration and lab certificate. + """ + from tests.lab_config import get_app_config, get_client_certificate, AppSecrets + + app_config = get_app_config(AppSecrets.S2S_CLIENT) + client_credential = get_client_certificate() + + app = msal.ConfidentialClientApplication( + app_config.app_id, + client_credential=client_credential, + authority=app_config.authority or "https://login.microsoftonline.com/microsoft.onmicrosoft.com", + http_client=MinimalHttpClient(), + ) + result = app.acquire_token_for_client(self.SCOPE, data=self.DATA1) self.assertIsNotNone(result.get("access_token"), "Encountered {}: {}".format( result.get("error"), result.get("error_description"))) @@ -756,15 +762,20 @@ def _test_service_principal(self): self.assertEqual(refreshed_result["token_source"], "identity_provider") def _test_user_account(self): - lab_user = self.get_lab_user(usertype="cloud") + """Test acquiring POP/SSH-cert token for a user account. + + Uses Azure CLI client ID since it's one of the only clients that are + pre-authorized to use the SSH cert feature. + """ + user = get_user_config(UserSecrets.PUBLIC_CLOUD) result = self._test_acquire_token_interactive( client_id="04b07795-8ddb-461a-bbee-02f9e1bf7b46", # Azure CLI is one # of the only 2 clients that are PreAuthz to use ssh cert feature authority="https://login.microsoftonline.com/common", scope=self.SCOPE, data=self.DATA1, - username=lab_user["username"], - lab_name=lab_user["lab_name"], + username=user.upn, + lab_name=user.lab_name, prompt="none" if msal.application._is_running_in_cloud_shell() else None, ) # It already tests reading AT from cache, and using RT to refresh # acquire_token_silent() would work because we pass in the same key @@ -844,41 +855,43 @@ class WorldWideTestCase(LabBasedTestCase): _ADFS_LABS_UNAVAILABLE = "ADFS labs were temporarily down since July 2025 until further notice" def test_aad_managed_user(self): # Pure cloud - config = self.get_lab_user(usertype="cloud") - config["password"] = self.get_lab_user_secret(config["lab_name"]) - self._test_username_password(**config) - - @unittest.skip(_ADFS_LABS_UNAVAILABLE) - def test_adfs4_fed_user(self): - config = self.get_lab_user(usertype="federated", federationProvider="ADFSv4") - config["password"] = self.get_lab_user_secret(config["lab_name"]) - self._test_username_password(**config) - - @unittest.skip("ADFSv3 is decommissioned in our test environment") - def test_adfs3_fed_user(self): - config = self.get_lab_user(usertype="federated", federationProvider="ADFSv3") - config["password"] = self.get_lab_user_secret(config["lab_name"]) - self._test_username_password(**config) - - @unittest.skip("ADFSv2 is decommissioned in our test environment") - def test_adfs2_fed_user(self): - config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2") - config["password"] = self.get_lab_user_secret(config["lab_name"]) - self._test_username_password(**config) + """Test username/password flow for a managed AAD user.""" + app = get_app_config(AppSecrets.PCA_CLIENT) + user = get_user_config(UserSecrets.PUBLIC_CLOUD) + password = get_user_password(user) + self._test_username_password( + authority=user.authority, + client_id=app.app_id, + username=user.upn, + password=password, + scope=["https://graph.microsoft.com/.default"], + ) @unittest.skip(_ADFS_LABS_UNAVAILABLE) - def test_adfs2019_fed_user(self): - try: - config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2019") - config["password"] = self.get_lab_user_secret(config["lab_name"]) - self._test_username_password(**config) - except requests.exceptions.HTTPError: - if os.getenv("TRAVIS"): - self.skipTest("MEX endpoint in our test environment tends to fail") - raise + def test_adfs2022_fed_user(self): + """Test username/password flow for a federated user via ADFS 2022.""" + app = get_app_config(AppSecrets.PCA_CLIENT) + user = get_user_config(UserSecrets.FEDERATED) + password = get_user_password(user) + self._test_username_password( + authority=user.authority, + client_id=app.app_id, + username=user.upn, + password=password, + scope=["https://graph.microsoft.com/.default"], + ) def test_cloud_acquire_token_interactive(self): - self._test_acquire_token_interactive(**self.get_lab_user(usertype="cloud")) + """Test interactive flow for a cloud user.""" + app = get_app_config(AppSecrets.PCA_CLIENT) + user = get_user_config(UserSecrets.PUBLIC_CLOUD) + self._test_acquire_token_interactive( + authority=user.authority, + client_id=app.app_id, + username=user.upn, + lab_name=user.lab_name, + scope=["https://graph.microsoft.com/.default"], + ) def test_msa_pt_app_signin_via_organizations_authority_without_login_hint(self): """There is/was an upstream bug. See test case full docstring for the details. @@ -887,284 +900,303 @@ def test_msa_pt_app_signin_via_organizations_authority_without_login_hint(self): selects an AAD account that is NOT the default AAD account from the OS, it will incorrectly get tokens for default AAD account. """ - self._test_acquire_token_interactive(**dict( - self.get_lab_user(usertype="cloud"), # This is generally not the current laptop's default AAD account + user = get_user_config(UserSecrets.PUBLIC_CLOUD) + self._test_acquire_token_interactive( authority="https://login.microsoftonline.com/organizations", client_id="04b07795-8ddb-461a-bbee-02f9e1bf7b46", # Azure CLI is an MSA-PT app + username=user.upn, + lab_name=user.lab_name, + scope=["https://graph.microsoft.com/.default"], enable_msa_passthrough=True, prompt="select_account", # In MSAL Python, this resets login_hint - )) - - @unittest.skip(_ADFS_LABS_UNAVAILABLE) - def test_ropc_adfs2019_onprem(self): - # Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259 - config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019") - config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"] - config["scope"] = self.adfs2019_scopes - config["password"] = self.get_lab_user_secret(config["lab_name"]) - self._test_username_password(**config) - - @unittest.skip(_ADFS_LABS_UNAVAILABLE) - def test_adfs2019_onprem_acquire_token_by_auth_code(self): - """When prompted, you can manually login using this account: - - # https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019 - username = "..." # The upn from the link above - password="***" # From https://aka.ms/GetLabSecret?Secret=msidlabXYZ - """ - config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019") - config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"] - config["scope"] = self.adfs2019_scopes - config["port"] = 8080 - self._test_acquire_token_by_auth_code(**config) - - @unittest.skip(_ADFS_LABS_UNAVAILABLE) - def test_adfs2019_onprem_acquire_token_by_auth_code_flow(self): - config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019") - self._test_acquire_token_by_auth_code_flow(**dict( - config, - authority="https://fs.%s.com/adfs" % config["lab_name"], - scope=self.adfs2019_scopes, - port=8080, - )) - - @unittest.skip(_ADFS_LABS_UNAVAILABLE) - def test_adfs2019_onprem_acquire_token_interactive(self): - config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019") - self._test_acquire_token_interactive(**dict( - config, - authority="https://fs.%s.com/adfs" % config["lab_name"], - scope=self.adfs2019_scopes, - port=8080, - )) + ) - @unittest.skipUnless( - os.getenv("LAB_OBO_CLIENT_SECRET"), - "Need LAB_OBO_CLIENT_SECRET from https://aka.ms/GetLabSecret?Secret=TodoListServiceV2-OBO") - @unittest.skipUnless( - os.getenv("LAB_OBO_CONFIDENTIAL_CLIENT_ID"), - "Need LAB_OBO_CONFIDENTIAL_CLIENT_ID from https://identitydivision.visualstudio.com/Engineering/_git/IDLABS?path=/src/DocFX/LabDocs/flows/onbehalfofflow.md&_a=preview") - @unittest.skipUnless( - os.getenv("LAB_OBO_PUBLIC_CLIENT_ID"), - "Need LAB_OBO_PUBLIC_CLIENT_ID from https://identitydivision.visualstudio.com/Engineering/_git/IDLABS?path=/src/DocFX/LabDocs/flows/onbehalfofflow.md&_a=preview") def test_acquire_token_obo(self): - config = self.get_lab_user(usertype="cloud") - - config_cca = {} - config_cca.update(config) - config_cca["client_id"] = os.getenv("LAB_OBO_CONFIDENTIAL_CLIENT_ID") - config_cca["scope"] = ["https://graph.microsoft.com/.default"] - config_cca["client_secret"] = os.getenv("LAB_OBO_CLIENT_SECRET") - - config_pca = {} - config_pca.update(config) - config_pca["client_id"] = os.getenv("LAB_OBO_PUBLIC_CLIENT_ID") - config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"]) - config_pca["scope"] = ["api://%s/read" % config_cca["client_id"]] + """Test On-Behalf-Of flow. + + Flow: + 1. PCA acquires token for user to access the WebAPI (scope: api:///access_as_user) + 2. WebAPI (CCA) uses that token as assertion to get token for downstream service (Graph) + """ + user = get_user_config(UserSecrets.PUBLIC_CLOUD) + password = get_user_password(user) + web_api_app = get_app_config(AppSecrets.WEB_API_CLIENT) + + # Step 1: PCA gets token for user to access the WebAPI + # Note: Java test uses "organizations" authority for PCA + config_pca = { + "authority": "https://login.microsoftonline.com/organizations", + "client_id": web_api_app.app_id, + "username": user.upn, + "password": password, + "scope": ["api://%s/access_as_user" % web_api_app.app_id], + } + + # Step 2: WebAPI (CCA) exchanges the token via OBO for Graph access + # Note: web_api_app.client_secret contains the Key Vault secret name, + # which we pass to get_secret() to retrieve the actual secret value. + config_cca = { + "authority": user.authority, # Tenant-specific authority + "client_id": web_api_app.app_id, + "client_secret": get_secret(web_api_app.client_secret, vault="msal_team"), + "scope": ["https://graph.microsoft.com/.default"], + "username": user.upn, + } self._test_acquire_token_obo(config_pca, config_cca) + @unittest.skipUnless( os.path.exists("tests/sp_obo.pem"), - "Need a 'tests/sp_obo.pem' private to run OBO for SP test") + "Need a 'tests/sp_obo.pem' private key to run OBO for SP test") def test_acquire_token_obo_for_sp(self): + """Test On-Behalf-Of flow for service principal. + + Note: This test uses hardcoded PPE (pre-production environment) values + as it tests a specific SP-to-SP OBO scenario. + """ authority = "https://login.windows-ppe.net/f686d426-8d16-42db-81b7-ab578e110ccd" with open("tests/sp_obo.pem") as pem: client_secret = { "private_key": pem.read(), "thumbprint": "378938210C976692D7F523B8C4FFBB645D17CE92", - } + } midtier_app = { "authority": authority, "client_id": "c84e9c32-0bc9-4a73-af05-9efe9982a322", "client_secret": client_secret, "scope": ["23d08a1e-1249-4f7c-b5a5-cb11f29b6923/.default"], - #"username": "OBO-Client-PPE", # We do NOT attempt locating initial_app by name - } + } initial_app = { "authority": authority, "client_id": "9793041b-9078-4942-b1d2-babdc472cc0c", "client_secret": client_secret, "scope": [midtier_app["client_id"] + "/.default"], - } + } self._test_acquire_token_obo(initial_app, midtier_app) def test_acquire_token_by_client_secret(self): - # Vastly different than ArlingtonCloudTestCase.test_acquire_token_by_client_secret() - _app = self.get_lab_app_object( - publicClient="no", signinAudience="AzureAdMyOrg") + """Test client credentials flow with client secret.""" + app = get_app_config(AppSecrets.S2S_CLIENT) + + client_secret = get_secret(app.secret_name, vault="msal_team") self._test_acquire_token_by_client_secret( - client_id=_app["appId"], - client_secret=self.get_lab_user_secret( - _app["clientSecret"].split("/")[-1]), - authority="{}{}.onmicrosoft.com".format( - _app["authority"], _app["labName"].lower().rstrip(".com")), + client_id=app.app_id, + client_secret=client_secret, + authority=app.authority, scope=["https://graph.microsoft.com/.default"], - ) + ) - @unittest.skipUnless( - os.getenv("LAB_OBO_CLIENT_SECRET"), - "Need LAB_OBO_CLIENT_SECRET from https://aka.ms/GetLabSecret?Secret=TodoListServiceV2-OBO") - @unittest.skipUnless( - os.getenv("LAB_OBO_CONFIDENTIAL_CLIENT_ID"), - "Need LAB_OBO_CONFIDENTIAL_CLIENT_ID from https://identitydivision.visualstudio.com/Engineering/_git/IDLABS?path=/src/DocFX/LabDocs/flows/onbehalfofflow.md&_a=preview") def test_confidential_client_acquire_token_by_username_password(self): - # This approach won't work: - # config = self.get_lab_user(usertype="cloud", publicClient="no") - # so we repurpose the obo confidential app to test ROPC - config = self.get_lab_user(usertype="cloud") - config["password"] = self.get_lab_user_secret(config["lab_name"]) - # Swap in the OBO confidential app - config["client_id"] = os.getenv("LAB_OBO_CONFIDENTIAL_CLIENT_ID") - config["scope"] = ["https://graph.microsoft.com/.default"] - config["client_secret"] = os.getenv("LAB_OBO_CLIENT_SECRET") - self._test_username_password(**config) + """Test ROPC flow with a confidential client.""" + user = get_user_config(UserSecrets.PUBLIC_CLOUD) + password = get_user_password(user) + # Use a confidential client app that supports ROPC + app = get_app_config(AppSecrets.S2S_CLIENT) + client_secret = get_secret(app.secret_name, vault="msal_team") + self._test_username_password( + authority=user.authority, + client_id=app.app_id, + username=user.upn, + password=password, + scope=["https://graph.microsoft.com/.default"], + client_secret=client_secret, + ) def _build_b2c_authority(self, policy): base = "https://msidlabb2c.b2clogin.com/msidlabb2c.onmicrosoft.com" return base + "/" + policy # We do not support base + "?p=" + policy def test_b2c_acquire_token_by_auth_code(self): - """ - When prompted, you can manually login using this account: - - username="b2clocal@msidlabb2c.onmicrosoft.com" - # This won't work https://msidlab.com/api/user?usertype=b2c - password="***" # From https://aka.ms/GetLabSecret?Secret=msidlabb2c - """ - config = self.get_lab_app_object(azureenvironment="azureb2ccloud") + """Test auth code flow for B2C.""" + # TODO: Update with B2C app config from Key Vault when available + app = get_app_config(AppSecrets.B2C_CLIENT) # TODO: Verify secret name self._test_acquire_token_by_auth_code( authority=self._build_b2c_authority("B2C_1_SignInPolicy"), - client_id=config["appId"], + client_id=app.app_id, port=3843, # Lab defines 4 of them: [3843, 4584, 4843, 60000] - scope=config["scopes"], - ) + scope=app.defaultscopes, + ) def test_b2c_acquire_token_by_auth_code_flow(self): - self._test_acquire_token_by_auth_code_flow(**dict( - self.get_lab_user(usertype="b2c", b2cprovider="local"), + """Test auth code flow (with PKCE) for B2C.""" + user = get_user_config(UserSecrets.B2C) # TODO: Verify secret name + app = get_app_config(AppSecrets.B2C_CLIENT) # TODO: Verify secret name + self._test_acquire_token_by_auth_code_flow( authority=self._build_b2c_authority("B2C_1_SignInPolicy"), + client_id=app.app_id, + username=user.upn, + lab_name=user.lab_name, port=3843, # Lab defines 4 of them: [3843, 4584, 4843, 60000] - scope=self.get_lab_app_object(azureenvironment="azureb2ccloud")["scopes"], - )) + scope=app.defaultscopes, + ) def test_b2c_acquire_token_by_ropc(self): - config = self.get_lab_app_object(azureenvironment="azureb2ccloud") + """Test ROPC flow for B2C with custom authority.""" + user = get_user_config(UserSecrets.B2C) + password = get_user_password(user) + app = get_app_config(AppSecrets.B2C_CLIENT) + # B2C uses a specific API scope, not generic app.scopes + b2c_scope = ["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"] self._test_username_password( authority=self._build_b2c_authority("B2C_1_ROPC_Auth"), - client_id=config["appId"], - username="b2clocal@msidlabb2c.onmicrosoft.com", - password=self.get_lab_user_secret("msidlabb2c"), - scope=config["scopes"], - ) + client_id=app.app_id, + username=user.upn, + password=password, + scope=b2c_scope, + ) def test_b2c_allows_using_client_id_as_scope(self): - # See also https://learn.microsoft.com/en-us/azure/active-directory-b2c/access-tokens#openid-connect-scopes - config = self.get_lab_app_object(azureenvironment="azureb2ccloud") - config["scopes"] = [config["appId"]] + """Test that B2C allows using client_id as scope. + + See also https://learn.microsoft.com/en-us/azure/active-directory-b2c/access-tokens#openid-connect-scopes + """ + user = get_user_config(UserSecrets.B2C) + password = get_user_password(user) + app = get_app_config(AppSecrets.B2C_CLIENT) self._test_username_password( authority=self._build_b2c_authority("B2C_1_ROPC_Auth"), - client_id=config["appId"], - username="b2clocal@msidlabb2c.onmicrosoft.com", - password=self.get_lab_user_secret("msidlabb2c"), - scope=config["scopes"], - ) + client_id=app.app_id, + username=user.upn, + password=password, + scope=[app.app_id], # Using client_id as scope + ) class CiamTestCase(LabBasedTestCase): - # Test cases below show you what scenarios need to be covered for CIAM. - # Detail test behaviors have already been implemented in preexisting helpers. + """Test cases for CIAM (Customer Identity and Access Management). + + CIAM uses a different authority format: https://.ciamlogin.com/ + """ @classmethod def setUpClass(cls): super(CiamTestCase, cls).setUpClass() - cls.user = cls.get_lab_user( - #federationProvider="ciam", # This line would return ciam2 tenant - federationProvider="ciamcud", signinAudience="AzureAdMyOrg", # ciam6 - ) - # FYI: Only single- or multi-tenant CIAM app can have other-than-OIDC - # delegated permissions on Microsoft Graph. - cls.app_config = cls.get_lab_app_object(cls.user["client_id"]) + # Get CIAM user and app config from Key Vault + cls.user_config = get_user_config(UserSecrets.CIAM) + cls.app_config = get_app_config(AppSecrets.CIAM_CLIENT) + # CIAM authority format: https://.ciamlogin.com/ + cls.ciam_authority = f"https://{cls.user_config.lab_name}.ciamlogin.com/" def test_ciam_acquire_token_interactive(self): + """Test interactive flow for CIAM.""" self._test_acquire_token_interactive( - authority=self.app_config.get("authority"), - oidc_authority=self.app_config.get("oidc_authority"), - client_id=self.app_config["appId"], - scope=self.app_config["scopes"], - username=self.user["username"], - lab_name=self.user["lab_name"], - ) + authority=self.ciam_authority, + client_id=self.app_config.app_id, + scope=self.app_config.defaultscopes, + username=self.user_config.upn, + lab_name=self.user_config.lab_name, + ) def test_ciam_acquire_token_for_client(self): - raw_url = self.app_config["clientSecret"] - secret_url = urlparse(raw_url) - if secret_url.query: # Ciam2 era has a query param Secret=name - secret_name = parse_qs(secret_url.query)["Secret"][0] - else: # Ciam6 era has a URL path that ends with the secret name - secret_name = secret_url.path.split("/")[-1] - logger.info('Detected secret name "%s" from "%s"', secret_name, raw_url) - self._test_acquire_token_by_client_secret( - client_id=self.app_config["appId"], - client_secret=self.get_lab_user_secret(secret_name), - authority=self.app_config.get("authority"), - oidc_authority=self.app_config.get("oidc_authority"), - scope=self.app_config["scopes"], # It shall ends with "/.default" - ) + """Test client credentials flow for CIAM using certificate.""" + self._test_acquire_token_by_client_cert( + client_id=self.app_config.app_id, + authority=self.ciam_authority, + scope=[f"{self.app_config.app_id}/.default"], + ) def test_ciam_acquire_token_by_ropc(self): - """CIAM does not officially support ROPC, especially not for external emails. - - We keep this test case for now, because the test data will use a local email. + """Test ROPC flow for CIAM. + + Note: CIAM does not officially support ROPC for external emails, + but this test works because the test data uses a local email. """ - # Somehow, this would only work after creating a secret for the test app - # and enabling "Allow public client flows". - # Otherwise it would hit AADSTS7000218. + password = get_user_password(self.user_config) self._test_username_password( - authority=self.app_config.get("authority"), - oidc_authority=self.app_config.get("oidc_authority"), - client_id=self.app_config["appId"], - username=self.user["username"], - password=self.get_lab_user_secret(self.user["lab_name"]), - scope=self.app_config["scopes"], - ) + authority=self.ciam_authority, + client_id=self.app_config.app_id, + username=self.user_config.upn, + password=password, + scope=["user.read"], + ) @unittest.skip("""As of Aug 2024, in both ciam2 and ciam6, sign-in fails with AADSTS500208: The domain is not a valid login domain for the account type.""") def test_ciam_device_flow(self): self._test_device_flow( - authority=self.app_config.get("authority"), - oidc_authority=self.app_config.get("oidc_authority"), - client_id=self.app_config["appId"], - scope=self.app_config["scopes"], - ) + authority=self.ciam_authority, + client_id=self.app_config.app_id, + scope=self.app_config.scopes or ["user.read"], + ) class CiamCudTestCase(CiamTestCase): + """CIAM CUD (Custom URL Domain) test case. + + Uses OIDC authority instead of standard CIAM authority for custom domain scenarios. + """ + # OIDC authority for CIAM CUD + CIAM_CUD_OIDC_AUTHORITY = "https://login.msidlabsciam.com/fe362aec-5d43-45d1-b730-9755e60dc3b9/v2.0" + @classmethod def setUpClass(cls): super(CiamCudTestCase, cls).setUpClass() - cls.app_config["authority"] = None - cls.app_config["oidc_authority"] = ( - # Derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.63.0/tests/Microsoft.Identity.Test.Integration.netcore/HeadlessTests/CiamIntegrationTests.cs#L156 - "https://login.msidlabsciam.com/fe362aec-5d43-45d1-b730-9755e60dc3b9/v2.0") + # Override authority to use OIDC authority for CUD scenario + cls.ciam_authority = None # Clear standard authority + cls.oidc_authority = cls.CIAM_CUD_OIDC_AUTHORITY + + def test_ciam_acquire_token_interactive(self): + """Test interactive flow for CIAM CUD with OIDC authority.""" + self._test_acquire_token_interactive( + oidc_authority=self.oidc_authority, + client_id=self.app_config.app_id, + scope=self.app_config.defaultscopes, + username=self.user_config.upn, + lab_name=self.user_config.lab_name, + ) + + def test_ciam_acquire_token_for_client(self): + """Test client credentials flow for CIAM CUD using certificate with OIDC authority.""" + self._test_acquire_token_by_client_cert( + client_id=self.app_config.app_id, + oidc_authority=self.oidc_authority, + scope=[".default"], + ) + + def test_ciam_acquire_token_by_ropc(self): + """Test ROPC flow for CIAM CUD with OIDC authority.""" + password = get_user_password(self.user_config) + self._test_username_password( + oidc_authority=self.oidc_authority, + client_id=self.app_config.app_id, + username=self.user_config.upn, + password=password, + scope=["user.read"], + ) class WorldWideRegionalEndpointTestCase(LabBasedTestCase): + """Test regional endpoint behavior for confidential client applications. + + Regional endpoints are used for Azure-hosted applications to reduce latency. + These tests verify that MSAL correctly routes requests to regional vs global endpoints. + """ region = "westus" timeout = 2 # Short timeout makes this test case responsive on non-VM def _test_acquire_token_for_client(self, configured_region, expected_region): - """This is the only grant supported by regional endpoint, for now""" - self.app = get_lab_app( # Regional endpoint only supports confidential client - - ## FWIW, the MSAL<1.12 versions could use this to achieve similar result - #authority="https://westus.login.microsoft.com/microsoft.onmicrosoft.com", - #validate_authority=False, + """This is the only grant supported by regional endpoint, for now. + + Uses the lab app certificate for authentication. + """ + import os + from tests.lab_config import get_client_certificate + + # Get client ID from environment and certificate from lab_config + client_id = os.getenv("LAB_APP_CLIENT_ID") + if not client_id: + self.skipTest("LAB_APP_CLIENT_ID environment variable is required") + + client_credential = get_client_certificate() + + self.app = msal.ConfidentialClientApplication( + client_id, + client_credential=client_credential, authority="https://login.microsoftonline.com/microsoft.onmicrosoft.com", azure_region=configured_region, - timeout=2, # Short timeout makes this test case responsive on non-VM - ) + http_client=MinimalHttpClient(timeout=self.timeout), + ) scopes = ["https://graph.microsoft.com/.default"] with patch.object( # Test the request hit the regional endpoint @@ -1219,105 +1251,129 @@ def test_acquire_token_for_client_should_use_an_env_var_with_long_region_name(se msal.ConfidentialClientApplication.ATTEMPT_REGION_DISCOVERY, "eastus2") del os.environ["REGION_NAME"] - @unittest.skipUnless( - os.getenv("LAB_OBO_CLIENT_SECRET"), - "Need LAB_OBO_CLIENT_SECRET from https://aka.ms/GetLabSecret?Secret=TodoListServiceV2-OBO") - @unittest.skipUnless( - os.getenv("LAB_OBO_CONFIDENTIAL_CLIENT_ID"), - "Need LAB_OBO_CONFIDENTIAL_CLIENT_ID from https://identitydivision.visualstudio.com/Engineering/_git/IDLABS?path=/src/DocFX/LabDocs/flows/onbehalfofflow.md&_a=preview") - @unittest.skipUnless( - os.getenv("LAB_OBO_PUBLIC_CLIENT_ID"), - "Need LAB_OBO_PUBLIC_CLIENT_ID from https://identitydivision.visualstudio.com/Engineering/_git/IDLABS?path=/src/DocFX/LabDocs/flows/onbehalfofflow.md&_a=preview") def test_cca_obo_should_bypass_regional_endpoint_therefore_still_work(self): - """We test OBO because it is implemented in sub class ConfidentialClientApplication""" - config = self.get_lab_user(usertype="cloud") - - config_cca = {} - config_cca.update(config) - config_cca["client_id"] = os.getenv("LAB_OBO_CONFIDENTIAL_CLIENT_ID") - config_cca["scope"] = ["https://graph.microsoft.com/.default"] - config_cca["client_secret"] = os.getenv("LAB_OBO_CLIENT_SECRET") - - config_pca = {} - config_pca.update(config) - config_pca["client_id"] = os.getenv("LAB_OBO_PUBLIC_CLIENT_ID") - config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"]) - config_pca["scope"] = ["api://%s/read" % config_cca["client_id"]] + """We test OBO because it is implemented in sub class ConfidentialClientApplication. + + Regional endpoint does not directly support OBO, but MSAL should automatically + bypass regional endpoint for OBO calls. + """ + user = get_user_config(UserSecrets.PUBLIC_CLOUD) + password = get_user_password(user) + web_api_app = get_app_config(AppSecrets.WEB_API_CLIENT) + + # Step 1: PCA gets token for user to access the WebAPI + config_pca = { + "authority": "https://login.microsoftonline.com/organizations", + "client_id": web_api_app.app_id, + "username": user.upn, + "password": password, + "scope": ["api://%s/access_as_user" % web_api_app.app_id], + } + + # Step 2: WebAPI (CCA) exchanges the token via OBO for Graph access + config_cca = { + "authority": user.authority, + "client_id": web_api_app.app_id, + "client_secret": get_secret(web_api_app.client_secret, vault="msal_team"), + "scope": ["https://graph.microsoft.com/.default"], + "username": user.upn, + } self._test_acquire_token_obo( config_pca, config_cca, azure_region=self.region, http_client=MinimalHttpClient(timeout=self.timeout), - ) + ) - @unittest.skipUnless( - os.getenv("LAB_OBO_CLIENT_SECRET"), - "Need LAB_OBO_CLIENT_SECRET from https://aka.ms/GetLabSecret?Secret=TodoListServiceV2-OBO") - @unittest.skipUnless( - os.getenv("LAB_OBO_CONFIDENTIAL_CLIENT_ID"), - "Need LAB_OBO_CONFIDENTIAL_CLIENT_ID from https://identitydivision.visualstudio.com/Engineering/_git/IDLABS?path=/src/DocFX/LabDocs/flows/onbehalfofflow.md&_a=preview") def test_cca_ropc_should_bypass_regional_endpoint_therefore_still_work(self): - """We test ROPC because it is implemented in base class ClientApplication""" - config = self.get_lab_user(usertype="cloud") - config["password"] = self.get_lab_user_secret(config["lab_name"]) - # We repurpose the obo confidential app to test ROPC - # Swap in the OBO confidential app - config["client_id"] = os.getenv("LAB_OBO_CONFIDENTIAL_CLIENT_ID") - config["scope"] = ["https://graph.microsoft.com/.default"] - config["client_secret"] = os.getenv("LAB_OBO_CLIENT_SECRET") + """We test ROPC because it is implemented in base class ClientApplication. + + Regional endpoint does not directly support ROPC, but MSAL should automatically + bypass regional endpoint for ROPC calls. + """ + user = get_user_config(UserSecrets.PUBLIC_CLOUD) + password = get_user_password(user) + # Use the S2S app which supports ROPC with client secret + app = get_app_config(AppSecrets.S2S_CLIENT) + client_secret = get_secret(app.secret_name, vault="msal_team") + self._test_username_password( + authority=user.authority, + client_id=app.app_id, + username=user.upn, + password=password, + scope=["https://graph.microsoft.com/.default"], + client_secret=client_secret, azure_region=self.region, http_client=MinimalHttpClient(timeout=self.timeout), - **config) + ) class ArlingtonCloudTestCase(LabBasedTestCase): - environment = "azureusgovernment" - - def test_acquire_token_by_ropc(self): - config = self.get_lab_user(azureenvironment=self.environment) - config["password"] = self.get_lab_user_secret(config["lab_name"]) - self._test_username_password(**config) - - def test_acquire_token_by_client_secret(self): - config = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="no") - config["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret") - self._test_acquire_token_by_client_secret(**config) - - def test_acquire_token_obo(self): - config_cca = self.get_lab_user( - usertype="cloud", azureenvironment=self.environment, publicClient="no") - config_cca["scope"] = ["https://graph.microsoft.us/.default"] - config_cca["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret") + """Test cases for Azure US Government (Arlington) cloud. + + Arlington uses different endpoints (login.microsoftonline.us, graph.microsoft.us) + and requires specific test resources configured for the government cloud. + """ - config_pca = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="yes") - obo_app_object = self.get_lab_app_object( - usertype="cloud", azureenvironment=self.environment, publicClient="no") - config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"]) - config_pca["scope"] = ["{app_uri}/files.read".format(app_uri=obo_app_object.get("identifierUris"))] + @classmethod + def setUpClass(cls): + super(ArlingtonCloudTestCase, cls).setUpClass() + # Get Arlington user and app config from Key Vault + cls.user_config = get_user_config(UserSecrets.ARLINGTON) + cls.app_config = get_app_config(AppSecrets.ARLINGTON_CLIENT) + # Arlington authority uses login.microsoftonline.us + cls.arlington_authority = f"https://login.microsoftonline.us/{cls.user_config.tenant_id}" - self._test_acquire_token_obo(config_pca, config_cca) + def test_acquire_token_by_ropc(self): + """Test username/password flow for Arlington cloud.""" + password = get_user_password(self.user_config) + self._test_username_password( + authority=self.arlington_authority, + client_id=self.app_config.app_id, + username=self.user_config.upn, + password=password, + scope=["https://graph.microsoft.us/.default"], + ) def test_acquire_token_device_flow(self): - config = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="yes") - config["scope"] = ["user.read"] - self._test_device_flow(**config) + """Test device code flow for Arlington cloud.""" + self._test_device_flow( + authority=self.arlington_authority, + client_id=self.app_config.app_id, + scope=["user.read"], + username=self.user_config.upn, + lab_name=self.user_config.lab_name, + ) def test_acquire_token_silent_with_an_empty_cache_should_return_none(self): - config = self.get_lab_user( - usertype="cloud", azureenvironment=self.environment, publicClient="no") + """Test that acquire_token_silent returns None with empty cache. + + Note: An alias in this region is no longer accepting HTTPS traffic. + If this test case passes without exception, it means MSAL Python + is not affected by that. + """ + client_secret = get_secret(self.app_config.client_secret, vault="msal_team") app = msal.ConfidentialClientApplication( - config['client_id'], authority=config['authority'], - http_client=MinimalHttpClient()) - result = app.acquire_token_silent(scopes=config['scope'], account=None) + self.app_config.app_id, + client_credential=client_secret, + authority=self.arlington_authority, + http_client=MinimalHttpClient(), + ) + result = app.acquire_token_silent( + scopes=["https://graph.microsoft.us/.default"], + account=None, + ) self.assertEqual(result, None) - # Note: An alias in this region is no longer accepting HTTPS traffic. - # If this test case passes without exception, - # it means MSAL Python is not affected by that. @unittest.skipUnless(_PYMSALRUNTIME_INSTALLED, "AT POP feature is only supported by using broker") class PopTestCase(LabBasedTestCase): + """Test cases for Proof-of-Possession (POP) tokens using the broker. + + These tests require pymsalruntime to be installed. + """ + def test_at_pop_should_contain_pop_scheme_content(self): auth_scheme = msal.PopAuthScheme( http_method=msal.PopAuthScheme.HTTP_GET, @@ -1342,7 +1398,7 @@ def test_at_pop_should_contain_pop_scheme_content(self): # TODO: Remove this, as ROPC support is removed by Broker-on-Win def test_at_pop_via_testingsts_service(self): - """Based on https://testingsts.azurewebsites.net/ServerNonce""" + """Test AT POP via testingsts.azurewebsites.net nonce validation service.""" self.skipTest("ROPC support is removed by Broker-on-Win") auth_scheme = msal.PopAuthScheme( http_method="POST", @@ -1351,9 +1407,18 @@ def test_at_pop_via_testingsts_service(self): # TODO: Could use ".../missing" and then parse its WWW-Authenticate header "https://testingsts.azurewebsites.net/servernonce/get").text, ) - config = self.get_lab_user(usertype="cloud") - config["password"] = self.get_lab_user_secret(config["lab_name"]) - result = self._test_username_password(auth_scheme=auth_scheme, **config) + # Use Key Vault config for user and app + user = get_user_config(UserSecrets.PUBLIC_CLOUD) + password = get_user_password(user) + app = get_app_config(AppSecrets.PCA_CLIENT) + result = self._test_username_password( + authority=user.authority, + client_id=app.app_id, + username=user.upn, + password=password, + scope=["User.Read"], + auth_scheme=auth_scheme, + ) self.assertEqual(result["token_type"], "pop") shr = result["access_token"] payload = json.loads(decode_part(result["access_token"].split(".")[1])) @@ -1371,9 +1436,11 @@ def test_at_pop_via_testingsts_service(self): self.assertEqual(validation.status_code, 200) def test_at_pop_calling_pattern(self): - # The calling pattern was described here: - # https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=/PoPTokensProtocol/PoP_API_In_MSAL.md&_a=preview&anchor=proposal-2---optional-isproofofposessionsupportedbyclient-helper-(accepted) - + """Test the POP calling pattern with Key Vault config. + + The calling pattern was described here: + https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=/PoPTokensProtocol/PoP_API_In_MSAL.md + """ # It is supposed to call app.is_pop_supported() first, # and then fallback to bearer token code path. # We skip it here because this test case has not yet initialize self.app @@ -1392,15 +1459,21 @@ def test_at_pop_calling_pattern(self): # @suppress py/bandit/requests-ssl-verify-disabled resp = requests.get(api_endpoint, verify=verify) # CodeQL [SM03157] self.assertEqual(resp.status_code, 401, "Initial call should end with an http 401 error") - result = self._get_shr_pop(**dict( - self.get_lab_user(usertype="cloud"), # This is generally not the current laptop's default AAD account + + # Use Key Vault config instead of Lab API + user = get_user_config(UserSecrets.PUBLIC_CLOUD) + result = self._get_shr_pop( + client_id=get_app_config(AppSecrets.PCA_CLIENT).app_id, + authority=user.authority, scope=["https://graph.microsoft.com/.default"], + username=user.upn, + lab_name=user.lab_name, auth_scheme=msal.PopAuthScheme( http_method=msal.PopAuthScheme.HTTP_GET, url=api_endpoint, nonce=self._extract_pop_nonce(resp.headers.get("WWW-Authenticate")), - ), - )) + ), + ) resp = requests.get( api_endpoint, # CodeQL [SM03157]