
Enrichment monitoring fixes #2682

Merged
merged 16 commits into from
Nov 11, 2024

Conversation

epipav
Collaborator

@epipav epipav commented Nov 9, 2024

Changes proposed ✍️

What

copilot:summary

copilot:poem

Why

How

copilot:walkthrough

Checklist ✅

  • Label appropriately with Feature, Improvement, or Bug.
  • Add screenshots to the PR description for relevant FE changes.
  • New backend functionality has been unit-tested.
  • API documentation has been updated (if necessary) (see docs on API documentation).
  • Quality standards are met.

Summary by CodeRabbit

  • New Features

    • Enhanced member enrichment monitoring with new materialized views for various data sources.
    • Introduced new functions for managing enrichment inputs and identifying obsolete sources.
    • Added dynamic retrieval of maximum concurrent requests for enrichment processes.
  • Bug Fixes

    • Improved logic for determining enrichable members based on activity counts.
  • Documentation

    • Updated interfaces to include new properties for managing concurrent requests and filtering members.
  • Chores

    • Simplified function signatures and removed unnecessary parameters to enhance maintainability.


coderabbitai bot commented Nov 9, 2024

Walkthrough

The pull request introduces significant changes to the member enrichment monitoring system, primarily through the creation and modification of multiple materialized views in the database. It also enhances the functionality of various services related to member enrichment by adding new methods and properties, particularly focusing on managing concurrent requests and enriching member data from different sources. Additionally, several existing functions have been updated to streamline their operations and improve data retrieval processes.

Changes

File Change Summary
backend/src/database/migrations/R__memberEnrichmentMaterializedViews.sql Created and modified materialized views for global member activity counts and specific sources (Clearbit, Crustdata, Progai, Serp) to enhance member enrichment monitoring.
services/apps/premium/members_enrichment_worker/src/activities.ts Added imports for getMaxConcurrentRequests, getEnrichmentInput, and getObsoleteSourcesOfMember, and included them in the export section.
services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts Introduced getEnrichmentInput and getObsoleteSourcesOfMember functions, modified findMemberEnrichmentCacheForAllSources to include a new parameter, and refactored isCacheObsolete.
services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts Added getMaxConcurrentRequests function and modified getEnrichableMembers by removing the afterCursor parameter.
services/apps/premium/members_enrichment_worker/src/sources/clearbit/service.ts Added maxConcurrentRequests property and updated isEnrichableBySource to include activity count checks.
services/apps/premium/members_enrichment_worker/src/sources/crustdata/service.ts Added maxConcurrentRequests property with a default value of 5.
services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/service.ts Added maxConcurrentRequests property with a value of 1000.
services/apps/premium/members_enrichment_worker/src/sources/progai/service.ts Changed visibility of enrichableBySql to public and added maxConcurrentRequests property with a default value of 1000.
services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts Added maxConcurrentRequests property with a value of 300.
services/apps/premium/members_enrichment_worker/src/types.ts Added maxConcurrentRequests and enrichMembersWithActivityMoreThan properties to IEnrichmentService, removed IGetMembersForEnrichmentArgs interface.
services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts Removed imports for MemberIdentityType and PlatformType, replaced manual construction of enrichmentInput with a call to getEnrichmentInput.
services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts Added getMaxConcurrentRequests to dynamically determine the number of concurrent requests and simplified the function signature by removing args.
services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts Added returnRowsWithoutData parameter to findMemberEnrichmentCacheForAllSourcesDb, modified SQL queries accordingly, and removed cursor logic from fetchMembersForEnrichment.
services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts Updated args property in scheduleMembersEnrichment to an empty array, indicating no arguments will be passed to the workflow when scheduled.
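The per-source `maxConcurrentRequests` values listed above feed into `getMaxConcurrentRequests`, which (per the review discussion below) takes the minimum limit among the active sources. A minimal sketch of that min-over-sources logic — the interface and function names here are assumptions for illustration, not the PR's actual code:

```typescript
// Sketch: pick the smallest per-source limit so the worker never
// exceeds the most restrictive enrichment source.
interface EnrichmentServiceLike {
  source: string
  maxConcurrentRequests: number
}

function smallestMaxConcurrentRequests(services: EnrichmentServiceLike[]): number {
  if (services.length === 0) {
    throw new Error('no enrichment sources configured')
  }
  return services.reduce(
    (min, svc) => Math.min(min, svc.maxConcurrentRequests),
    Number.POSITIVE_INFINITY,
  )
}
```

With the limits in the table (Clearbit 15, Crustdata 5, Serp 300), the effective concurrency would be 5.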

Possibly related PRs

Suggested labels

Improvement

Poem

🐇 In the fields where members play,
New views and functions come to stay.
With concurrent requests in sight,
Enrichment flows, oh what a delight!
So hop along, let's cheer and sing,
For data's dance is a wondrous thing! 🎉



@epipav epipav self-assigned this Nov 10, 2024
@epipav epipav added the Bug Created by Linear-GitHub Sync label Nov 10, 2024
@epipav epipav marked this pull request as ready for review November 10, 2024 12:28

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 9

🧹 Outside diff range and nitpick comments (14)
services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts (2)

Line range hint 71-71: Consider implementing the data squasher TODO.

The TODO comment indicates a missing implementation for data squashing using LLM. This seems like a critical component for the enrichment process.

Would you like me to help create a GitHub issue to track the implementation of the LLM-based data squasher?


Line range hint 33-56: Consider refactoring cache management logic.

The cache management logic could be simplified and made more maintainable. Consider extracting the cache operations into a separate helper function to improve readability and reduce duplication.

Here's a suggested refactor:

+async function handleCacheUpdate(
+  source: MemberEnrichmentSource,
+  memberId: string,
+  cache: any,
+  data: any
+): Promise<boolean> {
+  if (!cache) {
+    await insertMemberEnrichmentCache(source, memberId, data)
+    return !!data
+  }
+  
+  if (sourceHasDifferentDataComparedToCache(cache, data)) {
+    await updateMemberEnrichmentCache(source, memberId, data)
+    return true
+  }
+  
+  await touchMemberEnrichmentCacheUpdatedAt(source, memberId)
+  return false
+}

 export async function enrichMember(
   input: IEnrichableMember,
   sources: MemberEnrichmentSource[],
 ): Promise<void> {
   let changeInEnrichmentSourceData = false

   for (const source of sources) {
     const cache = await findMemberEnrichmentCache(source, input.id)

     if (await isCacheObsolete(source, cache)) {
       const enrichmentInput: IEnrichmentSourceInput = await getEnrichmentInput(input)
       const data = await getEnrichmentData(source, enrichmentInput)
-      if (!cache) {
-        await insertMemberEnrichmentCache(source, input.id, data)
-        if (data) {
-          changeInEnrichmentSourceData = true
-        }
-      } else if (sourceHasDifferentDataComparedToCache(cache, data)) {
-        await updateMemberEnrichmentCache(source, input.id, data)
-        changeInEnrichmentSourceData = true
-      } else {
-        await touchMemberEnrichmentCacheUpdatedAt(source, input.id)
-      }
+      const didUpdate = await handleCacheUpdate(source, input.id, cache, data)
+      changeInEnrichmentSourceData = changeInEnrichmentSourceData || didUpdate
     }
   }
services/apps/premium/members_enrichment_worker/src/types.ts (1)

44-46: Consider enhancing the maxConcurrentRequests property implementation.

While the addition is well-placed and documented, consider these improvements:

  1. This is a breaking change that requires all implementations of IEnrichmentService to provide this property.
  2. The property would benefit from validation to ensure positive numbers.

Consider applying these enhancements:

-  // max concurrent requests that can be made to the source
-  maxConcurrentRequests: number
+  /**
+   * Maximum number of concurrent requests that can be made to the enrichment source.
+   * @minimum 1
+   * @example
+   * // Clearbit: 5 concurrent requests
+   * maxConcurrentRequests: 5
+   */
+  maxConcurrentRequests: number

Also, consider adding runtime validation in the service implementations:

if (maxConcurrentRequests < 1) {
  throw new Error('maxConcurrentRequests must be greater than 0');
}
services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts (1)

35-35: Consider implementing a dynamic concurrency limit.

Since this service checks remaining credits via hasRemainingCredits(), consider making the maxConcurrentRequests dynamic based on available credits to prevent rapid depletion.

Example implementation:

private baseMaxConcurrentRequests = 300;

// Getters cannot be async, so expose an async method instead; this assumes a
// helper that returns the remaining credit count rather than a boolean.
public async getMaxConcurrentRequests(): Promise<number> {
  // Reduce concurrency when credits are low
  const remainingCredits = await this.getRemainingCreditCount();
  if (remainingCredits < 1000) {
    return Math.min(50, this.baseMaxConcurrentRequests);
  }
  return this.baseMaxConcurrentRequests;
}
services/apps/premium/members_enrichment_worker/src/sources/clearbit/service.ts (1)

34-35: Document and consider making maxConcurrentRequests configurable.

While the concurrent request limit is a good addition for rate limiting, consider:

  1. Adding a comment explaining the rationale behind the value of 15
  2. Making it configurable through environment variables to easily adjust based on API limits or performance requirements
+  // Maximum number of concurrent requests to Clearbit API to prevent rate limiting
+  public maxConcurrentRequests = process.env.CLEARBIT_MAX_CONCURRENT_REQUESTS 
+    ? parseInt(process.env.CLEARBIT_MAX_CONCURRENT_REQUESTS, 10)
+    : 15
services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/service.ts (1)

40-41: Consider adding resilience patterns for concurrent request handling.

Given the high concurrent request limit and external API dependencies, consider implementing:

  1. Circuit breaker pattern to handle API failures gracefully
  2. Retry mechanism with exponential backoff
  3. Request queuing to smooth out traffic spikes

Example implementation approach:

import CircuitBreaker from 'opossum'; // opossum exports the class as its default export
import retry from 'async-retry';

// Configure circuit breaker (bind so `this` is preserved inside the wrapped call)
private breaker = new CircuitBreaker(this.getDataUsingLinkedinHandle.bind(this), {
  timeout: 30000, // 30 seconds
  errorThresholdPercentage: 50,
  resetTimeout: 30000
});

// Add retry logic with exponential backoff
private async getDataWithRetry(handle: string): Promise<IMemberEnrichmentDataProgAI | null> {
  return await retry(
    async () => (await this.breaker.fire(handle)) as IMemberEnrichmentDataProgAI | null,
    {
      retries: 3,
      factor: 2,
      minTimeout: 1000,
      maxTimeout: 5000
    }
  );
}
services/apps/premium/members_enrichment_worker/src/sources/progai/service.ts (1)

Line range hint 249-282: Add rate limit handling to API calls.

Given the high concurrent request limit, it's important to handle rate limiting gracefully. Consider adding retry logic and rate limit handling to the getData methods.

Example implementation:

private async makeRequest(config: AxiosRequestConfig): Promise<IMemberEnrichmentDataProgAIResponse> {
  const maxRetries = 3;
  const baseDelay = 1000; // 1 second

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      const response = await axios(config);
      return response.data;
    } catch (error) {
      if (error.response?.status === 429) { // Rate limit exceeded
        const retryAfter = parseInt(error.response.headers['retry-after'] || '1', 10);
        await new Promise(resolve => setTimeout(resolve, retryAfter * 1000 || baseDelay * Math.pow(2, attempt)));
        continue;
      }
      throw error;
    }
  }
  throw new Error('Max retries exceeded');
}

Then update the getData methods to use this helper:

  async getDataUsingGitHubHandle(githubUsername: string): Promise<IMemberEnrichmentDataProgAI> {
    const url = `${process.env['CROWD_ENRICHMENT_PROGAI_URL']}/get_profile`
    const config = {
      method: 'get',
      url,
      params: {
        github_handle: githubUsername,
        with_emails: true,
        api_key: process.env['CROWD_ENRICHMENT_PROGAI_API_KEY'],
      },
      headers: {},
    }

-    const response: IMemberEnrichmentDataProgAIResponse = (await axios(config)).data
+    const response = await this.makeRequest(config)
    return response?.profile || null
  }
services/apps/premium/members_enrichment_worker/src/sources/crustdata/service.ts (1)

50-51: Document and consider making maxConcurrentRequests configurable.

While the value of 5 concurrent requests is significantly lower than other enrichment services (Clearbit: 15, SerpApi: 300), it might be due to Crustdata's API rate limits. Consider:

  1. Adding a comment explaining the rationale behind this specific value
  2. Making it configurable through environment variables to easily adjust in different environments
  3. Adding validation to ensure the value remains positive
+  // Maximum concurrent requests to Crustdata API to respect rate limits
+  public maxConcurrentRequests = process.env.CROWD_ENRICHMENT_CRUSTDATA_MAX_CONCURRENT_REQUESTS
+    ? Math.max(1, parseInt(process.env.CROWD_ENRICHMENT_CRUSTDATA_MAX_CONCURRENT_REQUESTS, 10))
+    : 5
services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts (1)

Line range hint 22-29: Critical: SQL Injection Vulnerability in Dynamic Query Construction

The function fetchMembersForEnrichment constructs SQL queries using string interpolation (${input.source} and ${input.cacheObsoleteAfterSeconds}), which is unsafe and could lead to SQL injection attacks. This is particularly concerning as sourceInputs is an array of IMemberEnrichmentSourceQueryInput that could contain malicious input.

Replace string interpolation with parameterized queries:

-      `
-      ( NOT EXISTS (
-          SELECT 1 FROM "memberEnrichmentCache" mec
-          WHERE mec."memberId" = members.id
-          AND mec.source = '${input.source}'
-          AND EXTRACT(EPOCH FROM (now() - mec."updatedAt")) < ${input.cacheObsoleteAfterSeconds})
-      )`
+      `
+      ( NOT EXISTS (
+          SELECT 1 FROM "memberEnrichmentCache" mec
+          WHERE mec."memberId" = members.id
+          AND mec.source = $${paramIndex}
+          AND EXTRACT(EPOCH FROM (now() - mec."updatedAt")) < $${paramIndex + 1})
+      )`

You'll need to track the parameter index and add the values to the query parameters array.

Also applies to: 41-43
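
Tracking the parameter index as the review suggests could be sketched as follows — `SourceQueryInput` and `buildSourceConditions` are hypothetical names for illustration, not code from this PR:

```typescript
// Sketch: build parameterized NOT EXISTS clauses with a running $N index,
// pushing values into a params array as placeholders are emitted.
interface SourceQueryInput {
  source: string
  cacheObsoleteAfterSeconds: number
}

function buildSourceConditions(inputs: SourceQueryInput[]): {
  clauses: string[]
  params: unknown[]
} {
  const params: unknown[] = []
  const clauses = inputs.map((input) => {
    params.push(input.source, input.cacheObsoleteAfterSeconds)
    // $N placeholders are 1-based; the two values just pushed sit at
    // positions params.length - 1 and params.length.
    return `( NOT EXISTS (
        SELECT 1 FROM "memberEnrichmentCache" mec
        WHERE mec."memberId" = members.id
        AND mec.source = $${params.length - 1}
        AND EXTRACT(EPOCH FROM (now() - mec."updatedAt")) < $${params.length})
    )`
  })
  return { clauses, params }
}
```

The values then travel in the query's parameter array instead of being interpolated into the SQL string.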

backend/src/database/migrations/R__memberEnrichmentMaterializedViews.sql (3)

Line range hint 17-186: Consider extracting common enrichment criteria into a separate view.

The view contains repeated code blocks for determining enrichable members across different sources. This could be simplified by creating a base view for common enrichment criteria.

Example approach:

-- Create a base view for common enrichment criteria
create materialized view "memberEnrichmentBaseEligibility" as
select distinct mem.id as "memberId",
       case 
         when mi.verified and mi.type = 'email' then true
         when mi.verified and mi.type = 'username' and mi.platform = 'github' then true
         when "membersGlobalActivityCount".total_count > 500 
              and mem."displayName" like '% %'
              and mem.attributes -> 'location' ->> 'default' is not null 
              and mem.attributes -> 'location' ->> 'default' <> '' then true
         else false
       end as is_eligible
from members mem
inner join "memberIdentities" mi on mem.id = mi."memberId"
left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mem.id;

317-318: Extract magic number into a configuration parameter.

The activity count threshold of 1000 is hardcoded in multiple places. Consider using a configurable parameter or a constant view.

Example approach:

create view "enrichmentThresholds" as
select 1000 as "highActivityThreshold",
       500 as "mediumActivityThreshold",
       10 as "lowActivityThreshold";

Also applies to: 363-364


Line range hint 1-644: Consider performance optimizations for materialized views.

  1. Add indexes on commonly queried columns in the materialized views to improve query performance.
  2. Define a refresh strategy for the materialized views to ensure data freshness without impacting system performance.

Example approach:

-- Add indexes after creating each materialized view
create index ix_member_enrichment_monitoring_clearbit_progress 
  on "memberEnrichmentMonitoringClearbit" (progress);

-- Consider implementing a refresh strategy
-- Option 1: Concurrent refresh to avoid blocking reads
refresh materialized view concurrently "memberEnrichmentMonitoringClearbit";

-- Option 2: Create a function to refresh all views in the correct order
create or replace function refresh_enrichment_monitoring_views()
returns void as $$
begin
  refresh materialized view "membersGlobalActivityCount";
  refresh materialized view "memberEnrichmentMonitoringTotal";
  -- ... refresh other views
end;
$$ language plpgsql;
services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts (2)

69-70: Enhance logging information

Consider adding more context to the log message to make debugging easier. Including details like the number of distinct enrichable sources and which sources are being considered can be helpful.

For example:

 svc.log.info('Setting max concurrent requests', {
   smallestMaxConcurrentRequests,
+  distinctEnrichableSources: Array.from(distinctEnrichableSources),
 })

41-42: Use explicit type annotations when possible

While TypeScript can infer types in many cases, being explicit with type annotations can improve code readability and maintainability.

Consider specifying the type for distinctEnrichableSources:

-  const distinctEnrichableSources = new Set<MemberEnrichmentSource>()
+  const distinctEnrichableSources: Set<MemberEnrichmentSource> = new Set()
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between c7919cb and 9f08e83.

📒 Files selected for processing (13)
  • backend/src/database/migrations/R__memberEnrichmentMaterializedViews.sql (2 hunks)
  • services/apps/premium/members_enrichment_worker/src/activities.ts (3 hunks)
  • services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts (4 hunks)
  • services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts (2 hunks)
  • services/apps/premium/members_enrichment_worker/src/sources/clearbit/service.ts (2 hunks)
  • services/apps/premium/members_enrichment_worker/src/sources/crustdata/service.ts (1 hunks)
  • services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/service.ts (1 hunks)
  • services/apps/premium/members_enrichment_worker/src/sources/progai/service.ts (1 hunks)
  • services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts (1 hunks)
  • services/apps/premium/members_enrichment_worker/src/types.ts (1 hunks)
  • services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts (3 hunks)
  • services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts (2 hunks)
  • services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts (1 hunks)
🔇 Additional comments (14)
services/apps/premium/members_enrichment_worker/src/activities.ts (1)

5-6: LGTM! Verify the usage of new exports.

The changes appropriately extend the module's public API with new enrichment-related functionality while maintaining good practices in import/export organization.

Let's verify the usage of these new exports across the codebase:

Also applies to: 15-15, 59-61

✅ Verification successful

All new exports are properly utilized within the codebase

The verification confirms that all newly exported functions are actively used:

  • getEnrichmentInput: Used in enrichment workflow and member processing
  • getMaxConcurrentRequests: Used in members enrichment workflow
  • getObsoleteSourcesOfMember: Used in member processing logic
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check usage of newly exported functions
# Expected: Find references to these functions in other files to confirm they're being used

echo "Checking usage of getEnrichmentInput..."
rg "getEnrichmentInput\(" --type ts

echo "Checking usage of getMaxConcurrentRequests..."
rg "getMaxConcurrentRequests\(" --type ts

echo "Checking usage of getObsoleteSourcesOfMember..."
rg "getObsoleteSourcesOfMember\(" --type ts

Length of output: 1432

services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts (2)

17-19: LGTM: Clean activity proxy setup

The addition of getMaxConcurrentRequests to the proxied activities follows good practices, with an appropriate timeout for the operation.


42-44: Verify impact of dynamic concurrency on external services

The switch from static to dynamic concurrency could impact external service rate limits and system resources. Please ensure that:

  1. External enrichment services can handle the variable concurrency
  2. System resources are properly monitored
  3. Rate limiting is still effective
✅ Verification successful

Based on the search results, I can now provide a final response since I have sufficient information about rate limiting and concurrency handling in the codebase:

Dynamic concurrency implementation is properly safeguarded

The implementation has robust rate limiting and error handling:

  1. Each enrichment source defines its own maxConcurrentRequests limit:

    • Progai/LinkedIn: 1000 requests
    • SERP: 300 requests
    • Clearbit: 15 requests
    • Crustdata: 5 requests
  2. The getMaxConcurrentRequests function:

    • Takes the minimum concurrent request limit among all active enrichment sources
    • Ensures the chunking size is appropriate for all services involved
  3. Rate limit handling is implemented across services:

    • HTTP 429 responses are properly caught and handled
    • Rate limit reset times are respected
    • Exponential backoff is implemented where needed
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for rate limit configurations in enrichment services
echo "Checking rate limit configurations..."
rg -A 5 "maxConcurrentRequests|rateLimit" --type typescript

# Look for any error handling related to rate limits
echo "Checking rate limit error handling..."
rg -A 5 "rateLimitExceeded|tooManyRequests|429" --type typescript

Length of output: 369


Script:

#!/bin/bash
# Check for rate limit configurations and error handling with correct file type
echo "Checking rate limit configurations..."
rg -A 5 "maxConcurrentRequests|rateLimit" -t ts

echo "Checking rate limit error handling..."
rg -A 5 "rateLimitExceeded|tooManyRequests|429" -t ts

# Look for the getMaxConcurrentRequests implementation
echo "Checking getMaxConcurrentRequests implementation..."
rg -A 10 "getMaxConcurrentRequests" -t ts

# Look for any enrichment service configurations
echo "Looking for enrichment service configurations..."
rg -A 5 "enrichment.*config|ENRICHMENT.*CONFIG" -t ts

Length of output: 115000

services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts (2)

40-40: LGTM! Improved abstraction of enrichment input logic.

The delegation of input construction to getEnrichmentInput improves code organization and maintainability.


3-3: LGTM! Consider verifying the timeout configuration.

The changes to imports and activity proxies look good. The centralization of enrichment input logic through getEnrichmentInput improves code organization.

Let's verify if the 1-minute timeout is sufficient for all enrichment sources:

Also applies to: 17-17

✅ Verification successful

Based on the search results, I can see that the 1-minute timeout configuration for enrichment activities is consistent with other similar operations in the codebase. For example:

  • Export activities use 60 seconds timeout for CSV operations
  • Email activities use 5-10 seconds timeout for simpler operations
  • Integration activities use 2-3 seconds timeout for API calls
  • Script executor uses 10 minutes timeout for long-running operations

Let me verify if there are any timeout-related errors or issues specifically for enrichment operations:


Let me try a different search pattern to find any enrichment-related timeouts or errors:


The 1-minute timeout with retry policy is appropriate for enrichment activities

The timeout configuration is well-aligned with the codebase patterns:

  • The enrichment workflow has a 1-minute timeout with a robust retry policy (4 attempts with exponential backoff)
  • Other enrichment-related activities use similar timeouts:
    • 2 minutes for member enrichment with LF Auth0
    • 2 minutes for getting members to enrich
    • 10 seconds for simpler operations like getting enrichment data

The retry policy (5s initial interval, 2.0 backoff, max 30s interval, 4 attempts) provides good resilience against transient failures.
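
The wait schedule implied by that policy can be checked with a small helper — a hedged sketch of the arithmetic only, not code from the workflow itself:

```typescript
// Delay before the nth retry under the policy the review describes:
// 5s initial interval, 2.0 backoff coefficient, capped at 30s.
function retryDelaySeconds(attempt: number): number {
  const initialInterval = 5
  const backoffCoefficient = 2.0
  const maximumInterval = 30
  return Math.min(
    maximumInterval,
    initialInterval * Math.pow(backoffCoefficient, attempt - 1),
  )
}
// attempts 1..4 wait 5s, 10s, 20s, then 30s (40s capped by the maximum interval)
```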

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any timeout-related configurations or issues in enrichment operations
rg -A 3 "timeout|TIMEOUT" --glob "!{*.md,*.json}"

# Look for any error handling related to timeouts
rg -A 3 "TimeoutError|timeout.*error" --glob "!{*.md,*.json}"

Length of output: 71711


Script:

#!/bin/bash
# Search for timeout errors in enrichment logs/code
rg -A 3 "timeout.*enrich|enrich.*timeout" --glob "!{*.md,*.json}"

# Look for any error handling specific to enrichment timeouts
rg -A 3 "TimeoutError.*enrich|enrich.*TimeoutError" --glob "!{*.md,*.json}"

# Search for any configuration files that might contain enrichment timeout settings
fd -e yaml -e json -e ts -e js -e env "config|settings" --exec grep -l "timeout.*enrich|enrich.*timeout" {} \;

Length of output: 261


Script:

#!/bin/bash
# Search for enrichment activity configurations and their timeout settings
ast-grep --pattern 'proxyActivities<$_>({
  $$$
  startToCloseTimeout: $_,
  $$$
})' services/apps/premium/members_enrichment_worker/src/workflows/

# Look for any error handling in enrichment workflows
rg -A 5 "catch.*error.*enrich|error.*enrich" services/apps/premium/members_enrichment_worker/src/

# Search for any retry policies in enrichment activities
rg -A 5 "retry.*enrich|enrich.*retry" services/apps/premium/members_enrichment_worker/src/

Length of output: 4341

services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts (1)

35-35: ⚠️ Potential issue

Review the concurrent request limit of 300.

The concurrent request limit of 300 seems high for a SERP API service that relies on Google Search. This could:

  • Rapidly consume your SERP API credits
  • Trigger rate limiting from the underlying Google search
  • Lead to increased costs

Consider:

  1. Implementing a more conservative limit initially
  2. Adding gradual scaling based on monitoring results
  3. Implementing exponential backoff for failed requests

Let's verify the SERP API's documentation and current usage:

✅ Verification successful

Let me gather more specific information about the SERP API service implementation and its rate limiting.


Let me check for the actual implementation of request handling and error management in the SERP service.


Let me check one final aspect - the actual SERP API query implementation to understand the request handling and rate limiting.


Based on the gathered evidence, I can now provide a final response about the concurrent request limit concern.

The concurrent request limit of 300 is appropriate and safe.

The implementation shows several safeguards that make this limit reasonable:

  1. The service has built-in credit checking via hasRemainingCredits()
  2. The API includes hourly rate limit tracking (account_rate_limit_per_hour in types)
  3. Requests are naturally throttled by:
    • Cache implementation (120-day cache)
    • Sequential retry logic in getData()
    • Input validation requirements (needs displayName, location, and an identifier)

The high concurrent limit allows for efficient batch processing while the underlying credit and rate limit checks prevent abuse.
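The credit-check guard described in point 1 can be sketched generically. `runWithCreditGuard` is a hypothetical helper written for this sketch; in the real service the check lives on the SERP service class itself as `hasRemainingCredits()`, so both the check and the request are injected here only to keep the example self-contained:

```typescript
// Hypothetical helper illustrating the credit-check guard described above.
// The real SerpService calls its own hasRemainingCredits() before querying.
async function runWithCreditGuard<T>(
  hasRemainingCredits: () => Promise<boolean>,
  request: () => Promise<T>,
): Promise<T | null> {
  if (!(await hasRemainingCredits())) {
    // No credits left: skip the API call instead of burning a request.
    return null
  }
  return request()
}
```

A call with exhausted credits resolves to null without ever invoking the request callback, which is what keeps a high concurrent limit from draining the account.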

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any rate limiting configurations or documentation

# Look for rate limit handling in the codebase
echo "Searching for rate limit handling..."
rg -i "rate.?limit" --type ts

# Look for SERP API related environment variables
echo "Checking for SERP API configuration..."
rg "CROWD_ENRICHMENT_SERP" --type ts

# Look for any error handling related to rate limits
echo "Checking error handling..."
rg -A 5 "serpapi.*error" --type ts

Length of output: 34347


Script:

#!/bin/bash
# Check SERP API service implementation for rate limiting and error handling
echo "Checking SERP service implementation..."
rg -A 10 "class SerpService" services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts

echo "Checking SERP API types..."
cat services/apps/premium/members_enrichment_worker/src/sources/serp/types.ts

echo "Checking for SERP API usage/credit checks..."
rg -A 5 "hasRemainingCredits" services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts

Length of output: 2113


Script:

#!/bin/bash
# Check SERP service implementation for request handling and error management
echo "Checking SERP service getData implementation..."
rg -B 5 -A 15 "getData\(input:" services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts

echo "Checking for concurrent request handling..."
rg -B 5 -A 10 "maxConcurrentRequests" services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts

echo "Checking for error handling patterns..."
rg -A 10 "catch\(" services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts

Length of output: 1859


Script:

#!/bin/bash
# Check SERP API query implementation
echo "Checking querySerpApi implementation..."
rg -B 5 -A 20 "querySerpApi" services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts

echo "Checking for any retry or backoff mechanisms..."
rg -A 10 "retries|backoff|delay" services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts

echo "Checking for any rate limit handling in error cases..."
rg -A 10 "error|throw|catch" services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts

Length of output: 2584

services/apps/premium/members_enrichment_worker/src/sources/clearbit/service.ts (1)

Line range hint 34-66: Verify enrichment threshold and concurrency implementation.

Let's verify the consistency of these changes across the codebase:

✅ Verification successful

Enrichment threshold and concurrency settings are properly implemented

The verification shows consistent implementation across enrichment services:

  • Each service properly defines and uses enrichMembersWithActivityMoreThan in both SQL queries and runtime checks
  • Concurrency control is well-implemented:
    • maxConcurrentRequests is defined per service (Clearbit: 15, Crustdata: 5, SERP: 300)
    • The system uses the minimum concurrent requests among active sources
    • Rate limiting is handled by ConcurrentRequestLimiter in Redis
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check usage of enrichMembersWithActivityMoreThan and maxConcurrentRequests
echo "Checking activity threshold usage:"
rg "enrichMembersWithActivityMoreThan" -A 2

echo -e "\nChecking concurrent requests implementation:"
rg "maxConcurrentRequests" -A 2

echo -e "\nChecking SQL queries using the activity threshold:"
rg "enrichableBySql.*total_count" -A 2

Length of output: 9553

services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts (2)

155-161: LGTM! Good backward compatibility.

The addition of the returnRowsWithoutData parameter with a default value maintains backward compatibility while extending functionality.


Line range hint 47-206: Verify concurrent request handling.

The AI summary mentions changes related to concurrent request management, but the implementation in this file doesn't show explicit handling of concurrent requests. Let's verify the integration with other components.

✅ Verification successful

Let me generate one more verification script to check the concurrent request handling implementation in the enrichment services:

This will help us verify if the enrichment services properly implement concurrency controls and rate limiting to handle concurrent requests safely.


Concurrent request handling is properly implemented

The code review concern about concurrent request handling is unfounded. The codebase implements robust concurrency controls through:

  1. Rate limiting via RateLimiter class that enforces request limits within time windows
  2. Concurrent request limiting through ConcurrentRequestLimiter class
  3. Redis-based distributed caching that handles concurrent access
  4. Database transactions that ensure data consistency (svc.postgres.writer.connection())

The enrichment worker's implementation follows best practices for handling concurrent requests:

  • Uses atomic database operations for cache updates
  • Implements distributed rate limiting through Redis
  • Employs connection pooling for database access
  • Utilizes transaction isolation for cache updates

The changes in the enrichment.ts file are safe for concurrent execution as they rely on these underlying mechanisms.
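As a rough illustration of what `ConcurrentRequestLimiter` enforces, here is an in-memory sketch. The real limiter is Redis-backed and distributed across workers; this mock only shows the acquire/release accounting against a fixed cap:

```typescript
// In-memory stand-in for the Redis-based ConcurrentRequestLimiter mentioned above.
class InMemoryConcurrentRequestLimiter {
  private inFlight = 0
  private waiters: Array<() => void> = []

  constructor(private readonly maxConcurrentRequests: number) {}

  async acquire(): Promise<void> {
    if (this.inFlight < this.maxConcurrentRequests) {
      this.inFlight++
      return
    }
    // At capacity: park until a release() hands this caller a slot.
    await new Promise<void>((resolve) => this.waiters.push(resolve))
    this.inFlight++
  }

  release(): void {
    this.inFlight--
    const next = this.waiters.shift()
    if (next) next()
  }

  get active(): number {
    return this.inFlight
  }
}
```

Acquires beyond the cap queue up rather than proceed, which is the property the per-service `maxConcurrentRequests` values rely on.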

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for concurrent request handling implementation
# Expected: Find maxConcurrentRequests usage and potential race conditions

# Check for maxConcurrentRequests property in enrichment services
echo "Checking for maxConcurrentRequests in enrichment services..."
rg -l "maxConcurrentRequests" --type ts

# Look for potential race conditions in cache access
echo "Checking for potential race conditions in cache access..."
rg -A 5 "MemberEnrichmentCache" --type ts

Length of output: 30770


Script:

#!/bin/bash
# Check for rate limiting and concurrency control implementations
echo "Checking rate limiting implementations..."
ast-grep --pattern 'class $_ implements IEnrichmentService {
  $$$
  maxConcurrentRequests = $_
  $$$
}'

echo "Checking for rate limiter usage..."
rg -A 5 "RateLimiter" --type ts

Length of output: 9915

services/apps/premium/members_enrichment_worker/src/sources/progai/service.ts (1)

36-36: LGTM! Verify consistent usage across enrichment services.

The SQL condition correctly aligns with the enrichment logic in isEnrichableBySource(). Making this property public enables external monitoring components to query enrichable members.

Let's verify that other enrichment services follow the same pattern:

✅ Verification successful

All enrichment services consistently use public enrichableBySql property with appropriate conditions

The verification confirms that all enrichment services follow the same pattern:

  • All services declare enrichableBySql as public
  • Each service has appropriate SQL conditions matching their specific enrichment requirements:
    • serp and clearbit: Activity count threshold + specific member identity conditions
    • progai: GitHub username or email verification
    • progai-linkedin-scraper: LinkedIn username verification
    • crustdata: Activity count threshold + LinkedIn username verification
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for enrichableBySql property in other enrichment services
rg -A 1 "enrichableBySql\s*=" "services/apps/premium/members_enrichment_worker/src/sources/"

Length of output: 1605

services/apps/premium/members_enrichment_worker/src/sources/crustdata/service.ts (1)

50-51: 🛠️ Refactor suggestion

Implement rate limiting using maxConcurrentRequests.

The maxConcurrentRequests property is added but not actively used to control API calls. Consider implementing rate limiting in the getDataUsingLinkedinHandle method to prevent overwhelming the Crustdata API.

Consider using a rate limiting library or implementing a semaphore:

import { Semaphore } from 'async-mutex';

export default class EnrichmentServiceCrustdata extends LoggerBase implements IEnrichmentService {
  private semaphore: Semaphore;

  constructor(public readonly log: Logger) {
    super(log);
    this.semaphore = new Semaphore(this.maxConcurrentRequests);
  }

  private async getDataUsingLinkedinHandle(handle: string): Promise<IMemberEnrichmentDataCrustdata> {
    // async-mutex's Semaphore.acquire() resolves to a [value, release] tuple
    const [, release] = await this.semaphore.acquire();
    try {
      // ... existing API call code, returning the enrichment data ...
    } finally {
      release();
    }
  }
}
services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts (1)

526-526: LGTM! The changes enhance the flexibility of enrichment cache retrieval.

The addition of the returnRowsWithoutData parameter with a default value of false maintains backward compatibility while allowing the function to optionally return all cache entries, including those without data. This enhancement appears to support the broader enrichment monitoring improvements mentioned in the PR objectives.

Let's verify the usage of this function to ensure the changes don't introduce any issues:

Also applies to: 528-528, 534-534

✅ Verification successful

The changes to findMemberEnrichmentCacheForAllSourcesDb are safe and properly used

The function is used correctly in the enrichment activities with the new returnRowsWithoutData parameter being properly passed through. The change maintains backward compatibility as seen in the implementation where:

  • The default value matches the original behavior (returnRowsWithoutData = false)
  • The function is called with explicit parameter in the enrichment activities
  • The SQL query modification is properly guarded with a conditional filter
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for all usages of findMemberEnrichmentCacheForAllSourcesDb
# to ensure they handle both cases (with and without data) correctly

# Search for function calls
rg -A 5 "findMemberEnrichmentCacheForAllSourcesDb" --type typescript

Length of output: 103


Script:

#!/bin/bash
# Let's try a different approach using ripgrep without file type filtering
rg -l "findMemberEnrichmentCacheForAllSourcesDb"

# Also search for the function definition to understand its usage
ast-grep --pattern 'async function findMemberEnrichmentCacheForAllSourcesDb($_) {
  $$$
}'

Length of output: 306


Script:

#!/bin/bash
# Let's examine how this function is used in the enrichment activities
rg -A 10 "findMemberEnrichmentCacheForAllSourcesDb" services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts

# Also check the function implementation and its callers in the DAL
rg -A 10 "findMemberEnrichmentCacheForAllSourcesDb" services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts

Length of output: 1351

backend/src/database/migrations/R__memberEnrichmentMaterializedViews.sql (1)

Line range hint 1-15: LGTM! Well-structured view with appropriate indexes.

The materialized view effectively aggregates member activity counts and includes proper indexing for both member ID lookups and sorting by total count.

services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts (1)

10-12: Imports are correctly added

The new imports for IEnrichmentService, getEnrichmentInput, and getObsoleteSourcesOfMember are necessary for the new functionality and are correctly specified.

Comment on lines 42 to 44
const parallelEnrichmentWorkflows = await getMaxConcurrentRequests(members, sources)

const chunks = chunkArray<IEnrichableMember>(members, parallelEnrichmentWorkflows)

🛠️ Refactor suggestion

Add validation for concurrent requests value

The code assumes getMaxConcurrentRequests always returns a positive number. Consider adding validation to prevent potential issues.

-  const parallelEnrichmentWorkflows = await getMaxConcurrentRequests(members, sources)
+  const parallelEnrichmentWorkflows = await getMaxConcurrentRequests(members, sources)
+  if (!parallelEnrichmentWorkflows || parallelEnrichmentWorkflows < 1) {
+    throw new Error('Invalid concurrent requests value. Must be a positive number.')
+  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const parallelEnrichmentWorkflows = await getMaxConcurrentRequests(members, sources)
const chunks = chunkArray<IEnrichableMember>(members, parallelEnrichmentWorkflows)
const parallelEnrichmentWorkflows = await getMaxConcurrentRequests(members, sources)
if (!parallelEnrichmentWorkflows || parallelEnrichmentWorkflows < 1) {
throw new Error('Invalid concurrent requests value. Must be a positive number.')
}
const chunks = chunkArray<IEnrichableMember>(members, parallelEnrichmentWorkflows)
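`chunkArray` itself is not part of this diff; a minimal sketch, assuming its second argument is the maximum chunk size, shows why a non-positive value must be rejected — the loop below would otherwise never advance:

```typescript
// Hypothetical sketch of chunkArray, assuming the second argument is the chunk size.
function chunkArray<T>(items: T[], size: number): T[][] {
  if (!size || size < 1) {
    throw new Error('Invalid chunk size. Must be a positive number.')
  }
  const chunks: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size))
  }
  return chunks
}
```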

Comment on lines +62 to +66
return (
input.activityCount > this.enrichMembersWithActivityMoreThan &&
!!input.email?.value &&
input.email?.verified
)

🛠️ Refactor suggestion

Remove unnecessary async and add input validation.

The method is marked as async but contains no asynchronous operations. Additionally, consider adding input validation:

-  async isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean> {
+  isEnrichableBySource(input: IEnrichmentSourceInput): boolean {
+    if (!input?.activityCount || !input?.email) {
+      return false;
+    }
+
     return (
       input.activityCount > this.enrichMembersWithActivityMoreThan &&
       !!input.email?.value &&
       input.email?.verified
     )
   }
📝 Committable suggestion


Suggested change
return (
input.activityCount > this.enrichMembersWithActivityMoreThan &&
!!input.email?.value &&
input.email?.verified
)
isEnrichableBySource(input: IEnrichmentSourceInput): boolean {
if (!input?.activityCount || !input?.email) {
return false;
}
return (
input.activityCount > this.enrichMembersWithActivityMoreThan &&
!!input.email?.value &&
input.email?.verified
)
}

Comment on lines +40 to +41
public maxConcurrentRequests = 1000


💡 Codebase verification

⚠️ Potential issue

Based on the comprehensive search results, I can now provide a final response:

The concurrent request limit of 1000 is too high and should be reduced

The high limit of 1000 concurrent requests for the ProgAI LinkedIn scraper service is concerning because:

  1. It's significantly higher than other enrichment services in the codebase (Clearbit: 15, Crustdata: 5, SerpApi: 300)
  2. The service makes requests to LinkedIn profiles via ProgAI's scraper, which could trigger LinkedIn's rate limiting
  3. No documentation or configuration exists to justify this high limit
  4. The service uses a shared Redis-based rate limiter implementation, suggesting consistency with other services is important

Consider:

  • Reducing the limit to 300 or lower to align with other services
  • Making the limit configurable via CROWD_ENRICHMENT_PROGAI_MAX_CONCURRENT environment variable
  • Adding documentation about rate limiting considerations
🔗 Analysis chain

Review the concurrent request limit value.

The maxConcurrentRequests value of 1000 seems significantly higher compared to other enrichment services (Clearbit: 15, Crustdata: 5, SerpApi: 300). This high limit might:

  1. Overwhelm the ProgAI LinkedIn scraper service
  2. Trigger rate limiting or IP blocking from LinkedIn
  3. Consume excessive system resources

Let's verify if there are any rate limiting configurations or documentation:

Consider:

  1. Aligning the limit with other services (300 or lower)
  2. Adding rate limiting documentation
  3. Making this configurable via environment variables

Based on the search results, I can see that:

  1. The codebase has a rate limiting mechanism implemented through Redis
  2. Other enrichment services have much lower concurrent request limits:
    • Clearbit: 15
    • Crustdata: 5
    • SerpApi: 300
  3. Both ProgAI services (regular and LinkedIn scraper) are set to 1000 concurrent requests
  4. The system has a ConcurrentRequestLimiter class that enforces these limits

Let me verify the ProgAI service configuration and documentation:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for rate limiting related configurations or documentation
rg -i "rate.?limit|concurrent.?requests" --type md
rg "CROWD_ENRICHMENT_PROGAI" .env* 

Length of output: 135


Script:

#!/bin/bash
# Let's search for rate limiting configurations and usage across the codebase
rg -i "maxconcurrentrequests" -A 3 -B 3

# Search for any ProgAI configuration or documentation
fd -e md -e ts -e js -x rg -l "progai|prog.?ai"

# Look for similar enrichment services to compare configurations
ast-grep --pattern 'maxConcurrentRequests = $_'

Length of output: 79520


Script:

#!/bin/bash
# Check for ProgAI configuration files and documentation
rg -i "progai|prog.?ai" -A 5 -B 5 services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/

# Look for any rate limiting documentation in README files
fd -e md -x rg -l "rate.?limit|concurrent.?requests|progai"

Length of output: 12715

Comment on lines +47 to +90
export async function getEnrichmentInput(
input: IEnrichableMember,
): Promise<IEnrichmentSourceInput> {
const enrichmentInput: IEnrichmentSourceInput = {
memberId: input.id,
email: input.identities.find((i) => i.verified && i.type === MemberIdentityType.EMAIL),
linkedin: input.identities.find(
(i) =>
i.verified &&
i.platform === PlatformType.LINKEDIN &&
i.type === MemberIdentityType.USERNAME,
),
displayName: input.displayName || undefined,
website: input.website || undefined,
location: input.location || undefined,
activityCount: input.activityCount || 0,
}

// there can be multiple verified identities in github, we select the one with the most activities
const verifiedGithubIdentities = input.identities.filter(
(i) =>
i.verified && i.platform === PlatformType.GITHUB && i.type === MemberIdentityType.USERNAME,
)

if (verifiedGithubIdentities.length > 1) {
const ghIdentityWithTheMostActivities = await findMemberIdentityWithTheMostActivityInPlatform(
input.id,
PlatformType.GITHUB,
)
if (ghIdentityWithTheMostActivities) {
enrichmentInput.github = input.identities.find(
(i) =>
i.verified &&
i.platform === PlatformType.GITHUB &&
i.type === MemberIdentityType.USERNAME &&
i.value === ghIdentityWithTheMostActivities.username,
)
}
} else {
enrichmentInput.github = verifiedGithubIdentities?.[0] || undefined
}

return enrichmentInput
}

⚠️ Potential issue

Add error handling and input validation.

The function lacks error handling for the findMemberIdentityWithTheMostActivityInPlatform call and doesn't validate if verified identities exist before accessing them.

Consider applying these improvements:

 export async function getEnrichmentInput(
   input: IEnrichableMember,
 ): Promise<IEnrichmentSourceInput> {
+  if (!input || !input.identities) {
+    throw new Error('Invalid input: Member or identities are missing');
+  }
+
   const enrichmentInput: IEnrichmentSourceInput = {
     memberId: input.id,
     email: input.identities.find((i) => i.verified && i.type === MemberIdentityType.EMAIL),
     linkedin: input.identities.find(
       (i) =>
         i.verified &&
         i.platform === PlatformType.LINKEDIN &&
         i.type === MemberIdentityType.USERNAME,
     ),
     displayName: input.displayName || undefined,
     website: input.website || undefined,
     location: input.location || undefined,
     activityCount: input.activityCount || 0,
   }

   // Extract GitHub identity selection to a separate function for better readability
+  enrichmentInput.github = await selectGithubIdentity(input);
   
   return enrichmentInput
 }
+
+async function selectGithubIdentity(input: IEnrichableMember) {
+  try {
     const verifiedGithubIdentities = input.identities.filter(
       (i) =>
         i.verified && i.platform === PlatformType.GITHUB && i.type === MemberIdentityType.USERNAME,
     )
 
     if (verifiedGithubIdentities.length > 1) {
       const ghIdentityWithTheMostActivities = await findMemberIdentityWithTheMostActivityInPlatform(
         input.id,
         PlatformType.GITHUB,
       )
-      if (ghIdentityWithTheMostActivities) {
-        enrichmentInput.github = input.identities.find(
+      return ghIdentityWithTheMostActivities
+        ? input.identities.find(
           (i) =>
             i.verified &&
             i.platform === PlatformType.GITHUB &&
             i.type === MemberIdentityType.USERNAME &&
             i.value === ghIdentityWithTheMostActivities.username,
         )
-      }
+        : undefined
     } else {
-      enrichmentInput.github = verifiedGithubIdentities?.[0] || undefined
+      return verifiedGithubIdentities[0]
     }
+  } catch (error) {
+    svc.log.error('Error selecting GitHub identity', { error, memberId: input.id });
+    return undefined;
+  }
+}

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 194 to 206
export async function getObsoleteSourcesOfMember(
memberId: string,
possibleSources: MemberEnrichmentSource[],
): Promise<MemberEnrichmentSource[]> {
const caches = await findMemberEnrichmentCacheForAllSources(memberId, true)
const obsoleteSources = possibleSources.filter((source) =>
isCacheObsolete(
source,
caches.find((s) => s.source === source),
),
)
return obsoleteSources
}

⚠️ Potential issue

Fix async operation in Array.filter and add error handling.

The function has a critical issue where isCacheObsolete is an async function but is used in Array.filter without proper await handling.

Apply this fix to handle async operations correctly and add error handling:

 export async function getObsoleteSourcesOfMember(
   memberId: string,
   possibleSources: MemberEnrichmentSource[],
 ): Promise<MemberEnrichmentSource[]> {
-  const caches = await findMemberEnrichmentCacheForAllSources(memberId, true)
-  const obsoleteSources = possibleSources.filter((source) =>
-    isCacheObsolete(
-      source,
-      caches.find((s) => s.source === source),
-    ),
-  )
-  return obsoleteSources
+  try {
+    const caches = await findMemberEnrichmentCacheForAllSources(memberId, true);
+    const obsoleteSources = await Promise.all(
+      possibleSources.map(async (source) => {
+        const isObsolete = await isCacheObsolete(
+          source,
+          caches.find((s) => s.source === source),
+        );
+        return isObsolete ? source : null;
+      })
+    );
+    return obsoleteSources.filter((source): source is MemberEnrichmentSource => source !== null);
+  } catch (error) {
+    svc.log.error('Error checking obsolete sources', { error, memberId });
+    throw error;
+  }
 }
📝 Committable suggestion


Suggested change
export async function getObsoleteSourcesOfMember(
memberId: string,
possibleSources: MemberEnrichmentSource[],
): Promise<MemberEnrichmentSource[]> {
const caches = await findMemberEnrichmentCacheForAllSources(memberId, true)
const obsoleteSources = possibleSources.filter((source) =>
isCacheObsolete(
source,
caches.find((s) => s.source === source),
),
)
return obsoleteSources
}
export async function getObsoleteSourcesOfMember(
memberId: string,
possibleSources: MemberEnrichmentSource[],
): Promise<MemberEnrichmentSource[]> {
try {
const caches = await findMemberEnrichmentCacheForAllSources(memberId, true);
const obsoleteSources = await Promise.all(
possibleSources.map(async (source) => {
const isObsolete = await isCacheObsolete(
source,
caches.find((s) => s.source === source),
);
return isObsolete ? source : null;
})
);
return obsoleteSources.filter((source): source is MemberEnrichmentSource => source !== null);
} catch (error) {
svc.log.error('Error checking obsolete sources', { error, memberId });
throw error;
}
}
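The pitfall is easy to reproduce in isolation: an async predicate hands Array.filter a Promise, and Promises are always truthy, so nothing is ever filtered out. `isObsolete` below is a stand-in for the real `isCacheObsolete`:

```typescript
// Stand-in async predicate (the real isCacheObsolete checks cache age per source).
const isObsolete = async (n: number): Promise<boolean> => n % 2 === 0

const all = [1, 2, 3, 4]

// Broken: the callback returns a Promise<boolean>, which is truthy for every element.
const broken = all.filter((n) => isObsolete(n) as unknown as boolean)
// broken still contains all 4 elements

// Correct, mirroring the suggested fix: resolve all predicates first, then filter.
async function obsoleteOnly(items: number[]): Promise<number[]> {
  const flags = await Promise.all(items.map(isObsolete))
  return items.filter((_, i) => flags[i])
}
```

The `as unknown as boolean` cast is exactly what TypeScript would otherwise flag — the original code compiled only because `isCacheObsolete`'s Promise leaked through an untyped boundary.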

Comment on lines +41 to +42
public maxConcurrentRequests = 1000


⚠️ Potential issue

Verify the high concurrent request limit.

The maxConcurrentRequests value of 1000 is significantly higher than other enrichment services (Clearbit: 15, Crustdata: 5, SerpApi: 300). This could potentially:

  1. Overwhelm the ProgAI API
  2. Trigger rate limiting
  3. Impact system resources

Please verify:

  • ProgAI API's documented rate limits
  • System resource capacity
  • Historical concurrent request patterns

Consider:

  1. Aligning with other services' more conservative limits
  2. Adding rate limit handling in getData methods
  3. Making this configurable via environment variables
-  public maxConcurrentRequests = 1000
+  public maxConcurrentRequests = parseInt(process.env.PROGAI_MAX_CONCURRENT_REQUESTS || '100', 10)
📝 Committable suggestion


Suggested change
public maxConcurrentRequests = 1000
public maxConcurrentRequests = parseInt(process.env.PROGAI_MAX_CONCURRENT_REQUESTS || '100', 10)

Comment on lines 62 to 68
Array.from(distinctEnrichableSources).forEach(async (source) => {
smallestMaxConcurrentRequests = Math.min(
smallestMaxConcurrentRequests,
serviceMap[source].maxConcurrentRequests,
)
})


🛠️ Refactor suggestion

Avoid unnecessary async in forEach callback

The forEach callback is marked as async, but it doesn't contain any await expressions. Moreover, using async with forEach does not behave as expected because forEach does not wait for Promises. Since the code inside the loop is synchronous, you can remove the async keyword.

Apply this diff:

-  Array.from(distinctEnrichableSources).forEach(async (source) => {
+  Array.from(distinctEnrichableSources).forEach((source) => {
📝 Committable suggestion


Suggested change
Array.from(distinctEnrichableSources).forEach(async (source) => {
smallestMaxConcurrentRequests = Math.min(
smallestMaxConcurrentRequests,
serviceMap[source].maxConcurrentRequests,
)
})
Array.from(distinctEnrichableSources).forEach((source) => {
smallestMaxConcurrentRequests = Math.min(
smallestMaxConcurrentRequests,
serviceMap[source].maxConcurrentRequests,
)
})
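Once the async keyword is gone, the same computation can also be written as a synchronous reduce, which makes the "smallest limit wins" rule explicit. The `limits` object below stands in for `serviceMap[source].maxConcurrentRequests`, using the per-service values already noted in this review:

```typescript
// limits stands in for serviceMap[source].maxConcurrentRequests.
const limits: Record<string, number> = { clearbit: 15, crustdata: 5, serp: 300 }
const distinctEnrichableSources = new Set(['clearbit', 'serp'])

const smallestMaxConcurrentRequests = [...distinctEnrichableSources].reduce(
  (min, source) => Math.min(min, limits[source]),
  Number.POSITIVE_INFINITY,
)
// → 15 for the clearbit + serp combination above
```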

Comment on lines 52 to 55
(await serviceMap[source].isEnrichableBySource(enrichmentInput)) &&
(obsoleteSources as string[]).includes(source)
) {
distinctEnrichableSources.add(source as MemberEnrichmentSource)

⚠️ Potential issue

Logical error: Exclude obsolete sources instead of including them

In the if condition, you're checking if the source is included in obsoleteSources. This means you're adding sources that are obsolete, which might not be the intended behavior. To add only non-obsolete sources, the condition should check for sources not included in obsoleteSources.

Apply this diff to fix the logical condition:

-        (obsoleteSources as string[]).includes(source)
+        !(obsoleteSources as string[]).includes(source)
📝 Committable suggestion


Suggested change
(await serviceMap[source].isEnrichableBySource(enrichmentInput)) &&
(obsoleteSources as string[]).includes(source)
) {
distinctEnrichableSources.add(source as MemberEnrichmentSource)
(await serviceMap[source].isEnrichableBySource(enrichmentInput)) &&
!(obsoleteSources as string[]).includes(source)
) {
distinctEnrichableSources.add(source as MemberEnrichmentSource)

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts (2)

155-161: Add documentation for the new parameter.

The purpose of returnRowsWithoutData parameter isn't immediately clear. Consider adding JSDoc comments explaining:

  • What this parameter controls
  • When it should be set to true/false
  • Impact on performance/data retrieval

194-210: Add error handling and improve type safety.

The function could benefit from:

  1. Error handling for Promise.all
  2. Stronger type safety instead of type assertion

Consider this improvement:

 export async function getObsoleteSourcesOfMember(
   memberId: string,
   possibleSources: MemberEnrichmentSource[],
 ): Promise<MemberEnrichmentSource[]> {
-  const caches = await findMemberEnrichmentCacheForAllSources(memberId, true)
+  try {
+    const caches = await findMemberEnrichmentCacheForAllSources(memberId, true)
 
-  const obsoleteSourcesPromises = possibleSources.map(async (source) => {
-    const cache = caches.find((s) => s.source === source)
-    return (await isCacheObsolete(source, cache)) ? source : null
-  })
+    const obsoleteSourcesPromises = possibleSources.map(async (source) => {
+      const cache = caches.find((s) => s.source === source)
+      return (await isCacheObsolete(source, cache)) ? source : null
+    })
 
-  const obsoleteSources = (await Promise.all(obsoleteSourcesPromises)).filter(
-    (source) => source !== null,
-  )
+    const results = await Promise.all(obsoleteSourcesPromises)
+    return results.filter((source): source is MemberEnrichmentSource => source !== null)
 
-  return obsoleteSources as MemberEnrichmentSource[]
+  } catch (error) {
+    svc.log.error('Error checking obsolete sources', { error, memberId })
+    throw error
+  }
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 9f08e83 and e3a4fb3.

📒 Files selected for processing (1)
  • services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts (4 hunks)
🔇 Additional comments (2)
services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts (2)

12-17: LGTM!

The new type imports are correctly added and used throughout the file.


72-75: Consider potential race condition in GitHub identity selection.

The findMemberIdentityWithTheMostActivityInPlatform call could return different results if member activities are being updated concurrently, potentially leading to inconsistent identity selection.

Let's verify if there are any concurrent updates to member activities:


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between e3a4fb3 and 3cb5ef3.

📒 Files selected for processing (2)
  • services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts (6 hunks)
  • services/apps/premium/members_enrichment_worker/src/utils/common.ts (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts
🔇 Additional comments (2)
services/apps/premium/members_enrichment_worker/src/utils/common.ts (2)

3-11: LGTM! Well-organized imports.

The new imports are properly organized and necessary for the cache validation functionality.


119-129: Verify enrichment service configurations.

Let's ensure all enrichment services properly define the cache expiration time.

✅ Verification successful

Let me try a different search strategy to verify the enrichment services configuration.

This broader search should help us:

  1. Find all enrichment service implementations even if the interface implementation syntax varies
  2. See the class definitions and their properties
  3. Understand the interface requirements regarding cache configuration

Let me try one more search to specifically locate the interface definition and cache configuration.

This should help us:

  1. Find the interface definition to understand required properties
  2. Locate files containing cache expiration configuration
  3. See how cache expiration is implemented

Cache expiration is properly configured across all enrichment services

All enrichment services have correctly implemented the required cacheObsoleteAfterSeconds property with appropriate values:

  • Clearbit: 120 days
  • SERP: 120 days
  • ProgAI: 90 days
  • ProgAI LinkedIn Scraper: 90 days
  • Crustdata: 90 days

The cache obsolescence check is properly integrated into the enrichment workflow and used correctly in the isCacheObsolete() function.
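
For reference, assuming cacheObsoleteAfterSeconds stores plain seconds, the TTLs listed above work out to:

```typescript
const day = 24 * 60 * 60 // 86,400 seconds

// cacheObsoleteAfterSeconds per source, as listed above
const cacheTtlSeconds = {
  clearbit: 120 * day, // 10,368,000 s
  serp: 120 * day,
  progai: 90 * day, // 7,776,000 s
  progaiLinkedinScraper: 90 * day,
  crustdata: 90 * day,
}
```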

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if all enrichment services define cacheObsoleteAfterSeconds

# Find all enrichment service files
echo "Searching for enrichment service implementations..."
rg -l "implements\s+IEnrichmentSourceService" 

# Check for cacheObsoleteAfterSeconds in each service
echo "Checking for cache expiration configuration..."
ast-grep --pattern 'class $_ implements IEnrichmentSourceService {
  $$$
  cacheObsoleteAfterSeconds = $_
  $$$
}'

Length of output: 380


Script:

#!/bin/bash
# Search for enrichment service files and their cache configuration using a broader pattern
echo "Searching for enrichment service files..."
rg -l "class.*EnrichmentService"

echo -e "\nChecking cache configuration in these files..."
rg -A 5 "class.*EnrichmentService" 

echo -e "\nSearching for interface definition..."
rg "interface.*IEnrichmentSourceService"

Length of output: 4920


Script:

#!/bin/bash
# Search for interface and cache configuration
echo "Searching for IEnrichmentSourceService or IEnrichmentService interface..."
rg -A 10 "interface\s+(IEnrichmentSourceService|IEnrichmentService)"

echo -e "\nSearching specifically for cache expiration configuration..."
rg -l "cacheObsoleteAfterSeconds"

echo -e "\nChecking content of files with cache configuration..."
rg -A 2 "cacheObsoleteAfterSeconds"

Length of output: 6213

Comment on lines 119 to 129
export function isCacheObsolete(
source: MemberEnrichmentSource,
cache: IMemberEnrichmentCache<IMemberEnrichmentData>,
logger: Logger,
): boolean {
const service = EnrichmentSourceServiceFactory.getEnrichmentSourceService(source, logger)
return (
!cache ||
Date.now() - new Date(cache.updatedAt).getTime() > 1000 * service.cacheObsoleteAfterSeconds
)
}

🛠️ Refactor suggestion

Add safety checks and improve type safety.

While the function's logic is correct, it could benefit from additional safety checks:

Consider applying these improvements:

 export function isCacheObsolete(
   source: MemberEnrichmentSource,
   cache: IMemberEnrichmentCache<IMemberEnrichmentData>,
   logger: Logger,
 ): boolean {
   const service = EnrichmentSourceServiceFactory.getEnrichmentSourceService(source, logger)
+  if (!service?.cacheObsoleteAfterSeconds) {
+    logger.warn('Cache obsolescence check failed: Invalid service configuration', {
+      source,
+      serviceConfig: service,
+    })
+    return true // Assume obsolete if config is invalid
+  }
+
+  if (!cache?.updatedAt) {
+    return true
+  }
+
+  const cacheDate = new Date(cache.updatedAt)
+  if (isNaN(cacheDate.getTime())) {
+    logger.warn('Invalid cache date', { updatedAt: cache.updatedAt })
+    return true
+  }
+
   return (
-    !cache ||
-    Date.now() - new Date(cache.updatedAt).getTime() > 1000 * service.cacheObsoleteAfterSeconds
+    Date.now() - cacheDate.getTime() > 1000 * service.cacheObsoleteAfterSeconds
   )
 }

These changes:

  1. Add null/undefined checks for service configuration
  2. Add validation for cache.updatedAt
  3. Handle invalid dates
  4. Improve error logging
  5. Maintain milliseconds conversion clarity


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (1)
services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts (1)

162-168: Document the purpose and impact of returnRowsWithoutData parameter.

The new parameter's purpose and its effect on data retrieval aren't immediately clear from the code. Consider adding JSDoc comments to explain:

  • When to use this parameter
  • What data is included/excluded when it's true/false
  • Performance implications
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 3cb5ef3 and 3788cc0.

📒 Files selected for processing (2)
  • services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts (5 hunks)
  • services/apps/premium/members_enrichment_worker/src/utils/common.ts (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • services/apps/premium/members_enrichment_worker/src/utils/common.ts
🔇 Additional comments (1)
services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts (1)

104-110: Consider potential race conditions with synchronous cache checks.

The synchronous isCacheObsoleteSync function uses Date.now() for timestamp comparison, which could lead to race conditions in concurrent scenarios. Consider using a distributed locking mechanism or adding a small buffer to the expiration time to prevent edge cases.
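
One way to realize the buffer idea, sketched without the repo's types (the function name and the 60-second default are assumptions, not the actual implementation):

```typescript
// Sketch only: adds a grace buffer so entries right at the TTL edge are not
// flipped to obsolete by small clock differences between concurrent checks.
function isObsoleteWithBuffer(
  updatedAt: string,
  ttlSeconds: number,
  bufferSeconds = 60,
): boolean {
  const ageMs = Date.now() - new Date(updatedAt).getTime()
  return ageMs > 1000 * (ttlSeconds + bufferSeconds)
}
```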

Comment on lines +201 to +213
export async function getObsoleteSourcesOfMember(
memberId: string,
possibleSources: MemberEnrichmentSource[],
): Promise<MemberEnrichmentSource[]> {
const caches = await findMemberEnrichmentCacheForAllSources(memberId, true)
const obsoleteSources = possibleSources.filter((source) =>
isCacheObsoleteSync(
source,
caches.find((c) => c.source === source),
),
)
return obsoleteSources
}

🛠️ Refactor suggestion

Add error handling for database operations.

The function should handle potential database errors when calling findMemberEnrichmentCacheForAllSources. Consider wrapping the operation in a try-catch block and implementing appropriate error handling:

 export async function getObsoleteSourcesOfMember(
   memberId: string,
   possibleSources: MemberEnrichmentSource[],
 ): Promise<MemberEnrichmentSource[]> {
+  try {
     const caches = await findMemberEnrichmentCacheForAllSources(memberId, true)
     const obsoleteSources = possibleSources.filter((source) =>
       isCacheObsoleteSync(
         source,
         caches.find((c) => c.source === source),
       ),
     )
     return obsoleteSources
+  } catch (error) {
+    svc.log.error('Failed to get obsolete sources', { error, memberId });
+    throw error;
+  }
 }

Committable suggestion skipped: line range outside the PR's diff.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (2)
services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts (1)

35-78: Consider optimizing the implementation for better performance

The current implementation has several areas for improvement:

  1. Nested loops over members and sources could be optimized
  2. Redundant max calculation in the first loop
  3. Multiple async operations that could potentially be parallelized

Consider this alternative implementation:

export async function getMaxConcurrentRequests(
  members: IEnrichableMember[],
  possibleSources: MemberEnrichmentSource[],
): Promise<number> {
  // Initialize services map once
  const serviceMap = Object.fromEntries(
    possibleSources.map(source => [
      source,
      EnrichmentSourceServiceFactory.getEnrichmentSourceService(source, svc.log)
    ])
  );

  // Get enrichment inputs for all members in parallel
  const enrichmentInputs = await Promise.all(
    members.map(member => getEnrichmentInput(member))
  );

  // Get obsolete sources for all members in parallel
  const obsoleteSourcesMap = new Map(
    await Promise.all(
      members.map(async member => [
        member.id,
        await getObsoleteSourcesOfMember(member.id, possibleSources)
      ])
    )
  );

  // Find valid sources
  const enrichableSources = new Set<MemberEnrichmentSource>();
  await Promise.all(
    possibleSources.map(async source => {
      const service = serviceMap[source];
      // Note: Promise.any resolves with the first *fulfilled* value even when
      // that value is false, so check every result instead
      const results = await Promise.all(
        enrichmentInputs.map(input => service.isEnrichableBySource(input))
      );
      const isEnrichable = results.some(Boolean);
      if (isEnrichable && !Array.from(obsoleteSourcesMap.values()).some(obsolete => 
        (obsolete as string[]).includes(source))) {
        enrichableSources.add(source);
      }
    })
  );

  // Calculate minimum concurrent requests
  const maxConcurrentRequests = Math.min(
    ...Array.from(enrichableSources).map(source => 
      serviceMap[source].maxConcurrentRequests
    )
  );

  svc.log.info('Setting max concurrent requests', { maxConcurrentRequests });
  return maxConcurrentRequests;
}
services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts (1)

Line range hint 1-116: Document schedule coordination and dependencies.

The file defines three related schedules that appear to be coordinated:

  1. Materialized views refresh (5:00 AM)
  2. Member enrichment (6:00 AM)
  3. LFID enrichment (bi-monthly or every 2 minutes in dev/test)

Consider documenting these timing dependencies and the rationale behind the sequence to help maintainers understand the workflow orchestration.

Add a comment block at the top of the file explaining:

  • The purpose of each schedule
  • The reason for the specific timing sequence
  • Dependencies between the schedules
  • Different overlap policies used (SKIP vs BUFFER_ONE)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 3788cc0 and 50aedb7.

📒 Files selected for processing (5)
  • services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts (2 hunks)
  • services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts (1 hunks)
  • services/apps/premium/members_enrichment_worker/src/types.ts (1 hunks)
  • services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts (3 hunks)
  • services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • services/apps/premium/members_enrichment_worker/src/types.ts
  • services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts
🔇 Additional comments (6)
services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts (3)

16-18: LGTM: Activity proxy setup is well-configured

The proxy setup correctly includes the new getMaxConcurrentRequests activity with an appropriate timeout.


36-38: Previous validation concern is still applicable

The validation suggestion from the previous review remains relevant here.


20-20: Verify the impact of signature change

The function signature has been simplified by removing the args parameter. This is a breaking change that needs verification.

✅ Verification successful

Function signature change is part of pagination simplification

The signature change from getMembersToEnrich(args: IGetMembersForEnrichmentArgs) to getMembersToEnrich() is an intentional improvement that simplifies the pagination logic. The changes are self-contained within this workflow:

  • Removed the afterCursor parameter as it's no longer needed
  • Updated getEnrichableMembers call to remove the cursor parameter
  • Simplified the continueAsNew call to not pass any arguments

This is a safe change as:

  1. The function is only imported in workflows.ts for export purposes
  2. The git history shows this is a new file (added in commit c7919cb)
  3. No other files in the codebase are calling this function directly
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find all calls to getMembersToEnrich to ensure they're updated
# to match the new signature

# Search for direct function calls
rg -l "getMembersToEnrich\(" --type ts

# Search for workflow executions
rg -l "executeWorkflow.*getMembersToEnrich" --type ts

Length of output: 180


Script:

#!/bin/bash
# Let's check the content of the file to understand the context better
cat services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts

# Also search for any imports of this function
rg -l "import.*getMembersToEnrich" --type ts

# Search for any test files that might be using this function
fd -e test.ts -e spec.ts -x rg -l "getMembersToEnrich" {}

Length of output: 2222


Script:

#!/bin/bash
# Let's check the content of the workflows.ts file to see how it's being used
cat services/apps/premium/members_enrichment_worker/src/workflows.ts

# Let's also check for any historical changes to understand the context better
git log -p services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts

Length of output: 7720

services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts (2)

68-74: ⚠️ Potential issue

Remove unnecessary async in forEach

The forEach callback is marked as async but contains no await expressions. Additionally, the calculation of smallestMaxConcurrentRequests should be synchronous.

Apply this fix:

-  Array.from(distinctEnrichableSources).forEach(async (source) => {
+  Array.from(distinctEnrichableSources).forEach((source) => {
     smallestMaxConcurrentRequests = Math.min(
       smallestMaxConcurrentRequests,
       serviceMap[source].maxConcurrentRequests,
     )
   })

Likely invalid or redundant comment.


Line range hint 14-31: Verify the impact of removing cursor-based pagination

The removal of the afterCursor parameter might affect the system's ability to handle large datasets efficiently. Please ensure this change aligns with the requirements for processing large member lists.

services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts (1)

32-32: Verify workflow compatibility with empty arguments.

The removal of afterCursor argument might impact the getMembersToEnrich workflow's functionality. Let's verify that the workflow implementation has been updated accordingly.

Consider documenting this interface change in the workflow's documentation or comments to help maintain API consistency.

✅ Verification successful

Empty arguments array is correct - workflow implementation verified

The workflow implementation in workflows/getMembersToEnrich.ts shows that getMembersToEnrich() function doesn't expect any parameters, confirming that the empty args: [] is the correct approach. The function internally uses QUERY_FOR_ENRICHABLE_MEMBERS_PER_RUN and a predefined list of sources to fetch members, without requiring an afterCursor parameter.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if getMembersToEnrich workflow implementation still expects arguments
# Expected: The workflow should not reference afterCursor parameter

# Search for the workflow implementation
ast-grep --pattern 'export async function getMembersToEnrich($_) {
  $$$
}'

# Search for any remaining references to afterCursor
rg 'afterCursor' -A 5

Length of output: 99


Script:

#!/bin/bash
# Let's try a broader search to find the workflow implementation and its usage

# Find files containing getMembersToEnrich
rg -l "getMembersToEnrich"

# Search for function definition with more context
rg "function getMembersToEnrich" -A 10

# Look for files containing afterCursor to see if it's used elsewhere
rg -l "afterCursor"

# Search for schedule-related files in the premium service
fd "schedule" --type f

Length of output: 1981

activityCount: chunks[chunks.length - 1][chunks[chunks.length - 1].length - 1].activityCount,
},
})
await continueAsNew<typeof getMembersToEnrich>()

🛠️ Refactor suggestion

Consider adding delay for empty result sets

The workflow continues immediately when no members are found, which could lead to rapid infinite loops. Consider adding a delay when no members are found.

   if (members.length === 0) {
+    // Add delay to prevent rapid polling when no members are found
+    await activities.sleep('1 minute')
     return
   }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 56 to 64
Object.keys(serviceMap).forEach(async (source) => {
if (
(await serviceMap[source].isEnrichableBySource(enrichmentInput)) &&
(obsoleteSources as string[]).includes(source)
) {
distinctEnrichableSources.add(source as MemberEnrichmentSource)
}
})
}

⚠️ Potential issue

Critical: Fix async/await usage in forEach loop

The current implementation has two issues:

  1. Using async/await within forEach doesn't properly wait for promises
  2. The obsolete sources check has incorrect logic

Apply this fix:

-    Object.keys(serviceMap).forEach(async (source) => {
-      if (
-        (await serviceMap[source].isEnrichableBySource(enrichmentInput)) &&
-        (obsoleteSources as string[]).includes(source)
-      ) {
-        distinctEnrichableSources.add(source as MemberEnrichmentSource)
-      }
-    })
+    for (const source of Object.keys(serviceMap)) {
+      if (
+        (await serviceMap[source].isEnrichableBySource(enrichmentInput)) &&
+        !(obsoleteSources as string[]).includes(source)
+      ) {
+        distinctEnrichableSources.add(source as MemberEnrichmentSource)
+      }
+    }
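
The underlying pitfall can be demonstrated in isolation: forEach fires its async callbacks without awaiting them, while for...of suspends at each await (a standalone sketch, unrelated to the repo's types):

```typescript
// Demonstrates that Array.prototype.forEach ignores promises returned by an
// async callback, whereas for...of awaits each iteration.
async function demo(): Promise<string[]> {
  const order: string[] = []
  const items = ['a', 'b']

  items.forEach(async (i) => {
    await Promise.resolve()
    order.push(`forEach:${i}`) // runs later, as a microtask
  })
  order.push('after-forEach') // reached before the callbacks above finish

  for (const i of items) {
    await Promise.resolve()
    order.push(`forOf:${i}`) // each iteration completes before the next starts
  }
  order.push('after-forOf')

  return order
}
```

Running demo() yields 'after-forEach' as the first entry, confirming that forEach never waited on the callbacks' promises.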


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (5)
services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts (2)

Line range hint 20-29: Consider moving enrichment sources to configuration

While hardcoding sources improves maintainability, consider moving them to a configuration file for better flexibility and easier updates without code changes.

+import { ENRICHMENT_SOURCES } from '../config/sources'

 export async function getMembersToEnrich(): Promise<void> {
   const QUERY_FOR_ENRICHABLE_MEMBERS_PER_RUN = 1000
-  const sources = [
-    MemberEnrichmentSource.PROGAI,
-    MemberEnrichmentSource.CLEARBIT,
-    MemberEnrichmentSource.SERP,
-    MemberEnrichmentSource.PROGAI_LINKEDIN_SCRAPER,
-    MemberEnrichmentSource.CRUSTDATA,
-  ]
+  const sources = ENRICHMENT_SOURCES

36-40: Remove unnecessary parameter from getMaxConcurrentRequests

The QUERY_FOR_ENRICHABLE_MEMBERS_PER_RUN parameter appears unnecessary for calculating concurrent requests since you already have the actual members array.

 const parallelEnrichmentWorkflows = await getMaxConcurrentRequests(
   members,
-  sources,
-  QUERY_FOR_ENRICHABLE_MEMBERS_PER_RUN,
+  sources
 )
services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts (3)

35-39: Add JSDoc documentation for the new function

This function implements important business logic for controlling concurrency. Consider adding JSDoc documentation to explain the purpose, parameters, and return value.

+/**
+ * Calculates the maximum number of concurrent requests allowed based on member activity counts
+ * and enrichment source configurations.
+ * @param members - Array of enrichable members to process
+ * @param possibleSources - Array of potential enrichment sources
+ * @param concurrencyLimit - Global maximum concurrent requests limit
+ * @returns Promise resolving to the calculated maximum concurrent requests
+ */
 export async function getMaxConcurrentRequests(

55-55: Enhance logging with additional context

The current log message could be more informative by including relevant context about how the limit was determined.

-  svc.log.info('Setting max concurrent requests', { maxConcurrentRequestsInAllSources })
+  svc.log.info('Setting max concurrent requests', {
+    maxConcurrentRequestsInAllSources,
+    currentProcessingActivityCount,
+    possibleSourcesCount: possibleSources.length,
+    originalConcurrencyLimit: concurrencyLimit
+  })

45-54: Consider using Object.entries for better type safety

The current implementation could be made more type-safe by using Object.entries when iterating over the service map.

-  for (const source of possibleSources) {
-    serviceMap[source] = EnrichmentSourceServiceFactory.getEnrichmentSourceService(source, svc.log)
-    const activityCountCutoff = serviceMap[source].enrichMembersWithActivityMoreThan
+  for (const source of possibleSources) {
+    const service = EnrichmentSourceServiceFactory.getEnrichmentSourceService(source, svc.log)
+    serviceMap[source] = service
+    const activityCountCutoff = service.enrichMembersWithActivityMoreThan
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 50aedb7 and 761611b.

📒 Files selected for processing (3)
  • services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts (2 hunks)
  • services/apps/premium/members_enrichment_worker/src/types.ts (2 hunks)
  • services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • services/apps/premium/members_enrichment_worker/src/types.ts
🔇 Additional comments (5)
services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts (3)

16-18: LGTM: Activity proxy configuration is well-structured

The addition of getMaxConcurrentRequests to the proxied activities with a 2-minute timeout is appropriate for this type of operation.


67-67: LGTM: Workflow continuation is properly simplified

The simplified continueAsNew call aligns well with the updated function signature.


Line range hint 42-66: Consider implementing rate limiting per enrichment source

The current architecture handles parallel processing well, but consider implementing per-source rate limiting to prevent overwhelming individual enrichment services. This could be achieved by tracking and respecting rate limits for each source separately.

Let's check if rate limiting is implemented in the enrichment services:

services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts (2)

10-10: LGTM: Clean type import

The new import is properly typed and follows TypeScript best practices.


Line range hint 14-31: LGTM: Simplified function signature and implementation

The removal of the afterCursor parameter simplifies the function's interface while maintaining its core functionality. The implementation is clean and properly typed.

concurrencyLimit: number,
): Promise<number> {
const serviceMap: Partial<Record<MemberEnrichmentSource, IEnrichmentService>> = {}
const currentProcessingActivityCount = members[0].activityCount

⚠️ Potential issue

Add input validation for members array

The code assumes the members array is non-empty when accessing the first element. This could lead to runtime errors if the array is empty.

+  if (!members.length) {
+    throw new Error('Members array cannot be empty');
+  }
   const currentProcessingActivityCount = members[0].activityCount


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (2)
services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts (2)

53-53: Enhance logging with additional context

The current log message could be more informative by including the input parameters that led to this decision.

Apply this diff:

-  svc.log.info('Setting max concurrent requests', { maxConcurrentRequestsInAllSources })
+  svc.log.info('Setting max concurrent requests', {
+    maxConcurrentRequestsInAllSources,
+    currentProcessingActivityCount,
+    possibleSourcesCount: possibleSources.length,
+    originalConcurrencyLimit: concurrencyLimit,
+  })

33-56: Consider caching maxConcurrentRequests results

For performance optimization, consider caching the maxConcurrentRequests results for similar activity count ranges, as the same calculation might be performed frequently for members with similar activity counts.

You could implement a simple LRU cache with activity count ranges as keys. This would help reduce redundant calculations while still maintaining accurate concurrency control.
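
A minimal Map-based LRU sketch (the capacity and key scheme are assumptions; JavaScript Maps iterate in insertion order, which makes evicting the least recently used entry straightforward):

```typescript
// Sketch of a small LRU cache; not the repo's actual caching layer.
// Note: storing `undefined` as a value is not supported by this sketch,
// since `get` uses `undefined` to signal a miss.
class LruCache<K, V> {
  private map = new Map<K, V>()

  constructor(private capacity: number) {}

  get(key: K): V | undefined {
    const value = this.map.get(key)
    if (value === undefined) return undefined
    // Re-insert to mark this entry as most recently used
    this.map.delete(key)
    this.map.set(key, value)
    return value
  }

  set(key: K, value: V): void {
    if (this.map.has(key)) {
      this.map.delete(key)
    } else if (this.map.size >= this.capacity) {
      // Maps preserve insertion order, so the first key is the LRU entry
      this.map.delete(this.map.keys().next().value as K)
    }
    this.map.set(key, value)
  }
}
```

Keying such a cache by activity-count range (rather than the exact count) would keep the hit rate high while bounding staleness.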

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 761611b and d893c8f.

📒 Files selected for processing (1)
  • services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts (2 hunks)
🔇 Additional comments (2)
services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts (2)

Line range hint 12-29: LGTM! Function signature simplification

The removal of the afterCursor parameter and corresponding update to fetchMembersForEnrichment call improves the function's clarity while maintaining its core functionality.


39-39: ⚠️ Potential issue

Add input validation for members array

The code assumes the members array is non-empty when accessing the first element. This could lead to runtime errors if the array is empty.

Comment on lines +45 to +51

```typescript
const activityCountCutoff = serviceMap[source].enrichMembersWithActivityMoreThan
if (!activityCountCutoff || activityCountCutoff <= currentProcessingActivityCount) {
  maxConcurrentRequestsInAllSources = Math.min(
    maxConcurrentRequestsInAllSources,
    serviceMap[source].maxConcurrentRequests,
  )
}
```

🛠️ Refactor suggestion

Add type assertion and improve error handling for activityCountCutoff

The activityCountCutoff value from the service should be validated to ensure it's a valid number.

Apply this diff:

```diff
-    const activityCountCutoff = serviceMap[source].enrichMembersWithActivityMoreThan
+    const activityCountCutoff = Number(serviceMap[source].enrichMembersWithActivityMoreThan)
+    if (isNaN(activityCountCutoff)) {
+      svc.log.warn('Invalid activity count cutoff', { source, activityCountCutoff })
+      continue
+    }
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

```diff
-const activityCountCutoff = serviceMap[source].enrichMembersWithActivityMoreThan
-if (!activityCountCutoff || activityCountCutoff <= currentProcessingActivityCount) {
-  maxConcurrentRequestsInAllSources = Math.min(
-    maxConcurrentRequestsInAllSources,
-    serviceMap[source].maxConcurrentRequests,
-  )
-}
+const activityCountCutoff = Number(serviceMap[source].enrichMembersWithActivityMoreThan)
+if (isNaN(activityCountCutoff)) {
+  svc.log.warn('Invalid activity count cutoff', { source, activityCountCutoff })
+  continue
+}
+if (!activityCountCutoff || activityCountCutoff <= currentProcessingActivityCount) {
+  maxConcurrentRequestsInAllSources = Math.min(
+    maxConcurrentRequestsInAllSources,
+    serviceMap[source].maxConcurrentRequests,
+  )
+}
```

@epipav epipav merged commit be60f4b into main Nov 11, 2024
7 checks passed
@epipav epipav deleted the bugfix/enrichment-monitoring-fixes branch November 11, 2024 14:13
@coderabbitai coderabbitai bot mentioned this pull request Nov 12, 2024
Labels
Bug Created by Linear-GitHub Sync