diff --git a/app/api/queue.v2/configuration/factories.ts b/app/api/queue.v2/configuration/factories.ts index b356f33642..a6712e5ee5 100644 --- a/app/api/queue.v2/configuration/factories.ts +++ b/app/api/queue.v2/configuration/factories.ts @@ -5,7 +5,7 @@ import { getSharedClient, getSharedConnection, } from 'api/common.v2/database/getConnectionForCurrentTenant'; -import { DefaultLogger } from 'api/log.v2/infrastructure/StandardLogger'; +import { DefaultLogger, SystemLogger } from 'api/log.v2/infrastructure/StandardLogger'; import { JobsRouter } from '../infrastructure/JobsRouter'; import { MongoQueueAdapter } from '../infrastructure/MongoQueueAdapter'; import { NamespacedDispatcher } from '../infrastructure/NamespacedDispatcher'; @@ -13,7 +13,7 @@ import { NamespacedDispatcher } from '../infrastructure/NamespacedDispatcher'; export function DefaultQueueAdapter() { return new MongoQueueAdapter( getSharedConnection(), - new MongoTransactionManager(getSharedClient(), DefaultLogger()) + new MongoTransactionManager(getSharedClient(), SystemLogger()) ); } diff --git a/app/queueRegistry.ts b/app/queueRegistry.ts index c25931535b..fe4fab9aa6 100644 --- a/app/queueRegistry.ts +++ b/app/queueRegistry.ts @@ -1,4 +1,4 @@ -import { Dispatchable } from 'api/queue.v2/application/contracts/Dispatchable'; +import { Dispatchable, HeartbeatCallback } from 'api/queue.v2/application/contracts/Dispatchable'; import { DispatchableClass } from 'api/queue.v2/application/contracts/JobsDispatcher'; import { UpdateTemplateRelationshipPropertiesJob as createUpdateTemplateRelationshipPropertiesJob, @@ -7,6 +7,23 @@ import { import { UpdateRelationshipPropertiesJob } from 'api/relationships.v2/services/propertyUpdateStrategies/UpdateRelationshipPropertiesJob'; import { UpdateTemplateRelationshipPropertiesJob } from 'api/relationships.v2/services/propertyUpdateStrategies/UpdateTemplateRelationshipPropertiesJob'; +function randomIntFromInterval(min, max) { + // min and max included + return Math.floor(Math.random() * (max - min + 1) + min); +} + +export class TestJob implements Dispatchable { + static BATCH_SIZE = 200; + + constructor() {} + + async handleDispatch(_heartbeat: HeartbeatCallback) { + await new Promise(resolve => { + setTimeout(resolve, randomIntFromInterval(1000, 2000)); + }); + } +} + export function registerJobs( register: ( dispatchable: DispatchableClass, @@ -15,4 +32,5 @@ export function registerJobs( ) { register(UpdateRelationshipPropertiesJob, async () => createUpdateRelationshipPropertiesJob()); register(UpdateTemplateRelationshipPropertiesJob, createUpdateTemplateRelationshipPropertiesJob); + register(TestJob, async () => new TestJob()); } diff --git a/scripts/scripts.v2/dispatchTestJobs.ts b/scripts/scripts.v2/dispatchTestJobs.ts new file mode 100644 index 0000000000..2dd030f6fd --- /dev/null +++ b/scripts/scripts.v2/dispatchTestJobs.ts @@ -0,0 +1,23 @@ +import { DefaultDispatcher } from 'api/queue.v2/configuration/factories'; +import { TestJob } from '../../app/queueRegistry'; +import { DB } from 'api/odm'; +import { config } from 'api/config'; + +let dbAuth = {}; + +if (process.env.DBUSER) { + dbAuth = { + auth: { authSource: 'admin' }, + user: process.env.DBUSER, + pass: process.env.DBPASS, + }; +} + +(async () => { + await DB.connect(config.DBHOST, dbAuth); + const dispatcher = await DefaultDispatcher('default'); + for (let i = 0; i < 100; i++) { + await dispatcher.dispatch(TestJob, {}); + } + await DB.disconnect(); +})();