Skip to content

Commit

Permalink
fix/announce-cid-with-cronjob: Provided announcing cids with cronjob …
Browse files Browse the repository at this point in the history
…process (#528)
  • Loading branch information
BEdev24 authored Jan 4, 2025
1 parent 5fd1190 commit f4e233c
Show file tree
Hide file tree
Showing 19 changed files with 318 additions and 155 deletions.
25 changes: 25 additions & 0 deletions ipfs-service/.eslintrc.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
module.exports = {
parser: '@typescript-eslint/parser',
parserOptions: {
project: 'tsconfig.json',
tsconfigRootDir: __dirname,
sourceType: 'module',
},
plugins: ['@typescript-eslint/eslint-plugin'],
extends: [
'plugin:@typescript-eslint/recommended',
'plugin:prettier/recommended',
],
root: true,
env: {
node: true,
jest: true,
},
ignorePatterns: ['.eslintrc.js'],
rules: {
'@typescript-eslint/interface-name-prefix': 'off',
'@typescript-eslint/explicit-function-return-type': 'off',
'@typescript-eslint/explicit-module-boundary-types': 'off',
'@typescript-eslint/no-explicit-any': 'off',
},
};
5 changes: 4 additions & 1 deletion ipfs-service/example.env
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ DHT_QUEUE_BACKOFF_TYPE=exponential
DHT_QUEUE_BACKOFF_DELAY=1000

MAX_PEERS=5000
PRUNE_PEER_STORE_INTERVAL=0 0 */12 * * * # every 12h
PRUNE_PEER_STORE_INTERVAL=0 0 */12 * * * # every 12h

PROVIDE_ALL_CIDS_PER_PAGE=5
PROVIDE_ALL_CIDS_INTERVAL=0 0 */12 * * * # every 12h
2 changes: 1 addition & 1 deletion ipfs-service/src/app.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class AppController {
if (!doc) {
throw new NotFoundException(`Document with cid: ${cid} not found`);
}
return doc
return doc;
}

@Get('ipns/url')
Expand Down
17 changes: 16 additions & 1 deletion ipfs-service/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,19 @@ import { BullModule } from '@nestjs/bullmq';
import { BullmqModule } from './bullmq/bullmq.module.js';
import { ProvideToDHTProcessor } from './queues/processors/provide-to-dht.processor.js';
import { ProvideToDHTProducer } from './queues/producers/provide-to-dht.producer.js';
import { QUEUE_NAME_PROVIDE_TO_DHT, QUEUE_NAME_PRUNE_PEER_STORE } from './constants/bullmq.constants.js';
import {
QUEUE_NAME_PROVIDE_ALL_CIDS,
QUEUE_NAME_PROVIDE_TO_DHT,
QUEUE_NAME_PRUNE_PEER_STORE,
} from './constants/bullmq.constants.js';
import { PrunePeerStoreProducer } from './queues/producers/prune-peer-store.producer.js';
import { PrunePeerStoreProcessor } from './queues/processors/prune-peer-store.processor.js';
import { PrunePeerStoreJob } from './jobs/impl/prune-peer-store.job.js';
import { Scheduler } from './jobs/scheduler.js';
import { SchedulerRegistry } from '@nestjs/schedule';
import { ProvideAllCidsJob } from './jobs/impl/provide-all-cids.job.js';
import { ProvideAllCidsProducer } from './queues/producers/provide-all-cids.producer.js';
import { ProvideAllCidsProcessor } from './queues/processors/provide-all-cids.processor.js';

@Module({
imports: [
Expand All @@ -23,6 +30,9 @@ import { SchedulerRegistry } from '@nestjs/schedule';
BullModule.registerQueue({
name: QUEUE_NAME_PRUNE_PEER_STORE,
}),
BullModule.registerQueue({
name: QUEUE_NAME_PROVIDE_ALL_CIDS,
}),
],
controllers: [AppController],
providers: [
Expand All @@ -31,18 +41,23 @@ import { SchedulerRegistry } from '@nestjs/schedule';
ProvideToDHTProducer,
PrunePeerStoreProducer,
PrunePeerStoreProcessor,
ProvideAllCidsProducer,
ProvideAllCidsProcessor,
SchedulerRegistry,
Scheduler,
PrunePeerStoreJob,
ProvideAllCidsJob,
],
})
export class AppModule {
constructor(
private readonly scheduler: Scheduler,
private readonly prunePeerStoreJob: PrunePeerStoreJob,
private readonly provideAllCidsJob: ProvideAllCidsJob,
) {}

onModuleInit() {
this.scheduler.registerJob(this.prunePeerStoreJob);
this.scheduler.registerJob(this.provideAllCidsJob);
}
}
147 changes: 65 additions & 82 deletions ipfs-service/src/app.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,99 +4,27 @@ import {
Logger,
OnModuleInit,
} from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { createHelia } from 'helia';
import type { HeliaLibp2p } from 'helia';
import { CID } from 'multiformats/cid';
import { bootstrap } from '@libp2p/bootstrap';
import { identify } from '@libp2p/identify';
import { webSockets } from '@libp2p/websockets';
import { tcp } from '@libp2p/tcp';
import { noise } from '@chainsafe/libp2p-noise';
import { yamux } from '@chainsafe/libp2p-yamux';
import * as raw from 'multiformats/codecs/raw';
import { unixfs } from '@helia/unixfs';
import { FsBlockstore } from 'blockstore-fs';
import { LevelDatastore } from 'datastore-level';
import { IPNS, ipns } from '@helia/ipns';
import { keychain, type Keychain } from '@libp2p/keychain';
import { kadDHT, removePrivateAddressesMapper } from '@libp2p/kad-dht';
import { ipnsSelector } from 'ipns/selector';
import { ipnsValidator } from 'ipns/validator';
import { dcutr } from '@libp2p/dcutr';
import { autoNAT } from '@libp2p/autonat';
import { ping } from '@libp2p/ping';
import { uPnPNAT } from '@libp2p/upnp-nat';
import { mdns } from '@libp2p/mdns';
import { createDelegatedRoutingV1HttpApiClient } from '@helia/delegated-routing-v1-http-api-client';
import {
circuitRelayTransport,
circuitRelayServer,
} from '@libp2p/circuit-relay-v2';
import { IpfsMapper } from './mapper/ipfs.mapper.js';
import { IpfsDto } from './dto/ipfs.dto.js';
import { PeerId } from '@libp2p/interface';
import { config } from 'dotenv';
import { ProvideToDHTProducer } from './queues/producers/provide-to-dht.producer.js';
import { PrunePeerStoreProducer } from './queues/producers/prune-peer-store.producer.js';
import { ProvideAllCidsProducer } from './queues/producers/provide-all-cids.producer.js';
import fs from 'fs';
config();

const libp2pOptions = {
config: {
dht: {
enabled: true
}
},
addresses: {
listen: [
// add a listen address (localhost) to accept TCP connections on a random port
process.env.LISTEN_TCP_ADDRESS,
process.env.LISTEN_WS_ADDRESS,
process.env.LISTEN_QUIC_ADDRESS,
],
},
transports: [
circuitRelayTransport({ discoverRelays: 1 }),
tcp(),
webSockets(),
],
connectionEncryption: [noise()],
streamMuxers: [yamux()],
peerDiscovery: [
mdns(),
bootstrap({
list: [
'/dns4/am6.bootstrap.libp2p.io/tcp/443/wss/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb',
'/dns4/sg1.bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt',
'/dns4/sv15.bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN',
// va1 is not in the TXT records for _dnsaddr.bootstrap.libp2p.io yet
// so use the host name directly
'/dnsaddr/va1.bootstrap.libp2p.io/p2p/12D3KooWKnDdG3iXw9eTFijk3EWSunZcFi54Zka4wmtqtt6rPxc8',
'/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ'
],
}),
],
services: {
autoNAT: autoNAT(),
dcutr: dcutr(),
delegatedRouting: () =>
createDelegatedRoutingV1HttpApiClient('https://delegated-ipfs.dev'),
aminoDHT: kadDHT({
clientMode: false,
peerInfoMapper: removePrivateAddressesMapper,
protocol: '/ipfs/kad/1.0.0',
validators: { ipns: ipnsValidator },
selectors: { ipns: ipnsSelector },
}),
identify: identify(),
keychain: keychain(),
ping: ping(),
relay: circuitRelayServer(),
upnp: uPnPNAT(),
},
};

@Injectable()
export class AppService implements OnModuleInit {
private helia;
private helia: HeliaLibp2p;
private fs;
private ipns: IPNS;
private ipnsPeerId: PeerId;
Expand All @@ -105,6 +33,7 @@ export class AppService implements OnModuleInit {
constructor(
private readonly provideToDHTProducer: ProvideToDHTProducer,
private readonly prunePeerStoreProducer: PrunePeerStoreProducer,
private readonly provideAllCidsProducer: ProvideAllCidsProducer,
) {}

async onModuleInit() {
Expand All @@ -119,7 +48,7 @@ export class AppService implements OnModuleInit {
}
}

async getHelia() {
async getHelia(): Promise<HeliaLibp2p> {
if (this.helia == null) {
const blockstore = new FsBlockstore('ipfs/blockstore');
const datastore = new LevelDatastore('ipfs/datastore');
Expand All @@ -128,7 +57,6 @@ export class AppService implements OnModuleInit {
this.helia = await createHelia({
blockstore,
datastore,
libp2p: libp2pOptions,
});

this.logger.log('PeerId: ' + this.helia.libp2p.peerId.toString());
Expand Down Expand Up @@ -232,7 +160,9 @@ export class AppService implements OnModuleInit {
}
return IpfsMapper.ipfsToIpfsDto(cidString, text);
} catch (error) {
this.logger.error(`Failed to get doc by CID - ${cidString} error: ${error}`);
this.logger.error(
`Failed to get doc by CID - ${cidString} error: ${error}`,
);
return null;
}
}
Expand All @@ -246,7 +176,7 @@ export class AppService implements OnModuleInit {
if (!this.ipnsPeerId) {
throw new InternalServerErrorException(`IPNS Peer Id not exists`);
}
const ipnsUrl = process.env.IPNS_PUBLIC_URL + this.ipnsPeerId.toString()
const ipnsUrl = process.env.IPNS_PUBLIC_URL + this.ipnsPeerId.toString();
return ipnsUrl;
}

Expand All @@ -258,7 +188,7 @@ export class AppService implements OnModuleInit {
let removedPeers = 0;
const peers = await this.helia.libp2p.peerStore.all();
this.logger.debug(`Total number of stored peers: ${peers.length}`);

if (peers.length > maxPeers) {
const excessPeers = peers.slice(0, peers.length - maxPeers);
for (const peer of excessPeers) {
Expand All @@ -268,4 +198,57 @@ export class AppService implements OnModuleInit {
}
this.logger.debug(`Removed peers: ${removedPeers}`);
}

async provideCidsToDHTViaQueue(cids: string[]) {
for (const value of cids) {
const cid = CID.parse(value);
this.logger.debug('Job triggered: Provide to DHT - CID:', cid.toString());
await this.helia.libp2p.contentRouting.provide(cid);
this.logger.log(`Announced CID to the DHT: ${cid.toString()}`);
}
}

async addProvideAllCidsJob() {
try {
const perPage = Number(process.env.PROVIDE_ALL_CIDS_PER_PAGE);
const allCids = await this.getAllCidsFromBlockstore();
this.logger.debug(`Total number of stored CIDs: ${allCids.length}`);
if (allCids.length > 0) {
const paginatedCids = this.paginateArray(allCids, perPage);
for (const page of paginatedCids) {
await this.provideAllCidsProducer.addToQueue(page);
}
}
} catch (err) {
this.logger.error(`Error add provide all cids job: ${err}`);
}
}

private async getAllCidsFromBlockstore(): Promise<string[]> {
try {
const blockstorePath = 'ipfs/blockstore';
if (!fs.existsSync(blockstorePath)) {
return [];
}
const files = this.helia.blockstore.getAll();
const cids: string[] = [];
for await (const file of files) {
const cidV1 = file.cid.toV1();
// Create a new CID with the `raw` codec
const rawCid = CID.create(1, raw.code, cidV1.multihash);
cids.push(rawCid.toString());
}
return cids;
} catch (err) {
this.logger.error('Error listing blocks:', err);
}
}

private paginateArray<T>(array: T[], perPage: number): T[][] {
const paginated: T[][] = [];
for (let i = 0; i < array.length; i += perPage) {
paginated.push(array.slice(i, i + perPage));
}
return paginated;
}
}
34 changes: 22 additions & 12 deletions ipfs-service/src/bullmq/bullmq.module.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,37 @@
import { BullBoardModule } from "@bull-board/nestjs";
import { BullModule } from "@nestjs/bullmq";
import { Module } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { QUEUE_NAME_PROVIDE_TO_DHT, QUEUE_NAME_PRUNE_PEER_STORE } from "../constants/bullmq.constants.js";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js";
import { ExpressAdapter } from "@bull-board/express";
import { BullBoardModule } from '@bull-board/nestjs';
import { BullModule } from '@nestjs/bullmq';
import { Module } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import {
QUEUE_NAME_PROVIDE_ALL_CIDS,
QUEUE_NAME_PROVIDE_TO_DHT,
QUEUE_NAME_PRUNE_PEER_STORE,
} from '../constants/bullmq.constants.js';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter.js';
import { ExpressAdapter } from '@bull-board/express';

@Module({
imports: [
BullModule.forRootAsync({
useFactory: async (configService: ConfigService) => ({
connection: {
host: configService.getOrThrow("REDIS_HOST"),
port: configService.getOrThrow("REDIS_PORT"),
password: configService.getOrThrow("REDIS_PASSWORD"),
host: configService.getOrThrow('REDIS_HOST'),
port: configService.getOrThrow('REDIS_PORT'),
password: configService.getOrThrow('REDIS_PASSWORD'),
connectTimeout: 20000,
reconnectOnError: (err) => {
const targetErrors = ['READONLY', 'ETIMEDOUT', 'ECONNRESET'];
return targetErrors.some(targetError => err.message.includes(targetError));
return targetErrors.some((targetError) =>
err.message.includes(targetError),
);
},
...(configService.get('REDIS_TLS') === 'false' ? {} : { tls: {} }),
},
}),
inject: [ConfigService],
}),
BullBoardModule.forRoot({
route: "/bull-board",
route: '/bull-board',
adapter: ExpressAdapter,
}),
BullBoardModule.forFeature({
Expand All @@ -36,6 +42,10 @@ import { ExpressAdapter } from "@bull-board/express";
name: QUEUE_NAME_PRUNE_PEER_STORE,
adapter: BullMQAdapter,
}),
BullBoardModule.forFeature({
name: QUEUE_NAME_PROVIDE_ALL_CIDS,
adapter: BullMQAdapter,
}),
],
})
export class BullmqModule {}
11 changes: 7 additions & 4 deletions ipfs-service/src/constants/bullmq.constants.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
export const QUEUE_NAME_PROVIDE_TO_DHT = "provide_to_dht_queue";
export const JOB_NAME_PROVIDE_TO_DHT = "provide_to_dht_job";
export const QUEUE_NAME_PROVIDE_TO_DHT = 'provide_to_dht_queue';
export const JOB_NAME_PROVIDE_TO_DHT = 'provide_to_dht_job';

export const QUEUE_NAME_PRUNE_PEER_STORE = "prune_peer_store_queue";
export const JOB_NAME_PRUNE_PEER_STORE = "prune_peer_store_job";
export const QUEUE_NAME_PRUNE_PEER_STORE = 'prune_peer_store_queue';
export const JOB_NAME_PRUNE_PEER_STORE = 'prune_peer_store_job';

export const QUEUE_NAME_PROVIDE_ALL_CIDS = 'provide_all_cids_queue';
export const JOB_NAME_PROVIDE_ALL_CIDS = 'provide_all_cids_job';
Loading

0 comments on commit f4e233c

Please sign in to comment.