diff --git a/plaid/security.py b/plaid/security.py index ca9fbe066ebba..68e06c51ae603 100644 --- a/plaid/security.py +++ b/plaid/security.py @@ -32,6 +32,7 @@ log = logging.getLogger(__name__) +USE_REFRESH_TOKENS = False def get_project_role_name(project_id: str) -> str: @@ -395,7 +396,8 @@ def set_oauth_session(self, provider, oauth_response): Set the current session with OAuth token dict """ # Save users token_dict on encrypted session cookie - session["oauth_token_dict"] = oauth_response + if USE_REFRESH_TOKENS: + session["oauth_token_dict"] = oauth_response super().set_oauth_session(provider, oauth_response) def has_oauth_token(self): @@ -410,36 +412,35 @@ def _internal_validate(): try: if self.auth_type == AUTH_OAUTH: if 'oauth' in session: - # token, secret = session['oauth'] - # if token_is_valid(token): - # return True - # to do the below, it needs custom `set_oauth_session` to save the `oauth_token_dict` - provider = session["oauth_provider"] - token_dict = session['oauth_token_dict'] - logging.info('Provider %s, Token %s', provider, token_dict) - # this will refresh the token if it is expired (via `token_update` listener) - self.appbuilder.sm.oauth_remotes[provider].token = token_dict - user_resp = self.appbuilder.sm.oauth_remotes[provider].get("userinfo") - user_resp.raise_for_status() - logging.info('Got user response') - + # Basic validation of token expiry token, secret = session['oauth'] if token_is_valid(token): return True - #ToDo - I could not get introspection to work, I was calling from FlaskOAuth2App, but needs to be and OAuth2Session which is the _get_oauth_client() of the Flask thing - # maybe we don't need to introspect anyway, can just check expiry. - - # # new token now stored in session - # token_dict = session['oauth_token_dict'] - # logging.info('Provider %s, Revised Token %s', provider, token_dict) - # token_endpoint = self.appbuilder.sm.oauth.plaidkeycloak.access_token_url - # intro_resp = self.appbuilder.sm.oauth_remotes[provider].introspect_token(token_endpoint, token=token_dict) - # intro_resp.raise_for_status() - # logging.info('Did introspection') - # token_info = intro_resp.json() - # if token_info['active']: - # return True + if USE_REFRESH_TOKENS: + # to do the below, it needs custom `set_oauth_session` to save the `oauth_token_dict` + provider = session["oauth_provider"] + token_dict = session['oauth_token_dict'] + logging.info('Provider %s, Token %s', provider, token_dict) + # this will refresh the token if it is expired (via `token_update` listener) + self.appbuilder.sm.oauth_remotes[provider].token = token_dict + user_resp = self.appbuilder.sm.oauth_remotes[provider].get("userinfo") + user_resp.raise_for_status() + logging.info('Got user response') + + #ToDo - I could not get introspection to work, I was calling from FlaskOAuth2App, but needs to be and OAuth2Session which is the _get_oauth_client() of the Flask thing + # maybe we don't need to introspect anyway, can just check expiry. + + # # new token now stored in session + # token_dict = session['oauth_token_dict'] + # logging.info('Provider %s, Revised Token %s', provider, token_dict) + # token_endpoint = self.appbuilder.sm.oauth.plaidkeycloak.access_token_url + # intro_resp = self.appbuilder.sm.oauth_remotes[provider].introspect_token(token_endpoint, token=token_dict) + # intro_resp.raise_for_status() + # logging.info('Did introspection') + # token_info = intro_resp.json() + # if token_info['active']: + # return True elif self.auth_type == AUTH_OID: if 'token' in session: @@ -459,30 +460,6 @@ def _internal_validate(): session.clear() return result - # - # def has_access(self, permission_name: str, view_name: str) -> bool: - # def logout_and_clear(): - # logout_user() - # session.clear() - # return False - # # check token expiry and logout, then continue previous auth check - # if self.auth_type == AUTH_OAUTH: - # if 'oauth' not in session: - # return logout_and_clear() - # token, secret = session['oauth'] - # # provider = session["oauth_provider"] - # if not token_is_valid(token): - # return logout_and_clear() - # - # elif self.auth_type == AUTH_OID: - # if 'token' not in session: - # return logout_and_clear() - # token = session['token'] - # if not token_is_valid(token): - # return logout_and_clear() - # - # return super().has_access(permission_name, view_name) - def token_is_valid(access_token): try: