Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PoC] Queue notification to optimize polling #257

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions tasktiger/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ def _move(self, from_state=None, to_state=None, when=None, mode=None):
_key(from_state), queue, _key(from_state, queue), client=pipeline
)

if to_state == QUEUED and self.tiger.config["PUBLISH_QUEUED_TASKS"]:
pipeline.publish(_key("activity"), queue)
if to_state == QUEUED:
self.tiger._notify_task_queued(queue, client=pipeline)

try:
scripts.execute_pipeline(pipeline)
Expand Down Expand Up @@ -420,8 +420,10 @@ def delay(self, when=None, max_queue_size=None):
mode="nx",
client=pipeline,
)
if state == QUEUED and tiger.config["PUBLISH_QUEUED_TASKS"]:
pipeline.publish(tiger._key("activity"), self.queue)

if state == QUEUED:
tiger._notify_task_queued(self.queue, client=pipeline)

pipeline.execute()

self._state = state
Expand Down
27 changes: 27 additions & 0 deletions tasktiger/tasktiger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import datetime
import importlib
import logging
import secrets
from collections import defaultdict

import click
Expand All @@ -15,6 +16,7 @@
QUEUED,
SCHEDULED,
classproperty,
dotted_parts,
g,
queue_matches,
serialize_func_name,
Expand Down Expand Up @@ -76,6 +78,8 @@
STRING <prefix>:qlock:<queue> (Legacy queue locks that are no longer used)
"""

# TODO - mention newly added keys


class TaskTiger:
def __init__(
Expand Down Expand Up @@ -203,6 +207,11 @@ def init(self, connection=None, config=None, setup_structlog=False):
# subscribe to the activity channel. Use for more efficient task
# processing with a large amount of workers.
"POLL_TASK_QUEUES_INTERVAL": 0,
# Set to > 0 to reduce the frequency of queue polling by using
# cache token keys whose values are updated whenever a task is queued.
# TODO - reword to make it sound better
# TODO - caution about increased memory usage
"POLL_CACHE_TOKEN_KEY_EXPIRY": 0,
# Whether to publish new tasks to the activity channel. Only set to
# False if all the workers are polling queues.
"PUBLISH_QUEUED_TASKS": True,
Expand Down Expand Up @@ -256,6 +265,24 @@ def _get_current_tasks(self):
current_task = property(_get_current_task)
current_tasks = property(_get_current_tasks)

def _notify_task_queued(self, queue, client=None):
_client = client or self.connection.pipeline(transaction=False)

if self.config["PUBLISH_QUEUED_TASKS"]:
_client.publish(self._key("activity"), queue)

cache_token_expiry = self.config["POLL_CACHE_TOKEN_KEY_EXPIRY"]
if cache_token_expiry > 0:
for queue_part in list(dotted_parts(queue)) + [""]:
_client.set(
self._key("queued_cache_token", queue_part),
secrets.token_hex(),
ex=cache_token_expiry,
)

if _client is not client:
_client.execute()

@classproperty
def current_instance(self):
"""
Expand Down
40 changes: 36 additions & 4 deletions tasktiger/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import uuid
from collections import OrderedDict
from contextlib import ExitStack
from typing import List, Set
from typing import List, Optional, Set, Tuple

from redis.exceptions import LockError

Expand Down Expand Up @@ -86,6 +86,8 @@ def __init__(
self._key = tiger._key
self._did_work = True
self._last_task_check = 0.0
self._next_forced_queue_poll = 0.0
self._queued_cache_tokens: Optional[Tuple[Optional[str], ...]] = None
self.stats_thread = None
self.id = str(uuid.uuid4())

Expand Down Expand Up @@ -221,8 +223,7 @@ def _worker_queue_scheduled_tasks(self) -> None:
# XXX: ideally this would be in the same pipeline, but we only want
# to announce if there was a result.
if result:
if self.config["PUBLISH_QUEUED_TASKS"]:
self.connection.publish(self._key("activity"), queue)
self.tiger._notify_task_queued(queue)
self._did_work = True

def _poll_for_queues(self) -> None:
Expand All @@ -235,7 +236,38 @@ def _poll_for_queues(self) -> None:
"""
if not self._did_work:
time.sleep(self.config["POLL_TASK_QUEUES_INTERVAL"])
self._refresh_queue_set()

if self._should_poll_for_queues():
self._refresh_queue_set()

def _should_poll_for_queues(self) -> bool:
cache_token_expiry = self.tiger.config["POLL_CACHE_TOKEN_KEY_EXPIRY"]
if cache_token_expiry <= 0:
return True

next_forced_queue_poll = self._next_forced_queue_poll
self._next_forced_queue_poll = time.monotonic() + cache_token_expiry

queued_cache_tokens = self._get_queued_cache_tokens()
if queued_cache_tokens != self._queued_cache_tokens:
self._queued_cache_tokens = queued_cache_tokens
return True

# Ensure that we poll queues when we haven't retrieved the cache
# tokens for at least as long as their expiry time.
if time.monotonic() >= next_forced_queue_poll:
self.log.info("Forcing a queue poll due to inactivity.")
return True

return False

def _get_queued_cache_tokens(self) -> Tuple[Optional[str], ...]:
keys = sorted(
self._key("queued_cache_token", queue)
for queue in self.only_queues or [""]
)

return tuple(self.connection.mget(keys))

def _pubsub_for_queues(self, timeout=0, batch_timeout=0) -> None:
"""
Expand Down