Skip to content

Commit

Permalink
fix: rework locking to avoid multiple recurring jobs from race condii…
Browse files Browse the repository at this point in the history
…tons
  • Loading branch information
cknight committed Sep 13, 2023
1 parent 1962bb9 commit 6bbb088
Showing 1 changed file with 27 additions and 15 deletions.
42 changes: 27 additions & 15 deletions main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ const kv = await Deno.openKv();

async function enqueue(msg: number, delay: number): Promise<void> {
const result = await kv.atomic()
.check({ key: LOCK_KEY, versionstamp: null }) //check no enqueue lock is present
.enqueue(msg, {
delay: delay,
keysIfUndelivered: [DELIVERY_FAILED_KEY],
}) // enqueue next recurring update
.set(NEXT_UPDATE_KEY, msg) // set next update time (for UI display purposes only)
.set(LOCK_KEY, true) // set lock to prevent multiple enqueues of this topic
.commit();

if (result.ok) {
console.log(`------- Enqueued message for ${new Date(msg).toUTCString()} (UTC)`);
console.log(
`------- Enqueued message for ${new Date(msg).toUTCString()} (UTC)`,
);
} else {
const nextDelivery = (await kv.get(NEXT_UPDATE_KEY)).value as number;
console.log(
Expand All @@ -50,9 +50,9 @@ async function enqueue(msg: number, delay: number): Promise<void> {
* msg is the unix timestamp of the expected delivery date/time of the message
*/
kv.listenQueue(async (msg: unknown) => {
console.log(`Received message: ${msg} (${new Date(msg as number).toUTCString()})`);

await kv.delete(LOCK_KEY); // release lock
console.log(
`Received message: ${msg} (${new Date(msg as number).toUTCString()})`,
);

const now = Date.now();
const msgTime = Number(msg);
Expand All @@ -74,15 +74,27 @@ kv.listenQueue(async (msg: unknown) => {
await enqueue(oneHourFromNow, ONE_HOUR_IN_MS);
});

/**
* This line covers two use cases:
*
* 1. First ever run with need to kick start the recurring job
* 2. Re-start the job after failures have exhausted the retries
*
* If there is already an enqueued message, then this will be a no-op
*/
await enqueue(Date.now(), 0);
// Start recurring job if no other isolate has already started it before
const startRecurring = await kv.atomic().check({
key: LOCK_KEY,
versionstamp: null,
}).set(LOCK_KEY, true).commit();
if (startRecurring.ok) {
console.log("Starting recurring job");
await enqueue(Date.now(), 0);
}

const failedKey = await kv.get(DELIVERY_FAILED_KEY);
if (failedKey.value) {
const shouldRestart = await kv.atomic().check({
key: DELIVERY_FAILED_KEY,
versionstamp: failedKey.versionstamp,
}).delete(DELIVERY_FAILED_KEY).commit();
if (shouldRestart.ok) {
console.log("Restarting recurring job after failed delivery");
await enqueue(Date.now(), 0);
}
}

console.log("Listening for messages...");

Expand Down

0 comments on commit 6bbb088

Please sign in to comment.