diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml new file mode 100644 index 0000000..466be6c --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -0,0 +1,57 @@ +name: Bug Report +description: File a bug report +labels: ["Type: Bug", "Status: Triage"] +body: + - type: markdown + attributes: + value: > + Thanks for taking the time to fill out this bug report! Before submitting your issue, please make + sure you are using the latest version of the charm. If not, please switch to this image prior to + posting your report to make sure it's not already solved. + - type: textarea + id: bug-description + attributes: + label: Bug Description + description: > + If applicable, add screenshots to help explain the problem you are facing. + validations: + required: true + - type: textarea + id: reproduction + attributes: + label: To Reproduce + description: > + Please provide a step-by-step instruction of how to reproduce the behavior. + placeholder: | + 1. `juju deploy ...` + 2. `juju relate ...` + 3. `juju status --relations` + validations: + required: true + - type: textarea + id: environment + attributes: + label: Environment + description: > + We need to know a bit more about the context in which you run the charm. + - Are you running Juju locally, on lxd, in multipass or on some other platform? + - What track and channel you deployed the charm from (i.e. `latest/edge` or similar). + - Version of any applicable components, like the juju snap, the model controller, lxd, microk8s, and/or multipass. + validations: + required: true + - type: textarea + id: logs + attributes: + label: Relevant log output + description: > + Please copy and paste any relevant log output. This will be automatically formatted into code, so no need for backticks. + Fetch the logs using `juju debug-log --replay` and `kubectl logs ...`. Additional details available in the juju docs + at https://juju.is/docs/olm/juju-logs + render: shell + validations: + required: true + - type: textarea + id: additional-context + attributes: + label: Additional context + diff --git a/.github/ISSUE_TEMPLATE/enhancement_proposal.yml b/.github/ISSUE_TEMPLATE/enhancement_proposal.yml new file mode 100644 index 0000000..b2348b9 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/enhancement_proposal.yml @@ -0,0 +1,17 @@ +name: Enhancement Proposal +description: File an enhancement proposal +labels: ["Type: Enhancement", "Status: Triage"] +body: + - type: markdown + attributes: + value: > + Thanks for taking the time to fill out this enhancement proposal! Before submitting your issue, please make + sure there isn't already a prior issue concerning this. If there is, please join that discussion instead. + - type: textarea + id: enhancement-proposal + attributes: + label: Enhancement Proposal + description: > + Describe the enhancement you would like to see in as much detail as needed. + validations: + required: true diff --git a/.github/pull_request_template.yaml b/.github/pull_request_template.yaml new file mode 100644 index 0000000..ab8e713 --- /dev/null +++ b/.github/pull_request_template.yaml @@ -0,0 +1,20 @@ +Applicable spec: + +### Overview + + + +### Rationale + + + +### Module Changes + + + +### Checklist + +- [ ] The [contributing guide](https://github.com/canonical/is-charms-contributing-guide) was applied +- [ ] The PR is tagged with appropriate label (`urgent`, `trivial`, `complex`) + + diff --git a/.github/workflows/bot_pr_approval.yaml b/.github/workflows/bot_pr_approval.yaml new file mode 100644 index 0000000..f284fd7 --- /dev/null +++ b/.github/workflows/bot_pr_approval.yaml @@ -0,0 +1,10 @@ +name: Provide approval for bot PRs + +on: + pull_request: + +jobs: + bot_pr_approval: + uses: canonical/operator-workflows/.github/workflows/bot_pr_approval.yaml@main + secrets: inherit + diff --git a/.github/workflows/comment.yaml b/.github/workflows/comment.yaml new file mode 100644 index 0000000..26ac226 --- /dev/null +++ b/.github/workflows/comment.yaml @@ -0,0 +1,12 @@ +name: Comment on the pull request + +on: + workflow_run: + workflows: ["Tests"] + types: + - completed + +jobs: + comment-on-pr: + uses: canonical/operator-workflows/.github/workflows/comment.yaml@main + secrets: inherit diff --git a/.github/workflows/comment_contributing.yaml b/.github/workflows/comment_contributing.yaml new file mode 100644 index 0000000..c51f873 --- /dev/null +++ b/.github/workflows/comment_contributing.yaml @@ -0,0 +1,13 @@ +name: Comment on the pull request + +on: + pull_request: + types: + - opened + branches: + - 'track/**' + +jobs: + comment-on-pr: + uses: canonical/operator-workflows/.github/workflows/comment_contributing.yaml@main + secrets: inherit diff --git a/.github/workflows/issues.yaml b/.github/workflows/issues.yaml new file mode 100644 index 0000000..138fe82 --- /dev/null +++ b/.github/workflows/issues.yaml @@ -0,0 +1,11 @@ +name: Sync issues to Jira + +on: + issues: + # available via github.event.action + types: [opened, reopened, closed] + +jobs: + issues-to-jira: + uses: canonical/operator-workflows/.github/workflows/jira.yaml@main + secrets: inherit diff --git a/.github/workflows/publish_charm.yaml b/.github/workflows/publish_charm.yaml new file mode 100644 index 0000000..399a4b5 --- /dev/null +++ b/.github/workflows/publish_charm.yaml @@ -0,0 +1,15 @@ +name: Publish to edge + +on: + push: + branches: + - main + - track/* + +jobs: + publish-to-edge: + uses: canonical/operator-workflows/.github/workflows/publish_charm.yaml@main + secrets: inherit + with: + integration-test-workflow-file: test.yaml + resource-mapping: '{"httprequest-lego-provider": "django-app-image"}' diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml new file mode 100644 index 0000000..bc550c0 --- /dev/null +++ b/.github/workflows/test.yaml @@ -0,0 +1,18 @@ +name: Tests + +on: + pull_request: + +jobs: + unit-tests: + uses: canonical/operator-workflows/.github/workflows/test.yaml@main + secrets: inherit + with: + self-hosted-runner: true + self-hosted-runner-label: "edge" + integration-tests: + uses: canonical/operator-workflows/.github/workflows/integration_test.yaml@main + secrets: inherit + with: + channel: 1.28-strict/stable + juju-channel: 3.4/stable diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cd31ed5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +venv/ +build/ +*.charm +.tox/ +.coverage +__pycache__/ +*.py[cod] +.idea +.vscode +.mypy_cache +*.egg-info/ +**/*.rock +*/*.sqlite3 diff --git a/.jujuignore b/.jujuignore new file mode 100644 index 0000000..65f4410 --- /dev/null +++ b/.jujuignore @@ -0,0 +1,4 @@ +/venv +*.py[cod] +*.charm +/.github diff --git a/.licenserc.yaml b/.licenserc.yaml new file mode 100644 index 0000000..099beaf --- /dev/null +++ b/.licenserc.yaml @@ -0,0 +1,40 @@ +header: + license: + spdx-id: Apache-2.0 + copyright-owner: Canonical Ltd. + content: | + Copyright [year] [owner] + See LICENSE file for licensing details. + paths: + - '**' + paths-ignore: + - '.github/**' + - '**/.gitkeep' + - '**/*.cfg' + - '**/*.conf' + - '**/*.j2' + - '**/*.json' + - '**/*.md' + - '**/*.rule' + - '**/*.tmpl' + - '**/*.txt' + - '.codespellignore' + - '.dockerignore' + - '.flake8' + - '.jujuignore' + - '.gitignore' + - '.licenserc.yaml' + - '.trivyignore' + - '.woke.yaml' + - '.woke.yml' + - 'CODEOWNERS' + - 'icon.svg' + - 'LICENSE' + - 'trivy.yaml' + - 'pyproject.toml' + - 'zap_rules.tsv' + - 'manage.py' + - 'app/**' + - 'charm/lib/**' + - '**/migrations/**' + comment: on-failure diff --git a/.trivyignore b/.trivyignore new file mode 100644 index 0000000..b4a632a --- /dev/null +++ b/.trivyignore @@ -0,0 +1,3 @@ +CVE-2022-40897 +CVE-2024-6345 +CVE-2024-34156 # https://github.com/canonical/pebble/issues/498 \ No newline at end of file diff --git a/.woke.yaml b/.woke.yaml new file mode 100644 index 0000000..a33a7a6 --- /dev/null +++ b/.woke.yaml @@ -0,0 +1,4 @@ +ignore_files: + # Ignore ingress charm library as it uses non compliant terminology: + # whitelist. + - charm/lib/charms/redis_k8s/v0/redis.py diff --git a/CODEOWNERS b/CODEOWNERS new file mode 100644 index 0000000..e3e8c01 --- /dev/null +++ b/CODEOWNERS @@ -0,0 +1 @@ +* @canonical/is-charms \ No newline at end of file diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..288f021 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,33 @@ +# Contributing + +To make contributions to this charm, you'll need a working [development setup](https://juju.is/docs/sdk/dev-setup). + +You can create an environment for development with `tox`: + +```shell +tox devenv -e integration +source venv/bin/activate +``` + +## Testing + +This project uses `tox` for managing test environments. There are some pre-configured environments +that can be used for linting and formatting code when you're preparing contributions to the charm: + +```shell +tox run -e format # update your code according to linting rules +tox run -e lint # code style +tox run -e unit # unit tests +tox run -e integration # integration tests +tox # runs 'format', 'lint', and 'unit' environments +``` + +## Build the charm + +Build the charm in this git repository using: + +```shell +charmcraft pack +``` + + +{% block content %} + + {% if form.errors %} +

Your username and password didn't match. Please try again.

+ {% endif %} + + {% if next %} + {% if user.is_authenticated %} +

Your account doesn't have access to this page. To proceed, + please login with an account that has access.

+ {% else %} +

Please login to see this page.

+ {% endif %} + {% endif %} + +
+ {% csrf_token %} + + + + + + + + + +
{{ form.username.label_tag }}{{ form.username }}
{{ form.password.label_tag }}{{ form.password }}
+ + +
+ + {# Assumes you set up the password_reset view in your URLconf #} +

Lost password?

+ +{% endblock %} diff --git a/httprequest_lego_provider/tests/conftest.py b/httprequest_lego_provider/tests/conftest.py new file mode 100644 index 0000000..514c6c0 --- /dev/null +++ b/httprequest_lego_provider/tests/conftest.py @@ -0,0 +1,126 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Fixtures for unit tests.""" + +# pylint:disable=unused-argument + +import base64 +import secrets + +import pytest +from django.contrib.auth.models import User + +from httprequest_lego_provider.forms import FQDN_PREFIX +from httprequest_lego_provider.models import Domain, DomainUserPermission + + +@pytest.fixture(scope="module", name="username") +def username_fixture() -> str: + """Provide a default username.""" + return "test_user" + + +@pytest.fixture(scope="module", name="user_password") +def user_password_fixture() -> str: + """Provide a default user password.""" + return secrets.token_hex() + + +@pytest.fixture(scope="function", name="user") +def user_fixture(username: str, user_password: str) -> User: + """Provide a default user.""" + return User.objects.create_user(username, password=user_password) + + +@pytest.fixture(scope="module", name="other_username") +def other_username_fixture() -> str: + """Provide another user username.""" + return "other_user" + + +@pytest.fixture(scope="function", name="other_user") +def other_user_fixture(other_username: str) -> User: + """Provide another user.""" + return User.objects.create_user(other_username, password=None) + + +@pytest.fixture(scope="function", name="user_auth_token") +def user_auth_token_fixture(username: str, user_password: str, user: User) -> str: + """Provide the auth_token for the default user.""" + return base64.b64encode(bytes(f"{username}:{user_password}", "utf-8")).decode("utf-8") + + +@pytest.fixture(scope="module", name="admin_username") +def admin_username_fixture() -> str: + """Provide an admin username.""" + return "test_admin_user" + + +@pytest.fixture(scope="module", name="admin_user_password") +def admin_user_password_fixture() -> str: + """Provide an admin user password.""" + return secrets.token_hex() + + +@pytest.fixture(scope="function", name="admin_user") +def admin_user_fixture(admin_username: str, admin_user_password: str) -> User: + """Provide an admin user.""" + return User.objects.create_user(admin_username, password=admin_user_password, is_staff=True) + + +@pytest.fixture(scope="function", name="admin_user_auth_token") +def admin_user_auth_token_fixture( + admin_username: str, admin_user_password: str, admin_user: User +) -> str: + """Provide the auth_token for the admin user.""" + return base64.b64encode(bytes(f"{admin_username}:{admin_user_password}", "utf-8")).decode( + "utf-8" + ) + + +@pytest.fixture(scope="module", name="fqdn") +def fqdn_fixture() -> str: + """Provide a valid FQDN.""" + return "example.com" + + +@pytest.fixture(scope="function", name="domain") +def domain_fixture(fqdn: str) -> Domain: + """Provide a valid domain.""" + return Domain.objects.create(fqdn=f"{FQDN_PREFIX}{fqdn}") + + +@pytest.fixture(scope="function", name="domains") +def domains_fixture(fqdns: list) -> list: + """Create all domains and return the list of Domain objects.""" + domains = [] + for fqdn in fqdns: + domains.append(Domain.objects.create(fqdn=f"{FQDN_PREFIX}{fqdn}")) + return domains + + +@pytest.fixture(scope="function", name="domain_user_permission") +def domain_user_permission_fixture(domain: Domain, user: User) -> DomainUserPermission: + """Provide a valid domain user permission.""" + return DomainUserPermission.objects.create(domain=domain, user=user) + + +@pytest.fixture(scope="module", name="fqdns") +def fqdns_fixture() -> list[str]: + """Provide a list of valid FQDNs.""" + return ["some.com", "example2.com", "example.es"] + + +@pytest.fixture(scope="function", name="domain_user_permissions") +def domain_user_permissions_fixture(fqdns: list[str], user: User) -> list[DomainUserPermission]: + """Provide list of valid domain user permissions.""" + domains = [] + for fqdn in fqdns: + domain = Domain.objects.create(fqdn=f"{FQDN_PREFIX}{fqdn}") + domains.append(domain) + dups = [] + for domain in domains: + dup = DomainUserPermission.objects.create(domain=domain, user=user) + dups.append(dup) + return dups diff --git a/httprequest_lego_provider/tests/management/__init__.py b/httprequest_lego_provider/tests/management/__init__.py new file mode 100644 index 0000000..57adae2 --- /dev/null +++ b/httprequest_lego_provider/tests/management/__init__.py @@ -0,0 +1,3 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. +"""HTTPRequest management tests.""" diff --git a/httprequest_lego_provider/tests/management/commands/__init__.py b/httprequest_lego_provider/tests/management/commands/__init__.py new file mode 100644 index 0000000..1460f03 --- /dev/null +++ b/httprequest_lego_provider/tests/management/commands/__init__.py @@ -0,0 +1,3 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. +"""HTTPRequest management commands tests.""" diff --git a/httprequest_lego_provider/tests/management/commands/test_allow_domains.py b/httprequest_lego_provider/tests/management/commands/test_allow_domains.py new file mode 100644 index 0000000..ed5d44d --- /dev/null +++ b/httprequest_lego_provider/tests/management/commands/test_allow_domains.py @@ -0,0 +1,40 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. +"""Unit tests for the allow_domains module.""" + +# imported-auth-user has to be disable as the conflicting import is needed for typing +# pylint:disable=imported-auth-user + +import pytest +from django.contrib.auth.models import User +from django.core.management import call_command +from django.core.management.base import CommandError + +from httprequest_lego_provider.forms import FQDN_PREFIX +from httprequest_lego_provider.models import DomainUserPermission + + +@pytest.mark.django_db +def test_allow_domains(user: User, fqdns: list[str]): + """ + arrange: given a user. + act: call the allow_domains command. + assert: new domains are created an associated to the user. + """ + mixed_prefix_fqdns = fqdns.copy() + mixed_prefix_fqdns[0] = f"{FQDN_PREFIX}{fqdns[0]}" + call_command("allow_domains", user.username, *mixed_prefix_fqdns) + + dups = DomainUserPermission.objects.filter(user=user) + assert [dup.domain.fqdn for dup in dups] == [f"{FQDN_PREFIX}{fqdn}" for fqdn in fqdns] + + +@pytest.mark.django_db +def test_allow_domains_raises_exception(fqdns: list[str]): + """ + arrange: do nothing. + act: call the allow_domains command for a non existing user. + assert: a CommandError exception is raised. + """ + with pytest.raises(CommandError): + call_command("allow_domains", "non-existing-user", *fqdns) diff --git a/httprequest_lego_provider/tests/management/commands/test_create_user.py b/httprequest_lego_provider/tests/management/commands/test_create_user.py new file mode 100644 index 0000000..c217261 --- /dev/null +++ b/httprequest_lego_provider/tests/management/commands/test_create_user.py @@ -0,0 +1,27 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. +"""Unit tests for the create_user module.""" + +# imported-auth-user has to be disable as the conflicting import is needed for typing +# pylint:disable=imported-auth-user + +from io import StringIO + +import pytest +from django.contrib.auth.models import User +from django.core.management import call_command + + +@pytest.mark.django_db +def test_create_user(username: str, user_password: str): + """ + arrange: do nothing. + act: call the create_username command. + assert: a new user is inserted in the database. + """ + out = StringIO() + call_command("create_user", username, user_password, stdout=out) + user = User.objects.get(username=username) + assert user.username == username + assert user.check_password(user_password) + assert user_password in out.getvalue() diff --git a/httprequest_lego_provider/tests/management/commands/test_list_domains.py b/httprequest_lego_provider/tests/management/commands/test_list_domains.py new file mode 100644 index 0000000..3269b0f --- /dev/null +++ b/httprequest_lego_provider/tests/management/commands/test_list_domains.py @@ -0,0 +1,54 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. +"""Unit tests for the list_domains module.""" + +from io import StringIO + +import pytest +from django.core.management import call_command +from django.core.management.base import CommandError + +from httprequest_lego_provider.models import DomainUserPermission + + +@pytest.mark.django_db +def test_list_domains(domain_user_permissions: list[DomainUserPermission]): + """ + arrange: given existing domains allowed for an user. + act: call the list_domains command. + assert: the list of associated domains is returned in the stdout. + """ + out = StringIO() + call_command("list_domains", domain_user_permissions[0].user.username, stdout=out) + for dup in domain_user_permissions: + assert dup.domain.fqdn in out.getvalue() + + +@pytest.mark.django_db +def test_list_domains_all_users(domain_user_permissions: list[DomainUserPermission]): + """ + arrange: given existing domains allowed for all users. + act: call the list_domains command. + assert: the list of associated domains is returned in the stdout. + """ + out = StringIO() + call_command("list_domains", "*", stdout=out) + # Username on one line with a semi-colon followed by list of domains + # they have access to. + expected_output = ( + "test_user:\n" + "_acme-challenge.some.com, _acme-challenge.example2.com, " + "_acme-challenge.example.es" + ) + assert expected_output in out.getvalue() + + +@pytest.mark.django_db +def test_list_domains_raises_exception(fqdns: list[str]): + """ + arrange: do nothing. + act: call the list_domains command for a non existing user. + assert: a CommandError exception is raised. + """ + with pytest.raises(CommandError): + call_command("list_domains", "non-existing-user") diff --git a/httprequest_lego_provider/tests/management/commands/test_revoke_domains.py b/httprequest_lego_provider/tests/management/commands/test_revoke_domains.py new file mode 100644 index 0000000..52d2683 --- /dev/null +++ b/httprequest_lego_provider/tests/management/commands/test_revoke_domains.py @@ -0,0 +1,38 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. +"""Unit tests for the revoke_domains module.""" + +import pytest +from django.core.management import call_command +from django.core.management.base import CommandError + +from httprequest_lego_provider.forms import FQDN_PREFIX +from httprequest_lego_provider.models import DomainUserPermission + + +@pytest.mark.django_db +def test_revoke_domains(domain_user_permissions: list[DomainUserPermission]): + """ + arrange: given a user. + act: call the revoke_domains command for a subset of the allowed domains. + assert: the domains are revoked for the user and the rest are still allowed. + """ + fqdns = [dup.domain.fqdn for dup in domain_user_permissions] + prefix_index = len(FQDN_PREFIX) + revoke_fqdns = [fqdns[0][prefix_index:], fqdns[1]] + allowed_fqdns = fqdns[2:] + call_command("revoke_domains", domain_user_permissions[0].user.username, *revoke_fqdns) + + dups = DomainUserPermission.objects.filter(user=domain_user_permissions[0].user) + assert [dup.domain.fqdn for dup in dups] == allowed_fqdns + + +@pytest.mark.django_db +def test_revoke_domains_raises_exception(fqdns: list[str]): + """ + arrange: do nothing. + act: call the revoke_domains command for a non existing user. + assert: a CommandError exception is raised. + """ + with pytest.raises(CommandError): + call_command("revoke_domains", "non-existing-user", *fqdns) diff --git a/httprequest_lego_provider/tests/settings.py b/httprequest_lego_provider/tests/settings.py new file mode 100644 index 0000000..7219e1d --- /dev/null +++ b/httprequest_lego_provider/tests/settings.py @@ -0,0 +1,42 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. +"""Test settings.""" + +# pylint:disable=wildcard-import,unused-wildcard-import + +import datetime +import os +import secrets +from pathlib import Path + +from app.settings import * # noqa: F401, F403 + +# Build paths inside the project like this: BASE_DIR / 'subdir'. +BASE_DIR = Path(__file__).resolve().parent.parent + + +# Quick-start development settings - unsuitable for production +# See https://docs.djangoproject.com/en/4.2/howto/deployment/checklist/ + +# SECURITY WARNING: keep the secret key used in production secret! +SECRET_KEY = secrets.token_hex() + +# SECURITY WARNING: don't run with debug turned on in production! +DEBUG = True + +# Database +# https://docs.djangoproject.com/en/4.2/ref/settings/#databases + +DATABASES = { + "default": { + "ENGINE": "django.db.backends.sqlite3", + "NAME": os.path.join(BASE_DIR, "db.sqlite3"), + } +} + +SIMPLE_JWT = { + "ACCESS_TOKEN_LIFETIME": datetime.timedelta(days=1), + "REFRESH_TOKEN_LIFETIME": datetime.timedelta(days=7), + "ROTATE_REFRESH_TOKENS": True, + "SIGNING_KEY": SECRET_KEY, +} diff --git a/httprequest_lego_provider/tests/test_dns.py b/httprequest_lego_provider/tests/test_dns.py new file mode 100644 index 0000000..5cd3978 --- /dev/null +++ b/httprequest_lego_provider/tests/test_dns.py @@ -0,0 +1,153 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. +"""Unit tests for the dns module.""" + +import secrets +from pathlib import Path +from unittest.mock import ANY, MagicMock, Mock, patch + +import pytest +from git import GitCommandError, Repo + +from httprequest_lego_provider.dns import ( + DnsSourceUpdateError, + parse_repository_url, + remove_dns_record, + write_dns_record, +) + + +@patch.object(Path, "write_text") +@patch.object(Repo, "clone_from") +@patch("httprequest_lego_provider.dns.GIT_REPO_URL", "git+ssh://user@git.server/repo_name") +def test_write_dns_record_raises_exception(repo_patch: Mock, _): + """ + arrange: mock the repo so that it raises a GitCommandError. + act: attempt to write a new DNS record. + assert: a DnsSourceUpdateError exception is raised. + """ + repo_patch.side_effect = GitCommandError("Error executing command") + + fqdn = "site.example.com" + + with pytest.raises(DnsSourceUpdateError): + write_dns_record(fqdn, secrets.token_hex()) + + +@pytest.mark.parametrize( + "fqdn,record", + [ + ("site.example.com", "site 600 IN TXT \042{token}\042\n"), + ("sïte.example.com", "sïte 600 IN TXT \042{token}\042\n"), + ("example.com", ". 600 IN TXT \042{token}\042\n"), + ("some.other.site.example.com", "some.other.site 600 IN TXT \042{token}\042\n"), + ], +) +@patch.object(Path, "write_text") +@patch.object(Path, "read_text") +@patch.object(Repo, "clone_from") +@patch("httprequest_lego_provider.dns.GIT_REPO_URL", "git+ssh://user@git.server/repo_name@lego") +def test_write_dns_record( + repo_patch: Mock, read_patch: Mock, write_patch: Mock, fqdn: str, record: str +): + """ + arrange: mock the repo. + act: attempt to write a new DNS record. + assert: a new file with filename matching the record is committed and pushed to the repository. + """ + repo_mock = MagicMock(spec=Repo) + repo_patch.return_value = repo_mock + token = secrets.token_hex() + read_patch.return_value = ( + "site2 600 IN TXT \042sometoken\042\n" + "sïte1 600 IN TXT \042sometoken\042\n" + "site3 600 IN TXT \042sometoken\042\n" + ) + write_dns_record(fqdn, token) + + repo_patch.assert_called_once_with("git+ssh://user@git.server/repo_name", ANY, branch="lego") + repo_mock.config_writer().set_value.assert_called_once_with("user", "name", "user") + write_patch.assert_called_once_with( + ( + "site2 600 IN TXT \042sometoken\042\n" + "sïte1 600 IN TXT \042sometoken\042\n" + "site3 600 IN TXT \042sometoken\042\n" + record + ).format(token=token), + encoding="utf-8", + ) + repo_mock.index.add.assert_called_with(["example.com.domain"]) + repo_mock.git.commit.assert_called_once() + repo_mock.remote(name="origin").push.assert_called_once() + + +@patch.object(Path, "write_text") +@patch.object(Repo, "clone_from") +@patch("httprequest_lego_provider.dns.GIT_REPO_URL", "git+ssh://user@git.server/repo_name") +def test_remove_dns_record_raises_exception(repo_patch: Mock, _): + """ + arrange: mock the repo so that it raises a GitCommandError. + act: attempt to remove a DNS record. + assert: a DnsSourceUpdateError exception is raised. + """ + repo_patch.side_effect = GitCommandError("Error executing command") + + fqdn = "site.example.com" + + with pytest.raises(DnsSourceUpdateError): + remove_dns_record(fqdn) + + +@pytest.mark.parametrize( + "fqdn,record", + [ + ("site.example.com", "site 600 IN TXT \042{token}\042\n"), + ("sïte.example.com", "sïte 600 IN TXT \042{token}\042\n"), + ("example.com", ". 600 IN TXT \042{token}\042\n"), + ("some.other.site.example.com", "some.other.site 600 IN TXT \042{token}\042\n"), + ], +) +@patch.object(Path, "write_text") +@patch.object(Path, "read_text") +@patch.object(Repo, "clone_from") +@patch("httprequest_lego_provider.dns.GIT_REPO_URL", "git+ssh://user@git.server/repo_name") +def test_remove_dns_record( + repo_patch: Mock, read_patch: Mock, write_patch: Mock, fqdn: str, record: str +): + """ + arrange: mock the repo and filesystem so that the file matching a DNS exists. + act: attempt to delete a new DNS record. + assert: the file with filename matching the record is emptied and pushed to the repository. + """ + repo_mock = MagicMock(spec=Repo) + repo_patch.return_value = repo_mock + read_patch.return_value = ( + "site1 600 IN TXT \042sometoken\042\n" + record + "site3 600 IN TXT \042sometoken\042\n" + ) + + remove_dns_record(fqdn) + + repo_patch.assert_called_once_with("git+ssh://user@git.server/repo_name", ANY, branch=None) + repo_mock.config_writer().set_value.assert_called_once_with("user", "name", "user") + write_patch.assert_called_once_with( + "site1 600 IN TXT \042sometoken\042\nsite3 600 IN TXT \042sometoken\042\n", + encoding="utf-8", + ) + repo_mock.index.add.assert_called_with(["example.com.domain"]) + repo_mock.git.commit.assert_called_once() + repo_mock.remote(name="origin").push.assert_called_once() + + +def test_parse_repository_url(): + """ + arrange: do nothing. + act: given a set of valid repository connection strings. + assert: the connection strings are parsed successfully. + """ + user, url, branch = parse_repository_url("git+ssh://user@git.server/repo_name") + assert user == "user" + assert url == "git+ssh://user@git.server/repo_name" + assert branch is None + user, url, branch = parse_repository_url("git+ssh://user1@git.server:8080/repo_name@main") + assert user == "user1" + assert url == "git+ssh://user1@git.server:8080/repo_name" + assert branch == "main" diff --git a/httprequest_lego_provider/tests/test_forms.py b/httprequest_lego_provider/tests/test_forms.py new file mode 100644 index 0000000..97df7c1 --- /dev/null +++ b/httprequest_lego_provider/tests/test_forms.py @@ -0,0 +1,22 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. +"""Unit tests for the forms module.""" + +from httprequest_lego_provider.forms import is_fqdn_compliant + + +def test_is_fqdn_compliant(): + """ + arrange: do nothing. + act: do nothing. + assert: FQDN should start with '_acme-challenge' and be valid. + """ + assert not is_fqdn_compliant("example.com") + assert not is_fqdn_compliant("smth.example.com") + assert not is_fqdn_compliant("_acme-challenge.") + assert not is_fqdn_compliant("com") + assert not is_fqdn_compliant("_acme-challenge.com") + assert not is_fqdn_compliant("_acme-challenge1.example.com") + assert is_fqdn_compliant("_acme-challenge.example.com") + assert is_fqdn_compliant("_acme-challenge.1example.com") + assert is_fqdn_compliant("_acme-challenge.example.com.") diff --git a/httprequest_lego_provider/tests/test_views.py b/httprequest_lego_provider/tests/test_views.py new file mode 100644 index 0000000..6625a8e --- /dev/null +++ b/httprequest_lego_provider/tests/test_views.py @@ -0,0 +1,701 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. +"""Unit tests for the views module.""" + +# imported-auth-user has to be disable as the conflicting import is needed for typing +# pylint:disable=imported-auth-user + +import base64 +import json +import secrets +from unittest.mock import patch + +import pytest +from django.contrib.auth.hashers import check_password +from django.contrib.auth.models import User +from django.test import Client + +from httprequest_lego_provider.forms import FQDN_PREFIX +from httprequest_lego_provider.models import Domain, DomainUserPermission + + +@pytest.mark.django_db +def test_post_present_when_not_logged_in(client: Client): + """ + arrange: do nothing. + act: submit a POST request for the present URL. + assert: a 401 is returned. + """ + response = client.post("/present") + + assert response.status_code == 401 + + +@pytest.mark.django_db +def test_post_present_when_auth_header_empty(client: Client): + """ + arrange: do nothing. + act: submit a POST request for the present URL with an empty authorization header. + assert: a 401 is returned. + """ + response = client.post("/present", headers={"AUTHORIZATION": ""}) + + assert response.status_code == 401 + + +@pytest.mark.django_db +def test_post_present_when_auth_header_invalid(client: Client): + """ + arrange: do nothing. + act: submit a POST request for the present URL with an invalid authorization header. + assert: a 401 is returned. + """ + auth_token = base64.b64encode(bytes("invalid:invalid", "utf-8")).decode("utf-8") + response = client.post("/present", headers={"AUTHORIZATION": f"Basic {auth_token}"}) + + assert response.status_code == 401 + + +@pytest.mark.django_db +def test_post_present_when_logged_in_and_no_fqdn(client: Client, user_auth_token: str, fqdn: str): + """ + arrange: log in a non-admin user. + act: submit a POST request for the present URL. + assert: a 403 is returned. + """ + value = secrets.token_hex() + response = client.post( + "/present", + data={"fqdn": f"{FQDN_PREFIX}{fqdn}", "value": value}, + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + + assert response.status_code == 403 + + +@pytest.mark.django_db +def test_post_present_when_logged_in_and_no_permission( + client: Client, user_auth_token: str, domain: Domain +): + """ + arrange: log in a non-admin user and insert a domain in the database. + act: submit a POST request for the present URL. + assert: a 403 is returned. + """ + value = secrets.token_hex() + response = client.post( + "/present", + data={"fqdn": domain.fqdn, "value": value}, + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + + assert response.status_code == 403 + + +@pytest.mark.django_db +def test_post_present_when_logged_in_and_permission( + client: Client, user_auth_token: str, domain_user_permission: DomainUserPermission +): + """ + arrange: mock the write_dns_recod method, log in a user and give him permissions on a FQDN. + act: submit a POST request for the present URL containing the fqdn above. + assert: a 204 is returned. + """ + with patch("httprequest_lego_provider.views.write_dns_record") as mocked_dns_write: + value = secrets.token_hex() + response = client.post( + "/present", + data={"fqdn": domain_user_permission.domain.fqdn, "value": value}, + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + mocked_dns_write.assert_called_once_with(domain_user_permission.domain.fqdn, value) + + assert response.status_code == 204 + + +@pytest.mark.django_db +def test_post_present_when_logged_in_and_permission_with_trailing_dor( + client: Client, user_auth_token: str, domain_user_permission: DomainUserPermission +): + """ + arrange: mock the write_dns_recod method, log in a user and give him permissions on a FQDN. + act: submit a POST request for the present URL containing the fqdn above. + assert: a 204 is returned. + """ + with patch("httprequest_lego_provider.views.write_dns_record") as mocked_dns_write: + value = secrets.token_hex() + response = client.post( + "/present", + data={"fqdn": f"{domain_user_permission.domain.fqdn}.", "value": value}, + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + mocked_dns_write.assert_called_once_with(domain_user_permission.domain.fqdn, value) + + assert response.status_code == 204 + + +@pytest.mark.django_db +def test_post_present_when_logged_in_and_fqdn_invalid(client: Client, user_auth_token: str): + """ + arrange: mock the write_dns_recod method and log in a user. + act: submit a POST request for the present URL containing an invalid FQDN. + assert: a 400 is returned. + """ + with patch("httprequest_lego_provider.views.write_dns_record"): + value = secrets.token_hex() + response = client.post( + "/present", + data={"fqdn": "example.com", "value": value}, + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + + assert response.status_code == 400 + + +@pytest.mark.django_db +def test_get_present_when_logged_in(client: Client, user_auth_token: str): + """ + arrange: log in a non-admin user. + act: submit a GET request for the present URL. + assert: a 405 is returned. + """ + response = client.get("/present", headers={"AUTHORIZATION": f"Basic {user_auth_token}"}) + + assert response.status_code == 405 + + +@pytest.mark.django_db +def test_post_cleanup_when_not_logged_in(client: Client): + """ + arrange: do nothing. + act: submit a POST request for the cleanup URL. + assert: a 401 is returned. + """ + response = client.post("/cleanup") + + assert response.status_code == 401 + + +@pytest.mark.django_db +def test_post_cleanup_when_logged_in_and_no_fqdn(client: Client, user_auth_token: str): + """ + arrange: log in a non-admin user. + act: submit a POST request for the cleanup URL. + assert: a 403 is returned. + """ + value = secrets.token_hex() + response = client.post( + "/cleanup", + data={"fqdn": f"{FQDN_PREFIX}example.com", "value": value}, + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + + assert response.status_code == 403 + + +@pytest.mark.django_db +def test_post_cleanup_when_logged_in_and_no_permission( + client: Client, user_auth_token: str, domain: Domain +): + """ + arrange: log in a non-admin user. + act: submit a POST request for the cleanup URL. + assert: a 403 is returned. + """ + value = secrets.token_hex() + response = client.post( + "/cleanup", + data={"fqdn": domain.fqdn, "value": value}, + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + + assert response.status_code == 403 + + +@pytest.mark.django_db +def test_post_cleanup_when_logged_in_and_permission( + client: Client, user_auth_token: str, domain_user_permission: DomainUserPermission +): + """ + arrange: mock the dns module, log in a user and give him permissions on a FQDN. + act: submit a POST request for the cleanup URL containing the fqdn above. + assert: a 200 is returned. + """ + with patch("httprequest_lego_provider.views.remove_dns_record") as mocked_dns_remove: + value = secrets.token_hex() + response = client.post( + "/cleanup", + data={"fqdn": domain_user_permission.domain.fqdn, "value": value}, + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + mocked_dns_remove.assert_called_once_with(domain_user_permission.domain.fqdn) + + assert response.status_code == 204 + + +@pytest.mark.django_db +def test_post_cleanup_when_logged_in_and_permission_with_trailing_dot( + client: Client, user_auth_token: str, domain_user_permission: DomainUserPermission +): + """ + arrange: mock the dns module, log in a user and give him permissions on a FQDN. + act: submit a POST request for the cleanup URL containing the fqdn above. + assert: a 200 is returned. + """ + with patch("httprequest_lego_provider.views.remove_dns_record") as mocked_dns_remove: + value = secrets.token_hex() + response = client.post( + "/cleanup", + data={"fqdn": f"{domain_user_permission.domain.fqdn}.", "value": value}, + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + mocked_dns_remove.assert_called_once_with(domain_user_permission.domain.fqdn) + + assert response.status_code == 204 + + +@pytest.mark.django_db +def test_post_cleanup_when_logged_in_and_fqdn_invalid(client: Client, user_auth_token: str): + """ + arrange: mock the dns module and log in a user. + act: submit a POST request for the cleanup URL containing an invalid FQDN. + assert: a 400 is returned. + """ + with patch("httprequest_lego_provider.views.remove_dns_record"): + value = secrets.token_hex() + response = client.post( + "/cleanup", + data={"fqdn": "example.com", "value": value}, + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + + assert response.status_code == 400 + + +@pytest.mark.django_db +def test_get_cleanup_when_logged_in(client: Client, user_auth_token: str): + """ + arrange: log in a non-admin user. + act: submit a GET request for the cleanup URL. + assert: a 405 is returned. + """ + response = client.get("/present", headers={"AUTHORIZATION": f"Basic {user_auth_token}"}) + + assert response.status_code == 405 + + +@pytest.mark.django_db +def test_test_jwt_token_login( + client: Client, username: str, user_password: str, domain_user_permission: DomainUserPermission +): + """ + arrange: mock the write_dns_recod method, log in a user and give him permissions on a FQDN. + act: submit a POST request for the present URL containing the fqdn above. + assert: a 204 is returned. + """ + response = client.post( + "/api/v1/auth/token/", + data={"username": username, "password": user_password}, + ) + token = json.loads(response.content)["access"] + + with patch("httprequest_lego_provider.views.write_dns_record"): + value = secrets.token_hex() + response = client.post( + "/present", + data={"fqdn": domain_user_permission.domain.fqdn, "value": value}, + format="json", + headers={"AUTHORIZATION": f"Bearer {token}"}, + ) + + assert response.status_code == 204 + + +@pytest.mark.django_db +def test_get_domain_when_logged_in_as_non_admin_user(client: Client, user_auth_token: str): + """ + arrange: log in a non-admin user. + act: submit a GET request for the domain URL. + assert: a 403 is returned + """ + response = client.get( + "/api/v1/domains/", + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + + assert response.status_code == 403 + + +@pytest.mark.django_db +def test_get_domain_when_logged_in_as_admin_user( + client: Client, admin_user_auth_token: str, domains: list +): + """ + arrange: log in an admin user. + act: submit a GET request for the domain URL. + assert: a 200 is returned and the domains are all returned. + """ + assert len(Domain.objects.all()) != 0 + response = client.get( + "/api/v1/domains/", + format="json", + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + json = response.json() + + assert response.status_code == 200 + assert len(json) == len(domains) + + +@pytest.mark.django_db +def test_get_domain_with_fqdn_filter(client: Client, admin_user_auth_token: str, domains: list): + """ + arrange: log in an admin user. + act: submit a GET request for the domain URL. + assert: a 200 is returned and the domain matching FQDN is returned. + """ + response = client.get( + "/api/v1/domains/?fqdn=example2.com", + format="json", + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + json = response.json() + + assert response.status_code == 200 + assert len(json) == 1 + assert json[0]["fqdn"] == f"{FQDN_PREFIX}example2.com" + + +@pytest.mark.django_db +def test_post_domain_when_logged_in_as_non_admin_user(client: Client, user_auth_token: str): + """ + arrange: log in a non-admin user. + act: submit a POST request for the domain URL. + assert: a 403 is returned and the domain is not inserted in the database. + """ + response = client.post( + "/api/v1/domains/", + data={"fqdn": "example.com"}, + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + + with pytest.raises(Domain.DoesNotExist): + Domain.objects.get(fqdn="example.com") + assert response.status_code == 403 + + +@pytest.mark.django_db +def test_post_domain_when_logged_in_as_admin_user(client: Client, admin_user_auth_token: str): + """ + arrange: log in an admin user. + act: submit a POST request for the domain URL. + assert: a 201 is returned and the domain is inserted in the database. + """ + response = client.post( + "/api/v1/domains/", + data={"fqdn": "example.com"}, + format="json", + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + + assert Domain.objects.get(fqdn=f"{FQDN_PREFIX}example.com") is not None + assert response.status_code == 201 + + +@pytest.mark.django_db +def test_post_domain_when_logged_in_as_admin_user_and_domain_invalid( + client: Client, admin_user_auth_token: str +): + """ + arrange: log in a admin user. + act: submit a POST request with an invalid value for the domain URL. + assert: a 400 is returned. + """ + response = client.post( + "/api/v1/domains/", + data={"fqdn": "invalid-value"}, + format="json", + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + + with pytest.raises(Domain.DoesNotExist): + Domain.objects.get(fqdn="invalid-value") + assert response.status_code == 400 + + +@pytest.mark.django_db +def test_get_domain_user_permission_when_logged_in_as_non_admin_user( + client: Client, user_auth_token: str, domain: Domain, user: User +): + """ + arrange: log in a non-admin user. + act: submit a GET request for the domain user permission URL. + assert: a 403 is returned + """ + response = client.get( + "/api/v1/domain-user-permissions/", + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + + assert response.status_code == 403 + + +@pytest.mark.django_db +def test_get_domain_user_permission_when_logged_in_as_admin_user( + client: Client, + admin_user_auth_token: str, + user: User, + domain_user_permissions: list, +): + """ + arrange: log in an admin user. + act: submit a GET request for the domain user permission URL for a existing domain. + assert: a 200 is returned, the json result does not contain unwanted domain-user-permissions. + """ + assert len(DomainUserPermission.objects.all()) != 0 + response = client.get( + "/api/v1/domain-user-permissions/", + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + json = response.json() + + assert response.status_code == 200 + assert len(json) == len(DomainUserPermission.objects.all()) + + +@pytest.mark.django_db +def test_get_domain_user_permission_with_filters( + client: Client, + admin_user_auth_token: str, + user: User, + domain_user_permissions: list, +): + """ + arrange: log in an admin user. + act: submit a GET request for the domain user permission URL for a existing domain. + assert: a 200 is returned, the json result does not contain unwanted domain-user-permissions. + """ + assert len(DomainUserPermission.objects.filter()) != 0 + response = client.get( + "/api/v1/domain-user-permissions/", + data={"fqdn": "example2.com", "username": user.username}, + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + json = response.json() + + assert response.status_code == 200 + assert len(json) > 0 + + for entry in json: + assert entry["domain"] == Domain.objects.get(fqdn=f"{FQDN_PREFIX}example2.com").id + assert entry["user"] == User.objects.get(username=user.username).id + + +@pytest.mark.django_db +def test_post_domain_user_permission_when_logged_in_as_non_admin_user( + client: Client, user_auth_token: str, domain: Domain, user: User +): + """ + arrange: log in a non-admin user. + act: submit a POST request for the domain user permission URL. + assert: a 403 is returned and the domain is not inserted in the database. + """ + response = client.post( + "/api/v1/domain-user-permissions/", + data={"domain": domain.id, "user": user.id, "text": "whatever"}, + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + + assert not DomainUserPermission.objects.filter(user=user, domain=domain) + assert response.status_code == 403 + + +@pytest.mark.django_db +def test_post_domain_user_permission_with_invalid_domain_when_logged_in_as_admin_user( + client: Client, admin_user_auth_token: str, user: User +): + """ + arrange: log in an admin user. + act: submit a POST request for the domain user permission URL for a non existing domain. + assert: a 400 is returned and the domain is not inserted in the database. + """ + response = client.post( + "/api/v1/domain-user-permissions/", + data={"domain": 1, "user": user.id, "text": "whatever"}, + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + + assert not DomainUserPermission.objects.filter(user=user, domain=1) + assert response.status_code == 400 + + +@pytest.mark.django_db +def test_post_domain_user_permission_with_invalid_user_when_logged_in_as_admin_user( + client: Client, admin_user_auth_token: str, domain: Domain +): + """ + arrange: log in an admin user. + act: submit a POST request for the domain user permission URL for a non existing user. + assert: a 400 is returned and the domain is not inserted in the database. + """ + response = client.post( + "/api/v1/domain-user-permissions/", + data={"domain": domain.id, "user": 99, "text": "whatever"}, + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + + assert not DomainUserPermission.objects.filter(user=99, domain=domain) + assert response.status_code == 400 + + +@pytest.mark.django_db +def test_post_domain_user_permission_when_logged_in_as_admin_user( + client: Client, admin_user_auth_token: str, user: User, domain: Domain +): + """ + arrange: log in an admin user. + act: submit a POST request for the domain user permission URL for a existing domain. + assert: a 201 is returned and the domain user permission is inserted in the database. + """ + response = client.post( + "/api/v1/domain-user-permissions/", + data={"domain": domain.id, "user": user.id, "text": "whatever"}, + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + + assert DomainUserPermission.objects.filter(user=99, domain=domain) is not None + assert response.status_code == 201 + + +@pytest.mark.django_db +def test_get_user_when_logged_in_as_non_admin_user(client: Client, user_auth_token: str): + """ + arrange: log in a non-admin user. + act: submit a GET request for the user URL. + assert: a 403 is returned. + """ + response = client.get( + "/api/v1/users/", + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + + assert response.status_code == 403 + + +@pytest.mark.django_db +def test_get_user_when_logged_in_as_admin_user( + client: Client, admin_user_auth_token: str, user: User +): + """ + arrange: log in an admin user. + act: submit a GET request for the user URL. + assert: a 200 is returned and the json result does not contain passwords. + """ + response = client.get( + "/api/v1/users/", + format="json", + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + json = response.json() + + assert len(User.objects.all()) > 0 + assert response.status_code == 200 + assert len(json) == len(User.objects.all()) + for entry in json: + assert "password" not in entry + + +@pytest.mark.django_db +def test_get_user_with_username_filter(client: Client, admin_user_auth_token: str, user: User): + """ + arrange: log in an admin user. + act: submit a GET request for the user URL. + assert: a 200 is returned and the json result matches the requested username. + """ + response = client.get( + "/api/v1/users/", + data={"username": user.username}, + format="json", + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + json = response.json() + + assert len(User.objects.all()) > 0 + assert response.status_code == 200 + assert len(json) == 1 + for entry in json: + assert entry["username"] == user.username + + +@pytest.mark.django_db +def test_post_user_when_logged_in_as_non_admin_user(client: Client, user_auth_token: str): + """ + arrange: log in a non-admin user. + act: submit a POST request for the user URL. + assert: a 403 is returned and the user is not inserted in the database. + """ + response = client.post( + "/api/v1/users/", + data={"username": "non-existing-user"}, + format="json", + headers={"AUTHORIZATION": f"Basic {user_auth_token}"}, + ) + + assert response.status_code == 403 + with pytest.raises(User.DoesNotExist): + User.objects.get(username="non-existing-user") + + +@pytest.mark.django_db +def test_post_user_when_logged_in_as_admin_user(client: Client, admin_user_auth_token: str): + """ + arrange: log in an admin user. + act: submit a POST request for the user URL. + assert: a 201 is returned and the user is inserted in the database. + """ + response = client.post( + "/api/v1/users/", + data={"username": "new-user", "password": "test!pw"}, + format="json", + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + + assert response.status_code == 201 + newu = User.objects.get(username="new-user") + assert newu is not None + assert check_password("test!pw", newu.password) is True + + +@pytest.mark.django_db +def test_post_user_when_logged_in_as_admin_user_and_user_invalid( + client: Client, admin_user_auth_token: str +): + """ + arrange: log in a admin user. + act: submit a POST request with an invalid value for the user URL. + assert: a 400 is returned. + """ + existing = User.objects.all()[0] + response = client.post( + "/api/v1/users/", + data={"username": existing.username}, + format="json", + headers={"AUTHORIZATION": f"Basic {admin_user_auth_token}"}, + ) + + assert response.status_code == 400 diff --git a/httprequest_lego_provider/urls.py b/httprequest_lego_provider/urls.py new file mode 100644 index 0000000..5c6d4b8 --- /dev/null +++ b/httprequest_lego_provider/urls.py @@ -0,0 +1,20 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. +"""Urls.""" + +from django.urls import include, path +from rest_framework.routers import DefaultRouter + +from . import views + +router = DefaultRouter() +router.register("domains", views.DomainViewSet) +router.register("domain-user-permissions", views.DomainUserPermissionViewSet) +router.register("users", views.UserViewSet) + +urlpatterns = [ + path("cleanup", views.handle_cleanup, name="cleanup"), + path("present", views.handle_present, name="present"), + path("api/v1/accounts/", include("django.contrib.auth.urls")), + path("api/v1/", include(router.urls)), +] diff --git a/httprequest_lego_provider/views.py b/httprequest_lego_provider/views.py new file mode 100644 index 0000000..6768de3 --- /dev/null +++ b/httprequest_lego_provider/views.py @@ -0,0 +1,159 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. +"""Views.""" + +# Disable too-many-ancestors rule since we can't control inheritance for the ViewSets. +# pylint:disable=too-many-ancestors + +from typing import Optional + +# imported-auth-user has to be disabled as the import is needed for UserViewSet +# pylint:disable=imported-auth-user +from django.contrib.auth.models import User +from django.http import HttpRequest, HttpResponse +from rest_framework import viewsets +from rest_framework.decorators import api_view +from rest_framework.permissions import IsAdminUser + +from .dns import remove_dns_record, write_dns_record +from .forms import FQDN_PREFIX, CleanupForm, PresentForm +from .models import Domain, DomainUserPermission +from .serializers import DomainSerializer, DomainUserPermissionSerializer, UserSerializer + + +@api_view(["POST"]) +def handle_present(request: HttpRequest) -> Optional[HttpResponse]: + """Handle the submissing of the present form. + + Args: + request: the HTTP request. + + Returns: + an HTTP response. + """ + form = PresentForm(request.data) + if not form.is_valid(): + return HttpResponse(content=form.errors.as_json(), status=400) + user = request.user + fqdn = form.cleaned_data["fqdn"] + try: + domain = Domain.objects.get(fqdn=fqdn) + value = form.cleaned_data["value"] + if DomainUserPermission.objects.filter(user=user, domain=domain): + write_dns_record(domain.fqdn, value) + return HttpResponse(status=204) + except Domain.DoesNotExist: + pass + return HttpResponse( + status=403, + content=f"The user {user} does not have permission to manage {fqdn}", + ) + + +@api_view(["POST"]) +def handle_cleanup(request: HttpRequest) -> Optional[HttpResponse]: + """Handle the submissing of the cleanup form. + + Args: + request: the HTTP request. + + Returns: + an HTTP response. + """ + form = CleanupForm(request.data) + if not form.is_valid(): + return HttpResponse(content=form.errors.as_json(), status=400) + user = request.user + fqdn = form.cleaned_data["fqdn"] + try: + domain = Domain.objects.get(fqdn=fqdn) + if DomainUserPermission.objects.filter(user=user, domain=domain): + remove_dns_record(domain.fqdn) + return HttpResponse(status=204) + except Domain.DoesNotExist: + pass + return HttpResponse( + status=403, + content=f"The user {user} does not have permission to manage {fqdn}", + ) + + +class DomainViewSet(viewsets.ModelViewSet): + """Views for the Domain. + + Attributes: + queryset: query for the objects in the model. + serializer_class: class used for serialization. + permission_classes: list of classes to match permissions. + """ + + queryset = Domain.objects.all() + serializer_class = DomainSerializer + permission_classes = [IsAdminUser] + + def get_queryset(self): + """Optionally restricts the returned object list to a given domain. + + Returns: + a filtered queryset against a `fqdn` query parameter in the URL. + """ + queryset = self.queryset + fqdn = self.request.query_params.get("fqdn") + if fqdn is not None: + queryset = queryset.filter(fqdn=f"{FQDN_PREFIX}{fqdn}") + return queryset + + +class DomainUserPermissionViewSet(viewsets.ModelViewSet): + """Views for the DomainUserPermission. + + Attributes: + queryset: query for the objects in the model. + serializer_class: class used for serialization. + permission_classes: list of classes to match permissions. + """ + + queryset = DomainUserPermission.objects.all() + serializer_class = DomainUserPermissionSerializer + permission_classes = [IsAdminUser] + + def get_queryset(self): + """Optionally restricts the returned object list to a given user/domain. + + Returns: + A filtered queryset against `username` / `fqdn` query parameters in the URL. + """ + queryset = self.queryset + username = self.request.query_params.get("username") + if username is not None: + queryset = queryset.filter(user__username=username) + fqdn = self.request.query_params.get("fqdn") + if fqdn is not None: + queryset = queryset.filter(domain__fqdn=f"{FQDN_PREFIX}{fqdn}") + return queryset + + +class UserViewSet(viewsets.ModelViewSet): + """Views for the User. + + Attributes: + queryset: query for the objects in the model. + serializer_class: class used for serialization. + permission_classes: list of classes to match permissions. + """ + + queryset = User.objects.all().order_by("-date_joined") + serializer_class = UserSerializer + permission_classes = [IsAdminUser] + + def get_queryset(self): + """Optionally restricts the returned object list to a given user. + + Returns: + A filtered queryset against a `username` query parameter in the URL. + """ + queryset = self.queryset + username = self.request.query_params.get("username") + if username is not None: + queryset = queryset.filter(username=username) + return queryset diff --git a/manage.py b/manage.py new file mode 100755 index 0000000..1a64b14 --- /dev/null +++ b/manage.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python +"""Django's command-line utility for administrative tasks.""" +import os +import sys + + +def main(): + """Run administrative tasks.""" + os.environ.setdefault("DJANGO_SETTINGS_MODULE", "app.settings") + try: + from django.core.management import execute_from_command_line + except ImportError as exc: + raise ImportError( + "Couldn't import Django. Are you sure it's installed and " + "available on your PYTHONPATH environment variable? Did you " + "forget to activate a virtual environment?" + ) from exc + execute_from_command_line(sys.argv) + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..faefb8e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,82 @@ +[tool.bandit] +exclude_dirs = ["/venv/"] +[tool.bandit.assert_used] +skips = ["*/*test.py", "*/test_*.py", "*tests/*.py"] + +# Testing tools configuration +[tool.coverage.run] +branch = true + +# Formatting tools configuration +[tool.black] +line-length = 99 +target-version = ["py38"] +exclude = "httprequest_lego_provider/migrations" + +[tool.coverage.report] +fail_under = 100 +show_missing = true +omit = ["httprequest_lego_provider/settings.py"] + +# Linting tools configuration +[tool.flake8] +max-line-length = 99 +max-doc-length = 99 +max-complexity = 10 +exclude = [".git", "__pycache__", ".tox", "build", "dist", "*.egg_info", "venv", "httprequest_lego_provider/migrations"] +select = ["E", "W", "F", "C", "N", "R", "D", "H"] +# Ignore W503, E501 because using black creates errors with this +# Ignore D107 Missing docstring in __init__ +ignore = ["W503", "E501", "D107"] +# D100, D101, D102, D103: Ignore missing docstrings in tests +per-file-ignores = ["tests/*:D100,D101,D102,D103,D104,D205,D212,D415","httprequest_lego_provider/tests/*:D100,D101,D102,D103,D104,D205,D212,D415"] +docstring-convention = "google" + +[tool.isort] +line_length = 99 +profile = "black" +skip = ["httprequest_lego_provider/migrations"] + +[tool.mypy] +ignore_missing_imports = true +explicit_package_bases = true +namespace_packages = true + +[tool.pylint] +disable = "wrong-import-order" +ignore = "httprequest_lego_provider/migrations" + +[tool.pytest.ini_options] +minversion = "6.0" +log_cli_level = "INFO" +DJANGO_SETTINGS_MODULE = "httprequest_lego_provider.tests.settings" + +# Linting tools configuration +[tool.ruff] +line-length = 99 +select = ["E", "W", "F", "C", "N", "D", "I001"] +extend-ignore = [ + "D203", + "D204", + "D213", + "D215", + "D400", + "D404", + "D406", + "D407", + "D408", + "D409", + "D413", +] +ignore = ["E501", "D107"] +extend-exclude = ["__pycache__", "*.egg_info"] +per-file-ignores = {"tests/*" = ["D100","D101","D102","D103","D104"],"httprequest_lego_provider/tests/*" = ["D100","D101","D102","D103","D104"]} + +[tool.ruff.mccabe] +max-complexity = 10 + +[tool.codespell] +skip = "build,lib,venv,icon.svg,.tox,.git,.mypy_cache,.ruff_cache,.coverage" + +[tool.pydocstyle] +match_dir = "match-dir=^(?!httprequest_lego_provider/migrations)" diff --git a/renovate.json b/renovate.json new file mode 100644 index 0000000..e6c587e --- /dev/null +++ b/renovate.json @@ -0,0 +1,32 @@ +{ + "$schema": "https://docs.renovatebot.com/renovate-schema.json", + "extends": [ + "config:base" + ], + "regexManagers": [ + { + "fileMatch": ["(^|/)rockcraft.yaml$"], + "description": "Update base image references", + "matchStringsStrategy": "any", + "matchStrings": ["# renovate: build-base:\\s+(?[^:]*):(?[^\\s@]*)(@(?sha256:[0-9a-f]*))?", + "# renovate: base:\\s+(?[^:]*):(?[^\\s@]*)(@(?sha256:[0-9a-f]*))?"], + "datasourceTemplate": "docker", + "versioningTemplate": "ubuntu" + } + ], + "packageRules": [ + { + "enabled": true, + "matchDatasources": [ + "docker" + ], + "pinDigests": true + }, + { + "matchFiles": ["rockcraft.yaml"], + "matchUpdateTypes": ["major", "minor", "patch"], + "enabled": false + } + ] +} + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..dfa17da --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +Django==5.1.4 +djangorestframework==3.15.2 +djangorestframework-simplejwt==5.3.1 +GitPython==3.1.43 +psycopg2-binary==2.9.10 +tzdata==2024.2 \ No newline at end of file diff --git a/rockcraft.yaml b/rockcraft.yaml new file mode 100644 index 0000000..050f861 --- /dev/null +++ b/rockcraft.yaml @@ -0,0 +1,44 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +name: httprequest-lego-provider +base: ubuntu@22.04 +version: "0.1" +summary: A Django application implementing HTTPRequest Lego Provider +description: | + A Django application implementing [HTTPRequest Lego Provider] + (https://go-acme.github.io/lego/dns/httpreq/) +license: Apache-2.0 +platforms: + amd64: + +# To ensure the django-framework extension works properly, your Django application +# should have an `wsgi.py` file with an `application` object as the WSGI entrypoint. +extensions: + - django-framework + +services: + django: + command: "/bin/python3 -m gunicorn -c /django/gunicorn.conf.py app.wsgi:application" + +parts: + # Exclude pyproject.yaml so that the python plugin doesn't handle this project as a package + django-framework/dependencies: + stage-packages: + - git + - openssh-client + override-build: | + rm -f pyproject.toml + craftctl default + django-framework/install-app: + plugin: dump + source: . + organize: + '*': django/app/ + .*: django/app/ + prime: + - "django/app/httprequest_lego_provider" + - "django/app/app" + - "django/app/LICENSE" + - "django/app/manage.py" + - "django/app/requirements.txt" diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..d2a1a30 --- /dev/null +++ b/tox.ini @@ -0,0 +1,116 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +[tox] +skipsdist=True +skip_missing_interpreters = True +envlist = lint, unit, static, coverage-report + +[vars] +src_path = {toxinidir}/httprequest_lego_provider/ +tst_path = {toxinidir}/httprequest_lego_provider/tests/ +charm_src_path = {toxinidir}/charm/src +charm_tst_path = {toxinidir}/charm/src +;lib_path = {toxinidir}/lib/charms/operator_name_with_underscores +all_src_path = {[vars]src_path} {[vars]charm_src_path} +all_tst_path = {[vars]tst_path} {[vars]charm_tst_path} +all_django_path = {[vars]src_path} {[vars]tst_path} +all_path = {[vars]all_src_path} {[vars]all_tst_path} + +[testenv] +setenv = + PYTHONPATH = {toxinidir}:{toxinidir}/lib:{[vars]src_path}:{[vars]charm_src_path} + PYTHONBREAKPOINT=ipdb.set_trace + PY_COLORS=1 +passenv = + PYTHONPATH + CHARM_BUILD_DIR + MODEL_SETTINGS + +[testenv:fmt] +description = Apply coding style standards to code +deps = + black + isort +commands = + isort {[vars]all_django_path} + black {[vars]all_django_path} + +[testenv:lint] +description = Check code against coding style standards +deps = + black + codespell + flake8<6.0.0 + flake8-builtins + flake8-copyright<6.0.0 + flake8-docstrings>=1.6.0 + flake8-docstrings-complete>=1.0.3 + flake8-test-docs>=1.0 + isort + mypy + pep8-naming + pydocstyle>=2.10 + pylint + pylint-django + pyproject-flake8<6.0.0 + pytest + pytest-asyncio + pytest-operator + requests + types-PyYAML + types-requests + -r{toxinidir}/requirements.txt + -r{toxinidir}/charm/requirements.txt +commands = + pydocstyle {[vars]all_src_path} + # uncomment the following line if this charm owns a lib + # codespell {[vars]lib_path} + codespell {toxinidir} --skip {toxinidir}/.git --skip {toxinidir}/.tox \ + --skip {toxinidir}/build --skip {toxinidir}/lib --skip {toxinidir}/venv \ + --skip {toxinidir}/.mypy_cache --skip {toxinidir}/icon.svg + # pflake8 wrapper supports config from pyproject.toml + pflake8 {[vars]all_django_path} --ignore=W503 + isort --check-only --diff {[vars]all_src_path} + black --check --diff {[vars]all_src_path} + mypy {[vars]all_src_path} + pylint --load-plugins=pylint_django --disable=django-not-configured {[vars]all_src_path} + +[testenv:unit] +description = Run unit tests +deps = + coverage[toml] + pytest + pytest-django + -r{toxinidir}/requirements.txt +commands = + coverage run --source={[vars]src_path} --omit={[vars]tst_path}* \ + -m pytest -v --tb native -s --ignore=charm/tests {posargs} + +[testenv:integration] +description = Run integration tests (placeholder) +deps = + pytest + pytest-asyncio + pytest-operator + -r{toxinidir}/charm/requirements.txt +commands = + pytest -v --tb native \ + --log-cli-level=INFO -s {posargs} charm/tests/integration + +[testenv:coverage-report] +description = Create test coverage report +deps = + coverage[toml] + pytest + -r{toxinidir}/requirements.txt +commands = + coverage report -m + +[testenv:static] +description = Run static analysis tests +deps = + bandit[toml] + -r{toxinidir}/requirements.txt +commands = + bandit -c {toxinidir}/pyproject.toml -r {[vars]all_src_path} {[vars]all_tst_path} diff --git a/trivy.yaml b/trivy.yaml new file mode 100644 index 0000000..c895d69 --- /dev/null +++ b/trivy.yaml @@ -0,0 +1,3 @@ +timeout: 20m +scan: + offline-scan: true diff --git a/zap_rules.tsv b/zap_rules.tsv new file mode 100644 index 0000000..e69de29