Enrichment monitoring fixes #2682

Merged
merged 16 commits into from
Nov 11, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ from unique_members),
select distinct mi."memberId"
from "memberIdentities" mi
left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mi."memberId"
where mi.verified
where "membersGlobalActivityCount".total_count > 1000
and mi.verified
and mi.platform = 'linkedin'
group by mi."memberId"),
unique_members as (
Expand Down Expand Up @@ -359,7 +360,8 @@ from unique_members),
select distinct mi."memberId"
from "memberIdentities" mi
left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mi."memberId"
where mi.verified
where "membersGlobalActivityCount".total_count > 1000
and mi.verified
and mi.platform = 'linkedin'
group by mi."memberId"),
unique_members as (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import {
findMemberEnrichmentCache,
findMemberIdentityWithTheMostActivityInPlatform,
getEnrichmentData,
getEnrichmentInput,
getObsoleteSourcesOfMember,
insertMemberEnrichmentCache,
isCacheObsolete,
isEnrichableBySource,
Expand All @@ -10,7 +12,7 @@ import {
touchMemberEnrichmentCacheUpdatedAt,
updateMemberEnrichmentCache,
} from './activities/enrichment'
import { getEnrichableMembers } from './activities/getMembers'
import { getEnrichableMembers, getMaxConcurrentRequests } from './activities/getMembers'
import { refreshToken } from './activities/lf-auth0/authenticateLFAuth0'
import {
getIdentitiesExistInOtherMembers,
Expand Down Expand Up @@ -54,4 +56,7 @@ export {
isEnrichableBySource,
findMemberIdentityWithTheMostActivityInPlatform,
refreshMemberEnrichmentMaterializedView,
getEnrichmentInput,
getMaxConcurrentRequests,
getObsoleteSourcesOfMember,
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import {
import { refreshMaterializedView } from '@crowd/data-access-layer/src/utils'
import { RedisCache } from '@crowd/redis'
import {
IEnrichableMember,
IEnrichableMemberIdentityActivityAggregate,
IMemberEnrichmentCache,
MemberEnrichmentSource,
MemberIdentityType,
PlatformType,
} from '@crowd/types'

import { EnrichmentSourceServiceFactory } from '../factory'
Expand Down Expand Up @@ -41,6 +44,51 @@ export async function getEnrichmentData(
return null
}

export async function getEnrichmentInput(
input: IEnrichableMember,
): Promise<IEnrichmentSourceInput> {
const enrichmentInput: IEnrichmentSourceInput = {
memberId: input.id,
email: input.identities.find((i) => i.verified && i.type === MemberIdentityType.EMAIL),
linkedin: input.identities.find(
(i) =>
i.verified &&
i.platform === PlatformType.LINKEDIN &&
i.type === MemberIdentityType.USERNAME,
),
displayName: input.displayName || undefined,
website: input.website || undefined,
location: input.location || undefined,
activityCount: input.activityCount || 0,
}

// there can be multiple verified identities in github, we select the one with the most activities
const verifiedGithubIdentities = input.identities.filter(
(i) =>
i.verified && i.platform === PlatformType.GITHUB && i.type === MemberIdentityType.USERNAME,
)

if (verifiedGithubIdentities.length > 1) {
const ghIdentityWithTheMostActivities = await findMemberIdentityWithTheMostActivityInPlatform(
input.id,
PlatformType.GITHUB,
)
if (ghIdentityWithTheMostActivities) {
enrichmentInput.github = input.identities.find(
(i) =>
i.verified &&
i.platform === PlatformType.GITHUB &&
i.type === MemberIdentityType.USERNAME &&
i.value === ghIdentityWithTheMostActivities.username,
)
}
} else {
enrichmentInput.github = verifiedGithubIdentities?.[0] || undefined
}

return enrichmentInput
}
Comment on lines +47 to +90
⚠️ Potential issue

Add error handling and input validation.

The function lacks error handling for the findMemberIdentityWithTheMostActivityInPlatform call and doesn't validate if verified identities exist before accessing them.

Consider applying these improvements:

 export async function getEnrichmentInput(
   input: IEnrichableMember,
 ): Promise<IEnrichmentSourceInput> {
+  if (!input || !input.identities) {
+    throw new Error('Invalid input: Member or identities are missing');
+  }
+
   const enrichmentInput: IEnrichmentSourceInput = {
     memberId: input.id,
     email: input.identities.find((i) => i.verified && i.type === MemberIdentityType.EMAIL),
     linkedin: input.identities.find(
       (i) =>
         i.verified &&
         i.platform === PlatformType.LINKEDIN &&
         i.type === MemberIdentityType.USERNAME,
     ),
     displayName: input.displayName || undefined,
     website: input.website || undefined,
     location: input.location || undefined,
     activityCount: input.activityCount || 0,
   }

   // Extract GitHub identity selection to a separate function for better readability
+  enrichmentInput.github = await selectGithubIdentity(input);
   
   return enrichmentInput
 }
+
+async function selectGithubIdentity(input: IEnrichableMember) {
+  try {
     const verifiedGithubIdentities = input.identities.filter(
       (i) =>
         i.verified && i.platform === PlatformType.GITHUB && i.type === MemberIdentityType.USERNAME,
     )
 
     if (verifiedGithubIdentities.length > 1) {
       const ghIdentityWithTheMostActivities = await findMemberIdentityWithTheMostActivityInPlatform(
         input.id,
         PlatformType.GITHUB,
       )
-      if (ghIdentityWithTheMostActivities) {
-        enrichmentInput.github = input.identities.find(
+      return ghIdentityWithTheMostActivities
+        ? input.identities.find(
           (i) =>
             i.verified &&
             i.platform === PlatformType.GITHUB &&
             i.type === MemberIdentityType.USERNAME &&
             i.value === ghIdentityWithTheMostActivities.username,
         )
-      }
+        : undefined
     } else {
-      enrichmentInput.github = verifiedGithubIdentities?.[0] || undefined
+      return verifiedGithubIdentities[0]
     }
+  } catch (error) {
+    svc.log.error('Error selecting GitHub identity', { error, memberId: input.id });
+    return undefined;
+  }
+}

Committable suggestion skipped: line range outside the PR's diff.


export async function normalizeEnrichmentData(
source: MemberEnrichmentSource,
data: IMemberEnrichmentData,
Expand Down Expand Up @@ -104,8 +152,13 @@ export async function findMemberEnrichmentCache(

export async function findMemberEnrichmentCacheForAllSources(
memberId: string,
returnRowsWithoutData = false,
): Promise<IMemberEnrichmentCache<IMemberEnrichmentData>[]> {
return findMemberEnrichmentCacheForAllSourcesDb(svc.postgres.reader.connection(), memberId)
return findMemberEnrichmentCacheForAllSourcesDb(
svc.postgres.reader.connection(),
memberId,
returnRowsWithoutData,
)
}

export async function insertMemberEnrichmentCache(
Expand Down Expand Up @@ -138,6 +191,20 @@ export async function findMemberIdentityWithTheMostActivityInPlatform(
return findMemberIdentityWithTheMostActivityInPlatformQuestDb(svc.questdbSQL, memberId, platform)
}

export async function getObsoleteSourcesOfMember(
memberId: string,
possibleSources: MemberEnrichmentSource[],
): Promise<MemberEnrichmentSource[]> {
const caches = await findMemberEnrichmentCacheForAllSources(memberId, true)
const obsoleteSources = possibleSources.filter((source) =>
isCacheObsolete(
source,
caches.find((s) => s.source === source),
),
)
return obsoleteSources
}
⚠️ Potential issue

Fix async operation in Array.filter and add error handling.

isCacheObsolete is an async function, so the Array.filter callback returns a Promise rather than a boolean. A Promise is always truthy, which means the filter keeps every source no matter what the cache contains.

Apply this fix to handle async operations correctly and add error handling:

 export async function getObsoleteSourcesOfMember(
   memberId: string,
   possibleSources: MemberEnrichmentSource[],
 ): Promise<MemberEnrichmentSource[]> {
-  const caches = await findMemberEnrichmentCacheForAllSources(memberId, true)
-  const obsoleteSources = possibleSources.filter((source) =>
-    isCacheObsolete(
-      source,
-      caches.find((s) => s.source === source),
-    ),
-  )
-  return obsoleteSources
+  try {
+    const caches = await findMemberEnrichmentCacheForAllSources(memberId, true);
+    const obsoleteSources = await Promise.all(
+      possibleSources.map(async (source) => {
+        const isObsolete = await isCacheObsolete(
+          source,
+          caches.find((s) => s.source === source),
+        );
+        return isObsolete ? source : null;
+      })
+    );
+    return obsoleteSources.filter((source): source is MemberEnrichmentSource => source !== null);
+  } catch (error) {
+    svc.log.error('Error checking obsolete sources', { error, memberId });
+    throw error;
+  }
 }
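A minimal sketch of the difference, using a hypothetical `isStale` check in place of `isCacheObsolete` (the names and the 90-day threshold are illustrative only, not taken from the codebase):

```typescript
// Hypothetical async check standing in for an async predicate such as isCacheObsolete.
async function isStale(ageDays: number): Promise<boolean> {
  return ageDays > 90
}

// filter receives a Promise from the callback; a Promise is always truthy,
// so every element is kept.
function naiveFilter(ages: number[]): number[] {
  return ages.filter((age) => isStale(age))
}

// Resolve all checks first, then filter synchronously on the resolved flags.
async function awaitedFilter(ages: number[]): Promise<number[]> {
  const flags = await Promise.all(ages.map((age) => isStale(age)))
  return ages.filter((_, index) => flags[index])
}
```

With the input `[10, 100, 200]`, `naiveFilter` returns all three values, while `awaitedFilter` resolves to `[100, 200]`.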


export async function refreshMemberEnrichmentMaterializedView(mvName: string): Promise<void> {
await refreshMaterializedView(svc.postgres.writer.connection(), mvName)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import {

import { EnrichmentSourceServiceFactory } from '../factory'
import { svc } from '../main'
import { IEnrichmentService } from '../types'

import { getEnrichmentInput, getObsoleteSourcesOfMember } from './enrichment'

export async function getEnrichableMembers(
limit: number,
Expand All @@ -27,3 +30,43 @@ export async function getEnrichableMembers(

return rows
}

// Get the most strict parallelism among existing and enrichable sources
// If current members are enrichable by multiple sources, we will use the min(maxConcurrentRequests) among sources
export async function getMaxConcurrentRequests(
members: IEnrichableMember[],
possibleSources: MemberEnrichmentSource[],
): Promise<number> {
const serviceMap: Partial<Record<MemberEnrichmentSource, IEnrichmentService>> = {}
const distinctEnrichableSources = new Set<MemberEnrichmentSource>()

for (const source of possibleSources) {
serviceMap[source] = EnrichmentSourceServiceFactory.getEnrichmentSourceService(source, svc.log)
}
for (const member of members) {
const enrichmentInput = await getEnrichmentInput(member)
const obsoleteSources = await getObsoleteSourcesOfMember(member.id, possibleSources)

Object.keys(serviceMap).forEach(async (source) => {
if (
(await serviceMap[source].isEnrichableBySource(enrichmentInput)) &&
(obsoleteSources as string[]).includes(source)
) {
distinctEnrichableSources.add(source as MemberEnrichmentSource)
⚠️ Potential issue

Logical error: Exclude obsolete sources instead of including them

In the if condition, you're checking if the source is included in obsoleteSources. This means you're adding sources that are obsolete, which might not be the intended behavior. To add only non-obsolete sources, the condition should check for sources not included in obsoleteSources.

Apply this diff to fix the logical condition:

-        (obsoleteSources as string[]).includes(source)
+        !(obsoleteSources as string[]).includes(source)
}
})
}
⚠️ Potential issue

Critical: Fix async/await usage in forEach loop

The current implementation has two issues:

  1. forEach does not await async callbacks, so the loop over members continues (and the function can return) before distinctEnrichableSources has been populated
  2. The obsolete sources check has incorrect logic

Apply this fix:

-    Object.keys(serviceMap).forEach(async (source) => {
-      if (
-        (await serviceMap[source].isEnrichableBySource(enrichmentInput)) &&
-        (obsoleteSources as string[]).includes(source)
-      ) {
-        distinctEnrichableSources.add(source as MemberEnrichmentSource)
-      }
-    })
+    for (const source of Object.keys(serviceMap)) {
+      if (
+        (await serviceMap[source].isEnrichableBySource(enrichmentInput)) &&
+        !(obsoleteSources as string[]).includes(source)
+      ) {
+        distinctEnrichableSources.add(source as MemberEnrichmentSource)
+      }
+    }
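The timing problem can be reproduced in isolation. This is a sketch with a hypothetical `isEnrichable` check standing in for `isEnrichableBySource`; the source names are made up for illustration:

```typescript
// Hypothetical async check; the await forces at least one microtask hop.
async function isEnrichable(source: string): Promise<boolean> {
  await Promise.resolve()
  return source !== 'disabled'
}

// forEach starts every callback but never waits for them to finish.
async function countWithForEach(sources: string[]): Promise<number> {
  const found = new Set<string>()
  sources.forEach(async (source) => {
    if (await isEnrichable(source)) found.add(source)
  })
  // Read while every callback is still suspended at its await.
  return found.size
}

// for...of awaits each check before moving on to the next source.
async function countWithForOf(sources: string[]): Promise<number> {
  const found = new Set<string>()
  for (const source of sources) {
    if (await isEnrichable(source)) found.add(source)
  }
  return found.size
}
```

For `['a', 'disabled', 'b']`, `countWithForEach` resolves to 0 and `countWithForOf` resolves to 2.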


let smallestMaxConcurrentRequests = Infinity

Array.from(distinctEnrichableSources).forEach(async (source) => {
smallestMaxConcurrentRequests = Math.min(
smallestMaxConcurrentRequests,
serviceMap[source].maxConcurrentRequests,
)
})

🛠️ Refactor suggestion

Avoid unnecessary async in forEach callback

The forEach callback is marked as async, but it doesn't contain any await expressions. Moreover, using async with forEach does not behave as expected because forEach does not wait for Promises. Since the code inside the loop is synchronous, you can remove the async keyword.

Apply this diff:

-  Array.from(distinctEnrichableSources).forEach(async (source) => {
+  Array.from(distinctEnrichableSources).forEach((source) => {

svc.log.info('Setting max concurrent requests', { smallestMaxConcurrentRequests })

return smallestMaxConcurrentRequests
}
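Because the running minimum starts at `Infinity`, the function above returns `Infinity` whenever no source qualifies. A hedged sketch of the same reduction with an explicit fallback; `smallestLimit` and its `fallback` parameter are hypothetical and not part of this PR:

```typescript
// Hypothetical helper: pick the strictest limit, or a fallback when the list is empty.
function smallestLimit(limits: number[], fallback: number): number {
  // Math.min() called with no arguments returns Infinity.
  const min = Math.min(...limits)
  return Number.isFinite(min) ? min : fallback
}
```

For example, `smallestLimit([15, 5, 300], 5)` gives 5, and `smallestLimit([], 5)` also gives 5, which matches the previous hard-coded `PARALLEL_ENRICHMENT_WORKFLOWS` value.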
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn

public enrichableBySql = `"membersGlobalActivityCount".total_count > ${this.enrichMembersWithActivityMoreThan} AND mi.type = 'email' and mi.verified`

public maxConcurrentRequests = 15

// bust cache after 120 days
public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 120

Expand All @@ -57,7 +59,11 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn
}

async isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean> {
return !!input.email?.value && input.email?.verified
return (
input.activityCount > this.enrichMembersWithActivityMoreThan &&
!!input.email?.value &&
input.email?.verified
)
Comment on lines +62 to +66
🛠️ Refactor suggestion

Remove unnecessary async and add input validation.

The method is marked as async but contains no asynchronous operations. Additionally, consider adding input validation:

-  async isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean> {
+  isEnrichableBySource(input: IEnrichmentSourceInput): boolean {
+    if (!input?.activityCount || !input?.email) {
+      return false;
+    }
+
     return (
       input.activityCount > this.enrichMembersWithActivityMoreThan &&
       !!input.email?.value &&
       input.email?.verified
     )
   }

}

async getData(input: IEnrichmentSourceInput): Promise<IMemberEnrichmentDataClearbit | null> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ export default class EnrichmentServiceCrustdata extends LoggerBase implements IE

public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 90

public maxConcurrentRequests = 5

public attributeSettings: IMemberEnrichmentAttributeSettings = {
[MemberAttributeName.AVATAR_URL]: {
fields: ['profile_picture_url'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ export default class EnrichmentServiceProgAILinkedinScraper

public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 90

public maxConcurrentRequests = 1000

Comment on lines +40 to +41
💡 Codebase verification

⚠️ Potential issue

The concurrent request limit of 1000 is too high and should be reduced

The high limit of 1000 concurrent requests for the ProgAI LinkedIn scraper service is concerning because:

  1. It's significantly higher than other enrichment services in the codebase (Clearbit: 15, Crustdata: 5, SerpApi: 300)
  2. The service makes requests to LinkedIn profiles via ProgAI's scraper, which could trigger LinkedIn's rate limiting
  3. No documentation or configuration exists to justify this high limit
  4. The service uses a shared Redis-based rate limiter implementation, suggesting consistency with other services is important

Consider:

  • Reducing the limit to 300 or lower to align with other services
  • Making the limit configurable via CROWD_ENRICHMENT_PROGAI_MAX_CONCURRENT environment variable
  • Adding documentation about rate limiting considerations
🔗 Analysis chain

Review the concurrent request limit value.

The maxConcurrentRequests value of 1000 seems significantly higher compared to other enrichment services (Clearbit: 15, Crustdata: 5, SerpApi: 300). This high limit might:

  1. Overwhelm the ProgAI LinkedIn scraper service
  2. Trigger rate limiting or IP blocking from LinkedIn
  3. Consume excessive system resources

Let's verify if there are any rate limiting configurations or documentation:

Consider:

  1. Aligning the limit with other services (300 or lower)
  2. Adding rate limiting documentation
  3. Making this configurable via environment variables

Based on the search results, I can see that:

  1. The codebase has a rate limiting mechanism implemented through Redis
  2. Other enrichment services have much lower concurrent request limits:
    • Clearbit: 15
    • Crustdata: 5
    • SerpApi: 300
  3. Both ProgAI services (regular and LinkedIn scraper) are set to 1000 concurrent requests
  4. The system has a ConcurrentRequestLimiter class that enforces these limits

Let me verify the ProgAI service configuration and documentation:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for rate limiting related configurations or documentation
rg -i "rate.?limit|concurrent.?requests" --type md
rg "CROWD_ENRICHMENT_PROGAI" .env* 

Length of output: 135


Script:

#!/bin/bash
# Let's search for rate limiting configurations and usage across the codebase
rg -i "maxconcurrentrequests" -A 3 -B 3

# Search for any ProgAI configuration or documentation
fd -e md -e ts -e js -x rg -l "progai|prog.?ai"

# Look for similar enrichment services to compare configurations
ast-grep --pattern 'maxConcurrentRequests = $_'

Length of output: 79520


Script:

#!/bin/bash
# Check for ProgAI configuration files and documentation
rg -i "progai|prog.?ai" -A 5 -B 5 services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/

# Look for any rate limiting documentation in README files
fd -e md -x rg -l "rate.?limit|concurrent.?requests|progai"

Length of output: 12715

constructor(public readonly log: Logger) {
super(log)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ export default class EnrichmentServiceProgAI extends LoggerBase implements IEnri
public source: MemberEnrichmentSource = MemberEnrichmentSource.PROGAI
public platform = `enrichment-${this.source}`

enrichableBySql = `mi.verified and ((mi.type = 'username' AND mi.platform = 'github') OR (mi.type = 'email'))`
public enrichableBySql = `mi.verified and ((mi.type = 'username' AND mi.platform = 'github') OR (mi.type = 'email'))`

// bust cache after 90 days
public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 90

public maxConcurrentRequests = 1000

Comment on lines +41 to +42
⚠️ Potential issue

Verify the high concurrent request limit.

The maxConcurrentRequests value of 1000 is significantly higher than other enrichment services (Clearbit: 15, Crustdata: 5, SerpApi: 300). This could potentially:

  1. Overwhelm the ProgAI API
  2. Trigger rate limiting
  3. Impact system resources

Please verify:

  • ProgAI API's documented rate limits
  • System resource capacity
  • Historical concurrent request patterns

Consider:

  1. Aligning with other services' more conservative limits
  2. Adding rate limit handling in getData methods
  3. Making this configurable via environment variables
-  public maxConcurrentRequests = 1000
+  public maxConcurrentRequests = parseInt(process.env.PROGAI_MAX_CONCURRENT_REQUESTS || '100', 10)
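One caveat with the `parseInt(... || '100', 10)` form: the `||` fallback only covers an unset or empty variable, so a non-numeric value still parses to `NaN`. A small sketch of a stricter parser, where `parseConcurrencyLimit` is a hypothetical helper and not an existing function in the codebase:

```typescript
// Hypothetical helper: turn a raw environment value into a usable limit.
function parseConcurrencyLimit(raw: string | undefined, fallback: number): number {
  const parsed = Number.parseInt(raw ?? '', 10)
  // NaN, zero, and negative values all fall back to the default.
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback
}
```

It could be called as `parseConcurrencyLimit(process.env.PROGAI_MAX_CONCURRENT_REQUESTS, 100)`, reusing the variable name proposed in the suggestion above.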

public attributeSettings: IMemberEnrichmentAttributeSettings = {
[MemberAttributeName.AVATAR_URL]: {
fields: ['profile_pic_url'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ export default class EnrichmentServiceSerpApi extends LoggerBase implements IEnr
// bust cache after 120 days
public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 120

public maxConcurrentRequests = 300

constructor(public readonly log: Logger) {
super(log)
}
Expand Down
3 changes: 3 additions & 0 deletions services/apps/premium/members_enrichment_worker/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ export interface IEnrichmentService {
// cache rows with older updatedAt than this will be considered obsolete and will be re-enriched
cacheObsoleteAfterSeconds: number

// max concurrent requests that can be made to the source
maxConcurrentRequests: number

// can the source enrich using this input
isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import { proxyActivities } from '@temporalio/workflow'

import {
IEnrichableMember,
MemberEnrichmentSource,
MemberIdentityType,
PlatformType,
} from '@crowd/types'
import { IEnrichableMember, MemberEnrichmentSource } from '@crowd/types'

import * as activities from '../activities'
import { IEnrichmentSourceInput } from '../types'
Expand All @@ -19,7 +14,7 @@ const {
updateMemberEnrichmentCache,
isCacheObsolete,
normalizeEnrichmentData,
findMemberIdentityWithTheMostActivityInPlatform,
getEnrichmentInput,
} = proxyActivities<typeof activities>({
startToCloseTimeout: '1 minute',
retry: {
Expand All @@ -42,44 +37,7 @@ export async function enrichMember(

// cache is obsolete when it's not found or cache.updatedAt is older than cacheObsoleteAfterSeconds
if (await isCacheObsolete(source, cache)) {
const enrichmentInput: IEnrichmentSourceInput = {
memberId: input.id,
email: input.identities.find((i) => i.verified && i.type === MemberIdentityType.EMAIL),
linkedin: input.identities.find(
(i) =>
i.verified &&
i.platform === PlatformType.LINKEDIN &&
i.type === MemberIdentityType.USERNAME,
),
displayName: input.displayName || undefined,
website: input.website || undefined,
location: input.location || undefined,
activityCount: input.activityCount || 0,
}

// there can be multiple verified identities in github, we select the one with the most activities
const verifiedGithubIdentities = input.identities.filter(
(i) =>
i.verified &&
i.platform === PlatformType.GITHUB &&
i.type === MemberIdentityType.USERNAME,
)

if (verifiedGithubIdentities.length > 1) {
const ghIdentityWithTheMostActivities =
await findMemberIdentityWithTheMostActivityInPlatform(input.id, PlatformType.GITHUB)
if (ghIdentityWithTheMostActivities) {
enrichmentInput.github = input.identities.find(
(i) =>
i.verified &&
i.platform === PlatformType.GITHUB &&
i.type === MemberIdentityType.USERNAME &&
i.value === ghIdentityWithTheMostActivities.username,
)
}
} else {
enrichmentInput.github = verifiedGithubIdentities?.[0] || undefined
}
const enrichmentInput: IEnrichmentSourceInput = await getEnrichmentInput(input)

const data = await getEnrichmentData(source, enrichmentInput)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ import { chunkArray } from '../utils/common'

import { enrichMember } from './enrichMember'

const { getEnrichableMembers } = proxyActivities<typeof activities>({
const { getEnrichableMembers, getMaxConcurrentRequests } = proxyActivities<typeof activities>({
startToCloseTimeout: '2 minutes',
})

export async function getMembersToEnrich(args: IGetMembersForEnrichmentArgs): Promise<void> {
const QUERY_FOR_ENRICHABLE_MEMBERS_PER_RUN = 1000
const PARALLEL_ENRICHMENT_WORKFLOWS = 5
const afterCursor = args?.afterCursor || null
const sources = [
MemberEnrichmentSource.PROGAI,
Expand All @@ -40,7 +39,9 @@ export async function getMembersToEnrich(args: IGetMembersForEnrichmentArgs): Pr
return
}

const chunks = chunkArray<IEnrichableMember>(members, PARALLEL_ENRICHMENT_WORKFLOWS)
const parallelEnrichmentWorkflows = await getMaxConcurrentRequests(members, sources)

const chunks = chunkArray<IEnrichableMember>(members, parallelEnrichmentWorkflows)
🛠️ Refactor suggestion

Add validation for concurrent requests value

The code assumes getMaxConcurrentRequests always returns a positive, finite number, but that function starts from Infinity and returns it unchanged when no source qualifies. Consider adding validation to prevent potential issues.

-  const parallelEnrichmentWorkflows = await getMaxConcurrentRequests(members, sources)
+  const parallelEnrichmentWorkflows = await getMaxConcurrentRequests(members, sources)
+  if (!parallelEnrichmentWorkflows || parallelEnrichmentWorkflows < 1) {
+    throw new Error('Invalid concurrent requests value. Must be a positive number.')
+  }
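Note that the check above still lets `Infinity` through, since `Infinity < 1` is false. Whether that matters depends on how `chunkArray` is implemented, which this diff does not show. The sketch below is a hypothetical slice-based chunker (not the project's `chunkArray`) that rejects invalid sizes and treats `Infinity` as "one chunk":

```typescript
// Hypothetical chunk helper; the real chunkArray in ../utils/common may differ.
function chunk<T>(items: T[], size: number): T[][] {
  // Rejects 0, negative numbers, and NaN (NaN >= 1 is false).
  if (!(size >= 1)) {
    throw new Error('Chunk size must be a positive number.')
  }
  // An infinite size means everything goes into a single chunk.
  const step = Number.isFinite(size) ? Math.floor(size) : items.length || 1
  const chunks: T[][] = []
  for (let i = 0; i < items.length; i += step) {
    chunks.push(items.slice(i, i + step))
  }
  return chunks
}
```

Here `chunk([1, 2, 3, 4, 5], 2)` yields `[[1, 2], [3, 4], [5]]`, `chunk([1, 2, 3], Infinity)` yields `[[1, 2, 3]]`, and a size of 0 throws instead of looping forever.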


for (const chunk of chunks) {
await Promise.all(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,13 +523,15 @@ export async function findMemberEnrichmentCacheDb<T>(
export async function findMemberEnrichmentCacheForAllSourcesDb<T>(
tx: DbConnOrTx,
memberId: string,
returnRowsWithoutData = false,
): Promise<IMemberEnrichmentCache<T>[]> {
const dataFilter = returnRowsWithoutData ? '' : 'and data is not null'
const result = await tx.manyOrNone(
`
select *
from "memberEnrichmentCache"
where
"memberId" = $(memberId) and data is not null;
"memberId" = $(memberId) ${dataFilter};
`,
{ memberId },
)
Expand Down