diff --git a/packages/core/src/scorekeeper.ts b/packages/core/src/scorekeeper.ts index 0337d2b0c..1b881f8c6 100644 --- a/packages/core/src/scorekeeper.ts +++ b/packages/core/src/scorekeeper.ts @@ -14,12 +14,10 @@ import { import Nominator from "./nominator"; import Claimer from "./claimer"; import Monitor from "./monitor"; -import { startMicroserviceJobs } from "./scorekeeper/jobs/MicroserviceJobs"; -import { startMonolithJobs } from "./scorekeeper/jobs/MonolithJobs"; -import { startScorekeeperJobs } from "./scorekeeper/jobs/ScorekeeperJobs"; import { startRound } from "./scorekeeper/Round"; -import { startMainScorekeeperJob } from "./cron"; import { registerHandler } from "./scorekeeper/RegisterHandler"; +import { jobsMetadata } from "./scorekeeper/jobs/Jobs"; +import { JobsFactory } from "./scorekeeper/jobs/JobsFactory"; // import { monitorJob } from "./jobs"; export type NominatorGroup = Config.NominatorConfig[]; @@ -251,42 +249,21 @@ export default class ScoreKeeper { } // Start all Cron Jobs - try { - // Start Jobs in either microservice or monolith mode - if (this.config?.redis?.host && this.config?.redis?.port) { - await startMicroserviceJobs(this.config, this.chaindata); - } else { - await startMonolithJobs(this.config, this.chaindata, this.constraints); - } - - // Start all scorekeeper / core jobs - await startScorekeeperJobs( - this.handler, - this.nominatorGroups, - this.config, - this.bot, - this.claimer, - this.chaindata, - ); - } catch (e) { - logger.warn( - `There was an error running some cron jobs...`, - scorekeeperLabel, - ); - logger.error(e); - } - logger.info(`going to start mainCron: `, scorekeeperLabel); - await startMainScorekeeperJob( - this.config, - this.ending, - this.chaindata, - this.nominatorGroups, - this.nominating, - this.currentEra, - this.bot, - this.constraints, - this.handler, - this.currentTargets, - ); + const metadata: jobsMetadata = { + config: this.config, + ending: this.ending, + chainData: this.chaindata, + nominatorGroups: this.nominatorGroups, + nominating: this.nominating, + currentEra: this.currentEra, + bot: this.bot, + constraints: this.constraints, + handler: this.handler, + currentTargets: this.currentTargets, + claimer: this.claimer, + }; + + const jobs = await JobsFactory.makeJobs(metadata); + jobs.startJobs(); } } diff --git a/packages/core/src/scorekeeper/jobs/ScorekeeperJobs.ts b/packages/core/src/scorekeeper/jobs/Jobs.ts similarity index 64% rename from packages/core/src/scorekeeper/jobs/ScorekeeperJobs.ts rename to packages/core/src/scorekeeper/jobs/Jobs.ts index 07def641a..a95cb8c6e 100644 --- a/packages/core/src/scorekeeper/jobs/ScorekeeperJobs.ts +++ b/packages/core/src/scorekeeper/jobs/Jobs.ts @@ -1,16 +1,92 @@ -/** - * Functions for staring Scorekeeper jobs - * - * @function ScorekeeperJobs - */ +import { + ApiHandler, + ChainData, + Config, + Constraints, + logger, +} from "@1kv/common"; +import { SpawnedNominatorGroup, scorekeeperLabel } from "../../scorekeeper"; import { startCancelCron, startExecutionJob, + startMainScorekeeperJob, startRewardClaimJob, startStaleNominationCron, startUnclaimedEraJob, } from "../../cron"; +import Claimer from "../../claimer"; + +export type jobsMetadata = { + config: Config.ConfigSchema; + ending: boolean; + chainData: ChainData; + nominatorGroups: Array; + nominating: boolean; + currentEra: number; + bot: any; + constraints: Constraints.OTV; + handler: ApiHandler; + currentTargets: string[]; + claimer: Claimer; +}; + +export abstract class Jobs { + constructor(protected readonly metadata: jobsMetadata) {} + + abstract _startSpecificJobs(): Promise; + + public startJobs = async (): Promise => { + const { + handler, + nominatorGroups, + config, + bot, + claimer, + chainData, + ending, + nominating, + currentEra, + constraints, + currentTargets, + } = this.metadata; + + try { + await this._startSpecificJobs(); + + // Start all scorekeeper / core jobs + await startScorekeeperJobs( + //TODO: find better name for this function + handler, + nominatorGroups, + config, + bot, + claimer, + chainData, + ); + } catch (e) { + logger.warn( + `There was an error running some cron jobs...`, + scorekeeperLabel, + ); + logger.error(e); + } + logger.info(`going to start mainCron: `, scorekeeperLabel); + await startMainScorekeeperJob( + //TODO: find better name for this function + config, + ending, + chainData, + nominatorGroups, + nominating, + currentEra, + bot, + constraints, + handler, + currentTargets, + ); + }; +} /** * Orchestrates the initiation of various jobs related to scorekeeping in a blockchain context. This function @@ -46,7 +122,7 @@ import { * * await startScorekeeperJobs(handler, nominatorGroups, config, bot, claimer, chaindata); */ -export const startScorekeeperJobs = async ( +const startScorekeeperJobs = async ( handler, nominatorGroups, config, diff --git a/packages/core/src/scorekeeper/jobs/JobsFactory.ts b/packages/core/src/scorekeeper/jobs/JobsFactory.ts new file mode 100644 index 000000000..a83d2479f --- /dev/null +++ b/packages/core/src/scorekeeper/jobs/JobsFactory.ts @@ -0,0 +1,11 @@ +import { JobsMicroservice } from "./JobsMicroservice"; +import { JobsMonolith } from "./JobsMonolith"; +import { jobsMetadata, Jobs } from "./Jobs"; + +export class JobsFactory { + static makeJobs = async (metadata: jobsMetadata): Promise => { + if (!metadata.config?.redis?.host && metadata.config?.redis?.port) + return new JobsMicroservice(metadata); + else return new JobsMonolith(metadata); + }; +} diff --git a/packages/core/src/scorekeeper/jobs/JobsMicroservice.ts b/packages/core/src/scorekeeper/jobs/JobsMicroservice.ts new file mode 100644 index 000000000..3f53f3472 --- /dev/null +++ b/packages/core/src/scorekeeper/jobs/JobsMicroservice.ts @@ -0,0 +1,80 @@ +import { logger } from "@1kv/common"; + +import { otvWorker } from "@1kv/worker"; +import { scorekeeperLabel } from "../../scorekeeper"; +import { Jobs } from "./Jobs"; + +export class JobsMicroservice extends Jobs { + _startSpecificJobs = async (): Promise => { + const { config, chainData } = this.metadata; + if (!config?.redis?.host || !config?.redis?.port) { + logger.error( + `No redis config found. Microservice Jobs will not be started.`, + scorekeeperLabel, + ); + return; + } + try { + // Jobs get run in separate worker + logger.info(`Starting bullmq Queues and Workers....`, scorekeeperLabel); + const releaseMonitorQueue = + await otvWorker.queues.createReleaseMonitorQueue( + config.redis.host, + config.redis.port, + ); + const constraintsQueue = await otvWorker.queues.createConstraintsQueue( + config.redis.host, + config.redis.port, + ); + const chaindataQueue = await otvWorker.queues.createChainDataQueue( + config.redis.host, + config.redis.port, + ); + const blockQueue = await otvWorker.queues.createBlockQueue( + config.redis.host, + config.redis.port, + ); + + const removeRepeatableJobs = true; + if (removeRepeatableJobs) { + logger.info(`remove jobs: ${removeRepeatableJobs}`, scorekeeperLabel); + // Remove any previous repeatable jobs + await otvWorker.queues.removeRepeatableJobsFromQueues([ + releaseMonitorQueue, + constraintsQueue, + chaindataQueue, + blockQueue, + ]); + } + + const obliterateQueues = false; + if (obliterateQueues) { + await otvWorker.queues.obliterateQueues([ + releaseMonitorQueue, + constraintsQueue, + chaindataQueue, + blockQueue, + ]); + } + + // Add repeatable jobs to the queues + // Queues need to have different repeat time intervals + await otvWorker.queues.addReleaseMonitorJob(releaseMonitorQueue, 60000); + await otvWorker.queues.addValidityJob(constraintsQueue, 1000001); + await otvWorker.queues.addScoreJob(constraintsQueue, 100002); + await otvWorker.queues.addActiveValidatorJob(chaindataQueue, 100003); + await otvWorker.queues.addEraPointsJob(chaindataQueue, 100006); + await otvWorker.queues.addEraStatsJob(chaindataQueue, 110008); + await otvWorker.queues.addInclusionJob(chaindataQueue, 100008); + await otvWorker.queues.addNominatorJob(chaindataQueue, 100009); + await otvWorker.queues.addSessionKeyJob(chaindataQueue, 100010); + await otvWorker.queues.addValidatorPrefJob(chaindataQueue, 100101); + await otvWorker.queues.addAllBlocks(blockQueue, chainData); + // TODO update this as queue job + // await startLocationStatsJob(this.config, this.chaindata); + } catch (e) { + logger.error(e.toString(), scorekeeperLabel); + logger.error("Error starting microservice jobs", scorekeeperLabel); + } + }; +} diff --git a/packages/core/src/scorekeeper/jobs/JobsMonolith.ts b/packages/core/src/scorekeeper/jobs/JobsMonolith.ts new file mode 100644 index 000000000..81aed132a --- /dev/null +++ b/packages/core/src/scorekeeper/jobs/JobsMonolith.ts @@ -0,0 +1,41 @@ +import { logger } from "@1kv/common"; + +import { scorekeeperLabel } from "../../scorekeeper"; +import { Jobs } from "./Jobs"; +import { + startValidatityJob, + startScoreJob, + startEraPointsJob, + startActiveValidatorJob, + startInclusionJob, + startSessionKeyJob, + startValidatorPrefJob, + startEraStatsJob, + startLocationStatsJob, + startNominatorJob, + startBlockDataJob, +} from "../../cron"; +import { monitorJob } from "../../jobs"; + +export class JobsMonolith extends Jobs { + _startSpecificJobs = async (): Promise => { + const { config, constraints, chainData } = this.metadata; + try { + await monitorJob(); + await startValidatityJob(config, constraints); + await startScoreJob(config, constraints); + await startEraPointsJob(config, chainData); + await startActiveValidatorJob(config, chainData); + await startInclusionJob(config, chainData); + await startSessionKeyJob(config, chainData); + await startValidatorPrefJob(config, chainData); + await startEraStatsJob(config, chainData); + await startLocationStatsJob(config, chainData); + await startNominatorJob(config, chainData); + await startBlockDataJob(config, chainData); + } catch (e) { + logger.error(e.toString(), scorekeeperLabel); + logger.error("Error starting monolith jobs", scorekeeperLabel); + } + }; +} diff --git a/packages/core/src/scorekeeper/jobs/MicroserviceJobs.ts b/packages/core/src/scorekeeper/jobs/MicroserviceJobs.ts deleted file mode 100644 index 04031e1d9..000000000 --- a/packages/core/src/scorekeeper/jobs/MicroserviceJobs.ts +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Functions for staring microservice jobs - * - * @function MicroserviceJobs - */ - -import { logger } from "@1kv/common"; -import { otvWorker } from "@1kv/worker"; -import { scorekeeperLabel } from "../../scorekeeper"; - -/** - * Initializes and starts various microservice jobs using BullMQ queues based on the provided configuration and chain data. - * It checks for Redis configuration and establishes queues for different types of jobs such as release monitoring, constraints, - * chain data processing, and block processing. This function also handles the removal of repeatable jobs if configured to do so - * and can obliterate existing queues. Each type of job is added to its respective queue with a specified repeat interval. - * - * @param {Object} config - The configuration object which includes Redis connection details among other configurations. - * @param {Object} chaindata - The chain data object required for initializing certain jobs, especially those related to block processing. - * @returns {Promise} A promise that resolves when all the jobs have been successfully initialized and started, or rejects with an error if the process fails. - * @throws {Error} Throws an error if the Redis configuration is missing or if there's an issue starting the microservice jobs. - * - * @example - * const config = { - * redis: { - * host: 'localhost', - * port: 6379 - * } - * }; - * const chaindata = new ChainData(); - * await startMicroserviceJobs(config, chaindata); - */ -export const startMicroserviceJobs = async ( - config, - chaindata, -): Promise => { - if (!config?.redis?.host || !config?.redis?.port) { - logger.error( - `No redis config found. Microservice Jobs will not be started.`, - scorekeeperLabel, - ); - return; - } - try { - // Jobs get run in separate worker - logger.info(`Starting bullmq Queues and Workers....`, scorekeeperLabel); - const releaseMonitorQueue = - await otvWorker.queues.createReleaseMonitorQueue( - config.redis.host, - config.redis.port, - ); - const constraintsQueue = await otvWorker.queues.createConstraintsQueue( - config.redis.host, - config.redis.port, - ); - const chaindataQueue = await otvWorker.queues.createChainDataQueue( - config.redis.host, - config.redis.port, - ); - const blockQueue = await otvWorker.queues.createBlockQueue( - config.redis.host, - config.redis.port, - ); - - const removeRepeatableJobs = true; - if (removeRepeatableJobs) { - logger.info(`remove jobs: ${removeRepeatableJobs}`, scorekeeperLabel); - // Remove any previous repeatable jobs - await otvWorker.queues.removeRepeatableJobsFromQueues([ - releaseMonitorQueue, - constraintsQueue, - chaindataQueue, - blockQueue, - ]); - } - - const obliterateQueues = false; - if (obliterateQueues) { - await otvWorker.queues.obliterateQueues([ - releaseMonitorQueue, - constraintsQueue, - chaindataQueue, - blockQueue, - ]); - } - - // Add repeatable jobs to the queues - // Queues need to have different repeat time intervals - await otvWorker.queues.addReleaseMonitorJob(releaseMonitorQueue, 60000); - await otvWorker.queues.addValidityJob(constraintsQueue, 1000001); - await otvWorker.queues.addScoreJob(constraintsQueue, 100002); - await otvWorker.queues.addActiveValidatorJob(chaindataQueue, 100003); - await otvWorker.queues.addEraPointsJob(chaindataQueue, 100006); - await otvWorker.queues.addEraStatsJob(chaindataQueue, 110008); - await otvWorker.queues.addInclusionJob(chaindataQueue, 100008); - await otvWorker.queues.addNominatorJob(chaindataQueue, 100009); - await otvWorker.queues.addSessionKeyJob(chaindataQueue, 100010); - await otvWorker.queues.addValidatorPrefJob(chaindataQueue, 100101); - await otvWorker.queues.addAllBlocks(blockQueue, chaindata); - // TODO update this as queue job - // await startLocationStatsJob(this.config, this.chaindata); - } catch (e) { - logger.error(e.toString(), scorekeeperLabel); - logger.error("Error starting microservice jobs", scorekeeperLabel); - } -}; diff --git a/packages/core/src/scorekeeper/jobs/MonolithJobs.ts b/packages/core/src/scorekeeper/jobs/MonolithJobs.ts deleted file mode 100644 index c48392441..000000000 --- a/packages/core/src/scorekeeper/jobs/MonolithJobs.ts +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Functions for staring Monolith jobs - * - * @function MonolithJobs - */ - -import { monitorJob } from "../../jobs"; -import { - startActiveValidatorJob, - startBlockDataJob, - startEraPointsJob, - startEraStatsJob, - startInclusionJob, - startLocationStatsJob, - startNominatorJob, - startScoreJob, - startSessionKeyJob, - startValidatityJob, - startValidatorPrefJob, -} from "../../cron"; -import { logger } from "@1kv/common"; -import { scorekeeperLabel } from "../../scorekeeper"; - -export const startMonolithJobs = async ( - config, - chaindata, - constraints, -): Promise => { - try { - await monitorJob(); - await startValidatityJob(config, constraints); - await startScoreJob(config, constraints); - await startEraPointsJob(config, chaindata); - await startActiveValidatorJob(config, chaindata); - await startInclusionJob(config, chaindata); - await startSessionKeyJob(config, chaindata); - await startValidatorPrefJob(config, chaindata); - await startEraStatsJob(config, chaindata); - await startLocationStatsJob(config, chaindata); - await startNominatorJob(config, chaindata); - await startBlockDataJob(config, chaindata); - } catch (e) { - logger.error(e.toString(), scorekeeperLabel); - logger.error("Error starting monolith jobs", scorekeeperLabel); - } -};