Skip to content

Commit

Permalink
Improve error handeling in get_access_token
Browse files Browse the repository at this point in the history
  • Loading branch information
halilbahar committed Apr 8, 2024
1 parent 9f1214c commit 752028d
Showing 1 changed file with 18 additions and 25 deletions.
43 changes: 18 additions & 25 deletions apiserver/plane/app/views/oidc.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,27 +101,19 @@ def get_access_token(request_token: str, client_id: str) -> str:
A string representing the access token issued out by the OIDC Provider
"""

if not request_token:
raise ValueError("The request token has to be supplied!")
if not request_token or not client_id:
raise ValueError("Both request_token and client_id must be supplied!")

(ACCESS_TOKEN_URL, CLIENT_SECRET, WEB_URL) = get_configuration_value(
[
{
"key": "OIDC_URL_TOKEN",
"default": os.environ.get("OIDC_URL_TOKEN", None),
},
{
"key": "OIDC_CLIENT_SECRET",
"default": os.environ.get("OIDC_CLIENT_SECRET", None),
},
{
"key": "WEB_URL",
"default": os.environ.get("WEB_URL", None),
},
]
)
(ACCESS_TOKEN_URL, CLIENT_SECRET, WEB_URL) = get_configuration_value([
{"key": "OIDC_URL_TOKEN", "default": os.environ.get("OIDC_URL_TOKEN")},
{"key": "OIDC_CLIENT_SECRET", "default": os.environ.get("OIDC_CLIENT_SECRET")},
{"key": "WEB_URL", "default": os.environ.get("WEB_URL")},
])

url = f"{ACCESS_TOKEN_URL}"
if not all([ACCESS_TOKEN_URL, CLIENT_SECRET, WEB_URL]):
raise ValueError("Configuration values for ACCESS_TOKEN_URL, CLIENT_SECRET, or WEB_URL are missing.")

url = ACCESS_TOKEN_URL
data = {
"grant_type": "authorization_code",
"code": request_token,
Expand All @@ -131,15 +123,16 @@ def get_access_token(request_token: str, client_id: str) -> str:
headers = {
"accept": "application/json",
"content-type": "application/x-www-form-urlencoded",
"Authorization": "Basic " + basic_auth,
"Authorization": f"Basic {basic_auth}",
}

res = requests.post(url, headers=headers, data=data)

data = res.json()
access_token = data["access_token"]
response = requests.post(url, headers=headers, data=data)

return access_token
data = response.json()
if 'access_token' in data:
return data["access_token"]
else:
raise Exception(f"Failed to obtain access token: {str(data)}")


def get_user_data(access_token: str) -> dict:
Expand Down

0 comments on commit 752028d

Please sign in to comment.