From 642ea6525b5bacad46a83f54ed6d62e87e92ef1f Mon Sep 17 00:00:00 2001 From: Ray Luo Date: Thu, 7 Apr 2022 19:50:41 -0700 Subject: [PATCH] Add test case to show that OBO supports SP --- tests/test_e2e.py | 101 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 80 insertions(+), 21 deletions(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 9a971f46..f0fb226d 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -570,19 +570,27 @@ def _test_acquire_token_obo(self, config_pca, config_cca, # Here we just test regional apps won't adversely break OBO http_client=None, ): - # 1. An app obtains a token representing a user, for our mid-tier service - pca = msal.PublicClientApplication( - config_pca["client_id"], authority=config_pca["authority"], - azure_region=azure_region, - http_client=http_client or MinimalHttpClient()) - pca_result = pca.acquire_token_by_username_password( - config_pca["username"], - config_pca["password"], - scopes=config_pca["scope"], - ) - self.assertIsNotNone( - pca_result.get("access_token"), - "PCA failed to get AT because %s" % json.dumps(pca_result, indent=2)) + if "client_secret" not in config_pca: + # 1.a An app obtains a token representing a user, for our mid-tier service + result = msal.PublicClientApplication( + config_pca["client_id"], authority=config_pca["authority"], + azure_region=azure_region, + http_client=http_client or MinimalHttpClient(), + ).acquire_token_by_username_password( + config_pca["username"], config_pca["password"], + scopes=config_pca["scope"], + ) + else: # We repurpose the config_pca to contain client_secret for cca app 1 + # 1.b An app obtains a token representing itself, for our mid-tier service + result = msal.ConfidentialClientApplication( + config_pca["client_id"], authority=config_pca["authority"], + client_credential=config_pca["client_secret"], + azure_region=azure_region, + http_client=http_client or MinimalHttpClient(), + ).acquire_token_for_client(scopes=config_pca["scope"]) + assertion = result.get("access_token") + self.assertIsNotNone(assertion, "First app failed to get AT. {}".format( + json.dumps(result, indent=2))) # 2. Our mid-tier service uses OBO to obtain a token for downstream service cca = msal.ConfidentialClientApplication( @@ -595,9 +603,9 @@ def _test_acquire_token_obo(self, config_pca, config_cca, # That's fine if OBO app uses short-lived msal instance per session. # Otherwise, the OBO app need to implement a one-cache-per-user setup. ) - cca_result = cca.acquire_token_on_behalf_of( - pca_result['access_token'], config_cca["scope"]) - self.assertNotEqual(None, cca_result.get("access_token"), str(cca_result)) + cca_result = cca.acquire_token_on_behalf_of(assertion, config_cca["scope"]) + self.assertIsNotNone(cca_result.get("access_token"), "OBO call failed: {}".format( + json.dumps(cca_result, indent=2))) # 3. Now the OBO app can simply store downstream token(s) in same session. # Alternatively, if you want to persist the downstream AT, and possibly @@ -606,13 +614,27 @@ def _test_acquire_token_obo(self, config_pca, config_cca, # Assuming you already did that (which is not shown in this test case), # the following part shows one of the ways to obtain an AT from cache. username = cca_result.get("id_token_claims", {}).get("preferred_username") - if username: # It means CCA have requested an IDT w/ "profile" scope - self.assertEqual(config_cca["username"], username) accounts = cca.get_accounts(username=username) - assert len(accounts) == 1, "App is expected to partition token cache per user" - account = accounts[0] + if username is not None: # It means CCA have requested an IDT w/ "profile" scope + assert config_cca["username"] == username, "Incorrect test case configuration" + self.assertEqual(1, len(accounts), "App is supposed to partition token cache per user") + account = accounts[0] # Alternatively, cca app could just loop through each account result = cca.acquire_token_silent(config_cca["scope"], account) - self.assertEqual(cca_result["access_token"], result["access_token"]) + self.assertTrue( + result and result.get("access_token") == cca_result["access_token"], + "CCA should hit an access token from cache: {}".format( + json.dumps(cca.token_cache._cache, indent=2))) + if "refresh_token" in cca_result: + result = cca.acquire_token_silent( + config_cca["scope"], account=account, force_refresh=True) + self.assertTrue( + result and "access_token" in result, + "CCA should get an AT silently, but we got this instead: {}".format(result)) + self.assertNotEqual( + result["access_token"], cca_result["access_token"], + "CCA should get a new AT") + else: + logger.info("AAD did not issue a RT for OBO flow") def _test_acquire_token_by_client_secret( self, client_id=None, client_secret=None, authority=None, scope=None, @@ -623,6 +645,18 @@ def _test_acquire_token_by_client_secret( http_client=MinimalHttpClient()) result = app.acquire_token_for_client(scope) self.assertIsNotNone(result.get("access_token"), "Got %s instead" % result) + result2 = app.acquire_token_silent(scope, account=None) + self.assertEqual( + result2.get("access_token"), result["access_token"], + "CCA should hit an access token from cache: {}".format( + json.dumps(app.token_cache._cache, indent=2)) + ) + if "refresh_token" in result: # Empirically, RT is unavailable, but just in case... + result3 = app.acquire_token_silent(scope, account=None, force_refresh=True) + error_message = "CCA should get a new AT via RT in cache: {}".format( + json.dumps(app.token_cache._cache, indent=2)) + self.assertIsNotNone(result3, error_message) + self.assertNotEqual(result3.get("access_token"), result["access_token"], error_message) class WorldWideTestCase(LabBasedTestCase): @@ -732,6 +766,31 @@ def test_acquire_token_obo(self): self._test_acquire_token_obo(config_pca, config_cca) + @unittest.skipUnless( + os.path.exists("tests/sp_obo.pem"), + "Need a 'tests/sp_obo.pem' private to run OBO for SP test") + def test_acquire_token_obo_for_sp(self): + authority = "https://login.windows-ppe.net/f686d426-8d16-42db-81b7-ab578e110ccd" + with open("tests/sp_obo.pem") as pem: + client_secret = { + "private_key": pem.read(), + "thumbprint": "378938210C976692D7F523B8C4FFBB645D17CE92", + } + midtier_app = { + "authority": authority, + "client_id": "c84e9c32-0bc9-4a73-af05-9efe9982a322", + "client_secret": client_secret, + "scope": ["23d08a1e-1249-4f7c-b5a5-cb11f29b6923/.default"], + #"username": "OBO-Client-PPE", # We do NOT attempt locating initial_app by name + } + initial_app = { + "authority": authority, + "client_id": "9793041b-9078-4942-b1d2-babdc472cc0c", + "client_secret": client_secret, + "scope": [midtier_app["client_id"] + "/.default"], + } + self._test_acquire_token_obo(initial_app, midtier_app) + def test_acquire_token_by_client_secret(self): # Vastly different than ArlingtonCloudTestCase.test_acquire_token_by_client_secret() _app = self.get_lab_app_object(