Skip to content

Commit

Permalink
Only refresh tokens if variable set
Browse files Browse the repository at this point in the history
  • Loading branch information
rad-pat committed Jun 25, 2024
1 parent 7c56fce commit c7cd1ad
Showing 1 changed file with 28 additions and 51 deletions.
79 changes: 28 additions & 51 deletions plaid/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@


log = logging.getLogger(__name__)
USE_REFRESH_TOKENS = False


def get_project_role_name(project_id: str) -> str:
Expand Down Expand Up @@ -395,7 +396,8 @@ def set_oauth_session(self, provider, oauth_response):
Set the current session with OAuth token dict
"""
# Save users token_dict on encrypted session cookie
session["oauth_token_dict"] = oauth_response
if USE_REFRESH_TOKENS:
session["oauth_token_dict"] = oauth_response
super().set_oauth_session(provider, oauth_response)

def has_oauth_token(self):
Expand All @@ -410,36 +412,35 @@ def _internal_validate():
try:
if self.auth_type == AUTH_OAUTH:
if 'oauth' in session:
# token, secret = session['oauth']
# if token_is_valid(token):
# return True
# to do the below, it needs custom `set_oauth_session` to save the `oauth_token_dict`
provider = session["oauth_provider"]
token_dict = session['oauth_token_dict']
logging.info('Provider %s, Token %s', provider, token_dict)
# this will refresh the token if it is expired (via `token_update` listener)
self.appbuilder.sm.oauth_remotes[provider].token = token_dict
user_resp = self.appbuilder.sm.oauth_remotes[provider].get("userinfo")
user_resp.raise_for_status()
logging.info('Got user response')

# Basic validation of token expiry
token, secret = session['oauth']
if token_is_valid(token):
return True

#ToDo - I could not get introspection to work, I was calling from FlaskOAuth2App, but needs to be and OAuth2Session which is the _get_oauth_client() of the Flask thing
# maybe we don't need to introspect anyway, can just check expiry.

# # new token now stored in session
# token_dict = session['oauth_token_dict']
# logging.info('Provider %s, Revised Token %s', provider, token_dict)
# token_endpoint = self.appbuilder.sm.oauth.plaidkeycloak.access_token_url
# intro_resp = self.appbuilder.sm.oauth_remotes[provider].introspect_token(token_endpoint, token=token_dict)
# intro_resp.raise_for_status()
# logging.info('Did introspection')
# token_info = intro_resp.json()
# if token_info['active']:
# return True
if USE_REFRESH_TOKENS:
# to do the below, it needs custom `set_oauth_session` to save the `oauth_token_dict`
provider = session["oauth_provider"]
token_dict = session['oauth_token_dict']
logging.info('Provider %s, Token %s', provider, token_dict)
# this will refresh the token if it is expired (via `token_update` listener)
self.appbuilder.sm.oauth_remotes[provider].token = token_dict
user_resp = self.appbuilder.sm.oauth_remotes[provider].get("userinfo")
user_resp.raise_for_status()
logging.info('Got user response')

#ToDo - I could not get introspection to work, I was calling from FlaskOAuth2App, but needs to be and OAuth2Session which is the _get_oauth_client() of the Flask thing
# maybe we don't need to introspect anyway, can just check expiry.

# # new token now stored in session
# token_dict = session['oauth_token_dict']
# logging.info('Provider %s, Revised Token %s', provider, token_dict)
# token_endpoint = self.appbuilder.sm.oauth.plaidkeycloak.access_token_url
# intro_resp = self.appbuilder.sm.oauth_remotes[provider].introspect_token(token_endpoint, token=token_dict)
# intro_resp.raise_for_status()
# logging.info('Did introspection')
# token_info = intro_resp.json()
# if token_info['active']:
# return True

elif self.auth_type == AUTH_OID:
if 'token' in session:
Expand All @@ -459,30 +460,6 @@ def _internal_validate():
session.clear()
return result

#
# def has_access(self, permission_name: str, view_name: str) -> bool:
# def logout_and_clear():
# logout_user()
# session.clear()
# return False
# # check token expiry and logout, then continue previous auth check
# if self.auth_type == AUTH_OAUTH:
# if 'oauth' not in session:
# return logout_and_clear()
# token, secret = session['oauth']
# # provider = session["oauth_provider"]
# if not token_is_valid(token):
# return logout_and_clear()
#
# elif self.auth_type == AUTH_OID:
# if 'token' not in session:
# return logout_and_clear()
# token = session['token']
# if not token_is_valid(token):
# return logout_and_clear()
#
# return super().has_access(permission_name, view_name)


def token_is_valid(access_token):
try:
Expand Down

0 comments on commit c7cd1ad

Please sign in to comment.