diff --git a/backend/src/bin/jobs/syncActivities.ts b/backend/src/bin/jobs/syncActivities.ts index 5708b0bc5a..3b41f19a7a 100644 --- a/backend/src/bin/jobs/syncActivities.ts +++ b/backend/src/bin/jobs/syncActivities.ts @@ -11,6 +11,7 @@ import { PlatformType } from '@crowd/types' import { DB_CONFIG } from '@/conf' import { CrowdJob } from '../../types/jobTypes' +import { retryBackoff } from '../../utils/backoff' async function decideUpdatedAt(pgQx: QueryExecutor, maxUpdatedAt?: string): Promise { if (!maxUpdatedAt) { @@ -32,10 +33,15 @@ function createWhereClause(updatedAt: string): string { return formatQuery('"updatedAt" > $(updatedAt)', { updatedAt }) } -async function syncActivitiesBatch( - activityRepo: ActivityRepository, - activities: IDbActivityCreateData[], -) { +async function syncActivitiesBatch({ + logger, + activityRepo, + activities, +}: { + logger: Logger + activityRepo: ActivityRepository + activities: IDbActivityCreateData[] +}) { const result = { inserted: 0, updated: 0, @@ -44,15 +50,19 @@ async function syncActivitiesBatch( for (const activity of activities) { const existingActivity = await activityRepo.existsWithId(activity.id) - if (existingActivity) { - await activityRepo.rawUpdate(activity.id, { - ...activity, - platform: activity.platform as PlatformType, - }) - result.updated++ - } else { - await activityRepo.rawInsert(activity) - result.inserted++ + try { + if (existingActivity) { + await activityRepo.rawUpdate(activity.id, { + ...activity, + platform: activity.platform as PlatformType, + }) + result.updated++ + } else { + await activityRepo.rawInsert(activity) + result.inserted++ + } + } catch (error) { + logger.error(`Error syncing activity ${activity.id}: ${error}`) } } @@ -97,15 +107,17 @@ export async function syncActivities(logger: Logger, maxUpdatedAt?: string) { const result = await logExecutionTimeV2( // eslint-disable-next-line @typescript-eslint/no-loop-func () => - qdbQx.select( - ` + retryBackoff(() => + qdbQx.select( + ` SELECT * FROM activities WHERE "updatedAt" > $(updatedAt) ORDER BY "updatedAt" LIMIT 1000; `, - { updatedAt }, + { updatedAt }, + ), ), logger, `getting activities with updatedAt > ${updatedAt}`, @@ -116,7 +128,11 @@ export async function syncActivities(logger: Logger, maxUpdatedAt?: string) { } const t = timer(logger) - const { inserted, updated } = await syncActivitiesBatch(activityRepo, result) + const { inserted, updated } = await syncActivitiesBatch({ + logger, + activityRepo, + activities: result, + }) t.end(`Inserting ${inserted} and updating ${updated} activities`) counter += inserted + updated diff --git a/backend/src/utils/backoff.ts b/backend/src/utils/backoff.ts new file mode 100644 index 0000000000..bf45515c1b --- /dev/null +++ b/backend/src/utils/backoff.ts @@ -0,0 +1,18 @@ +import { timeout } from '@crowd/common' + +export async function retryBackoff(fn: () => Promise, maxRetries: number = 3): Promise { + let retries = 0 + while (retries < maxRetries) { + try { + return await fn() + } catch (error) { + retries++ + // Exponential backoff with base of 2 seconds + // 1st retry: 2s, 2nd: 4s, 3rd: 8s, etc + const backoffMs = 2000 * 2 ** (retries - 1) + await timeout(backoffMs) + } + } + + throw new Error('Max retries reached') +} diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index ab69ee4f72..8aa8eb4465 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -1,5 +1,3 @@ -import { performance } from 'perf_hooks' - import { DataSinkWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/common_services' import { DbConnection, DbStore } from '@crowd/data-access-layer/src/database' import { Logger } from '@crowd/logging' @@ -17,8 +15,6 @@ import { import DataSinkService from '../service/dataSink.service' export class WorkerQueueReceiver extends PrioritizedQueueReciever { - private readonly timingMap = new Map() - constructor( level: QueuePriorityLevel, client: IQueue, @@ -43,8 +39,6 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { } override async processMessage(message: IQueueMessage): Promise { - const startTime = performance.now() - try { this.log.trace({ messageType: message.type }, 'Processing message!') @@ -83,25 +77,6 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { } catch (err) { this.log.error(err, 'Error while processing message!') throw err - } finally { - const endTime = performance.now() - const duration = endTime - startTime - this.log.info({ msgType: message.type }, `Message processed in ${duration.toFixed(2)}ms!`) - - if (this.timingMap.has(message.type)) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const data = this.timingMap.get(message.type)! - this.timingMap.set(message.type, { count: data.count + 1, time: data.time + duration }) - } else { - this.timingMap.set(message.type, { count: 1, time: duration }) - } - - const data = this.timingMap.get(message.type) - - this.log.info( - { msgType: message.type }, - `Average processing time: ${data.time / data.count}ms!`, - ) } } } diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 755f830ab0..7306feb1e1 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -42,6 +42,13 @@ import { IActivityCreateData, IActivityUpdateData, ISentimentActivityInput } fro import MemberService from './member.service' import MemberAffiliationService from './memberAffiliation.service' +export class SuppressedActivityError extends Error { + constructor(message: string) { + super(message) + this.name = 'SuppressedActivityError' + } +} + export default class ActivityService extends LoggerBase { constructor( private readonly pgStore: DbStore, @@ -441,10 +448,18 @@ export default class ActivityService extends LoggerBase { (i) => i.platform === platform && i.type === MemberIdentityType.USERNAME, ) if (!identity) { - this.log.error("Activity's member does not have an identity for the platform.") - throw new Error( - `Activity's member does not have an identity for the platform: ${platform}!`, - ) + if (platform === PlatformType.JIRA) { + throw new SuppressedActivityError( + `Activity's member does not have an identity for the platform: ${platform}!`, + ) + } else { + this.log.error( + "Activity's member does not have an identity for the platform. Suppressing it!", + ) + throw new Error( + `Activity's member does not have an identity for the platform: ${platform}!`, + ) + } } username = identity.value @@ -1133,7 +1148,9 @@ export default class ActivityService extends LoggerBase { await this.redisClient.sAdd('organizationIdsForAggComputation', organizationId) } } catch (err) { - this.log.error(err, 'Error while processing an activity!') + if (!(err instanceof SuppressedActivityError)) { + this.log.error(err, 'Error while processing an activity!') + } throw err } } diff --git a/services/libs/data-access-layer/src/activities/sql.ts b/services/libs/data-access-layer/src/activities/sql.ts index f6a58a3bbb..2985350b76 100644 --- a/services/libs/data-access-layer/src/activities/sql.ts +++ b/services/libs/data-access-layer/src/activities/sql.ts @@ -4,6 +4,7 @@ import merge from 'lodash.merge' import { RawQueryParser, getEnv } from '@crowd/common' import { DbConnOrTx } from '@crowd/database' import { ActivityDisplayService } from '@crowd/integrations' +import { getServiceChildLogger } from '@crowd/logging' import { ActivityDisplayVariant, IActivityBySentimentMoodResult, @@ -383,6 +384,8 @@ export const ALL_COLUMNS_TO_SELECT: ActivityColumn[] = DEFAULT_COLUMNS_TO_SELECT 'gitIsMerge', ]) +const logger = getServiceChildLogger('activities') + export async function queryActivities( qdbConn: DbConnOrTx, arg: IQueryActivitiesParameters, @@ -549,7 +552,7 @@ export async function queryActivities( query += ';' - console.log('QuestDB activity query', query) + logger.info('QuestDB activity query', query) const [results, countResults] = await Promise.all([ qdbConn.any(query, params), diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts index b1747b1b22..0ae966940c 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts @@ -138,6 +138,6 @@ export default class ActivityRepository extends RepositoryBase { const prepared = RepositoryBase.prepare(data, this.insertActivityColumnSet) const query = this.dbInstance.helpers.insert(prepared, this.insertActivityColumnSet) - await this.db().none(query) + await this.db().none(`${query} on conflict do nothing`) } } diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts index 2667d75474..2fd9f1508c 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts @@ -29,6 +29,10 @@ export default class RequestedForErasureMemberIdentitiesRepository extends Repos private async wasIdentityRequestedForErasure( identity: IMemberIdentity, ): Promise<{ id: string } | null> { + if (!identity.value) { + return null + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any const params: any = { type: identity.type,