Skip to content

Commit

Permalink
Add Support for PKCE (#187)
Browse files Browse the repository at this point in the history
Add Support for PKCE

* Add PKCE support in oauth flow
* Change assertion to badInputExceptions
* Add command line oauth example
  • Loading branch information
rogebrd authored Apr 2, 2020
1 parent 4f39e3d commit 9c3302a
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 53 deletions.
47 changes: 28 additions & 19 deletions dropbox/dropbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,23 +168,22 @@ def __init__(self,
:param datetime oauth2_access_token_expiration: Expiration for oauth2_access_token
:param str app_key: application key of requesting application; used for token refresh
:param str app_secret: application secret of requesting application; used for token refresh
Not required if PKCE was used to authorize the token
:param list scope: list of scopes to request on refresh. If left blank,
refresh will request all available scopes for application
"""

assert oauth2_access_token or oauth2_refresh_token, \
'OAuth2 access token or refresh token must be set'
if not (oauth2_access_token or oauth2_refresh_token):
raise BadInputException('OAuth2 access token or refresh token must be set')

assert headers is None or isinstance(headers, dict), \
'Expected dict, got %r' % headers
if headers is not None and not isinstance(headers, dict):
raise BadInputException('Expected dict, got {}'.format(headers))

if oauth2_refresh_token:
assert app_key and app_secret, \
"app_key and app_secret are required to refresh tokens"
if oauth2_refresh_token and not app_key:
raise BadInputException("app_key is required to refresh tokens")

if scope is not None:
assert len(scope) > 0 and isinstance(scope, list), \
"Scope list must be of type list"
if scope is not None and (len(scope) == 0 or not isinstance(scope, list)):
raise BadInputException("Scope list must be of type list")

self._oauth2_access_token = oauth2_access_token
self._oauth2_refresh_token = oauth2_refresh_token
Expand All @@ -197,8 +196,9 @@ def __init__(self,
self._max_retries_on_error = max_retries_on_error
self._max_retries_on_rate_limit = max_retries_on_rate_limit
if session:
assert isinstance(session, requests.sessions.Session), \
'Expected requests.sessions.Session, got %r' % session
if not isinstance(session, requests.sessions.Session):
raise BadInputException('Expected requests.sessions.Session, got {}'
.format(session))
self._session = session
else:
self._session = create_session()
Expand Down Expand Up @@ -346,7 +346,7 @@ def check_and_refresh_access_token(self):
Checks if access token needs to be refreshed and refreshes if possible
:return:
"""
can_refresh = self._oauth2_refresh_token and self._app_key and self._app_secret
can_refresh = self._oauth2_refresh_token and self._app_key
needs_refresh = self._oauth2_access_token_expiration and \
(datetime.utcnow() + timedelta(seconds=TOKEN_EXPIRATION_BUFFER)) >= \
self._oauth2_access_token_expiration
Expand All @@ -363,22 +363,22 @@ def refresh_access_token(self, host=API_HOST, scope=None):
:return:
"""

if scope is not None:
assert len(scope) > 0 and isinstance(scope, list), \
"Scope list must be of type list"
if scope is not None and (len(scope) == 0 or not isinstance(scope, list)):
raise BadInputException("Scope list must be of type list")

if not (self._oauth2_refresh_token and self._app_key and self._app_secret):
if not (self._oauth2_refresh_token and self._app_key):
self._logger.warning('Unable to refresh access token without \
refresh token, app key, and app secret')
refresh token and app key')
return

self._logger.info('Refreshing access token.')
url = "https://{}/oauth2/token".format(host)
body = {'grant_type': 'refresh_token',
'refresh_token': self._oauth2_refresh_token,
'client_id': self._app_key,
'client_secret': self._app_secret,
}
if self._app_secret:
body['client_secret'] = self._app_secret
if scope:
scope = " ".join(scope)
body['scope'] = scope
Expand Down Expand Up @@ -719,3 +719,12 @@ def _get_dropbox_client_with_select_header(self, select_header_name, team_member
session=self._session,
headers=new_headers,
)

class BadInputException(Exception):
"""
Thrown if incorrect types/values are used
This should only ever be thrown during testing, app should have validation of input prior to
reaching this point
"""
pass
129 changes: 97 additions & 32 deletions dropbox/oauth.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import hashlib

__all__ = [
'BadRequestException',
'BadStateException',
Expand All @@ -14,6 +16,7 @@
import os
import six
import urllib
import re
from datetime import datetime, timedelta

from .session import (
Expand All @@ -31,6 +34,7 @@

TOKEN_ACCESS_TYPES = ['offline', 'online', 'legacy']
INCLUDE_GRANTED_SCOPES_TYPES = ['user', 'team']
PKCE_VERIFIER_LENGTH = 128

class OAuth2FlowNoRedirectResult(object):
"""
Expand Down Expand Up @@ -95,7 +99,12 @@ def __init__(self, access_token, account_id, user_id, url_state, refresh_token,
:meth:`DropboxOAuth2Flow.start`.
"""
super(OAuth2FlowResult, self).__init__(
access_token, account_id, user_id, refresh_token, expires_in, scope)
access_token=access_token,
account_id=account_id,
user_id=user_id,
refresh_token=refresh_token,
expires_in=expires_in,
scope=scope)
self.url_state = url_state

@classmethod
Expand All @@ -120,11 +129,17 @@ def __repr__(self):

class DropboxOAuth2FlowBase(object):

def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type='legacy',
scope=None, include_granted_scopes=None):
if scope is not None:
assert len(scope) > 0 and isinstance(scope, list), \
"Scope list must be of type list"
def __init__(self, consumer_key, consumer_secret=None, locale=None, token_access_type='legacy',
scope=None, include_granted_scopes=None, use_pkce=False):
if scope is not None and (len(scope) == 0 or not isinstance(scope, list)):
raise BadInputException("Scope list must be of type list")
if token_access_type is not None and token_access_type not in TOKEN_ACCESS_TYPES:
raise BadInputException("Token access type must be from the following enum: {}".format(
TOKEN_ACCESS_TYPES))
if not (use_pkce or consumer_secret):
raise BadInputException("Must pass in either consumer secret or use PKCE")
if include_granted_scopes and not scope:
raise BadInputException("Must pass in scope to pass include_granted_scopes")

self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
Expand All @@ -134,8 +149,15 @@ def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type
self.scope = scope
self.include_granted_scopes = include_granted_scopes

if use_pkce:
self.code_verifier = _generate_pkce_code_verifier()
self.code_challenge = _generate_pkce_code_challenge(self.code_verifier)
else:
self.code_verifier = None
self.code_challenge = None

def _get_authorize_url(self, redirect_uri, state, token_access_type, scope=None,
include_granted_scopes=None):
include_granted_scopes=None, code_challenge=None):
params = dict(response_type='code',
client_id=self.consumer_key)
if redirect_uri is not None:
Expand All @@ -146,22 +168,28 @@ def _get_authorize_url(self, redirect_uri, state, token_access_type, scope=None,
assert token_access_type in TOKEN_ACCESS_TYPES
if token_access_type != 'legacy':
params['token_access_type'] = token_access_type
if code_challenge:
params['code_challenge'] = code_challenge
params['code_challenge_method'] = 'S256'

if scope is not None:
params['scope'] = " ".join(scope)
if include_granted_scopes is not None:
assert include_granted_scopes in INCLUDE_GRANTED_SCOPES_TYPES
params['include_granted_scopes'] = str(include_granted_scopes)
if include_granted_scopes is not None:
assert include_granted_scopes in INCLUDE_GRANTED_SCOPES_TYPES
params['include_granted_scopes'] = include_granted_scopes

return self.build_url('/oauth2/authorize', params, WEB_HOST)

def _finish(self, code, redirect_uri):
def _finish(self, code, redirect_uri, code_verifier):
url = self.build_url('/oauth2/token')
params = {'grant_type': 'authorization_code',
'code': code,
'client_id': self.consumer_key,
'client_secret': self.consumer_secret,
}
if code_verifier:
params['code_verifier'] = code_verifier
else:
params['client_secret'] = self.consumer_secret
if self.locale is not None:
params['locale'] = self.locale
if redirect_uri is not None:
Expand Down Expand Up @@ -273,9 +301,8 @@ class DropboxOAuth2FlowNoRedirect(DropboxOAuth2FlowBase):
dbx = Dropbox(oauth_result.access_token)
"""

def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type='legacy',
scope=None, include_granted_scopes=None):
# noqa: E501; pylint: disable=useless-super-delegation
def __init__(self, consumer_key, consumer_secret=None, locale=None, token_access_type='legacy',
scope=None, include_granted_scopes=None, use_pkce=False): # noqa: E501;
"""
Construct an instance.
Expand All @@ -298,14 +325,18 @@ def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type
user - include user scopes in the grant
team - include team scopes in the grant
Note: if this user has never linked the app, include_granted_scopes must be None
:param bool use_pkce: Whether or not to use Sha256 based PKCE. PKCE should be only use on
client apps which doesn't call your server. It is less secure than non-PKCE flow but
can be used if you are unable to safely retrieve your app secret
"""
super(DropboxOAuth2FlowNoRedirect, self).__init__(
consumer_key,
consumer_secret,
locale,
token_access_type,
scope,
include_granted_scopes,
consumer_key=consumer_key,
consumer_secret=consumer_secret,
locale=locale,
token_access_type=token_access_type,
scope=scope,
include_granted_scopes=include_granted_scopes,
use_pkce=use_pkce,
)

def start(self):
Expand All @@ -317,8 +348,10 @@ def start(self):
access the user's Dropbox account. Tell the user to visit this URL
and approve your app.
"""
return self._get_authorize_url(None, None, self.token_access_type, self.scope,
self.include_granted_scopes)
return self._get_authorize_url(None, None, self.token_access_type,
scope=self.scope,
include_granted_scopes=self.include_granted_scopes,
code_challenge=self.code_challenge)

def finish(self, code):
"""
Expand All @@ -331,7 +364,7 @@ def finish(self, code):
:rtype: OAuth2FlowNoRedirectResult
:raises: The same exceptions as :meth:`DropboxOAuth2Flow.finish()`.
"""
return self._finish(code, None)
return self._finish(code, None, self.code_verifier)


class DropboxOAuth2Flow(DropboxOAuth2FlowBase):
Expand Down Expand Up @@ -379,9 +412,10 @@ def dropbox_auth_finish(web_app_session, request):
"""

def __init__(self, consumer_key, consumer_secret, redirect_uri, session,
csrf_token_session_key, locale=None, token_access_type='legacy',
scope=None, include_granted_scopes=None):
def __init__(self, consumer_key, redirect_uri, session,
csrf_token_session_key, consumer_secret=None, locale=None,
token_access_type='legacy', scope=None,
include_granted_scopes=None, use_pkce=False):
"""
Construct an instance.
Expand Down Expand Up @@ -412,10 +446,16 @@ def __init__(self, consumer_key, consumer_secret, redirect_uri, session,
user - include user scopes in the grant
team - include team scopes in the grant
Note: if this user has never linked the app, include_granted_scopes must be None
:param bool use_pkce: Whether or not to use Sha256 based PKCE
"""
super(DropboxOAuth2Flow, self).__init__(consumer_key, consumer_secret, locale,
token_access_type, scope,
include_granted_scopes)
super(DropboxOAuth2Flow, self).__init__(
consumer_key=consumer_key,
consumer_secret=consumer_secret,
locale=locale,
token_access_type=token_access_type,
scope=scope,
include_granted_scopes=include_granted_scopes,
use_pkce=use_pkce)
self.redirect_uri = redirect_uri
self.session = session
self.csrf_token_session_key = csrf_token_session_key
Expand Down Expand Up @@ -450,7 +490,9 @@ def start(self, url_state=None):
self.session[self.csrf_token_session_key] = csrf_token

return self._get_authorize_url(self.redirect_uri, state, self.token_access_type,
self.scope, self.include_granted_scopes)
scope=self.scope,
include_granted_scopes=self.include_granted_scopes,
code_challenge=self.code_challenge)

def finish(self, query_params):
"""
Expand Down Expand Up @@ -534,7 +576,7 @@ def finish(self, query_params):

# If everything went ok, make the network call to get an access token.

no_redirect_result = self._finish(code, self.redirect_uri)
no_redirect_result = self._finish(code, self.redirect_uri, self.code_verifier)
return OAuth2FlowResult.from_no_redirect_result(
no_redirect_result, url_state)

Expand Down Expand Up @@ -588,6 +630,16 @@ class ProviderException(Exception):
pass


class BadInputException(Exception):
"""
Thrown if incorrect types/values are used
This should only ever be thrown during testing, app should have validation of input prior to
reaching this point
"""
pass


def _safe_equals(a, b):
if len(a) != len(b):
return False
Expand Down Expand Up @@ -616,3 +668,16 @@ def encode(o):

utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)}
return url_encode(utf8_params)

def _generate_pkce_code_verifier():
code_verifier = base64.urlsafe_b64encode(os.urandom(PKCE_VERIFIER_LENGTH)).decode('utf-8')
code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier)
if len(code_verifier) > PKCE_VERIFIER_LENGTH:
code_verifier = code_verifier[:128]
return code_verifier

def _generate_pkce_code_challenge(code_verifier):
code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8')
code_challenge = code_challenge.replace('=', '')
return code_challenge
29 changes: 29 additions & 0 deletions example/oauth/commandline-oauth-pkce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/usr/bin/env python3

import dropbox
from dropbox import DropboxOAuth2FlowNoRedirect

'''
This example uses PKCE, a currently beta feature.
If you are interested in using this, please contact
Dropbox support
'''
APP_KEY = ""

auth_flow = DropboxOAuth2FlowNoRedirect(APP_KEY, pkce_method='S256', token_access_type='offline')

authorize_url = auth_flow.start()
print("1. Go to: " + authorize_url)
print("2. Click \"Allow\" (you might have to log in first).")
print("3. Copy the authorization code.")
auth_code = input("Enter the authorization code here: ").strip()

try:
oauth_result = auth_flow.finish(auth_code)
print(oauth_result)
except Exception as e:
print('Error: %s' % (e,))
exit(1)

dbx = dropbox.Dropbox(oauth2_refresh_token=oauth_result.refresh_token, app_key=APP_KEY)
dbx.users_get_current_account()
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
print(oauth_result)
except Exception as e:
print('Error: %s' % (e,))
exit(1)

dbx = dropbox.Dropbox(oauth2_access_token=oauth_result.access_token,
app_key=APP_KEY, app_secret=APP_SECRET)
Expand Down
Loading

0 comments on commit 9c3302a

Please sign in to comment.