Skip to content

Commit

Permalink
Add test case to show that OBO supports SP
Browse files Browse the repository at this point in the history
  • Loading branch information
rayluo committed Jun 8, 2022
1 parent 85f4f9e commit 642ea65
Showing 1 changed file with 80 additions and 21 deletions.
101 changes: 80 additions & 21 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,19 +570,27 @@ def _test_acquire_token_obo(self, config_pca, config_cca,
# Here we just test regional apps won't adversely break OBO
http_client=None,
):
# 1. An app obtains a token representing a user, for our mid-tier service
pca = msal.PublicClientApplication(
config_pca["client_id"], authority=config_pca["authority"],
azure_region=azure_region,
http_client=http_client or MinimalHttpClient())
pca_result = pca.acquire_token_by_username_password(
config_pca["username"],
config_pca["password"],
scopes=config_pca["scope"],
)
self.assertIsNotNone(
pca_result.get("access_token"),
"PCA failed to get AT because %s" % json.dumps(pca_result, indent=2))
if "client_secret" not in config_pca:
# 1.a An app obtains a token representing a user, for our mid-tier service
result = msal.PublicClientApplication(
config_pca["client_id"], authority=config_pca["authority"],
azure_region=azure_region,
http_client=http_client or MinimalHttpClient(),
).acquire_token_by_username_password(
config_pca["username"], config_pca["password"],
scopes=config_pca["scope"],
)
else: # We repurpose the config_pca to contain client_secret for cca app 1
# 1.b An app obtains a token representing itself, for our mid-tier service
result = msal.ConfidentialClientApplication(
config_pca["client_id"], authority=config_pca["authority"],
client_credential=config_pca["client_secret"],
azure_region=azure_region,
http_client=http_client or MinimalHttpClient(),
).acquire_token_for_client(scopes=config_pca["scope"])
assertion = result.get("access_token")
self.assertIsNotNone(assertion, "First app failed to get AT. {}".format(
json.dumps(result, indent=2)))

# 2. Our mid-tier service uses OBO to obtain a token for downstream service
cca = msal.ConfidentialClientApplication(
Expand All @@ -595,9 +603,9 @@ def _test_acquire_token_obo(self, config_pca, config_cca,
# That's fine if OBO app uses short-lived msal instance per session.
# Otherwise, the OBO app need to implement a one-cache-per-user setup.
)
cca_result = cca.acquire_token_on_behalf_of(
pca_result['access_token'], config_cca["scope"])
self.assertNotEqual(None, cca_result.get("access_token"), str(cca_result))
cca_result = cca.acquire_token_on_behalf_of(assertion, config_cca["scope"])
self.assertIsNotNone(cca_result.get("access_token"), "OBO call failed: {}".format(
json.dumps(cca_result, indent=2)))

# 3. Now the OBO app can simply store downstream token(s) in same session.
# Alternatively, if you want to persist the downstream AT, and possibly
Expand All @@ -606,13 +614,27 @@ def _test_acquire_token_obo(self, config_pca, config_cca,
# Assuming you already did that (which is not shown in this test case),
# the following part shows one of the ways to obtain an AT from cache.
username = cca_result.get("id_token_claims", {}).get("preferred_username")
if username: # It means CCA have requested an IDT w/ "profile" scope
self.assertEqual(config_cca["username"], username)
accounts = cca.get_accounts(username=username)
assert len(accounts) == 1, "App is expected to partition token cache per user"
account = accounts[0]
if username is not None: # It means CCA have requested an IDT w/ "profile" scope
assert config_cca["username"] == username, "Incorrect test case configuration"
self.assertEqual(1, len(accounts), "App is supposed to partition token cache per user")
account = accounts[0] # Alternatively, cca app could just loop through each account
result = cca.acquire_token_silent(config_cca["scope"], account)
self.assertEqual(cca_result["access_token"], result["access_token"])
self.assertTrue(
result and result.get("access_token") == cca_result["access_token"],
"CCA should hit an access token from cache: {}".format(
json.dumps(cca.token_cache._cache, indent=2)))
if "refresh_token" in cca_result:
result = cca.acquire_token_silent(
config_cca["scope"], account=account, force_refresh=True)
self.assertTrue(
result and "access_token" in result,
"CCA should get an AT silently, but we got this instead: {}".format(result))
self.assertNotEqual(
result["access_token"], cca_result["access_token"],
"CCA should get a new AT")
else:
logger.info("AAD did not issue a RT for OBO flow")

def _test_acquire_token_by_client_secret(
self, client_id=None, client_secret=None, authority=None, scope=None,
Expand All @@ -623,6 +645,18 @@ def _test_acquire_token_by_client_secret(
http_client=MinimalHttpClient())
result = app.acquire_token_for_client(scope)
self.assertIsNotNone(result.get("access_token"), "Got %s instead" % result)
result2 = app.acquire_token_silent(scope, account=None)
self.assertEqual(
result2.get("access_token"), result["access_token"],
"CCA should hit an access token from cache: {}".format(
json.dumps(app.token_cache._cache, indent=2))
)
if "refresh_token" in result: # Empirically, RT is unavailable, but just in case...
result3 = app.acquire_token_silent(scope, account=None, force_refresh=True)
error_message = "CCA should get a new AT via RT in cache: {}".format(
json.dumps(app.token_cache._cache, indent=2))
self.assertIsNotNone(result3, error_message)
self.assertNotEqual(result3.get("access_token"), result["access_token"], error_message)


class WorldWideTestCase(LabBasedTestCase):
Expand Down Expand Up @@ -732,6 +766,31 @@ def test_acquire_token_obo(self):

self._test_acquire_token_obo(config_pca, config_cca)

@unittest.skipUnless(
os.path.exists("tests/sp_obo.pem"),
"Need a 'tests/sp_obo.pem' private to run OBO for SP test")
def test_acquire_token_obo_for_sp(self):
authority = "https://login.windows-ppe.net/f686d426-8d16-42db-81b7-ab578e110ccd"
with open("tests/sp_obo.pem") as pem:
client_secret = {
"private_key": pem.read(),
"thumbprint": "378938210C976692D7F523B8C4FFBB645D17CE92",
}
midtier_app = {
"authority": authority,
"client_id": "c84e9c32-0bc9-4a73-af05-9efe9982a322",
"client_secret": client_secret,
"scope": ["23d08a1e-1249-4f7c-b5a5-cb11f29b6923/.default"],
#"username": "OBO-Client-PPE", # We do NOT attempt locating initial_app by name
}
initial_app = {
"authority": authority,
"client_id": "9793041b-9078-4942-b1d2-babdc472cc0c",
"client_secret": client_secret,
"scope": [midtier_app["client_id"] + "/.default"],
}
self._test_acquire_token_obo(initial_app, midtier_app)

def test_acquire_token_by_client_secret(self):
# Vastly different than ArlingtonCloudTestCase.test_acquire_token_by_client_secret()
_app = self.get_lab_app_object(
Expand Down

0 comments on commit 642ea65

Please sign in to comment.