From 3e0185b0957d1652de71154be88318fbf7c2a849 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Bartosi=C5=84ski?= Date: Thu, 15 Dec 2022 11:46:14 +0100 Subject: [PATCH 1/4] Queue notification to optimize polling --- tasktiger/task.py | 10 ++++++---- tasktiger/tasktiger.py | 13 +++++++++++++ tasktiger/worker.py | 30 +++++++++++++++++++++++++++--- 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/tasktiger/task.py b/tasktiger/task.py index bca090de..b053f3de 100644 --- a/tasktiger/task.py +++ b/tasktiger/task.py @@ -344,8 +344,8 @@ def _move(self, from_state=None, to_state=None, when=None, mode=None): _key(from_state), queue, _key(from_state, queue), client=pipeline ) - if to_state == QUEUED and self.tiger.config["PUBLISH_QUEUED_TASKS"]: - pipeline.publish(_key("activity"), queue) + if to_state == QUEUED: + self.tiger._notify_queue(queue, client=pipeline) try: scripts.execute_pipeline(pipeline) @@ -420,8 +420,10 @@ def delay(self, when=None, max_queue_size=None): mode="nx", client=pipeline, ) - if state == QUEUED and tiger.config["PUBLISH_QUEUED_TASKS"]: - pipeline.publish(tiger._key("activity"), self.queue) + + if state == QUEUED: + tiger._notify_queue(self.queue, client=pipeline) + pipeline.execute() self._state = state diff --git a/tasktiger/tasktiger.py b/tasktiger/tasktiger.py index 6c18647a..e61f8057 100644 --- a/tasktiger/tasktiger.py +++ b/tasktiger/tasktiger.py @@ -3,6 +3,7 @@ import datetime import importlib import logging +import secrets from collections import defaultdict import click @@ -256,6 +257,18 @@ def _get_current_tasks(self): current_task = property(_get_current_task) current_tasks = property(_get_current_tasks) + def _notify_queue(self, queue, client=None): + client = client or self.connection + + if self.config["PUBLISH_QUEUED_TASKS"]: + client.publish(self._key("activity"), queue) + + client.set( + self._key("queue_token", queue.split(".", 1)[0]), + secrets.token_hex(), + ex=7200, # XXX Just for tests so we don't clutter Redis + ) + @classproperty def current_instance(self): """ diff --git a/tasktiger/worker.py b/tasktiger/worker.py index 27ade63a..250d0626 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -86,6 +86,7 @@ def __init__( self._key = tiger._key self._did_work = True self._last_task_check = 0.0 + self._queue_set_token = "" self.stats_thread = None self.id = str(uuid.uuid4()) @@ -221,8 +222,7 @@ def _worker_queue_scheduled_tasks(self) -> None: # XXX: ideally this would be in the same pipeline, but we only want # to announce if there was a result. if result: - if self.config["PUBLISH_QUEUED_TASKS"]: - self.connection.publish(self._key("activity"), queue) + self.tiger._notify_queue(queue) self._did_work = True def _poll_for_queues(self) -> None: @@ -235,7 +235,31 @@ def _poll_for_queues(self) -> None: """ if not self._did_work: time.sleep(self.config["POLL_TASK_QUEUES_INTERVAL"]) - self._refresh_queue_set() + + if self._is_queue_set_out_of_date(): + self._refresh_queue_set() + self.log.info(f"Poll: Done ({len(self._queue_set)})") + + def _is_queue_set_out_of_date(self) -> bool: + if not self.only_queues: + return True + + queue_set_token = ":".join( + token or "" + for token in self.connection.mget( + sorted( + self._key("queue_token", queue) + for queue in self.only_queues + ) + ) + ) + + if queue_set_token != self._queue_set_token: + self.log.info("Poll: Token changed") + self._queue_set_token = queue_set_token + return True + + return False def _pubsub_for_queues(self, timeout=0, batch_timeout=0) -> None: """ From f74702ca70a69256518039d3901348a3fbeec29e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Bartosi=C5=84ski?= Date: Fri, 16 Dec 2022 15:03:07 +0100 Subject: [PATCH 2/4] Reorganise code and add forced polling --- tasktiger/tasktiger.py | 2 +- tasktiger/worker.py | 29 ++++++++++++++++------------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/tasktiger/tasktiger.py b/tasktiger/tasktiger.py index e61f8057..a9e70b8b 100644 --- a/tasktiger/tasktiger.py +++ b/tasktiger/tasktiger.py @@ -266,7 +266,7 @@ def _notify_queue(self, queue, client=None): client.set( self._key("queue_token", queue.split(".", 1)[0]), secrets.token_hex(), - ex=7200, # XXX Just for tests so we don't clutter Redis + ex=3600 * 24 * 30, # To avoid leftover keys as queues change ) @classproperty diff --git a/tasktiger/worker.py b/tasktiger/worker.py index 250d0626..96c51cf7 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -86,6 +86,7 @@ def __init__( self._key = tiger._key self._did_work = True self._last_task_check = 0.0 + self._next_forced_queue_poll = 0.0 self._queue_set_token = "" self.stats_thread = None self.id = str(uuid.uuid4()) @@ -236,31 +237,33 @@ def _poll_for_queues(self) -> None: if not self._did_work: time.sleep(self.config["POLL_TASK_QUEUES_INTERVAL"]) - if self._is_queue_set_out_of_date(): + if self._should_poll_for_queues(): self._refresh_queue_set() - self.log.info(f"Poll: Done ({len(self._queue_set)})") + self._next_forced_queue_poll = time.monotonic() + 3600 - def _is_queue_set_out_of_date(self) -> bool: + def _should_poll_for_queues(self) -> bool: if not self.only_queues: return True - queue_set_token = ":".join( - token or "" - for token in self.connection.mget( - sorted( - self._key("queue_token", queue) - for queue in self.only_queues - ) - ) - ) + queue_set_token = self._get_queue_set_token() if queue_set_token != self._queue_set_token: - self.log.info("Poll: Token changed") self._queue_set_token = queue_set_token return True + if time.monotonic() > self._next_forced_queue_poll: + self.log.info("Forcing a queue poll due to inactivity.") + return True + return False + def _get_queue_set_token(self) -> str: + keys = sorted( + self._key("queue_token", queue) for queue in self.only_queues + ) + + return ":".join(map(str, self.connection.mget(keys))) + def _pubsub_for_queues(self, timeout=0, batch_timeout=0) -> None: """ Check activity channel for new queues and wait as necessary. From 507d72fde1f67d952a0a26dd34b56865e10b23d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Bartosi=C5=84ski?= Date: Mon, 19 Dec 2022 12:53:16 +0100 Subject: [PATCH 3/4] Post-CR updates --- tasktiger/tasktiger.py | 30 ++++++++++++++++++++++-------- tasktiger/worker.py | 27 ++++++++++++++++----------- 2 files changed, 38 insertions(+), 19 deletions(-) diff --git a/tasktiger/tasktiger.py b/tasktiger/tasktiger.py index a9e70b8b..081e08f1 100644 --- a/tasktiger/tasktiger.py +++ b/tasktiger/tasktiger.py @@ -16,6 +16,7 @@ QUEUED, SCHEDULED, classproperty, + dotted_parts, g, queue_matches, serialize_func_name, @@ -77,6 +78,8 @@ STRING :qlock: (Legacy queue locks that are no longer used) """ +# TODO - mention newly added keys + class TaskTiger: def __init__( @@ -204,6 +207,11 @@ def init(self, connection=None, config=None, setup_structlog=False): # subscribe to the activity channel. Use for more efficient task # processing with a large amount of workers. "POLL_TASK_QUEUES_INTERVAL": 0, + # Set to > 0 to reduce the frequency of queue polling by using + # cache token keys whose values are updated whenever a task is queued. + # TODO - reword to make it sound better + # TODO - caution about increased memory usage + "POLL_CACHE_TOKEN_KEY_EXPIRY": 0, # Whether to publish new tasks to the activity channel. Only set to # False if all the workers are polling queues. "PUBLISH_QUEUED_TASKS": True, @@ -258,16 +266,22 @@ def _get_current_tasks(self): current_tasks = property(_get_current_tasks) def _notify_queue(self, queue, client=None): - client = client or self.connection + _client = client or self.connection.pipeline(transaction=False) if self.config["PUBLISH_QUEUED_TASKS"]: - client.publish(self._key("activity"), queue) - - client.set( - self._key("queue_token", queue.split(".", 1)[0]), - secrets.token_hex(), - ex=3600 * 24 * 30, # To avoid leftover keys as queues change - ) + _client.publish(self._key("activity"), queue) + + cache_token_expiry = self.config["POLL_CACHE_TOKEN_KEY_EXPIRY"] + if cache_token_expiry > 0: + for queue_part in list(dotted_parts(queue)) + [""]: + _client.set( + self._key("queue_cache_token", queue_part), + secrets.token_hex(), + ex=cache_token_expiry, + ) + + if _client is not client: + _client.execute() @classproperty def current_instance(self): diff --git a/tasktiger/worker.py b/tasktiger/worker.py index 96c51cf7..b06bb846 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -14,7 +14,7 @@ import uuid from collections import OrderedDict from contextlib import ExitStack -from typing import List, Set +from typing import List, Optional, Set, Tuple from redis.exceptions import LockError @@ -87,7 +87,7 @@ def __init__( self._did_work = True self._last_task_check = 0.0 self._next_forced_queue_poll = 0.0 - self._queue_set_token = "" + self._queue_cache_tokens: Optional[Tuple[Optional[str], ...]] = None self.stats_thread = None self.id = str(uuid.uuid4()) @@ -239,30 +239,35 @@ def _poll_for_queues(self) -> None: if self._should_poll_for_queues(): self._refresh_queue_set() - self._next_forced_queue_poll = time.monotonic() + 3600 def _should_poll_for_queues(self) -> bool: - if not self.only_queues: + cache_token_expiry = self.tiger.config["POLL_CACHE_TOKEN_KEY_EXPIRY"] + if cache_token_expiry <= 0: return True - queue_set_token = self._get_queue_set_token() + next_forced_queue_poll = self._next_forced_queue_poll + self._next_forced_queue_poll = time.monotonic() + cache_token_expiry - if queue_set_token != self._queue_set_token: - self._queue_set_token = queue_set_token + queue_cache_tokens = self._get_queue_cache_tokens() + if queue_cache_tokens != self._queue_cache_tokens: + self._queue_cache_tokens = queue_cache_tokens return True - if time.monotonic() > self._next_forced_queue_poll: + # Ensure that we poll queues when we haven't retrieved the cache + # tokens for at least as long as their expiry time. + if time.monotonic() >= next_forced_queue_poll: self.log.info("Forcing a queue poll due to inactivity.") return True return False - def _get_queue_set_token(self) -> str: + def _get_queue_cache_tokens(self) -> Tuple[Optional[str], ...]: keys = sorted( - self._key("queue_token", queue) for queue in self.only_queues + self._key("queue_cache_token", queue) + for queue in self.only_queues or [""] ) - return ":".join(map(str, self.connection.mget(keys))) + return tuple(self.connection.mget(keys)) def _pubsub_for_queues(self, timeout=0, batch_timeout=0) -> None: """ From c0751b30f0b509d2ce368efae795af6c4f63d932 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Bartosi=C5=84ski?= Date: Tue, 20 Dec 2022 11:34:45 +0100 Subject: [PATCH 4/4] Naming changes --- tasktiger/task.py | 4 ++-- tasktiger/tasktiger.py | 4 ++-- tasktiger/worker.py | 14 +++++++------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/tasktiger/task.py b/tasktiger/task.py index b053f3de..01cf84e4 100644 --- a/tasktiger/task.py +++ b/tasktiger/task.py @@ -345,7 +345,7 @@ def _move(self, from_state=None, to_state=None, when=None, mode=None): ) if to_state == QUEUED: - self.tiger._notify_queue(queue, client=pipeline) + self.tiger._notify_task_queued(queue, client=pipeline) try: scripts.execute_pipeline(pipeline) @@ -422,7 +422,7 @@ def delay(self, when=None, max_queue_size=None): ) if state == QUEUED: - tiger._notify_queue(self.queue, client=pipeline) + tiger._notify_task_queued(self.queue, client=pipeline) pipeline.execute() diff --git a/tasktiger/tasktiger.py b/tasktiger/tasktiger.py index 081e08f1..989d41e7 100644 --- a/tasktiger/tasktiger.py +++ b/tasktiger/tasktiger.py @@ -265,7 +265,7 @@ def _get_current_tasks(self): current_task = property(_get_current_task) current_tasks = property(_get_current_tasks) - def _notify_queue(self, queue, client=None): + def _notify_task_queued(self, queue, client=None): _client = client or self.connection.pipeline(transaction=False) if self.config["PUBLISH_QUEUED_TASKS"]: @@ -275,7 +275,7 @@ def _notify_queue(self, queue, client=None): if cache_token_expiry > 0: for queue_part in list(dotted_parts(queue)) + [""]: _client.set( - self._key("queue_cache_token", queue_part), + self._key("queued_cache_token", queue_part), secrets.token_hex(), ex=cache_token_expiry, ) diff --git a/tasktiger/worker.py b/tasktiger/worker.py index b06bb846..60cfcdcc 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -87,7 +87,7 @@ def __init__( self._did_work = True self._last_task_check = 0.0 self._next_forced_queue_poll = 0.0 - self._queue_cache_tokens: Optional[Tuple[Optional[str], ...]] = None + self._queued_cache_tokens: Optional[Tuple[Optional[str], ...]] = None self.stats_thread = None self.id = str(uuid.uuid4()) @@ -223,7 +223,7 @@ def _worker_queue_scheduled_tasks(self) -> None: # XXX: ideally this would be in the same pipeline, but we only want # to announce if there was a result. if result: - self.tiger._notify_queue(queue) + self.tiger._notify_task_queued(queue) self._did_work = True def _poll_for_queues(self) -> None: @@ -248,9 +248,9 @@ def _should_poll_for_queues(self) -> bool: next_forced_queue_poll = self._next_forced_queue_poll self._next_forced_queue_poll = time.monotonic() + cache_token_expiry - queue_cache_tokens = self._get_queue_cache_tokens() - if queue_cache_tokens != self._queue_cache_tokens: - self._queue_cache_tokens = queue_cache_tokens + queued_cache_tokens = self._get_queued_cache_tokens() + if queued_cache_tokens != self._queued_cache_tokens: + self._queued_cache_tokens = queued_cache_tokens return True # Ensure that we poll queues when we haven't retrieved the cache @@ -261,9 +261,9 @@ def _should_poll_for_queues(self) -> bool: return False - def _get_queue_cache_tokens(self) -> Tuple[Optional[str], ...]: + def _get_queued_cache_tokens(self) -> Tuple[Optional[str], ...]: keys = sorted( - self._key("queue_cache_token", queue) + self._key("queued_cache_token", queue) for queue in self.only_queues or [""] )