Skip to content

Commit

Permalink
Add more kopf handlers (#102)
Browse files Browse the repository at this point in the history
* Add delete and type hints and library and update action

* Lint the operator

* Fix doc spacing

* Fix doc spacing

* Include operator dependencies for inspection

* Add pylint disable

* Update containerfile

* Update event handler

* Rollback spec change

* Update docstring
  • Loading branch information
keiranjprice101 authored Mar 4, 2024
1 parent e074f74 commit 26900cc
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 20 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/linting.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
python -m pip install .[code-inspection]
- name: Run pylint
run: pylint file_watcher
run: pylint file_watcher file_watcher_operator/file_watcher_operator.py

- name: Run MyPy
run: mypy --strict file_watcher
run: mypy --strict file_watcher file_watcher_operator
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ There is a recovery attempt that can be made for the instrument, by checking if
instrument by using some saved state in the Database.

## FileWatcherOperator
The point of the operator is to check for CustomResourceDefinition files that have been applied to the cluster. Examples can be found in the GitOps repository of what these should look like as part of the deployment for the file-watcher-operator. When a CRD is applied, this software should create a Deployment responsible for ensuring a file-watcher exists for the parameters in the CRD file.
The point of the operator is to check for CustomResourceDefinition files that have been applied to the cluster.
Examples can be found in the GitOps repository of what these should look like as part of the deployment for the
file-watcher-operator. When a CRD is applied, this software should create a Deployment responsible for ensuring a
file-watcher exists for the parameters in the CRD file.

## Docker

Expand Down
2 changes: 1 addition & 1 deletion container/file_watcher_operator.D
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.10
FROM python:3.12-slim

RUN pip install kopf
RUN pip install kubernetes
Expand Down
108 changes: 93 additions & 15 deletions file_watcher_operator/file_watcher_operator.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,36 @@
"""
Filewatcher operator controls the deployments, PVs, and PVCs based on instrument CRDs
"""
import logging
import os
import sys
from typing import Dict, Any, Mapping, Tuple, List, MutableMapping

import kopf as kopf
import kubernetes
import kopf
import kubernetes # type: ignore
import yaml

# pylint: disable = (duplicate-code)
# This will be detected from the file watcher which is not the same application.
stdout_handler = logging.StreamHandler(stream=sys.stdout)
logging.basicConfig(
handlers=[stdout_handler],
format="[%(asctime)s]-%(name)s-%(levelname)s: %(message)s",
level=logging.INFO,
)
logger = logging.getLogger(__name__)
# pylint: enable = duplicate-code


def generate_deployment_body(spec, name):
def generate_deployment_body(
spec: Mapping[str, Any], name: str
) -> Tuple[MutableMapping[str, Any], MutableMapping[str, Any], MutableMapping[str, Any]]:
"""
Create and return a Kubernetes deployment yaml for each deployment
:param spec: The kopf spec
:param name: The instrument name
:return: Tuple of the mutable mappings containing the deployment specs
"""
archive_dir = os.environ.get("ARCHIVE_DIR", "/archive")
queue_host = os.environ.get("QUEUE_HOST", "rabbitmq-cluster.rabbitmq.svc.cluster.local")
queue_name = os.environ.get("EGRESS_QUEUE_NAME", "watched-files")
Expand Down Expand Up @@ -144,52 +159,115 @@ def generate_deployment_body(spec, name):
return deployment_spec, pvc_spec, pv_spec


def deploy_deployment(deployment_spec, name, children):
def deploy_deployment(deployment_spec: Mapping[str, Any], name: str, children: List[Any]) -> None:
"""
Given a deployment spec, name, and operators children, create the namespaced deployment and add it's uid to the
children
:param deployment_spec: The deployment spec
:param name: The name of the spec
:param children: The operators children
:return: None
"""
app_api = kubernetes.client.AppsV1Api()
logger.info(f"Starting deployment of: {name} filewatcher")
logger.info("Starting deployment of: %s filewatcher", name)
namespace = os.environ.get("FILEWATCHER_NAMESPACE", "ir")
depl = app_api.create_namespaced_deployment(namespace=namespace, body=deployment_spec)
children.append(depl.metadata.uid)
logger.info(f"Deployed: {name} filewatcher")
logger.info("Deployed: %s filewatcher", name)


def deploy_pvc(pvc_spec, name, children):
def deploy_pvc(pvc_spec: Mapping[str, Any], name: str, children: List[Any]) -> None:
"""
Given a pvc spec, name, and the operators children, create the namespaced persistent volume claim and add its uid
to the operators children
:param pvc_spec: The pvc spec
:param name: The name of the pvc
:param children: The operators children
:return: None
"""
namespace = os.environ.get("FILEWATCHER_NAMESPACE", "ir")
core_api = kubernetes.client.CoreV1Api()
# Check if PVC exists else deploy a new one:
if pvc_spec["metadata"]["name"] not in [
ii.metadata.name
for ii in core_api.list_namespaced_persistent_volume_claim(pvc_spec["metadata"]["namespace"]).items
]:
logger.info(f"Starting deployment of PVC: {name} filewatcher")
logger.info("Starting deployment of PVC: %s filewatcher", name)
pvc = core_api.create_namespaced_persistent_volume_claim(namespace=namespace, body=pvc_spec)
children.append(pvc.metadata.uid)
logger.info(f"Deployed PVC: {name} filewatcher")
logger.info("Deployed PVC: %s filewatcher", name)


def deploy_pv(pv_spec, name, children):
def deploy_pv(pv_spec: Mapping[str, Any], name: str, children: List[Any]) -> None:
"""
Given a pvc spec, name, and the operators children, create the namespaced persistent volume and add its uid
to the operators children
:param pv_spec: The pv spec
:param name: The name of the pv
:param children: The operators children
:return: None
"""
core_api = kubernetes.client.CoreV1Api()
# Check if PV exists else deploy a new one
if pv_spec["metadata"]["name"] not in [ii.metadata.name for ii in core_api.list_persistent_volume().items]:
logger.info(f"Starting deployment of PV: {name} filewatcher")
logger.info("Starting deployment of PV: %s filewatcher", name)
pv = core_api.create_persistent_volume(body=pv_spec)
children.append(pv.metadata.uid)
logger.info(f"Deployed PV: {name} filewatcher")
logger.info("Deployed PV: %s filewatcher", name)


@kopf.on.create("ir.com", "v1", "filewatchers")
def create_fn(spec, **kwargs):
def create_fn(spec: Any, **kwargs: Any) -> Dict[str, List[Any]]:
"""
Kopf create event handler, generates all 3 specs then creates them in the cluster, while creating the children and
adopting the deployment and pvc
:param spec: Spec of the CRD intercepted by kopf
:param kwargs: KWARGS
:return: None
"""
name = kwargs["body"]["metadata"]["name"]
logger.info(f"Name is {name}")
logger.info("Name is %s", name)

deployment_spec, pvc_spec, pv_spec = generate_deployment_body(spec, name)
# Make the deployment the child of this operator
kopf.adopt(deployment_spec)
kopf.adopt(pvc_spec)

children = []
children: List[Any] = []
deploy_pv(pv_spec, name, children)
deploy_pvc(pvc_spec, name, children)
deploy_deployment(deployment_spec, name, children)
# Update controller's status with child deployment
return {"children": children}


@kopf.on.delete("ir.com", "v1", "filewatchers")
def delete_func(**kwargs: Any) -> None:
"""
Kopf delete event handler. This will automatically delete the filewatcher deployment and pvc, and will manually
delete the persitent volume
:param kwargs: kwargs
:return: None
"""
name = kwargs["body"]["metadata"]["name"]
client = kubernetes.client.CoreV1Api()
client.delete_persistent_volume(name=f"{name}-file-watcher-pv")


@kopf.on.update("ir.com", "v1", "filewatchers")
def update_func(spec: Any, **kwargs: Any) -> None:
"""
kopf update event handler. This automatically updates the filewatcher deployment when the CRD changes
:param spec: the spec
:param kwargs: kwargs
:return: None
"""
name = kwargs["body"]["metadata"]["name"]

namespace = kwargs["body"]["metadata"]["namespace"]
deployment_spec, _, __ = generate_deployment_body(spec, name)
app_api = kubernetes.client.AppsV1Api()

app_api.patch_namespaced_deployment(
name=f"{name}-file-watcher-deployment", namespace=namespace, body=deployment_spec
)
10 changes: 9 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ dependencies = [
file_watcher = "file_watcher.main:main"

[project.optional-dependencies]
operator = [
"kopf==1.37.1",
"kubernetes==29.0.0",
"PyYAML==6.0.1"
]

formatting = [
"black==23.10.1"
]
Expand All @@ -28,7 +34,9 @@ test = [
code-inspection = [
"pylint==3.0.2",
"mypy==1.6.0",
"file_watcher[test]"
"types-PyYAML==6.0.12.12",
"file_watcher[test]",
"file_watcher[operator]"
]

dev = [
Expand Down

0 comments on commit 26900cc

Please sign in to comment.