Skip to content

Commit

Permalink
Make member xlog updates frequency configurable in K8s
Browse files Browse the repository at this point in the history
In the past, Patroni had to update the member key every `loop_wait` interval to prevent its disappearance due to TTL. In Kubernetes, the member key won't disappear, however, Patroni still updates it regularly to propagate current xlog location.

Make those updates configurable. If `kubernetes.xlog_cache_ttl` is set to a positive value and no other member data (i.e. role or timeline) changes, do not update the member pod annotation more frequently than the value of `xlog_cache_ttl` seconds.

This reduces the load on the K8s API, as we don't have to update the pod every loop_wait interval. 

If the member needs to be updated due to other reasons (i.e. role change), the xlog position is set to the up-to-date one received from Postgres.

When DEBUG logging is turned on, Patroni will emit log messages similar to the one below when the xlog-only update is withheld because of this parameter:

```
2024-06-06 11:34:36,308 DEBUG: prevented pod update, keeping cached xlog value for up to 10 seconds
```

This parameter is set to 0 by default, so this feature is disabled unless turned on explicitly by the user. The maximum value is 3600s.
  • Loading branch information
alexeyklyukin authored Jun 13, 2024
1 parent ff31f45 commit 4164ebe
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 7 deletions.
1 change: 1 addition & 0 deletions docs/ENVIRONMENT.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ Kubernetes
- **PATRONI\_KUBERNETES\_PORTS**: (optional) if the Service object has the name for the port, the same name must appear in the Endpoint object, otherwise service won't work. For example, if your service is defined as ``{Kind: Service, spec: {ports: [{name: postgresql, port: 5432, targetPort: 5432}]}}``, then you have to set ``PATRONI_KUBERNETES_PORTS='[{"name": "postgresql", "port": 5432}]'`` and Patroni will use it for updating subsets of the leader Endpoint. This parameter is used only if `PATRONI_KUBERNETES_USE_ENDPOINTS` is set.
- **PATRONI\_KUBERNETES\_CACERT**: (optional) Specifies the file with the CA_BUNDLE file with certificates of trusted CAs to use while verifying Kubernetes API SSL certs. If not provided, patroni will use the value provided by the ServiceAccount secret.
- **PATRONI\_RETRIABLE\_HTTP\_CODES**: (optional) list of HTTP status codes from K8s API to retry on. By default Patroni is retrying on ``500``, ``503``, and ``504``, or if K8s API response has ``retry-after`` HTTP header.
- **PATRONI\_KUBERNETES\_XLOG\_CACHE\_TTL**: (optional) duration in seconds to retain the previous value of the member xlog position when updating the member pod metadata. Higher values reduce the frequency of pod metadata updates from Patroni, at the expense of having an outdated xlog position. The default value is ``0``, indicating that the system should always use the up-to-date position. Setting it to the value that is a multiple of ``loop_wait`` reduces the number of API server requests in the Kubernetes cluster.

Raft (deprecated)
-----------------
Expand Down
12 changes: 12 additions & 0 deletions fork.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
.. _fork:

Timescale patroni fork
======================


TS_1
-----

- Add ``xlog_cache_ttl`` parameter for the Kubernetes DCS

Setting it to multiple of ``loop_wait`` prevents frequent pod updates, reducing the load on the API server, at the expense of stale value of the xlog position in the member's metadata.
4 changes: 3 additions & 1 deletion patroni/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ def _get_auth(name: str, params: Collection[str] = _AUTH_ALLOWED_PARAMETERS[:2])
'SERVICE_TAGS', 'NAMESPACE', 'CONTEXT', 'USE_ENDPOINTS', 'SCOPE_LABEL', 'ROLE_LABEL',
'POD_IP', 'PORTS', 'LABELS', 'BYPASS_API_SERVICE', 'RETRIABLE_HTTP_CODES', 'KEY_PASSWORD',
'USE_SSL', 'SET_ACLS', 'GROUP', 'DATABASE', 'LEADER_LABEL_VALUE', 'FOLLOWER_LABEL_VALUE',
'STANDBY_LEADER_LABEL_VALUE', 'TMP_ROLE_LABEL', 'AUTH_DATA') and name:
'STANDBY_LEADER_LABEL_VALUE', 'TMP_ROLE_LABEL', 'AUTH_DATA', 'XLOG_CACHE_TTL') and name:
value = os.environ.pop(param)
if name == 'CITUS':
if suffix == 'GROUP':
Expand All @@ -665,6 +665,8 @@ def _get_auth(name: str, params: Collection[str] = _AUTH_ALLOWED_PARAMETERS[:2])
continue
elif suffix == 'PORT':
value = value and parse_int(value)
elif suffix == 'XLOG_CACHE_TTL':
value = value and parse_int(value, 's')
elif suffix in ('HOSTS', 'PORTS', 'CHECKS', 'SERVICE_TAGS', 'RETRIABLE_HTTP_CODES'):
value = value and _parse_list(value)
elif suffix in ('LABELS', 'SET_ACLS', 'AUTH_DATA'):
Expand Down
4 changes: 2 additions & 2 deletions patroni/ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2176,7 +2176,7 @@ def version(cluster_name: str, group: Optional[int], member_names: List[str]) ->
:param group: filter which Citus group we should get members from. Refer to the module note for more details.
:param member_names: filter which members we should get version information from.
"""
click.echo("patronictl version {0}".format(__version__))
click.echo("patronictl (timescale fork) version {0}".format(__version__))

if not cluster_name:
return
Expand All @@ -2192,7 +2192,7 @@ def version(cluster_name: str, group: Optional[int], member_names: List[str]) ->
version = data.get('patroni', {}).get('version')
pg_version = data.get('server_version')
pg_version_str = " PostgreSQL {0}".format(format_pg_version(pg_version)) if pg_version else ""
click.echo("{0}: Patroni {1}{2}".format(m.name, version, pg_version_str))
click.echo("{0}: Patroni (timescale fork) {1}{2}".format(m.name, version, pg_version_str))
except Exception as e:
click.echo("{0}: failed to get version: {1}".format(m.name, e))

Expand Down
2 changes: 1 addition & 1 deletion patroni/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def get_base_arg_parser() -> argparse.ArgumentParser:
from .version import __version__

parser = argparse.ArgumentParser()
parser.add_argument('--version', action='version', version='%(prog)s {0}'.format(__version__))
parser.add_argument('--version', action='version', version='%(prog)s (timescale fork) {0}'.format(__version__))
parser.add_argument('configfile', nargs='?', default='',
help='Patroni may also read the configuration from the {0} environment variable'
.format(Config.PATRONI_CONFIG_VARIABLE))
Expand Down
44 changes: 42 additions & 2 deletions patroni/dcs/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from ..exceptions import DCSError
from ..postgresql.mpp import AbstractMPP
from ..utils import deep_compare, iter_response_objects, keepalive_socket_options, \
Retry, RetryFailedError, tzutc, uri, USER_AGENT
parse_int, Retry, RetryFailedError, tzutc, uri, USER_AGENT
if TYPE_CHECKING: # pragma: no cover
from ..config import Config

Expand Down Expand Up @@ -758,6 +758,9 @@ def __init__(self, config: Dict[str, Any], mpp: AbstractMPP) -> None:
self._standby_leader_label_value = config.get('standby_leader_label_value', 'master')
self._tmp_role_label = config.get('tmp_role_label')
self._ca_certs = os.environ.get('PATRONI_KUBERNETES_CACERT', config.get('cacert')) or SERVICE_CERT_FILENAME
self._xlog_cache_ttl = 0
self._cached_xlog_location_modified_timestamp = None
self._cached_xlog_location = None
super(Kubernetes, self).__init__({**config, 'namespace': ''}, mpp)
if self._mpp.is_enabled():
self._labels[self._mpp.k8s_group_label] = str(self._mpp.group)
Expand Down Expand Up @@ -831,10 +834,14 @@ def reload_config(self, config: Union['Config', Dict[str, Any]]) -> None:
super(Kubernetes, self).reload_config(config)
if TYPE_CHECKING: # pragma: no cover
assert self._retry.deadline is not None

# we could be called with only Kubernetes part of the config (module init), or with the whole config
# during reload; make sure only kubernetes part of the config is fetched below.
kconfig = config.get('kubernetes') or config
self._api.configure_timeouts(self.loop_wait, self._retry.deadline, self.ttl)

# retriable_http_codes supposed to be either int, list of integers or comma-separated string with integers.
retriable_http_codes: Union[str, List[Union[str, int]]] = config.get('retriable_http_codes', [])
retriable_http_codes: Union[str, List[Union[str, int]]] = kconfig.get('retriable_http_codes', [])
if not isinstance(retriable_http_codes, list):
retriable_http_codes = [c.strip() for c in str(retriable_http_codes).split(',')]

Expand All @@ -843,6 +850,11 @@ def reload_config(self, config: Union['Config', Dict[str, Any]]) -> None:
except Exception as e:
logger.warning('Invalid value of retriable_http_codes = %s: %r', config['retriable_http_codes'], e)

# cache xlog location for the member, preventing pod update when xlog location is the only update for the pod
self._xlog_cache_ttl = parse_int(kconfig.get('xlog_cache_ttl', '0'), 's') or 0
if self._xlog_cache_ttl > 0:
logger.debug("set xlog_cache_ttl to %d", self._xlog_cache_ttl)

@staticmethod
def member(pod: K8sObject) -> Member:
annotations = pod.metadata.annotations or EMPTY_DICT
Expand Down Expand Up @@ -1302,6 +1314,13 @@ def _config_resource_version(self) -> Optional[str]:
def set_config_value(self, value: str, version: Optional[str] = None) -> bool:
return self.patch_or_create_config({self._CONFIG: value}, version, bool(self._config_resource_version), False)

def _get_cached_xlog_location(self) -> Tuple[Optional[str], Optional[int]]:
return self._cached_xlog_location, self._cached_xlog_location_modified_timestamp

def _set_cached_xlog_location(self, location: str) -> None:
self._cached_xlog_location = location
self._cached_xlog_location_modified_timestamp = int(time.time())

@catch_kubernetes_errors
def touch_member(self, data: Dict[str, Any]) -> bool:
cluster = self.cluster
Expand All @@ -1321,17 +1340,38 @@ def touch_member(self, data: Dict[str, Any]) -> bool:

member = cluster and cluster.get_member(self._name, fallback_to_leader=False)
pod_labels = member and member.data.pop('pod_labels', None)

replaced_xlog_location: Optional[str] = data.get('xlog_location', None)
cached_xlog_location, last_updated = self._get_cached_xlog_location()
now = int(time.time())
use_cached_xlog = False
if last_updated is not None and last_updated + self._xlog_cache_ttl > now:
if cached_xlog_location is not None and replaced_xlog_location is not None:
data['xlog_location'] = cached_xlog_location
use_cached_xlog = True
elif replaced_xlog_location is not None:
# location cache expired
self._set_cached_xlog_location(replaced_xlog_location)
ret = member and pod_labels is not None\
and all(pod_labels.get(k) == v for k, v in role_labels.items())\
and deep_compare(data, member.data)

if not ret:
# if we move forward with an update anyway, make sure to write the actual
# value for the xlog, and not the stale cached value.
if use_cached_xlog and replaced_xlog_location is not None:
self._set_cached_xlog_location(replaced_xlog_location)
data['xlog_location'] = replaced_xlog_location
metadata = {'namespace': self._namespace, 'name': self._name, 'labels': role_labels,
'annotations': {'status': json.dumps(data, separators=(',', ':'))}}
body = k8s_client.V1Pod(metadata=k8s_client.V1ObjectMeta(**metadata))
ret = self._api.patch_namespaced_pod(self._name, self._namespace, body)
if ret:
self._pods.set(self._name, ret)
elif use_cached_xlog and cached_xlog_location != replaced_xlog_location and last_updated is not None:
logger.debug("prevented pod update, keeping cached xlog value for up to %d seconds",
(last_updated + self._xlog_cache_ttl - now))

if self._should_create_config_service:
self._create_config_service()
return bool(ret)
Expand Down
1 change: 1 addition & 0 deletions patroni/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,7 @@ def validate_watchdog_mode(value: Any) -> None:
Optional("ports"): [{"name": str, "port": IntValidator(max=65535, expected_type=int, raise_assert=True)}],
Optional("cacert"): str,
Optional("retriable_http_codes"): Or(int, [int]),
Optional('xlog_cache_ttl'): IntValidator(min=0, max=3600, base_unit='s', raise_assert=True)
},
}),
Optional("citus"): {
Expand Down
2 changes: 1 addition & 1 deletion tests/test_ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ def test_edit_config(self):
@patch('patroni.ctl.request_patroni')
def test_version(self, mock_request):
result = self.runner.invoke(ctl, ['version'])
assert 'patronictl version' in result.output
assert 'patronictl (timescale fork) version' in result.output
mock_request.return_value.data = b'{"patroni":{"version":"1.2.3"},"server_version": 100001}'
result = self.runner.invoke(ctl, ['version', 'dummy'])
assert '1.2.3' in result.output
Expand Down

0 comments on commit 4164ebe

Please sign in to comment.