Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filtering on fqdn/username for API calls #57

Merged
merged 3 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .trivyignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CVE-2022-40897

Check notice on line 1 in .trivyignore

View workflow job for this annotation

GitHub Actions / integration-tests / Scan Image (httprequest-lego-provider_0.1_amd64.rock)

CVE-2022-40897 not present anymore, can be safely removed.
CVE-2024-6345

Check notice on line 2 in .trivyignore

View workflow job for this annotation

GitHub Actions / integration-tests / Scan Image (httprequest-lego-provider_0.1_amd64.rock)

CVE-2024-6345 not present anymore, can be safely removed.
CVE-2024-34156 # https://github.com/canonical/pebble/issues/498
CVE-2024-34156 # https://github.com/canonical/pebble/issues/498
21 changes: 21 additions & 0 deletions httprequest_lego_provider/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ def user_fixture(username: str, user_password: str) -> User:
return User.objects.create_user(username, password=user_password)


@pytest.fixture(scope="module", name="other_username")
def other_username_fixture() -> str:
"""Provide another user username."""
return "other_user"


@pytest.fixture(scope="function", name="other_user")
def other_user_fixture(other_username: str) -> User:
"""Provide another user."""
return User.objects.create_user(other_username, password=None)


@pytest.fixture(scope="function", name="user_auth_token")
def user_auth_token_fixture(username: str, user_password: str, user: User) -> str:
"""Provide the auth_token for the default user."""
Expand Down Expand Up @@ -79,6 +91,15 @@ def domain_fixture(fqdn: str) -> Domain:
return Domain.objects.create(fqdn=f"{FQDN_PREFIX}{fqdn}")


@pytest.fixture(scope="function", name="domains")
def domains_fixture(fqdns: list) -> list:
"""Create all domains and return the list of Domain objects."""
domains = []
for fqdn in fqdns:
domains.append(Domain.objects.create(fqdn=f"{FQDN_PREFIX}{fqdn}"))
return domains


@pytest.fixture(scope="function", name="domain_user_permission")
def domain_user_permission_fixture(domain: Domain, user: User) -> DomainUserPermission:
"""Provide a valid domain user permission."""
Expand Down
147 changes: 147 additions & 0 deletions httprequest_lego_provider/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,62 @@ def test_test_jwt_token_login(
assert response.status_code == 204


@pytest.mark.django_db
def test_get_domain_when_logged_in_as_non_admin_user(client: Client, user_auth_token: str):
"""
arrange: log in a non-admin user.
act: submit a GET request for the domain URL.
assert: a 403 is returned
"""
response = client.get(
"/api/v1/domains/",
format="json",
headers={"AUTHORIZATION": f"Basic {user_auth_token}"},
)

assert response.status_code == 403


@pytest.mark.django_db
def test_get_domain_when_logged_in_as_admin_user(
client: Client, admin_user_auth_token: str, domains: list
):
"""
arrange: log in an admin user.
act: submit a GET request for the domain URL.
assert: a 200 is returned and the domains are all returned.
"""
assert len(Domain.objects.all()) != 0
response = client.get(
"/api/v1/domains/",
format="json",
headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"},
)
json = response.json()

assert response.status_code == 200
assert len(json) == len(domains)


@pytest.mark.django_db
def test_get_domain_with_fqdn_filter(client: Client, admin_user_auth_token: str, domains: list):
"""
arrange: log in an admin user.
act: submit a GET request for the domain URL.
assert: a 200 is returned and the domain matching FQDN is returned.
"""
response = client.get(
"/api/v1/domains/?fqdn=example2.com",
format="json",
headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"},
)
json = response.json()

assert response.status_code == 200
assert len(json) == 1
assert json[0]["fqdn"] == f"{FQDN_PREFIX}example2.com"


@pytest.mark.django_db
def test_post_domain_when_logged_in_as_non_admin_user(client: Client, user_auth_token: str):
"""
Expand Down Expand Up @@ -379,6 +435,75 @@ def test_post_domain_when_logged_in_as_admin_user_and_domain_invalid(
assert response.status_code == 400


@pytest.mark.django_db
def test_get_domain_user_permission_when_logged_in_as_non_admin_user(
client: Client, user_auth_token: str, domain: Domain, user: User
):
"""
arrange: log in a non-admin user.
act: submit a GET request for the domain user permission URL.
assert: a 403 is returned
"""
response = client.get(
"/api/v1/domain-user-permissions/",
format="json",
headers={"AUTHORIZATION": f"Basic {user_auth_token}"},
)

assert response.status_code == 403


@pytest.mark.django_db
def test_get_domain_user_permission_when_logged_in_as_admin_user(
client: Client,
admin_user_auth_token: str,
user: User,
domain_user_permissions: list,
):
"""
arrange: log in an admin user.
act: submit a GET request for the domain user permission URL for a existing domain.
assert: a 200 is returned, the json result does not contain unwanted domain-user-permissions.
"""
assert len(DomainUserPermission.objects.all()) != 0
response = client.get(
"/api/v1/domain-user-permissions/",
headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"},
)
json = response.json()

assert response.status_code == 200
assert len(json) == len(DomainUserPermission.objects.all())


@pytest.mark.django_db
def test_get_domain_user_permission_with_filters(
client: Client,
admin_user_auth_token: str,
user: User,
domain_user_permissions: list,
):
"""
arrange: log in an admin user.
act: submit a GET request for the domain user permission URL for a existing domain.
assert: a 200 is returned, the json result does not contain unwanted domain-user-permissions.
"""
assert len(DomainUserPermission.objects.filter()) != 0
response = client.get(
"/api/v1/domain-user-permissions/",
data={"fqdn": "example2.com", "username": user.username},
headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"},
)
json = response.json()

assert response.status_code == 200
assert len(json) > 0

for entry in json:
assert entry["domain"] == Domain.objects.get(fqdn=f"{FQDN_PREFIX}example2.com").id
assert entry["user"] == User.objects.get(username=user.username).id


@pytest.mark.django_db
def test_post_domain_user_permission_when_logged_in_as_non_admin_user(
client: Client, user_auth_token: str, domain: Domain, user: User
Expand Down Expand Up @@ -495,6 +620,28 @@ def test_get_user_when_logged_in_as_admin_user(
assert "password" not in entry


@pytest.mark.django_db
def test_get_user_with_username_filter(client: Client, admin_user_auth_token: str, user: User):
"""
arrange: log in an admin user.
act: submit a GET request for the user URL.
assert: a 200 is returned and the json result matches the requested username.
"""
response = client.get(
"/api/v1/users/",
data={"username": user.username},
format="json",
headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"},
)
json = response.json()

assert len(User.objects.all()) > 0
assert response.status_code == 200
assert len(json) == 1
for entry in json:
assert entry["username"] == user.username


@pytest.mark.django_db
def test_post_user_when_logged_in_as_non_admin_user(client: Client, user_auth_token: str):
"""
Expand Down
41 changes: 40 additions & 1 deletion httprequest_lego_provider/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from rest_framework.permissions import IsAdminUser

from .dns import remove_dns_record, write_dns_record
from .forms import CleanupForm, PresentForm
from .forms import FQDN_PREFIX, CleanupForm, PresentForm
from .models import Domain, DomainUserPermission
from .serializers import DomainSerializer, DomainUserPermissionSerializer, UserSerializer

Expand Down Expand Up @@ -91,6 +91,18 @@ class DomainViewSet(viewsets.ModelViewSet):
serializer_class = DomainSerializer
permission_classes = [IsAdminUser]

def get_queryset(self):
"""Optionally restricts the returned object list to a given domain.

Returns:
a filtered queryset against a `fqdn` query parameter in the URL.
"""
queryset = self.queryset
fqdn = self.request.query_params.get("fqdn")
if fqdn is not None:
queryset = queryset.filter(fqdn=f"{FQDN_PREFIX}{fqdn}")
return queryset


class DomainUserPermissionViewSet(viewsets.ModelViewSet):
"""Views for the DomainUserPermission.
Expand All @@ -105,6 +117,21 @@ class DomainUserPermissionViewSet(viewsets.ModelViewSet):
serializer_class = DomainUserPermissionSerializer
permission_classes = [IsAdminUser]

def get_queryset(self):
"""Optionally restricts the returned object list to a given user/domain.

Returns:
A filtered queryset against `username` / `fqdn` query parameters in the URL.
"""
queryset = self.queryset
username = self.request.query_params.get("username")
if username is not None:
queryset = queryset.filter(user__username=username)
fqdn = self.request.query_params.get("fqdn")
if fqdn is not None:
queryset = queryset.filter(domain__fqdn=f"{FQDN_PREFIX}{fqdn}")
return queryset


class UserViewSet(viewsets.ModelViewSet):
"""Views for the User.
Expand All @@ -118,3 +145,15 @@ class UserViewSet(viewsets.ModelViewSet):
queryset = User.objects.all().order_by("-date_joined")
serializer_class = UserSerializer
permission_classes = [IsAdminUser]

def get_queryset(self):
"""Optionally restricts the returned object list to a given user.

Returns:
A filtered queryset against a `username` query parameter in the URL.
"""
queryset = self.queryset
username = self.request.query_params.get("username")
if username is not None:
queryset = queryset.filter(username=username)
return queryset
Loading