Skip to content

Commit

Permalink
Activities sync fixes (#2685)
Browse files Browse the repository at this point in the history
  • Loading branch information
sausage-todd authored Nov 12, 2024
1 parent be60f4b commit 25cbd7d
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 49 deletions.
50 changes: 33 additions & 17 deletions backend/src/bin/jobs/syncActivities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { PlatformType } from '@crowd/types'
import { DB_CONFIG } from '@/conf'

import { CrowdJob } from '../../types/jobTypes'
import { retryBackoff } from '../../utils/backoff'

async function decideUpdatedAt(pgQx: QueryExecutor, maxUpdatedAt?: string): Promise<string> {
if (!maxUpdatedAt) {
Expand All @@ -32,10 +33,15 @@ function createWhereClause(updatedAt: string): string {
return formatQuery('"updatedAt" > $(updatedAt)', { updatedAt })
}

async function syncActivitiesBatch(
activityRepo: ActivityRepository,
activities: IDbActivityCreateData[],
) {
async function syncActivitiesBatch({
logger,
activityRepo,
activities,
}: {
logger: Logger
activityRepo: ActivityRepository
activities: IDbActivityCreateData[]
}) {
const result = {
inserted: 0,
updated: 0,
Expand All @@ -44,15 +50,19 @@ async function syncActivitiesBatch(
for (const activity of activities) {
const existingActivity = await activityRepo.existsWithId(activity.id)

if (existingActivity) {
await activityRepo.rawUpdate(activity.id, {
...activity,
platform: activity.platform as PlatformType,
})
result.updated++
} else {
await activityRepo.rawInsert(activity)
result.inserted++
try {
if (existingActivity) {
await activityRepo.rawUpdate(activity.id, {
...activity,
platform: activity.platform as PlatformType,
})
result.updated++
} else {
await activityRepo.rawInsert(activity)
result.inserted++
}
} catch (error) {
logger.error(`Error syncing activity ${activity.id}: ${error}`)
}
}

Expand Down Expand Up @@ -97,15 +107,17 @@ export async function syncActivities(logger: Logger, maxUpdatedAt?: string) {
const result = await logExecutionTimeV2(
// eslint-disable-next-line @typescript-eslint/no-loop-func
() =>
qdbQx.select(
`
retryBackoff(() =>
qdbQx.select(
`
SELECT *
FROM activities
WHERE "updatedAt" > $(updatedAt)
ORDER BY "updatedAt"
LIMIT 1000;
`,
{ updatedAt },
{ updatedAt },
),
),
logger,
`getting activities with updatedAt > ${updatedAt}`,
Expand All @@ -116,7 +128,11 @@ export async function syncActivities(logger: Logger, maxUpdatedAt?: string) {
}

const t = timer(logger)
const { inserted, updated } = await syncActivitiesBatch(activityRepo, result)
const { inserted, updated } = await syncActivitiesBatch({
logger,
activityRepo,
activities: result,
})
t.end(`Inserting ${inserted} and updating ${updated} activities`)

counter += inserted + updated
Expand Down
18 changes: 18 additions & 0 deletions backend/src/utils/backoff.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { timeout } from '@crowd/common'

export async function retryBackoff<T>(fn: () => Promise<T>, maxRetries: number = 3): Promise<T> {
let retries = 0
while (retries < maxRetries) {
try {
return await fn()
} catch (error) {
retries++
// Exponential backoff with base of 2 seconds
// 1st retry: 2s, 2nd: 4s, 3rd: 8s, etc
const backoffMs = 2000 * 2 ** (retries - 1)
await timeout(backoffMs)
}
}

throw new Error('Max retries reached')
}
25 changes: 0 additions & 25 deletions services/apps/data_sink_worker/src/queue/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { performance } from 'perf_hooks'

import { DataSinkWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/common_services'
import { DbConnection, DbStore } from '@crowd/data-access-layer/src/database'
import { Logger } from '@crowd/logging'
Expand All @@ -17,8 +15,6 @@ import {
import DataSinkService from '../service/dataSink.service'

export class WorkerQueueReceiver extends PrioritizedQueueReciever {
private readonly timingMap = new Map<string, { count: number; time: number }>()

constructor(
level: QueuePriorityLevel,
client: IQueue,
Expand All @@ -43,8 +39,6 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever {
}

override async processMessage(message: IQueueMessage): Promise<void> {
const startTime = performance.now()

try {
this.log.trace({ messageType: message.type }, 'Processing message!')

Expand Down Expand Up @@ -83,25 +77,6 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever {
} catch (err) {
this.log.error(err, 'Error while processing message!')
throw err
} finally {
const endTime = performance.now()
const duration = endTime - startTime
this.log.info({ msgType: message.type }, `Message processed in ${duration.toFixed(2)}ms!`)

if (this.timingMap.has(message.type)) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const data = this.timingMap.get(message.type)!
this.timingMap.set(message.type, { count: data.count + 1, time: data.time + duration })
} else {
this.timingMap.set(message.type, { count: 1, time: duration })
}

const data = this.timingMap.get(message.type)

this.log.info(
{ msgType: message.type },
`Average processing time: ${data.time / data.count}ms!`,
)
}
}
}
27 changes: 22 additions & 5 deletions services/apps/data_sink_worker/src/service/activity.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ import { IActivityCreateData, IActivityUpdateData, ISentimentActivityInput } fro
import MemberService from './member.service'
import MemberAffiliationService from './memberAffiliation.service'

export class SuppressedActivityError extends Error {
constructor(message: string) {
super(message)
this.name = 'SuppressedActivityError'
}
}

export default class ActivityService extends LoggerBase {
constructor(
private readonly pgStore: DbStore,
Expand Down Expand Up @@ -441,10 +448,18 @@ export default class ActivityService extends LoggerBase {
(i) => i.platform === platform && i.type === MemberIdentityType.USERNAME,
)
if (!identity) {
this.log.error("Activity's member does not have an identity for the platform.")
throw new Error(
`Activity's member does not have an identity for the platform: ${platform}!`,
)
if (platform === PlatformType.JIRA) {
throw new SuppressedActivityError(
`Activity's member does not have an identity for the platform: ${platform}!`,
)
} else {
this.log.error(
"Activity's member does not have an identity for the platform. Suppressing it!",
)
throw new Error(
`Activity's member does not have an identity for the platform: ${platform}!`,
)
}
}

username = identity.value
Expand Down Expand Up @@ -1133,7 +1148,9 @@ export default class ActivityService extends LoggerBase {
await this.redisClient.sAdd('organizationIdsForAggComputation', organizationId)
}
} catch (err) {
this.log.error(err, 'Error while processing an activity!')
if (!(err instanceof SuppressedActivityError)) {
this.log.error(err, 'Error while processing an activity!')
}
throw err
}
}
Expand Down
5 changes: 4 additions & 1 deletion services/libs/data-access-layer/src/activities/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import merge from 'lodash.merge'
import { RawQueryParser, getEnv } from '@crowd/common'
import { DbConnOrTx } from '@crowd/database'
import { ActivityDisplayService } from '@crowd/integrations'
import { getServiceChildLogger } from '@crowd/logging'
import {
ActivityDisplayVariant,
IActivityBySentimentMoodResult,
Expand Down Expand Up @@ -383,6 +384,8 @@ export const ALL_COLUMNS_TO_SELECT: ActivityColumn[] = DEFAULT_COLUMNS_TO_SELECT
'gitIsMerge',
])

const logger = getServiceChildLogger('activities')

export async function queryActivities(
qdbConn: DbConnOrTx,
arg: IQueryActivitiesParameters,
Expand Down Expand Up @@ -549,7 +552,7 @@ export async function queryActivities(

query += ';'

console.log('QuestDB activity query', query)
logger.info('QuestDB activity query', query)

const [results, countResults] = await Promise.all([
qdbConn.any(query, params),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,6 @@ export default class ActivityRepository extends RepositoryBase<ActivityRepositor
public async rawInsert(data: IDbActivityCreateData): Promise<void> {
const prepared = RepositoryBase.prepare(data, this.insertActivityColumnSet)
const query = this.dbInstance.helpers.insert(prepared, this.insertActivityColumnSet)
await this.db().none(query)
await this.db().none(`${query} on conflict do nothing`)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ export default class RequestedForErasureMemberIdentitiesRepository extends Repos
private async wasIdentityRequestedForErasure(
identity: IMemberIdentity,
): Promise<{ id: string } | null> {
if (!identity.value) {
return null
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const params: any = {
type: identity.type,
Expand Down

0 comments on commit 25cbd7d

Please sign in to comment.