diff --git a/CHANGES.rst b/CHANGES.rst index cca1df18..d78052c0 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,6 +12,8 @@ Unreleased * Set CORS annotations in ``grand-central`` ingress. +* Update ``-Cprocessors`` on scale compute. + 2.42.0 (2024-10-02) ------------------- diff --git a/crate/operator/change_compute.py b/crate/operator/change_compute.py index 1c4925b4..8a094e5b 100644 --- a/crate/operator/change_compute.py +++ b/crate/operator/change_compute.py @@ -19,7 +19,7 @@ # with Crate these terms will supersede the license and you may use the # software solely pursuant to the terms of the relevant commercial agreement. import logging -from typing import Any +from typing import Any, List import kopf from kubernetes_asyncio.client import AppsV1Api @@ -33,6 +33,7 @@ from crate.operator.utils import crate from crate.operator.utils.k8s_api_client import GlobalApiClient from crate.operator.utils.kopf import StateBasedSubHandler +from crate.operator.utils.kubeapi import get_cratedb_resource from crate.operator.webhooks import ( WebhookChangeComputePayload, WebhookEvent, @@ -109,6 +110,40 @@ def generate_change_compute_payload(old, body): ) +async def update_cprocessor_crate_settings( + apps: AppsV1Api, + namespace: str, + sts_name: str, + processors: int, +) -> List[str]: + """ + Call the Kubernetes API, update the -Cprocessors value in the crate + container command, and return the updated command list. + + :param apps: An instance of the Kubernetes Apps V1 API. + :param namespace: The Kubernetes namespace for the CrateDB cluster. + :param sts_name: The name of the Kubernetes StatefulSet to update. + :param processors: The new number of processors. + :return: The updated command list. 
+ """ + stateful_set = await apps.read_namespaced_stateful_set( + namespace=namespace, name=sts_name + ) + + for container in stateful_set.spec.template.spec.containers: + if container.name == "crate": + updated_command = [] + for cmd in container.command: + if cmd.startswith("-Cprocessors=") and processors is not None: + updated_command.append(f"-Cprocessors={processors}") + else: + updated_command.append(cmd) + container.command = updated_command + return container.command + + raise ValueError("Container 'crate' not found in the StatefulSet.") + + async def change_cluster_compute( apps: AppsV1Api, namespace: str, @@ -119,7 +154,7 @@ async def change_cluster_compute( """ Patches the statefulset with the new cpu and memory requests and limits. """ - body = generate_body_patch(name, compute_change_data, logger) + body = await generate_body_patch(apps, name, namespace, compute_change_data, logger) # Note only the stateful set is updated. Pods will become updated on restart sts_name = f"crate-data-hot-{name}" @@ -132,8 +167,10 @@ async def change_cluster_compute( pass -def generate_body_patch( +async def generate_body_patch( + apps: AppsV1Api, name: str, + namespace: str, compute_change_data: WebhookChangeComputePayload, logger: logging.Logger, ) -> dict: @@ -142,8 +179,30 @@ def generate_body_patch( That patch modifies cpu/memory requests/limits based on compute_change_data. It also patches affinity as needed based on the existence or not of requests data. 
""" + + crd = await get_cratedb_resource(namespace, name) + if crd is None: + raise ValueError(f"CRD {name} not found in namespace {namespace}") + + try: + sts_name = f"crate-data-{crd['spec']['nodes']['data'][0]['name']}-{name}" + except (KeyError, IndexError) as e: + logger.error(f"Failed to construct sts_name: {e}") + raise ValueError(f"Failed to construct sts_name: {e}") + + # sts_name = sts_name + f"-{name}" + + updated_command = await update_cprocessor_crate_settings( + apps=apps, + namespace=namespace, + sts_name=sts_name, + # sts_name=f"crate-data-hot-{name}", + processors=compute_change_data["new_cpu_limit"], + ) + node_spec = { "name": "crate", + "command": updated_command, "env": [ get_statefulset_env_crate_heap( memory=compute_change_data["new_memory_limit"], @@ -167,6 +226,7 @@ def generate_body_patch( }, }, } + body = { "spec": { "template": { diff --git a/tests/test_change_compute.py b/tests/test_change_compute.py index 998cc59e..dfc6dc1a 100644 --- a/tests/test_change_compute.py +++ b/tests/test_change_compute.py @@ -24,6 +24,7 @@ import pytest from kubernetes_asyncio.client import ( + AppsV1Api, CoreV1Api, CustomObjectsApi, V1NodeAffinity, @@ -280,6 +281,8 @@ async def test_change_compute_from_request_to_limit( ), "A success notification was expected but was not sent" +@pytest.mark.k8s +@pytest.mark.asyncio @pytest.mark.parametrize( "old_cpu_limit, old_memory_limit, old_cpu_request, old_memory_request, " "new_cpu_limit, new_memory_limit, new_cpu_request, new_memory_request, " @@ -291,7 +294,7 @@ async def test_change_compute_from_request_to_limit( (1, "2Gi", None, None, 3, "5Gi", 5, "8Gi", "shared", "shared"), ], ) -def test_generate_body_patch( +async def test_generate_body_patch( old_cpu_limit, old_memory_limit, old_cpu_request, @@ -303,6 +306,9 @@ def test_generate_body_patch( old_nodepool, new_nodepool, faker, + kopf_runner, + api_client, + namespace, ): compute_change_data = WebhookChangeComputePayload( old_cpu_limit=old_cpu_limit, @@ -319,13 
+325,34 @@ def test_generate_body_patch( new_nodepool=new_nodepool, ) + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + apps = AppsV1Api(api_client) name = faker.domain_word() + + # Start a cluster with requests set to half the original limits + crate_resources = { + "limits": {"cpu": old_cpu_limit, "memory": old_memory_limit}, + "requests": {"cpu": old_cpu_request, "memory": old_memory_request}, + } + host, password = await start_cluster( + name, namespace, core, coapi, 1, resource_requests=crate_resources + ) + with mock.patch("crate.operator.create.config.TESTING", False): - body = generate_body_patch( - name, compute_change_data, logging.getLogger(__name__) + body = await generate_body_patch( + apps, + name, + namespace.metadata.name, + compute_change_data, + logging.getLogger(__name__), ) resources = body["spec"]["template"]["spec"]["containers"][0]["resources"] + command = body["spec"]["template"]["spec"]["containers"][0]["command"] + + assert f"-Cprocessors={new_cpu_limit}" in command + assert resources["limits"]["cpu"] == new_cpu_limit assert resources["limits"]["memory"] == new_memory_limit diff --git a/tests/utils.py b/tests/utils.py index 0b3d36c8..093cd8a4 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -156,10 +156,20 @@ async def start_cluster( } if resource_requests: - body["spec"]["nodes"]["data"][0]["resources"]["requests"] = { - "cpu": resource_requests["cpu"], - "memory": resource_requests["memory"], - } + if "limits" in resource_requests and "requests" in resource_requests: + body["spec"]["nodes"]["data"][0]["resources"]["limits"] = { + "cpu": resource_requests.get("limits", {}).get("cpu", "2"), + "memory": resource_requests.get("limits", {}).get("memory", "4Gi"), + } + body["spec"]["nodes"]["data"][0]["resources"]["requests"] = { + "cpu": resource_requests.get("requests", {}).get("cpu", "2"), + "memory": resource_requests.get("requests", {}).get("memory", "4Gi"), + } + else: + 
body["spec"]["nodes"]["data"][0]["resources"]["requests"] = { + "cpu": resource_requests.get("cpu", "2"), + "memory": resource_requests.get("memory", "4Gi"), + } if backups_spec: body["spec"]["backups"] = backups_spec