Skip to content

Commit

Permalink
Data sink performance (#1690)
Browse files Browse the repository at this point in the history
  • Loading branch information
sausage-todd authored Oct 16, 2023
1 parent b0f5c64 commit fa96db5
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 14 deletions.
7 changes: 6 additions & 1 deletion services/apps/data_sink_worker/src/bin/process-results.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { DB_CONFIG, REDIS_CONFIG, SQS_CONFIG } from '../conf'
import { DB_CONFIG, REDIS_CONFIG, SQS_CONFIG, SENTIMENT_CONFIG } from '../conf'
import DataSinkRepository from '../repo/dataSink.repo'
import DataSinkService from '../service/dataSink.service'
import { DbStore, getDbConnection } from '@crowd/database'
import { getServiceTracer } from '@crowd/tracing'
import { getServiceLogger } from '@crowd/logging'
import { getRedisClient } from '@crowd/redis'
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, getSqsClient } from '@crowd/sqs'
import { initializeSentimentAnalysis } from '@crowd/sentiment'

const tracer = getServiceTracer()
const log = getServiceLogger()
Expand All @@ -23,8 +24,12 @@ setImmediate(async () => {
const sqsClient = getSqsClient(SQS_CONFIG())
const redisClient = await getRedisClient(REDIS_CONFIG())

initializeSentimentAnalysis(SENTIMENT_CONFIG())

const nodejsWorkerEmitter = new NodejsWorkerEmitter(sqsClient, tracer, log)
await nodejsWorkerEmitter.init()
const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(sqsClient, tracer, log)
await searchSyncWorkerEmitter.init()

const dbConnection = await getDbConnection(DB_CONFIG())
const store = new DbStore(log, dbConnection)
Expand Down
20 changes: 12 additions & 8 deletions services/apps/data_sink_worker/src/repo/member.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,26 +152,30 @@ export default class MemberRepository extends RepositoryBase<MemberRepository> {
},
})

const updatedAt = new Date()

const prepared = RepositoryBase.prepare(
{
...data,
...(data?.weakIdentities &&
data?.weakIdentities?.length > 0 && {
weakIdentities: JSON.stringify(data.weakIdentities),
}),
updatedAt: new Date(),
updatedAt,
},
dynamicColumnSet,
)
const query = this.dbInstance.helpers.update(prepared, dynamicColumnSet)

const condition = this.format('where id = $(id) and "tenantId" = $(tenantId)', {
id,
tenantId,
})
const result = await this.db().result(`${query} ${condition}`)

this.checkUpdateRowCount(result.rowCount, 1)
const condition = this.format(
'where id = $(id) and "tenantId" = $(tenantId) and "updatedAt" < $(updatedAt)',
{
id,
tenantId,
updatedAt,
},
)
await this.db().result(`${query} ${condition}`)
}

public async getIdentities(memberId: string, tenantId: string): Promise<IMemberIdentity[]> {
Expand Down
12 changes: 7 additions & 5 deletions services/apps/data_sink_worker/src/repo/organization.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -458,24 +458,26 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
table: 'organizations',
},
})
const updatedAt = new Date()
const prepared = RepositoryBase.prepare(
{
...data,
...(data?.weakIdentities &&
data?.weakIdentities?.length > 0 && {
weakIdentities: JSON.stringify(data.weakIdentities),
}),
updatedAt: new Date(),
updatedAt,
},
dynamicColumnSet,
)

const query = this.dbInstance.helpers.update(prepared, dynamicColumnSet)
const condition = this.format('where id = $(id)', { id })

const result = await this.db().result(`${query} ${condition}`)
const condition = this.format('where id = $(id) and "updatedAt" < $(updatedAt)', {
id,
updatedAt,
})

this.checkUpdateRowCount(result.rowCount, 1)
await this.db().result(`${query} ${condition}`)
}

public async addIdentity(
Expand Down

0 comments on commit fa96db5

Please sign in to comment.