Skip to content

Commit

Permalink
[DPE-5218] Implement restore flow (#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
Batalex authored Oct 7, 2024
1 parent cc8d076 commit 75eb877
Show file tree
Hide file tree
Showing 18 changed files with 456 additions and 30 deletions.
7 changes: 7 additions & 0 deletions actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,10 @@ create-backup:

list-backups:
description: List database backups. S3 credentials are retrieved from a relation with the S3 integrator charm.

restore:
description: Restore a database backup. S3 credentials are retrieved from a relation with the S3 integrator charm.
params:
backup-id:
type: string
description: A backup-id to identify the backup to restore. Format of <%Y-%m-%dT%H:%M:%SZ>
16 changes: 15 additions & 1 deletion src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def _on_install(self, event: InstallEvent) -> None:

self.unit.set_workload_version(self.workload.get_version())

def _on_cluster_relation_changed(self, event: EventBase) -> None:
def _on_cluster_relation_changed(self, event: EventBase) -> None: # noqa: C901
"""Generic handler for all 'something changed, update' events across all relations."""
# NOTE: k8s specific check, the container needs to be available before moving on
if not self.workload.container_can_connect:
Expand All @@ -201,6 +201,11 @@ def _on_cluster_relation_changed(self, event: EventBase) -> None:
self._set_status(Status.NO_PEER_RELATION)
return

if self.state.cluster.is_restore_in_progress:
# Ongoing backup restore, we can early return here since the
# chain of events is only relevant to the backup event handler
return

# don't want to prematurely set config using outdated/missing relation data
# also skip update-status overriding statues during upgrades
if not self.upgrade_events.idle:
Expand Down Expand Up @@ -354,6 +359,7 @@ def init_server(self):
logger.debug("setting properties and jaas")
self.config_manager.set_zookeeper_properties()
self.config_manager.set_jaas_config()
self.config_manager.set_client_jaas_config()

# during pod-reschedules (e.g upgrades or otherwise) we lose all files
# need to manually add-back key/truststores
Expand Down Expand Up @@ -438,6 +444,14 @@ def update_quorum(self, event: EventBase) -> None:

self.update_client_data()

def disconnect_clients(self) -> None:
"""Remove a necessary part of the client databag, acting as a logical disconnect."""
if not self.unit.is_leader():
return

for client in self.state.clients:
client.update({"endpoints": ""})

def update_client_data(self) -> None:
"""Writes necessary relation data to all related applications."""
if not self.unit.is_leader():
Expand Down
9 changes: 9 additions & 0 deletions src/core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ def stable(self) -> Status:
if self.stale_quorum:
return Status.STALE_QUORUM

if self.cluster.is_restore_in_progress:
return Status.ONGOING_RESTORE

if not self.all_servers_added:
return Status.NOT_ALL_ADDED

Expand All @@ -372,3 +375,9 @@ def ready(self) -> Status:
return Status.ALL_UNIFIED

return self.stable

@property
def is_next_restore_step_possible(self) -> bool:
"""Are all units done with the current restore instruction?"""
current_instruction = self.cluster.restore_instruction
return all((unit.restore_progress is current_instruction for unit in self.servers))
22 changes: 21 additions & 1 deletion src/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from ops.model import Application, Relation, Unit
from typing_extensions import deprecated, override

from core.stubs import S3ConnectionInfo
from core.stubs import RestoreStep, S3ConnectionInfo
from literals import CHARM_USERS, CLIENT_PORT, ELECTION_PORT, SECRETS_APP, SERVER_PORT

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -289,6 +289,21 @@ def s3_credentials(self) -> S3ConnectionInfo:
# This is checked in events.backup actions
return json.loads(self.relation_data.get("s3-credentials", "{}"))

@property
def id_to_restore(self) -> str:
"""Backup id to restore."""
return self.relation_data.get("id-to-restore", "")

@property
def restore_instruction(self) -> RestoreStep:
"""Current restore flow step to go through."""
return RestoreStep(self.relation_data.get("restore-instruction", ""))

@property
def is_restore_in_progress(self) -> bool:
"""Is the cluster undergoing a restore?"""
return bool(self.id_to_restore)


class ZKServer(RelationState):
"""State collection metadata for a charm unit."""
Expand Down Expand Up @@ -440,3 +455,8 @@ def sans(self) -> dict[str, list[str]]:
"sans_ip": [self.ip],
"sans_dns": [self.hostname, self.fqdn],
}

@property
def restore_progress(self) -> RestoreStep:
"""Latest restore flow step the unit went through."""
return RestoreStep(self.relation_data.get("restore-progress", ""))
25 changes: 25 additions & 0 deletions src/core/stubs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# See LICENSE file for licensing details.

"""Types module."""
from enum import Enum
from typing import TypedDict

S3ConnectionInfo = TypedDict(
Expand All @@ -19,3 +20,27 @@


BackupMetadata = TypedDict("BackupMetadata", {"id": str, "log-sequence-number": int, "path": str})


class RestoreStep(str, Enum):
"""Represent restore flow step."""

NOT_STARTED = ""
STOP_WORKFLOW = "stop"
RESTORE = "restore"
RESTART = "restart"
CLEAN = "clean"

def next_step(self) -> "RestoreStep":
"""Get the next logical restore flow step."""
match self:
case RestoreStep.NOT_STARTED:
return RestoreStep.STOP_WORKFLOW
case RestoreStep.STOP_WORKFLOW:
return RestoreStep.RESTORE
case RestoreStep.RESTORE:
return RestoreStep.RESTART
case RestoreStep.RESTART:
return RestoreStep.CLEAN
case RestoreStep.CLEAN:
return RestoreStep.NOT_STARTED
8 changes: 8 additions & 0 deletions src/core/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ def jaas(self) -> str:
"""
return f"{self.conf_path}/zookeeper-jaas.cfg"

@property
def client_jaas(self) -> str:
"""The client-jaas.cfg filepath.
Contains internal user credentials used in SASL auth.
"""
return f"{self.conf_path}/client-jaas.cfg"

@property
def jmx_prometheus_javaagent(self) -> str:
"""The JMX exporter JAR filepath.
Expand Down
141 changes: 135 additions & 6 deletions src/events/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
CredentialsGoneEvent,
S3Requirer,
)
from ops import ActionEvent
from ops import (
ActionEvent,
RelationEvent,
)
from ops.framework import Object
from tenacity import retry, retry_if_result, stop_after_attempt, wait_fixed

from core.stubs import S3ConnectionInfo
from core.stubs import RestoreStep, S3ConnectionInfo
from literals import S3_BACKUPS_PATH, S3_REL_NAME, Status
from managers.backup import BackupManager

Expand All @@ -41,7 +45,11 @@ def __init__(self, charm):

self.framework.observe(self.charm.on.create_backup_action, self._on_create_backup_action)
self.framework.observe(self.charm.on.list_backups_action, self._on_list_backups_action)
# self.framework.observe(self.charm.on.restore_action, self._on_restore_action)
self.framework.observe(self.charm.on.restore_action, self._on_restore_action)

self.framework.observe(
getattr(self.charm.on, "cluster_relation_changed"), self._restore_event_dispatch
)

def _on_s3_credentials_changed(self, event: CredentialsChangedEvent):
if not self.charm.unit.is_leader():
Expand Down Expand Up @@ -137,6 +145,127 @@ def _on_list_backups_action(self, event: ActionEvent):
event.log(output)
event.set_results({"backups": json.dumps(backups_metadata)})

def _on_restore_action(self, _):
# TODO
pass
def _on_restore_action(self, event: ActionEvent):
"""Restore a snapshot referenced by its id.
Steps:
- stop client traffic
- stop all units
- backup local state so that we can rollback if anything goes wrong (manual op)
- wipe data folders
- get snapshot from object storage, save in data folder
- restart units
- cleanup leftover files
- notify clients
"""
id_to_restore = event.params.get("backup-id", "")
failure_conditions = [
(
lambda: not self.charm.unit.is_leader(),
"Action must be ran on the application leader",
),
(
lambda: not self.charm.state.cluster.s3_credentials,
"Cluster needs an access to an object storage to make a backup",
),
(
lambda: not id_to_restore,
"No backup id to restore provided",
),
(
lambda: not self.backup_manager.is_snapshot_in_bucket(id_to_restore),
"Backup id not found in storage object",
),
(
lambda: bool(self.charm.state.cluster.is_restore_in_progress),
"A snapshot restore is currently ongoing",
),
]

for check, msg in failure_conditions:
if check():
logging.error(msg)
event.set_results({"error": msg})
event.fail(msg)
return

self.charm.state.cluster.update(
{
"id-to-restore": id_to_restore,
"restore-instruction": RestoreStep.NOT_STARTED.value,
}
)
self.charm.disconnect_clients()

event.log(f"Beginning restore flow for snapshot {id_to_restore}")

def _restore_event_dispatch(self, event: RelationEvent):
"""Dispatch restore event to the proper method."""
if not self.charm.state.cluster.is_restore_in_progress:
if self.charm.state.unit_server.restore_progress is not RestoreStep.NOT_STARTED:
self.charm.state.unit_server.update(
{"restore-progress": RestoreStep.NOT_STARTED.value}
)
self.charm._set_status(self.charm.state.ready)
return

if self.charm.unit.is_leader():
self._maybe_progress_step()

match self.charm.state.cluster.restore_instruction, self.charm.state.unit_server.restore_progress:
case RestoreStep.STOP_WORKFLOW, RestoreStep.NOT_STARTED:
self._stop_workflow()
case RestoreStep.RESTORE, RestoreStep.STOP_WORKFLOW:
self._download_and_restore()
case RestoreStep.RESTART, RestoreStep.RESTORE:
self._restart_workflow()
case RestoreStep.CLEAN, RestoreStep.RESTART:
self._cleaning()
case _:
pass

def _maybe_progress_step(self):
"""Check that all units are done with the current instruction and move to the next if applicable."""
current_instruction = self.charm.state.cluster.restore_instruction
next_instruction = current_instruction.next_step()

if self.charm.state.is_next_restore_step_possible:
payload = {"restore-instruction": next_instruction.value}
if current_instruction is RestoreStep.CLEAN:
payload = payload | {"id-to-restore": "", "to_restore": ""}
# Update ACLs for already related clients and trigger a relation-changed
# on their side to enable them to reconnect.
self.charm.update_client_data()
self.charm.quorum_manager.update_acls()

self.charm.state.cluster.update(payload)

def _stop_workflow(self) -> None:
self.charm._set_status(Status.ONGOING_RESTORE)
logger.info("Restoring - stopping workflow")
self.charm.workload.stop()
self.charm.state.unit_server.update({"restore-progress": RestoreStep.STOP_WORKFLOW.value})

def _download_and_restore(self) -> None:
logger.info("Restoring - restore snapshot")
self.backup_manager.restore_snapshot(
self.charm.state.cluster.id_to_restore, self.charm.workload
)
self.charm.state.unit_server.update({"restore-progress": RestoreStep.RESTORE.value})

def _restart_workflow(self) -> None:
logger.info("Restoring - restarting workflow")
self.charm.workload.restart()
self.charm.state.unit_server.update({"restore-progress": RestoreStep.RESTART.value})

@retry(
wait=wait_fixed(5),
stop=stop_after_attempt(3),
retry=retry_if_result(lambda res: res is False),
)
def _cleaning(self) -> bool | None:
if not self.charm.workload.healthy:
return False
logger.info("Restoring - cleaning files")
self.backup_manager.cleanup_leftover_files(self.charm.workload)
self.charm.state.unit_server.update({"restore-progress": RestoreStep.CLEAN.value})
2 changes: 1 addition & 1 deletion src/events/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def _on_client_relation_updated(self, event: RelationEvent) -> None:
Future `client_relation_changed` events called on non-leader units checks passwords before
restarting.
"""
if not self.charm.unit.is_leader():
if not self.charm.unit.is_leader() or self.charm.state.cluster.is_restore_in_progress:
return

if not self.charm.state.stable:
Expand Down
1 change: 1 addition & 0 deletions src/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class Status(Enum):
BlockedStatus("invalid s3 configuration - missing mandatory parameters"), "ERROR"
)
BUCKET_NOT_CREATED = StatusLevel(BlockedStatus("cannot create s3 bucket"), "ERROR")
ONGOING_RESTORE = StatusLevel(MaintenanceStatus("restoring backup"), "INFO")


SECRETS_APP = ["sync-password", "super-password"]
Expand Down
Loading

0 comments on commit 75eb877

Please sign in to comment.