From 26900cca29b7d606677edbd300de397301d64d10 Mon Sep 17 00:00:00 2001
From: keiranjprice101 <44777678+keiranjprice101@users.noreply.github.com>
Date: Mon, 4 Mar 2024 11:19:36 +0000
Subject: [PATCH] Add more kopf handlers (#102)

* Add delete handler, type hints, and the operator library; update the linting action
* Lint the operator
* Fix doc spacing
* Fix doc spacing
* Include operator dependencies for inspection
* Add pylint disable
* Update containerfile
* Update event handler
* Roll back spec change
* Update docstring
---
 .github/workflows/linting.yml     |   4 +-
 README.md                         |   5 +-
 container/file_watcher_operator.D |   2 +-
 .../file_watcher_operator.py      | 108 +++++++++++++++---
 pyproject.toml                    |  10 +-
 5 files changed, 109 insertions(+), 20 deletions(-)

diff --git a/.github/workflows/linting.yml b/.github/workflows/linting.yml
index bb26eeb..4139c80 100644
--- a/.github/workflows/linting.yml
+++ b/.github/workflows/linting.yml
@@ -22,7 +22,7 @@ jobs:
         python -m pip install .[code-inspection]
 
     - name: Run pylint
-      run: pylint file_watcher
+      run: pylint file_watcher file_watcher_operator/file_watcher_operator.py
 
     - name: Run MyPy
-      run: mypy --strict file_watcher
+      run: mypy --strict file_watcher file_watcher_operator
diff --git a/README.md b/README.md
index 41f864e..b095c3a 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,10 @@ There is a recovery attempt that can be made for the instrument, by checking if
 instrument by using some saved state in the Database.
 
 ## FileWatcherOperator
-The point of the operator is to check for CustomResourceDefinition files that have been applied to the cluster. Examples can be found in the GitOps repository of what these should look like as part of the deployment for the file-watcher-operator. When a CRD is applied, this software should create a Deployment responsible for ensuring a file-watcher exists for the parameters in the CRD file.
+The point of the operator is to check for CustomResourceDefinition files that have been applied to the cluster.
+Examples can be found in the GitOps repository of what these should look like as part of the deployment for the
+file-watcher-operator. When a CRD is applied, this software should create a Deployment responsible for ensuring a
+file-watcher exists for the parameters in the CRD file.
 
 ## Docker
 
diff --git a/container/file_watcher_operator.D b/container/file_watcher_operator.D
index 258c55f..8b7fa54 100644
--- a/container/file_watcher_operator.D
+++ b/container/file_watcher_operator.D
@@ -1,4 +1,4 @@
-FROM python:3.10
+FROM python:3.12-slim
 
 RUN pip install kopf
 RUN pip install kubernetes
diff --git a/file_watcher_operator/file_watcher_operator.py b/file_watcher_operator/file_watcher_operator.py
index 0e92564..db32944 100644
--- a/file_watcher_operator/file_watcher_operator.py
+++ b/file_watcher_operator/file_watcher_operator.py
@@ -1,11 +1,17 @@
+"""
+Filewatcher operator controls the deployments, PVs, and PVCs based on instrument CRDs
+"""
 import logging
 import os
 import sys
+from typing import Dict, Any, Mapping, Tuple, List, MutableMapping
 
-import kopf as kopf
-import kubernetes
+import kopf
+import kubernetes  # type: ignore
 import yaml
 
+# pylint: disable = (duplicate-code)
+# The duplication is detected against the file watcher, which is a separate application.
 stdout_handler = logging.StreamHandler(stream=sys.stdout)
 logging.basicConfig(
     handlers=[stdout_handler],
@@ -13,9 +19,18 @@
     level=logging.INFO,
 )
 logger = logging.getLogger(__name__)
+# pylint: enable = duplicate-code
 
 
-def generate_deployment_body(spec, name):
+def generate_deployment_body(
+    spec: Mapping[str, Any], name: str
+) -> Tuple[MutableMapping[str, Any], MutableMapping[str, Any], MutableMapping[str, Any]]:
+    """
+    Create and return the Kubernetes deployment, PVC, and PV specs for an instrument's file watcher
+    :param spec: The kopf spec
+    :param name: The instrument name
+    :return: Tuple of the deployment, PVC, and PV specs
+    """
     archive_dir = os.environ.get("ARCHIVE_DIR", "/archive")
     queue_host = os.environ.get("QUEUE_HOST", "rabbitmq-cluster.rabbitmq.svc.cluster.local")
     queue_name = os.environ.get("EGRESS_QUEUE_NAME", "watched-files")
@@ -144,16 +159,32 @@ def generate_deployment_body(spec, name):
     return deployment_spec, pvc_spec, pv_spec
 
 
-def deploy_deployment(deployment_spec, name, children):
+def deploy_deployment(deployment_spec: Mapping[str, Any], name: str, children: List[Any]) -> None:
+    """
+    Given a deployment spec, name, and the operator's children, create the namespaced deployment and add its uid to
+    the children
+    :param deployment_spec: The deployment spec
+    :param name: The instrument name
+    :param children: The operator's children
+    :return: None
+    """
     app_api = kubernetes.client.AppsV1Api()
-    logger.info(f"Starting deployment of: {name} filewatcher")
+    logger.info("Starting deployment of: %s filewatcher", name)
     namespace = os.environ.get("FILEWATCHER_NAMESPACE", "ir")
     depl = app_api.create_namespaced_deployment(namespace=namespace, body=deployment_spec)
     children.append(depl.metadata.uid)
-    logger.info(f"Deployed: {name} filewatcher")
+    logger.info("Deployed: %s filewatcher", name)
 
 
-def deploy_pvc(pvc_spec, name, children):
+def deploy_pvc(pvc_spec: Mapping[str, Any], name: str, children: List[Any]) -> None:
+    """
+    Given a PVC spec, name, and the operator's children, create the namespaced persistent volume claim and add its
+    uid to the operator's children
+    :param pvc_spec: The PVC spec
+    :param name: The name of the PVC
+    :param children: The operator's children
+    :return: None
+    """
     namespace = os.environ.get("FILEWATCHER_NAMESPACE", "ir")
     core_api = kubernetes.client.CoreV1Api()
     # Check if PVC exists else deploy a new one:
@@ -161,35 +192,82 @@ def deploy_pvc(pvc_spec, name, children):
         ii.metadata.name
         for ii in core_api.list_namespaced_persistent_volume_claim(pvc_spec["metadata"]["namespace"]).items
     ]:
-        logger.info(f"Starting deployment of PVC: {name} filewatcher")
+        logger.info("Starting deployment of PVC: %s filewatcher", name)
         pvc = core_api.create_namespaced_persistent_volume_claim(namespace=namespace, body=pvc_spec)
         children.append(pvc.metadata.uid)
-        logger.info(f"Deployed PVC: {name} filewatcher")
+        logger.info("Deployed PVC: %s filewatcher", name)
 
 
-def deploy_pv(pv_spec, name, children):
+def deploy_pv(pv_spec: Mapping[str, Any], name: str, children: List[Any]) -> None:
+    """
+    Given a PV spec, name, and the operator's children, create the persistent volume and add its uid to the
+    operator's children
+    :param pv_spec: The PV spec
+    :param name: The name of the PV
+    :param children: The operator's children
+    :return: None
+    """
     core_api = kubernetes.client.CoreV1Api()
     # Check if PV exists else deploy a new one
     if pv_spec["metadata"]["name"] not in [ii.metadata.name for ii in core_api.list_persistent_volume().items]:
-        logger.info(f"Starting deployment of PV: {name} filewatcher")
PV: {name} filewatcher") + logger.info("Starting deployment of PV: %s filewatcher", name) pv = core_api.create_persistent_volume(body=pv_spec) children.append(pv.metadata.uid) - logger.info(f"Deployed PV: {name} filewatcher") + logger.info("Deployed PV: %s filewatcher", name) @kopf.on.create("ir.com", "v1", "filewatchers") -def create_fn(spec, **kwargs): +def create_fn(spec: Any, **kwargs: Any) -> Dict[str, List[Any]]: + """ + Kopf create event handler, generates all 3 specs then creates them in the cluster, while creating the children and + adopting the deployment and pvc + :param spec: Spec of the CRD intercepted by kopf + :param kwargs: KWARGS + :return: None + """ name = kwargs["body"]["metadata"]["name"] - logger.info(f"Name is {name}") + logger.info("Name is %s", name) deployment_spec, pvc_spec, pv_spec = generate_deployment_body(spec, name) # Make the deployment the child of this operator kopf.adopt(deployment_spec) kopf.adopt(pvc_spec) - children = [] + children: List[Any] = [] deploy_pv(pv_spec, name, children) deploy_pvc(pvc_spec, name, children) deploy_deployment(deployment_spec, name, children) # Update controller's status with child deployment return {"children": children} + + +@kopf.on.delete("ir.com", "v1", "filewatchers") +def delete_func(**kwargs: Any) -> None: + """ + Kopf delete event handler. This will automatically delete the filewatcher deployment and pvc, and will manually + delete the persitent volume + :param kwargs: kwargs + :return: None + """ + name = kwargs["body"]["metadata"]["name"] + client = kubernetes.client.CoreV1Api() + client.delete_persistent_volume(name=f"{name}-file-watcher-pv") + + +@kopf.on.update("ir.com", "v1", "filewatchers") +def update_func(spec: Any, **kwargs: Any) -> None: + """ + kopf update event handler. This automatically updates the filewatcher deployment when the CRD changes + :param spec: the spec + :param kwargs: kwargs + :return: None + """ + name = kwargs["body"]["metadata"]["name"] + + namespace = kwargs["body"]["metadata"]["namespace"] + deployment_spec, _, __ = generate_deployment_body(spec, name) + app_api = kubernetes.client.AppsV1Api() + + app_api.patch_namespaced_deployment( + name=f"{name}-file-watcher-deployment", namespace=namespace, body=deployment_spec + ) diff --git a/pyproject.toml b/pyproject.toml index a5f8f3b..71fb8f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,12 @@ dependencies = [ file_watcher = "file_watcher.main:main" [project.optional-dependencies] +operator = [ + "kopf==1.37.1", + "kubernetes==29.0.0", + "PyYAML==6.0.1" +] + formatting = [ "black==23.10.1" ] @@ -28,7 +34,9 @@ test = [ code-inspection = [ "pylint==3.0.2", "mypy==1.6.0", - "file_watcher[test]" + "types-PyYAML==6.0.12.12", + "file_watcher[test]", + "file_watcher[operator]" ] dev = [
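
The new create, update, and delete handlers are only checked by pylint and mypy in this patch. A rough local
smoke-test sketch (not part of the diff above) could drive the create and delete paths end to end with kopf's
testing runner and the Kubernetes custom-objects API. The FileWatcher kind name, the empty spec, and the
kubeconfig/namespace setup below are illustrative assumptions; the real CRD schema is only shown in the GitOps
repository referenced in the README.

"""Hedged smoke-test sketch for the filewatcher operator's create and delete handlers.

Assumes a reachable test cluster with the filewatchers CRD already installed. The kind
name and the spec contents are placeholders, not the real schema.
"""
import time

import kubernetes
from kopf.testing import KopfRunner

kubernetes.config.load_kube_config()  # assumes a local kubeconfig pointing at a test cluster
custom_api = kubernetes.client.CustomObjectsApi()

# Illustrative custom resource; group/version/plural match the handlers in this patch,
# but the kind and spec fields are hypothetical.
filewatcher_cr = {
    "apiVersion": "ir.com/v1",
    "kind": "FileWatcher",  # assumed kind name
    "metadata": {"name": "example-instrument", "namespace": "ir"},
    "spec": {},  # placeholder: the real fields consumed by generate_deployment_body are not shown in this diff
}

with KopfRunner(["run", "--standalone", "file_watcher_operator/file_watcher_operator.py"]) as runner:
    # create_fn should build the Deployment, PVC, and PV for this resource
    custom_api.create_namespaced_custom_object(
        group="ir.com", version="v1", namespace="ir", plural="filewatchers", body=filewatcher_cr
    )
    time.sleep(5)  # crude wait for the create handler to run

    # delete_func should remove the manually created persistent volume
    custom_api.delete_namespaced_custom_object(
        group="ir.com", version="v1", namespace="ir", plural="filewatchers", name="example-instrument"
    )
    time.sleep(5)

assert runner.exit_code == 0
assert runner.exception is None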