From c1c0e1ab2d99fe07f89932d91429f7fd25f6575d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Gomez?= <89980752+kot0dama@users.noreply.github.com> Date: Sun, 14 Apr 2024 17:44:57 +0900 Subject: [PATCH] Add filtering on fqdn/username for API calls (domains, users, domain-user-permissions) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Loïc Gomez <89980752+kot0dama@users.noreply.github.com> --- httprequest_lego_provider/tests/conftest.py | 21 +++ httprequest_lego_provider/tests/test_views.py | 125 ++++++++++++++++++ httprequest_lego_provider/views.py | 41 +++++- 3 files changed, 186 insertions(+), 1 deletion(-) diff --git a/httprequest_lego_provider/tests/conftest.py b/httprequest_lego_provider/tests/conftest.py index b3aba71..514c6c0 100644 --- a/httprequest_lego_provider/tests/conftest.py +++ b/httprequest_lego_provider/tests/conftest.py @@ -33,6 +33,18 @@ def user_fixture(username: str, user_password: str) -> User: return User.objects.create_user(username, password=user_password) +@pytest.fixture(scope="module", name="other_username") +def other_username_fixture() -> str: + """Provide another user username.""" + return "other_user" + + +@pytest.fixture(scope="function", name="other_user") +def other_user_fixture(other_username: str) -> User: + """Provide another user.""" + return User.objects.create_user(other_username, password=None) + + @pytest.fixture(scope="function", name="user_auth_token") def user_auth_token_fixture(username: str, user_password: str, user: User) -> str: """Provide the auth_token for the default user.""" @@ -79,6 +91,15 @@ def domain_fixture(fqdn: str) -> Domain: return Domain.objects.create(fqdn=f"{FQDN_PREFIX}{fqdn}") +@pytest.fixture(scope="function", name="domains") +def domains_fixture(fqdns: list) -> list: + """Create all domains and return the list of Domain objects.""" + domains = [] + for fqdn in fqdns: + domains.append(Domain.objects.create(fqdn=f"{FQDN_PREFIX}{fqdn}")) + return domains + + @pytest.fixture(scope="function", name="domain_user_permission") def domain_user_permission_fixture(domain: Domain, user: User) -> DomainUserPermission: """Provide a valid domain user permission.""" diff --git a/httprequest_lego_provider/tests/test_views.py b/httprequest_lego_provider/tests/test_views.py index e02e365..8cc690a 100644 --- a/httprequest_lego_provider/tests/test_views.py +++ b/httprequest_lego_provider/tests/test_views.py @@ -321,6 +321,62 @@ def test_test_jwt_token_login( assert response.status_code == 204 +@pytest.mark.django_db +def test_get_domain_when_logged_in_as_non_admin_user(client: Client, user_auth_token: str): + """ + arrange: log in a non-admin user. + act: submit a GET request for the domain URL. + assert: a 403 is returned + """ + response = client.get( + "/api/v1/domains/", + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + + assert response.status_code == 403 + + +@pytest.mark.django_db +def test_get_domain_when_logged_in_as_admin_user( + client: Client, admin_user_auth_token: str, domains: list +): + """ + arrange: log in an admin user. + act: submit a GET request for the domain URL. + assert: a 200 is returned and the domains are all returned. + """ + assert len(Domain.objects.all()) != 0 + response = client.get( + "/api/v1/domains/", + format="json", + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + json = response.json() + + assert response.status_code == 200 + assert len(json) == len(domains) + + +@pytest.mark.django_db +def test_get_domain_with_fqdn_filter(client: Client, admin_user_auth_token: str, domains: list): + """ + arrange: log in an admin user. + act: submit a GET request for the domain URL. + assert: a 200 is returned and the domain matching FQDN is returned. + """ + response = client.get( + "/api/v1/domains/?fqdn=example2.com", + format="json", + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + json = response.json() + + assert response.status_code == 200 + assert len(json) == 1 + assert json[0]["fqdn"] == f"{FQDN_PREFIX}example2.com" + + @pytest.mark.django_db def test_post_domain_when_logged_in_as_non_admin_user(client: Client, user_auth_token: str): """ @@ -379,6 +435,75 @@ def test_post_domain_when_logged_in_as_admin_user_and_domain_invalid( assert response.status_code == 400 +@pytest.mark.django_db +def test_get_domain_user_permission_when_logged_in_as_non_admin_user( + client: Client, user_auth_token: str, domain: Domain, user: User +): + """ + arrange: log in a non-admin user. + act: submit a GET request for the domain user permission URL. + assert: a 403 is returned + """ + response = client.get( + "/api/v1/domain-user-permissions/", + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + + assert response.status_code == 403 + + +@pytest.mark.django_db +def test_get_domain_user_permission_when_logged_in_as_admin_user( + client: Client, + admin_user_auth_token: str, + user: User, + domain_user_permissions: list, +): + """ + arrange: log in an admin user. + act: submit a GET request for the domain user permission URL for a existing domain. + assert: a 200 is returned, the json result does not contain unwanted domain-user-permissions. + """ + assert len(DomainUserPermission.objects.all()) != 0 + response = client.get( + "/api/v1/domain-user-permissions/", + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + json = response.json() + + assert response.status_code == 200 + assert len(json) == len(DomainUserPermission.objects.all()) + + +@pytest.mark.django_db +def test_get_domain_user_permission_with_filters( + client: Client, + admin_user_auth_token: str, + user: User, + domain_user_permissions: list, +): + """ + arrange: log in an admin user. + act: submit a GET request for the domain user permission URL for a existing domain. + assert: a 200 is returned, the json result does not contain unwanted domain-user-permissions. + """ + assert len(DomainUserPermission.objects.filter()) != 0 + response = client.get( + "/api/v1/domain-user-permissions/", + data={"fqdn": "example2.com", "username": user.username}, + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + json = response.json() + + assert response.status_code == 200 + assert len(json) > 0 + + for entry in json: + assert entry["domain"] == Domain.objects.get(fqdn=f"{FQDN_PREFIX}example2.com").id + assert entry["user"] == User.objects.get(username=user.username).id + + @pytest.mark.django_db def test_post_domain_user_permission_when_logged_in_as_non_admin_user( client: Client, user_auth_token: str, domain: Domain, user: User diff --git a/httprequest_lego_provider/views.py b/httprequest_lego_provider/views.py index fde2506..6768de3 100644 --- a/httprequest_lego_provider/views.py +++ b/httprequest_lego_provider/views.py @@ -16,7 +16,7 @@ from rest_framework.permissions import IsAdminUser from .dns import remove_dns_record, write_dns_record -from .forms import CleanupForm, PresentForm +from .forms import FQDN_PREFIX, CleanupForm, PresentForm from .models import Domain, DomainUserPermission from .serializers import DomainSerializer, DomainUserPermissionSerializer, UserSerializer @@ -91,6 +91,18 @@ class DomainViewSet(viewsets.ModelViewSet): serializer_class = DomainSerializer permission_classes = [IsAdminUser] + def get_queryset(self): + """Optionally restricts the returned object list to a given domain. + + Returns: + a filtered queryset against a `fqdn` query parameter in the URL. + """ + queryset = self.queryset + fqdn = self.request.query_params.get("fqdn") + if fqdn is not None: + queryset = queryset.filter(fqdn=f"{FQDN_PREFIX}{fqdn}") + return queryset + class DomainUserPermissionViewSet(viewsets.ModelViewSet): """Views for the DomainUserPermission. @@ -105,6 +117,21 @@ class DomainUserPermissionViewSet(viewsets.ModelViewSet): serializer_class = DomainUserPermissionSerializer permission_classes = [IsAdminUser] + def get_queryset(self): + """Optionally restricts the returned object list to a given user/domain. + + Returns: + A filtered queryset against `username` / `fqdn` query parameters in the URL. + """ + queryset = self.queryset + username = self.request.query_params.get("username") + if username is not None: + queryset = queryset.filter(user__username=username) + fqdn = self.request.query_params.get("fqdn") + if fqdn is not None: + queryset = queryset.filter(domain__fqdn=f"{FQDN_PREFIX}{fqdn}") + return queryset + class UserViewSet(viewsets.ModelViewSet): """Views for the User. @@ -118,3 +145,15 @@ class UserViewSet(viewsets.ModelViewSet): queryset = User.objects.all().order_by("-date_joined") serializer_class = UserSerializer permission_classes = [IsAdminUser] + + def get_queryset(self): + """Optionally restricts the returned object list to a given user. + + Returns: + A filtered queryset against a `username` query parameter in the URL. + """ + queryset = self.queryset + username = self.request.query_params.get("username") + if username is not None: + queryset = queryset.filter(username=username) + return queryset