From f4e233c448738a0e978d3e719887f36c0849dc3d Mon Sep 17 00:00:00 2001 From: Milos <161627443+BEdev24@users.noreply.github.com> Date: Sat, 4 Jan 2025 13:04:22 +0100 Subject: [PATCH] fix/announce-cid-with-cronjob: Provided announcing cids with cronjob process (#528) --- ipfs-service/.eslintrc.cjs | 25 +++ ipfs-service/example.env | 5 +- ipfs-service/src/app.controller.ts | 2 +- ipfs-service/src/app.module.ts | 17 +- ipfs-service/src/app.service.ts | 147 ++++++++---------- ipfs-service/src/bullmq/bullmq.module.ts | 34 ++-- .../src/constants/bullmq.constants.ts | 11 +- ipfs-service/src/dto/ipfs.dto.ts | 9 +- .../src/jobs/impl/provide-all-cids.job.ts | 25 +++ .../src/jobs/impl/prune-peer-store.job.ts | 3 +- ipfs-service/src/main.ts | 21 +-- ipfs-service/src/mapper/ipfs.mapper.ts | 8 +- .../processors/provide-all-cids.processor.ts | 43 +++++ .../processors/provide-to-dht.processor.ts | 16 +- .../processors/prune-peer-store.processor.ts | 15 +- .../producers/provide-all-cids.producer.ts | 34 ++++ .../producers/provide-to-dht.producer.ts | 30 ++-- .../producers/prune-peer-store.producer.ts | 24 +-- ipfs-service/tsconfig.json | 4 +- 19 files changed, 318 insertions(+), 155 deletions(-) create mode 100644 ipfs-service/.eslintrc.cjs create mode 100644 ipfs-service/src/jobs/impl/provide-all-cids.job.ts create mode 100644 ipfs-service/src/queues/processors/provide-all-cids.processor.ts create mode 100644 ipfs-service/src/queues/producers/provide-all-cids.producer.ts diff --git a/ipfs-service/.eslintrc.cjs b/ipfs-service/.eslintrc.cjs new file mode 100644 index 00000000..259de13c --- /dev/null +++ b/ipfs-service/.eslintrc.cjs @@ -0,0 +1,25 @@ +module.exports = { + parser: '@typescript-eslint/parser', + parserOptions: { + project: 'tsconfig.json', + tsconfigRootDir: __dirname, + sourceType: 'module', + }, + plugins: ['@typescript-eslint/eslint-plugin'], + extends: [ + 'plugin:@typescript-eslint/recommended', + 'plugin:prettier/recommended', + ], + root: true, + env: { + node: true, + jest: true, + }, + ignorePatterns: ['.eslintrc.js'], + rules: { + '@typescript-eslint/interface-name-prefix': 'off', + '@typescript-eslint/explicit-function-return-type': 'off', + '@typescript-eslint/explicit-module-boundary-types': 'off', + '@typescript-eslint/no-explicit-any': 'off', + }, +}; diff --git a/ipfs-service/example.env b/ipfs-service/example.env index e52780f3..2cdaab92 100644 --- a/ipfs-service/example.env +++ b/ipfs-service/example.env @@ -22,4 +22,7 @@ DHT_QUEUE_BACKOFF_TYPE=exponential DHT_QUEUE_BACKOFF_DELAY=1000 MAX_PEERS=5000 -PRUNE_PEER_STORE_INTERVAL=0 0 */12 * * * # every 12h \ No newline at end of file +PRUNE_PEER_STORE_INTERVAL=0 0 */12 * * * # every 12h + +PROVIDE_ALL_CIDS_PER_PAGE=5 +PROVIDE_ALL_CIDS_INTERVAL=0 0 */12 * * * # every 12h \ No newline at end of file diff --git a/ipfs-service/src/app.controller.ts b/ipfs-service/src/app.controller.ts index 18471ad5..463c0b50 100644 --- a/ipfs-service/src/app.controller.ts +++ b/ipfs-service/src/app.controller.ts @@ -38,7 +38,7 @@ export class AppController { if (!doc) { throw new NotFoundException(`Document with cid: ${cid} not found`); } - return doc + return doc; } @Get('ipns/url') diff --git a/ipfs-service/src/app.module.ts b/ipfs-service/src/app.module.ts index cb4a3325..a3f7c457 100644 --- a/ipfs-service/src/app.module.ts +++ b/ipfs-service/src/app.module.ts @@ -6,12 +6,19 @@ import { BullModule } from '@nestjs/bullmq'; import { BullmqModule } from './bullmq/bullmq.module.js'; import { ProvideToDHTProcessor } from './queues/processors/provide-to-dht.processor.js'; import { ProvideToDHTProducer } from './queues/producers/provide-to-dht.producer.js'; -import { QUEUE_NAME_PROVIDE_TO_DHT, QUEUE_NAME_PRUNE_PEER_STORE } from './constants/bullmq.constants.js'; +import { + QUEUE_NAME_PROVIDE_ALL_CIDS, + QUEUE_NAME_PROVIDE_TO_DHT, + QUEUE_NAME_PRUNE_PEER_STORE, +} from './constants/bullmq.constants.js'; import { PrunePeerStoreProducer } from './queues/producers/prune-peer-store.producer.js'; import { PrunePeerStoreProcessor } from './queues/processors/prune-peer-store.processor.js'; import { PrunePeerStoreJob } from './jobs/impl/prune-peer-store.job.js'; import { Scheduler } from './jobs/scheduler.js'; import { SchedulerRegistry } from '@nestjs/schedule'; +import { ProvideAllCidsJob } from './jobs/impl/provide-all-cids.job.js'; +import { ProvideAllCidsProducer } from './queues/producers/provide-all-cids.producer.js'; +import { ProvideAllCidsProcessor } from './queues/processors/provide-all-cids.processor.js'; @Module({ imports: [ @@ -23,6 +30,9 @@ import { SchedulerRegistry } from '@nestjs/schedule'; BullModule.registerQueue({ name: QUEUE_NAME_PRUNE_PEER_STORE, }), + BullModule.registerQueue({ + name: QUEUE_NAME_PROVIDE_ALL_CIDS, + }), ], controllers: [AppController], providers: [ @@ -31,18 +41,23 @@ import { SchedulerRegistry } from '@nestjs/schedule'; ProvideToDHTProducer, PrunePeerStoreProducer, PrunePeerStoreProcessor, + ProvideAllCidsProducer, + ProvideAllCidsProcessor, SchedulerRegistry, Scheduler, PrunePeerStoreJob, + ProvideAllCidsJob, ], }) export class AppModule { constructor( private readonly scheduler: Scheduler, private readonly prunePeerStoreJob: PrunePeerStoreJob, + private readonly provideAllCidsJob: ProvideAllCidsJob, ) {} onModuleInit() { this.scheduler.registerJob(this.prunePeerStoreJob); + this.scheduler.registerJob(this.provideAllCidsJob); } } diff --git a/ipfs-service/src/app.service.ts b/ipfs-service/src/app.service.ts index fa347d33..a853c367 100644 --- a/ipfs-service/src/app.service.ts +++ b/ipfs-service/src/app.service.ts @@ -4,99 +4,27 @@ import { Logger, OnModuleInit, } from '@nestjs/common'; -import { ConfigService } from '@nestjs/config'; import { createHelia } from 'helia'; +import type { HeliaLibp2p } from 'helia'; import { CID } from 'multiformats/cid'; -import { bootstrap } from '@libp2p/bootstrap'; -import { identify } from '@libp2p/identify'; -import { webSockets } from '@libp2p/websockets'; -import { tcp } from '@libp2p/tcp'; -import { noise } from '@chainsafe/libp2p-noise'; -import { yamux } from '@chainsafe/libp2p-yamux'; +import * as raw from 'multiformats/codecs/raw'; import { unixfs } from '@helia/unixfs'; import { FsBlockstore } from 'blockstore-fs'; import { LevelDatastore } from 'datastore-level'; import { IPNS, ipns } from '@helia/ipns'; -import { keychain, type Keychain } from '@libp2p/keychain'; -import { kadDHT, removePrivateAddressesMapper } from '@libp2p/kad-dht'; -import { ipnsSelector } from 'ipns/selector'; -import { ipnsValidator } from 'ipns/validator'; -import { dcutr } from '@libp2p/dcutr'; -import { autoNAT } from '@libp2p/autonat'; -import { ping } from '@libp2p/ping'; -import { uPnPNAT } from '@libp2p/upnp-nat'; -import { mdns } from '@libp2p/mdns'; -import { createDelegatedRoutingV1HttpApiClient } from '@helia/delegated-routing-v1-http-api-client'; -import { - circuitRelayTransport, - circuitRelayServer, -} from '@libp2p/circuit-relay-v2'; import { IpfsMapper } from './mapper/ipfs.mapper.js'; import { IpfsDto } from './dto/ipfs.dto.js'; import { PeerId } from '@libp2p/interface'; import { config } from 'dotenv'; import { ProvideToDHTProducer } from './queues/producers/provide-to-dht.producer.js'; import { PrunePeerStoreProducer } from './queues/producers/prune-peer-store.producer.js'; +import { ProvideAllCidsProducer } from './queues/producers/provide-all-cids.producer.js'; +import fs from 'fs'; config(); -const libp2pOptions = { - config: { - dht: { - enabled: true - } - }, - addresses: { - listen: [ - // add a listen address (localhost) to accept TCP connections on a random port - process.env.LISTEN_TCP_ADDRESS, - process.env.LISTEN_WS_ADDRESS, - process.env.LISTEN_QUIC_ADDRESS, - ], - }, - transports: [ - circuitRelayTransport({ discoverRelays: 1 }), - tcp(), - webSockets(), - ], - connectionEncryption: [noise()], - streamMuxers: [yamux()], - peerDiscovery: [ - mdns(), - bootstrap({ - list: [ - '/dns4/am6.bootstrap.libp2p.io/tcp/443/wss/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb', - '/dns4/sg1.bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt', - '/dns4/sv15.bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN', - // va1 is not in the TXT records for _dnsaddr.bootstrap.libp2p.io yet - // so use the host name directly - '/dnsaddr/va1.bootstrap.libp2p.io/p2p/12D3KooWKnDdG3iXw9eTFijk3EWSunZcFi54Zka4wmtqtt6rPxc8', - '/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ' - ], - }), - ], - services: { - autoNAT: autoNAT(), - dcutr: dcutr(), - delegatedRouting: () => - createDelegatedRoutingV1HttpApiClient('https://delegated-ipfs.dev'), - aminoDHT: kadDHT({ - clientMode: false, - peerInfoMapper: removePrivateAddressesMapper, - protocol: '/ipfs/kad/1.0.0', - validators: { ipns: ipnsValidator }, - selectors: { ipns: ipnsSelector }, - }), - identify: identify(), - keychain: keychain(), - ping: ping(), - relay: circuitRelayServer(), - upnp: uPnPNAT(), - }, -}; - @Injectable() export class AppService implements OnModuleInit { - private helia; + private helia: HeliaLibp2p; private fs; private ipns: IPNS; private ipnsPeerId: PeerId; @@ -105,6 +33,7 @@ export class AppService implements OnModuleInit { constructor( private readonly provideToDHTProducer: ProvideToDHTProducer, private readonly prunePeerStoreProducer: PrunePeerStoreProducer, + private readonly provideAllCidsProducer: ProvideAllCidsProducer, ) {} async onModuleInit() { @@ -119,7 +48,7 @@ export class AppService implements OnModuleInit { } } - async getHelia() { + async getHelia(): Promise { if (this.helia == null) { const blockstore = new FsBlockstore('ipfs/blockstore'); const datastore = new LevelDatastore('ipfs/datastore'); @@ -128,7 +57,6 @@ export class AppService implements OnModuleInit { this.helia = await createHelia({ blockstore, datastore, - libp2p: libp2pOptions, }); this.logger.log('PeerId: ' + this.helia.libp2p.peerId.toString()); @@ -232,7 +160,9 @@ export class AppService implements OnModuleInit { } return IpfsMapper.ipfsToIpfsDto(cidString, text); } catch (error) { - this.logger.error(`Failed to get doc by CID - ${cidString} error: ${error}`); + this.logger.error( + `Failed to get doc by CID - ${cidString} error: ${error}`, + ); return null; } } @@ -246,7 +176,7 @@ export class AppService implements OnModuleInit { if (!this.ipnsPeerId) { throw new InternalServerErrorException(`IPNS Peer Id not exists`); } - const ipnsUrl = process.env.IPNS_PUBLIC_URL + this.ipnsPeerId.toString() + const ipnsUrl = process.env.IPNS_PUBLIC_URL + this.ipnsPeerId.toString(); return ipnsUrl; } @@ -258,7 +188,7 @@ export class AppService implements OnModuleInit { let removedPeers = 0; const peers = await this.helia.libp2p.peerStore.all(); this.logger.debug(`Total number of stored peers: ${peers.length}`); - + if (peers.length > maxPeers) { const excessPeers = peers.slice(0, peers.length - maxPeers); for (const peer of excessPeers) { @@ -268,4 +198,57 @@ export class AppService implements OnModuleInit { } this.logger.debug(`Removed peers: ${removedPeers}`); } + + async provideCidsToDHTViaQueue(cids: string[]) { + for (const value of cids) { + const cid = CID.parse(value); + this.logger.debug('Job triggered: Provide to DHT - CID:', cid.toString()); + await this.helia.libp2p.contentRouting.provide(cid); + this.logger.log(`Announced CID to the DHT: ${cid.toString()}`); + } + } + + async addProvideAllCidsJob() { + try { + const perPage = Number(process.env.PROVIDE_ALL_CIDS_PER_PAGE); + const allCids = await this.getAllCidsFromBlockstore(); + this.logger.debug(`Total number of stored CIDs: ${allCids.length}`); + if (allCids.length > 0) { + const paginatedCids = this.paginateArray(allCids, perPage); + for (const page of paginatedCids) { + await this.provideAllCidsProducer.addToQueue(page); + } + } + } catch (err) { + this.logger.error(`Error add provide all cids job: ${err}`); + } + } + + private async getAllCidsFromBlockstore(): Promise { + try { + const blockstorePath = 'ipfs/blockstore'; + if (!fs.existsSync(blockstorePath)) { + return []; + } + const files = this.helia.blockstore.getAll(); + const cids: string[] = []; + for await (const file of files) { + const cidV1 = file.cid.toV1(); + // Create a new CID with the `raw` codec + const rawCid = CID.create(1, raw.code, cidV1.multihash); + cids.push(rawCid.toString()); + } + return cids; + } catch (err) { + this.logger.error('Error listing blocks:', err); + } + } + + private paginateArray(array: T[], perPage: number): T[][] { + const paginated: T[][] = []; + for (let i = 0; i < array.length; i += perPage) { + paginated.push(array.slice(i, i + perPage)); + } + return paginated; + } } diff --git a/ipfs-service/src/bullmq/bullmq.module.ts b/ipfs-service/src/bullmq/bullmq.module.ts index c0239ef1..80d13b2b 100644 --- a/ipfs-service/src/bullmq/bullmq.module.ts +++ b/ipfs-service/src/bullmq/bullmq.module.ts @@ -1,23 +1,29 @@ -import { BullBoardModule } from "@bull-board/nestjs"; -import { BullModule } from "@nestjs/bullmq"; -import { Module } from "@nestjs/common"; -import { ConfigService } from "@nestjs/config"; -import { QUEUE_NAME_PROVIDE_TO_DHT, QUEUE_NAME_PRUNE_PEER_STORE } from "../constants/bullmq.constants.js"; -import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js"; -import { ExpressAdapter } from "@bull-board/express"; +import { BullBoardModule } from '@bull-board/nestjs'; +import { BullModule } from '@nestjs/bullmq'; +import { Module } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { + QUEUE_NAME_PROVIDE_ALL_CIDS, + QUEUE_NAME_PROVIDE_TO_DHT, + QUEUE_NAME_PRUNE_PEER_STORE, +} from '../constants/bullmq.constants.js'; +import { BullMQAdapter } from '@bull-board/api/bullMQAdapter.js'; +import { ExpressAdapter } from '@bull-board/express'; @Module({ imports: [ BullModule.forRootAsync({ useFactory: async (configService: ConfigService) => ({ connection: { - host: configService.getOrThrow("REDIS_HOST"), - port: configService.getOrThrow("REDIS_PORT"), - password: configService.getOrThrow("REDIS_PASSWORD"), + host: configService.getOrThrow('REDIS_HOST'), + port: configService.getOrThrow('REDIS_PORT'), + password: configService.getOrThrow('REDIS_PASSWORD'), connectTimeout: 20000, reconnectOnError: (err) => { const targetErrors = ['READONLY', 'ETIMEDOUT', 'ECONNRESET']; - return targetErrors.some(targetError => err.message.includes(targetError)); + return targetErrors.some((targetError) => + err.message.includes(targetError), + ); }, ...(configService.get('REDIS_TLS') === 'false' ? {} : { tls: {} }), }, @@ -25,7 +31,7 @@ import { ExpressAdapter } from "@bull-board/express"; inject: [ConfigService], }), BullBoardModule.forRoot({ - route: "/bull-board", + route: '/bull-board', adapter: ExpressAdapter, }), BullBoardModule.forFeature({ @@ -36,6 +42,10 @@ import { ExpressAdapter } from "@bull-board/express"; name: QUEUE_NAME_PRUNE_PEER_STORE, adapter: BullMQAdapter, }), + BullBoardModule.forFeature({ + name: QUEUE_NAME_PROVIDE_ALL_CIDS, + adapter: BullMQAdapter, + }), ], }) export class BullmqModule {} diff --git a/ipfs-service/src/constants/bullmq.constants.ts b/ipfs-service/src/constants/bullmq.constants.ts index 39f98014..c25990dd 100644 --- a/ipfs-service/src/constants/bullmq.constants.ts +++ b/ipfs-service/src/constants/bullmq.constants.ts @@ -1,5 +1,8 @@ -export const QUEUE_NAME_PROVIDE_TO_DHT = "provide_to_dht_queue"; -export const JOB_NAME_PROVIDE_TO_DHT = "provide_to_dht_job"; +export const QUEUE_NAME_PROVIDE_TO_DHT = 'provide_to_dht_queue'; +export const JOB_NAME_PROVIDE_TO_DHT = 'provide_to_dht_job'; -export const QUEUE_NAME_PRUNE_PEER_STORE = "prune_peer_store_queue"; -export const JOB_NAME_PRUNE_PEER_STORE = "prune_peer_store_job"; \ No newline at end of file +export const QUEUE_NAME_PRUNE_PEER_STORE = 'prune_peer_store_queue'; +export const JOB_NAME_PRUNE_PEER_STORE = 'prune_peer_store_job'; + +export const QUEUE_NAME_PROVIDE_ALL_CIDS = 'provide_all_cids_queue'; +export const JOB_NAME_PROVIDE_ALL_CIDS = 'provide_all_cids_job'; diff --git a/ipfs-service/src/dto/ipfs.dto.ts b/ipfs-service/src/dto/ipfs.dto.ts index 3880d0ec..cc1b8fce 100644 --- a/ipfs-service/src/dto/ipfs.dto.ts +++ b/ipfs-service/src/dto/ipfs.dto.ts @@ -1,6 +1,5 @@ export class IpfsDto { - cid?: string; - content?: string; - url?: string; - } - \ No newline at end of file + cid?: string; + content?: string; + url?: string; +} diff --git a/ipfs-service/src/jobs/impl/provide-all-cids.job.ts b/ipfs-service/src/jobs/impl/provide-all-cids.job.ts new file mode 100644 index 00000000..ed36e725 --- /dev/null +++ b/ipfs-service/src/jobs/impl/provide-all-cids.job.ts @@ -0,0 +1,25 @@ +import { ConfigService } from '@nestjs/config'; +import { IJob } from '../i-job'; +import { Injectable } from '@nestjs/common'; +import { JOB_NAME_PROVIDE_ALL_CIDS } from '../../constants/bullmq.constants.js'; +import { AppService } from '../../app.service.js'; + +@Injectable() +export class ProvideAllCidsJob implements IJob { + constructor( + private readonly configService: ConfigService, + private readonly appService: AppService, + ) {} + getJobName(): string { + return JOB_NAME_PROVIDE_ALL_CIDS; + } + getInterval(): string { + return ( + this.configService.get('PROVIDE_ALL_CIDS_INTERVAL') || + '0 0 */12 * * *' + ); + } + execute(): void { + this.appService.addProvideAllCidsJob(); + } +} diff --git a/ipfs-service/src/jobs/impl/prune-peer-store.job.ts b/ipfs-service/src/jobs/impl/prune-peer-store.job.ts index 0388c42a..44734121 100644 --- a/ipfs-service/src/jobs/impl/prune-peer-store.job.ts +++ b/ipfs-service/src/jobs/impl/prune-peer-store.job.ts @@ -15,7 +15,8 @@ export class PrunePeerStoreJob implements IJob { } getInterval(): string { return ( - this.configService.get('PRUNE_PEER_STORE_INTERVAL') || '0 0 * * * *' + this.configService.get('PRUNE_PEER_STORE_INTERVAL') || + '0 0 * * * *' ); } execute(): void { diff --git a/ipfs-service/src/main.ts b/ipfs-service/src/main.ts index 91f07a94..bf44288e 100644 --- a/ipfs-service/src/main.ts +++ b/ipfs-service/src/main.ts @@ -3,7 +3,7 @@ import { AppModule } from './app.module.js'; import morgan from 'morgan'; import { LoggerFactory } from './util/logger-factory.js'; import { LoggerService } from '@nestjs/common'; - +import events from 'events'; async function bootstrap() { const app = await NestFactory.create(AppModule); @@ -11,16 +11,19 @@ async function bootstrap() { const logger: LoggerService = LoggerFactory('IPFS Service'); const morganFormat = - ':method :url :status :res[content-length] - :response-time ms'; + ':method :url :status :res[content-length] - :response-time ms'; app.use( - morgan(morganFormat, { - stream: { - write: (message) => { - logger.log(message); + morgan(morganFormat, { + stream: { + write: (message) => { + logger.log(message); + }, }, - }, - }), -); + }), + ); + + // Increase the limit globally for all event emitters + events.EventEmitter.defaultMaxListeners = 50; // Set your desired limit await app.listen(3001); } diff --git a/ipfs-service/src/mapper/ipfs.mapper.ts b/ipfs-service/src/mapper/ipfs.mapper.ts index bae917d5..102f34b7 100644 --- a/ipfs-service/src/mapper/ipfs.mapper.ts +++ b/ipfs-service/src/mapper/ipfs.mapper.ts @@ -1,13 +1,11 @@ - import { IpfsDto } from '../dto/ipfs.dto.js'; export class IpfsMapper { - static ipfsToIpfsDto(cid?: string, content?: string, url?: string): IpfsDto { const ipfsDto = new IpfsDto(); - if(cid) ipfsDto.cid = cid; - if(content) ipfsDto.content = content; - if(url) ipfsDto.url = url; + if (cid) ipfsDto.cid = cid; + if (content) ipfsDto.content = content; + if (url) ipfsDto.url = url; return ipfsDto; } } diff --git a/ipfs-service/src/queues/processors/provide-all-cids.processor.ts b/ipfs-service/src/queues/processors/provide-all-cids.processor.ts new file mode 100644 index 00000000..94acaf0c --- /dev/null +++ b/ipfs-service/src/queues/processors/provide-all-cids.processor.ts @@ -0,0 +1,43 @@ +import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'; +import { Job } from 'bullmq'; +import { + JOB_NAME_PROVIDE_ALL_CIDS, + QUEUE_NAME_PROVIDE_ALL_CIDS, +} from '../../constants/bullmq.constants.js'; +import { Logger } from '@nestjs/common'; +import { AppService } from '../../app.service.js'; + +@Processor(QUEUE_NAME_PROVIDE_ALL_CIDS) +export class ProvideAllCidsProcessor extends WorkerHost { + protected readonly logger = new Logger(ProvideAllCidsProcessor.name); + constructor(private readonly appService: AppService) { + super(); + } + + async process(job: Job): Promise { + switch (job.name) { + case JOB_NAME_PROVIDE_ALL_CIDS: { + try { + this.logger.debug( + `Processing CIDs - amount of CIDs to be processed, ${job.data?.length}`, + ); + await this.appService.provideCidsToDHTViaQueue(job.data); + } catch (error) { + this.logger.error( + `Error processing job id: ${job.id}, name: ${job.name}. - Error: ${error}`, + ); + throw error; + } + } + } + } + + @OnWorkerEvent('completed') + onCompleted(job: Job) { + const { id, name, queueName, finishedOn } = job; + const completionTime = finishedOn ? new Date(finishedOn).toISOString() : ''; + this.logger.log( + `Job Finished - id: ${id}, name: ${name} in queue ${queueName} on ${completionTime}.`, + ); + } +} diff --git a/ipfs-service/src/queues/processors/provide-to-dht.processor.ts b/ipfs-service/src/queues/processors/provide-to-dht.processor.ts index af9048e3..a41d7b0a 100644 --- a/ipfs-service/src/queues/processors/provide-to-dht.processor.ts +++ b/ipfs-service/src/queues/processors/provide-to-dht.processor.ts @@ -19,22 +19,28 @@ export class ProvideToDHTProcessor extends WorkerHost { switch (job.name) { case JOB_NAME_PROVIDE_TO_DHT: { const cid = CID.parse(job.data); - this.logger.debug('Job triggered: Provide to DHT - CID:', cid.toString()); + this.logger.debug( + 'Job triggered: Provide to DHT - CID:', + cid.toString(), + ); try { await this.appService.provideCidtoDHTViaQueue(cid); } catch (error) { - this.logger.error(`Error processing job id: ${job.id}, name: ${job.name}. - Error: ${error}`); + this.logger.error( + `Error processing job id: ${job.id}, name: ${job.name}. - Error: ${error}`, + ); throw error; - } + } } } } @OnWorkerEvent('completed') onCompleted(job: Job) { - const { id, name, queueName, finishedOn, returnvalue } = job; + const { id, name, queueName, finishedOn } = job; const completionTime = finishedOn ? new Date(finishedOn).toISOString() : ''; this.logger.log( - `Job Finished - id: ${id}, name: ${name} in queue ${queueName} on ${completionTime}.`,); + `Job Finished - id: ${id}, name: ${name} in queue ${queueName} on ${completionTime}.`, + ); } } diff --git a/ipfs-service/src/queues/processors/prune-peer-store.processor.ts b/ipfs-service/src/queues/processors/prune-peer-store.processor.ts index 6bb85951..e9d06bca 100644 --- a/ipfs-service/src/queues/processors/prune-peer-store.processor.ts +++ b/ipfs-service/src/queues/processors/prune-peer-store.processor.ts @@ -1,8 +1,8 @@ import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'; import { Job } from 'bullmq'; import { - JOB_NAME_PRUNE_PEER_STORE, - QUEUE_NAME_PRUNE_PEER_STORE, + JOB_NAME_PRUNE_PEER_STORE, + QUEUE_NAME_PRUNE_PEER_STORE, } from '../../constants/bullmq.constants.js'; import { Logger } from '@nestjs/common'; import { AppService } from '../../app.service.js'; @@ -22,18 +22,21 @@ export class PrunePeerStoreProcessor extends WorkerHost { try { await this.appService.prunePeerStore(maxPeers); } catch (error) { - this.logger.error(`Error processing job id: ${job.id}, name: ${job.name}. - Error: ${error}`); + this.logger.error( + `Error processing job id: ${job.id}, name: ${job.name}. - Error: ${error}`, + ); throw error; - } + } } } } @OnWorkerEvent('completed') onCompleted(job: Job) { - const { id, name, queueName, finishedOn, returnvalue } = job; + const { id, name, queueName, finishedOn } = job; const completionTime = finishedOn ? new Date(finishedOn).toISOString() : ''; this.logger.log( - `Job Finished - id: ${id}, name: ${name} in queue ${queueName} on ${completionTime}.`,); + `Job Finished - id: ${id}, name: ${name} in queue ${queueName} on ${completionTime}.`, + ); } } diff --git a/ipfs-service/src/queues/producers/provide-all-cids.producer.ts b/ipfs-service/src/queues/producers/provide-all-cids.producer.ts new file mode 100644 index 00000000..2e0133a2 --- /dev/null +++ b/ipfs-service/src/queues/producers/provide-all-cids.producer.ts @@ -0,0 +1,34 @@ +import { InjectQueue } from '@nestjs/bullmq'; +import { Injectable } from '@nestjs/common'; +import { Queue } from 'bullmq'; +import { + JOB_NAME_PROVIDE_ALL_CIDS, + QUEUE_NAME_PROVIDE_ALL_CIDS, +} from '../../constants/bullmq.constants.js'; +import { randomUUID } from 'crypto'; + +@Injectable() +export class ProvideAllCidsProducer { + constructor( + @InjectQueue(QUEUE_NAME_PROVIDE_ALL_CIDS) + private readonly provideAllCidsQueue: Queue, + ) {} + + async addToQueue(inputData: string[]) { + const job = await this.provideAllCidsQueue.add( + JOB_NAME_PROVIDE_ALL_CIDS, + inputData, + { + jobId: randomUUID(), + removeOnComplete: true, + removeOnFail: false, + attempts: 3, + backoff: { + type: 'fixed', + delay: 60000, // 60 seconds + }, + }, + ); + return job; + } +} diff --git a/ipfs-service/src/queues/producers/provide-to-dht.producer.ts b/ipfs-service/src/queues/producers/provide-to-dht.producer.ts index 4d890a90..6193a3d5 100644 --- a/ipfs-service/src/queues/producers/provide-to-dht.producer.ts +++ b/ipfs-service/src/queues/producers/provide-to-dht.producer.ts @@ -11,20 +11,26 @@ import { randomUUID } from 'crypto'; @Injectable() export class ProvideToDHTProducer { constructor( - @InjectQueue(QUEUE_NAME_PROVIDE_TO_DHT) private readonly provideToDHTQueue: Queue, - private readonly configService: ConfigService + @InjectQueue(QUEUE_NAME_PROVIDE_TO_DHT) + private readonly provideToDHTQueue: Queue, + private readonly configService: ConfigService, ) {} - async addToQueue(inputData: string) { - const job = await this.provideToDHTQueue.add(JOB_NAME_PROVIDE_TO_DHT, inputData, { - jobId: randomUUID(), - removeOnComplete: true, - removeOnFail: false, - attempts: this.configService.getOrThrow('DHT_QUEUE_ATTEMPTS'), - backoff: { - type: this.configService.getOrThrow('DHT_QUEUE_BACKOFF_TYPE'), - delay: this.configService.getOrThrow('DHT_QUEUE_BACKOFF_DELAY') }, - }); + async addToQueue(inputData: string) { + const job = await this.provideToDHTQueue.add( + JOB_NAME_PROVIDE_TO_DHT, + inputData, + { + jobId: randomUUID(), + removeOnComplete: true, + removeOnFail: false, + attempts: this.configService.getOrThrow('DHT_QUEUE_ATTEMPTS'), + backoff: { + type: this.configService.getOrThrow('DHT_QUEUE_BACKOFF_TYPE'), + delay: this.configService.getOrThrow('DHT_QUEUE_BACKOFF_DELAY'), + }, + }, + ); return job; } } diff --git a/ipfs-service/src/queues/producers/prune-peer-store.producer.ts b/ipfs-service/src/queues/producers/prune-peer-store.producer.ts index 74bada14..38a90151 100644 --- a/ipfs-service/src/queues/producers/prune-peer-store.producer.ts +++ b/ipfs-service/src/queues/producers/prune-peer-store.producer.ts @@ -1,6 +1,5 @@ import { InjectQueue } from '@nestjs/bullmq'; import { Injectable } from '@nestjs/common'; -import { ConfigService } from '@nestjs/config'; import { Queue } from 'bullmq'; import { JOB_NAME_PRUNE_PEER_STORE, @@ -11,17 +10,22 @@ import { randomUUID } from 'crypto'; @Injectable() export class PrunePeerStoreProducer { constructor( - @InjectQueue(QUEUE_NAME_PRUNE_PEER_STORE) private readonly prunePeerStoreQueue: Queue, + @InjectQueue(QUEUE_NAME_PRUNE_PEER_STORE) + private readonly prunePeerStoreQueue: Queue, ) {} - async addToQueue(inputData: string) { - const job = await this.prunePeerStoreQueue.add(JOB_NAME_PRUNE_PEER_STORE, inputData, { - jobId: randomUUID(), - removeOnComplete: true, - removeOnFail: false, - attempts: 3, - backoff: { type: 'fixed', delay: 300000 }, // 300 seconds - }); + async addToQueue(inputData: string) { + const job = await this.prunePeerStoreQueue.add( + JOB_NAME_PRUNE_PEER_STORE, + inputData, + { + jobId: randomUUID(), + removeOnComplete: true, + removeOnFail: false, + attempts: 3, + backoff: { type: 'fixed', delay: 300000 }, // 300 seconds + }, + ); return job; } } diff --git a/ipfs-service/tsconfig.json b/ipfs-service/tsconfig.json index 1ff97afa..7f4620f9 100644 --- a/ipfs-service/tsconfig.json +++ b/ipfs-service/tsconfig.json @@ -25,5 +25,7 @@ "@dto/*": ["src/dto/*"], "@mapper/*": ["src/mapper/*"], } - } + }, + "include": ["src/**/*", ".eslintrc.cjs"], + "exclude": ["node_modules", "dist"] }