Skip to content

Commit

Permalink
Add filtering on fqdn/username for API calls (domains, users, domain-…
Browse files Browse the repository at this point in the history
…user-permissions)

Signed-off-by: Loïc Gomez <[email protected]>
  • Loading branch information
kot0dama committed Apr 14, 2024
1 parent 6d8424f commit c1c0e1a
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 1 deletion.
21 changes: 21 additions & 0 deletions httprequest_lego_provider/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ def user_fixture(username: str, user_password: str) -> User:
return User.objects.create_user(username, password=user_password)


@pytest.fixture(scope="module", name="other_username")
def other_username_fixture() -> str:
"""Provide another user username."""
return "other_user"


@pytest.fixture(scope="function", name="other_user")
def other_user_fixture(other_username: str) -> User:
"""Provide another user."""
return User.objects.create_user(other_username, password=None)


@pytest.fixture(scope="function", name="user_auth_token")
def user_auth_token_fixture(username: str, user_password: str, user: User) -> str:
"""Provide the auth_token for the default user."""
Expand Down Expand Up @@ -79,6 +91,15 @@ def domain_fixture(fqdn: str) -> Domain:
return Domain.objects.create(fqdn=f"{FQDN_PREFIX}{fqdn}")


@pytest.fixture(scope="function", name="domains")
def domains_fixture(fqdns: list) -> list:
"""Create all domains and return the list of Domain objects."""
domains = []
for fqdn in fqdns:
domains.append(Domain.objects.create(fqdn=f"{FQDN_PREFIX}{fqdn}"))
return domains


@pytest.fixture(scope="function", name="domain_user_permission")
def domain_user_permission_fixture(domain: Domain, user: User) -> DomainUserPermission:
"""Provide a valid domain user permission."""
Expand Down
125 changes: 125 additions & 0 deletions httprequest_lego_provider/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,62 @@ def test_test_jwt_token_login(
assert response.status_code == 204


@pytest.mark.django_db
def test_get_domain_when_logged_in_as_non_admin_user(client: Client, user_auth_token: str):
"""
arrange: log in a non-admin user.
act: submit a GET request for the domain URL.
assert: a 403 is returned
"""
response = client.get(
"/api/v1/domains/",
format="json",
headers={"AUTHORIZATION": f"Basic {user_auth_token}"},
)

assert response.status_code == 403


@pytest.mark.django_db
def test_get_domain_when_logged_in_as_admin_user(
client: Client, admin_user_auth_token: str, domains: list
):
"""
arrange: log in an admin user.
act: submit a GET request for the domain URL.
assert: a 200 is returned and the domains are all returned.
"""
assert len(Domain.objects.all()) != 0
response = client.get(
"/api/v1/domains/",
format="json",
headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"},
)
json = response.json()

assert response.status_code == 200
assert len(json) == len(domains)


@pytest.mark.django_db
def test_get_domain_with_fqdn_filter(client: Client, admin_user_auth_token: str, domains: list):
"""
arrange: log in an admin user.
act: submit a GET request for the domain URL.
assert: a 200 is returned and the domain matching FQDN is returned.
"""
response = client.get(
"/api/v1/domains/?fqdn=example2.com",
format="json",
headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"},
)
json = response.json()

assert response.status_code == 200
assert len(json) == 1
assert json[0]["fqdn"] == f"{FQDN_PREFIX}example2.com"


@pytest.mark.django_db
def test_post_domain_when_logged_in_as_non_admin_user(client: Client, user_auth_token: str):
"""
Expand Down Expand Up @@ -379,6 +435,75 @@ def test_post_domain_when_logged_in_as_admin_user_and_domain_invalid(
assert response.status_code == 400


@pytest.mark.django_db
def test_get_domain_user_permission_when_logged_in_as_non_admin_user(
client: Client, user_auth_token: str, domain: Domain, user: User
):
"""
arrange: log in a non-admin user.
act: submit a GET request for the domain user permission URL.
assert: a 403 is returned
"""
response = client.get(
"/api/v1/domain-user-permissions/",
format="json",
headers={"AUTHORIZATION": f"Basic {user_auth_token}"},
)

assert response.status_code == 403


@pytest.mark.django_db
def test_get_domain_user_permission_when_logged_in_as_admin_user(
client: Client,
admin_user_auth_token: str,
user: User,
domain_user_permissions: list,
):
"""
arrange: log in an admin user.
act: submit a GET request for the domain user permission URL for a existing domain.
assert: a 200 is returned, the json result does not contain unwanted domain-user-permissions.
"""
assert len(DomainUserPermission.objects.all()) != 0
response = client.get(
"/api/v1/domain-user-permissions/",
headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"},
)
json = response.json()

assert response.status_code == 200
assert len(json) == len(DomainUserPermission.objects.all())


@pytest.mark.django_db
def test_get_domain_user_permission_with_filters(
client: Client,
admin_user_auth_token: str,
user: User,
domain_user_permissions: list,
):
"""
arrange: log in an admin user.
act: submit a GET request for the domain user permission URL for a existing domain.
assert: a 200 is returned, the json result does not contain unwanted domain-user-permissions.
"""
assert len(DomainUserPermission.objects.filter()) != 0
response = client.get(
"/api/v1/domain-user-permissions/",
data={"fqdn": "example2.com", "username": user.username},
headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"},
)
json = response.json()

assert response.status_code == 200
assert len(json) > 0

for entry in json:
assert entry["domain"] == Domain.objects.get(fqdn=f"{FQDN_PREFIX}example2.com").id
assert entry["user"] == User.objects.get(username=user.username).id


@pytest.mark.django_db
def test_post_domain_user_permission_when_logged_in_as_non_admin_user(
client: Client, user_auth_token: str, domain: Domain, user: User
Expand Down
41 changes: 40 additions & 1 deletion httprequest_lego_provider/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from rest_framework.permissions import IsAdminUser

from .dns import remove_dns_record, write_dns_record
from .forms import CleanupForm, PresentForm
from .forms import FQDN_PREFIX, CleanupForm, PresentForm
from .models import Domain, DomainUserPermission
from .serializers import DomainSerializer, DomainUserPermissionSerializer, UserSerializer

Expand Down Expand Up @@ -91,6 +91,18 @@ class DomainViewSet(viewsets.ModelViewSet):
serializer_class = DomainSerializer
permission_classes = [IsAdminUser]

def get_queryset(self):
"""Optionally restricts the returned object list to a given domain.
Returns:
a filtered queryset against a `fqdn` query parameter in the URL.
"""
queryset = self.queryset
fqdn = self.request.query_params.get("fqdn")
if fqdn is not None:
queryset = queryset.filter(fqdn=f"{FQDN_PREFIX}{fqdn}")
return queryset


class DomainUserPermissionViewSet(viewsets.ModelViewSet):
"""Views for the DomainUserPermission.
Expand All @@ -105,6 +117,21 @@ class DomainUserPermissionViewSet(viewsets.ModelViewSet):
serializer_class = DomainUserPermissionSerializer
permission_classes = [IsAdminUser]

def get_queryset(self):
"""Optionally restricts the returned object list to a given user/domain.
Returns:
A filtered queryset against `username` / `fqdn` query parameters in the URL.
"""
queryset = self.queryset
username = self.request.query_params.get("username")
if username is not None:
queryset = queryset.filter(user__username=username)
fqdn = self.request.query_params.get("fqdn")
if fqdn is not None:
queryset = queryset.filter(domain__fqdn=f"{FQDN_PREFIX}{fqdn}")
return queryset


class UserViewSet(viewsets.ModelViewSet):
"""Views for the User.
Expand All @@ -118,3 +145,15 @@ class UserViewSet(viewsets.ModelViewSet):
queryset = User.objects.all().order_by("-date_joined")
serializer_class = UserSerializer
permission_classes = [IsAdminUser]

def get_queryset(self):
"""Optionally restricts the returned object list to a given user.
Returns:
A filtered queryset against a `username` query parameter in the URL.
"""
queryset = self.queryset
username = self.request.query_params.get("username")
if username is not None:
queryset = queryset.filter(username=username)
return queryset

0 comments on commit c1c0e1a

Please sign in to comment.