Skip to content

Commit

Permalink
Test concurrent submissions that affect same entity (#1202)
Browse files Browse the repository at this point in the history
  • Loading branch information
matthew-white authored Oct 3, 2024
1 parent 82696a0 commit e20a059
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 14 deletions.
52 changes: 40 additions & 12 deletions lib/worker/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const { min } = Math;
const { inspect } = require('util');
const { head } = require('ramda');
const { sql } = require('slonik');
const { timebound, runSequentially } = require('../util/promise');
const { timebound, runSequentially, block } = require('../util/promise');
const defaultJobMap = require('./jobs').jobs;
const { noop } = require('../util/util');

Expand Down Expand Up @@ -137,22 +137,50 @@ update audits set claimed=clock_timestamp() from q where audits.id=q.id returnin
for (let i = 0; i < count; i += 1) loop();
};

// for testing: chews through the event queue serially until there is nothing left to process.
const exhaust = async () => {
const runWait = (event) => new Promise((done) => {
if (!runJobs(event, () => { done(true); })) done(false);
});
while (await check().then(runWait)); // eslint-disable-line no-await-in-loop
};

return {
loop, loops,
// for testing
exhaust, run: runJobs, check
run: runJobs, check
};
};

const exhaust = (container) => workerQueue(container).exhaust();
// for testing: chews through the event queue serially until there is nothing left to process.
const exhaust = async (container) => {
const queue = workerQueue(container);
const runWait = (event) => new Promise((done) => {
if (!queue.run(event, () => { done(true); })) done(false);
});
while (await queue.check().then(runWait)); // eslint-disable-line no-await-in-loop
};

// For testing. Similar to exhaust(), but processes events in parallel rather
// than serially.
const exhaustParallel = async (container) => {
const queue = workerQueue(container);
// The count of events for which a job has started
let startCount = 0;
const run = (event, done) => { if (queue.run(event, done)) startCount += 1; };
// The count of events for which all jobs have finished
let doneCount = 0;
const [lock, unlock] = block();
const done = async () => {
// Processing the event may have logged more events. Here, we check for
// additional events, processing each immediately.
// eslint-disable-next-line no-await-in-loop
for (let event = await queue.check(); event != null; event = await queue.check())
run(event, done);
doneCount += 1;
if (doneCount === startCount) unlock();
};
// Collect all pending events, then process them in parallel.
const events = [];
// eslint-disable-next-line no-await-in-loop
for (let event = await queue.check(); event != null; event = await queue.check())
events.push(event);
for (const event of events) run(event, done);
if (startCount === 0) unlock();
return lock;
};

module.exports = { workerQueue, exhaust };
module.exports = { workerQueue, exhaust, exhaustParallel };

55 changes: 53 additions & 2 deletions test/integration/api/offline-entities.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
const appRoot = require('app-root-path');
const { testService } = require('../setup');
const { testService, testServiceFullTrx } = require('../setup');
const testData = require('../../data/xml');
const uuid = require('uuid').v4;
const should = require('should');
const { sql } = require('slonik');

const { exhaust } = require(appRoot + '/lib/worker/worker');
const { exhaust, exhaustParallel } = require(appRoot + '/lib/worker/worker');

const testOfflineEntities = (test) => testService(async (service, container) => {
const asAlice = await service.login('alice');
Expand Down Expand Up @@ -1342,4 +1342,55 @@ describe('Offline Entities', () => {
}));
});
});

describe('locking an entity while processing a related submission', function() {
this.timeout(8000);

// https://github.com/getodk/central/issues/705
it('should concurrently process an offline create + update @slow', testServiceFullTrx(async (service, container) => {
const asAlice = await service.login('alice');
await asAlice.post('/v1/projects/1/forms?publish=true')
.send(testData.forms.offlineEntity)
.set('Content-Type', 'application/xml')
.expect(200);
await exhaust(container);

// Set up the race condition.
const race = async () => {
const entityUuid = uuid();
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.two
.replace('two', uuid())
.replace('12345678-1234-4123-8234-123456789ddd', entityUuid))
.set('Content-Type', 'application/xml')
.expect(200);
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.two
.replace('two', uuid())
.replace('12345678-1234-4123-8234-123456789ddd', entityUuid)
.replace('create="1"', 'update="1"')
.replace('branchId=""', `branchId="${uuid()}"`)
.replace('baseVersion=""', 'baseVersion="1"'))
.set('Content-Type', 'application/xml')
.expect(200);
await exhaustParallel(container);

const { body: entity } = await asAlice.get(`/v1/projects/1/datasets/people/entities/${entityUuid}`)
.expect(200);
const backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
return entity.currentVersion.version === 2 && backlogCount === 0;
};
// Run the race condition 50 times. If I remove locking in
// Entities._processSubmissionEvent(), then successCount < 10. With
// locking in place, successCount === 50. It's because it's often the case
// that successCount > 0 even without locking that we run the race
// condition multiple times.
let successCount = 0;
for (let i = 0; i < 50; i += 1) {
// eslint-disable-next-line no-await-in-loop
if (await race()) successCount += 1;
}
successCount.should.equal(50);
}));
});
});

0 comments on commit e20a059

Please sign in to comment.