From 09ce146563871519cda638bafa82ce6af34bdd25 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Tue, 14 May 2024 23:12:02 -0600 Subject: [PATCH] fix(retry-job): consider priority (#2737) fixes #1755 --- .github/workflows/release.yml | 2 + generateRawScripts.js | 6 +- lib/commands/includes/addJobWithPriority.lua | 16 +++ lib/commands/includes/getTargetQueueList.lua | 12 ++ lib/commands/index.js | 5 +- lib/commands/{promote-4.lua => promote-5.lua} | 27 ++--- .../{retryJob-6.lua => retryJob-7.lua} | 22 ++-- lib/commands/script-loader.js | 107 +++++++----------- lib/queue.js | 13 +-- lib/scripts.js | 10 +- test/test_job.js | 6 +- test/test_queue.js | 54 +++++++++ 12 files changed, 166 insertions(+), 114 deletions(-) create mode 100644 lib/commands/includes/addJobWithPriority.lua create mode 100644 lib/commands/includes/getTargetQueueList.lua rename lib/commands/{promote-4.lua => promote-5.lua} (57%) rename lib/commands/{retryJob-6.lua => retryJob-7.lua} (64%) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 799b91ddc..f6993581d 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -22,6 +22,8 @@ jobs: node-version: 12 - name: Install dependencies run: yarn install --frozen-lockfile --non-interactive + - name: Generate scripts + run: yarn pretest - name: Release env: GITHUB_TOKEN: ${{ secrets.GH_TOKEN }} diff --git a/generateRawScripts.js b/generateRawScripts.js index f71766c06..97e2a9b5f 100644 --- a/generateRawScripts.js +++ b/generateRawScripts.js @@ -26,11 +26,11 @@ class RawScriptLoader extends ScriptLoader { for (const command of scripts) { const { name, - options: { numberOfKeys, lua }, + options: { numberOfKeys, lua } } = command; await writeFile( path.join(writeFilenamePath, `${name}-${numberOfKeys}.lua`), - lua, + lua ); } } @@ -41,5 +41,5 @@ const scriptLoader = new RawScriptLoader(); scriptLoader.transpileScripts( path.join(__dirname, './lib/commands'), - path.join(__dirname, './rawScripts'), + path.join(__dirname, './rawScripts') ); diff --git a/lib/commands/includes/addJobWithPriority.lua b/lib/commands/includes/addJobWithPriority.lua new file mode 100644 index 000000000..721ac6165 --- /dev/null +++ b/lib/commands/includes/addJobWithPriority.lua @@ -0,0 +1,16 @@ +--[[ + Function to add job considering priority. +]] + +local function addJobWithPriority(priorityKey, priority, jobId, targetKey) + rcall("ZADD", priorityKey, priority, jobId) + local count = rcall("ZCOUNT", priorityKey, 0, priority) + + local len = rcall("LLEN", targetKey) + local id = rcall("LINDEX", targetKey, len - (count - 1)) + if id then + rcall("LINSERT", targetKey, "BEFORE", id, jobId) + else + rcall("RPUSH", targetKey, jobId) + end +end diff --git a/lib/commands/includes/getTargetQueueList.lua b/lib/commands/includes/getTargetQueueList.lua new file mode 100644 index 000000000..0714d6d39 --- /dev/null +++ b/lib/commands/includes/getTargetQueueList.lua @@ -0,0 +1,12 @@ +--[[ + Function to check for the meta.paused key to decide if we are paused or not + (since an empty list and !EXISTS are not really the same). +]] + +local function getTargetQueueList(queueMetaKey, waitKey, pausedKey) + if rcall("EXISTS", queueMetaKey) ~= 1 then + return waitKey, false + else + return pausedKey, true + end +end diff --git a/lib/commands/index.js b/lib/commands/index.js index b8b05a061..b476bd842 100644 --- a/lib/commands/index.js +++ b/lib/commands/index.js @@ -4,5 +4,6 @@ const { ScriptLoader } = require('./script-loader'); const scriptLoader = new ScriptLoader(); module.exports = { - ScriptLoader, scriptLoader -} + ScriptLoader, + scriptLoader +}; diff --git a/lib/commands/promote-4.lua b/lib/commands/promote-5.lua similarity index 57% rename from lib/commands/promote-4.lua rename to lib/commands/promote-5.lua index fcd08910e..d4ed6a628 100644 --- a/lib/commands/promote-4.lua +++ b/lib/commands/promote-5.lua @@ -5,7 +5,8 @@ KEYS[1] 'delayed' KEYS[2] 'wait' KEYS[3] 'paused' - KEYS[4] 'priority' + KEYS[4] 'meta-paused' + KEYS[5] 'priority' ARGV[1] queue.toKey('') ARGV[2] jobId @@ -17,30 +18,20 @@ local rcall = redis.call; local jobId = ARGV[2] -if redis.call("ZREM", KEYS[1], jobId) == 1 then - local priority = tonumber(rcall("HGET", ARGV[1] .. jobId, "priority")) or 0 +-- Includes +--- @include "includes/addJobWithPriority" +--- @include "includes/getTargetQueueList" - local target = KEYS[2]; +if rcall("ZREM", KEYS[1], jobId) == 1 then + local priority = tonumber(rcall("HGET", ARGV[1] .. jobId, "priority")) or 0 - if rcall("EXISTS", KEYS[3]) == 1 then - target = KEYS[3] - end + local target = getTargetQueueList(KEYS[4], KEYS[2], KEYS[3]) if priority == 0 then -- LIFO or FIFO rcall("LPUSH", target, jobId) else - -- Priority add - rcall("ZADD", KEYS[4], priority, jobId) - local count = rcall("ZCOUNT", KEYS[4], 0, priority) - - local len = rcall("LLEN", target) - local id = rcall("LINDEX", target, len - (count - 1)) - if id then - rcall("LINSERT", target, "BEFORE", id, jobId) - else - rcall("RPUSH", target, jobId) - end + addJobWithPriority(KEYS[5], priority, jobId, target) end -- Emit waiting event (wait..ing@token) diff --git a/lib/commands/retryJob-6.lua b/lib/commands/retryJob-7.lua similarity index 64% rename from lib/commands/retryJob-6.lua rename to lib/commands/retryJob-7.lua index 463ac22c3..d9903d387 100644 --- a/lib/commands/retryJob-6.lua +++ b/lib/commands/retryJob-7.lua @@ -4,10 +4,11 @@ Input: KEYS[1] 'active', KEYS[2] 'wait' - KEYS[3] jobId + KEYS[3] jobId key KEYS[4] 'meta-paused' KEYS[5] 'paused' KEYS[6] stalled key + KEYS[7] 'priority' ARGV[1] pushCmd ARGV[2] jobId @@ -22,6 +23,11 @@ -2 - Job Not locked ]] local rcall = redis.call + +-- Includes +--- @include "includes/addJobWithPriority" +--- @include "includes/getTargetQueueList" + if rcall("EXISTS", KEYS[3]) == 1 then -- Check for job lock @@ -37,15 +43,17 @@ if rcall("EXISTS", KEYS[3]) == 1 then rcall("LREM", KEYS[1], 0, ARGV[2]) - local target - if rcall("EXISTS", KEYS[4]) ~= 1 then - target = KEYS[2] + local target = getTargetQueueList(KEYS[4], KEYS[2], KEYS[5]) + + local priority = tonumber(rcall("HGET", KEYS[3], "priority")) or 0 + + if priority == 0 then + -- LIFO or FIFO + rcall(ARGV[1], target, ARGV[2]) else - target = KEYS[5] + addJobWithPriority(KEYS[7], priority, ARGV[2], target) end - rcall(ARGV[1], target, ARGV[2]) - return 0 else return -1 diff --git a/lib/commands/script-loader.js b/lib/commands/script-loader.js index 0b4a2bf4c..4577b7829 100644 --- a/lib/commands/script-loader.js +++ b/lib/commands/script-loader.js @@ -16,13 +16,7 @@ class ScriptLoaderError extends Error { * The include stack */ - constructor( - message, - path, - stack = [], - line, - position = 0, - ) { + constructor(message, path, stack = [], line, position = 0) { super(message); // Ensure the name of this error is the same as the class name this.name = this.constructor.name; @@ -33,8 +27,7 @@ class ScriptLoaderError extends Error { } } -const isPossiblyMappedPath = (path) => - path && ['~', '<'].includes(path[0]); +const isPossiblyMappedPath = path => path && ['~', '<'].includes(path[0]); /** * Lua script loader with include support @@ -97,7 +90,7 @@ class ScriptLoader { throw new ScriptLoaderError( `No path mapping found for "${name}"`, scriptName, - stack, + stack ); } scriptName = path.join(mappedPath, scriptName.substring(p + 1)); @@ -114,19 +107,14 @@ class ScriptLoader { * multiple times, we make sure to load it only once. * @param stack - internal stack to prevent circular references */ - async resolveDependencies( - file, - cache, - isInclude = false, - stack = [], - ) { + async resolveDependencies(file, cache, isInclude = false, stack = []) { cache = cache ? cache : new Map(); if (stack.includes(file.path)) { throw new ScriptLoaderError( `circular reference: "${file.path}"`, file.path, - stack, + stack ); } stack.push(file.path); @@ -136,7 +124,7 @@ class ScriptLoader { const arr = content.slice(0, pos).split('\n'); return { line: arr.length, - column: arr[arr.length - 1].length + match.indexOf('@include') + 1, + column: arr[arr.length - 1].length + match.indexOf('@include') + 1 }; } @@ -144,7 +132,7 @@ class ScriptLoader { const pos = findPos(file.content, match); throw new ScriptLoaderError(msg, file.path, stack, pos.line, pos.column); } -// eslint-disable-next-line node/no-unpublished-require + // eslint-disable-next-line node/no-unpublished-require const minimatch = require('minimatch'); if (!minimatch) { @@ -152,7 +140,7 @@ class ScriptLoader { } const Minimatch = minimatch.Minimatch || class Empty {}; -// eslint-disable-next-line node/no-unpublished-require + // eslint-disable-next-line node/no-unpublished-require const fg = require('fast-glob'); if (!fg) { @@ -162,21 +150,21 @@ class ScriptLoader { const nonOp = () => { return ['']; }; - const glob = (fg) ? fg.glob : nonOp; + const glob = fg ? fg.glob : nonOp; - const hasMagic = (pattern) => { + const hasMagic = pattern => { if (!Array.isArray(pattern)) { pattern = [pattern]; } for (const p of pattern) { - if ((new Minimatch(p, GlobOptions)).hasMagic()) { + if (new Minimatch(p, GlobOptions).hasMagic()) { return true; } } return false; }; - const hasFilenamePattern = (path) => hasMagic(path); + const hasFilenamePattern = path => hasMagic(path); async function getFilenamesByPattern(pattern) { return glob(pattern, { dot: true }); @@ -198,14 +186,12 @@ class ScriptLoader { if (hasFilenamePattern(includeFilename)) { const filesMatched = await getFilenamesByPattern(includeFilename); - includePaths = filesMatched.map((x) => path.resolve(x)); + includePaths = filesMatched.map(x => path.resolve(x)); } else { includePaths = [includeFilename]; } - includePaths = includePaths.filter( - (file) => path.extname(file) === '.lua', - ); + includePaths = includePaths.filter(file => path.extname(file) === '.lua'); if (includePaths.length === 0) { raiseError(`include not found: "${reference}"`, match); @@ -216,9 +202,7 @@ class ScriptLoader { for (let i = 0; i < includePaths.length; i++) { const includePath = includePaths[i]; - const hasInclude = file.includes.find( - (x) => x.path === includePath, - ); + const hasInclude = file.includes.find(x => x.path === includePath); if (hasInclude) { /** @@ -229,7 +213,7 @@ class ScriptLoader { */ raiseError( `file "${reference}" already included in "${file.path}"`, - match, + match ); } @@ -243,7 +227,7 @@ class ScriptLoader { const buf = await readFile(includePath, { flag: 'r' }); childContent = buf.toString(); } catch (err) { - if ((err).code === 'ENOENT') { + if (err.code === 'ENOENT') { raiseError(`include not found: "${reference}"`, match); } else { throw err; @@ -257,7 +241,7 @@ class ScriptLoader { path: includePath, content: childContent, token, - includes: [], + includes: [] }; cache.set(includePath, includeMetadata); } else { @@ -292,13 +276,9 @@ class ScriptLoader { * @param content - the content of the script * @param cache - cache */ - async parseScript( - filename, - content, - cache, - ) { + async parseScript(filename, content, cache) { const { name, numberOfKeys } = splitFilename(filename); - const meta = cache ? cache.get(name):undefined; + const meta = cache ? cache.get(name) : undefined; if (meta && meta.content === content) { return meta; } @@ -308,7 +288,7 @@ class ScriptLoader { content, name, numberOfKeys, - includes: [], + includes: [] }; await this.resolveDependencies(fileInfo, cache); @@ -323,8 +303,8 @@ class ScriptLoader { interpolate(file, processed) { processed = processed || new Set(); let content = file.content; - file.includes.forEach((child) => { - const emitted = processed? processed.has(child.path):undefined; + file.includes.forEach(child => { + const emitted = processed ? processed.has(child.path) : undefined; const fragment = this.interpolate(child, processed); const replacement = emitted ? '' : fragment; @@ -337,7 +317,7 @@ class ScriptLoader { content = replaceAll(content, child.token, ''); } - if(processed){ + if (processed) { processed.add(child.path); } }); @@ -345,14 +325,11 @@ class ScriptLoader { return content; } - async loadCommand( - filename, - cache, - ) { + async loadCommand(filename, cache) { filename = path.resolve(filename); const { name: scriptName } = splitFilename(filename); - let script = cache? cache.get(scriptName) : undefined; + let script = cache ? cache.get(scriptName) : undefined; if (!script) { const content = (await readFile(filename)).toString(); script = await this.parseScript(filename, content, cache); @@ -363,7 +340,7 @@ class ScriptLoader { return { name, - options: { numberOfKeys: numberOfKeys, lua }, + options: { numberOfKeys: numberOfKeys, lua } }; } @@ -379,10 +356,7 @@ class ScriptLoader { * moveToFinish-3.lua * */ - async loadScripts( - dir, - cache, - ) { + async loadScripts(dir, cache) { dir = path.normalize(dir || __dirname); let commands = this.commandCache.get(dir); @@ -392,9 +366,7 @@ class ScriptLoader { const files = await readdir(dir); - const luaFiles = files.filter( - (file) => path.extname(file) === '.lua', - ); + const luaFiles = files.filter(file => path.extname(file) === '.lua'); if (luaFiles.length === 0) { /** @@ -424,11 +396,7 @@ class ScriptLoader { * @param client - redis client to attach script to * @param pathname - the path to the directory containing the scripts */ - async load( - client, - pathname, - cache, - ) { + async load(client, pathname, cache) { let paths = this.clientScripts.get(client); if (!paths) { paths = new Set(); @@ -438,11 +406,11 @@ class ScriptLoader { paths.add(pathname); const scripts = await this.loadScripts( pathname, - cache ? cache : new Map(), + cache ? cache : new Map() ); - scripts.forEach((command) => { + scripts.forEach(command => { // Only define the command if not already defined - if (!(client)[command.name]) { + if (!client[command.name]) { client.defineCommand(command.name, command.options); } }); @@ -501,9 +469,9 @@ function getCallerFile() { try { Error.prepareStackTrace = (_, stack) => stack; - const sites = (new Error().stack); + const sites = new Error().stack; const shiftResponse = sites.shift(); - const currentFile = shiftResponse ? shiftResponse.getFileName():undefined; + const currentFile = shiftResponse ? shiftResponse.getFileName() : undefined; while (sites.length) { const newShiftResponse = sites.shift(); @@ -523,7 +491,8 @@ function getCallerFile() { } function sha1(data) { - return createHash('sha1').update(data) + return createHash('sha1') + .update(data) .digest('hex'); } @@ -542,4 +511,4 @@ function removeEmptyLines(str) { module.exports = { ScriptLoaderError, ScriptLoader -} \ No newline at end of file +}; diff --git a/lib/queue.js b/lib/queue.js index 35728ffdf..67f94e895 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -146,18 +146,17 @@ const Queue = function Queue(name, url, opts) { this.clients = []; const loadCommands = (providedScripts, client) => { - const finalScripts = - providedScripts || (scripts); + const finalScripts = providedScripts || scripts; for (const property in finalScripts) { // Only define the command if not already defined - if (!(client)[finalScripts[property].name]) { - (client).defineCommand(finalScripts[property].name, { + if (!client[finalScripts[property].name]) { + client.defineCommand(finalScripts[property].name, { numberOfKeys: finalScripts[property].keys, - lua: finalScripts[property].content, + lua: finalScripts[property].content }); } } - } + }; const lazyClient = redisClientGetter(this, opts, (type, client) => { // bubble up Redis error events @@ -166,7 +165,7 @@ const Queue = function Queue(name, url, opts) { this.once('close', () => client.removeListener('error', handler)); if (type === 'client') { - this._initializing = (async() => loadCommands(commands, client))().then( + this._initializing = (async () => loadCommands(commands, client))().then( () => { debuglog(name + ' queue ready'); }, diff --git a/lib/scripts.js b/lib/scripts.js index 267d9c416..f3aed40a8 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -109,7 +109,7 @@ const scripts = { progressJson, JSON.stringify({ jobId: job.id, progress }) ]) - .then((code) => { + .then(code => { if (code < 0) { throw scripts.finishedErrors(code, job.id, 'updateProgress'); } @@ -124,10 +124,7 @@ const scripts = { }); const dataJson = JSON.stringify(data); - return queue.client - .updateData(keys, [ - dataJson - ]); + return queue.client.updateData(keys, [dataJson]); }, retryJobsArgs(queue, count) { @@ -426,6 +423,7 @@ const scripts = { queue.keys.delayed, queue.keys.wait, queue.keys.paused, + queue.keys['meta-paused'], queue.keys.priority ]; @@ -478,7 +476,7 @@ const scripts = { const jobId = job.id; const keys = _.map( - ['active', 'wait', jobId, 'meta-paused', 'paused', 'stalled'], + ['active', 'wait', jobId, 'meta-paused', 'paused', 'stalled', 'priority'], name => { return queue.toKey(name); } diff --git a/test/test_job.js b/test/test_job.js index 9a8b662c0..1846e28e8 100644 --- a/test/test_job.js +++ b/test/test_job.js @@ -206,7 +206,7 @@ describe('Job', () => { it('throws an error', async () => { const job = await Job.create(queue, { foo: 'bar' }); await job.remove(); - await job.update({baz: 'qux'}).catch(err => { + await job.update({ baz: 'qux' }).catch(err => { expect(err.message).to.be.equal('Missing key for job 1 updateData'); }); }); @@ -548,7 +548,9 @@ describe('Job', () => { const job = await Job.create(queue, { foo: 'bar' }); await job.remove(); await job.progress({ total: 120, completed: 40 }).catch(err => { - expect(err.message).to.be.equal('Missing key for job 1 updateProgress'); + expect(err.message).to.be.equal( + 'Missing key for job 1 updateProgress' + ); }); }); }); diff --git a/test/test_queue.js b/test/test_queue.js index 22061fb22..1bc5ce7fc 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -2454,6 +2454,60 @@ describe('Queue', () => { }); }); + describe('when job has more priority than delayed jobs', () => { + it('executes retried job first', done => { + queue = utils.buildQueue('test retries and priority'); + let id = 0; + queue.isReady().then(() => { + queue.process(async job => { + await delay(200); + if (job.attemptsMade === 0) { + id++; + expect(job.id).to.be.eql(`${id}`); + } + if (job.id == '1' && job.attemptsMade < 1) { + throw new Error('Not yet!'); + } + }); + + queue.add( + { foo: 'bar' }, + { + attempts: 2, + priority: 1 + } + ); + queue.add( + {}, + { + delay: 200, + priority: 2 + } + ); + queue.add( + {}, + { + delay: 200, + priority: 2 + } + ); + queue.add( + {}, + { + delay: 200, + priority: 2 + } + ); + }); + let count = 0; + queue.on('completed', () => { + if (count++ === 3) { + done(); + } + }); + }); + }); + it('should not retry a failed job more than the number of given attempts times', done => { queue = utils.buildQueue('test retries and backoffs'); let tries = 0;