Skip to content

Commit

Permalink
*: add queue and retry depth tracking to multiqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
montyanderson committed Jan 16, 2024
1 parent 8723329 commit c830c9d
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 3 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ name: Run Tests

on:
push:
branches:
- master


jobs:
test:
Expand Down
6 changes: 5 additions & 1 deletion deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,51 @@ export const createMultiQueue: CreateMultiQueue = <Queue, Job>(
return typeof job === "string" ? JSON.parse(job) as Job : undefined;
};

const getQueueDepths = async () => {
const queues = await redis.zrangebyscore(
queueDepthKey,
"0",
"+inf",
);

const depths = new Map<Queue, number>();

for (const queue of queues) {
depths.set(
queue as Queue,
await redis.zcard(`${queueKeyPrefix}${queue}`),
);
}

return depths;
};

const getRetryDepths = async () => {
const queues = await redis.zrangebyscore(
queueDepthKey,
"0",
"+inf",
);

const depths = new Map<Queue, number>();

for (const queue of queues) {
depths.set(
queue as Queue,
await redis.zcard(`${retryKeyPrefix}${queue}`),
);
}

return depths;
};

return {
push,
pop,
complete,
getDeepest,
popAny,
getQueueDepths,
getRetryDepths,
};
};
48 changes: 48 additions & 0 deletions test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,65 @@ await (async () => {
await mq.push(myQueue, secondJob, 500);
await mq.push(myQueue, firstJob, 250);

assertEquals(
await mq.getQueueDepths(),
new Map([
["myQueue", 2],
]),
"getQueueDepths() must return correct queue sizes",
);

assertEquals(
await mq.getRetryDepths(),
new Map([]),
"getRetryDepths() must return correct queue sizes",
);

// pop one element
assertEquals(
await mq.pop(myQueue),
firstJob,
"pop() must return correct job",
);

assertEquals(
await mq.getQueueDepths(),
new Map([
["myQueue", 1],
]),
"getQueueDepths() must return correct queue sizes",
);

assertEquals(
await mq.getRetryDepths(),
new Map([
["myQueue", 1],
]),
"getRetryDepths() must return correct queue sizes",
);

// pop another element
assertEquals(
await mq.pop(myQueue),
secondJob,
"pop() must return correct job",
);

assertEquals(
await mq.getQueueDepths(),
new Map([
["myQueue", 0],
]),
"getQueueDepths() must return correct queue sizes",
);

assertEquals(
await mq.getRetryDepths(),
new Map([
["myQueue", 2],
]),
"getRetryDepths() must return correct queue sizes",
);
})();

await (async () => {
Expand Down
2 changes: 2 additions & 0 deletions types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export type MultiQueue<Queue extends JsonValue, Job extends JsonValue> = {
complete: (queue: Queue, job: Job) => Promise<void>;
getDeepest: () => Promise<Queue | undefined>;
popAny: (queues?: Queue[]) => Promise<Job | undefined>;
getQueueDepths: () => Promise<Map<Queue, number>>;
getRetryDepths: () => Promise<Map<Queue, number>>;
};

export type CreateMultiQueueOptions = {
Expand Down

0 comments on commit c830c9d

Please sign in to comment.