Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Duo Universal Prompt support to Okta Classic #437

Merged
merged 2 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,11 @@ A configuration wizard will prompt you to enter the necessary configuration para
- email - OTP via email
- web - DUO uses localhost webbrowser to support push|call|passcode
- passcode - DUO uses `OKTA_MFA_CODE` or `--mfa-code` if set, or prompts user for passcode(OTP).

- claims_provider - DUO Universal Prompt
- duo_universal_factor - (optional) Configure which type of factor to use with Duo Universal Prompt. Must be one of (case-sensitive):
- `Duo Push` (default)
- `Passcode`
- `Phone Call`
- resolve_aws_alias - y or n. If yes, gimme-aws-creds will try to resolve AWS account ids with respective alias names (default: n). This option can also be set interactively in the command line using `-r` or `--resolve` parameter
- include_path - (optional) Includes full role path to the role name in AWS credential profile name. (default: n). If `y`: `<acct>-/some/path/administrator`. If `n`: `<acct>-administrator`
- remember_device - y or n. If yes, the MFA device will be remembered by Okta service for a limited time. This option can also be set interactively in the command line using `-m` or `--remember-device`
Expand Down
197 changes: 197 additions & 0 deletions gimme_aws_creds/duo_universal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import time

import html5lib
from furl import furl

from . import version


class DuoMfaDenied(BaseException):
""" Duo MFA was denied """

def __init__(self, response):
super(DuoMfaDenied, self).__init__(f'Duo MFA denied: {response}')


class OktaDuoUniversal:
""" Handles interaction with the Duo Universal Prompt """

def __init__(self, ui, session, state_token, okta_factor, remember_device, duo_factor='Duo Push', duo_passcode=None):
self.ui = ui
self.state_token = state_token
self.okta_factor = okta_factor
self.remember_device = remember_device
self.session = session
if duo_factor not in ['Duo Push', 'Passcode', 'Phone Call']:
raise Exception('Preferred Duo Universal factor must be one of: Duo Push, Passcode, Phone Call')
self.duo_factor = duo_factor
self.duo_passcode = duo_passcode

def do_auth(self):
""" Follow Duo Universal Prompt flow through to an active Okta user session """

duo_prompt_url, okta_profile_login = self._initiate_okta_factor_verification()
duo_origin, duo_plugin_form_response = self._handle_duo_plugin_form(duo_prompt_url)

# Submit second Duo form (login-form), which triggers a Duo Push, phone call, or accepts the Passcode
login_form_action, duo_login_form_data = self._get_duo_universal_login_form_data(duo_plugin_form_response)
login_form_action_url = furl(duo_origin) / login_form_action
duo_factor, duo_sid, duo_txid, duo_xsrf = self._submit_duo_login_form(duo_login_form_data,
login_form_action_url)

self.ui.info(f"Duo Universal: Using {self.duo_factor}...")

self._wait_for_duo_universal_transaction(duo_origin, duo_txid, duo_sid)

# Once Duo has been approved, load the OIDC exit URL to be redirected to Okta and gain a user session
oidc_exit_url = furl(duo_origin) / 'frame/v4/oidc/exit'
exit_headers = self._get_form_headers()
exit_response = self.session.post(
oidc_exit_url.url,
data={
'txid': duo_txid,
'sid': duo_sid,
'factor': duo_factor,
'_xsrf': duo_xsrf,
'device_key': '',
'dampen_choice': 'false',
},
headers=exit_headers,
)
exit_response.raise_for_status()

# The claims_provider factor immediately yields an active user session, no subsequent request for SID required.
return {
'apiResponse': {
'status': 'SUCCESS',
'userSession': {
"username": okta_profile_login,
"session": self.session.cookies['sid'],
"device_token": self.session.cookies['DT']
}
},
}

def _submit_duo_login_form(self, duo_login_form_data, login_form_action_url):
# Submit Duo's form id=login-form, which triggers a Duo Push, phone call, or accepts a Passcode.
duo_login_form_response = self.session.post(
login_form_action_url.url,
data=duo_login_form_data,
headers=self._get_form_headers(),
)
duo_login_form_response.raise_for_status()
duo_sid = duo_login_form_data['sid']
duo_factor = duo_login_form_data['factor']
duo_xsrf = duo_login_form_data['_xsrf']
duo_login_response_data = duo_login_form_response.json()
if duo_login_response_data['stat'] != 'OK':
raise Exception(f"Triggering Duo MFA failed: {duo_login_form_response.content}")
duo_txid = duo_login_response_data['response']['txid']
return duo_factor, duo_sid, duo_txid, duo_xsrf

def _handle_duo_plugin_form(self, duo_prompt_url):
# Request Duo prompt
verify_get_response = self.session.get(
duo_prompt_url,
)
verify_get_response.raise_for_status()
duo_origin = furl(verify_get_response.url).origin
# Submit first Duo form (plugin_form)
form_data = self._get_duo_universal_plugin_form_data(verify_get_response)
duo_plugin_form_response = self.session.post(
verify_get_response.url,
data=form_data,
headers=self._get_form_headers(),
)
duo_plugin_form_response.raise_for_status()
return duo_origin, duo_plugin_form_response

def _initiate_okta_factor_verification(self):
# POST to the Okta factor verify URL gives us the URL to request to load Duo
verify_post_response = self.session.post(
self.okta_factor['_links']['verify']['href'],
params={'rememberDevice': self.remember_device},
json={'stateToken': self.state_token},
)
verify_post_response.raise_for_status()
verify_response_data = verify_post_response.json()
duo_prompt_url = verify_response_data['_links']['next']['href']
okta_profile_login = verify_response_data['_embedded']['user']['profile']['login']
return duo_prompt_url, okta_profile_login

def _wait_for_duo_universal_transaction(self, duo_host, txid, sid):
status_url = furl(duo_host) / 'frame/v4/status'
status_data = {
'txid': txid,
'sid': sid
}
headers = self._get_form_headers()

tries = 0
while tries < 16:
tries += 1
time.sleep(0.5)

status_response = self.session.post(
status_url.url,
data=status_data,
headers=headers,
)
status_response.raise_for_status()

json_response = status_response.json()
if json_response['stat'] != 'OK':
raise Exception(f"Error checking Duo MFA status: {status_response.text}")

if json_response['response']['status_code'] == 'allow':
return txid
if json_response['response']['status_code'] == 'deny':
raise DuoMfaDenied(json_response)

raise Exception('Timed out waiting for Duo MFA')

@staticmethod
def _get_form_headers():
form_headers = {
'User-Agent': "gimme-aws-creds {}".format(version),
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
}
return form_headers

def _get_duo_universal_login_form_data(self, plugin_form_response):
""" Get form data to post when submitting the Duo login-form """

doc = html5lib.parse(plugin_form_response.content, namespaceHTMLElements=False)
form_action = doc.find('.//form[@id="login-form"]').get('action')
form_data = {}
for field in doc.iterfind('.//form[@id="login-form"]/input'):
form_data[field.get('name')] = field.get('value')

preferred_device = self._find_device_to_use(doc)

form_data['factor'] = self.duo_factor
form_data['device'] = preferred_device
form_data['postAuthDestination'] = 'OIDC_EXIT'
if self.duo_passcode:
form_data['passcode'] = self.duo_passcode

return form_action, form_data

@staticmethod
def _find_device_to_use(doc):
device = doc.find('.//input[@name="preferred_device"]').get('value')
if device is None or device == '':
device = doc.find('.//select[@name="device"]/option').get('value')
return device

@staticmethod
def _get_duo_universal_plugin_form_data(response):
""" Get form data to post when submitting the Duo plugin_form """

doc = html5lib.parse(response.content, namespaceHTMLElements=False)
form_data = {}
for field in doc.iterfind('.//form[@id="plugin_form"]/input'):
form_data[field.get('name')] = field.get('value')

return form_data
3 changes: 3 additions & 0 deletions gimme_aws_creds/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,9 @@ def okta(self):
if self.conf_dict.get('preferred_mfa_type'):
okta.set_preferred_mfa_type(self.conf_dict['preferred_mfa_type'])

if self.conf_dict.get('duo_universal_factor'):
okta.set_duo_universal_factor(self.conf_dict.get('duo_universal_factor'))

if self.config.mfa_code is not None:
okta.set_mfa_code(self.config.mfa_code)
elif self.conf_dict.get('okta_mfa_code'):
Expand Down
73 changes: 49 additions & 24 deletions gimme_aws_creds/okta_classic.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from gimme_aws_creds.u2f import FactorU2F
from gimme_aws_creds.webauthn import WebAuthnClient, FakeAssertion
from . import errors, ui, version, duo
from .duo_universal import OktaDuoUniversal
from .errors import GimmeAWSCredsMFAEnrollStatus
from .registered_authenticators import RegisteredAuthenticators

Expand Down Expand Up @@ -62,6 +63,7 @@ def __init__(self, gac_ui, okta_org_url, verify_ssl_certs=True, device_token=Non
self._username = None
self._password = None
self._preferred_mfa_type = None
self._duo_universal_factor = 'Duo Push'
self._mfa_code = None
self._remember_device = None

Expand Down Expand Up @@ -104,6 +106,9 @@ def set_preferred_mfa_type(self, preferred_mfa_type):
def set_mfa_code(self, mfa_code):
self._mfa_code = mfa_code

def set_duo_universal_factor(self, duo_universal_factor):
self._duo_universal_factor = duo_universal_factor

def set_remember_device(self, remember_device):
self._remember_device = bool(remember_device)

Expand Down Expand Up @@ -158,30 +163,33 @@ def auth_session(self, **kwargs):
""" Authenticate the user and return the Okta Session ID and username"""
login_response = self.auth()

session_url = self._okta_org_url + '/login/sessionCookieRedirect'

if 'redirect_uri' not in kwargs:
redirect_uri = 'http://localhost:8080/login'
if 'userSession' in login_response:
return login_response['userSession']
else:
redirect_uri = kwargs['redirect_uri']
session_url = self._okta_org_url + '/login/sessionCookieRedirect'

params = {
'token': login_response['sessionToken'],
'redirectUrl': redirect_uri
}
if 'redirect_uri' not in kwargs:
redirect_uri = 'http://localhost:8080/login'
else:
redirect_uri = kwargs['redirect_uri']

response = self._http_client.get(
session_url,
params=params,
headers=self._get_headers(),
verify=self._verify_ssl_certs,
allow_redirects=False
)
return {
"username": login_response['_embedded']['user']['profile']['login'],
"session": response.cookies['sid'],
"device_token": self._http_client.cookies['DT']
}
params = {
'token': login_response['sessionToken'],
'redirectUrl': redirect_uri
}

response = self._http_client.get(
session_url,
params=params,
headers=self._get_headers(),
verify=self._verify_ssl_certs,
allow_redirects=False
)
return {
"username": login_response['_embedded']['user']['profile']['login'],
"session": response.cookies['sid'],
"device_token": self._http_client.cookies['DT']
}

def auth_oauth(self, client_id, **kwargs):
""" Login to Okta and retrieve access token, ID token or both """
Expand Down Expand Up @@ -451,6 +459,19 @@ def _login_send_push(self, state_token, factor):
if 'sessionToken' in response_data:
return {'stateToken': None, 'sessionToken': response_data['sessionToken'], 'apiResponse': response_data}

def _login_duo_universal(self, state_token, factor):
duo_passcode = None
if self._duo_universal_factor == 'Passcode':
duo_passcode = self.ui.input(message='Duo Passcode: ')
duo_client = OktaDuoUniversal(self.ui,
self._http_client,
state_token,
factor,
self._remember_device,
self._duo_universal_factor,
duo_passcode)
return duo_client.do_auth()

def _login_input_webauthn_challenge(self, state_token, factor):
""" Retrieve nonce """
response = self._http_client.post(
Expand Down Expand Up @@ -593,6 +614,8 @@ def _login_multi_factor(self, state_token, login_data):
return self._login_input_webauthn_challenge(state_token, factor)
elif factor['factorType'] == 'token:hardware':
return self._login_input_mfa_challenge(state_token, factor['_links']['verify']['href'])
elif factor['factorType'] == 'claims_provider':
return self._login_duo_universal(state_token, factor)

def _login_input_mfa_challenge(self, state_token, next_url):
""" Submit verification code for SMS or TOTP authentication methods"""
Expand Down Expand Up @@ -717,7 +740,7 @@ def _check_webauthn_result(self, state_token, login_data):
else:
return {'stateToken': None, 'sessionToken': None, 'apiResponse': response_data}

def get_saml_response(self, url, auth_session = None):
def get_saml_response(self, url, auth_session=None):
""" return the base64 SAML value object from the SAML Response"""
response = self._http_client.get(url, verify=self._verify_ssl_certs)
response.raise_for_status()
Expand Down Expand Up @@ -888,6 +911,8 @@ def _build_factor_name(self, factor):
return factor['factorType'] + ": " + factor_name
elif factor['factorType'] == 'token:hardware':
return factor['factorType'] + ": " + factor['provider']
elif factor['factorType'] == 'claims_provider':
return factor['factorType'] + ": " + factor['vendorName']

else:
return "Unknown MFA type: " + factor['factorType']
Expand Down Expand Up @@ -1063,7 +1088,7 @@ def _introspect_factors(self, state_token):
@staticmethod
def _extract_state_token_from_http_response(http_res):
# extract the stateToken from a javascript variable
state_token_re = re.search(r"var stateToken = '(.*)';", http_res.text)
state_token_re = re.search(r"var stateToken = '(.*)';", http_res.text)
if state_token_re is not None:
return decode(state_token_re.group(1), "unicode-escape")

Expand All @@ -1074,4 +1099,4 @@ def _extract_state_token_from_http_response(http_res):
state_token_re = re.search(r"stateToken=(.*?[ \"])", http_res.text)
if state_token_re is not None:
pre_state_token = decode(state_token_re.group(1), "unicode-escape")
return pre_state_token.rstrip('\"')
return pre_state_token.rstrip('\"')
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ okta>=0.0.4,<1.0.0
ctap-keyring-device==1.0.6
pyjwt>=2.4.0,<3.0.0
urllib3>=1.26.0,<2.0.0
html5lib>=1.1,<2.0.0
furl>=2.1.3,<3.0.0
8 changes: 8 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import os


def read_fixture(file_name):
"""Read a fixture file"""
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures', file_name)
with open(fixture_path, 'r', encoding='utf-8') as file:
return file.read()
Loading