[PoC] Queue notification to optimize polling #257
base: master
Conversation
Force-pushed from 4642deb to f9e6f13
An interesting idea, looks promising! W.r.t. naming, queue_token is indeed a bit too general to know what it's for without reading the code. Not really sure about this, but what about queue_etag? It functions the same way as HTTP's ETag, so I'm counting on the association here.
tasktiger/tasktiger.py (outdated)
client.set(
    self._key("queue_token", queue.split(".", 1)[0]),
    secrets.token_hex(),
    ex=3600 * 24 * 30,  # To avoid leftover keys as queues change
)
Maybe make it configurable in self.config?
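For illustration, a sketch of what that could look like, with QUEUE_TOKEN_EXPIRY as a purely hypothetical config key name (not part of this PR):

# Hypothetical sketch: read the expiry from TaskTiger's config dict instead of
# hard-coding it; "QUEUE_TOKEN_EXPIRY" is an assumed key name.
expiry = self.config.get("QUEUE_TOKEN_EXPIRY", 3600 * 24 * 30)
client.set(
    self._key("queue_token", queue.split(".", 1)[0]),
    secrets.token_hex(),
    ex=expiry,  # to avoid leftover keys as queues change
)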
tasktiger/worker.py (outdated)
if self._should_poll_for_queues():
    self._refresh_queue_set()
    self._next_forced_queue_poll = time.monotonic() + 3600
Maybe make the 3600 configurable in TaskTiger.config?
This needs to be less than or equal to the token key expiry, so I think we should use the configuration option for that here. See #257 (comment) for more details as to why.
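To illustrate that suggestion, a hedged sketch of the worker side reusing the same (assumed) QUEUE_TOKEN_EXPIRY config value, which keeps the forced poll interval from ever exceeding the token key expiry:

# Sketch only; the config key name is an assumption carried over from above,
# and self.config is assumed to be available on the worker.
if self._should_poll_for_queues():
    self._refresh_queue_set()
    self._next_forced_queue_poll = (
        time.monotonic() + self.config["QUEUE_TOKEN_EXPIRY"]
    )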
This needs to be less than or equal to the token key expiry,
Why?
tasktiger/worker.py (outdated)
        self._key("queue_token", queue) for queue in self.only_queues
    )

    return ":".join(map(str, self.connection.mget(keys)))
Since we don't need this to be serializable, we could also just return it as a tuple and compare the tuples. The initial _queue_set_token could just be an empty tuple (or None).
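A minimal sketch of that tuple variant, reusing the helper names from the diff above:

from typing import Optional, Tuple

def _get_queue_set_token(self) -> Tuple[Optional[bytes], ...]:
    # Sketch: skip serialization and return the raw MGET results as a tuple;
    # callers compare tuples directly, and the initial value can be () or None.
    keys = sorted(
        self._key("queue_token", queue) for queue in self.only_queues
    )
    return tuple(self.connection.mget(keys))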
tasktiger/worker.py (outdated)
def _get_queue_set_token(self) -> str:
    keys = sorted(
        self._key("queue_token", queue) for queue in self.only_queues
- What if we don't use only_queues? Would we then only poll every hour?
- The way it is currently implemented, this wouldn't work if we process a sub-queue (e.g. "foo.bar"), right?
If we don't use only_queues, we'd poll the same way we do now (every POLL_TASK_QUEUES_INTERVAL).
What if we don't use only_queues? Would we then only poll every hour?
The way it is currently implemented, this wouldn't work if we process a sub-queue (e.g. "foo.bar"), right?
Correct, but all we need to do is take the root of each queue from self.only_queues to get it to work.
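A sketch of that adjustment, reducing each entry in self.only_queues to its root before building the token keys:

def _get_queue_set_token(self) -> str:
    # Sketch: only root-level queues carry tokens, so "foo.bar" is reduced
    # to "foo" before its token key is looked up.
    roots = sorted({queue.split(".", 1)[0] for queue in self.only_queues})
    keys = [self._key("queue_token", root) for root in roots]
    return ":".join(map(str, self.connection.mget(keys)))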
If we don't use only_queues, we'd poll the same way we do now (every POLL_TASK_QUEUES_INTERVAL)
Ah, because _should_poll_for_queues returns True if not self.only_queues.
tasktiger/tasktiger.py (outdated)
client.publish(self._key("activity"), queue)

client.set(
    self._key("queue_token", queue.split(".", 1)[0]),
    secrets.token_hex(),
    ex=3600 * 24 * 30,
)
So this optimization essentially only works for the top-level queue name. Would there be an issue if we set the queue token for each level? For example, if the queue is "foo.bar.baz" we set the queue token for "foo.bar.baz", "foo.bar", "foo", and possibly "" (a root/top-level token for a client that processes all queues). That way, a client with only_queues would use the specific queues as queue token keys in _get_queue_set_token, and a client without only_queues would look at the root token only.
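For illustration, a hedged sketch of that per-level variant; the helper name is made up and expiry handling is omitted:

def _queue_token_levels(queue: str) -> list:
    # "foo.bar.baz" -> ["foo.bar.baz", "foo.bar", "foo", ""]
    parts = queue.split(".")
    levels = [".".join(parts[:i]) for i in range(len(parts), 0, -1)]
    levels.append("")  # root token for clients that process all queues
    return levels

# On enqueue (sketch only):
# for level in _queue_token_levels(queue):
#     client.set(self._key("queue_token", level), secrets.token_hex())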
I considered this initially.
The first draft of this feature used the empty root "" to detect changes on all queues, but I removed it because I didn't believe it would be that useful (we don't care about polling so much when there are really few subqueues present, and it probably wouldn't help much if there are a lot). I see no harm in bringing it back though, as it could prove beneficial under certain rare conditions.
Regarding allowing subqueues (e.g. foo.bar), I wanted to do just that, but then realised that it could potentially have a large memory footprint under certain conditions. We could lower the memory footprint by using a relatively low value for key expiry.
However, the forced poll interval must be less than or equal to the expiry time for the token keys, and I would like the forced poll interval to be 30-60 min (its main goal is to mitigate a scenario where the token key is empty for two subsequent checks performed by the worker, yet without the worker's knowledge it was set to some value and then expired between these two checks - e.g. because the worker was busy working on something before the second check). That means we'd basically be keeping a key for every subqueue from the last hour.
I'd like to play it safe and only use root tokens for now. Let's consider subqueue tokens in the next iteration?
Obviously, if we don't use subqueues for this optimisation, the logic needs to be adjusted to take the root of each queue from self.only_queues.
it could potentially have a large memory footprint under certain conditions
What specific footprint are you concerned about? Redis?
I'm okay doing this later and just sticking to the root queue name; it's just that I originally designed the queueing to be independent of how many subqueues you're using. E.g. switching queues from billing.<org_id> to billing.usage.<org_id> and billing.invoices.<org_id> shouldn't have a performance impact. But I'll leave it up to you.
What specific footprint are you concerned about? Redis?
Yes. We'd need to store a token for every single subqueue, of which there could be many (I don't think it's unreasonable to expect up to as many subqueues as there could be tasks), and these keys would stick around for a while. I'm worried we might be trading the CPU issues this is supposed to fix for memory issues.
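As a rough, back-of-the-envelope illustration of that concern (all numbers below are assumptions, not measurements):

# Assuming ~100 bytes of per-key Redis overhead plus a 64-char token_hex() value,
# one million distinct subqueues alive within the expiry window would hold
# roughly 160 MB.
per_key_bytes = 100 + 64      # assumed overhead + token payload
subqueues = 1_000_000         # illustrative worst case
print(f"~{per_key_bytes * subqueues / 1e6:.0f} MB")  # ~164 MB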
With the current design, the token key needs to be kept around for at least as long as the interval for the forced poll implemented in this PR.
That's because we want to avoid a scenario where:
- the last token value retrieved by the worker is empty
- a worker has been busy processing multiple tasks for so long that the key is updated, and then expires again
- by the time the worker is done, it sees that the key is empty again, so it does nothing
And I'd like the forced poll interval to be as long as possible to avoid unnecessary polls.
It's only supposed to mitigate the issue with key expiration.
a worker has been busy processing multiple tasks for so long that the key is updated, and then expires again
Could the workers update the token after every task?
They could, but it got me thinking, and I realised that if we forced a token update every key expiry interval (instead of forcing the poll) and reset the force poll timer on every token update, we'd solve the problem and we could lower the token key expiry without the polling frequency increasing linearly with it.
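A hedged sketch of the second half of that idea, resetting the forced-poll deadline whenever the worker observes a token change; _forced_poll_interval and the other names are illustrative only:

def _should_poll_for_queues(self) -> bool:
    # Sketch: poll when the observed token set changes or the forced-poll
    # deadline has passed, and push the deadline out again whenever we poll.
    if not self.only_queues:
        return True  # no token optimization: keep the regular poll interval
    token = self._get_queue_set_token()
    now = time.monotonic()
    if token != self._queue_set_token or now >= self._next_forced_queue_poll:
        self._queue_set_token = token
        self._next_forced_queue_poll = now + self._forced_poll_interval
        return True
    return False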
I'll push the adjustments today.
Force-pushed from f450642 to 507d72f
Force-pushed from 95f2fc3 to c0751b3
This PR is a PoC for an optimised way to poll queues. Tests were deliberately omitted at this stage.
The way it works is that the key t:queued_cache_token:<queue root> (I welcome better naming here) is updated with a random token whenever a task is added to that queue. The workers will monitor the tokens related to the queues they're interested in and do the expensive queue poll only when any of them change (or if an hour has elapsed since the last poll, as a safeguard).
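To make the described flow concrete, here is a minimal, self-contained sketch of the mechanism; the key name comes from the description above, while the helper names and redis-py usage are illustrative only:

import secrets
import time

import redis

r = redis.Redis()
TOKEN_TTL = 3600 * 24 * 30       # token key expiry discussed above
FORCED_POLL_INTERVAL = 3600      # the one-hour safeguard


def notify_enqueue(queue: str) -> None:
    # Producer side: bump the root queue's token whenever a task is queued.
    root = queue.split(".", 1)[0]
    r.set(f"t:queued_cache_token:{root}", secrets.token_hex(), ex=TOKEN_TTL)


def worker_loop(only_queues) -> None:
    # Worker side: run the expensive queue poll only when a token changed
    # or the forced-poll safeguard fires.
    roots = sorted({q.split(".", 1)[0] for q in only_queues})
    keys = [f"t:queued_cache_token:{root}" for root in roots]
    last_tokens = None
    next_forced_poll = 0.0
    while True:
        tokens = tuple(r.mget(keys))
        if tokens != last_tokens or time.monotonic() >= next_forced_poll:
            last_tokens = tokens
            next_forced_poll = time.monotonic() + FORCED_POLL_INTERVAL
            # ... the expensive queue set refresh would happen here ...
        time.sleep(1)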