Skip to content

Commit

Permalink
Changes cProcessor setting on scale Compute
Browse files Browse the repository at this point in the history
  • Loading branch information
WalBeh committed Nov 28, 2024
1 parent 6da16cf commit 97c3ed1
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 10 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Unreleased

* Set CORS annotations in ``grand-central`` ingress.

* Update the ``-Cprocessors`` setting when scaling compute.

2.42.0 (2024-10-02)
-------------------

Expand Down
66 changes: 63 additions & 3 deletions crate/operator/change_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# with Crate these terms will supersede the license and you may use the
# software solely pursuant to the terms of the relevant commercial agreement.
import logging
from typing import Any
from typing import Any, List

import kopf
from kubernetes_asyncio.client import AppsV1Api
Expand All @@ -33,6 +33,7 @@
from crate.operator.utils import crate
from crate.operator.utils.k8s_api_client import GlobalApiClient
from crate.operator.utils.kopf import StateBasedSubHandler
from crate.operator.utils.kubeapi import get_cratedb_resource
from crate.operator.webhooks import (
WebhookChangeComputePayload,
WebhookEvent,
Expand Down Expand Up @@ -109,6 +110,40 @@ def generate_change_compute_payload(old, body):
)


async def update_cprocessor_crate_settings(
    apps: AppsV1Api,
    namespace: str,
    sts_name: str,
    processors: int,
) -> List[str]:
    """
    Build the ``crate`` container command with a new ``-Cprocessors`` value.

    The StatefulSet is read from the Kubernetes API and the command of its
    ``crate`` container is rewritten on the returned in-memory object only.
    Nothing is written back to the cluster by this function; the caller has
    to send the returned command as part of its own patch.

    Only arguments that already start with ``-Cprocessors=`` are replaced;
    all other arguments are kept as they are and in the same order. If the
    command holds no such argument, it is returned unchanged.

    :param apps: An instance of the Kubernetes Apps V1 API.
    :param namespace: The Kubernetes namespace for the CrateDB cluster.
    :param sts_name: The name of the Kubernetes StatefulSet to read.
    :param processors: The new number of processors. When ``None``, the
        existing command is returned without modification.
    :return: The updated command list (empty if the container defines no
        command).
    :raises ValueError: If the StatefulSet has no container named ``crate``.
    """
    stateful_set = await apps.read_namespaced_stateful_set(
        namespace=namespace, name=sts_name
    )

    crate_container = next(
        (
            container
            for container in stateful_set.spec.template.spec.containers
            if container.name == "crate"
        ),
        None,
    )
    if crate_container is None:
        raise ValueError("Container 'crate' not found in the StatefulSet.")

    # ``command`` is an optional field of a container spec, so the API may
    # hand back ``None`` here; treat that as "no arguments" instead of
    # failing with a TypeError while iterating.
    current_command = list(crate_container.command or [])

    if processors is None:
        # Nothing to change; keep every argument untouched.
        updated_command = current_command
    else:
        updated_command = [
            f"-Cprocessors={processors}" if arg.startswith("-Cprocessors=") else arg
            for arg in current_command
        ]

    crate_container.command = updated_command
    return updated_command


async def change_cluster_compute(
apps: AppsV1Api,
namespace: str,
Expand All @@ -119,7 +154,7 @@ async def change_cluster_compute(
"""
Patches the statefulset with the new cpu and memory requests and limits.
"""
body = generate_body_patch(name, compute_change_data, logger)
body = await generate_body_patch(apps, name, namespace, compute_change_data, logger)

# Note only the stateful set is updated. Pods will become updated on restart
sts_name = f"crate-data-hot-{name}"
Expand All @@ -132,8 +167,10 @@ async def change_cluster_compute(
pass


def generate_body_patch(
async def generate_body_patch(
apps: AppsV1Api,
name: str,
namespace: str,
compute_change_data: WebhookChangeComputePayload,
logger: logging.Logger,
) -> dict:
Expand All @@ -142,8 +179,30 @@ def generate_body_patch(
That patch modifies cpu/memory requests/limits based on compute_change_data.
It also patches affinity as needed based on the existence or not of requests data.
"""

crd = await get_cratedb_resource(namespace, name)
if crd is None:
raise ValueError(f"CRD {name} not found in namespace {namespace}")

try:
sts_name = f"crate-data-{crd['spec']['nodes']['data'][0]['name']}-{name}"
except (KeyError, IndexError) as e:
logger.error(f"Failed to construct sts_name: {e}")
raise ValueError(f"Failed to construct sts_name: {e}")

# sts_name = sts_name + f"-{name}"

updated_command = await update_cprocessor_crate_settings(
apps=apps,
namespace=namespace,
sts_name=sts_name,
# sts_name=f"crate-data-hot-{name}",
processors=compute_change_data["new_cpu_limit"],
)

node_spec = {
"name": "crate",
"command": updated_command,
"env": [
get_statefulset_env_crate_heap(
memory=compute_change_data["new_memory_limit"],
Expand All @@ -167,6 +226,7 @@ def generate_body_patch(
},
},
}

body = {
"spec": {
"template": {
Expand Down
33 changes: 30 additions & 3 deletions tests/test_change_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import pytest
from kubernetes_asyncio.client import (
AppsV1Api,
CoreV1Api,
CustomObjectsApi,
V1NodeAffinity,
Expand Down Expand Up @@ -280,6 +281,8 @@ async def test_change_compute_from_request_to_limit(
), "A success notification was expected but was not sent"


@pytest.mark.k8s
@pytest.mark.asyncio
@pytest.mark.parametrize(
"old_cpu_limit, old_memory_limit, old_cpu_request, old_memory_request, "
"new_cpu_limit, new_memory_limit, new_cpu_request, new_memory_request, "
Expand All @@ -291,7 +294,7 @@ async def test_change_compute_from_request_to_limit(
(1, "2Gi", None, None, 3, "5Gi", 5, "8Gi", "shared", "shared"),
],
)
def test_generate_body_patch(
async def test_generate_body_patch(
old_cpu_limit,
old_memory_limit,
old_cpu_request,
Expand All @@ -303,6 +306,9 @@ def test_generate_body_patch(
old_nodepool,
new_nodepool,
faker,
kopf_runner,
api_client,
namespace,
):
compute_change_data = WebhookChangeComputePayload(
old_cpu_limit=old_cpu_limit,
Expand All @@ -319,13 +325,34 @@ def test_generate_body_patch(
new_nodepool=new_nodepool,
)

coapi = CustomObjectsApi(api_client)
core = CoreV1Api(api_client)
apps = AppsV1Api(api_client)
name = faker.domain_word()

# Start a cluster with requests set to half the original limits
crate_resources = {
"limits": {"cpu": old_cpu_limit, "memory": old_memory_limit},
"requests": {"cpu": old_cpu_request, "memory": old_memory_request},
}
host, password = await start_cluster(
name, namespace, core, coapi, 1, resource_requests=crate_resources
)

with mock.patch("crate.operator.create.config.TESTING", False):
body = generate_body_patch(
name, compute_change_data, logging.getLogger(__name__)
body = await generate_body_patch(
apps,
name,
namespace.metadata.name,
compute_change_data,
logging.getLogger(__name__),
)

resources = body["spec"]["template"]["spec"]["containers"][0]["resources"]
command = body["spec"]["template"]["spec"]["containers"][0]["command"]

assert f"-Cprocessors={new_cpu_limit}" in command

assert resources["limits"]["cpu"] == new_cpu_limit
assert resources["limits"]["memory"] == new_memory_limit

Expand Down
18 changes: 14 additions & 4 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,20 @@ async def start_cluster(
}

if resource_requests:
body["spec"]["nodes"]["data"][0]["resources"]["requests"] = {
"cpu": resource_requests["cpu"],
"memory": resource_requests["memory"],
}
if "limits" in resource_requests and "requests" in resource_requests:
body["spec"]["nodes"]["data"][0]["resources"]["limits"] = {
"cpu": resource_requests.get("limits", {}).get("cpu", "2"),
"memory": resource_requests.get("limits", {}).get("memory", "4Gi"),
}
body["spec"]["nodes"]["data"][0]["resources"]["requests"] = {
"cpu": resource_requests.get("requests", {}).get("cpu", "2"),
"memory": resource_requests.get("requests", {}).get("memory", "4Gi"),
}
else:
body["spec"]["nodes"]["data"][0]["resources"]["requests"] = {
"cpu": resource_requests.get("cpu", "2"),
"memory": resource_requests.get("memory", "4Gi"),
}

if backups_spec:
body["spec"]["backups"] = backups_spec
Expand Down

0 comments on commit 97c3ed1

Please sign in to comment.