Skip to content

Commit

Permalink
Merge pull request #1 from prodialabs/queue-depths-functions
Browse files Browse the repository at this point in the history
`*`: add `getQueueDepths()` and `getRetryDepths()`
  • Loading branch information
montyanderson authored Jan 16, 2024
2 parents 8723329 + 625a152 commit 9fb1779
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 3 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ name: Run Tests

on:
push:
branches:
- master


jobs:
test:
Expand Down
6 changes: 5 additions & 1 deletion deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,51 @@ export const createMultiQueue: CreateMultiQueue = <Queue, Job>(
return typeof job === "string" ? JSON.parse(job) as Job : undefined;
};

const getQueueDepths = async () => {
const queues = await redis.zrangebyscore(
queueDepthKey,
"0",
"+inf",
);

const depths = new Map<Queue, number>();

for (const queue of queues) {
depths.set(
JSON.parse(queue) as Queue,
await redis.zcard(`${queueKeyPrefix}${queue}`),
);
}

return depths;
};

const getRetryDepths = async () => {
const queues = await redis.zrangebyscore(
queueDepthKey,
"0",
"+inf",
);

const depths = new Map<Queue, number>();

for (const queue of queues) {
depths.set(
JSON.parse(queue) as Queue,
await redis.zcard(`${retryKeyPrefix}${queue}`),
);
}

return depths;
};

return {
push,
pop,
complete,
getDeepest,
popAny,
getQueueDepths,
getRetryDepths,
};
};
50 changes: 50 additions & 0 deletions test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,67 @@ await (async () => {
await mq.push(myQueue, secondJob, 500);
await mq.push(myQueue, firstJob, 250);

assertEquals(
await mq.getQueueDepths(),
new Map([
[myQueue, 2],
]),
"getQueueDepths() must return correct queue sizes",
);

assertEquals(
await mq.getRetryDepths(),
new Map([
[myQueue, 0],
]),
"getRetryDepths() must return correct queue sizes",
);

// pop one element
assertEquals(
await mq.pop(myQueue),
firstJob,
"pop() must return correct job",
);

assertEquals(
await mq.getQueueDepths(),
new Map([
[myQueue, 1],
]),
"getQueueDepths() must return correct queue sizes",
);

assertEquals(
await mq.getRetryDepths(),
new Map([
[myQueue, 1],
]),
"getRetryDepths() must return correct queue sizes",
);

// pop another element
assertEquals(
await mq.pop(myQueue),
secondJob,
"pop() must return correct job",
);

assertEquals(
await mq.getQueueDepths(),
new Map([
[myQueue, 0],
]),
"getQueueDepths() must return correct queue sizes",
);

assertEquals(
await mq.getRetryDepths(),
new Map([
[myQueue, 2],
]),
"getRetryDepths() must return correct queue sizes",
);
})();

await (async () => {
Expand Down
2 changes: 2 additions & 0 deletions types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export type MultiQueue<Queue extends JsonValue, Job extends JsonValue> = {
complete: (queue: Queue, job: Job) => Promise<void>;
getDeepest: () => Promise<Queue | undefined>;
popAny: (queues?: Queue[]) => Promise<Job | undefined>;
getQueueDepths: () => Promise<Map<Queue, number>>;
getRetryDepths: () => Promise<Map<Queue, number>>;
};

export type CreateMultiQueueOptions = {
Expand Down

0 comments on commit 9fb1779

Please sign in to comment.