Skip to content

Commit

Permalink
Crustdata enrichment tweaks (#2680)
Browse files Browse the repository at this point in the history
  • Loading branch information
epipav authored Nov 8, 2024
1 parent 65cb14b commit e3e0784
Show file tree
Hide file tree
Showing 20 changed files with 777 additions and 51 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
isCacheObsolete,
isEnrichableBySource,
normalizeEnrichmentData,
refreshMemberEnrichmentMaterializedView,
touchMemberEnrichmentCacheUpdatedAt,
updateMemberEnrichmentCache,
} from './activities/enrichment'
Expand Down Expand Up @@ -52,4 +53,5 @@ export {
updateMemberEnrichmentCache,
isEnrichableBySource,
findMemberIdentityWithTheMostActivityInPlatform,
refreshMemberEnrichmentMaterializedView,
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
touchMemberEnrichmentCacheUpdatedAtDb,
updateMemberEnrichmentCacheDb,
} from '@crowd/data-access-layer/src/old/apps/premium/members_enrichment_worker'
import { refreshMaterializedView } from '@crowd/data-access-layer/src/utils'
import { RedisCache } from '@crowd/redis'
import {
IEnrichableMemberIdentityActivityAggregate,
Expand Down Expand Up @@ -136,3 +137,7 @@ export async function findMemberIdentityWithTheMostActivityInPlatform(
): Promise<IEnrichableMemberIdentityActivityAggregate> {
return findMemberIdentityWithTheMostActivityInPlatformQuestDb(svc.questdbSQL, memberId, platform)
}

/**
 * Refreshes a single member-enrichment materialized view.
 *
 * Obtains a connection from the service's Postgres writer and hands it,
 * together with the view name, to `refreshMaterializedView`.
 *
 * @param mvName - Name of the materialized view to refresh.
 * @returns A promise that settles once `refreshMaterializedView` has completed.
 */
export async function refreshMemberEnrichmentMaterializedView(mvName: string): Promise<void> {
  const writerConnection = svc.postgres.writer.connection()
  await refreshMaterializedView(writerConnection, mvName)
}
7 changes: 6 additions & 1 deletion services/apps/premium/members_enrichment_worker/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ import { Config } from '@crowd/archetype-standard'
import { Options, ServiceWorker } from '@crowd/archetype-worker'
import { Edition } from '@crowd/types'

import { scheduleMembersEnrichment, scheduleMembersLFIDEnrichment } from './schedules'
import {
scheduleMembersEnrichment,
scheduleMembersLFIDEnrichment,
scheduleRefreshMembersEnrichmentMaterializedViews,
} from './schedules'

const config: Config = {
envvars: [
Expand Down Expand Up @@ -43,6 +47,7 @@ export const svc = new ServiceWorker(config, options)
setImmediate(async () => {
await svc.init()

await scheduleRefreshMembersEnrichmentMaterializedViews()
await scheduleMembersEnrichment()

if (process.env['CROWD_EDITION'] === Edition.LFX) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import {
scheduleMembersEnrichment,
scheduleMembersLFIDEnrichment,
scheduleRefreshMembersEnrichmentMaterializedViews,
} from './schedules/getMembersToEnrich'

export { scheduleMembersEnrichment, scheduleMembersLFIDEnrichment }
export {
scheduleMembersEnrichment,
scheduleMembersLFIDEnrichment,
scheduleRefreshMembersEnrichmentMaterializedViews,
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/clien
import { IS_DEV_ENV, IS_TEST_ENV } from '@crowd/common'

import { svc } from '../main'
import { getMembersForLFIDEnrichment, getMembersToEnrich } from '../workflows'
import {
getMembersForLFIDEnrichment,
getMembersToEnrich,
refreshMemberEnrichmentMaterializedViews,
} from '../workflows'

export const scheduleMembersEnrichment = async () => {
try {
Expand Down Expand Up @@ -42,6 +46,38 @@ export const scheduleMembersEnrichment = async () => {
}
}

/**
 * Registers the Temporal schedule that starts the
 * `refreshMemberEnrichmentMaterializedViews` workflow.
 *
 * The schedule uses the cron expression `0 5 * * *`, skips a run that would
 * overlap a still-running one, and starts the workflow on the
 * `members-enrichment` task queue with up to 3 attempts.
 *
 * When Temporal reports that the schedule already exists, this is only logged;
 * any other failure is rethrown wrapped in a new `Error`.
 */
export const scheduleRefreshMembersEnrichmentMaterializedViews = async () => {
  const scheduleId = 'refresh-members-enrichment-materialized-views'
  const cronExpression = '0 5 * * *'

  try {
    await svc.temporal.schedule.create({
      scheduleId,
      spec: {
        cronExpressions: [cronExpression],
      },
      policies: {
        overlap: ScheduleOverlapPolicy.SKIP,
        catchupWindow: '1 minute',
      },
      action: {
        type: 'startWorkflow',
        workflowType: refreshMemberEnrichmentMaterializedViews,
        taskQueue: 'members-enrichment',
        retry: {
          initialInterval: '15 seconds',
          backoffCoefficient: 2,
          maximumAttempts: 3,
        },
      },
    })
  } catch (err) {
    // Anything other than "already registered" is a real failure.
    if (!(err instanceof ScheduleAlreadyRunning)) {
      throw new Error(err)
    }

    // The schedule survives restarts, so a duplicate registration is expected.
    svc.log.info('Schedule already registered in Temporal.')
    svc.log.info('Configuration may have changed since. Please make sure they are in sync.')
  }
}

export const scheduleMembersLFIDEnrichment = async () => {
try {
await svc.temporal.schedule.create({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn
public platform = `enrichment-${this.source}`
public enrichMembersWithActivityMoreThan = 10

public enrichableBySql = `"activitySummary".total_count > ${this.enrichMembersWithActivityMoreThan} AND mi.type = 'email' and mi.verified`
public enrichableBySql = `"membersGlobalActivityCount".total_count > ${this.enrichMembersWithActivityMoreThan} AND mi.type = 'email' and mi.verified`

// bust cache after 120 days
public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 120
Expand Down Expand Up @@ -172,7 +172,9 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn
if (data.linkedin?.handle) {
normalized = normalizeSocialIdentity(
{
handle: data.linkedin.handle.split('/').pop(),
handle: data.linkedin.handle.endsWith('/')
? data.linkedin.handle.slice(0, -1).split('/').pop()
: data.linkedin.handle.split('/').pop(),
platform: PlatformType.LINKEDIN,
},
MemberIdentityType.USERNAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export default class EnrichmentServiceCrustdata extends LoggerBase implements IE

public enrichMembersWithActivityMoreThan = 1000

public enrichableBySql = `("activitySummary".total_count > ${this.enrichMembersWithActivityMoreThan}) AND mi.verified AND mi.type = 'username' and mi.platform = 'linkedin'`
public enrichableBySql = `("membersGlobalActivityCount".total_count > ${this.enrichMembersWithActivityMoreThan}) AND mi.verified AND mi.type = 'username' and mi.platform = 'linkedin'`

public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 90

Expand Down Expand Up @@ -216,6 +216,7 @@ export default class EnrichmentServiceCrustdata extends LoggerBase implements IE
if (!linkedinUrlHashmap.get(input.linkedin.value)) {
consumableIdentities.push({
...input.linkedin,
value: input.linkedin.value.replace(/\//g, ''),
repeatedTimesInDifferentSources: 1,
isFromVerifiedSource: true,
})
Expand Down Expand Up @@ -279,7 +280,15 @@ export default class EnrichmentServiceCrustdata extends LoggerBase implements IE
}

if (data.email) {
for (const email of data.email.split(',').filter(isEmail)) {
let emails: string[]

if (Array.isArray(data.email)) {
emails = data.email
} else {
emails = data.email.split(',').filter(isEmail)
}

for (const email of emails) {
normalized.identities.push({
type: MemberIdentityType.EMAIL,
platform: this.platform,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export interface IMemberEnrichmentDataCrustdata {
linkedin_profile_url: string
linkedin_flagship_url: string
name: string
email: string
email: string | string[]
title: string
last_updated: string
headline: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ export default class EnrichmentServiceProgAILinkedinScraper
if (!linkedinUrlHashmap.get(input.linkedin.value)) {
consumableIdentities.push({
...input.linkedin,
value: input.linkedin.value.replace(/\//g, ''),
repeatedTimesInDifferentSources: 1,
isFromVerifiedSource: true,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export default class EnrichmentServiceSerpApi extends LoggerBase implements IEnr
public enrichMembersWithActivityMoreThan = 500

public enrichableBySql = `
("activitySummary".total_count > ${this.enrichMembersWithActivityMoreThan}) AND
("membersGlobalActivityCount".total_count > ${this.enrichMembersWithActivityMoreThan}) AND
(members."displayName" like '% %') AND
(members.attributes->'location'->>'default' is not null and members.attributes->'location'->>'default' <> '') AND
((members.attributes->'websiteUrl'->>'default' is not null and members.attributes->'websiteUrl'->>'default' <> '') OR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export interface IEnrichmentService {
// SQL filter to get enrichable members for a source
// members table is available as "members" alias
// memberIdentities table is available as "mi" alias
// activity count is available in "activitySummary" alias, "activitySummary".total_count field
// activity count is available in "membersGlobalActivityCount" alias, "membersGlobalActivityCount".total_count field
enrichableBySql: string

// should either return the data or null if it's a miss
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,11 @@ export function normalizeAttributes(

return normalized
}

/**
 * Splits an array into consecutive chunks of at most `chunkSize` elements.
 *
 * Element order is preserved; every chunk except possibly the last one holds
 * `chunkSize` elements. The input array is not modified.
 *
 * @param array - Items to split.
 * @param chunkSize - Maximum number of items per chunk; must be greater than 0.
 * @returns The list of chunks, empty when `array` is empty.
 * @throws RangeError when `chunkSize` is zero, negative or NaN. Without this
 *   guard a non-positive size never advances the loop index and the loop
 *   would run forever.
 */
export function chunkArray<T>(array: T[], chunkSize: number): T[][] {
  // `!(x > 0)` also rejects NaN, which a plain `x <= 0` comparison would let through.
  if (!(chunkSize > 0)) {
    throw new RangeError(`chunkSize must be greater than 0, received ${chunkSize}`)
  }

  const chunks: T[][] = []
  for (let i = 0; i < array.length; i += chunkSize) {
    chunks.push(array.slice(i, i + chunkSize))
  }
  return chunks
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { enrichMemberWithLFAuth0 } from './workflows/lf-auth0/enrichMemberWithLF
import { findAndSaveGithubSourceIds } from './workflows/lf-auth0/findAndSaveGithubSourceIds'
import { getEnrichmentData } from './workflows/lf-auth0/getEnrichmentData'
import { getMembersForLFIDEnrichment } from './workflows/lf-auth0/getMembersForLFIDEnrichment'
import { refreshMemberEnrichmentMaterializedViews } from './workflows/refreshMemberEnrichmentMaterializedViews'

export {
getMembersToEnrich,
Expand All @@ -12,4 +13,5 @@ export {
enrichMemberWithLFAuth0,
findAndSaveGithubSourceIds,
getEnrichmentData,
refreshMemberEnrichmentMaterializedViews,
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const {
normalizeEnrichmentData,
findMemberIdentityWithTheMostActivityInPlatform,
} = proxyActivities<typeof activities>({
startToCloseTimeout: '20 seconds',
startToCloseTimeout: '1 minute',
retry: {
initialInterval: '5s',
backoffCoefficient: 2.0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import {
proxyActivities,
} from '@temporalio/workflow'

import { MemberEnrichmentSource } from '@crowd/types'
import { IEnrichableMember, MemberEnrichmentSource } from '@crowd/types'

import * as activities from '../activities/getMembers'
import { IGetMembersForEnrichmentArgs } from '../types'
import { chunkArray } from '../utils/common'

import { enrichMember } from './enrichMember'

Expand All @@ -18,7 +19,8 @@ const { getEnrichableMembers } = proxyActivities<typeof activities>({
})

export async function getMembersToEnrich(args: IGetMembersForEnrichmentArgs): Promise<void> {
const MEMBER_ENRICHMENT_PER_RUN = 500
const QUERY_FOR_ENRICHABLE_MEMBERS_PER_RUN = 1000
const PARALLEL_ENRICHMENT_WORKFLOWS = 5
const afterCursor = args?.afterCursor || null
const sources = [
MemberEnrichmentSource.PROGAI,
Expand All @@ -28,37 +30,45 @@ export async function getMembersToEnrich(args: IGetMembersForEnrichmentArgs): Pr
MemberEnrichmentSource.CRUSTDATA,
]

const members = await getEnrichableMembers(MEMBER_ENRICHMENT_PER_RUN, sources, afterCursor)
const members = await getEnrichableMembers(
QUERY_FOR_ENRICHABLE_MEMBERS_PER_RUN,
sources,
afterCursor,
)

if (members.length === 0) {
return
}

await Promise.all(
members.map((member) => {
return executeChild(enrichMember, {
workflowId: 'member-enrichment/' + member.tenantId + '/' + member.id,
cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
workflowExecutionTimeout: '15 minutes',
retry: {
backoffCoefficient: 2,
maximumAttempts: 10,
initialInterval: 2 * 1000,
maximumInterval: 30 * 1000,
},
args: [member, sources],
searchAttributes: {
TenantId: [member.tenantId],
},
})
}),
)
const chunks = chunkArray<IEnrichableMember>(members, PARALLEL_ENRICHMENT_WORKFLOWS)

for (const chunk of chunks) {
await Promise.all(
chunk.map((member) => {
return executeChild(enrichMember, {
workflowId: 'member-enrichment/' + member.tenantId + '/' + member.id,
cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
workflowExecutionTimeout: '15 minutes',
retry: {
backoffCoefficient: 2,
maximumAttempts: 10,
initialInterval: 2 * 1000,
maximumInterval: 30 * 1000,
},
args: [member, sources],
searchAttributes: {
TenantId: [member.tenantId],
},
})
}),
)
}

await continueAsNew<typeof getMembersToEnrich>({
afterCursor: {
memberId: members[members.length - 1].id,
activityCount: members[members.length - 1].activityCount,
memberId: chunks[chunks.length - 1][chunks[chunks.length - 1].length - 1].id,
activityCount: chunks[chunks.length - 1][chunks[chunks.length - 1].length - 1].activityCount,
},
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { proxyActivities } from '@temporalio/workflow'

import { MemberEnrichmentMaterializedView } from '@crowd/types'

import * as activities from '../activities/enrichment'

// Workflow-side proxy for the refresh activity exported from '../activities/enrichment'.
// Each activity invocation is given a start-to-close timeout of 10 minutes.
const { refreshMemberEnrichmentMaterializedView } = proxyActivities<typeof activities>({
startToCloseTimeout: '10 minutes',
})

/**
 * Workflow that refreshes the member-enrichment materialized views.
 *
 * Walks over the values of `MemberEnrichmentMaterializedView` and invokes the
 * refresh activity for each of them, awaiting one refresh before starting the
 * next so the views are processed strictly one at a time.
 */
export async function refreshMemberEnrichmentMaterializedViews(): Promise<void> {
  const viewNames = Object.values(MemberEnrichmentMaterializedView)

  for (const viewName of viewNames) {
    await refreshMemberEnrichmentMaterializedView(viewName)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export async function fetchMembersForEnrichment(
afterCursor: { activityCount: number; memberId: string } | null,
): Promise<IEnrichableMember[]> {
const cursorFilter = afterCursor
? `AND ((coalesce("activitySummary".total_count, 0) < $2) OR (coalesce("activitySummary".total_count, 0) = $2 AND members.id < $3))`
? `AND ((coalesce("membersGlobalActivityCount".total_count, 0) < $2) OR (coalesce("membersGlobalActivityCount".total_count, 0) = $2 AND members.id < $3))`
: ''

const sourceInnerQueryItems = []
Expand Down Expand Up @@ -47,18 +47,6 @@ export async function fetchMembersForEnrichment(

return db.connection().query(
`
WITH "activitySummary" AS (
SELECT
msa."memberId",
SUM(msa."activityCount") AS total_count
FROM "memberSegmentsAgg" msa
WHERE msa."segmentId" IN (
SELECT id
FROM segments
WHERE "grandparentId" IS NOT NULL AND "parentId" IS NOT NULL
)
GROUP BY msa."memberId"
)
SELECT
members."id",
members."tenantId",
Expand All @@ -73,11 +61,11 @@ export async function fetchMembersForEnrichment(
'verified', mi.verified
)
) AS identities,
MAX(coalesce("activitySummary".total_count, 0)) AS "activityCount"
MAX(coalesce("membersGlobalActivityCount".total_count, 0)) AS "activityCount"
FROM members
INNER JOIN tenants ON tenants.id = members."tenantId"
INNER JOIN "memberIdentities" mi ON mi."memberId" = members.id
LEFT JOIN "activitySummary" ON "activitySummary"."memberId" = members.id
LEFT JOIN "membersGlobalActivityCount" ON "membersGlobalActivityCount"."memberId" = members.id
WHERE
${enrichableBySqlJoined}
AND tenants."deletedAt" IS NULL
Expand Down
Loading

0 comments on commit e3e0784

Please sign in to comment.