Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement Factory and Template patterns #2607

Merged
merged 2 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 18 additions & 41 deletions packages/core/src/scorekeeper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@ import {
import Nominator from "./nominator";
import Claimer from "./claimer";
import Monitor from "./monitor";
import { startMicroserviceJobs } from "./scorekeeper/jobs/MicroserviceJobs";
import { startMonolithJobs } from "./scorekeeper/jobs/MonolithJobs";
import { startScorekeeperJobs } from "./scorekeeper/jobs/ScorekeeperJobs";
import { startRound } from "./scorekeeper/Round";
import { startMainScorekeeperJob } from "./cron";
import { registerHandler } from "./scorekeeper/RegisterHandler";
import { jobsMetadata } from "./scorekeeper/jobs/Jobs";
import { JobsFactory } from "./scorekeeper/jobs/JobsFactory";
// import { monitorJob } from "./jobs";

export type NominatorGroup = Config.NominatorConfig[];
Expand Down Expand Up @@ -251,42 +249,21 @@ export default class ScoreKeeper {
}

// Start all Cron Jobs
try {
// Start Jobs in either microservice or monolith mode
if (this.config?.redis?.host && this.config?.redis?.port) {
await startMicroserviceJobs(this.config, this.chaindata);
} else {
await startMonolithJobs(this.config, this.chaindata, this.constraints);
}

// Start all scorekeeper / core jobs
await startScorekeeperJobs(
this.handler,
this.nominatorGroups,
this.config,
this.bot,
this.claimer,
this.chaindata,
);
} catch (e) {
logger.warn(
`There was an error running some cron jobs...`,
scorekeeperLabel,
);
logger.error(e);
}
logger.info(`going to start mainCron: `, scorekeeperLabel);
await startMainScorekeeperJob(
this.config,
this.ending,
this.chaindata,
this.nominatorGroups,
this.nominating,
this.currentEra,
this.bot,
this.constraints,
this.handler,
this.currentTargets,
);
const metadata: jobsMetadata = {
config: this.config,
ending: this.ending,
chainData: this.chaindata,
nominatorGroups: this.nominatorGroups,
nominating: this.nominating,
currentEra: this.currentEra,
bot: this.bot,
constraints: this.constraints,
handler: this.handler,
currentTargets: this.currentTargets,
claimer: this.claimer,
};

const jobs = await JobsFactory.makeJobs(metadata);
jobs.startJobs();
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,92 @@
/**
* Functions for staring Scorekeeper jobs
*
* @function ScorekeeperJobs
*/
import {
ApiHandler,
ChainData,
Config,
Constraints,
logger,
} from "@1kv/common";

import { SpawnedNominatorGroup, scorekeeperLabel } from "../../scorekeeper";
import {
startCancelCron,
startExecutionJob,
startMainScorekeeperJob,
startRewardClaimJob,
startStaleNominationCron,
startUnclaimedEraJob,
} from "../../cron";
import Claimer from "../../claimer";

export type jobsMetadata = {
config: Config.ConfigSchema;
ending: boolean;
chainData: ChainData;
nominatorGroups: Array<SpawnedNominatorGroup>;
nominating: boolean;
currentEra: number;
bot: any;
constraints: Constraints.OTV;
handler: ApiHandler;
currentTargets: string[];
claimer: Claimer;
};

export abstract class Jobs {
constructor(protected readonly metadata: jobsMetadata) {}

abstract _startSpecificJobs(): Promise<void>;

public startJobs = async (): Promise<void> => {
const {
handler,
nominatorGroups,
config,
bot,
claimer,
chainData,
ending,
nominating,
currentEra,
constraints,
currentTargets,
} = this.metadata;

try {
await this._startSpecificJobs();

// Start all scorekeeper / core jobs
await startScorekeeperJobs(
//TODO: find better name for this function
handler,
nominatorGroups,
config,
bot,
claimer,
chainData,
);
} catch (e) {
logger.warn(
`There was an error running some cron jobs...`,
scorekeeperLabel,
);
logger.error(e);
}
logger.info(`going to start mainCron: `, scorekeeperLabel);
await startMainScorekeeperJob(
//TODO: find better name for this function
config,
ending,
chainData,
nominatorGroups,
nominating,
currentEra,
bot,
constraints,
handler,
currentTargets,
);
};
}

/**
* Orchestrates the initiation of various jobs related to scorekeeping in a blockchain context. This function
Expand Down Expand Up @@ -46,7 +122,7 @@ import {
*
* await startScorekeeperJobs(handler, nominatorGroups, config, bot, claimer, chaindata);
*/
export const startScorekeeperJobs = async (
const startScorekeeperJobs = async (
handler,
nominatorGroups,
config,
Expand Down
11 changes: 11 additions & 0 deletions packages/core/src/scorekeeper/jobs/JobsFactory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { JobsMicroservice } from "./JobsMicroservice";
import { JobsMonolith } from "./JobsMonolith";
import { jobsMetadata, Jobs } from "./Jobs";

export class JobsFactory {
static makeJobs = async (metadata: jobsMetadata): Promise<Jobs> => {
if (!metadata.config?.redis?.host && metadata.config?.redis?.port)
return new JobsMicroservice(metadata);
else return new JobsMonolith(metadata);
};
}
80 changes: 80 additions & 0 deletions packages/core/src/scorekeeper/jobs/JobsMicroservice.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { logger } from "@1kv/common";

import { otvWorker } from "@1kv/worker";
import { scorekeeperLabel } from "../../scorekeeper";
import { Jobs } from "./Jobs";

export class JobsMicroservice extends Jobs {
_startSpecificJobs = async (): Promise<void> => {
const { config, chainData } = this.metadata;
if (!config?.redis?.host || !config?.redis?.port) {
logger.error(
`No redis config found. Microservice Jobs will not be started.`,
scorekeeperLabel,
);
return;
}
try {
// Jobs get run in separate worker
logger.info(`Starting bullmq Queues and Workers....`, scorekeeperLabel);
const releaseMonitorQueue =
await otvWorker.queues.createReleaseMonitorQueue(
config.redis.host,
config.redis.port,
);
const constraintsQueue = await otvWorker.queues.createConstraintsQueue(
config.redis.host,
config.redis.port,
);
const chaindataQueue = await otvWorker.queues.createChainDataQueue(
config.redis.host,
config.redis.port,
);
const blockQueue = await otvWorker.queues.createBlockQueue(
config.redis.host,
config.redis.port,
);

const removeRepeatableJobs = true;
if (removeRepeatableJobs) {
logger.info(`remove jobs: ${removeRepeatableJobs}`, scorekeeperLabel);
// Remove any previous repeatable jobs
await otvWorker.queues.removeRepeatableJobsFromQueues([
releaseMonitorQueue,
constraintsQueue,
chaindataQueue,
blockQueue,
]);
}

const obliterateQueues = false;
if (obliterateQueues) {
await otvWorker.queues.obliterateQueues([
releaseMonitorQueue,
constraintsQueue,
chaindataQueue,
blockQueue,
]);
}

// Add repeatable jobs to the queues
// Queues need to have different repeat time intervals
await otvWorker.queues.addReleaseMonitorJob(releaseMonitorQueue, 60000);
await otvWorker.queues.addValidityJob(constraintsQueue, 1000001);
await otvWorker.queues.addScoreJob(constraintsQueue, 100002);
await otvWorker.queues.addActiveValidatorJob(chaindataQueue, 100003);
await otvWorker.queues.addEraPointsJob(chaindataQueue, 100006);
await otvWorker.queues.addEraStatsJob(chaindataQueue, 110008);
await otvWorker.queues.addInclusionJob(chaindataQueue, 100008);
await otvWorker.queues.addNominatorJob(chaindataQueue, 100009);
await otvWorker.queues.addSessionKeyJob(chaindataQueue, 100010);
await otvWorker.queues.addValidatorPrefJob(chaindataQueue, 100101);
await otvWorker.queues.addAllBlocks(blockQueue, chainData);
// TODO update this as queue job
// await startLocationStatsJob(this.config, this.chaindata);
} catch (e) {
logger.error(e.toString(), scorekeeperLabel);
logger.error("Error starting microservice jobs", scorekeeperLabel);
}
};
}
41 changes: 41 additions & 0 deletions packages/core/src/scorekeeper/jobs/JobsMonolith.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { logger } from "@1kv/common";

import { scorekeeperLabel } from "../../scorekeeper";
import { Jobs } from "./Jobs";
import {
startValidatityJob,
startScoreJob,
startEraPointsJob,
startActiveValidatorJob,
startInclusionJob,
startSessionKeyJob,
startValidatorPrefJob,
startEraStatsJob,
startLocationStatsJob,
startNominatorJob,
startBlockDataJob,
} from "../../cron";
import { monitorJob } from "../../jobs";

export class JobsMonolith extends Jobs {
_startSpecificJobs = async (): Promise<void> => {
const { config, constraints, chainData } = this.metadata;
try {
await monitorJob();
await startValidatityJob(config, constraints);
await startScoreJob(config, constraints);
await startEraPointsJob(config, chainData);
await startActiveValidatorJob(config, chainData);
await startInclusionJob(config, chainData);
await startSessionKeyJob(config, chainData);
await startValidatorPrefJob(config, chainData);
await startEraStatsJob(config, chainData);
await startLocationStatsJob(config, chainData);
await startNominatorJob(config, chainData);
await startBlockDataJob(config, chainData);
} catch (e) {
logger.error(e.toString(), scorekeeperLabel);
logger.error("Error starting monolith jobs", scorekeeperLabel);
}
};
}
Loading