Skip to content

Commit

Permalink
Enrichment monitoring fixes (#2682)
Browse files Browse the repository at this point in the history
  • Loading branch information
epipav authored Nov 11, 2024
1 parent 4875a4c commit be60f4b
Show file tree
Hide file tree
Showing 14 changed files with 167 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ from unique_members),
select distinct mi."memberId"
from "memberIdentities" mi
left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mi."memberId"
where mi.verified
where "membersGlobalActivityCount".total_count > 1000
and mi.verified
and mi.platform = 'linkedin'
group by mi."memberId"),
unique_members as (
Expand Down Expand Up @@ -359,7 +360,8 @@ from unique_members),
select distinct mi."memberId"
from "memberIdentities" mi
left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mi."memberId"
where mi.verified
where "membersGlobalActivityCount".total_count > 1000
and mi.verified
and mi.platform = 'linkedin'
group by mi."memberId"),
unique_members as (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import {
findMemberEnrichmentCache,
findMemberIdentityWithTheMostActivityInPlatform,
getEnrichmentData,
getEnrichmentInput,
getObsoleteSourcesOfMember,
insertMemberEnrichmentCache,
isCacheObsolete,
isEnrichableBySource,
Expand All @@ -10,7 +12,7 @@ import {
touchMemberEnrichmentCacheUpdatedAt,
updateMemberEnrichmentCache,
} from './activities/enrichment'
import { getEnrichableMembers } from './activities/getMembers'
import { getEnrichableMembers, getMaxConcurrentRequests } from './activities/getMembers'
import { refreshToken } from './activities/lf-auth0/authenticateLFAuth0'
import {
getIdentitiesExistInOtherMembers,
Expand Down Expand Up @@ -54,4 +56,7 @@ export {
isEnrichableBySource,
findMemberIdentityWithTheMostActivityInPlatform,
refreshMemberEnrichmentMaterializedView,
getEnrichmentInput,
getMaxConcurrentRequests,
getObsoleteSourcesOfMember,
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import {
import { refreshMaterializedView } from '@crowd/data-access-layer/src/utils'
import { RedisCache } from '@crowd/redis'
import {
IEnrichableMember,
IEnrichableMemberIdentityActivityAggregate,
IMemberEnrichmentCache,
MemberEnrichmentSource,
MemberIdentityType,
PlatformType,
} from '@crowd/types'

import { EnrichmentSourceServiceFactory } from '../factory'
Expand Down Expand Up @@ -41,6 +44,51 @@ export async function getEnrichmentData(
return null
}

/**
 * Builds the enrichment source input for a member from its verified identities
 * and basic profile fields.
 *
 * - `email` is the first verified identity of type EMAIL.
 * - `linkedin` is the first verified LinkedIn identity of type USERNAME.
 * - `github` is the verified GitHub username identity. When the member has
 *   more than one, the identity whose value matches the username returned by
 *   `findMemberIdentityWithTheMostActivityInPlatform` is used; if that lookup
 *   returns nothing, `github` is left unset.
 *
 * @param input - the enrichable member (id, identities, profile fields, activity count)
 * @returns the input object passed to the enrichment sources
 */
export async function getEnrichmentInput(
  input: IEnrichableMember,
): Promise<IEnrichmentSourceInput> {
  const { identities } = input

  const verifiedEmail = identities.find(
    (identity) => identity.verified && identity.type === MemberIdentityType.EMAIL,
  )

  const verifiedLinkedin = identities.find(
    (identity) =>
      identity.verified &&
      identity.platform === PlatformType.LINKEDIN &&
      identity.type === MemberIdentityType.USERNAME,
  )

  const result: IEnrichmentSourceInput = {
    memberId: input.id,
    email: verifiedEmail,
    linkedin: verifiedLinkedin,
    displayName: input.displayName || undefined,
    website: input.website || undefined,
    location: input.location || undefined,
    activityCount: input.activityCount || 0,
  }

  // a member can have several verified github usernames
  const githubCandidates = identities.filter(
    (identity) =>
      identity.verified &&
      identity.platform === PlatformType.GITHUB &&
      identity.type === MemberIdentityType.USERNAME,
  )

  // zero or one candidate: nothing to disambiguate
  if (githubCandidates.length <= 1) {
    result.github = githubCandidates[0] || undefined
    return result
  }

  // several candidates: keep the one with the most activities
  const mostActiveGithubIdentity = await findMemberIdentityWithTheMostActivityInPlatform(
    input.id,
    PlatformType.GITHUB,
  )

  if (mostActiveGithubIdentity) {
    result.github = githubCandidates.find(
      (identity) => identity.value === mostActiveGithubIdentity.username,
    )
  }

  return result
}

export async function normalizeEnrichmentData(
source: MemberEnrichmentSource,
data: IMemberEnrichmentData,
Expand All @@ -53,6 +101,13 @@ export async function isCacheObsolete(
source: MemberEnrichmentSource,
cache: IMemberEnrichmentCache<IMemberEnrichmentData>,
): Promise<boolean> {
return isCacheObsoleteSync(source, cache)
}

export function isCacheObsoleteSync(
source: MemberEnrichmentSource,
cache: IMemberEnrichmentCache<IMemberEnrichmentData>,
): boolean {
const service = EnrichmentSourceServiceFactory.getEnrichmentSourceService(source, svc.log)
return (
!cache ||
Expand Down Expand Up @@ -104,8 +159,13 @@ export async function findMemberEnrichmentCache(

export async function findMemberEnrichmentCacheForAllSources(
memberId: string,
returnRowsWithoutData = false,
): Promise<IMemberEnrichmentCache<IMemberEnrichmentData>[]> {
return findMemberEnrichmentCacheForAllSourcesDb(svc.postgres.reader.connection(), memberId)
return findMemberEnrichmentCacheForAllSourcesDb(
svc.postgres.reader.connection(),
memberId,
returnRowsWithoutData,
)
}

export async function insertMemberEnrichmentCache(
Expand Down Expand Up @@ -138,6 +198,20 @@ export async function findMemberIdentityWithTheMostActivityInPlatform(
return findMemberIdentityWithTheMostActivityInPlatformQuestDb(svc.questdbSQL, memberId, platform)
}

/**
 * Returns the subset of `possibleSources` whose enrichment cache for the given
 * member is obsolete according to `isCacheObsoleteSync`.
 *
 * Caches for all sources are loaded once (with `returnRowsWithoutData` set to
 * true) and each source is checked against its own cache row. A source with no
 * cache row is checked with `undefined` as the cache.
 *
 * @param memberId - id of the member whose caches are inspected
 * @param possibleSources - sources to check
 * @returns the sources whose cache is obsolete, in the order given
 */
export async function getObsoleteSourcesOfMember(
  memberId: string,
  possibleSources: MemberEnrichmentSource[],
): Promise<MemberEnrichmentSource[]> {
  const caches = await findMemberEnrichmentCacheForAllSources(memberId, true)

  const obsoleteSources: MemberEnrichmentSource[] = []
  for (const source of possibleSources) {
    const cacheOfSource = caches.find((cache) => cache.source === source)
    if (isCacheObsoleteSync(source, cacheOfSource)) {
      obsoleteSources.push(source)
    }
  }

  return obsoleteSources
}

/**
 * Refreshes the materialized view with the given name using the postgres
 * writer connection.
 *
 * @param mvName - name of the materialized view to refresh
 */
export async function refreshMemberEnrichmentMaterializedView(mvName: string): Promise<void> {
  const writerConnection = svc.postgres.writer.connection()
  await refreshMaterializedView(writerConnection, mvName)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import {

import { EnrichmentSourceServiceFactory } from '../factory'
import { svc } from '../main'
import { IEnrichmentService } from '../types'

export async function getEnrichableMembers(
limit: number,
sources: MemberEnrichmentSource[],
afterCursor: { activityCount: number; memberId: string } | null,
): Promise<IEnrichableMember[]> {
let rows: IEnrichableMember[] = []
const sourceInputs: IMemberEnrichmentSourceQueryInput[] = sources.map((s) => {
Expand All @@ -23,7 +23,34 @@ export async function getEnrichableMembers(
}
})
const db = svc.postgres.reader
rows = await fetchMembersForEnrichment(db, limit, sourceInputs, afterCursor)
rows = await fetchMembersForEnrichment(db, limit, sourceInputs)

return rows
}

/**
 * Gets the strictest (lowest) parallelism among the given enrichment sources,
 * capped by `concurrencyLimit`.
 *
 * A source is taken into account when it has no activity count cutoff
 * (`enrichMembersWithActivityMoreThan`), or when its cutoff is at or below the
 * activity count of the first member in `members`.
 *
 * @param members - members currently being processed; only the first member's
 *   activity count is read. May be empty, in which case the activity count is
 *   treated as 0.
 * @param possibleSources - sources that may be used for enrichment
 * @param concurrencyLimit - upper bound for the returned value
 * @returns the minimum of `concurrencyLimit` and `maxConcurrentRequests` of
 *   every source taken into account
 */
export async function getMaxConcurrentRequests(
  members: IEnrichableMember[],
  possibleSources: MemberEnrichmentSource[],
  concurrencyLimit: number,
): Promise<number> {
  // guard against an empty batch: reading members[0].activityCount would throw
  const currentProcessingActivityCount = members[0]?.activityCount ?? 0

  let maxConcurrentRequestsInAllSources = concurrencyLimit

  for (const source of possibleSources) {
    const service: IEnrichmentService = EnrichmentSourceServiceFactory.getEnrichmentSourceService(
      source,
      svc.log,
    )
    const activityCountCutoff = service.enrichMembersWithActivityMoreThan

    // sources without a cutoff always apply; others only when the cutoff is within the current range
    if (!activityCountCutoff || activityCountCutoff <= currentProcessingActivityCount) {
      maxConcurrentRequestsInAllSources = Math.min(
        maxConcurrentRequestsInAllSources,
        service.maxConcurrentRequests,
      )
    }
  }
  svc.log.info('Setting max concurrent requests', { maxConcurrentRequestsInAllSources })

  return maxConcurrentRequestsInAllSources
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ export const scheduleMembersEnrichment = async () => {
backoffCoefficient: 2,
maximumAttempts: 3,
},
args: [
{
afterCursor: null,
},
],
args: [],
},
})
} catch (err) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn

public enrichableBySql = `"membersGlobalActivityCount".total_count > ${this.enrichMembersWithActivityMoreThan} AND mi.type = 'email' and mi.verified`

public maxConcurrentRequests = 15

// bust cache after 120 days
public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 120

Expand All @@ -57,7 +59,11 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn
}

async isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean> {
return !!input.email?.value && input.email?.verified
return (
input.activityCount > this.enrichMembersWithActivityMoreThan &&
!!input.email?.value &&
input.email?.verified
)
}

async getData(input: IEnrichmentSourceInput): Promise<IMemberEnrichmentDataClearbit | null> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ export default class EnrichmentServiceCrustdata extends LoggerBase implements IE

public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 90

public maxConcurrentRequests = 5

public attributeSettings: IMemberEnrichmentAttributeSettings = {
[MemberAttributeName.AVATAR_URL]: {
fields: ['profile_picture_url'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ export default class EnrichmentServiceProgAILinkedinScraper

public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 90

public maxConcurrentRequests = 1000

constructor(public readonly log: Logger) {
super(log)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ export default class EnrichmentServiceProgAI extends LoggerBase implements IEnri
public source: MemberEnrichmentSource = MemberEnrichmentSource.PROGAI
public platform = `enrichment-${this.source}`

enrichableBySql = `mi.verified and ((mi.type = 'username' AND mi.platform = 'github') OR (mi.type = 'email'))`
public enrichableBySql = `mi.verified and ((mi.type = 'username' AND mi.platform = 'github') OR (mi.type = 'email'))`

// bust cache after 90 days
public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 90

public maxConcurrentRequests = 1000

public attributeSettings: IMemberEnrichmentAttributeSettings = {
[MemberAttributeName.AVATAR_URL]: {
fields: ['profile_pic_url'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ export default class EnrichmentServiceSerpApi extends LoggerBase implements IEnr
// bust cache after 120 days
public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 120

public maxConcurrentRequests = 300

constructor(public readonly log: Logger) {
super(log)
}
Expand Down
10 changes: 6 additions & 4 deletions services/apps/premium/members_enrichment_worker/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ export interface IEnrichmentService {
// cache rows with older updatedAt than this will be considered obsolete and will be re-enriched
cacheObsoleteAfterSeconds: number

// max concurrent requests that can be made to the source
maxConcurrentRequests: number

// can the source enrich using this input
isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean>

Expand All @@ -54,6 +57,9 @@ export interface IEnrichmentService {
// activity count is available in "membersGlobalActivityCount" alias, "membersGlobalActivityCount".total_count field
enrichableBySql: string

// only enrich members with activity more than this number
enrichMembersWithActivityMoreThan?: number

// should either return the data or null if it's a miss
getData(input: IEnrichmentSourceInput): Promise<IMemberEnrichmentData | null>
normalize(
Expand Down Expand Up @@ -88,10 +94,6 @@ export interface IMemberEnrichmentLinkedinScraperMetadata {
isFromVerifiedSource: boolean
}

export interface IGetMembersForEnrichmentArgs {
afterCursor: { activityCount: number; memberId: string } | null
}

export interface IMemberEnrichmentSocialData {
platform: PlatformType
handle: string
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import { proxyActivities } from '@temporalio/workflow'

import {
IEnrichableMember,
MemberEnrichmentSource,
MemberIdentityType,
PlatformType,
} from '@crowd/types'
import { IEnrichableMember, MemberEnrichmentSource } from '@crowd/types'

import * as activities from '../activities'
import { IEnrichmentSourceInput } from '../types'
Expand All @@ -19,7 +14,7 @@ const {
updateMemberEnrichmentCache,
isCacheObsolete,
normalizeEnrichmentData,
findMemberIdentityWithTheMostActivityInPlatform,
getEnrichmentInput,
} = proxyActivities<typeof activities>({
startToCloseTimeout: '1 minute',
retry: {
Expand All @@ -42,44 +37,7 @@ export async function enrichMember(

// cache is obsolete when it's not found or cache.updatedAt is older than cacheObsoleteAfterSeconds
if (await isCacheObsolete(source, cache)) {
const enrichmentInput: IEnrichmentSourceInput = {
memberId: input.id,
email: input.identities.find((i) => i.verified && i.type === MemberIdentityType.EMAIL),
linkedin: input.identities.find(
(i) =>
i.verified &&
i.platform === PlatformType.LINKEDIN &&
i.type === MemberIdentityType.USERNAME,
),
displayName: input.displayName || undefined,
website: input.website || undefined,
location: input.location || undefined,
activityCount: input.activityCount || 0,
}

// there can be multiple verified identities in github, we select the one with the most activities
const verifiedGithubIdentities = input.identities.filter(
(i) =>
i.verified &&
i.platform === PlatformType.GITHUB &&
i.type === MemberIdentityType.USERNAME,
)

if (verifiedGithubIdentities.length > 1) {
const ghIdentityWithTheMostActivities =
await findMemberIdentityWithTheMostActivityInPlatform(input.id, PlatformType.GITHUB)
if (ghIdentityWithTheMostActivities) {
enrichmentInput.github = input.identities.find(
(i) =>
i.verified &&
i.platform === PlatformType.GITHUB &&
i.type === MemberIdentityType.USERNAME &&
i.value === ghIdentityWithTheMostActivities.username,
)
}
} else {
enrichmentInput.github = verifiedGithubIdentities?.[0] || undefined
}
const enrichmentInput: IEnrichmentSourceInput = await getEnrichmentInput(input)

const data = await getEnrichmentData(source, enrichmentInput)

Expand Down
Loading

0 comments on commit be60f4b

Please sign in to comment.