-
Notifications
You must be signed in to change notification settings - Fork 752
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enrichment monitoring fixes #2682
Changes from all commits
7ace0d2
08677f0
2ffa4a4
f1e1532
5959585
03d8a43
594574c
9f08e83
e3a4fb3
3cb5ef3
1980bd9
3788cc0
877368b
50aedb7
761611b
d893c8f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,9 +9,12 @@ import { | |
import { refreshMaterializedView } from '@crowd/data-access-layer/src/utils' | ||
import { RedisCache } from '@crowd/redis' | ||
import { | ||
IEnrichableMember, | ||
IEnrichableMemberIdentityActivityAggregate, | ||
IMemberEnrichmentCache, | ||
MemberEnrichmentSource, | ||
MemberIdentityType, | ||
PlatformType, | ||
} from '@crowd/types' | ||
|
||
import { EnrichmentSourceServiceFactory } from '../factory' | ||
|
@@ -41,6 +44,51 @@ export async function getEnrichmentData( | |
return null | ||
} | ||
|
||
export async function getEnrichmentInput( | ||
input: IEnrichableMember, | ||
): Promise<IEnrichmentSourceInput> { | ||
const enrichmentInput: IEnrichmentSourceInput = { | ||
memberId: input.id, | ||
email: input.identities.find((i) => i.verified && i.type === MemberIdentityType.EMAIL), | ||
linkedin: input.identities.find( | ||
(i) => | ||
i.verified && | ||
i.platform === PlatformType.LINKEDIN && | ||
i.type === MemberIdentityType.USERNAME, | ||
), | ||
displayName: input.displayName || undefined, | ||
website: input.website || undefined, | ||
location: input.location || undefined, | ||
activityCount: input.activityCount || 0, | ||
} | ||
|
||
// there can be multiple verified identities in github, we select the one with the most activities | ||
const verifiedGithubIdentities = input.identities.filter( | ||
(i) => | ||
i.verified && i.platform === PlatformType.GITHUB && i.type === MemberIdentityType.USERNAME, | ||
) | ||
|
||
if (verifiedGithubIdentities.length > 1) { | ||
const ghIdentityWithTheMostActivities = await findMemberIdentityWithTheMostActivityInPlatform( | ||
input.id, | ||
PlatformType.GITHUB, | ||
) | ||
if (ghIdentityWithTheMostActivities) { | ||
enrichmentInput.github = input.identities.find( | ||
(i) => | ||
i.verified && | ||
i.platform === PlatformType.GITHUB && | ||
i.type === MemberIdentityType.USERNAME && | ||
i.value === ghIdentityWithTheMostActivities.username, | ||
) | ||
} | ||
} else { | ||
enrichmentInput.github = verifiedGithubIdentities?.[0] || undefined | ||
} | ||
|
||
return enrichmentInput | ||
} | ||
|
||
export async function normalizeEnrichmentData( | ||
source: MemberEnrichmentSource, | ||
data: IMemberEnrichmentData, | ||
|
@@ -53,6 +101,13 @@ export async function isCacheObsolete( | |
source: MemberEnrichmentSource, | ||
cache: IMemberEnrichmentCache<IMemberEnrichmentData>, | ||
): Promise<boolean> { | ||
return isCacheObsoleteSync(source, cache) | ||
} | ||
|
||
export function isCacheObsoleteSync( | ||
source: MemberEnrichmentSource, | ||
cache: IMemberEnrichmentCache<IMemberEnrichmentData>, | ||
): boolean { | ||
const service = EnrichmentSourceServiceFactory.getEnrichmentSourceService(source, svc.log) | ||
return ( | ||
!cache || | ||
|
@@ -104,8 +159,13 @@ export async function findMemberEnrichmentCache( | |
|
||
export async function findMemberEnrichmentCacheForAllSources( | ||
memberId: string, | ||
returnRowsWithoutData = false, | ||
): Promise<IMemberEnrichmentCache<IMemberEnrichmentData>[]> { | ||
return findMemberEnrichmentCacheForAllSourcesDb(svc.postgres.reader.connection(), memberId) | ||
return findMemberEnrichmentCacheForAllSourcesDb( | ||
svc.postgres.reader.connection(), | ||
memberId, | ||
returnRowsWithoutData, | ||
) | ||
} | ||
|
||
export async function insertMemberEnrichmentCache( | ||
|
@@ -138,6 +198,20 @@ export async function findMemberIdentityWithTheMostActivityInPlatform( | |
return findMemberIdentityWithTheMostActivityInPlatformQuestDb(svc.questdbSQL, memberId, platform) | ||
} | ||
|
||
export async function getObsoleteSourcesOfMember( | ||
memberId: string, | ||
possibleSources: MemberEnrichmentSource[], | ||
): Promise<MemberEnrichmentSource[]> { | ||
const caches = await findMemberEnrichmentCacheForAllSources(memberId, true) | ||
const obsoleteSources = possibleSources.filter((source) => | ||
isCacheObsoleteSync( | ||
source, | ||
caches.find((c) => c.source === source), | ||
), | ||
) | ||
return obsoleteSources | ||
} | ||
Comment on lines +201 to +213
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add error handling for database operations.
The function should handle potential database errors when calling `findMemberEnrichmentCacheForAllSources`:
export async function getObsoleteSourcesOfMember(
memberId: string,
possibleSources: MemberEnrichmentSource[],
): Promise<MemberEnrichmentSource[]> {
+ try {
const caches = await findMemberEnrichmentCacheForAllSources(memberId, true)
const obsoleteSources = possibleSources.filter((source) =>
isCacheObsoleteSync(
source,
caches.find((c) => c.source === source),
),
)
return obsoleteSources
+ } catch (error) {
+ svc.log.error('Failed to get obsolete sources', { error, memberId });
+ throw error;
+ }
}
|
||
|
||
export async function refreshMemberEnrichmentMaterializedView(mvName: string): Promise<void> { | ||
await refreshMaterializedView(svc.postgres.writer.connection(), mvName) | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -7,11 +7,11 @@ import { | |||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
import { EnrichmentSourceServiceFactory } from '../factory' | ||||||||||||||||||||||||||||||||||||||
import { svc } from '../main' | ||||||||||||||||||||||||||||||||||||||
import { IEnrichmentService } from '../types' | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
export async function getEnrichableMembers( | ||||||||||||||||||||||||||||||||||||||
limit: number, | ||||||||||||||||||||||||||||||||||||||
sources: MemberEnrichmentSource[], | ||||||||||||||||||||||||||||||||||||||
afterCursor: { activityCount: number; memberId: string } | null, | ||||||||||||||||||||||||||||||||||||||
): Promise<IEnrichableMember[]> { | ||||||||||||||||||||||||||||||||||||||
let rows: IEnrichableMember[] = [] | ||||||||||||||||||||||||||||||||||||||
const sourceInputs: IMemberEnrichmentSourceQueryInput[] = sources.map((s) => { | ||||||||||||||||||||||||||||||||||||||
|
@@ -23,7 +23,34 @@ export async function getEnrichableMembers( | |||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||
const db = svc.postgres.reader | ||||||||||||||||||||||||||||||||||||||
rows = await fetchMembersForEnrichment(db, limit, sourceInputs, afterCursor) | ||||||||||||||||||||||||||||||||||||||
rows = await fetchMembersForEnrichment(db, limit, sourceInputs) | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
return rows | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
// Get the most strict parallelism among existing and enrichable sources | ||||||||||||||||||||||||||||||||||||||
// We only check sources that have an activity count cutoff in the current range | ||||||||||||||||||||||||||||||||||||||
export async function getMaxConcurrentRequests( | ||||||||||||||||||||||||||||||||||||||
members: IEnrichableMember[], | ||||||||||||||||||||||||||||||||||||||
possibleSources: MemberEnrichmentSource[], | ||||||||||||||||||||||||||||||||||||||
concurrencyLimit: number, | ||||||||||||||||||||||||||||||||||||||
): Promise<number> { | ||||||||||||||||||||||||||||||||||||||
const serviceMap: Partial<Record<MemberEnrichmentSource, IEnrichmentService>> = {} | ||||||||||||||||||||||||||||||||||||||
const currentProcessingActivityCount = members[0].activityCount | ||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add input validation for the members array
The code assumes the members array is non-empty when accessing the first element. This could lead to runtime errors if the array is empty.
+ if (!members.length) {
+ throw new Error('Members array cannot be empty');
+ }
const currentProcessingActivityCount = members[0].activityCount
📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
let maxConcurrentRequestsInAllSources = concurrencyLimit | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
for (const source of possibleSources) { | ||||||||||||||||||||||||||||||||||||||
serviceMap[source] = EnrichmentSourceServiceFactory.getEnrichmentSourceService(source, svc.log) | ||||||||||||||||||||||||||||||||||||||
const activityCountCutoff = serviceMap[source].enrichMembersWithActivityMoreThan | ||||||||||||||||||||||||||||||||||||||
if (!activityCountCutoff || activityCountCutoff <= currentProcessingActivityCount) { | ||||||||||||||||||||||||||||||||||||||
maxConcurrentRequestsInAllSources = Math.min( | ||||||||||||||||||||||||||||||||||||||
maxConcurrentRequestsInAllSources, | ||||||||||||||||||||||||||||||||||||||
serviceMap[source].maxConcurrentRequests, | ||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
Comment on lines +45 to +51
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add type assertion and improve error handling for `activityCountCutoff`
The `activityCountCutoff` value from the service should be validated to ensure it's a valid number. Apply this diff:
- const activityCountCutoff = serviceMap[source].enrichMembersWithActivityMoreThan
+ const activityCountCutoff = Number(serviceMap[source].enrichMembersWithActivityMoreThan)
+ if (isNaN(activityCountCutoff)) {
+ svc.log.warn('Invalid activity count cutoff', { source, activityCountCutoff })
+ continue
+ }
📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
svc.log.info('Setting max concurrent requests', { maxConcurrentRequestsInAllSources }) | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
return maxConcurrentRequestsInAllSources | ||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -31,6 +31,8 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn | |||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
public enrichableBySql = `"membersGlobalActivityCount".total_count > ${this.enrichMembersWithActivityMoreThan} AND mi.type = 'email' and mi.verified` | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
public maxConcurrentRequests = 15 | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
// bust cache after 120 days | ||||||||||||||||||||||||||||||||||
public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 120 | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
|
@@ -57,7 +59,11 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn | |||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
async isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean> { | ||||||||||||||||||||||||||||||||||
return !!input.email?.value && input.email?.verified | ||||||||||||||||||||||||||||||||||
return ( | ||||||||||||||||||||||||||||||||||
input.activityCount > this.enrichMembersWithActivityMoreThan && | ||||||||||||||||||||||||||||||||||
!!input.email?.value && | ||||||||||||||||||||||||||||||||||
input.email?.verified | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
Comment on lines +62 to +66
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Remove unnecessary async and add input validation.
The method is marked as async but contains no asynchronous operations. Additionally, consider adding input validation:
- async isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean> {
+ isEnrichableBySource(input: IEnrichmentSourceInput): boolean {
+ if (!input?.activityCount || !input?.email) {
+ return false;
+ }
+
return (
input.activityCount > this.enrichMembersWithActivityMoreThan &&
!!input.email?.value &&
input.email?.verified
)
}
📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
async getData(input: IEnrichmentSourceInput): Promise<IMemberEnrichmentDataClearbit | null> { | ||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,6 +37,8 @@ export default class EnrichmentServiceProgAILinkedinScraper | |
|
||
public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 90 | ||
|
||
public maxConcurrentRequests = 1000 | ||
|
||
Comment on lines +40 to +41
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Based on the comprehensive search results, I can now provide a final response:
The concurrent request limit of 1000 is too high and should be reduced.
The high limit of 1000 concurrent requests for the ProgAI LinkedIn scraper service is concerning because:
Consider:
🔗 Analysis chain
Review the concurrent request limit value. The
Let's verify if there are any rate limiting configurations or documentation:
Consider:
Based on the search results, I can see that:
Let me verify the ProgAI service configuration and documentation:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for rate limiting related configurations or documentation
rg -i "rate.?limit|concurrent.?requests" --type md
rg "CROWD_ENRICHMENT_PROGAI" .env*
Length of output: 135
Script:
#!/bin/bash
# Let's search for rate limiting configurations and usage across the codebase
rg -i "maxconcurrentrequests" -A 3 -B 3
# Search for any ProgAI configuration or documentation
fd -e md -e ts -e js -x rg -l "progai|prog.?ai"
# Look for similar enrichment services to compare configurations
ast-grep --pattern 'maxConcurrentRequests = $_'
Length of output: 79520
Script:
#!/bin/bash
# Check for ProgAI configuration files and documentation
rg -i "progai|prog.?ai" -A 5 -B 5 services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/
# Look for any rate limiting documentation in README files
fd -e md -x rg -l "rate.?limit|concurrent.?requests|progai"
Length of output: 12715 |
||
constructor(public readonly log: Logger) { | ||
super(log) | ||
} | ||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -33,11 +33,13 @@ export default class EnrichmentServiceProgAI extends LoggerBase implements IEnri | |||||||
public source: MemberEnrichmentSource = MemberEnrichmentSource.PROGAI | ||||||||
public platform = `enrichment-${this.source}` | ||||||||
|
||||||||
enrichableBySql = `mi.verified and ((mi.type = 'username' AND mi.platform = 'github') OR (mi.type = 'email'))` | ||||||||
public enrichableBySql = `mi.verified and ((mi.type = 'username' AND mi.platform = 'github') OR (mi.type = 'email'))` | ||||||||
|
||||||||
// bust cache after 90 days | ||||||||
public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 90 | ||||||||
|
||||||||
public maxConcurrentRequests = 1000 | ||||||||
|
||||||||
Comment on lines +41 to +42
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Verify the high concurrent request limit. The
Please verify:
Consider:
- public maxConcurrentRequests = 1000
+ public maxConcurrentRequests = parseInt(process.env.PROGAI_MAX_CONCURRENT_REQUESTS || '100', 10)
📝 Committable suggestion
Suggested change
|
||||||||
public attributeSettings: IMemberEnrichmentAttributeSettings = { | ||||||||
[MemberAttributeName.AVATAR_URL]: { | ||||||||
fields: ['profile_pic_url'], | ||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add error handling and input validation.
The function lacks error handling for the `findMemberIdentityWithTheMostActivityInPlatform` call and doesn't validate if verified identities exist before accessing them. Consider applying these improvements: