Skip to content

Commit

Permalink
app consume one_time token
Browse files Browse the repository at this point in the history
  • Loading branch information
jefer94 committed Aug 23, 2024
1 parent 36cf5a2 commit 3346c8f
Show file tree
Hide file tree
Showing 5 changed files with 300 additions and 5 deletions.
21 changes: 17 additions & 4 deletions breathecode/authenticate/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Any
from typing import Tuple, TypedDict, Unpack

import rest_framework.authtoken.models
from django import forms
Expand Down Expand Up @@ -530,6 +530,15 @@ class CredentialsGoogle(models.Model):
updated_at = models.DateTimeField(auto_now=True, editable=False)


class TokenGetOrCreateArgs(TypedDict):
hours_length: int
expires_at: datetime


class TokenFilterArgs(TypedDict):
token_type: str


class Token(rest_framework.authtoken.models.Token):
"""Bearer Token that support different types like `'login'`, `'temporal'` or `'permanent'`."""

Expand Down Expand Up @@ -563,7 +572,7 @@ def delete_expired_tokens() -> None:
Token.objects.filter(expires_at__lt=utc_now).delete()

@classmethod
def get_or_create(cls, user, token_type: str, **kwargs: Any):
def get_or_create(cls, user, token_type: str, **kwargs: Unpack[TokenGetOrCreateArgs]) -> Tuple["Token", bool]:
utc_now = timezone.now()
kwargs["token_type"] = token_type

Expand Down Expand Up @@ -606,12 +615,16 @@ def get_or_create(cls, user, token_type: str, **kwargs: Any):
return token, created

@classmethod
def get_valid(cls, token: str):
def get_valid(cls, token: str, **kwargs: Unpack[TokenFilterArgs]) -> "Token | None":
utc_now = timezone.now()
cls.delete_expired_tokens()

# find among any non-expired token
return Token.objects.filter(key=token).filter(Q(expires_at__gt=utc_now) | Q(expires_at__isnull=True)).first()
return (
Token.objects.filter(key=token, **kwargs)
.filter(Q(expires_at__gt=utc_now) | Q(expires_at__isnull=True))
.first()
)

@classmethod
def validate_and_destroy(cls, hash: str) -> User:
Expand Down
177 changes: 177 additions & 0 deletions breathecode/authenticate/tests/urls/tests_app_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
"""
Test cases for /user
"""

from datetime import datetime, timedelta
from typing import Callable
from unittest.mock import MagicMock

import pytest
from django.urls.base import reverse_lazy
from rest_framework import status

from breathecode.tests.mixins.breathecode_mixin.breathecode import Breathecode
from capyc.rest_framework import pytest as capy


def credentials_github_serializer(credentials_github):
return {
"avatar_url": credentials_github.avatar_url,
"name": credentials_github.name,
"username": credentials_github.username,
}


def profile_serializer(credentials_github):
return {
"avatar_url": credentials_github.avatar_url,
}


def get_serializer(user, credentials_github=None, profile=None, **data):
return {
"email": user.email,
"username": user.username,
"first_name": user.first_name,
"github": credentials_github_serializer(credentials_github) if credentials_github else None,
"id": user.id,
"last_name": user.last_name,
"profile": profile_serializer(profile) if profile else None,
**data,
}


@pytest.fixture(autouse=True)
def setup(db, monkeypatch):
from linked_services.django.actions import reset_app_cache

reset_app_cache()
monkeypatch.setattr("linked_services.django.tasks.check_credentials.delay", MagicMock())
yield


def test_no_auth(bc: Breathecode, client: capy.Client):
url = reverse_lazy("authenticate:app_token")
response = client.post(url)

json = response.json()
expected = {
"detail": "no-authorization-header",
"status_code": status.HTTP_401_UNAUTHORIZED,
}

assert json == expected
assert response.status_code == status.HTTP_401_UNAUTHORIZED
assert bc.database.list_of("authenticate.Token") == []


def test_no_data(bc: Breathecode, client: capy.Client, sign_jwt_link: Callable[..., None]):
app = {"require_an_agreement": False, "slug": "rigobot"}
model = bc.database.create(
app=app,
first_party_credentials={
"app": {
"rigobot": 1,
},
},
)

sign_jwt_link(client, model.app)

url = reverse_lazy("authenticate:app_token")
response = client.post(url)

json = response.json()
expected = {"detail": "token-not-provided", "status_code": 400}

assert json == expected
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert bc.database.list_of("authenticate.Token") == []


@pytest.mark.parametrize("token_type", [None, "login", "temporal", "permanent"])
@pytest.mark.parametrize("delta", [-timedelta(hours=10), timedelta(0), timedelta(hours=10)])
def test_bad_token(
bc: Breathecode,
client: capy.Client,
sign_jwt_link: Callable[..., None],
token_type: str,
delta: timedelta,
utc_now: datetime,
format: capy.Format,
):
extra = {}
app = {"require_an_agreement": False, "slug": "rigobot"}

if token_type:
extra["user"] = 1
expires_at = None
if delta:
expires_at = utc_now + delta

extra["token"] = {"token_type": token_type, "key": "1234", "expires_at": expires_at}

model = bc.database.create(
app=app,
first_party_credentials={
"app": {
"rigobot": 1,
},
},
**extra,
)

sign_jwt_link(client, model.app)

url = reverse_lazy("authenticate:app_token")
data = {"token": "1234"}
response = client.post(url, data)

json = response.json()
expected = {"detail": "invalid-token", "status_code": 401}

assert json == expected
assert response.status_code == status.HTTP_401_UNAUTHORIZED
if "token" in extra and (token_type == "permanent" or delta >= timedelta(0)):
assert bc.database.list_of("authenticate.Token") == [format.to_obj_repr(model.token)]
else:
assert bc.database.list_of("authenticate.Token") == []


@pytest.mark.parametrize("delta", [timedelta(0), timedelta(hours=10)])
def test_get_token(
bc: Breathecode, client: capy.Client, sign_jwt_link: Callable[..., None], utc_now: datetime, delta: timedelta
):
app = {"require_an_agreement": False, "slug": "rigobot"}
expires_at = None
if delta:
expires_at = utc_now + delta
model = bc.database.create(
user=1,
token={"token_type": "one_time", "key": "1234", "expires_at": expires_at},
app=app,
first_party_credentials={
"app": {
"rigobot": 1,
},
},
)

sign_jwt_link(client, model.app)

url = reverse_lazy("authenticate:app_token")
data = {"token": "1234"}
response = client.post(url, data)

json = response.json()
expected = {
"email": model.user.email,
"expires_at": None,
"token": "1234",
"token_type": "one_time",
"user_id": 1,
}

assert json == expected
assert response.status_code == status.HTTP_200_OK
assert bc.database.list_of("authenticate.Token") == []
2 changes: 2 additions & 0 deletions breathecode/authenticate/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
AcademyInviteView,
AcademyTokenView,
AppSync,
AppTokenView,
AppUserAgreementView,
AppUserView,
ConfirmEmailView,
Expand Down Expand Up @@ -165,4 +166,5 @@
path("app/user/<int:user_id>", AppUserView.as_view(), name="app_user_id"),
path("app/webhook", app_webhook, name="app_webhook"),
path("me/app/<str:app_slug>/sync", AppSync.as_view(), name="me_app_slug_sync"),
path("app/token", AppTokenView.as_view(), name="app_token"),
]
36 changes: 36 additions & 0 deletions breathecode/authenticate/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2715,3 +2715,39 @@ async def post(self, request, app_slug: str):
}

return await s.post("/v1/auth/app/user", data)


# app/user/:id
class AppTokenView(APIView):
permission_classes = [AllowAny]
extensions = APIViewExtensions(paginate=True)

@scope(["read:token"])
def post(self, request: LinkedHttpRequest, app: LinkedApp, token: LinkedToken, user_id=None):
lang = get_user_language(request)

hash = request.data.get("token")
if not hash:
raise ValidationException(
translation(lang, en="Token not provided", es="Token no proporcionado", slug="token-not-provided"),
code=400,
)

t = Token.get_valid(hash, token_type="one_time")
if t is None:
raise ValidationException(
translation(lang, en="Invalid token", es="Token inválido", slug="invalid-token"),
code=401,
)

t.delete()

return Response(
{
"token": t.key,
"token_type": t.token_type,
"expires_at": t.expires_at,
"user_id": t.user.pk,
"email": t.user.email,
}
)
69 changes: 68 additions & 1 deletion conftest.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import os
from typing import Generator
from typing import Generator, Optional
from unittest.mock import MagicMock, patch

import jwt
import pytest
from django.core.cache import cache
from django.utils import timezone
from rest_framework.test import APIClient

from breathecode.notify.utils.hook_manager import HookManagerClass
from breathecode.utils.exceptions import TestError
Expand Down Expand Up @@ -244,3 +246,68 @@ def wrapper(first: dict | list[dict], second: dict | list[dict]) -> None:
assert original == second

yield wrapper


@pytest.fixture
def sign_jwt_link():

def wrapper(
client: APIClient,
app,
user_id: Optional[int] = None,
reverse: bool = False,
):
"""
Set Json Web Token in the request.
Usage:
```py
# setup the database
model = self.bc.database.create(app=1, user=1)
# that setup the request to use the credential of user passed
self.bc.request.authenticate(model.app, model.user.id)
```
Keywords arguments:
- user: a instance of user model `breathecode.authenticate.models.User`
"""
from datetime import datetime, timedelta

from django.utils import timezone

now = timezone.now()

# https://datatracker.ietf.org/doc/html/rfc7519#section-4
payload = {
"sub": user_id,
"iss": os.getenv("API_URL", "http://localhost:8000"),
"app": app.slug,
"aud": "breathecode",
"exp": datetime.timestamp(now + timedelta(minutes=2)),
"iat": datetime.timestamp(now) - 1,
"typ": "JWT",
}

if reverse:
payload["aud"] = app.slug
payload["app"] = "breathecode"

if app.algorithm == "HMAC_SHA256":

token = jwt.encode(payload, bytes.fromhex(app.private_key), algorithm="HS256")

elif app.algorithm == "HMAC_SHA512":
token = jwt.encode(payload, bytes.fromhex(app.private_key), algorithm="HS512")

elif app.algorithm == "ED25519":
token = jwt.encode(payload, bytes.fromhex(app.private_key), algorithm="EdDSA")

else:
raise Exception("Algorithm not implemented")

client.credentials(HTTP_AUTHORIZATION=f"Link App={app.slug},Token={token}")

yield wrapper

0 comments on commit 3346c8f

Please sign in to comment.