Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: read replica service #2377

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions services/workflows-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
"@nestjs/serve-static": "3.0.1",
"@nestjs/testing": "^9.3.12",
"@prisma/client": "4.16.2",
"@prisma/extension-read-replicas": "^0.3.0",
"@sentry/cli": "^2.17.5",
"@sentry/integrations": "^7.52.1",
"@sentry/node": "^7.52.1",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '@/prisma/prisma.service';
import { inspect } from 'node:util';
import {
InlineRule,
TransactionsAgainstDynamicRulesType,
Expand All @@ -11,6 +12,8 @@ import { AggregateType, TIME_UNITS } from './consts';
import { Prisma } from '@prisma/client';
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
import { isEmpty } from 'lodash';
import { ReadPrismaService } from '@/prisma/prisma.read-replica.service';
import { NotFoundEvaluationError } from './errors';

@Injectable()
export class DataAnalyticsService {
Expand Down Expand Up @@ -46,14 +49,14 @@ export class DataAnalyticsService {
});
}

// Used for exhaustive check
inlineRule satisfies never;

this.logger.error(`No evaluation function found`, {
const error = new NotFoundEvaluationError(
`No evaluation function found for rule name: ${(inlineRule as InlineRule).id}`,
inlineRule,
});
);

this.logger.error(error);

throw new Error(`No evaluation function found for rule name: ${(inlineRule as InlineRule).id}`);
throw error;
}

async evaluateTransactionsAgainstDynamicRules({
Expand Down
10 changes: 10 additions & 0 deletions services/workflows-service/src/data-analytics/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { InlineRule } from './types';

export class NotFoundEvaluationError extends Error {
inlineRule: string;

constructor(message: string, inlineRule: InlineRule) {
super(message);
this.inlineRule = JSON.stringify(inlineRule);
}
}
1 change: 1 addition & 0 deletions services/workflows-service/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export const env = createEnv({
.string()
.optional()
.describe('Bucket name of Data migration folders'),
READ_REPLICA_DATABASE_URL: z.string().optional().describe('Database replica URL'),
},
client: {},
/**
Expand Down
1 change: 1 addition & 0 deletions services/workflows-service/src/prisma/consts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const PRISMA_READ_SERVICE = 'PrismaReadService';
18 changes: 16 additions & 2 deletions services/workflows-service/src/prisma/prisma.module.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
import { Global, Module } from '@nestjs/common';
import { PrismaService } from './prisma.service';
import { PRISMA_READ_SERVICE, PRISMA_SERVICE } from './consts';
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
import { ReadPrismaService } from './prisma.read-replica.service';
import { env } from '@/env';

@Global()
@Module({
providers: [PrismaService],
exports: [PrismaService],
providers: [
PrismaService,
{
provide: PRISMA_READ_SERVICE,
useClass: Boolean(env.READ_REPLICA_DATABASE_URL) ? ReadPrismaService : PrismaService,
inject: [AppLoggerService],
useFactory(logger: AppLoggerService): PrismaService {
return new ReadPrismaService(logger);
},
},
].filter(Boolean),
exports: [PRISMA_READ_SERVICE, PrismaService],
})
export class PrismaModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { readReplicas } from '@prisma/extension-read-replicas';
import { Injectable } from '@nestjs/common';
import { PrismaService } from './prisma.service';
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
import { PrismaClient } from '@prisma/client';
import { env } from '@/env';

@Injectable()
export class ReadPrismaService extends PrismaService {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you give an example of how will you use it? Assuming we have a new endpoint that fetches EndUsers, how will you make the query using the read replica?

readonly readClient = this.extendedClient.$extends(
readReplicas({
url: [env.READ_REPLICA_DATABASE_URL!],
}),
);

constructor(protected readonly logger: AppLoggerService) {
super(logger);
Object.assign(this, this.readClient);
}

async onModuleInit() {
await this.readClient.$connect();

// @ts-ignore
this.$on('query', (e: Prisma.QueryEvent) => {
if (e && e.query) {
this.logger.debug(`Read Replica Query: ${e.query}`);
}
});

super.onModuleInit();
}

async onModuleDestroy() {
await this.readClient.$disconnect();

super.onModuleDestroy();
}
}
5 changes: 4 additions & 1 deletion services/workflows-service/src/prisma/prisma.service.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { Prisma, PrismaClient } from '@prisma/client';
import { readReplicas } from '@prisma/extension-read-replicas';

import { AppLoggerService } from '@/common/app-logger/app-logger.service';
import { isErrorWithMessage } from '@ballerine/common';
import { Prisma, PrismaClient } from '@prisma/client';

const prismaExtendedClient = (prismaClient: PrismaClient) =>
prismaClient.$extends({
Expand Down Expand Up @@ -52,6 +53,8 @@ export class PrismaService extends PrismaClient implements OnModuleInit, OnModul
BigInt.prototype.toJSON = function () {
return this.toString();
};

Object.assign(this, this.extendedClient);
}

async onModuleInit() {
Expand Down
Loading