From 246f82bfd3ffc447619bdc6d93eab04b8428045a Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Mon, 24 Jul 2023 14:30:08 +0200 Subject: [PATCH 01/13] add kafka upgrade --- actions.yaml | 3 + lib/charms/data_platform_libs/v0/upgrade.py | 775 ++++++++++++++++++++ metadata.yaml | 2 + src/charm.py | 12 +- src/config.py | 1 + src/literals.py | 10 + src/snap.py | 3 +- src/upgrade.py | 98 +++ 8 files changed, 900 insertions(+), 4 deletions(-) create mode 100644 lib/charms/data_platform_libs/v0/upgrade.py create mode 100644 src/upgrade.py diff --git a/actions.yaml b/actions.yaml index 00ee7d37..69d9670e 100644 --- a/actions.yaml +++ b/actions.yaml @@ -30,3 +30,6 @@ get-admin-credentials: description: Get administrator authentication credentials for client commands The returned client_properties can be used for Kafka bin commands using `--bootstrap-server` and `--command-config` for admin level administration This action must be called on the leader unit. + +pre-upgrade-check: + description: Run necessary pre-upgrade checks before executing a charm upgrade. diff --git a/lib/charms/data_platform_libs/v0/upgrade.py b/lib/charms/data_platform_libs/v0/upgrade.py new file mode 100644 index 00000000..f2cd1b56 --- /dev/null +++ b/lib/charms/data_platform_libs/v0/upgrade.py @@ -0,0 +1,775 @@ +# Copyright 2023 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +r"""Handler for `upgrade` relation events for in-place upgrades on VMs.""" + +import json +import logging +from abc import ABC, abstractmethod +from typing import Iterable, List, Literal, Optional, Tuple + +from ops.charm import ( + ActionEvent, + CharmBase, + CharmEvents, + RelationCreatedEvent, + UpgradeCharmEvent, +) +from ops.framework import EventBase, EventSource, Object +from ops.model import Relation, Unit +from pydantic import BaseModel, root_validator, validator + +# The unique Charmhub library identifier, never change it +LIBID = "156258aefb79435a93d933409a8c8684" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 5 + +PYDEPS = ["pydantic>=1.10,<2"] + +logger = logging.getLogger(__name__) + +# --- DEPENDENCY RESOLUTION FUNCTIONS --- + + +def build_complete_sem_ver(version: str) -> list[int]: + """Builds complete major.minor.patch version from version string. + + Returns: + List of major.minor.patch version integers + """ + versions = [int(ver) if ver != "*" else 0 for ver in str(version).split(".")] + + # padding with 0s until complete major.minor.patch + return (versions + 3 * [0])[:3] + + +def verify_caret_requirements(version: str, requirement: str) -> bool: + """Verifies version requirements using carats. + + Args: + version: the version currently in use + requirement: the requirement version + + Returns: + True if `version` meets defined `requirement`. 
Otherwise False + """ + if not requirement.startswith("^"): + return True + + requirement = requirement[1:] + + sem_version = build_complete_sem_ver(version) + sem_requirement = build_complete_sem_ver(requirement) + + # caret uses first non-zero character, not enough to just count '. + max_version_index = requirement.count(".") + for i, semver in enumerate(sem_requirement): + if semver != 0: + max_version_index = i + break + + for i in range(3): + # version higher than first non-zero + if (i < max_version_index) and (sem_version[i] > sem_requirement[i]): + return False + + # version either higher or lower than first non-zero + if (i == max_version_index) and (sem_version[i] != sem_requirement[i]): + return False + + # valid + if (i > max_version_index) and (sem_version[i] > sem_requirement[i]): + return True + + return False + + +def verify_tilde_requirements(version: str, requirement: str) -> bool: + """Verifies version requirements using tildes. + + Args: + version: the version currently in use + requirement: the requirement version + + Returns: + True if `version` meets defined `requirement`. Otherwise False + """ + if not requirement.startswith("~"): + return True + + requirement = requirement[1:] + + sem_version = build_complete_sem_ver(version) + sem_requirement = build_complete_sem_ver(requirement) + + max_version_index = min(1, requirement.count(".")) + + for i in range(3): + # version higher before requirement level + if (i < max_version_index) and (sem_version[i] > sem_requirement[i]): + return False + + # version either higher or lower at requirement level + if (i == max_version_index) and (sem_version[i] != sem_requirement[i]): + return False + + # version lower after requirement level + if (i > max_version_index) and (sem_version[i] < sem_requirement[i]): + return False + + # must be valid + return True + + +def verify_wildcard_requirements(version: str, requirement: str) -> bool: + """Verifies version requirements using wildcards. + + Args: + version: the version currently in use + requirement: the requirement version + + Returns: + True if `version` meets defined `requirement`. Otherwise False + """ + if "*" not in requirement: + return True + + sem_version = build_complete_sem_ver(version) + sem_requirement = build_complete_sem_ver(requirement) + + max_version_index = requirement.count(".") + + for i in range(3): + # version not the same before wildcard + if (i < max_version_index) and (sem_version[i] != sem_requirement[i]): + return False + + # version not higher after wildcard + if (i == max_version_index) and (sem_version[i] < sem_requirement[i]): + return False + + # must be valid + return True + + +def verify_inequality_requirements(version: str, requirement: str) -> bool: + """Verifies version requirements using inequalities. + + Args: + version: the version currently in use + requirement: the requirement version + + Returns: + True if `version` meets defined `requirement`. 
Otherwise False + """ + if not any(char for char in [">", ">="] if requirement.startswith(char)): + return True + + raw_requirement = requirement.replace(">", "").replace("=", "") + + sem_version = build_complete_sem_ver(version) + sem_requirement = build_complete_sem_ver(raw_requirement) + + max_version_index = raw_requirement.count(".") or 0 + + for i in range(3): + # valid at same requirement level + if ( + (i == max_version_index) + and ("=" in requirement) + and (sem_version[i] == sem_requirement[i]) + ): + return True + + # version not increased at any point + if sem_version[i] < sem_requirement[i]: + return False + + # valid + if sem_version[i] > sem_requirement[i]: + return True + + # must not be valid + return False + + +def verify_requirements(version: str, requirement: str) -> bool: + """Verifies a specified version against defined requirements. + + Supports caret (`^`), tilde (`~`), wildcard (`*`) and greater-than inequalities (`>`, `>=`) + + Args: + version: the version currently in use + requirement: the requirement version + + Returns: + True if `version` meets defined `requirement`. Otherwise False + """ + if not all( + [ + verify_inequality_requirements(version=version, requirement=requirement), + verify_caret_requirements(version=version, requirement=requirement), + verify_tilde_requirements(version=version, requirement=requirement), + verify_wildcard_requirements(version=version, requirement=requirement), + ] + ): + return False + + return True + + +# --- DEPENDENCY MODEL TYPES --- + + +class DependencyModel(BaseModel): + """Manager for a single dependency. + + To be used as part of another model representing a collection of arbitrary dependencies. + + Example:: + + class KafkaDependenciesModel(BaseModel): + kafka_charm: DependencyModel + kafka_service: DependencyModel + + deps = { + "kafka_charm": { + "dependencies": {"zookeeper": ">5"}, + "name": "kafka", + "upgrade_supported": ">5", + "version": "10", + }, + "kafka_service": { + "dependencies": {"zookeeper": "^3.6"}, + "name": "kafka", + "upgrade_supported": "~3.3", + "version": "3.3.2", + }, + } + + model = KafkaDependenciesModel(**deps) # loading dict in to model + + print(model.dict()) # exporting back validated deps + """ + + dependencies: dict[str, str] + name: str + upgrade_supported: str + version: str + + @validator("dependencies", "upgrade_supported", each_item=True) + @classmethod + def dependencies_validator(cls, value): + """Validates values with dependencies for multiple special characters.""" + if isinstance(value, dict): + deps = value.values() + else: + deps = [value] + + chars = ["~", "^", ">", "*"] + + for dep in deps: + if (count := sum([dep.count(char) for char in chars])) != 1: + raise ValueError( + f"Value uses greater than 1 special character (^ ~ > *). Found {count}." + ) + + return value + + @root_validator(skip_on_failure=True) + @classmethod + def version_upgrade_supported_validator(cls, values): + """Validates specified `version` meets `upgrade_supported` requirement.""" + if not verify_requirements( + version=values.get("version"), requirement=values.get("upgrade_supported") + ): + raise ValueError( + f"upgrade_supported value {values.get('upgrade_supported')} greater than version value {values.get('version')} for {values.get('name')}." + ) + + return values + + def can_upgrade(self, dependency: "DependencyModel") -> bool: + """Compares two instances of :class:`DependencyModel` for upgradability. 
+
+        Args:
+            dependency: a dependency model to compare this model against
+
+        Returns:
+            True if current model can upgrade from dependent model. Otherwise False
+        """
+        return verify_requirements(version=self.version, requirement=dependency.upgrade_supported)
+
+
+# --- CUSTOM EXCEPTIONS ---
+
+
+class UpgradeError(Exception):
+    """Base class for upgrade related exceptions in the module."""
+
+    def __init__(self, message: str, cause: Optional[str], resolution: Optional[str]):
+        super().__init__(message)
+        self.message = message
+        self.cause = cause or ""
+        self.resolution = resolution or ""
+
+    def __repr__(self):
+        """Representation of the UpgradeError class."""
+        return f"{type(self).__module__}.{type(self).__name__} - {str(vars(self))}"
+
+    def __str__(self):
+        """String representation of the UpgradeError class."""
+        return repr(self)
+
+
+class ClusterNotReadyError(UpgradeError):
+    """Exception flagging that the cluster is not ready to start upgrading.
+
+    For example, if the cluster fails :class:`DataUpgrade._on_pre_upgrade_check_action`
+
+    Args:
+        message: string message to be logged out
+        cause: short human-readable description of the cause of the error
+        resolution: short human-readable instructions for manual error resolution (optional)
+    """
+
+    def __init__(self, message: str, cause: str, resolution: Optional[str] = None):
+        super().__init__(message, cause=cause, resolution=resolution)
+
+
+class VersionError(UpgradeError):
+    """Exception flagging that the old `version` fails to meet the new `upgrade_supported`s.
+
+    For example, upgrades from version `2.x` --> `4.x`,
+    but `4.x` only supports upgrading from `3.x` onwards
+
+    Args:
+        message: string message to be logged out
+        cause: short human-readable description of the cause of the error
+        resolution: short human-readable instructions for manual solutions to the error (optional)
+    """
+
+    def __init__(self, message: str, cause: str, resolution: Optional[str] = None):
+        super().__init__(message, cause=cause, resolution=resolution)
+
+
+class DependencyError(UpgradeError):
+    """Exception flagging that some new `dependency` is not being met.
+
+    For example, new version requires related App version `2.x`, but currently is `1.x`
+
+    Args:
+        message: string message to be logged out
+        cause: short human-readable description of the cause of the error
+        resolution: short human-readable instructions for manual solutions to the error (optional)
+    """
+
+    def __init__(self, message: str, cause: str, resolution: Optional[str] = None):
+        super().__init__(message, cause=cause, resolution=resolution)
+
+
+# --- CUSTOM EVENTS ---
+
+
+class UpgradeGrantedEvent(EventBase):
+    """Used to tell units that they can process an upgrade.
+
+    Handlers of this event must meet the following:
+    - SHOULD check for related application deps from :class:`DataUpgrade.dependencies`
+    - MAY raise :class:`DependencyError` if dependency not met
+    - MUST update unit `state` after validating the success of the upgrade, calling one of:
+        - :class:`DataUpgrade.set_unit_failed` if the unit upgrade fails
+        - :class:`DataUpgrade.set_unit_completed` if the unit upgrade succeeds
+    - MUST call :class:`DataUpgrade.on_upgrade_changed` on exit so event not lost on leader
+    """
+
+
+class UpgradeEvents(CharmEvents):
+    """Upgrade events.
+
+    This class defines the events that the lib can emit.
+ """ + + upgrade_granted = EventSource(UpgradeGrantedEvent) + + +# --- EVENT HANDLER --- + + +class DataUpgrade(Object, ABC): + """Manages `upgrade` relation operators for in-place upgrades.""" + + STATES = ["failed", "idle", "ready", "upgrading", "completed"] + + on = UpgradeEvents() # pyright: ignore [reportGeneralTypeIssues] + + def __init__( + self, + charm: CharmBase, + dependency_model: BaseModel, + relation_name: str = "upgrade", + substrate: Literal["vm", "k8s"] = "vm", + ): + super().__init__(charm, relation_name) + self.charm = charm + self.dependency_model = dependency_model + self.relation_name = relation_name + self.substrate = substrate + self._upgrade_stack = None + + # events + self.framework.observe( + self.charm.on[relation_name].relation_created, self._on_upgrade_created + ) + self.framework.observe( + self.charm.on[relation_name].relation_changed, self.on_upgrade_changed + ) + self.framework.observe(self.charm.on.upgrade_charm, self._on_upgrade_charm) + self.framework.observe(getattr(self.on, "upgrade_granted"), self._on_upgrade_granted) + + # actions + self.framework.observe( + getattr(self.charm.on, "pre_upgrade_check_action"), self._on_pre_upgrade_check_action + ) + + @property + def peer_relation(self) -> Optional[Relation]: + """The upgrade peer relation.""" + return self.charm.model.get_relation(self.relation_name) + + @property + def app_units(self) -> Iterable[Unit]: + """The peer-related units in the application.""" + if not self.peer_relation: + return [] + + return set([self.charm.unit] + list(self.peer_relation.units)) + + @property + def state(self) -> Optional[str]: + """The unit state from the upgrade peer relation.""" + if not self.peer_relation: + return None + + return self.peer_relation.data[self.charm.unit].get("state", None) + + @property + def stored_dependencies(self) -> Optional[BaseModel]: + """The application dependencies from the upgrade peer relation.""" + if not self.peer_relation: + return None + + if not (deps := self.peer_relation.data[self.charm.app].get("dependencies", "")): + return None + + return type(self.dependency_model)(**json.loads(deps)) + + @property + def upgrade_stack(self) -> Optional[List[int]]: + """Gets the upgrade stack from the upgrade peer relation. + + Unit.ids are ordered Last-In-First-Out (LIFO). + i.e unit.id at index `-1` is the first unit to upgrade. + unit.id at index `0` is the last unit to upgrade. + + Returns: + List of integer unit.ids, ordered in upgrade order in a stack + """ + if not self.peer_relation: + return None + + # lazy-load + if self._upgrade_stack is None: + self._upgrade_stack = ( + json.loads(self.peer_relation.data[self.charm.app].get("upgrade-stack", "[]")) + or None + ) + + return self._upgrade_stack + + @upgrade_stack.setter + def upgrade_stack(self, stack: List[int]) -> None: + """Sets the upgrade stack to the upgrade peer relation. + + Unit.ids are ordered Last-In-First-Out (LIFO). + i.e unit.id at index `-1` is the first unit to upgrade. + unit.id at index `0` is the last unit to upgrade. + """ + if not self.peer_relation: + return + + self.peer_relation.data[self.charm.app].update({"upgrade-stack": json.dumps(stack)}) + self._upgrade_stack = stack + + @property + def cluster_state(self) -> Optional[str]: + """Current upgrade state for cluster units. + + Determined from :class:`DataUpgrade.STATE`, taking the lowest ordinal unit state. + + For example, if units in have states: `["ready", "upgrading", "completed"]`, + the overall state for the cluster is `ready`. 
+
+        Returns:
+            String of upgrade state from the furthest behind unit.
+        """
+        if not self.peer_relation:
+            return None
+
+        states = [self.peer_relation.data[unit].get("state", "") for unit in self.app_units]
+
+        try:
+            return sorted(states, key=self.STATES.index)[0]
+        except (ValueError, KeyError):
+            return None
+
+    @abstractmethod
+    def pre_upgrade_check(self) -> None:
+        """Runs necessary checks validating the cluster is in a healthy state to upgrade.
+
+        Called by all units during :meth:`_on_pre_upgrade_check_action`.
+
+        Raises:
+            :class:`ClusterNotReadyError`: if cluster is not ready to upgrade
+        """
+        pass
+
+    def build_upgrade_stack(self) -> List[int]:
+        """Builds ordered iterable of all application unit.ids to upgrade in.
+
+        Called by leader unit during :meth:`_on_pre_upgrade_check_action`.
+
+        Returns:
+            Iterable of integer unit.ids, LIFO ordered in upgrade order
+                i.e `[5, 2, 4, 1, 3]`, unit `3` upgrades first, `5` upgrades last
+        """
+        # don't raise if k8s substrate, uses default statefulset order
+        if self.substrate == "k8s":
+            return []
+
+        raise NotImplementedError
+
+    @abstractmethod
+    def log_rollback_instructions(self) -> None:
+        """Sets charm state and logs out rollback instructions.
+
+        Called by all units when `state=failed` found during :meth:`_on_upgrade_changed`.
+        """
+        pass
+
+    def set_unit_failed(self) -> None:
+        """Sets unit `state=failed` to the upgrade peer data."""
+        if not self.peer_relation:
+            return None
+
+        # needed to refresh the stack
+        # now leader pulls a fresh stack from newly updated relation data
+        if self.charm.unit.is_leader():
+            self._upgrade_stack = None
+
+        self.peer_relation.data[self.charm.unit].update({"state": "failed"})
+
+    def set_unit_completed(self) -> None:
+        """Sets unit `state=completed` to the upgrade peer data."""
+        if not self.peer_relation:
+            return None
+
+        # needed to refresh the stack
+        # now leader pulls a fresh stack from newly updated relation data
+        if self.charm.unit.is_leader():
+            self._upgrade_stack = None
+
+        self.peer_relation.data[self.charm.unit].update({"state": "completed"})
+
+    def _on_upgrade_created(self, event: RelationCreatedEvent) -> None:
+        """Handler for `upgrade-relation-created` events."""
+        if not self.peer_relation:
+            event.defer()
+            return
+
+        # setting initial idle state needed to avoid execution on upgrade-changed events
+        self.peer_relation.data[self.charm.unit].update({"state": "idle"})
+
+        if self.charm.unit.is_leader():
+            logger.debug("Persisting dependencies to upgrade relation data...")
+            self.peer_relation.data[self.charm.app].update(
+                {"dependencies": json.dumps(self.dependency_model.dict())}
+            )
+
+    def _on_pre_upgrade_check_action(self, event: ActionEvent) -> None:
+        """Handler for `pre-upgrade-check-action` events."""
+        if not self.peer_relation:
+            event.fail(message="Could not find upgrade relation.")
+            return
+
+        if not self.charm.unit.is_leader():
+            event.fail(message="Action must be run on the Juju leader.")
+            return
+
+        # checking if upgrade in progress
+        if self.cluster_state != "idle":
+            event.fail("Cannot run pre-upgrade checks, cluster already upgrading.")
+            return
+
+        try:
+            logger.info("Running pre-upgrade-check...")
+            self.pre_upgrade_check()
+
+            if self.substrate == "k8s":
+                logger.info("Building upgrade-stack for K8s...")
+                built_upgrade_stack = sorted(
+                    [int(unit.name.split("/")[1]) for unit in self.app_units]
+                )
+            else:
+                logger.info("Building upgrade-stack for VMs...")
+                built_upgrade_stack = self.build_upgrade_stack()
+
+            logger.debug(f"Built upgrade stack of {built_upgrade_stack}")
+
+        except ClusterNotReadyError as e:
+            logger.error(e)
+            event.fail(message=e.message)
+            return
+        except Exception as e:
+            logger.error(e)
+            event.fail(message="Unknown error found.")
+            return
+
+        logger.info("Setting upgrade-stack to relation data...")
+        self.upgrade_stack = built_upgrade_stack
+
+    def _upgrade_supported_check(self) -> None:
+        """Checks if previous versions can be upgraded to new versions.
+
+        Raises:
+            :class:`VersionError` if upgrading to existing `version` is not supported
+        """
+        keys = self.dependency_model.__fields__.keys()
+
+        compatible = True
+        incompatibilities: List[Tuple[str, str, str, str]] = []
+        for key in keys:
+            old_dep: DependencyModel = getattr(self.stored_dependencies, key)
+            new_dep: DependencyModel = getattr(self.dependency_model, key)
+
+            if not old_dep.can_upgrade(dependency=new_dep):
+                compatible = False
+                incompatibilities.append(
+                    (key, old_dep.version, new_dep.version, new_dep.upgrade_supported)
+                )
+
+        base_message = "Versions incompatible"
+        base_cause = "Upgrades only supported for specific versions"
+        if not compatible:
+            for incompat in incompatibilities:
+                base_message += (
+                    f", {incompat[0]} {incompat[1]} can not be upgraded to {incompat[2]}"
+                )
+                base_cause += f", {incompat[0]} versions satisfying requirement {incompat[3]}"
+
+            raise VersionError(
+                message=base_message,
+                cause=base_cause,
+            )
+
+    def _on_upgrade_charm(self, event: UpgradeCharmEvent) -> None:
+        """Handler for `upgrade-charm` events."""
+        # defer if not all units have pre-upgraded
+        if not self.peer_relation:
+            event.defer()
+            return
+
+        # if any other unit failed or if no stack (i.e pre-upgrade check), mark failed
+        if not self.upgrade_stack or self.cluster_state == "failed":
+            logger.error(
+                "Cluster upgrade failed. Setting failed upgrade state... {}".format(
+                    "Ensure pre-upgrade checks are run first" if not self.upgrade_stack else ""
+                )
+            )
+            self.set_unit_failed()
+            self.log_rollback_instructions()
+            return
+
+        # run version checks on leader only
+        if self.charm.unit.is_leader():
+            try:
+                self._upgrade_supported_check()
+            except VersionError as e:  # not ready if not passed check
+                logger.error(e)
+                self.set_unit_failed()
+                return
+
+        # all units set state to ready
+        self.peer_relation.data[self.charm.unit].update({"state": "ready"})
+
+    def on_upgrade_changed(self, event: EventBase) -> None:
+        """Handler for `upgrade-relation-changed` events."""
+        if not self.peer_relation:
+            return
+
+        # if any other unit failed, mark as failed
+        if self.cluster_state == "failed":
+            logger.error("Cluster upgrade failed.
Setting failed upgrade state...") + self.set_unit_failed() + self.log_rollback_instructions() + return + + # if all units completed, mark as complete + if not self.upgrade_stack: + if self.state == "completed" and self.cluster_state in ["idle", "completed"]: + logger.info("All units completed upgrade, setting idle upgrade state...") + self.peer_relation.data[self.charm.unit].update({"state": "idle"}) + return + if self.cluster_state == "idle": + logger.debug("upgrade-changed event handled before pre-checks, exiting...") + return + else: + logger.debug("Did not find upgrade-stack or completed cluster state, deferring...") + event.defer() + return + + # pop mutates the `upgrade_stack` attr + top_unit_id = self.upgrade_stack.pop() + top_unit = self.charm.model.get_unit(f"{self.charm.app.name}/{top_unit_id}") + top_state = self.peer_relation.data[top_unit].get("state") + + # if top of stack is completed, leader pops it + if self.charm.unit.is_leader() and top_state == "completed": + logger.debug(f"{top_unit} has finished upgrading, updating stack...") + + # writes the mutated attr back to rel data + self.peer_relation.data[self.charm.app].update( + {"upgrade-stack": json.dumps(self.upgrade_stack)} + ) + + # recurse on leader to ensure relation changed event not lost + # in case leader is next or the last unit to complete + self.on_upgrade_changed(event) + + # if unit top of stack, emit granted event + if self.charm.unit == top_unit and top_state in ["ready", "upgrading"]: + logger.debug( + f"{top_unit} is next to upgrade, emitting `upgrade_granted` event and upgrading..." + ) + self.peer_relation.data[self.charm.unit].update({"state": "upgrading"}) + getattr(self.on, "upgrade_granted").emit() + + @abstractmethod + def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None: + """Handler for `upgrade-granted` events.""" diff --git a/metadata.yaml b/metadata.yaml index f34ff197..791b2e5d 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -19,6 +19,8 @@ peers: interface: cluster restart: interface: rolling_op + upgrade: + interface: upgrade requires: zookeeper: diff --git a/src/charm.py b/src/charm.py index 2dd75568..376c9490 100755 --- a/src/charm.py +++ b/src/charm.py @@ -31,6 +31,7 @@ from literals import ( ADMIN_USER, CHARM_KEY, + DEPENDENCIES, INTERNAL_USERS, JMX_EXPORTER_PORT, LOGS_RULES_DIR, @@ -45,6 +46,7 @@ from snap import KafkaSnap from structured_config import CharmConfig from tls import KafkaTLS +from upgrade import KafkaDependencyModel, KafkaUpgrade from utils import ( broker_active, generate_password, @@ -70,6 +72,12 @@ def __init__(self, *args): self.provider = KafkaProvider(self) self.health = KafkaHealth(self) self.restart = RollingOpsManager(self, relation="restart", callback=self._restart) + self.upgrade = KafkaUpgrade( + self, + dependency_model=KafkaDependencyModel( + **DEPENDENCIES # pyright: ignore[reportGeneralTypeIssues] + ), + ) self.framework.observe(getattr(self.on, "start"), self._on_start) self.framework.observe(getattr(self.on, "install"), self._on_install) @@ -185,7 +193,7 @@ def healthy(self) -> bool: def _on_update_status(self, event: EventBase) -> None: """Handler for `update-status` events.""" - if not self.healthy: + if not self.healthy or not self.upgrade.idle: return # NOTE: integration with kafka-broker-rack-awareness charm. 
@@ -319,7 +327,7 @@ def _on_start(self, event: EventBase) -> None: def _on_config_changed(self, event: EventBase) -> None: """Generic handler for most `config_changed` events across relations.""" # only overwrite properties if service is already active - if not self.healthy: + if not self.healthy or not self.upgrade.idle: event.defer() return diff --git a/src/config.py b/src/config.py index 654dc895..395f26b5 100644 --- a/src/config.py +++ b/src/config.py @@ -512,6 +512,7 @@ def server_properties(self) -> List[str]: f"listeners={','.join(listeners_repr)}", f"advertised.listeners={','.join(advertised_listeners)}", f"inter.broker.listener.name={self.internal_listener.name}", + f"inter.broker.protocol.version={self.charm.upgrade.current_version}", ] + self.config_properties + self.scram_properties diff --git a/src/literals.py b/src/literals.py index 66d7dbf0..873cab70 100644 --- a/src/literals.py +++ b/src/literals.py @@ -88,3 +88,13 @@ class Status(Enum): ActiveStatus("machine system settings are not optimal - see logs for info"), "WARNING", ) + + +DEPENDENCIES = { + "service": { + "dependencies": {}, + "name": "kafka", + "upgrade_supported": ">3", + "version": "3.3", + }, +} diff --git a/src/snap.py b/src/snap.py index 876263c0..1cf9b937 100644 --- a/src/snap.py +++ b/src/snap.py @@ -49,8 +49,7 @@ def install(self) -> bool: cache = snap.SnapCache() kafka = cache[SNAP_NAME] - if not kafka.present: - kafka.ensure(snap.SnapState.Present, revision=CHARMED_KAFKA_SNAP_REVISION) + kafka.ensure(snap.SnapState.Present, revision=CHARMED_KAFKA_SNAP_REVISION) self.kafka = kafka self.kafka.connect(plug="removable-media") diff --git a/src/upgrade.py b/src/upgrade.py new file mode 100644 index 00000000..19d75712 --- /dev/null +++ b/src/upgrade.py @@ -0,0 +1,98 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Manager for handling Kafka in-place upgrades.""" + +import logging +from typing import TYPE_CHECKING + +from charms.data_platform_libs.v0.upgrade import ( + ClusterNotReadyError, + DataUpgrade, + DependencyModel, + UpgradeGrantedEvent, +) +from pydantic import BaseModel +from typing_extensions import override + +if TYPE_CHECKING: + from charm import KafkaCharm + +logger = logging.getLogger(__name__) + + +class KafkaDependencyModel(BaseModel): + """Model for Kafka Operator dependencies.""" + + service: DependencyModel + + +class KafkaUpgrade(DataUpgrade): + """Implementation of :class:`DataUpgrade` overrides for in-place upgrades.""" + + def __init__(self, charm: "KafkaCharm", **kwargs): + super().__init__(charm, **kwargs) + self.charm = charm + + @property + def idle(self) -> bool: + """Checks if cluster state is idle. + + Returns: + True if cluster state is idle. 
Otherwise False
+        """
+        return self.cluster_state == "idle"
+
+    @property
+    def current_version(self) -> str:
+        """Get current Kafka version."""
+        dependency_model: DependencyModel = getattr(self.dependency_model, "service")
+        return dependency_model.version
+
+    @override
+    def pre_upgrade_check(self) -> None:
+        default_message = "Pre-upgrade check failed and cannot safely upgrade"
+        if not self.charm.healthy:
+            raise ClusterNotReadyError(message=default_message, cause="Cluster is not healthy")
+
+    @override
+    def build_upgrade_stack(self) -> list[int]:
+        upgrade_stack = []
+        units = set([self.charm.unit] + list(self.charm.peer_relation.units))  # type: ignore[reportOptionalMemberAccess]
+        for unit in units:
+            upgrade_stack.append(int(unit.name.split("/")[-1]))
+
+        return upgrade_stack
+
+    @override
+    def log_rollback_instructions(self) -> None:
+        logger.warning("SOME USEFUL INSTRUCTIONS")  # TODO
+
+    @override
+    def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None:
+        self.charm.snap.stop_snap_service()
+
+        if not self.charm.snap.install():
+            logger.error("Unable to install Snap")
+            self.set_unit_failed()
+            return
+
+        logger.info(f"{self.charm.unit.name} upgrading service...")
+        self.charm.snap.restart_snap_service()
+
+        try:
+            logger.debug("Running post-upgrade check...")
+            self.pre_upgrade_check()
+
+            logger.debug("Marking unit completed...")
+            self.set_unit_completed()
+
+            # ensures leader gets its own relation-changed when it upgrades
+            if self.charm.unit.is_leader():
+                logger.debug("Re-emitting upgrade-changed on leader...")
+                self.on_upgrade_changed(event)
+            # If idle run peer config_changed
+
+        except ClusterNotReadyError as e:
+            logger.error(e.cause)
+            self.set_unit_failed()
From 8855276770292659b5acf6b1f72dcde931663253 Mon Sep 17 00:00:00 2001
From: Raul Zamora
Date: Fri, 4 Aug 2023 10:20:21 +0200
Subject: [PATCH 02/13] add zk check

---
 lib/charms/zookeeper/v0/client.py | 18 ++++++++++++++++++
 src/literals.py                   |  4 ++--
 src/upgrade.py                    | 18 ++++++++++++++++++
 src/utils.py                      | 18 ++++++++++++++++++
 4 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/lib/charms/zookeeper/v0/client.py b/lib/charms/zookeeper/v0/client.py
index fa20cd7b..e0f3e57f 100644
--- a/lib/charms/zookeeper/v0/client.py
+++ b/lib/charms/zookeeper/v0/client.py
@@ -399,6 +399,24 @@ def delete_znode_leader(self, path: str) -> None:
         ) as zk:
             zk.delete_znode(path=path)
 
+    def get_version(self) -> str:
+        """Get ZooKeeper version.
+ + Version will have a similar format to + 3.6.4--d65253dcf68e9097c6e95a126463fd5fdeb4521c, built on 12/18/2022 18:10 GMT + """ + with ZooKeeperClient( + host=self.leader, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + return zk.srvr["Zookeeper version"] + class ZooKeeperClient: """Handler for ZooKeeper connections and running 4lw client commands.""" diff --git a/src/literals.py b/src/literals.py index 873cab70..4a968fa6 100644 --- a/src/literals.py +++ b/src/literals.py @@ -13,7 +13,7 @@ CHARM_KEY = "kafka" SNAP_NAME = "charmed-kafka" -CHARMED_KAFKA_SNAP_REVISION = 16 +CHARMED_KAFKA_SNAP_REVISION = 17 PEER = "cluster" ZK = "zookeeper" @@ -92,7 +92,7 @@ class Status(Enum): DEPENDENCIES = { "service": { - "dependencies": {}, + "dependencies": {"zookeeper": ">3"}, "name": "kafka", "upgrade_supported": ">3", "version": "3.3", diff --git a/src/upgrade.py b/src/upgrade.py index 19d75712..a1f807fa 100644 --- a/src/upgrade.py +++ b/src/upgrade.py @@ -11,10 +11,13 @@ DataUpgrade, DependencyModel, UpgradeGrantedEvent, + verify_requirements, ) from pydantic import BaseModel from typing_extensions import override +from utils import get_zookeeper_version + if TYPE_CHECKING: from charm import KafkaCharm @@ -49,6 +52,12 @@ def current_version(self) -> str: dependency_model: DependencyModel = getattr(self.dependency_model, "service") return dependency_model.version + @property + def zookeeper_current_version(self) -> str: + """Get current Zookeeper version.""" + version = get_zookeeper_version(zookeeper_config=self.charm.kafka_config.zookeeper_config) + return version.split("-")[0] # Remove build information from version + @override def pre_upgrade_check(self) -> None: default_message = "Pre-upgrade check failed and cannot safely upgrade" @@ -70,6 +79,15 @@ def log_rollback_instructions(self) -> None: @override def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None: + dependency_model: DependencyModel = getattr(self.dependency_model, "service") + if not verify_requirements( + version=self.zookeeper_current_version, + requirement=dependency_model.dependencies["zookeeper"], + ): + logger.error("ZooKeeper requirement not met") + self.set_unit_failed() + return + self.charm.snap.stop_snap_service() if not self.charm.snap.install(): diff --git a/src/utils.py b/src/utils.py index d02a9778..b331be67 100644 --- a/src/utils.py +++ b/src/utils.py @@ -74,6 +74,24 @@ def get_active_brokers(zookeeper_config: Dict[str, str]) -> Set[str]: return brokers +def get_zookeeper_version(zookeeper_config: Dict[str, str]) -> str: + """Get running zookeeper version. + + Args: + zookeeper_config: the relation provided by ZooKeeper + + Returns: + zookeeper version + """ + hosts = zookeeper_config.get("endpoints", "").split(",") + username = zookeeper_config.get("username", "") + password = zookeeper_config.get("password", "") + + zk = ZooKeeperManager(hosts=hosts, username=username, password=password) + + return zk.get_version() + + def safe_get_file(filepath: str) -> Optional[List[str]]: """Load file contents from charm workload. 
From 8855276770292659b5acf6b1f72dcde931663253 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Fri, 4 Aug 2023 11:02:22 +0200 Subject: [PATCH 03/13] fix snap revision --- src/literals.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/literals.py b/src/literals.py index 4a968fa6..f54cd8c7 100644 --- a/src/literals.py +++ b/src/literals.py @@ -13,7 +13,7 @@ CHARM_KEY = "kafka" SNAP_NAME = "charmed-kafka" -CHARMED_KAFKA_SNAP_REVISION = 17 +CHARMED_KAFKA_SNAP_REVISION = 16 PEER = "cluster" ZK = "zookeeper" From 560675243d3abba9dd1e04cb9c07b8e30db956b4 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Mon, 7 Aug 2023 12:22:21 +0200 Subject: [PATCH 04/13] fix unit tests --- tests/unit/test_charm.py | 13 +++++++++++-- tests/unit/test_upgrade.py | 0 2 files changed, 11 insertions(+), 2 deletions(-) create mode 100644 tests/unit/test_upgrade.py diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index fda5e373..523e7e95 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -158,6 +158,7 @@ def test_update_status_blocks_if_broker_not_active(harness, zk_data, passwords_d with ( patch("snap.KafkaSnap.active", return_value=True), patch("charm.broker_active", return_value=False) as patched_broker_active, + patch("upgrade.KafkaUpgrade.idle", return_value=True), ): harness.charm.on.update_status.emit() assert patched_broker_active.call_count == 1 @@ -178,6 +179,7 @@ def test_update_status_blocks_if_no_service(harness, zk_data, passwords_data): ), patch("charm.KafkaCharm.healthy", return_value=True), patch("charm.broker_active", return_value=True), + patch("upgrade.KafkaUpgrade.idle", return_value=True), ): harness.charm.on.update_status.emit() assert isinstance(harness.charm.unit.status, BlockedStatus) @@ -194,6 +196,7 @@ def test_update_status_sets_active(harness, zk_data, passwords_data): patch("snap.KafkaSnap.active", return_value=True), patch("charm.broker_active", return_value=True), patch("health.KafkaHealth.machine_configured", return_value=True), + patch("upgrade.KafkaUpgrade.idle", return_value=True), ): harness.charm.on.update_status.emit() assert isinstance(harness.charm.unit.status, ActiveStatus) @@ -256,6 +259,7 @@ def test_storage_add_disableenables_and_starts(harness, zk_data, passwords_data) with ( patch("snap.KafkaSnap.active", return_value=True), patch("charm.KafkaCharm.healthy", new_callable=PropertyMock(return_value=True)), + patch("upgrade.KafkaUpgrade.idle", return_value=True), patch("config.KafkaConfig.set_server_properties"), patch("config.KafkaConfig.set_client_properties"), patch("charm.safe_get_file", return_value=["gandalf=grey"]), @@ -287,6 +291,7 @@ def test_storage_detaching_disableenables_and_starts(harness, zk_data, passwords with ( patch("snap.KafkaSnap.active", return_value=True), patch("charm.KafkaCharm.healthy", new_callable=PropertyMock(return_value=True)), + patch("upgrade.KafkaUpgrade.idle", return_value=True), patch("config.KafkaConfig.set_server_properties"), patch("config.KafkaConfig.set_client_properties"), patch("charm.safe_get_file", return_value=["gandalf=grey"]), @@ -515,6 +520,7 @@ def test_config_changed_updates_server_properties(harness): ), patch("charm.KafkaCharm.ready_to_start", new_callable=PropertyMock, return_value=True), patch("charm.KafkaCharm.healthy", new_callable=PropertyMock, return_value=True), + patch("upgrade.KafkaUpgrade.idle", return_value=True), patch("charm.safe_get_file", return_value=["gandalf=grey"]), patch("config.KafkaConfig.set_server_properties") as set_server_properties, 
patch("config.KafkaConfig.set_client_properties"), @@ -542,6 +548,7 @@ def test_config_changed_updates_client_properties(harness): ), patch("charm.KafkaCharm.ready_to_start", new_callable=PropertyMock, return_value=True), patch("charm.KafkaCharm.healthy", new_callable=PropertyMock, return_value=True), + patch("upgrade.KafkaUpgrade.idle", return_value=True), patch("charm.safe_get_file", return_value=["gandalf=grey"]), patch("config.KafkaConfig.set_server_properties"), patch("config.KafkaConfig.set_client_properties") as set_client_properties, @@ -564,9 +571,10 @@ def test_config_changed_updates_client_data(harness): return_value=["gandalf=white"], ), patch("charm.KafkaCharm.ready_to_start", new_callable=PropertyMock, return_value=True), + patch("charm.KafkaCharm.healthy", new_callable=PropertyMock, return_value=True), + patch("upgrade.KafkaUpgrade.idle", return_value=True), patch("charm.safe_get_file", return_value=["gandalf=white"]), patch("provider.KafkaProvider.update_connection_info") as patched_update_connection_info, - patch("charm.KafkaCharm.healthy", new_callable=PropertyMock, return_value=True), patch("config.KafkaConfig.set_client_properties") as patched_set_client_properties, ): harness.set_leader(True) @@ -591,13 +599,14 @@ def test_config_changed_restarts(harness): return_value=["gandalf=grey"], ), patch("charm.KafkaCharm.ready_to_start", new_callable=PropertyMock, return_value=True), + patch("charm.KafkaCharm.healthy", new_callable=PropertyMock, return_value=True), patch("charm.safe_get_file", return_value=["gandalf=white"]), + patch("upgrade.KafkaUpgrade.idle", return_value=True), patch("config.safe_write_to_file", return_value=None), patch("snap.KafkaSnap.restart_snap_service") as patched_restart_snap_service, patch("charm.broker_active", return_value=True), patch("config.KafkaConfig.zookeeper_connected", return_value=True), patch("auth.KafkaAuth.add_user"), - patch("charm.KafkaCharm.healthy", new_callable=PropertyMock, return_value=True), patch("config.KafkaConfig.set_zk_jaas_config"), patch("config.KafkaConfig.set_server_properties"), ): diff --git a/tests/unit/test_upgrade.py b/tests/unit/test_upgrade.py new file mode 100644 index 00000000..e69de29b From 428d8b55a7a981b7c25cf6ef8459571823d71649 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Wed, 9 Aug 2023 10:39:29 +0200 Subject: [PATCH 05/13] add unit tests --- tests/unit/test_upgrade.py | 172 +++++++++++++++++++++++++++++++++++++ 1 file changed, 172 insertions(+) diff --git a/tests/unit/test_upgrade.py b/tests/unit/test_upgrade.py index e69de29b..345625bc 100644 --- a/tests/unit/test_upgrade.py +++ b/tests/unit/test_upgrade.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +import logging +from pathlib import Path +from unittest.mock import MagicMock, PropertyMock, patch + +import pytest +import yaml +from charms.data_platform_libs.v0.upgrade import ClusterNotReadyError, DependencyModel +from kazoo.client import KazooClient +from ops.testing import Harness + +from charm import KafkaCharm +from literals import CHARM_KEY, DEPENDENCIES, PEER, ZK +from snap import KafkaSnap +from upgrade import KafkaDependencyModel, KafkaUpgrade + +logger = logging.getLogger(__name__) + + +CONFIG = str(yaml.safe_load(Path("./config.yaml").read_text())) +ACTIONS = str(yaml.safe_load(Path("./actions.yaml").read_text())) +METADATA = str(yaml.safe_load(Path("./metadata.yaml").read_text())) + + +@pytest.fixture +def harness(zk_data): + harness = Harness(KafkaCharm, meta=METADATA, config=CONFIG, actions=ACTIONS) + harness.add_relation("restart", CHARM_KEY) + harness.add_relation("upgrade", CHARM_KEY) + peer_rel_id = harness.add_relation(PEER, CHARM_KEY) + zk_rel_id = harness.add_relation(ZK, ZK) + harness._update_config( + { + "log_retention_ms": "-1", + "compression_type": "producer", + } + ) + harness.begin() + with harness.hooks_disabled(): + harness.add_relation_unit(peer_rel_id, f"{CHARM_KEY}/0") + harness.update_relation_data( + peer_rel_id, f"{CHARM_KEY}/0", {"private-address": "000.000.000"} + ) + harness.update_relation_data(zk_rel_id, ZK, zk_data) + + return harness + + +def test_pre_upgrade_check_raises_not_stable(harness): + with pytest.raises(ClusterNotReadyError): + harness.charm.upgrade.pre_upgrade_check() + + +def test_pre_upgrade_check_succeeds(harness): + with patch("charm.KafkaCharm.healthy", return_value=True): + harness.charm.upgrade.pre_upgrade_check() + + +def test_build_upgrade_stack(harness): + with harness.hooks_disabled(): + harness.add_relation_unit(harness.charm.peer_relation.id, f"{CHARM_KEY}/1") + harness.update_relation_data( + harness.charm.peer_relation.id, f"{CHARM_KEY}/1", {"private-address": "111.111.111"} + ) + harness.add_relation_unit(harness.charm.peer_relation.id, f"{CHARM_KEY}/2") + harness.update_relation_data( + harness.charm.peer_relation.id, f"{CHARM_KEY}/2", {"private-address": "222.222.222"} + ) + + stack = harness.charm.upgrade.build_upgrade_stack() + + assert len(stack) == 3 + assert len(stack) == len(set(stack)) + + +def test_kafka_dependency_model(): + assert sorted(KafkaDependencyModel.__fields__.keys()) == sorted(DEPENDENCIES.keys()) + + for value in DEPENDENCIES.values(): + assert DependencyModel(**value) + + +def test_upgrade_granted_sets_failed_if_zookeeper_dependency_check_fails(harness): + with ( + patch.object(KazooClient, "start"), + patch("utils.ZooKeeperManager.get_leader", return_value="000.000.000"), + # NOTE: Dependency requires >3 + patch("utils.ZooKeeperManager.get_version", return_value="1.2.3"), + ): + mock_event = MagicMock() + harness.charm.upgrade._on_upgrade_granted(mock_event) + + assert harness.charm.upgrade.state == "failed" + + +def test_upgrade_granted_sets_failed_if_failed_snap(harness): + with ( + patch( + "upgrade.KafkaUpgrade.zookeeper_current_version", + new_callable=PropertyMock, + return_value="3.6", + ), + patch.object(KafkaSnap, "stop_snap_service") as patched_stop, + patch.object(KafkaSnap, "install", return_value=False), + ): + mock_event = MagicMock() + harness.charm.upgrade._on_upgrade_granted(mock_event) + + patched_stop.assert_called_once() + assert harness.charm.upgrade.state == "failed" + + +def test_upgrade_granted_sets_failed_if_failed_upgrade_check(harness): + with ( + patch( + 
"upgrade.KafkaUpgrade.zookeeper_current_version", + new_callable=PropertyMock, + return_value="3.6", + ), + patch.object(KafkaSnap, "stop_snap_service"), + patch.object(KafkaSnap, "restart_snap_service") as patched_restart, + patch.object(KafkaSnap, "install", return_value=True), + patch("charm.KafkaCharm.healthy", new_callable=PropertyMock, return_value=False), + ): + mock_event = MagicMock() + harness.charm.upgrade._on_upgrade_granted(mock_event) + + patched_restart.assert_called_once() + assert harness.charm.upgrade.state == "failed" + + +def test_upgrade_granted_succeeds(harness): + with ( + patch( + "upgrade.KafkaUpgrade.zookeeper_current_version", + new_callable=PropertyMock, + return_value="3.6", + ), + patch.object(KafkaSnap, "stop_snap_service"), + patch.object(KafkaSnap, "restart_snap_service"), + patch.object(KafkaSnap, "install", return_value=True), + patch("charm.KafkaCharm.healthy", new_callable=PropertyMock, return_value=True), + ): + mock_event = MagicMock() + harness.charm.upgrade._on_upgrade_granted(mock_event) + + assert harness.charm.upgrade.state == "completed" + + +def test_upgrade_granted_recurses_upgrade_changed_on_leader(harness): + with harness.hooks_disabled(): + harness.set_leader(True) + + with ( + patch( + "upgrade.KafkaUpgrade.zookeeper_current_version", + new_callable=PropertyMock, + return_value="3.6", + ), + patch.object(KafkaSnap, "stop_snap_service"), + patch.object(KafkaSnap, "restart_snap_service"), + patch.object(KafkaSnap, "install", return_value=True), + patch("charm.KafkaCharm.healthy", new_callable=PropertyMock, return_value=True), + patch.object(KafkaUpgrade, "on_upgrade_changed") as patched_upgrade, + ): + mock_event = MagicMock() + harness.charm.upgrade._on_upgrade_granted(mock_event) + + patched_upgrade.assert_called_once() From 5d7d3a49c8a80425be8b22e0870cce5c2db895fe Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Wed, 9 Aug 2023 17:04:45 +0200 Subject: [PATCH 06/13] update client lib --- lib/charms/zookeeper/v0/client.py | 40 +++++++++++++++++++++++++------ src/upgrade.py | 3 +-- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/lib/charms/zookeeper/v0/client.py b/lib/charms/zookeeper/v0/client.py index e0f3e57f..7bd2a65c 100644 --- a/lib/charms/zookeeper/v0/client.py +++ b/lib/charms/zookeeper/v0/client.py @@ -74,7 +74,7 @@ def update_cluster(new_members: List[str], event: EventBase) -> None: # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 5 +LIBPATCH = 7 logger = logging.getLogger(__name__) @@ -235,7 +235,6 @@ def members_syncing(self) -> bool: try: zk_pending_syncs = result["zk_pending_syncs"] except KeyError: # missing key, no quorum, no syncing - logger.debug("no zk_pending_syncs key found, units not syncing") return False if ( @@ -246,6 +245,33 @@ def members_syncing(self) -> bool: return True + @property + def members_broadcasting(self) -> bool: + """Flag to check if any quorum members are currently broadcasting. + + Returns: + True if any members are currently broadcasting. Otherwise False. 
+ """ + for host in self.hosts: + try: + with ZooKeeperClient( + host=host, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + if not zk.is_ready: + return False + except KazooTimeoutError: # in the case of having a dead unit in relation data + logger.debug(f"TIMEOUT - {host}") + return False + + return True + def add_members(self, members: Iterable[str]) -> None: """Adds new members to the members' dynamic config. @@ -292,7 +318,7 @@ def add_members(self, members: Iterable[str]) -> None: joining=member, leaving=None, new_members=None, from_config=self.config_version ) - def remove_members(self, members: Iterable[str]): + def remove_members(self, members: Iterable[str]) -> None: """Removes members from the members' dynamic config. Raises: @@ -400,10 +426,10 @@ def delete_znode_leader(self, path: str) -> None: zk.delete_znode(path=path) def get_version(self) -> str: - """Get ZooKeeper version. + """Get ZooKeeper service version from srvr 4lw. - Version will have a similar format to - 3.6.4--d65253dcf68e9097c6e95a126463fd5fdeb4521c, built on 12/18/2022 18:10 GMT + Returns: + String of ZooKeeper service version """ with ZooKeeperClient( host=self.leader, @@ -415,7 +441,7 @@ def get_version(self) -> str: keyfile_password=self.keyfile_password, certfile_path=self.certfile_path, ) as zk: - return zk.srvr["Zookeeper version"] + return zk.srvr["Zookeeper version"].split("-", maxsplit=1)[0] class ZooKeeperClient: diff --git a/src/upgrade.py b/src/upgrade.py index a1f807fa..644bd4f2 100644 --- a/src/upgrade.py +++ b/src/upgrade.py @@ -55,8 +55,7 @@ def current_version(self) -> str: @property def zookeeper_current_version(self) -> str: """Get current Zookeeper version.""" - version = get_zookeeper_version(zookeeper_config=self.charm.kafka_config.zookeeper_config) - return version.split("-")[0] # Remove build information from version + return get_zookeeper_version(zookeeper_config=self.charm.kafka_config.zookeeper_config) @override def pre_upgrade_check(self) -> None: From fc01c03d113182b67aaa0db674c35940693c95be Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Fri, 11 Aug 2023 11:40:35 +0200 Subject: [PATCH 07/13] add pr feedback --- lib/charms/zookeeper/v0/client.py | 2 +- src/upgrade.py | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/lib/charms/zookeeper/v0/client.py b/lib/charms/zookeeper/v0/client.py index 7bd2a65c..e7f03cc5 100644 --- a/lib/charms/zookeeper/v0/client.py +++ b/lib/charms/zookeeper/v0/client.py @@ -74,7 +74,7 @@ def update_cluster(new_members: List[str], event: EventBase) -> None: # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 7 +LIBPATCH = 6 logger = logging.getLogger(__name__) diff --git a/src/upgrade.py b/src/upgrade.py index 644bd4f2..02283855 100644 --- a/src/upgrade.py +++ b/src/upgrade.py @@ -23,6 +23,11 @@ logger = logging.getLogger(__name__) +ROLLBACK_INSTRUCTIONS = """Unit failed to upgrade and requires manual rollback to previous stable version. + 1. Re-run `pre-upgrade-check` action on the leader unit to enter 'recovery' state + 2. 
Run `juju refresh` to the previously deployed charm revision +""" + class KafkaDependencyModel(BaseModel): """Model for Kafka Operator dependencies.""" @@ -57,6 +62,10 @@ def zookeeper_current_version(self) -> str: """Get current Zookeeper version.""" return get_zookeeper_version(zookeeper_config=self.charm.kafka_config.zookeeper_config) + def post_upgrade_check(self) -> None: + """Runs necessary checks validating the unit is in a healthy state after upgrade.""" + self.pre_upgrade_check() + @override def pre_upgrade_check(self) -> None: default_message = "Pre-upgrade check failed and cannot safely upgrade" @@ -74,7 +83,7 @@ def build_upgrade_stack(self) -> list[int]: @override def log_rollback_instructions(self) -> None: - logger.warning("SOME USEFUL INSTRUCTIONS") # TODO + logger.critical(ROLLBACK_INSTRUCTIONS) @override def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None: @@ -99,7 +108,7 @@ def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None: try: logger.debug("Running post-upgrade check...") - self.pre_upgrade_check() + self.post_upgrade_check() logger.debug("Marking unit completed...") self.set_unit_completed() From 0cc774ca44212030f9f072b3946bd62781664adc Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Mon, 14 Aug 2023 20:59:28 +0200 Subject: [PATCH 08/13] add pr feedback --- lib/charms/data_platform_libs/v0/upgrade.py | 586 ++++++++++++++++++-- src/literals.py | 6 +- src/upgrade.py | 21 +- 3 files changed, 539 insertions(+), 74 deletions(-) diff --git a/lib/charms/data_platform_libs/v0/upgrade.py b/lib/charms/data_platform_libs/v0/upgrade.py index f2cd1b56..4d528d05 100644 --- a/lib/charms/data_platform_libs/v0/upgrade.py +++ b/lib/charms/data_platform_libs/v0/upgrade.py @@ -12,12 +12,258 @@ # See the License for the specific language governing permissions and # limitations under the License. -r"""Handler for `upgrade` relation events for in-place upgrades on VMs.""" +r"""Library to manage in-place upgrades for charms running on VMs and K8s. + +This library contains handlers for `upgrade` relation events used to coordinate +between units in an application during a `juju refresh`, as well as `Pydantic` models +for instantiating, validating and comparing dependencies. + +An upgrade on VMs is initiated with the command `juju refresh`. Once executed, the following +events are emitted to each unit at random: + - `upgrade-charm` + - `config-changed` + - `leader-settings-changed` - Non-leader only + +Charm authors can implement the classes defined in this library to streamline the process of +coordinating which unit updates when, achieved through updating of unit-data `state` throughout. + +At a high-level, the upgrade steps are as follows: + - Run pre-checks on the cluster to confirm it is safe to upgrade + - Create stack of unit.ids, to serve as the upgrade order (generally workload leader is last) + - Start the upgrade by issuing a Juju CLI command + - The unit at the top of the stack gets permission to upgrade + - The unit handles the upgrade and restarts their service + - Repeat, until all units have restarted + +### Usage by charm authors + +#### `upgrade` relation + +Charm authors must implement an additional peer-relation. + +As this library uses relation data exchanged between units to coordinate, charm authors +need to add a new relation interface. The relation name does not matter. 
+
+`metadata.yaml`
+```yaml
+peers:
+  upgrade:
+    interface: upgrade
+```
+
+#### Dependencies JSON/Dict
+
+Charm authors must implement a dict object tracking current charm versions, requirements + upgradability.
+
+Many workload versions may be incompatible with older/newer versions. The same idea can also apply to
+charm or snap versions. Workloads with required related applications (e.g. Kafka + ZooKeeper) also need to
+ensure their versions are compatible during an upgrade, to avoid cluster failure.
+
+As such, it is necessary to freeze any dependencies within each published charm. An example of this could
+be creating a `DEPENDENCIES` dict within the charm code, with the following structure:
+
+`src/literals.py`
+```python
+DEPENDENCIES = {
+    "kafka_charm": {
+        "dependencies": {"zookeeper": ">50"},
+        "name": "kafka",
+        "upgrade_supported": ">90",
+        "version": "100",
+    },
+    "kafka_service": {
+        "dependencies": {"zookeeper": "^3"},
+        "name": "kafka",
+        "upgrade_supported": ">=0.8",
+        "version": "3.3.2",
+    },
+}
+```
+
+The first-level key names are arbitrary labels for tracking what those versions+dependencies are for.
+The `dependencies` second-level values are a key-value map of any required external applications,
+    and the versions this packaged charm can support.
+The `upgrade_supported` second-level values are requirements from which an in-place upgrade can be
+    supported by the charm.
+The `version` second-level values correspond to the current version of this packaged charm.
+
+Any requirements must comply with [`poetry`'s dependency specifications](https://python-poetry.org/docs/dependency-specification/#caret-requirements).
+
+### Dependency Model
+
+Charm authors must implement their own class inheriting from `DependencyModel`.
+
+Using a `Pydantic` model to instantiate the aforementioned `DEPENDENCIES` dict gives stronger type safety and additional
+layers of validation.
+
+Implementation just needs to ensure that the top-level key names from `DEPENDENCIES` are defined as attributes in the model.
+
+`src/upgrade.py`
+```python
+from pydantic import BaseModel
+
+class KafkaDependenciesModel(BaseModel):
+    kafka_charm: DependencyModel
+    kafka_service: DependencyModel
+```
+
+### Overrides for `DataUpgrade`
+
+Charm authors must define their own class, inheriting from `DataUpgrade`, overriding all required `abstractmethod`s.
+
+```python
+class ZooKeeperUpgrade(DataUpgrade):
+    def __init__(self, charm: "ZooKeeperCharm", **kwargs):
+        super().__init__(charm, **kwargs)
+        self.charm = charm
+```
+
+#### Implementation of `pre_upgrade_check()`
+
+Before upgrading a cluster, it's a good idea to check that it is stable and healthy.
+Here, charm authors can validate upgrade safety through API calls, relation-data checks, etc.
+If any of these checks fail, raise `ClusterNotReadyError`.
+
+### Overrides for `DataUpgrade`
+
+Charm authors must define their own class, inheriting from `DataUpgrade`, overriding all required `abstractmethod`s.
+
+```python
+class ZooKeeperUpgrade(DataUpgrade):
+    def __init__(self, charm: "ZooKeeperUpgrade", **kwargs):
+        super().__init__(charm, **kwargs)
+        self.charm = charm
+```
+
+#### Implementation of `pre_upgrade_check()`
+
+Before upgrading a cluster, it's a good idea to check that it is stable and healthy before permitting
+the upgrade to proceed.
+Here, charm authors can validate upgrade safety through API calls, relation-data checks, etc.
+If any of these checks fail, raise `ClusterNotReadyError`.
+
+```python
+    @override
+    def pre_upgrade_check(self) -> None:
+        default_message = "Pre-upgrade check failed and cannot safely upgrade"
+        try:
+            if not self.client.members_broadcasting or not len(self.client.server_members) == len(
+                self.charm.cluster.peer_units
+            ):
+                raise ClusterNotReadyError(
+                    message=default_message,
+                    cause="Not all application units are connected and broadcasting in the quorum",
+                )
+
+            if self.client.members_syncing:
+                raise ClusterNotReadyError(
+                    message=default_message, cause="Some quorum members are syncing data"
+                )
+
+            if not self.charm.cluster.stable:
+                raise ClusterNotReadyError(
+                    message=default_message, cause="Charm has not finished initialising"
+                )
+
+        except QuorumLeaderNotFoundError:
+            raise ClusterNotReadyError(message=default_message, cause="Quorum leader not found")
+        except ConnectionClosedError:
+            raise ClusterNotReadyError(
+                message=default_message, cause="Unable to connect to the cluster"
+            )
+```
+
+#### Implementation of `build_upgrade_stack()` - VM ONLY
+
+Oftentimes, it is necessary to ensure that the workload leader is the last unit to upgrade,
+to ensure high availability during the upgrade process.
+Here, charm authors can create a LIFO stack of unit.ids, represented as a list of unit.id integers,
+with the leader unit being at i[0].
+
+```python
+@override
+def build_upgrade_stack(self) -> list[int]:
+    upgrade_stack = []
+    for unit in self.charm.cluster.peer_units:
+        config = self.charm.cluster.unit_config(unit=unit)
+
+        # upgrade quorum leader last
+        if config["host"] == self.client.leader:
+            upgrade_stack.insert(0, int(config["unit_id"]))
+        else:
+            upgrade_stack.append(int(config["unit_id"]))
+
+    return upgrade_stack
+```
+
+#### Implementation of `_on_upgrade_granted()`
+
+On relation-changed events, each unit will check the current upgrade-stack persisted to relation data.
+If that unit is at the top of the stack, it will emit an `upgrade-granted` event, which must be handled.
+Here, workloads can be re-installed with new versions, checks can be made, data synced, etc.
+If the new unit successfully rejoins the cluster, call `set_unit_completed()`.
+If the new unit fails to rejoin the cluster, call `set_unit_failed()`.
+
+NOTE - It is essential here to manually call `on_upgrade_changed` if the unit is the current leader.
+This ensures that the leader gets its own relation-changed event, and updates the upgrade-stack for
+other units to follow suit. An optional dependency guard for this handler is sketched after the example.
+
+```python
+@override
+def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None:
+    self.charm.snap.stop_snap_service()
+
+    if not self.charm.snap.install():
+        logger.error("Unable to install ZooKeeper Snap")
+        self.set_unit_failed()
+        return None
+
+    logger.info(f"{self.charm.unit.name} upgrading service...")
+    self.charm.snap.restart_snap_service()
+
+    try:
+        logger.debug("Running post-upgrade check...")
+        self.pre_upgrade_check()
+
+        logger.debug("Marking unit completed...")
+        self.set_unit_completed()
+
+        # ensures leader gets its own relation-changed when it upgrades
+        if self.charm.unit.is_leader():
+            logger.debug("Re-emitting upgrade-changed on leader...")
+            self.on_upgrade_changed(event)
+
+    except ClusterNotReadyError as e:
+        logger.error(e.cause)
+        self.set_unit_failed()
+```
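+
+Handlers MAY additionally guard on related application versions before restarting the workload,
+raising `DependencyError` if they are not met. A minimal sketch of such a guard, assuming a
+`zookeeper_current_version` helper on the implementing class and the `kafka_service` label from
+the earlier `DEPENDENCIES` example:
+
+```python
+    if not verify_requirements(
+        version=self.zookeeper_current_version,
+        requirement=self.dependency_model.kafka_service.dependencies["zookeeper"],
+    ):
+        # fail fast before restarting the workload against an incompatible dependency
+        raise DependencyError(
+            message="ZooKeeper requirement not met",
+            cause="Related application version incompatible with the new workload version",
+        )
+```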
+
+#### Implementation of `log_rollback_instructions()`
+
+If the upgrade fails, manual intervention may be required for cluster recovery.
+Here, charm authors can log out any necessary steps to take to recover from a failed upgrade.
+When a unit fails, this library will automatically log out this message.
+
+```python
+@override
+def log_rollback_instructions(self) -> None:
+    logger.error("Upgrade failed. Please run `juju refresh` to previous version.")
+```
+
+### Instantiating in the charm and deferring events
+
+Charm authors must add a class attribute for the child class of `DataUpgrade` in the main charm.
+They must also ensure that any non-upgrade-related events that may be unsafe to handle during
+an upgrade are deferred if the unit is not in the `idle` state, i.e. not currently upgrading.
+
+```python
+class ZooKeeperCharm(CharmBase):
+    def __init__(self, *args):
+        super().__init__(*args)
+        self.upgrade = ZooKeeperUpgrade(
+            self,
+            relation_name="upgrade",
+            substrate="vm",
+            dependency_model=ZooKeeperDependencyModel(
+                **DEPENDENCIES
+            ),
+        )
+
+    def restart(self, event) -> None:
+        if not self.upgrade.state == "idle":
+            event.defer()
+            return None

+        self.restart_snap_service()
+```
+"""

 import json
 import logging
 from abc import ABC, abstractmethod
-from typing import Iterable, List, Literal, Optional, Tuple
+from typing import List, Literal, Optional, Set, Tuple

 from ops.charm import (
     ActionEvent,
@@ -27,7 +273,7 @@
     UpgradeCharmEvent,
 )
 from ops.framework import EventBase, EventSource, Object
-from ops.model import Relation, Unit
+from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus, Relation, Unit, WaitingStatus
 from pydantic import BaseModel, root_validator, validator

 # The unique Charmhub library identifier, never change it
@@ -38,7 +284,7 @@

 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 5
+LIBPATCH = 8

 PYDEPS = ["pydantic>=1.10,<2"]

@@ -353,6 +599,21 @@ def __init__(self, message: str, cause: str, resolution: Optional[str] = None):
         super().__init__(message, cause=cause, resolution=resolution)


+class KubernetesClientError(UpgradeError):
+    """Exception flagging that a call to the Kubernetes API failed.
+
+    For example, raised if the cluster fails :class:`DataUpgrade._set_rolling_update_partition`
+
+    Args:
+        message: string message to be logged out
+        cause: short human-readable description of the cause of the error
+        resolution: short human-readable instructions for manual error resolution (optional)
+    """
+
+    def __init__(self, message: str, cause: str, resolution: Optional[str] = None):
+        super().__init__(message, cause=cause, resolution=resolution)
+
+
 class VersionError(UpgradeError):
     """Exception flagging that the old `version` fails to meet the new `upgrade_supported`s.

@@ -388,16 +649,11 @@ def __init__(self, message: str, cause: str, resolution: Optional[str] = None):


 class UpgradeGrantedEvent(EventBase):
-    """Used to tell units that they can process an upgrade.
- - Handlers of this event must meet the following: - - SHOULD check for related application deps from :class:`DataUpgrade.dependencies` - - MAY raise :class:`DependencyError` if dependency not met - - MUST update unit `state` after validating the success of the upgrade, calling one of: - - :class:`DataUpgrade.set_unit_failed` if the unit upgrade fails - - :class:`DataUpgrade.set_unit_completed` if the unit upgrade succeeds - - MUST call :class:`DataUpgarde.on_upgrade_changed` on exit so event not lost on leader - """ + """Used to tell units that they can process an upgrade.""" + + +class UpgradeFinishedEvent(EventBase): + """Used to tell units that they finished the upgrade.""" class UpgradeEvents(CharmEvents): @@ -407,15 +663,16 @@ class UpgradeEvents(CharmEvents): """ upgrade_granted = EventSource(UpgradeGrantedEvent) + upgrade_finished = EventSource(UpgradeFinishedEvent) # --- EVENT HANDLER --- class DataUpgrade(Object, ABC): - """Manages `upgrade` relation operators for in-place upgrades.""" + """Manages `upgrade` relation operations for in-place upgrades.""" - STATES = ["failed", "idle", "ready", "upgrading", "completed"] + STATES = ["recovery", "failed", "idle", "ready", "upgrading", "completed"] on = UpgradeEvents() # pyright: ignore [reportGeneralTypeIssues] @@ -442,11 +699,16 @@ def __init__( ) self.framework.observe(self.charm.on.upgrade_charm, self._on_upgrade_charm) self.framework.observe(getattr(self.on, "upgrade_granted"), self._on_upgrade_granted) + self.framework.observe(getattr(self.on, "upgrade_finished"), self._on_upgrade_finished) # actions self.framework.observe( getattr(self.charm.on, "pre_upgrade_check_action"), self._on_pre_upgrade_check_action ) + if self.substrate == "k8s": + self.framework.observe( + getattr(self.charm.on, "resume_upgrade_action"), self._on_resume_upgrade_action + ) @property def peer_relation(self) -> Optional[Relation]: @@ -454,10 +716,10 @@ def peer_relation(self) -> Optional[Relation]: return self.charm.model.get_relation(self.relation_name) @property - def app_units(self) -> Iterable[Unit]: + def app_units(self) -> Set[Unit]: """The peer-related units in the application.""" if not self.peer_relation: - return [] + return set() return set([self.charm.unit] + list(self.peer_relation.units)) @@ -517,6 +779,18 @@ def upgrade_stack(self, stack: List[int]) -> None: self.peer_relation.data[self.charm.app].update({"upgrade-stack": json.dumps(stack)}) self._upgrade_stack = stack + @property + def unit_states(self) -> list: + """Current upgrade state for all units. + + Returns: + Unsorted list of upgrade states for all units. + """ + if not self.peer_relation: + return [] + + return [self.peer_relation.data[unit].get("state", "") for unit in self.app_units] + @property def cluster_state(self) -> Optional[str]: """Current upgrade state for cluster units. @@ -529,16 +803,23 @@ def cluster_state(self) -> Optional[str]: Returns: String of upgrade state from the furthest behind unit. """ - if not self.peer_relation: + if not self.unit_states: return None - states = [self.peer_relation.data[unit].get("state", "") for unit in self.app_units] - try: - return sorted(states, key=self.STATES.index)[0] + return sorted(self.unit_states, key=self.STATES.index)[0] except (ValueError, KeyError): return None + @property + def idle(self) -> Optional[bool]: + """Flag for whether the cluster is in an idle upgrade state. + + Returns: + True if all application units in idle state. 
Otherwise False
+        """
+        return self.cluster_state == "idle"
+
     @abstractmethod
     def pre_upgrade_check(self) -> None:
         """Runs necessary checks validating the cluster is in a healthy state to upgrade.
@@ -556,7 +837,7 @@ def build_upgrade_stack(self) -> List[int]:
         Called by leader unit during :meth:`_on_pre_upgrade_check_action`.

         Returns:
-            Iterable of integeter unit.ids, LIFO ordered in upgrade order
+            Iterable of integer unit.ids, LIFO ordered in upgrade order
             i.e `[5, 2, 4, 1, 3]`, unit `3` upgrades first, `5` upgrades last
         """
         # don't raise if k8s substrate, uses default statefulset order
@@ -573,8 +854,36 @@ def log_rollback_instructions(self) -> None:
         """
         pass

-    def set_unit_failed(self) -> None:
-        """Sets unit `state=failed` to the upgrade peer data."""
+    def _repair_upgrade_stack(self) -> None:
+        """Ensures completed units are re-added to the upgrade-stack after failure."""
+        # need to update the stack as it was not refreshed by the rollback run of pre-upgrade-check
+        # avoids difficult health-check implementations by charm authors needing to exclude dead units
+
+        # if the first unit in the stack fails, the stack will be the same length as units
+        # i.e. this block is not run
+        if (
+            self.cluster_state in ["failed", "recovery"]
+            and self.upgrade_stack
+            and len(self.upgrade_stack) != len(self.app_units)
+            and self.charm.unit.is_leader()
+        ):
+            new_stack = self.upgrade_stack
+            for unit in self.app_units:
+                unit_id = int(unit.name.split("/")[1])
+
+                # if a unit fails, it rolls back first
+                if unit_id not in new_stack:
+                    new_stack.insert(-1, unit_id)
+                    logger.debug(f"Inserted {unit_id} into upgrade-stack - {new_stack}")
+
+            self.upgrade_stack = new_stack
+
+    def set_unit_failed(self, cause: Optional[str] = None) -> None:
+        """Sets unit `state=failed` to the upgrade peer data.
+
+        Args:
+            cause: short description of cause of failure
+        """
         if not self.peer_relation:
             return None

@@ -583,7 +892,9 @@ def set_unit_failed(self) -> None:
         if self.charm.unit.is_leader():
             self._upgrade_stack = None

+        self.charm.unit.status = BlockedStatus(cause if cause else "")
         self.peer_relation.data[self.charm.unit].update({"state": "failed"})
+        self.log_rollback_instructions()

     def set_unit_completed(self) -> None:
         """Sets unit `state=completed` to the upgrade peer data."""
@@ -595,8 +906,16 @@ def set_unit_completed(self) -> None:
         if self.charm.unit.is_leader():
             self._upgrade_stack = None

+        self.charm.unit.status = MaintenanceStatus("upgrade completed")
         self.peer_relation.data[self.charm.unit].update({"state": "completed"})

+        # Emit upgrade_finished event to run the unit's post-upgrade operations.
+        if self.substrate == "k8s":
+            logger.debug(
+                f"{self.charm.unit.name} has completed the upgrade, emitting `upgrade_finished` event..."
+            )
+            getattr(self.on, "upgrade_finished").emit()
+
     def _on_upgrade_created(self, event: RelationCreatedEvent) -> None:
         """Handler for `upgrade-relation-created` events."""
         if not self.peer_relation:
@@ -622,6 +941,13 @@ def _on_pre_upgrade_check_action(self, event: ActionEvent) -> None:
             event.fail(message="Action must be ran on the Juju leader.")
             return

+        if self.cluster_state == "failed":
+            logger.info("Entering recovery state for rolling back to the previous version...")
+            self._repair_upgrade_stack()
+            self.charm.unit.status = BlockedStatus("ready to rollback application")
+            self.peer_relation.data[self.charm.unit].update({"state": "recovery"})
+            return
+
         # checking if upgrade in progress
         if self.cluster_state != "idle":
             event.fail("Cannot run pre-upgrade checks, cluster already upgrading.")
@@ -654,6 +980,37 @@ def _on_pre_upgrade_check_action(self, event: ActionEvent) -> None:
         logger.info("Setting upgrade-stack to relation data...")
         self.upgrade_stack = built_upgrade_stack

+    def _on_resume_upgrade_action(self, event: ActionEvent) -> None:
+        """Handle resume upgrade action.
+
+        Continue the upgrade by setting the partition to the next unit.
+        """
+        if not self.peer_relation:
+            event.fail(message="Could not find upgrade relation.")
+            return
+
+        if not self.charm.unit.is_leader():
+            event.fail(message="Action must be run on the Juju leader.")
+            return
+
+        if not self.upgrade_stack:
+            event.fail(message="Nothing to resume, upgrade stack unset.")
+            return
+
+        # Check whether this is being run after juju refresh was called
+        # (the size of the upgrade stack should match the total number of
+        # units minus one).
+        if len(self.upgrade_stack) != len(self.peer_relation.units):
+            event.fail(message="Upgrade can be resumed only once after juju refresh is called.")
+            return
+
+        try:
+            next_partition = self.upgrade_stack[-1]
+            self._set_rolling_update_partition(partition=next_partition)
+            event.set_results({"message": f"Upgrade will resume on unit {next_partition}"})
+        except KubernetesClientError:
+            event.fail(message="Cannot set rolling update partition.")
+
     def _upgrade_supported_check(self) -> None:
         """Checks if previous versions can be upgraded to new versions.

@@ -695,54 +1052,78 @@ def _on_upgrade_charm(self, event: UpgradeCharmEvent) -> None:
             event.defer()
             return

-        # if any other unit failed or if no stack (i.e pre-upgrade check), mark failed
-        if not self.upgrade_stack or self.cluster_state == "failed":
-            logger.error(
-                "Cluster upgrade failed. Setting failed upgrade state...
{}".format(
-                    "Ensure pre-upgrade checks are ran first" if not self.upgrade_stack else ""
-                )
-            )
-            self.set_unit_failed()
-            self.log_rollback_instructions()
+        if not self.upgrade_stack:
+            logger.error("Cluster upgrade failed, ensure pre-upgrade checks are run first.")
             return

-        # run version checks on leader only
-        if self.charm.unit.is_leader():
-            try:
-                self._upgrade_supported_check()
-            except VersionError as e:  # not ready if not passed check
-                logger.error(e)
-                self.set_unit_failed()
-                return
+        if self.substrate == "vm":
+            # for VM, run version checks on leader only
+            if self.charm.unit.is_leader():
+                try:
+                    self._upgrade_supported_check()
+                except VersionError as e:  # not ready if not passed check
+                    logger.error(e)
+                    self.set_unit_failed()
+                    return
+            self.charm.unit.status = WaitingStatus("other units upgrading first...")
+            self.peer_relation.data[self.charm.unit].update({"state": "ready"})

-        # all units sets state to ready
-        self.peer_relation.data[self.charm.unit].update({"state": "ready"})
+        else:
+            # for k8s, run version checks only on the highest ordinal unit
+            if (
+                self.charm.unit.name
+                == f"{self.charm.app.name}/{self.charm.app.planned_units() - 1}"
+            ):
+                try:
+                    self._upgrade_supported_check()
+                except VersionError as e:  # not ready if not passed check
+                    logger.error(e)
+                    self.set_unit_failed()
+                    return
+            # on K8s, a unit that receives the upgrade-charm event is upgrading
+            self.charm.unit.status = MaintenanceStatus("upgrading unit")
+            self.peer_relation.data[self.charm.unit].update({"state": "upgrading"})

     def on_upgrade_changed(self, event: EventBase) -> None:
         """Handler for `upgrade-relation-changed` events."""
         if not self.peer_relation:
             return

-        # if any other unit failed, mark as failed
+        # if any other unit failed, don't continue with upgrade
         if self.cluster_state == "failed":
-            logger.error("Cluster upgrade failed.
Setting failed upgrade state...")
-            self.set_unit_failed()
-            self.log_rollback_instructions()
+            logger.debug("Cluster failed to upgrade, exiting...")
+            return
+
+        if self.cluster_state == "recovery":
+            logger.debug("Cluster in recovery, deferring...")
+            event.defer()
             return

         # if all units completed, mark as complete
         if not self.upgrade_stack:
             if self.state == "completed" and self.cluster_state in ["idle", "completed"]:
                 logger.info("All units completed upgrade, setting idle upgrade state...")
+                self.charm.unit.status = ActiveStatus()
                 self.peer_relation.data[self.charm.unit].update({"state": "idle"})
+
+                if self.charm.unit.is_leader():
+                    logger.debug("Persisting new dependencies to upgrade relation data...")
+                    self.peer_relation.data[self.charm.app].update(
+                        {"dependencies": json.dumps(self.dependency_model.dict())}
+                    )
                 return
+
             if self.cluster_state == "idle":
                 logger.debug("upgrade-changed event handled before pre-checks, exiting...")
                 return
-            else:
-                logger.debug("Did not find upgrade-stack or completed cluster state, deferring...")
-                event.defer()
-                return
+
+            logger.debug("Did not find upgrade-stack or completed cluster state, deferring...")
+            event.defer()
+            return
+
+        # upgrade ongoing, set status for waiting units
+        if "upgrading" in self.unit_states and self.state in ["idle", "ready"]:
+            self.charm.unit.status = WaitingStatus("other units upgrading first...")

         # pop mutates the `upgrade_stack` attr
         top_unit_id = self.upgrade_stack.pop()
@@ -762,14 +1143,103 @@ def on_upgrade_changed(self, event: EventBase) -> None:
         # in case leader is next or the last unit to complete
         self.on_upgrade_changed(event)

-        # if unit top of stack, emit granted event
-        if self.charm.unit == top_unit and top_state in ["ready", "upgrading"]:
+        # if unit is top of the stack and all units are ready (i.e. a full stack), emit granted event
+        if (
+            self.charm.unit == top_unit
+            and top_state in ["ready", "upgrading"]
+            and self.cluster_state == "ready"
+        ):
             logger.debug(
-                f"{top_unit} is next to upgrade, emitting `upgrade_granted` event and upgrading..."
+                f"{top_unit.name} is next to upgrade, emitting `upgrade_granted` event and upgrading..."
             )
+            self.charm.unit.status = MaintenanceStatus("upgrading...")
             self.peer_relation.data[self.charm.unit].update({"state": "upgrading"})
-            getattr(self.on, "upgrade_granted").emit()

+            try:
+                getattr(self.on, "upgrade_granted").emit()
+            except DependencyError as e:
+                logger.error(e)
+                self.set_unit_failed()
+                return
+
-    @abstractmethod
     def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None:
-        """Handler for `upgrade-granted` events."""
+        """Handler for `upgrade-granted` events.
+
+        Handlers of this event must meet the following:
+          - SHOULD check for related application deps from :class:`DataUpgrade.dependencies`
+          - MAY raise :class:`DependencyError` if dependency not met
+          - MUST update unit `state` after validating the success of the upgrade, calling one of:
+              - :class:`DataUpgrade.set_unit_failed` if the unit upgrade fails
+              - :class:`DataUpgrade.set_unit_completed` if the unit upgrade succeeds
+          - MUST call :class:`DataUpgrade.on_upgrade_changed` on exit so the event is not lost on the leader
+        """
+        # don't raise if k8s substrate, only return
+        if self.substrate == "k8s":
+            return
+
+        raise NotImplementedError
+
+    def _on_upgrade_finished(self, _) -> None:
+        """Handler for `upgrade-finished` events."""
+        if self.substrate == "vm" or not self.peer_relation:
+            return

+        # Emit the upgrade relation changed event in the leader to update the upgrade_stack.
+        if self.charm.unit.is_leader():
+            self.charm.on[self.relation_name].relation_changed.emit(
+                self.model.get_relation(self.relation_name)
+            )
+
+        # This hook shouldn't run for the last unit (the first to be upgraded). For that unit,
+        # the upgrade should instead be resumed through an action once its success has been double-checked.
+        unit_number = int(self.charm.unit.name.split("/")[1])
+        if unit_number == len(self.peer_relation.units):
+            logger.info(
+                f"{self.charm.unit.name} unit upgraded. Evaluate and run `resume-upgrade` action to continue upgrade"
+            )
+            return
+
+        # Also, the hook shouldn't run for the first unit (the last to be upgraded).
+        if unit_number == 0:
+            logger.info(f"{self.charm.unit.name} unit upgraded. Upgrade is complete")
+            return
+
+        try:
+            # Use the unit number instead of the upgrade stack to avoid race conditions
+            # (i.e. the leader updates the upgrade stack after this hook runs).
+            next_partition = unit_number - 1
+            logger.debug(f"Set rolling update partition to unit {next_partition}")
+            self._set_rolling_update_partition(partition=next_partition)
+        except KubernetesClientError:
+            logger.exception("Cannot set rolling update partition")
+            self.set_unit_failed()
+            self.log_rollback_instructions()
+
+    def _set_rolling_update_partition(self, partition: int) -> None:
+        """Patch the StatefulSet's `spec.updateStrategy.rollingUpdate.partition`.
+
+        Args:
+            partition: partition to set.
+
+        K8s only. It should decrement the rolling update strategy partition, using code
+        like the following:
+
+            from lightkube.core.client import Client
+            from lightkube.core.exceptions import ApiError
+            from lightkube.resources.apps_v1 import StatefulSet
+
+            try:
+                patch = {"spec": {"updateStrategy": {"rollingUpdate": {"partition": partition}}}}
+                Client().patch(StatefulSet, name=self.charm.model.app.name, namespace=self.charm.model.name, obj=patch)
+                logger.debug(f"Kubernetes StatefulSet partition set to {partition}")
+            except ApiError as e:
+                if e.status.code == 403:
+                    cause = "`juju trust` needed"
+                else:
+                    cause = str(e)
+                raise KubernetesClientError("Kubernetes StatefulSet patch failed", cause)
+        """
+        if self.substrate == "vm":
+            return
+
+        raise NotImplementedError
diff --git a/src/literals.py b/src/literals.py
index 4bd33d80..e5c697cd 100644
--- a/src/literals.py
+++ b/src/literals.py
@@ -101,10 +101,10 @@ class Status(Enum):


 DEPENDENCIES = {
-    "service": {
-        "dependencies": {"zookeeper": ">3"},
+    "kafka_service": {
+        "dependencies": {"zookeeper": "~3.6"},
         "name": "kafka",
         "upgrade_supported": ">3",
-        "version": "3.3",
+        "version": "3.3.2",
     },
 }
diff --git a/src/upgrade.py b/src/upgrade.py
index 02283855..c91033a4 100644
--- a/src/upgrade.py
+++ b/src/upgrade.py
@@ -32,7 +32,7 @@ class KafkaDependencyModel(BaseModel):
     """Model for Kafka Operator dependencies."""

-    service: DependencyModel
+    kafka_service: DependencyModel


 class KafkaUpgrade(DataUpgrade):
@@ -42,19 +42,10 @@ def __init__(self, charm: "KafkaCharm", **kwargs):
         super().__init__(charm, **kwargs)
         self.charm = charm

-    @property
-    def idle(self) -> bool:
-        """Checks if cluster state is idle.
-
-        Returns:
-            True if cluster state is idle.
Otherwise False - """ - return self.cluster_state == "idle" - @property def current_version(self) -> str: """Get current Kafka version.""" - dependency_model: DependencyModel = getattr(self.dependency_model, "service") + dependency_model: DependencyModel = getattr(self.dependency_model, "kafka_service") return dependency_model.version @property @@ -87,12 +78,16 @@ def log_rollback_instructions(self) -> None: @override def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None: - dependency_model: DependencyModel = getattr(self.dependency_model, "service") + dependency_model: DependencyModel = getattr(self.dependency_model, "kafka_service") if not verify_requirements( version=self.zookeeper_current_version, requirement=dependency_model.dependencies["zookeeper"], ): - logger.error("ZooKeeper requirement not met") + logger.error( + "Current ZooKeeper version %s does not meet requirement %s", + self.zookeeper_current_version, + dependency_model.dependencies["zookeeper"], + ) self.set_unit_failed() return From 11687f757a8ecfa8a671a32a1bb9952e48d817b5 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Wed, 16 Aug 2023 11:27:53 +0200 Subject: [PATCH 09/13] update upgrade lib --- lib/charms/data_platform_libs/v0/upgrade.py | 27 ++++++++++----------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/lib/charms/data_platform_libs/v0/upgrade.py b/lib/charms/data_platform_libs/v0/upgrade.py index 4d528d05..37ae4cc6 100644 --- a/lib/charms/data_platform_libs/v0/upgrade.py +++ b/lib/charms/data_platform_libs/v0/upgrade.py @@ -284,7 +284,7 @@ def restart(self, event) -> None: # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 8 +LIBPATCH = 9 PYDEPS = ["pydantic>=1.10,<2"] @@ -323,27 +323,26 @@ def verify_caret_requirements(version: str, requirement: str) -> bool: sem_version = build_complete_sem_ver(version) sem_requirement = build_complete_sem_ver(requirement) - # caret uses first non-zero character, not enough to just count '. - max_version_index = requirement.count(".") - for i, semver in enumerate(sem_requirement): - if semver != 0: - max_version_index = i - break + # caret uses first non-zero character, not enough to just count '.' 
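+    # e.g. for "^1.2.3" the major version must match exactly, while each of minor/patch may be
+    # equal or higher; for "^0.2.3" both major and minor must match, and only patch may be higher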
+    if sem_requirement[0] == 0:
+        max_version_index = requirement.count(".")
+        for i, semver in enumerate(sem_requirement):
+            if semver != 0:
+                max_version_index = i
+                break
+    else:
+        max_version_index = 0

     for i in range(3):
         # version higher than first non-zero
-        if (i < max_version_index) and (sem_version[i] > sem_requirement[i]):
+        if (i <= max_version_index) and (sem_version[i] != sem_requirement[i]):
             return False

         # version either higher or lower than first non-zero
-        if (i == max_version_index) and (sem_version[i] != sem_requirement[i]):
+        if (i > max_version_index) and (sem_version[i] < sem_requirement[i]):
             return False

-        # valid
-        if (i > max_version_index) and (sem_version[i] > sem_requirement[i]):
-            return True
-
-    return False
+    return True


 def verify_tilde_requirements(version: str, requirement: str) -> bool:

From 4d8172b9940de5bd038aa7ccaa0d61a02c02ad98 Mon Sep 17 00:00:00 2001
From: Raul Zamora
Date: Fri, 18 Aug 2023 15:23:07 +0200
Subject: [PATCH 10/13] fix versioning config string

---
 lib/charms/data_platform_libs/v0/upgrade.py |  4 ++--
 src/config.py                               | 14 ++++++++++++-
 tests/unit/test_config.py                   | 22 +++++++++++++++++++++
 3 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/lib/charms/data_platform_libs/v0/upgrade.py b/lib/charms/data_platform_libs/v0/upgrade.py
index 37ae4cc6..0db6f63b 100644
--- a/lib/charms/data_platform_libs/v0/upgrade.py
+++ b/lib/charms/data_platform_libs/v0/upgrade.py
@@ -284,7 +284,7 @@ def restart(self, event) -> None:

 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 9
+LIBPATCH = 10

 PYDEPS = ["pydantic>=1.10,<2"]

@@ -817,7 +817,7 @@ def idle(self) -> Optional[bool]:
         Returns:
             True if all application units in idle state. Otherwise False
         """
-        return self.cluster_state == "idle"
+        return set(self.unit_states) == {"idle"}

diff --git a/src/config.py b/src/config.py
index 395f26b5..8375d918 100644
--- a/src/config.py
+++ b/src/config.py
@@ -453,6 +453,18 @@ def log_dirs(self) -> str:
             [os.fspath(storage.location) for storage in self.charm.model.storages["data"]]
         )

+    @property
+    def inter_broker_protocol_version(self) -> str:
+        """Creates the protocol version from the kafka version.
+
+        Returns:
+            string with the `major.minor` version
+        """
+        # Remove patch number from full version.
+        major_minor = self.charm.upgrade.current_version.split(".", maxsplit=2)
+        major_minor.pop()
+        return ".".join(major_minor)
+
     @property
     def rack_properties(self) -> List[str]:
         """Builds all properties related to rack awareness configuration.
@@ -512,7 +524,7 @@ def server_properties(self) -> List[str]:
             f"listeners={','.join(listeners_repr)}",
             f"advertised.listeners={','.join(advertised_listeners)}",
             f"inter.broker.listener.name={self.internal_listener.name}",
-            f"inter.broker.protocol.version={self.charm.upgrade.current_version}",
+            f"inter.broker.protocol.version={self.inter_broker_protocol_version}",
         ]
         + self.config_properties
         + self.scram_properties
diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py
index 3a95d01e..799ddc0a 100644
--- a/tests/unit/test_config.py
+++ b/tests/unit/test_config.py
@@ -13,6 +13,7 @@
 from literals import (
     ADMIN_USER,
     CHARM_KEY,
+    DEPENDENCIES,
     INTER_BROKER_USER,
     INTERNAL_USERS,
     JMX_EXPORTER_PORT,
@@ -402,6 +403,27 @@ def test_rack_properties(harness: Harness):
     assert "broker.rack=gondor-west" in harness.charm.kafka_config.server_properties


+def test_inter_broker_protocol_version(harness):
+    """Checks that `inter.broker.protocol.version` is derived from the charm dependency version."""
+    harness.add_relation(PEER, CHARM_KEY)
+    zk_relation_id = harness.add_relation(ZK, CHARM_KEY)
+    harness.update_relation_data(
+        zk_relation_id,
+        harness.charm.app.name,
+        {
+            "chroot": "/kafka",
+            "username": "moria",
+            "password": "mellon",
+            "endpoints": "1.1.1.1,2.2.2.2",
+            "uris": "1.1.1.1:2181/kafka,2.2.2.2:2181/kafka",
+            "tls": "disabled",
+        },
+    )
+    assert len(DEPENDENCIES["kafka_service"]["version"].split(".")) == 3
+
+    assert f"inter.broker.protocol.version=3.3" in harness.charm.kafka_config.server_properties
+
+
 def test_super_users(harness):
     """Checks super-users property is updated for new admin clients."""
     peer_relation_id = harness.add_relation(PEER, CHARM_KEY)

From 14153e800862aa8aed50b7678e591196125fd32d Mon Sep 17 00:00:00 2001
From: Raul Zamora
Date: Fri, 18 Aug 2023 15:27:20 +0200
Subject: [PATCH 11/13] fix lint

---
 tests/unit/test_config.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py
index 799ddc0a..6848ebba 100644
--- a/tests/unit/test_config.py
+++ b/tests/unit/test_config.py
@@ -421,7 +421,7 @@ def test_inter_broker_protocol_version(harness):
     )
     assert len(DEPENDENCIES["kafka_service"]["version"].split(".")) == 3

-    assert f"inter.broker.protocol.version=3.3" in harness.charm.kafka_config.server_properties
+    assert "inter.broker.protocol.version=3.3" in harness.charm.kafka_config.server_properties


 def test_super_users(harness):
     """Checks super-users property is updated for new admin clients."""
     peer_relation_id = harness.add_relation(PEER, CHARM_KEY)

From d8a3b77f7151da112f7031b99638857c8c1f7852 Mon Sep 17 00:00:00 2001
From: Raul Zamora
Date: Fri, 18 Aug 2023 17:02:13 +0200
Subject: [PATCH 12/13] fix docstring

---
 src/config.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/config.py b/src/config.py
index 8375d918..a3904ff7 100644
--- a/src/config.py
+++ b/src/config.py
@@ -458,7 +458,7 @@ def inter_broker_protocol_version(self) -> str:
         """Creates the protocol version from the kafka version.

         Returns:
-            string with the `major.minor` version
+            String with the `major.minor` version
         """
         # Remove patch number from full version.
         major_minor = self.charm.upgrade.current_version.split(".", maxsplit=2)

From 82efb8fb651a7b21c0dd5909bd663d818049e1cf Mon Sep 17 00:00:00 2001
From: Raul Zamora
Date: Mon, 21 Aug 2023 09:25:45 +0200
Subject: [PATCH 13/13] improve version handling

---
 src/config.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/config.py b/src/config.py
index a3904ff7..cd9dd21b 100644
--- a/src/config.py
+++ b/src/config.py
@@ -462,8 +462,7 @@ def inter_broker_protocol_version(self) -> str:
         """
         # Remove patch number from full version.
         major_minor = self.charm.upgrade.current_version.split(".", maxsplit=2)
-        major_minor.pop()
-        return ".".join(major_minor)
+        return ".".join(major_minor[:2])
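
For reference, the behaviour of the final truncation logic in plain Python (an illustrative
sketch only, not taken from any patch above):

```python
def major_minor(version: str) -> str:
    """Drop the patch number, keeping `major.minor`."""
    return ".".join(version.split(".", maxsplit=2)[:2])

assert major_minor("3.3.2") == "3.3"
assert major_minor("3.3") == "3.3"  # the slice copes with a missing patch number...
# ...whereas the previous `.pop()` would have truncated "3.3" down to just "3"
```

This robustness to two-component version strings is why the final patch prefers the slice over
`.pop()`.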