Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Procedure level queue #1946

Merged
merged 2 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## [Unreleased][unreleased]

- Fixed API endpoints local queue settings applying

## [3.0.13][] - 2023-10-22

- Fix serve static not in cache (e.g. certbot challenge)
Expand Down
4 changes: 2 additions & 2 deletions lib/procedure.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class Procedure {

async enter() {
await this.application.semaphore.enter();
if (this.concurrency) {
if (this.semaphore) {
try {
await this.semaphore.enter();
} catch (error) {
Expand All @@ -53,7 +53,7 @@ class Procedure {

leave() {
this.application.semaphore.leave();
if (this.concurrency) this.semaphore.leave();
if (this.semaphore) this.semaphore.leave();
}

async invoke(context, args = {}) {
Expand Down
102 changes: 82 additions & 20 deletions test/procedure.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ metatests.testAsync('lib/procedure', async (test) => {
});

const application = {
server: {
semaphore: {
async enter() {},
leave() {},
},
Error,
semaphore: {
async enter() {},
leave() {},
},
};

Expand Down Expand Up @@ -62,11 +61,9 @@ metatests.testAsync('lib/procedure validate', async (test) => {

const application = {
Error,
server: {
semaphore: {
async enter() {},
leave() {},
},
semaphore: {
async enter() {},
leave() {},
},
};
const procedure = new Procedure(script, 'method', application);
Expand All @@ -93,11 +90,9 @@ metatests.testAsync('lib/procedure validate async', async (test) => {

const application = {
Error,
server: {
semaphore: {
async enter() {},
leave() {},
},
semaphore: {
async enter() {},
leave() {},
},
};
const procedure = new Procedure(script, 'method', application);
Expand All @@ -124,11 +119,9 @@ metatests.testAsync('lib/procedure timeout', async (test) => {

const application = {
Error,
server: {
semaphore: {
async enter() {},
leave() {},
},
semaphore: {
async enter() {},
leave() {},
},
};

Expand All @@ -141,3 +134,72 @@ metatests.testAsync('lib/procedure timeout', async (test) => {

await test.resolves(() => procedure.invoke({}, { waitTime: 50 }), DONE);
});

metatests.testAsync('lib/procedure queue', async (test) => {
const DONE = 'success';

const script = () => ({
queue: {
concurrency: 1,
size: 1,
timeout: 15,
},

method: async ({ waitTime }) =>
new Promise((resolve) => {
setTimeout(() => resolve(DONE), waitTime);
}),
});

const application = {
Error,
semaphore: {
async enter() {},
leave() {},
},
};

const rpc = async (proc, args) => {
let result = null;
await proc.enter();
try {
result = await proc.invoke({}, args);
} catch (error) {
throw new Error('Procedure.invoke failed. Check your script.method');
}
proc.leave();
return result;
};

const procedure = new Procedure(script, 'method', application);

await test.resolves(async () => {
const invokes = await Promise.allSettled([
rpc(procedure, { waitTime: 2 }),
rpc(procedure, { waitTime: 1 }),
]);
const last = invokes[1];
return last.value;
}, DONE);

await test.rejects(async () => {
const invokes = await Promise.allSettled([
rpc(procedure, { waitTime: 16 }),
rpc(procedure, { waitTime: 1 }),
]);
const last = invokes[1];
if (last.status === 'rejected') throw last.reason;
return last.value;
}, new Error('Semaphore timeout'));

await test.rejects(async () => {
const invokes = await Promise.allSettled([
rpc(procedure, { waitTime: 1 }),
rpc(procedure, { waitTime: 1 }),
rpc(procedure, { waitTime: 1 }),
]);
const last = invokes[2];
if (last.status === 'rejected') throw last.reason;
return last.value;
}, new Error('Semaphore queue is full'));
});
Loading