From e3e07847deb3684609b90394dcbd72666266a412 Mon Sep 17 00:00:00 2001 From: anilb Date: Fri, 8 Nov 2024 16:02:30 +0300 Subject: [PATCH] Crustdata enrichment tweaks (#2680) --- .../R__memberEnrichmentMaterializedViews.sql | 619 ++++++++++++++++++ .../src/activities.ts | 2 + .../src/activities/enrichment.ts | 5 + .../members_enrichment_worker/src/main.ts | 7 +- .../src/schedules.ts | 7 +- .../src/schedules/getMembersToEnrich.ts | 38 +- .../src/sources/clearbit/service.ts | 6 +- .../src/sources/crustdata/service.ts | 13 +- .../src/sources/crustdata/types.ts | 2 +- .../progai-linkedin-scraper/service.ts | 1 + .../src/sources/serp/service.ts | 2 +- .../members_enrichment_worker/src/types.ts | 2 +- .../src/utils/common.ts | 8 + .../src/workflows.ts | 2 + .../src/workflows/enrichMember.ts | 2 +- .../src/workflows/getMembersToEnrich.ts | 60 +- ...efreshMemberEnrichmentMaterializedViews.ts | 15 + .../members_enrichment_worker/index.ts | 18 +- services/libs/data-access-layer/src/utils.ts | 9 + services/libs/types/src/enums/enrichment.ts | 10 + 20 files changed, 777 insertions(+), 51 deletions(-) create mode 100644 backend/src/database/migrations/R__memberEnrichmentMaterializedViews.sql create mode 100644 services/apps/premium/members_enrichment_worker/src/workflows/refreshMemberEnrichmentMaterializedViews.ts diff --git a/backend/src/database/migrations/R__memberEnrichmentMaterializedViews.sql b/backend/src/database/migrations/R__memberEnrichmentMaterializedViews.sql new file mode 100644 index 0000000000..e87c73cc05 --- /dev/null +++ b/backend/src/database/migrations/R__memberEnrichmentMaterializedViews.sql @@ -0,0 +1,619 @@ +-- global member activity counts +drop materialized view if exists "membersGlobalActivityCount"; +create materialized view "membersGlobalActivityCount" as +select msa."memberId", + sum(msa."activityCount") AS total_count +from "memberSegmentsAgg" msa +where msa."segmentId" IN ( + select id + from segments + where "grandparentId" is not null + and "parentId" is not null + ) +group by msa."memberId" +order by sum(msa."activityCount") desc; +create unique index ix_member_global_activity_count_member_id on "membersGlobalActivityCount" ("memberId"); +create index ix_member_global_activity_count on "membersGlobalActivityCount" (total_count); + +-- member enrichment monitoring (total) +drop materialized view if exists "memberEnrichmentMonitoringTotal"; +create materialized view "memberEnrichmentMonitoringTotal" as +with all_members as ( + select count(*) as count from members +), +total_enrichable_members as ( + with enrichable_in_at_least_one_source as ( + select mem.id + from members mem + inner join "memberIdentities" mi on mem.id = mi."memberId" and mi.verified + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mem.id + left join "memberEnrichmentCache" mec on mec."memberId" = mem.id + where ( + (mi.verified and + ((mi.type = 'username' AND mi.platform = 'github') OR (mi.type = 'email'))) + OR + ("membersGlobalActivityCount".total_count > 10 AND mi.type = 'email' and mi.verified) + OR + ( + ("membersGlobalActivityCount".total_count > 500) AND + (mem."displayName" like '% %') AND + (mem.attributes -> 'location' ->> 'default' is not null and + mem.attributes -> 'location' ->> 'default' <> '') AND + ( + (mem.attributes -> 'websiteUrl' ->> 'default' is not null and + mem.attributes -> 'websiteUrl' ->> 'default' <> '') OR + (mi.verified AND mi.type = 'username' and mi.platform = 'github') OR + (mi.verified AND mi.type = 'email') + ) + ) + OR + ((mi.verified AND mi.type = 'username' and mi.platform = 'linkedin')) + ) + group by mem.id + order by mem.id desc) + select count(*) as count from enrichable_in_at_least_one_source + ), + attempted_to_enrich_among_enrichable_members as ( + with enrichable_by_clearbit as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + where mec.source = 'clearbit' + and mec."memberId" in ( + select distinct mi."memberId" + from "memberIdentities" mi + where mi.verified and mi.type = 'email' + ) + ), + enrichable_by_progai as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + where mec.source = 'progai' + and mec."memberId" in ( + select distinct mi."memberId" + from "memberIdentities" mi + where mi.verified + and mi.type = 'username' + and mi.platform = 'github' + ) + ), + enrichable_by_serp as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + where mec.source = 'serp' + and mec."memberId" in ( + select distinct mem.id as "memberId" + from members mem + inner join "memberIdentities" mi on mem.id = mi."memberId" and mi.verified + inner join "membersGlobalActivityCount" + on "membersGlobalActivityCount"."memberId" = mem.id + where + ("membersGlobalActivityCount".total_count > 500 and + (mem."displayName" like '% %') and + (mem.attributes -> 'location' ->> 'default' is not null and + mem.attributes -> 'location' ->> 'default' <> '') and + ((mem.attributes -> 'websiteUrl' ->> 'default' is not null and + mem.attributes -> 'websiteUrl' ->> 'default' <> '') OR + (mi.verified and mi.type = 'username' and mi.platform = 'github') OR + (mi.verified and mi.type = 'email'))) + group by mem.id + order by mem.id desc + ) + ), + enrichable_by_progai_linkedin_scraper as ( + with clearbit_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + where mec.source = 'clearbit' + and mec.data is not null + and mec.data -> 'linkedin' ->> 'handle' is not null), + progai_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + where mec.source = 'progai' + and mec.data is not null + and mec.data ->> 'linkedin_url' is not null), + serp_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + where mec.source = 'serp' + and mec.data is not null + ), + existing_verified_linkedin_identities as ( + select distinct mi."memberId" + from "memberIdentities" mi + where mi.platform = 'linkedin' and mi.verified + group by mi."memberId"), + all_unique_members_enrichable_by_progai_linkedin_scraper as ( + select "memberId" from clearbit_linkedin_profiles + union + select "memberId" from progai_linkedin_profiles + union + select "memberId" from serp_linkedin_profiles + union + select "memberId" from existing_verified_linkedin_identities) + select distinct "memberId" + from all_unique_members_enrichable_by_progai_linkedin_scraper + ), + enrichable_by_crustdata_linkedin_scraper as ( + with clearbit_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mec."memberId" + where "membersGlobalActivityCount".total_count > 1000 + and mec.source = 'clearbit' + and mec.data is not null + and mec.data -> 'linkedin' ->> 'handle' is not null), + progai_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mec."memberId" + where "membersGlobalActivityCount".total_count > 1000 + and mec.source = 'progai' + and mec.data is not null + and mec.data ->> 'linkedin_url' is not null), + serp_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mec."memberId" + where "membersGlobalActivityCount".total_count > 1000 + and mec.source = 'serp' + and mec.data is not null), + existing_verified_linkedin_identities as ( + select distinct mi."memberId" + from "memberIdentities" mi + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mi."memberId" + where "membersGlobalActivityCount".total_count > 1000 + and mi.verified + and mi.platform = 'linkedin' + group by mi."memberId"), + all_unique_members_enrichable_by_crustdata as ( + select "memberId" from clearbit_linkedin_profiles + union + select "memberId" from progai_linkedin_profiles + union + select "memberId" from serp_linkedin_profiles + union + select "memberId" from existing_verified_linkedin_identities) + select distinct "memberId" from all_unique_members_enrichable_by_crustdata + ), + unique_members as ( + select "memberId" from enrichable_by_clearbit + union + select "memberId" from enrichable_by_progai + union + select "memberId" from enrichable_by_serp + union + select "memberId" from enrichable_by_progai_linkedin_scraper + union + select "memberId" from enrichable_by_crustdata_linkedin_scraper) + select count(distinct "memberId") as count + from unique_members) + SELECT (SELECT count FROM all_members) as "totalMembers", + (SELECT count FROM total_enrichable_members) as "enrichableMembers", + (SELECT count FROM attempted_to_enrich_among_enrichable_members) as "attemptedToEnrich"; + + +-- member enrichment monitoring (clearbit) +drop materialized view if exists "memberEnrichmentMonitoringClearbit"; +create materialized view "memberEnrichmentMonitoringClearbit" as +with clearbit_enrichable_members as ( + select count(distinct mi."memberId") as count + from "memberIdentities" mi + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mi."memberId" + where mi.verified and mi.type = 'email' and "membersGlobalActivityCount".total_count > 10 +), +attempted_to_enrich_among_clearbit_enrichable_members as ( + select count(distinct mec."memberId") as count + from "memberEnrichmentCache" mec + where mec."memberId" in ( + select distinct mi."memberId" + from "memberIdentities" mi + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mi."memberId" + where mi.verified and mi.type = 'email' and "membersGlobalActivityCount".total_count > 10 + ) +), +clearbit_hit_count_among_attempted as ( + select count(*) as count + from "memberEnrichmentCache" mec + where mec.data is not null + and mec.source = 'clearbit' + and mec."memberId" in ( + select distinct mi."memberId" + from "memberIdentities" mi + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mi."memberId" + where mi.verified and mi.type = 'email' and "membersGlobalActivityCount".total_count > 10 + ) +) +select + (select count from clearbit_enrichable_members) as "enrichableMembers", + (select count from attempted_to_enrich_among_clearbit_enrichable_members) as "attemptedToEnrich", + case when (select count from clearbit_enrichable_members)::numeric = 0 then 0 else + (round((select count from attempted_to_enrich_among_clearbit_enrichable_members)::numeric / + (select count from clearbit_enrichable_members)::numeric * 100, 2)) end as progress, + ((select count from clearbit_hit_count_among_attempted)) as "hitCount", + case when (select count from attempted_to_enrich_among_clearbit_enrichable_members)::numeric = 0 then 0 else + (round((select count from clearbit_hit_count_among_attempted)::numeric / + (select count from attempted_to_enrich_among_clearbit_enrichable_members)::numeric * 100, + 2)) end as "hitRate"; + + +-- member enrichment monitoring (crustdata) +drop materialized view if exists "memberEnrichmentMonitoringCrustdata"; +create materialized view "memberEnrichmentMonitoringCrustdata" as +with crustdata_members_with_scrapable_profiles AS ( + with clearbit_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mec."memberId" + where "membersGlobalActivityCount".total_count > 1000 + and mec.source = 'clearbit' + and mec.data is not null + and mec.data -> 'linkedin' ->> 'handle' is not null), + progai_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mec."memberId" + where "membersGlobalActivityCount".total_count > 1000 + and mec.source = 'progai' + and mec.data is not null + and mec.data ->> 'linkedin_url' is not null), + serp_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mec."memberId" + where "membersGlobalActivityCount".total_count > 1000 + and mec.source = 'serp' + and mec.data is not null), + existing_verified_linkedin_identities as ( + select distinct mi."memberId" + from "memberIdentities" mi + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mi."memberId" + where "membersGlobalActivityCount".total_count > 1000 + and mi.verified + and mi.platform = 'linkedin' + group by mi."memberId"), + unique_members AS ( + select "memberId" from clearbit_linkedin_profiles + union + select "memberId" from progai_linkedin_profiles + union + select "memberId" from serp_linkedin_profiles + union + select "memberId" from existing_verified_linkedin_identities) +select count(distinct "memberId") as count +from unique_members), + crustdata_attempted_to_scrape_among_scrapable_profiles as ( + select count(*) as count + from "memberEnrichmentCache" mec + where mec.source = 'crustdata' + and mec."memberId" in ( + with clearbit_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mec."memberId" + where "membersGlobalActivityCount".total_count > 1000 + and mec.source = 'clearbit' + and mec.data is not null + and mec.data -> 'linkedin' ->> 'handle' is not null), + progai_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mec."memberId" + where "membersGlobalActivityCount".total_count > 1000 + and mec.source = 'progai' + and mec.data is not null + and mec.data ->> 'linkedin_url' is not null), + serp_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mec."memberId" + where "membersGlobalActivityCount".total_count > 1000 + and mec.source = 'serp' + and mec.data is not null), + existing_verified_linkedin_identities as ( + select distinct mi."memberId" + from "memberIdentities" mi + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mi."memberId" + where mi.verified + and mi.platform = 'linkedin' + group by mi."memberId"), + unique_members as ( + select "memberId" from clearbit_linkedin_profiles + union + select "memberId" from progai_linkedin_profiles + union + select "memberId" from serp_linkedin_profiles + union + select "memberId" from existing_verified_linkedin_identities) + select distinct "memberId" from unique_members)), + crustdata_hit_count_among_attempted as ( + select count(*) as count + from "memberEnrichmentCache" mec + where mec.source = 'crustdata' + and mec.data is not null + and mec."memberId" in ( + with clearbit_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mec."memberId" + where "membersGlobalActivityCount".total_count > 1000 + and mec.source = 'clearbit' + and mec.data is not null + and mec.data -> 'linkedin' ->> 'handle' is not null), + progai_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mec."memberId" + where "membersGlobalActivityCount".total_count > 1000 + and mec.source = 'progai' + and mec.data is not null + and mec.data ->> 'linkedin_url' is not null), + serp_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mec."memberId" + where "membersGlobalActivityCount".total_count > 1000 + and mec.source = 'serp' + and mec.data is not null), + existing_verified_linkedin_identities as ( + select distinct mi."memberId" + from "memberIdentities" mi + left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mi."memberId" + where mi.verified + and mi.platform = 'linkedin' + group by mi."memberId"), + unique_members as ( + select "memberId" from clearbit_linkedin_profiles + union + select "memberId" from progai_linkedin_profiles + union + select "memberId" from serp_linkedin_profiles + union + select "memberId" from existing_verified_linkedin_identities) +select distinct "memberId" from unique_members)) +select + (select count from crustdata_members_with_scrapable_profiles) as "membersWithScrapableProfiles", + (select count from crustdata_attempted_to_scrape_among_scrapable_profiles) as "attemptedToScrape", + case when (select count from crustdata_members_with_scrapable_profiles)::numeric = 0 then 0 else + (round((select count from crustdata_attempted_to_scrape_among_scrapable_profiles)::numeric / + (select count from crustdata_members_with_scrapable_profiles)::numeric * 100, 2)) end as progress, + ((select count from crustdata_hit_count_among_attempted)) as "hitCount", + case when (select count from crustdata_attempted_to_scrape_among_scrapable_profiles)::numeric = 0 then 0 else + (round((select count from crustdata_hit_count_among_attempted)::numeric / + (select count from crustdata_attempted_to_scrape_among_scrapable_profiles)::numeric * 100, + 2)) end as "hitRate"; + +-- member enrichment monitoring (progai-github) +drop materialized view if exists "memberEnrichmentMonitoringProgaiGithub"; +create materialized view "memberEnrichmentMonitoringProgaiGithub" as +with progai_enrichable_members as ( + select count(distinct mi."memberId") as count + from "memberIdentities" mi + where mi.verified + and mi.type = 'username' and mi.platform = 'github' +), +attempted_to_enrich_among_progai_enrichable_members as ( + select count(distinct mec."memberId") as count + from "memberEnrichmentCache" mec + where mec."memberId" in ( + select distinct mi."memberId" + from "memberIdentities" mi + where mi.verified + and mi.type = 'username' and mi.platform = 'github' + ) +), +progai_hit_count_among_attempted as ( + select count(*) as count + from "memberEnrichmentCache" mec + where mec.data is not null and mec.source = 'progai' and mec."memberId" in ( + select distinct mi."memberId" + from "memberIdentities" mi + where mi.verified + and mi.type = 'username' and mi.platform = 'github' + ) +) +select + (select count from progai_enrichable_members) as "enrichableMembers", + (select count from attempted_to_enrich_among_progai_enrichable_members) as "attemptedToEnrich", + round((select count from attempted_to_enrich_among_progai_enrichable_members)::numeric / (select count from progai_enrichable_members)::numeric * 100, 2) as progress, + (select count from progai_hit_count_among_attempted) as "hitCount", + round((select count from progai_hit_count_among_attempted)::numeric / (select count from attempted_to_enrich_among_progai_enrichable_members)::numeric * 100, 2) as "hitRate"; + +-- member enrichment monitoring (progai-linkedin) +drop materialized view if exists "memberEnrichmentMonitoringProgaiLinkedin"; +create materialized view "memberEnrichmentMonitoringProgaiLinkedin" as +with progai_linkedin_members_with_scrapable_profiles as ( + with clearbit_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + where mec.source = 'clearbit' + and mec.data is not null + and mec.data->'linkedin'->>'handle' is not null + ), + progai_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + where mec.source = 'progai' + and mec.data is not null + and mec.data->>'linkedin_url' is not null + ), + serp_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + where mec.source = 'serp' + and mec.data is not null + ), + existing_verified_linkedin_identities as ( + select distinct mi."memberId" + from "memberIdentities" mi + where mi.platform = 'linkedin' + and mi.verified + group by mi."memberId" + ), + unique_members as ( + select "memberId" from clearbit_linkedin_profiles + union + select "memberId" from progai_linkedin_profiles + union + select "memberId" from serp_linkedin_profiles + union + select "memberId" from existing_verified_linkedin_identities + ) + select count(distinct "memberId") as count from unique_members +), +progai_linkedin_attempted_to_scrape_among_scrapable_profiles as ( + select count(*) as count from "memberEnrichmentCache" mec where mec.source= 'progai-linkedin-scraper' + and mec."memberId" in ( + with clearbit_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + where mec.source = 'clearbit' + and mec.data is not null + and mec.data->'linkedin'->>'handle' is not null + ), + progai_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + where mec.source = 'progai' + and mec.data is not null + and mec.data->>'linkedin_url' is not null + ), + serp_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + where mec.source = 'serp' + and mec.data is not null + ), + existing_verified_linkedin_identities as ( + select distinct mi."memberId" + from "memberIdentities" mi + where mi.platform = 'linkedin' + and mi.verified + group by mi."memberId" + ), + unique_members as ( + select "memberId" from clearbit_linkedin_profiles + union + select "memberId" from progai_linkedin_profiles + union + select "memberId" from serp_linkedin_profiles + union + select "memberId" from existing_verified_linkedin_identities + ) + select distinct "memberId" from unique_members + ) +), +progai_linkedin_scraper_hit_count_among_attempted as ( + select count(*) as count from "memberEnrichmentCache" mec where mec.source= 'progai-linkedin-scraper' + and mec.data is not null and mec."memberId" in ( + with clearbit_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + where mec.source = 'clearbit' + and mec.data is not null + and mec.data->'linkedin'->>'handle' is not null + ), + progai_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + where mec.source = 'progai' + and mec.data is not null + and mec.data->>'linkedin_url' is not null + ), + serp_linkedin_profiles as ( + select distinct mec."memberId" + from "memberEnrichmentCache" mec + where mec.source = 'serp' and mec.data is not null + ), + existing_verified_linkedin_identities as ( + select distinct mi."memberId" + from "memberIdentities" mi + where mi.platform = 'linkedin' + and mi.verified + group by mi."memberId" + ), + unique_members as ( + select "memberId" from clearbit_linkedin_profiles + union + select "memberId" from progai_linkedin_profiles + union + select "memberId" from serp_linkedin_profiles + union + select "memberId" from existing_verified_linkedin_identities + ) + select distinct "memberId" from unique_members + ) +) +select + (select count from progai_linkedin_members_with_scrapable_profiles) as "membersWithScrapableProfiles", + (select count from progai_linkedin_attempted_to_scrape_among_scrapable_profiles) as "attemptedToScrape", + case when (select count from progai_linkedin_members_with_scrapable_profiles)::numeric = 0 then 0 else + (round((select count from progai_linkedin_attempted_to_scrape_among_scrapable_profiles)::numeric / + (select count from progai_linkedin_members_with_scrapable_profiles)::numeric * 100, 2)) end as progress, + ((select count from progai_linkedin_scraper_hit_count_among_attempted)) as "hitCount", + case when (select count from progai_linkedin_attempted_to_scrape_among_scrapable_profiles)::numeric = 0 then 0 else + (round((select count from progai_linkedin_scraper_hit_count_among_attempted)::numeric / + (select count from progai_linkedin_attempted_to_scrape_among_scrapable_profiles)::numeric * 100, + 2)) end as "hitRate"; + +-- member enrichment monitoring (serp) +drop materialized view if exists "memberEnrichmentMonitoringSerp"; +create materialized view "memberEnrichmentMonitoringSerp" as +with serp_enrichable_members as ( + select count(distinct mem.id) as count from members mem + join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mem.id + join "memberIdentities" mi on mi."memberId" = mem.id + where + ("membersGlobalActivityCount".total_count > 500) and + (mem."displayName" like '% %') and + (mem.attributes->'location'->>'default' is not null and mem.attributes->'location'->>'default' <> '') and + ((mem.attributes->'websiteUrl'->>'default' is not null and mem.attributes->'websiteUrl'->>'default' <> '') or + (mi.verified and mi.type = 'username' and mi.platform = 'github') or + (mi.verified and mi.type = 'email') + ) +), +attempted_to_enrich_among_serp_enrichable_members as ( + select count(*) as count + from "memberEnrichmentCache" mec + where mec.source = 'serp' and mec."memberId" in ( + select distinct mem.id + from members mem + join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mem.id + join "memberIdentities" mi on mi."memberId" = mem.id + where + ("membersGlobalActivityCount".total_count > 500) and + (mem."displayName" like '% %') and + (mem.attributes->'location'->>'default' is not null and mem.attributes->'location'->>'default' <> '') and + ((mem.attributes->'websiteUrl'->>'default' is not null and mem.attributes->'websiteUrl'->>'default' <> '') or + (mi.verified and mi.type = 'username' and mi.platform = 'github') or + (mi.verified and mi.type = 'email')) + ) +), +serp_hit_count_among_attempted as ( + select count(*) as count + from "memberEnrichmentCache" mec + where mec.data is not null + and mec.source = 'serp' + and mec."memberId" in ( + select distinct mem.id from members mem + join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mem.id + join "memberIdentities" mi on mi."memberId" = mem.id + where + ("membersGlobalActivityCount".total_count > 500) and + (mem."displayName" like '% %') and + (mem.attributes->'location'->>'default' is not null and mem.attributes->'location'->>'default' <> '') and + ((mem.attributes->'websiteUrl'->>'default' is not null and mem.attributes->'websiteUrl'->>'default' <> '') or + (mi.verified and mi.type = 'username' and mi.platform = 'github') or + (mi.verified and mi.type = 'email') + ) + ) +) +select + (select count from serp_enrichable_members) as "enrichableMembers", + (select count from attempted_to_enrich_among_serp_enrichable_members) as "attemptedToEnrich", + case when (select count from serp_enrichable_members)::numeric = 0 then 0 else + round((select count from attempted_to_enrich_among_serp_enrichable_members)::numeric / (select count from serp_enrichable_members)::numeric * 100, 2) end as progress, + (select count from serp_hit_count_among_attempted) as "hitCount", + case when (select count from attempted_to_enrich_among_serp_enrichable_members)::numeric = 0 then 0 else + round((select count from serp_hit_count_among_attempted)::numeric / (select count from attempted_to_enrich_among_serp_enrichable_members)::numeric * 100, 2) end as "hitRate"; + diff --git a/services/apps/premium/members_enrichment_worker/src/activities.ts b/services/apps/premium/members_enrichment_worker/src/activities.ts index 98d7c43e9d..addb2befb5 100644 --- a/services/apps/premium/members_enrichment_worker/src/activities.ts +++ b/services/apps/premium/members_enrichment_worker/src/activities.ts @@ -6,6 +6,7 @@ import { isCacheObsolete, isEnrichableBySource, normalizeEnrichmentData, + refreshMemberEnrichmentMaterializedView, touchMemberEnrichmentCacheUpdatedAt, updateMemberEnrichmentCache, } from './activities/enrichment' @@ -52,4 +53,5 @@ export { updateMemberEnrichmentCache, isEnrichableBySource, findMemberIdentityWithTheMostActivityInPlatform, + refreshMemberEnrichmentMaterializedView, } diff --git a/services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts b/services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts index 2700e63e27..d19031f52b 100644 --- a/services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts +++ b/services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts @@ -6,6 +6,7 @@ import { touchMemberEnrichmentCacheUpdatedAtDb, updateMemberEnrichmentCacheDb, } from '@crowd/data-access-layer/src/old/apps/premium/members_enrichment_worker' +import { refreshMaterializedView } from '@crowd/data-access-layer/src/utils' import { RedisCache } from '@crowd/redis' import { IEnrichableMemberIdentityActivityAggregate, @@ -136,3 +137,7 @@ export async function findMemberIdentityWithTheMostActivityInPlatform( ): Promise { return findMemberIdentityWithTheMostActivityInPlatformQuestDb(svc.questdbSQL, memberId, platform) } + +export async function refreshMemberEnrichmentMaterializedView(mvName: string): Promise { + await refreshMaterializedView(svc.postgres.writer.connection(), mvName) +} diff --git a/services/apps/premium/members_enrichment_worker/src/main.ts b/services/apps/premium/members_enrichment_worker/src/main.ts index 7603252158..f9cc0f6282 100644 --- a/services/apps/premium/members_enrichment_worker/src/main.ts +++ b/services/apps/premium/members_enrichment_worker/src/main.ts @@ -2,7 +2,11 @@ import { Config } from '@crowd/archetype-standard' import { Options, ServiceWorker } from '@crowd/archetype-worker' import { Edition } from '@crowd/types' -import { scheduleMembersEnrichment, scheduleMembersLFIDEnrichment } from './schedules' +import { + scheduleMembersEnrichment, + scheduleMembersLFIDEnrichment, + scheduleRefreshMembersEnrichmentMaterializedViews, +} from './schedules' const config: Config = { envvars: [ @@ -43,6 +47,7 @@ export const svc = new ServiceWorker(config, options) setImmediate(async () => { await svc.init() + await scheduleRefreshMembersEnrichmentMaterializedViews() await scheduleMembersEnrichment() if (process.env['CROWD_EDITION'] === Edition.LFX) { diff --git a/services/apps/premium/members_enrichment_worker/src/schedules.ts b/services/apps/premium/members_enrichment_worker/src/schedules.ts index 07cfb70c1e..5a18490766 100644 --- a/services/apps/premium/members_enrichment_worker/src/schedules.ts +++ b/services/apps/premium/members_enrichment_worker/src/schedules.ts @@ -1,6 +1,11 @@ import { scheduleMembersEnrichment, scheduleMembersLFIDEnrichment, + scheduleRefreshMembersEnrichmentMaterializedViews, } from './schedules/getMembersToEnrich' -export { scheduleMembersEnrichment, scheduleMembersLFIDEnrichment } +export { + scheduleMembersEnrichment, + scheduleMembersLFIDEnrichment, + scheduleRefreshMembersEnrichmentMaterializedViews, +} diff --git a/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts b/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts index e318fce93a..948fc69400 100644 --- a/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts +++ b/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts @@ -3,7 +3,11 @@ import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/clien import { IS_DEV_ENV, IS_TEST_ENV } from '@crowd/common' import { svc } from '../main' -import { getMembersForLFIDEnrichment, getMembersToEnrich } from '../workflows' +import { + getMembersForLFIDEnrichment, + getMembersToEnrich, + refreshMemberEnrichmentMaterializedViews, +} from '../workflows' export const scheduleMembersEnrichment = async () => { try { @@ -42,6 +46,38 @@ export const scheduleMembersEnrichment = async () => { } } +export const scheduleRefreshMembersEnrichmentMaterializedViews = async () => { + try { + await svc.temporal.schedule.create({ + scheduleId: 'refresh-members-enrichment-materialized-views', + spec: { + cronExpressions: ['0 5 * * *'], + }, + policies: { + overlap: ScheduleOverlapPolicy.SKIP, + catchupWindow: '1 minute', + }, + action: { + type: 'startWorkflow', + workflowType: refreshMemberEnrichmentMaterializedViews, + taskQueue: 'members-enrichment', + retry: { + initialInterval: '15 seconds', + backoffCoefficient: 2, + maximumAttempts: 3, + }, + }, + }) + } catch (err) { + if (err instanceof ScheduleAlreadyRunning) { + svc.log.info('Schedule already registered in Temporal.') + svc.log.info('Configuration may have changed since. Please make sure they are in sync.') + } else { + throw new Error(err) + } + } +} + export const scheduleMembersLFIDEnrichment = async () => { try { await svc.temporal.schedule.create({ diff --git a/services/apps/premium/members_enrichment_worker/src/sources/clearbit/service.ts b/services/apps/premium/members_enrichment_worker/src/sources/clearbit/service.ts index 6d5884b84b..7cfbb25df6 100644 --- a/services/apps/premium/members_enrichment_worker/src/sources/clearbit/service.ts +++ b/services/apps/premium/members_enrichment_worker/src/sources/clearbit/service.ts @@ -29,7 +29,7 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn public platform = `enrichment-${this.source}` public enrichMembersWithActivityMoreThan = 10 - public enrichableBySql = `"activitySummary".total_count > ${this.enrichMembersWithActivityMoreThan} AND mi.type = 'email' and mi.verified` + public enrichableBySql = `"membersGlobalActivityCount".total_count > ${this.enrichMembersWithActivityMoreThan} AND mi.type = 'email' and mi.verified` // bust cache after 120 days public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 120 @@ -172,7 +172,9 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn if (data.linkedin?.handle) { normalized = normalizeSocialIdentity( { - handle: data.linkedin.handle.split('/').pop(), + handle: data.linkedin.handle.endsWith('/') + ? data.linkedin.handle.slice(0, -1).split('/').pop() + : data.linkedin.handle.split('/').pop(), platform: PlatformType.LINKEDIN, }, MemberIdentityType.USERNAME, diff --git a/services/apps/premium/members_enrichment_worker/src/sources/crustdata/service.ts b/services/apps/premium/members_enrichment_worker/src/sources/crustdata/service.ts index 6e2e20808e..80aa3abebb 100644 --- a/services/apps/premium/members_enrichment_worker/src/sources/crustdata/service.ts +++ b/services/apps/premium/members_enrichment_worker/src/sources/crustdata/service.ts @@ -43,7 +43,7 @@ export default class EnrichmentServiceCrustdata extends LoggerBase implements IE public enrichMembersWithActivityMoreThan = 1000 - public enrichableBySql = `("activitySummary".total_count > ${this.enrichMembersWithActivityMoreThan}) AND mi.verified AND mi.type = 'username' and mi.platform = 'linkedin'` + public enrichableBySql = `("membersGlobalActivityCount".total_count > ${this.enrichMembersWithActivityMoreThan}) AND mi.verified AND mi.type = 'username' and mi.platform = 'linkedin'` public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 90 @@ -216,6 +216,7 @@ export default class EnrichmentServiceCrustdata extends LoggerBase implements IE if (!linkedinUrlHashmap.get(input.linkedin.value)) { consumableIdentities.push({ ...input.linkedin, + value: input.linkedin.value.replace(/\//g, ''), repeatedTimesInDifferentSources: 1, isFromVerifiedSource: true, }) @@ -279,7 +280,15 @@ export default class EnrichmentServiceCrustdata extends LoggerBase implements IE } if (data.email) { - for (const email of data.email.split(',').filter(isEmail)) { + let emails: string[] + + if (Array.isArray(data.email)) { + emails = data.email + } else { + emails = data.email.split(',').filter(isEmail) + } + + for (const email of emails) { normalized.identities.push({ type: MemberIdentityType.EMAIL, platform: this.platform, diff --git a/services/apps/premium/members_enrichment_worker/src/sources/crustdata/types.ts b/services/apps/premium/members_enrichment_worker/src/sources/crustdata/types.ts index 6fe2266261..7893df81d1 100644 --- a/services/apps/premium/members_enrichment_worker/src/sources/crustdata/types.ts +++ b/services/apps/premium/members_enrichment_worker/src/sources/crustdata/types.ts @@ -17,7 +17,7 @@ export interface IMemberEnrichmentDataCrustdata { linkedin_profile_url: string linkedin_flagship_url: string name: string - email: string + email: string | string[] title: string last_updated: string headline: string diff --git a/services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/service.ts b/services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/service.ts index f5c4d45f3a..b0b75f9733 100644 --- a/services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/service.ts +++ b/services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/service.ts @@ -164,6 +164,7 @@ export default class EnrichmentServiceProgAILinkedinScraper if (!linkedinUrlHashmap.get(input.linkedin.value)) { consumableIdentities.push({ ...input.linkedin, + value: input.linkedin.value.replace(/\//g, ''), repeatedTimesInDifferentSources: 1, isFromVerifiedSource: true, }) diff --git a/services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts b/services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts index bb3791a480..7559480dae 100644 --- a/services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts +++ b/services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts @@ -21,7 +21,7 @@ export default class EnrichmentServiceSerpApi extends LoggerBase implements IEnr public enrichMembersWithActivityMoreThan = 500 public enrichableBySql = ` - ("activitySummary".total_count > ${this.enrichMembersWithActivityMoreThan}) AND + ("membersGlobalActivityCount".total_count > ${this.enrichMembersWithActivityMoreThan}) AND (members."displayName" like '% %') AND (members.attributes->'location'->>'default' is not null and members.attributes->'location'->>'default' <> '') AND ((members.attributes->'websiteUrl'->>'default' is not null and members.attributes->'websiteUrl'->>'default' <> '') OR diff --git a/services/apps/premium/members_enrichment_worker/src/types.ts b/services/apps/premium/members_enrichment_worker/src/types.ts index fe706eacd2..e51bba2547 100644 --- a/services/apps/premium/members_enrichment_worker/src/types.ts +++ b/services/apps/premium/members_enrichment_worker/src/types.ts @@ -51,7 +51,7 @@ export interface IEnrichmentService { // SQL filter to get enrichable members for a source // members table is available as "members" alias // memberIdentities table is available as "mi" alias - // activity count is available in "activitySummary" alias, "activitySummary".total_count field + // activity count is available in "membersGlobalActivityCount" alias, "membersGlobalActivityCount".total_count field enrichableBySql: string // should either return the data or null if it's a miss diff --git a/services/apps/premium/members_enrichment_worker/src/utils/common.ts b/services/apps/premium/members_enrichment_worker/src/utils/common.ts index 88f59afb78..0a62747909 100644 --- a/services/apps/premium/members_enrichment_worker/src/utils/common.ts +++ b/services/apps/premium/members_enrichment_worker/src/utils/common.ts @@ -100,3 +100,11 @@ export function normalizeAttributes( return normalized } + +export function chunkArray(array: T[], chunkSize: number): T[][] { + const chunks = [] + for (let i = 0; i < array.length; i += chunkSize) { + chunks.push(array.slice(i, i + chunkSize)) + } + return chunks +} diff --git a/services/apps/premium/members_enrichment_worker/src/workflows.ts b/services/apps/premium/members_enrichment_worker/src/workflows.ts index 56b734f7a2..90875449a6 100644 --- a/services/apps/premium/members_enrichment_worker/src/workflows.ts +++ b/services/apps/premium/members_enrichment_worker/src/workflows.ts @@ -4,6 +4,7 @@ import { enrichMemberWithLFAuth0 } from './workflows/lf-auth0/enrichMemberWithLF import { findAndSaveGithubSourceIds } from './workflows/lf-auth0/findAndSaveGithubSourceIds' import { getEnrichmentData } from './workflows/lf-auth0/getEnrichmentData' import { getMembersForLFIDEnrichment } from './workflows/lf-auth0/getMembersForLFIDEnrichment' +import { refreshMemberEnrichmentMaterializedViews } from './workflows/refreshMemberEnrichmentMaterializedViews' export { getMembersToEnrich, @@ -12,4 +13,5 @@ export { enrichMemberWithLFAuth0, findAndSaveGithubSourceIds, getEnrichmentData, + refreshMemberEnrichmentMaterializedViews, } diff --git a/services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts b/services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts index 5295043c7c..90b24dfcf1 100644 --- a/services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts +++ b/services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts @@ -21,7 +21,7 @@ const { normalizeEnrichmentData, findMemberIdentityWithTheMostActivityInPlatform, } = proxyActivities({ - startToCloseTimeout: '20 seconds', + startToCloseTimeout: '1 minute', retry: { initialInterval: '5s', backoffCoefficient: 2.0, diff --git a/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts b/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts index 6439666ac5..b5255357d6 100644 --- a/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts +++ b/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts @@ -6,10 +6,11 @@ import { proxyActivities, } from '@temporalio/workflow' -import { MemberEnrichmentSource } from '@crowd/types' +import { IEnrichableMember, MemberEnrichmentSource } from '@crowd/types' import * as activities from '../activities/getMembers' import { IGetMembersForEnrichmentArgs } from '../types' +import { chunkArray } from '../utils/common' import { enrichMember } from './enrichMember' @@ -18,7 +19,8 @@ const { getEnrichableMembers } = proxyActivities({ }) export async function getMembersToEnrich(args: IGetMembersForEnrichmentArgs): Promise { - const MEMBER_ENRICHMENT_PER_RUN = 500 + const QUERY_FOR_ENRICHABLE_MEMBERS_PER_RUN = 1000 + const PARALLEL_ENRICHMENT_WORKFLOWS = 5 const afterCursor = args?.afterCursor || null const sources = [ MemberEnrichmentSource.PROGAI, @@ -28,37 +30,45 @@ export async function getMembersToEnrich(args: IGetMembersForEnrichmentArgs): Pr MemberEnrichmentSource.CRUSTDATA, ] - const members = await getEnrichableMembers(MEMBER_ENRICHMENT_PER_RUN, sources, afterCursor) + const members = await getEnrichableMembers( + QUERY_FOR_ENRICHABLE_MEMBERS_PER_RUN, + sources, + afterCursor, + ) if (members.length === 0) { return } - await Promise.all( - members.map((member) => { - return executeChild(enrichMember, { - workflowId: 'member-enrichment/' + member.tenantId + '/' + member.id, - cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, - parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL, - workflowExecutionTimeout: '15 minutes', - retry: { - backoffCoefficient: 2, - maximumAttempts: 10, - initialInterval: 2 * 1000, - maximumInterval: 30 * 1000, - }, - args: [member, sources], - searchAttributes: { - TenantId: [member.tenantId], - }, - }) - }), - ) + const chunks = chunkArray(members, PARALLEL_ENRICHMENT_WORKFLOWS) + + for (const chunk of chunks) { + await Promise.all( + chunk.map((member) => { + return executeChild(enrichMember, { + workflowId: 'member-enrichment/' + member.tenantId + '/' + member.id, + cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, + parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL, + workflowExecutionTimeout: '15 minutes', + retry: { + backoffCoefficient: 2, + maximumAttempts: 10, + initialInterval: 2 * 1000, + maximumInterval: 30 * 1000, + }, + args: [member, sources], + searchAttributes: { + TenantId: [member.tenantId], + }, + }) + }), + ) + } await continueAsNew({ afterCursor: { - memberId: members[members.length - 1].id, - activityCount: members[members.length - 1].activityCount, + memberId: chunks[chunks.length - 1][chunks[chunks.length - 1].length - 1].id, + activityCount: chunks[chunks.length - 1][chunks[chunks.length - 1].length - 1].activityCount, }, }) } diff --git a/services/apps/premium/members_enrichment_worker/src/workflows/refreshMemberEnrichmentMaterializedViews.ts b/services/apps/premium/members_enrichment_worker/src/workflows/refreshMemberEnrichmentMaterializedViews.ts new file mode 100644 index 0000000000..1fecb028ad --- /dev/null +++ b/services/apps/premium/members_enrichment_worker/src/workflows/refreshMemberEnrichmentMaterializedViews.ts @@ -0,0 +1,15 @@ +import { proxyActivities } from '@temporalio/workflow' + +import { MemberEnrichmentMaterializedView } from '@crowd/types' + +import * as activities from '../activities/enrichment' + +const { refreshMemberEnrichmentMaterializedView } = proxyActivities({ + startToCloseTimeout: '10 minutes', +}) + +export async function refreshMemberEnrichmentMaterializedViews(): Promise { + for (const mv of Object.values(MemberEnrichmentMaterializedView)) { + await refreshMemberEnrichmentMaterializedView(mv) + } +} diff --git a/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts b/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts index ec0df7b091..46d719e3f0 100644 --- a/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts +++ b/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts @@ -19,7 +19,7 @@ export async function fetchMembersForEnrichment( afterCursor: { activityCount: number; memberId: string } | null, ): Promise { const cursorFilter = afterCursor - ? `AND ((coalesce("activitySummary".total_count, 0) < $2) OR (coalesce("activitySummary".total_count, 0) = $2 AND members.id < $3))` + ? `AND ((coalesce("membersGlobalActivityCount".total_count, 0) < $2) OR (coalesce("membersGlobalActivityCount".total_count, 0) = $2 AND members.id < $3))` : '' const sourceInnerQueryItems = [] @@ -47,18 +47,6 @@ export async function fetchMembersForEnrichment( return db.connection().query( ` - WITH "activitySummary" AS ( - SELECT - msa."memberId", - SUM(msa."activityCount") AS total_count - FROM "memberSegmentsAgg" msa - WHERE msa."segmentId" IN ( - SELECT id - FROM segments - WHERE "grandparentId" IS NOT NULL AND "parentId" IS NOT NULL - ) - GROUP BY msa."memberId" - ) SELECT members."id", members."tenantId", @@ -73,11 +61,11 @@ export async function fetchMembersForEnrichment( 'verified', mi.verified ) ) AS identities, - MAX(coalesce("activitySummary".total_count, 0)) AS "activityCount" + MAX(coalesce("membersGlobalActivityCount".total_count, 0)) AS "activityCount" FROM members INNER JOIN tenants ON tenants.id = members."tenantId" INNER JOIN "memberIdentities" mi ON mi."memberId" = members.id - LEFT JOIN "activitySummary" ON "activitySummary"."memberId" = members.id + LEFT JOIN "membersGlobalActivityCount" ON "membersGlobalActivityCount"."memberId" = members.id WHERE ${enrichableBySqlJoined} AND tenants."deletedAt" IS NULL diff --git a/services/libs/data-access-layer/src/utils.ts b/services/libs/data-access-layer/src/utils.ts index 82319944ee..c0712121fa 100644 --- a/services/libs/data-access-layer/src/utils.ts +++ b/services/libs/data-access-layer/src/utils.ts @@ -1,6 +1,7 @@ import pgp from 'pg-promise' import { RawQueryParser } from '@crowd/common' +import { DbConnOrTx } from '@crowd/database' import { QueryFilter } from './query' import { QueryExecutor } from './queryExecutor' @@ -126,3 +127,11 @@ export async function queryTableById( return null } + +export async function refreshMaterializedView( + tx: DbConnOrTx, + mvName: string, + concurrently = false, +) { + await tx.query(`REFRESH MATERIALIZED VIEW ${concurrently ? 'concurrently' : ''} "${mvName}"`) +} diff --git a/services/libs/types/src/enums/enrichment.ts b/services/libs/types/src/enums/enrichment.ts index 7e46da08e7..a129eec4b8 100644 --- a/services/libs/types/src/enums/enrichment.ts +++ b/services/libs/types/src/enums/enrichment.ts @@ -5,3 +5,13 @@ export enum MemberEnrichmentSource { PROGAI_LINKEDIN_SCRAPER = 'progai-linkedin-scraper', CRUSTDATA = 'crustdata', } + +export enum MemberEnrichmentMaterializedView { + MEMBERS_GLOBAL_ACTIVITY_COUNT = 'membersGlobalActivityCount', + TOTAL_ENRICHMENT_ANALYSIS = 'memberEnrichmentMonitoringTotal', + PROGAI_GITHUB_ENRICHMENT_ANALYSIS = 'memberEnrichmentMonitoringProgaiGithub', + CLEARBIT_EMAIL_ENRICHMENT_ANALYSIS = 'memberEnrichmentMonitoringClearbit', + SERP_LINKEDIN_FINDER_ENRICHMENT_ANALYSIS = 'memberEnrichmentMonitoringSerp', + PROGAI_LINKEDIN_SCRAPER_ENRICHMENT_ANALYSIS = 'memberEnrichmentMonitoringProgaiLinkedin', + CRUSTDATA_LINKEDIN_SCRAPER_ENRICHMENT_ANALYSIS = 'memberEnrichmentMonitoringCrustdata', +}