diff --git a/fork.rst b/fork.rst new file mode 100644 index 000000000..4e3a85308 --- /dev/null +++ b/fork.rst @@ -0,0 +1,12 @@ +.. _fork: + +Timescale patroni fork +====================== + + +TS_1 +----- + +- Add ``xlog_cache_ttl`` parameter for the Kubernetes DCS + + Setting it to a multiple of ``loop_wait`` prevents frequent pod updates, reducing the load on the API server, at the expense of a stale value of the xlog position in the member's metadata. diff --git a/patroni/config.py b/patroni/config.py index 662047f87..25d57d026 100644 --- a/patroni/config.py +++ b/patroni/config.py @@ -656,7 +656,7 @@ def _get_auth(name: str, params: Collection[str] = _AUTH_ALLOWED_PARAMETERS[:2]) 'SERVICE_TAGS', 'NAMESPACE', 'CONTEXT', 'USE_ENDPOINTS', 'SCOPE_LABEL', 'ROLE_LABEL', 'POD_IP', 'PORTS', 'LABELS', 'BYPASS_API_SERVICE', 'RETRIABLE_HTTP_CODES', 'KEY_PASSWORD', 'USE_SSL', 'SET_ACLS', 'GROUP', 'DATABASE', 'LEADER_LABEL_VALUE', 'FOLLOWER_LABEL_VALUE', - 'STANDBY_LEADER_LABEL_VALUE', 'TMP_ROLE_LABEL', 'AUTH_DATA') and name: + 'STANDBY_LEADER_LABEL_VALUE', 'TMP_ROLE_LABEL', 'AUTH_DATA', 'XLOG_CACHE_TTL') and name: value = os.environ.pop(param) if name == 'CITUS': if suffix == 'GROUP': @@ -665,6 +665,8 @@ def _get_auth(name: str, params: Collection[str] = _AUTH_ALLOWED_PARAMETERS[:2]) continue elif suffix == 'PORT': value = value and parse_int(value) + elif suffix == 'XLOG_CACHE_TTL': + value = value and parse_int(value, 's') elif suffix in ('HOSTS', 'PORTS', 'CHECKS', 'SERVICE_TAGS', 'RETRIABLE_HTTP_CODES'): value = value and _parse_list(value) elif suffix in ('LABELS', 'SET_ACLS', 'AUTH_DATA'): diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 180b02164..22f4bc85e 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -852,6 +852,8 @@ def reload_config(self, config: Union['Config', Dict[str, Any]]) -> None: # cache xlog location for the member, preventing pod update when xlog location is the only update for the pod 
self._xlog_cache_ttl = parse_int(kconfig.get('xlog_cache_ttl', '0'), 's') or 0 + if self._xlog_cache_ttl is not None: + logger.debug("set xlog_cache_ttl to %d", self._xlog_cache_ttl) @staticmethod def member(pod: K8sObject) -> Member: @@ -1341,7 +1343,8 @@ def touch_member(self, data: Dict[str, Any]) -> bool: replaced_xlog_location: Optional[str] = data.get('xlog_location', None) cached_xlog_location, last_updated = self._get_cached_xlog_location() - if last_updated is not None and last_updated + self._xlog_cache_ttl > time.time(): + now = time.time() + if last_updated is not None and last_updated + self._xlog_cache_ttl > now: if cached_xlog_location is not None and replaced_xlog_location is not None: data['xlog_location'] = cached_xlog_location elif replaced_xlog_location is not None: @@ -1364,6 +1367,10 @@ def touch_member(self, data: Dict[str, Any]) -> bool: ret = self._api.patch_namespaced_pod(self._name, self._namespace, body) if ret: self._pods.set(self._name, ret) + elif cached_xlog_location != replaced_xlog_location and last_updated is not None: + logger.debug("prevented pod update, keeping cached xlog value for up to %d seconds", + (last_updated + self._xlog_cache_ttl - now)) + if self._should_create_config_service: self._create_config_service() return bool(ret) diff --git a/patroni/validator.py b/patroni/validator.py index a3bf53eb9..58e000d2f 100644 --- a/patroni/validator.py +++ b/patroni/validator.py @@ -1124,7 +1124,7 @@ def validate_watchdog_mode(value: Any) -> None: Optional("ports"): [{"name": str, "port": IntValidator(max=65535, expected_type=int, raise_assert=True)}], Optional("cacert"): str, Optional("retriable_http_codes"): Or(int, [int]), - Optional('xlog_cache_ttl'): IntValidator(min=0, max=1200, base_unit='s', raise_assert=True) + Optional('xlog_cache_ttl'): IntValidator(min=0, max=3600, base_unit='s', raise_assert=True) }, }), Optional("citus"): { diff --git a/tests/test_ctl.py b/tests/test_ctl.py index 42d78214f..7613b1292 100644 --- 
a/tests/test_ctl.py +++ b/tests/test_ctl.py @@ -705,7 +705,7 @@ def test_edit_config(self): @patch('patroni.ctl.request_patroni') def test_version(self, mock_request): result = self.runner.invoke(ctl, ['version']) - assert 'patronictl version' in result.output + assert 'patronictl (timescale fork) version' in result.output mock_request.return_value.data = b'{"patroni":{"version":"1.2.3"},"server_version": 100001}' result = self.runner.invoke(ctl, ['version', 'dummy']) assert '1.2.3' in result.output