diff --git a/dropbox/dropbox.py b/dropbox/dropbox.py index 3b15acad..e3055c42 100644 --- a/dropbox/dropbox.py +++ b/dropbox/dropbox.py @@ -168,23 +168,22 @@ def __init__(self, :param datetime oauth2_access_token_expiration: Expiration for oauth2_access_token :param str app_key: application key of requesting application; used for token refresh :param str app_secret: application secret of requesting application; used for token refresh + Not required if PKCE was used to authorize the token :param list scope: list of scopes to request on refresh. If left blank, refresh will request all available scopes for application """ - assert oauth2_access_token or oauth2_refresh_token, \ - 'OAuth2 access token or refresh token must be set' + if not (oauth2_access_token or oauth2_refresh_token): + raise BadInputException('OAuth2 access token or refresh token must be set') - assert headers is None or isinstance(headers, dict), \ - 'Expected dict, got %r' % headers + if headers is not None and not isinstance(headers, dict): + raise BadInputException('Expected dict, got {}'.format(headers)) - if oauth2_refresh_token: - assert app_key and app_secret, \ - "app_key and app_secret are required to refresh tokens" + if oauth2_refresh_token and not app_key: + raise BadInputException("app_key is required to refresh tokens") - if scope is not None: - assert len(scope) > 0 and isinstance(scope, list), \ - "Scope list must be of type list" + if scope is not None and (len(scope) == 0 or not isinstance(scope, list)): + raise BadInputException("Scope list must be of type list") self._oauth2_access_token = oauth2_access_token self._oauth2_refresh_token = oauth2_refresh_token @@ -197,8 +196,9 @@ def __init__(self, self._max_retries_on_error = max_retries_on_error self._max_retries_on_rate_limit = max_retries_on_rate_limit if session: - assert isinstance(session, requests.sessions.Session), \ - 'Expected requests.sessions.Session, got %r' % session + if not isinstance(session, requests.sessions.Session): + raise BadInputException('Expected requests.sessions.Session, got {}' + .format(session)) self._session = session else: self._session = create_session() @@ -346,7 +346,7 @@ def check_and_refresh_access_token(self): Checks if access token needs to be refreshed and refreshes if possible :return: """ - can_refresh = self._oauth2_refresh_token and self._app_key and self._app_secret + can_refresh = self._oauth2_refresh_token and self._app_key needs_refresh = self._oauth2_access_token_expiration and \ (datetime.utcnow() + timedelta(seconds=TOKEN_EXPIRATION_BUFFER)) >= \ self._oauth2_access_token_expiration @@ -363,13 +363,12 @@ def refresh_access_token(self, host=API_HOST, scope=None): :return: """ - if scope is not None: - assert len(scope) > 0 and isinstance(scope, list), \ - "Scope list must be of type list" + if scope is not None and (len(scope) == 0 or not isinstance(scope, list)): + raise BadInputException("Scope list must be of type list") - if not (self._oauth2_refresh_token and self._app_key and self._app_secret): + if not (self._oauth2_refresh_token and self._app_key): self._logger.warning('Unable to refresh access token without \ - refresh token, app key, and app secret') + refresh token and app key') return self._logger.info('Refreshing access token.') @@ -377,8 +376,9 @@ def refresh_access_token(self, host=API_HOST, scope=None): body = {'grant_type': 'refresh_token', 'refresh_token': self._oauth2_refresh_token, 'client_id': self._app_key, - 'client_secret': self._app_secret, } + if self._app_secret: + body['client_secret'] = self._app_secret if scope: scope = " ".join(scope) body['scope'] = scope @@ -719,3 +719,12 @@ def _get_dropbox_client_with_select_header(self, select_header_name, team_member session=self._session, headers=new_headers, ) + +class BadInputException(Exception): + """ + Thrown if incorrect types/values are used + + This should only ever be thrown during testing, app should have validation of input prior to + reaching this point + """ + pass diff --git a/dropbox/oauth.py b/dropbox/oauth.py index 974a5e1b..562b8827 100644 --- a/dropbox/oauth.py +++ b/dropbox/oauth.py @@ -1,3 +1,5 @@ +import hashlib + __all__ = [ 'BadRequestException', 'BadStateException', @@ -14,6 +16,7 @@ import os import six import urllib +import re from datetime import datetime, timedelta from .session import ( @@ -31,6 +34,7 @@ TOKEN_ACCESS_TYPES = ['offline', 'online', 'legacy'] INCLUDE_GRANTED_SCOPES_TYPES = ['user', 'team'] +PKCE_VERIFIER_LENGTH = 128 class OAuth2FlowNoRedirectResult(object): """ @@ -95,7 +99,12 @@ def __init__(self, access_token, account_id, user_id, url_state, refresh_token, :meth:`DropboxOAuth2Flow.start`. """ super(OAuth2FlowResult, self).__init__( - access_token, account_id, user_id, refresh_token, expires_in, scope) + access_token=access_token, + account_id=account_id, + user_id=user_id, + refresh_token=refresh_token, + expires_in=expires_in, + scope=scope) self.url_state = url_state @classmethod @@ -120,11 +129,17 @@ def __repr__(self): class DropboxOAuth2FlowBase(object): - def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type='legacy', - scope=None, include_granted_scopes=None): - if scope is not None: - assert len(scope) > 0 and isinstance(scope, list), \ - "Scope list must be of type list" + def __init__(self, consumer_key, consumer_secret=None, locale=None, token_access_type='legacy', + scope=None, include_granted_scopes=None, use_pkce=False): + if scope is not None and (len(scope) == 0 or not isinstance(scope, list)): + raise BadInputException("Scope list must be of type list") + if token_access_type is not None and token_access_type not in TOKEN_ACCESS_TYPES: + raise BadInputException("Token access type must be from the following enum: {}".format( + TOKEN_ACCESS_TYPES)) + if not (use_pkce or consumer_secret): + raise BadInputException("Must pass in either consumer secret or use PKCE") + if include_granted_scopes and not scope: + raise BadInputException("Must pass in scope to pass include_granted_scopes") self.consumer_key = consumer_key self.consumer_secret = consumer_secret @@ -134,8 +149,15 @@ def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type self.scope = scope self.include_granted_scopes = include_granted_scopes + if use_pkce: + self.code_verifier = _generate_pkce_code_verifier() + self.code_challenge = _generate_pkce_code_challenge(self.code_verifier) + else: + self.code_verifier = None + self.code_challenge = None + def _get_authorize_url(self, redirect_uri, state, token_access_type, scope=None, - include_granted_scopes=None): + include_granted_scopes=None, code_challenge=None): params = dict(response_type='code', client_id=self.consumer_key) if redirect_uri is not None: @@ -146,22 +168,28 @@ def _get_authorize_url(self, redirect_uri, state, token_access_type, scope=None, assert token_access_type in TOKEN_ACCESS_TYPES if token_access_type != 'legacy': params['token_access_type'] = token_access_type + if code_challenge: + params['code_challenge'] = code_challenge + params['code_challenge_method'] = 'S256' if scope is not None: params['scope'] = " ".join(scope) - if include_granted_scopes is not None: - assert include_granted_scopes in INCLUDE_GRANTED_SCOPES_TYPES - params['include_granted_scopes'] = str(include_granted_scopes) + if include_granted_scopes is not None: + assert include_granted_scopes in INCLUDE_GRANTED_SCOPES_TYPES + params['include_granted_scopes'] = include_granted_scopes return self.build_url('/oauth2/authorize', params, WEB_HOST) - def _finish(self, code, redirect_uri): + def _finish(self, code, redirect_uri, code_verifier): url = self.build_url('/oauth2/token') params = {'grant_type': 'authorization_code', 'code': code, 'client_id': self.consumer_key, - 'client_secret': self.consumer_secret, } + if code_verifier: + params['code_verifier'] = code_verifier + else: + params['client_secret'] = self.consumer_secret if self.locale is not None: params['locale'] = self.locale if redirect_uri is not None: @@ -273,9 +301,8 @@ class DropboxOAuth2FlowNoRedirect(DropboxOAuth2FlowBase): dbx = Dropbox(oauth_result.access_token) """ - def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type='legacy', - scope=None, include_granted_scopes=None): - # noqa: E501; pylint: disable=useless-super-delegation + def __init__(self, consumer_key, consumer_secret=None, locale=None, token_access_type='legacy', + scope=None, include_granted_scopes=None, use_pkce=False): # noqa: E501; """ Construct an instance. @@ -298,14 +325,18 @@ def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type user - include user scopes in the grant team - include team scopes in the grant Note: if this user has never linked the app, include_granted_scopes must be None + :param bool use_pkce: Whether or not to use Sha256 based PKCE. PKCE should be only use on + client apps which doesn't call your server. It is less secure than non-PKCE flow but + can be used if you are unable to safely retrieve your app secret """ super(DropboxOAuth2FlowNoRedirect, self).__init__( - consumer_key, - consumer_secret, - locale, - token_access_type, - scope, - include_granted_scopes, + consumer_key=consumer_key, + consumer_secret=consumer_secret, + locale=locale, + token_access_type=token_access_type, + scope=scope, + include_granted_scopes=include_granted_scopes, + use_pkce=use_pkce, ) def start(self): @@ -317,8 +348,10 @@ def start(self): access the user's Dropbox account. Tell the user to visit this URL and approve your app. """ - return self._get_authorize_url(None, None, self.token_access_type, self.scope, - self.include_granted_scopes) + return self._get_authorize_url(None, None, self.token_access_type, + scope=self.scope, + include_granted_scopes=self.include_granted_scopes, + code_challenge=self.code_challenge) def finish(self, code): """ @@ -331,7 +364,7 @@ def finish(self, code): :rtype: OAuth2FlowNoRedirectResult :raises: The same exceptions as :meth:`DropboxOAuth2Flow.finish()`. """ - return self._finish(code, None) + return self._finish(code, None, self.code_verifier) class DropboxOAuth2Flow(DropboxOAuth2FlowBase): @@ -379,9 +412,10 @@ def dropbox_auth_finish(web_app_session, request): """ - def __init__(self, consumer_key, consumer_secret, redirect_uri, session, - csrf_token_session_key, locale=None, token_access_type='legacy', - scope=None, include_granted_scopes=None): + def __init__(self, consumer_key, redirect_uri, session, + csrf_token_session_key, consumer_secret=None, locale=None, + token_access_type='legacy', scope=None, + include_granted_scopes=None, use_pkce=False): """ Construct an instance. @@ -412,10 +446,16 @@ def __init__(self, consumer_key, consumer_secret, redirect_uri, session, user - include user scopes in the grant team - include team scopes in the grant Note: if this user has never linked the app, include_granted_scopes must be None + :param bool use_pkce: Whether or not to use Sha256 based PKCE """ - super(DropboxOAuth2Flow, self).__init__(consumer_key, consumer_secret, locale, - token_access_type, scope, - include_granted_scopes) + super(DropboxOAuth2Flow, self).__init__( + consumer_key=consumer_key, + consumer_secret=consumer_secret, + locale=locale, + token_access_type=token_access_type, + scope=scope, + include_granted_scopes=include_granted_scopes, + use_pkce=use_pkce) self.redirect_uri = redirect_uri self.session = session self.csrf_token_session_key = csrf_token_session_key @@ -450,7 +490,9 @@ def start(self, url_state=None): self.session[self.csrf_token_session_key] = csrf_token return self._get_authorize_url(self.redirect_uri, state, self.token_access_type, - self.scope, self.include_granted_scopes) + scope=self.scope, + include_granted_scopes=self.include_granted_scopes, + code_challenge=self.code_challenge) def finish(self, query_params): """ @@ -534,7 +576,7 @@ def finish(self, query_params): # If everything went ok, make the network call to get an access token. - no_redirect_result = self._finish(code, self.redirect_uri) + no_redirect_result = self._finish(code, self.redirect_uri, self.code_verifier) return OAuth2FlowResult.from_no_redirect_result( no_redirect_result, url_state) @@ -588,6 +630,16 @@ class ProviderException(Exception): pass +class BadInputException(Exception): + """ + Thrown if incorrect types/values are used + + This should only ever be thrown during testing, app should have validation of input prior to + reaching this point + """ + pass + + def _safe_equals(a, b): if len(a) != len(b): return False @@ -616,3 +668,16 @@ def encode(o): utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)} return url_encode(utf8_params) + +def _generate_pkce_code_verifier(): + code_verifier = base64.urlsafe_b64encode(os.urandom(PKCE_VERIFIER_LENGTH)).decode('utf-8') + code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier) + if len(code_verifier) > PKCE_VERIFIER_LENGTH: + code_verifier = code_verifier[:128] + return code_verifier + +def _generate_pkce_code_challenge(code_verifier): + code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest() + code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8') + code_challenge = code_challenge.replace('=', '') + return code_challenge diff --git a/example/oauth/commandline-oauth-pkce.py b/example/oauth/commandline-oauth-pkce.py new file mode 100644 index 00000000..9fbb83a7 --- /dev/null +++ b/example/oauth/commandline-oauth-pkce.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 + +import dropbox +from dropbox import DropboxOAuth2FlowNoRedirect + +''' +This example uses PKCE, a currently beta feature. +If you are interested in using this, please contact +Dropbox support +''' +APP_KEY = "" + +auth_flow = DropboxOAuth2FlowNoRedirect(APP_KEY, pkce_method='S256', token_access_type='offline') + +authorize_url = auth_flow.start() +print("1. Go to: " + authorize_url) +print("2. Click \"Allow\" (you might have to log in first).") +print("3. Copy the authorization code.") +auth_code = input("Enter the authorization code here: ").strip() + +try: + oauth_result = auth_flow.finish(auth_code) + print(oauth_result) +except Exception as e: + print('Error: %s' % (e,)) + exit(1) + +dbx = dropbox.Dropbox(oauth2_refresh_token=oauth_result.refresh_token, app_key=APP_KEY) +dbx.users_get_current_account() diff --git a/example/commandline-oauth.py b/example/oauth/commandline-oauth.py similarity index 98% rename from example/commandline-oauth.py rename to example/oauth/commandline-oauth.py index 83ad8779..32b038b8 100644 --- a/example/commandline-oauth.py +++ b/example/oauth/commandline-oauth.py @@ -19,6 +19,7 @@ print(oauth_result) except Exception as e: print('Error: %s' % (e,)) + exit(1) dbx = dropbox.Dropbox(oauth2_access_token=oauth_result.access_token, app_key=APP_KEY, app_secret=APP_SECRET) diff --git a/test/test_dropbox_unit.py b/test/test_dropbox_unit.py index a6e2a38a..88dd4f67 100644 --- a/test/test_dropbox_unit.py +++ b/test/test_dropbox_unit.py @@ -6,6 +6,7 @@ # Tests OAuth Flow from dropbox import DropboxOAuth2Flow, session, Dropbox, create_session +from dropbox.dropbox import BadInputException from dropbox.exceptions import AuthError from dropbox.oauth import OAuth2FlowNoRedirectResult, DropboxOAuth2FlowNoRedirect from datetime import datetime, timedelta @@ -24,6 +25,64 @@ class TestOAuth: + def test_authorization_url(self): + flow_obj = DropboxOAuth2Flow(APP_KEY, APP_SECRET, 'http://localhost/dummy', + 'dummy_session', 'dbx-auth-csrf-token') + for redirect_uri in [None, 'localhost']: + for state in [None, 'state']: + for token_access_type in [None, 'legacy', 'offline', 'online']: + for scope in [None, SCOPE_LIST]: + for include_granted_scopes in [None, 'user', 'team']: + for code_challenge in [None, 'mychallenge']: + authorization_url = \ + flow_obj._get_authorize_url(redirect_uri, state, + token_access_type, scope, + include_granted_scopes, + code_challenge) + assert authorization_url\ + .startswith('https://{}/oauth2/authorize?' + .format(session.WEB_HOST)) + assert 'client_id={}'.format(APP_KEY) in authorization_url + assert 'response_type=code' in authorization_url + + if redirect_uri: + assert 'redirect_uri={}'.format(redirect_uri) \ + in authorization_url + else: + assert 'redirect_uri' not in authorization_url + + if state: + assert 'state={}'.format(state) in authorization_url + else: + assert 'state' not in authorization_url + + if token_access_type and token_access_type != 'legacy': + assert 'token_access_type={}'.format(token_access_type) \ + in authorization_url + else: + assert 'token_access_type' not in authorization_url + + if scope: + assert 'scope={}'.format("+".join(scope)) \ + in authorization_url + else: + assert 'scope' not in authorization_url + + if include_granted_scopes and scope: + assert 'include_granted_scopes={}'\ + .format(include_granted_scopes)\ + in authorization_url + else: + assert 'include_granted_scopes' not in authorization_url + + if code_challenge: + assert 'code_challenge_method=S256' in authorization_url + assert 'code_challenge={}'.format(code_challenge)\ + in authorization_url + else: + assert 'code_challenge_method' not in authorization_url + assert 'code_challenge' not in authorization_url + def test_authorization_url_legacy_default(self): flow_obj = DropboxOAuth2Flow(APP_KEY, APP_SECRET, 'http://localhost/dummy', 'dummy_session', 'dbx-auth-csrf-token') @@ -181,12 +240,12 @@ def invalid_grant_session_instance(self, mocker): return session_obj def test_default_Dropbox_raises_assertion_error(self): - with pytest.raises(AssertionError): + with pytest.raises(BadInputException): # Requires either access token or refresh token Dropbox() def test_Dropbox_with_refresh_only_raises_assertion_error(self): - with pytest.raises(AssertionError): + with pytest.raises(BadInputException): # Refresh tokens also require app key and secret Dropbox(oauth2_refresh_token=REFRESH_TOKEN)