Commit: Add typing #2
András Váczi committed Jan 8, 2025
1 parent f0ac10e commit 333eb3b
Showing 2 changed files with 38 additions and 30 deletions.
3 changes: 2 additions & 1 deletion patroni/ha.py
@@ -361,7 +361,7 @@ def acquire_lock(self) -> bool:
multisite_ret = self.patroni.multisite.resolve_leader()
if multisite_ret:
logger.error("Releasing leader lock because multi site status is: %s", multisite_ret)
self.dcs.delete_leader()
self.dcs.delete_leader(None, None)
return False
return ret

@@ -1599,6 +1599,7 @@ def before_shutdown() -> None:
with self._async_executor:
self.release_leader_key_voluntarily(checkpoint_location)
time.sleep(2) # Give a time to somebody to take the leader lock
# FIXME: multisite.on_shutdown() was already called above with _state_handler.stop(), do we really need it here?
if mode == 'multisite':
self.patroni.multisite.on_shutdown(self.state_handler.latest_checkpoint_location())

Check failure on line 1604 in patroni/ha.py (GitHub Actions / pyright):
Argument missing for parameter "prev_location" (reportCallIssue)
if mode_control['offline']:
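The pyright failure above is triggered by the call on line 1604, which still passes only checkpoint_location even though on_shutdown() now also requires prev_location. The short, self-contained sketch below only illustrates the constraint the new signature places on callers; the class and values are made up for the example and are not Patroni's actual implementation.

# Illustration of the reportCallIssue above: once on_shutdown() declares a
# required prev_location parameter, every caller must supply both values.
# Names mirror the diff, but this is a stand-in class, not Patroni code.
class SiteController:
    def on_shutdown(self, checkpoint_location: int, prev_location: int) -> None:
        print(f"shutdown at checkpoint {checkpoint_location}, previous {prev_location}")


controller = SiteController()
# controller.on_shutdown(1234)        # pyright: Argument missing for parameter "prev_location"
controller.on_shutdown(1234, 1200)    # both locations supplied; type-checks cleanly

The commit leaves that ha.py call site unchanged, which is why CI still flags it.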
65 changes: 36 additions & 29 deletions patroni/multisite.py
@@ -3,9 +3,10 @@
import logging
import time

from datetime import datetime
from collections.abc import Callable
from datetime import datetime, UTC
from threading import Event, Thread
from typing import List, Optional
from typing import Any, Dict, List, Optional, TYPE_CHECKING

import six

@@ -15,6 +16,10 @@
from .dcs.kubernetes import catch_kubernetes_errors
from .exceptions import DCSError

if TYPE_CHECKING: # pragma: no cover
from .config import Config
from .dcs import Cluster

logger = logging.getLogger(__name__)


@@ -24,19 +29,20 @@ class AbstractSiteController(object):
is_active = False

dcs: AbstractDCS
_has_leader: bool

def start(self):
pass

def shutdown(self):
pass

def get_active_standby_config(self) -> Optional[dict]:
def get_active_standby_config(self) -> Dict[str, Any]:
"""Returns currently active configuration for standby leader"""
return {}

def is_leader_site(self):
return self.get_active_standby_config() is None
def is_leader_site(self) -> bool:
return self.get_active_standby_config() == {}

def resolve_leader(self) -> Optional[str]:
"""Try to become leader, update active config correspondingly.
@@ -54,13 +60,13 @@ def heartbeat(self):
def release(self):
pass

def status(self) -> dict:
def status(self) -> Dict[str, Any]:
return {}

def should_failover(self) -> bool:
return False

def on_shutdown(self, checkpoint_location):
def on_shutdown(self, checkpoint_location: int, prev_location: int):
pass

def append_metrics(self, metrics: List[str], labels: str) -> None:
@@ -76,7 +82,7 @@ def status(self):
class MultisiteController(Thread, AbstractSiteController):
is_active = True

def __init__(self, config, on_change=None):
def __init__(self, config: 'Config', on_change: Callable[[], None]):
super().__init__()
self.stop_requested = False
self.on_change = on_change
@@ -116,21 +122,21 @@ def __init__(self, config, on_change=None):
self.switchover_timeout = msconfig.get('switchover_timeout', 300)

self._heartbeat = Event()
self._standby_config = None
self._standby_config = {}
self._leader_resolved = Event()
self._has_leader = False
self._release = False
self._status = None
self._failover_target = None
self._failover_timeout = None
self._failover_timeout = 0

self.site_switches = None

self._dcs_error = None

def status(self):
return {
"status": "Leader" if self._has_leader or self._standby_config is None else "Standby",
"status": "Leader" if self._has_leader or self._standby_config == {} else "Standby",
"active": True,
"name": self.name,
"standby_config": self.get_active_standby_config(),
@@ -167,7 +173,7 @@ def release(self):
def should_failover(self):
return self._failover_target is not None and self._failover_target != self.name

def on_shutdown(self, checkpoint_location):
def on_shutdown(self, checkpoint_location: int, prev_location: int):
""" Called when shutdown for multisite failover has completed.
"""
# TODO: check if we replicated everything to standby site
@@ -193,12 +199,11 @@ def _set_standby_config(self, other: Member):
logger.info(f"Setting standby configuration to: {self._standby_config}")
return old_conf != self._standby_config

def _check_transition(self, leader, note=None):
def _check_transition(self, leader: bool, note: str):
if self._has_leader != leader:
logger.info("Multisite state transition")
self._has_leader = leader
if self.on_change:
self.on_change()
self.on_change()
if self._state_updater and self._status != leader:
self._state_updater.state_transition('Leader' if leader else 'Standby', note)
self._status = leader
@@ -225,7 +230,7 @@ def _resolve_multisite_leader(self):
# Became leader of unlocked cluster
if self.dcs.attempt_to_acquire_leader():
logger.info("Became multisite leader")
self._standby_config = None
self._standby_config = {}
self._check_transition(leader=True, note="Acquired multisite leader status")
if cluster.failover and cluster.failover.target_site and cluster.failover.target_site == self.name:
logger.info("Cleaning up multisite failover key after acquiring leader status")
@@ -256,7 +261,7 @@ def _resolve_multisite_leader(self):
if self.dcs.update_leader(cluster, None):
logger.info("Updated multisite leader lease")
# Make sure we are disabled from standby mode
self._standby_config = None
self._standby_config = {}
self._check_transition(leader=True, note="Already have multisite leader status")
self._check_for_failover(cluster)
else:
@@ -270,8 +275,8 @@ def _resolve_multisite_leader(self):
# Failover successful or someone else took over
if self._failover_target is not None:
self._failover_target = None
self._failover_timeout = None
if self._set_standby_config(cluster.leader.member):
self._failover_timeout = 0
if cluster.leader and self._set_standby_config(cluster.leader.member):
# Wake up anyway to notice that we need to replicate from new leader. For the other case
# _check_transition() handles the wake.
if not self._has_leader:
@@ -313,15 +318,15 @@ def _observe_leader(self):
# The leader is us
if lock_owner == self.name:
logger.info("Multisite leader is us")
self._standby_config = None
self._standby_config = {}
else:
logger.info(f"Multisite leader is {lock_owner}")
self._set_standby_config(cluster.leader.member)
self._set_standby_config(cluster.leader.member) # pyright: ignore
except DCSError as e:
# On replicas we need to know the multisite status only for rewinding.
logger.warning(f"Error accessing multisite DCS: {e}")

def _update_history(self, cluster):
def _update_history(self, cluster: 'Cluster'):
if cluster.history and cluster.history.lines and isinstance(cluster.history.lines[0], dict):
self.site_switches = cluster.history.lines[0].get('switches')

Check failure on line 331 in patroni/multisite.py (GitHub Actions / pyright):
Type of "site_switches" is partially unknown: "Unknown | None" (reportUnknownMemberType)

Check failure on line 331 in patroni/multisite.py (GitHub Actions / pyright):
Type of "get" is partially unknown: "Overload[(key: Unknown, /) -> (Unknown | None), (key: Unknown, default: Unknown, /) -> Unknown, (key: Unknown, default: _T@get, /) -> (Unknown | _T@get)]" (reportUnknownMemberType)

@@ -349,7 +354,7 @@ def _check_for_failover(self, cluster: Cluster):
self._failover_target = cluster.failover.target_site
else:
self._failover_target = None
self._failover_timeout = None
self._failover_timeout = 0

def touch_member(self):
data = {
@@ -379,14 +384,14 @@ def shutdown(self):
self._heartbeat.set()
self.join()

def append_metrics(self, metrics, labels):
def append_metrics(self, metrics: List[str], labels: str):
metrics.append("# HELP patroni_multisite_switches Number of times multisite leader has been switched")
metrics.append("# TYPE patroni_multisite_switches counter")
metrics.append("patroni_multisite_switches{0} {1}".format(labels, self.site_switches))


class KubernetesStateManagement:
def __init__(self, crd_name, crd_uid, reporter, crd_api):
def __init__(self, crd_name: str, crd_uid: str, reporter: str, crd_api: str):
self.crd_namespace, self.crd_name = (['default'] + crd_name.rsplit('.', 1))[-2:]
self.crd_uid = crd_uid
self.reporter = reporter
@@ -402,13 +407,15 @@ def __init__(self, crd_name, crd_uid, reporter, crd_api):
self._status_update = None
self._event_obj = None

def state_transition(self, new_state, note):
def state_transition(self, new_state: str, note: str):
self._status_update = {"status": {"Multisite": new_state}}

failover_time = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
failover_time = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
reason = 'Promote' if new_state == 'Leader' else 'Demote'
if note is None:
note = 'Acquired multisite leader' if new_state == 'Leader' else 'Became a standby cluster'

# TODO: check if this is needed, no current call comes without note (this is already reflected in the signature)
# if note is None:
# note = 'Acquired multisite leader' if new_state == 'Leader' else 'Became a standby cluster'

self._event_obj = kubernetes.client.EventsV1Event(
action='Failover',
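A note on the timestamp change in state_transition() above: datetime.utcnow() returns a naive datetime and is deprecated since Python 3.12, while datetime.now(UTC) returns a timezone-aware value with the same formatted output. A quick sketch, using timezone.utc, which is equivalent to the UTC constant the diff imports (that constant itself requires Python 3.11+):

# Both calls format to the same string, but now(timezone.utc) is timezone-aware
# and avoids the DeprecationWarning that utcnow() raises on Python 3.12+.
from datetime import datetime, timezone

failover_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
print(failover_time)                       # e.g. 2025-01-08T12:34:56.789012Z
print(datetime.now(timezone.utc).tzinfo)   # UTC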
