diff --git a/README.md b/README.md
index d33695a4d0..5ef6b8719f 100644
--- a/README.md
+++ b/README.md
@@ -40,18 +40,18 @@
- [Book a call](#đź“ž-book-a-call)
## About crowd.dev
-crowd.dev is the developer data platform (DDP) that lets companies centralize all touch points developers have with their product and brand - be it in community (e.g. Stack Overflow or Reddit), product (open-source or SaaS), or commercial channels (e.g. HubSpot). The platform pulls data from a variety of different sources, normalizes it, matches identities across platforms, and enriches it with 3rd party data. The result is a unified 360-view of who the developers are that engage with your product and community, which companies they work for, and where they stand in their personal customer journey.
+crowd.dev is the developer data platform (DDP) that lets companies centralize all touch points developers have with their product and brand, be it in community (e.g. Stack Overflow or Reddit), product (open-source or SaaS), or commercial channels (e.g. HubSpot). The platform pulls data from a variety of different sources, normalizes it, matches identities across platforms, and enriches it with 3rd party data. The result is a unified 360-view of who the developers are that engage with your product and community, which companies they work for, and where they stand in their personal customer journey.
crowd.dev is open-source, built with developers in mind, available for both hosted and self-hosted deployments, open to extensions, and offers full control over your data.
**To our **users**:**
- You can get actively involved, contribute to our roadmap, and turn crowd.dev into the tool you always wanted.
- We are open regarding what we are building, allowing you to take a look inside, and making sure we handle your data in a privacy-preserving way.
-- You will never be locked in by us. Our interests as a company are aligned with yours and we need to make sure that we always deliver enough value to your with our commercial offering in relation to our pricing.
+- You will never be locked in by us. Our interests as a company are aligned with you and we need to make sure that we always deliver enough value to you with our commercial offering in relation to our pricing.
**To our developer community:**
- You can self-host crowd.dev to centralize data for your community or company while keeping full control over your data.
-- Our product is build for extensibilty. If you can think of any use cases that you want to build with the data we collect and store for you, please go ahead and build it! We will be here to help out if you need us.
+- Our product is built for extensibilty. If you can think of any use cases that you want to build with the data we collect and store for you, please go ahead and build it! We will be here to help out if you need us.
- You can actively contribute to crowd.dev (e.g. integrations), and we will be supporting you along the journey. Just take a look at our [Contributing guide](https://github.com/CrowdDotDev/crowd.dev/blob/main/CONTRIBUTING.md).
## ✨ Features
@@ -64,14 +64,14 @@ crowd.dev is open-source, built with developers in mind, available for both host
- **User enrichment** with 25+ attributes, including emails, social profiles, work experience, and technical skills. [cloud only]
- **Organization enrichment** with 50+ attributes, including industry, headcount, and revenue. [cloud only]
- **Sentiment analysis and conversation detection** to stay on top of what's going on in your open source community. [cloud only]
-- **[Eagle Eye](https://www.crowd.dev/eagle-eye)**: Monitor dev-focussed community platforms to find relevant content to engage with, helping you to gain developers’ mindshare and grow your community organically [cloud only]
+- **[Eagle Eye](https://www.crowd.dev/eagle-eye)**: Monitor dev-focused community platforms to find relevant content to engage with, helping you to gain developers’ mindshare and grow your community organically [cloud only]
## 🚀 Getting started
### Cloud version
-Our cloud version is a fast, easy and free way to get started with crowd.dev.
+Our cloud version is a fast, easy, and free way to get started with crowd.dev.
### Self-hosted version
diff --git a/backend/package-lock.json b/backend/package-lock.json
index ac8706d478..5e87b678f9 100644
--- a/backend/package-lock.json
+++ b/backend/package-lock.json
@@ -59,6 +59,7 @@
"erlpack": "^0.1.4",
"express": "4.17.1",
"express-rate-limit": "6.5.1",
+ "fast-levenshtein": "^3.0.0",
"formidable-serverless": "1.1.1",
"he": "^1.2.0",
"helmet": "4.1.1",
@@ -22441,6 +22442,11 @@
"source-map": "~0.6.1"
}
},
+ "node_modules/escodegen/node_modules/fast-levenshtein": {
+ "version": "2.0.6",
+ "resolved": "https://registry.npmjs.org/fast-levenshtein/-/fast-levenshtein-2.0.6.tgz",
+ "integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw=="
+ },
"node_modules/escodegen/node_modules/levn": {
"version": "0.3.0",
"license": "MIT",
@@ -23262,8 +23268,12 @@
"license": "MIT"
},
"node_modules/fast-levenshtein": {
- "version": "2.0.6",
- "license": "MIT"
+ "version": "3.0.0",
+ "resolved": "https://registry.npmjs.org/fast-levenshtein/-/fast-levenshtein-3.0.0.tgz",
+ "integrity": "sha512-hKKNajm46uNmTlhHSyZkmToAc56uZJwYq7yrciZjqOxnlfQwERDQJmHPUp7m1m9wx8vgOe8IaCKZ5Kv2k1DdCQ==",
+ "dependencies": {
+ "fastest-levenshtein": "^1.0.7"
+ }
},
"node_modules/fast-safe-stringify": {
"version": "2.1.1",
@@ -23293,6 +23303,14 @@
"fxparser": "src/cli/cli.js"
}
},
+ "node_modules/fastest-levenshtein": {
+ "version": "1.0.16",
+ "resolved": "https://registry.npmjs.org/fastest-levenshtein/-/fastest-levenshtein-1.0.16.tgz",
+ "integrity": "sha512-eRnCtTTtGZFpQCwhJiUOuxPQWRXVKYDn0b2PeHfXL6/Zi53SLAzAHfVhVWK2AryC/WH05kGfxhFIPvTF0SXQzg==",
+ "engines": {
+ "node": ">= 4.9.1"
+ }
+ },
"node_modules/fastq": {
"version": "1.15.0",
"dev": true,
@@ -28538,6 +28556,12 @@
"node": ">= 0.8.0"
}
},
+ "node_modules/optionator/node_modules/fast-levenshtein": {
+ "version": "2.0.6",
+ "resolved": "https://registry.npmjs.org/fast-levenshtein/-/fast-levenshtein-2.0.6.tgz",
+ "integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==",
+ "dev": true
+ },
"node_modules/ora": {
"version": "5.4.1",
"dev": true,
@@ -60041,6 +60065,11 @@
"source-map": "~0.6.1"
},
"dependencies": {
+ "fast-levenshtein": {
+ "version": "2.0.6",
+ "resolved": "https://registry.npmjs.org/fast-levenshtein/-/fast-levenshtein-2.0.6.tgz",
+ "integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw=="
+ },
"levn": {
"version": "0.3.0",
"requires": {
@@ -60593,7 +60622,12 @@
"dev": true
},
"fast-levenshtein": {
- "version": "2.0.6"
+ "version": "3.0.0",
+ "resolved": "https://registry.npmjs.org/fast-levenshtein/-/fast-levenshtein-3.0.0.tgz",
+ "integrity": "sha512-hKKNajm46uNmTlhHSyZkmToAc56uZJwYq7yrciZjqOxnlfQwERDQJmHPUp7m1m9wx8vgOe8IaCKZ5Kv2k1DdCQ==",
+ "requires": {
+ "fastest-levenshtein": "^1.0.7"
+ }
},
"fast-safe-stringify": {
"version": "2.1.1"
@@ -60607,6 +60641,11 @@
"strnum": "^1.0.5"
}
},
+ "fastest-levenshtein": {
+ "version": "1.0.16",
+ "resolved": "https://registry.npmjs.org/fastest-levenshtein/-/fastest-levenshtein-1.0.16.tgz",
+ "integrity": "sha512-eRnCtTTtGZFpQCwhJiUOuxPQWRXVKYDn0b2PeHfXL6/Zi53SLAzAHfVhVWK2AryC/WH05kGfxhFIPvTF0SXQzg=="
+ },
"fastq": {
"version": "1.15.0",
"dev": true,
@@ -63957,6 +63996,14 @@
"prelude-ls": "^1.2.1",
"type-check": "^0.4.0",
"word-wrap": "^1.2.3"
+ },
+ "dependencies": {
+ "fast-levenshtein": {
+ "version": "2.0.6",
+ "resolved": "https://registry.npmjs.org/fast-levenshtein/-/fast-levenshtein-2.0.6.tgz",
+ "integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==",
+ "dev": true
+ }
}
},
"ora": {
diff --git a/backend/package.json b/backend/package.json
index 2322afdade..b946b165f3 100644
--- a/backend/package.json
+++ b/backend/package.json
@@ -54,10 +54,10 @@
"@crowd/common": "file:../services/libs/common",
"@crowd/integrations": "file:../services/libs/integrations",
"@crowd/logging": "file:../services/libs/logging",
- "@crowd/tracing": "file:../services/libs/tracing",
"@crowd/opensearch": "file:../services/libs/opensearch",
"@crowd/redis": "file:../services/libs/redis",
"@crowd/sqs": "file:../services/libs/sqs",
+ "@crowd/tracing": "file:../services/libs/tracing",
"@crowd/types": "file:../services/libs/types",
"@cubejs-client/core": "^0.30.4",
"@google-cloud/storage": "5.3.0",
@@ -97,6 +97,7 @@
"erlpack": "^0.1.4",
"express": "4.17.1",
"express-rate-limit": "6.5.1",
+ "fast-levenshtein": "^3.0.0",
"formidable-serverless": "1.1.1",
"he": "^1.2.0",
"helmet": "4.1.1",
diff --git a/backend/src/bin/discord-ws.ts b/backend/src/bin/discord-ws.ts
index 8f058d6c2c..997e670e3c 100644
--- a/backend/src/bin/discord-ws.ts
+++ b/backend/src/bin/discord-ws.ts
@@ -58,7 +58,7 @@ async function spawnClient(
logger.info({ payload }, 'Processing Discord WS Message!')
- tracer.startActiveSpan('ProcessDiscordWSMessage', async (span) => {
+ await tracer.startActiveSpan('ProcessDiscordWSMessage', async (span) => {
try {
const integration = (await IntegrationRepository.findByIdentifier(
guildId,
diff --git a/backend/src/bin/job-generator.ts b/backend/src/bin/job-generator.ts
index 4956b14d04..a16c78af0a 100644
--- a/backend/src/bin/job-generator.ts
+++ b/backend/src/bin/job-generator.ts
@@ -10,7 +10,7 @@ for (const job of jobs) {
const cronJob = new CronJob(
job.cronTime,
async () => {
- tracer.startActiveSpan(`ProcessingJob:${job.name}`, async (span) => {
+ await tracer.startActiveSpan(`ProcessingJob:${job.name}`, async (span) => {
log.info({ job: job.name }, 'Triggering job.')
try {
await job.onTrigger(log)
diff --git a/backend/src/bin/nodejs-worker.ts b/backend/src/bin/nodejs-worker.ts
index 0a1b7515f6..e1fae5e069 100644
--- a/backend/src/bin/nodejs-worker.ts
+++ b/backend/src/bin/nodejs-worker.ts
@@ -57,7 +57,7 @@ async function handleDelayedMessages() {
const message = await receive(true)
if (message) {
- tracer.startActiveSpan('ProcessDelayedMessage', async (span) => {
+ await tracer.startActiveSpan('ProcessDelayedMessage', async (span) => {
try {
const msg: NodeWorkerMessageBase = JSON.parse(message.Body)
const messageLogger = getChildLogger('messageHandler', serviceLogger, {
@@ -130,7 +130,7 @@ async function handleMessages() {
handlerLogger.info('Listening for messages!')
const processSingleMessage = async (message: Message): Promise => {
- tracer.startActiveSpan('ProcessMessage', async (span) => {
+ await tracer.startActiveSpan('ProcessMessage', async (span) => {
const msg: NodeWorkerMessageBase = JSON.parse(message.Body)
const messageLogger = getChildLogger('messageHandler', serviceLogger, {
diff --git a/backend/src/database/repositories/organizationRepository.ts b/backend/src/database/repositories/organizationRepository.ts
index aa9a93455a..ad27851113 100644
--- a/backend/src/database/repositories/organizationRepository.ts
+++ b/backend/src/database/repositories/organizationRepository.ts
@@ -1,4 +1,5 @@
import lodash, { chunk } from 'lodash'
+import { get as getLevenshteinDistance } from 'fast-levenshtein'
import validator from 'validator'
import { FieldTranslatorFactory, OpensearchQueryParser } from '@crowd/opensearch'
import { PageData } from '@crowd/common'
@@ -27,6 +28,11 @@ import SegmentRepository from './segmentRepository'
const { Op } = Sequelize
+interface IOrganizationIdentityOpensearch {
+ string_platform: string
+ string_name: string
+}
+
interface IOrganizationPartialAggregatesOpensearch {
_source: {
uuid_organizationId: string
@@ -38,10 +44,12 @@ interface IOrganizationPartialAggregatesOpensearch {
}
}
-interface IOrganizationIdOpensearch {
+interface ISimilarOrganization {
_score: number
_source: {
uuid_organizationId: string
+ nested_identities: IOrganizationIdentityOpensearch[]
+ nested_weakIdentities: IOrganizationIdentityOpensearch[]
}
}
@@ -54,8 +62,6 @@ interface IOrganizationNoMerge {
noMergeId: string
}
-type MinMaxScores = { maxScore: number; minScore: number }
-
class OrganizationRepository {
static async filterByPayingTenant(
tenantId: string,
@@ -1162,28 +1168,48 @@ class OrganizationRepository {
return 10
}
- const normalizeScore = (max: number, min: number, score: number): number => {
- if (score > 100) {
- return 1
- }
+ const calculateSimilarity = (
+ primaryOrganization: IOrganizationPartialAggregatesOpensearch,
+ similarOrganization: ISimilarOrganization,
+ ): number => {
+ let smallestEditDistance: number = null
- if (max === min) {
- return (40 + Math.floor(Math.random() * 26) - 10) / 100
- }
-
- const normalizedScore = (score - min) / (max - min)
+ let similarPrimaryIdentity: IOrganizationIdentityOpensearch = null
- // randomize the cases where score === max and score === min
- if (normalizedScore === 1) {
- return Math.floor(Math.random() * (76 - 50) + 50) / 100
+ // find the smallest edit distance between both identity arrays
+ for (const primaryIdentity of primaryOrganization._source.nested_identities) {
+ // similar organization has a weakIdentity as one of primary organization's strong identity, return score 95
+ if (
+ similarOrganization._source.nested_weakIdentities.length > 0 &&
+ similarOrganization._source.nested_weakIdentities.some(
+ (weakIdentity) =>
+ weakIdentity.string_name === primaryIdentity.string_name &&
+ weakIdentity.string_platform === primaryIdentity.string_platform,
+ )
+ ) {
+ return 0.95
+ }
+ for (const secondaryIdentity of similarOrganization._source.nested_identities) {
+ const currentLevenstheinDistance = getLevenshteinDistance(
+ primaryIdentity.string_name,
+ secondaryIdentity.string_name,
+ )
+ if (smallestEditDistance === null || smallestEditDistance > currentLevenstheinDistance) {
+ smallestEditDistance = currentLevenstheinDistance
+ similarPrimaryIdentity = primaryIdentity
+ }
+ }
}
- // normalization is resolved to 0, randomize it
- if (normalizedScore === 0) {
- return Math.floor(Math.random() * (41 - 20) + 20) / 100
+ // calculate similarity percentage
+ const identityLength = similarPrimaryIdentity.string_name.length
+
+ if (identityLength < smallestEditDistance) {
+ // if levensthein distance is bigger than the word itself, it might be a prefix match, return medium similarity
+ return (Math.floor(Math.random() * 21) + 20) / 100
}
- return normalizedScore
+ return Math.floor(((identityLength - smallestEditDistance) / identityLength) * 100) / 100
}
const tenant = SequelizeRepository.getCurrentTenant(options)
@@ -1433,10 +1459,10 @@ class OrganizationRepository {
collapse: {
field: 'uuid_organizationId',
},
- _source: ['uuid_organizationId'],
+ _source: ['uuid_organizationId', 'nested_identities', 'nested_weakIdentities'],
}
- const organizationsToMerge: IOrganizationIdOpensearch[] =
+ const organizationsToMerge: ISimilarOrganization[] =
(
await options.opensearch.search({
index: OpenSearchIndex.ORGANIZATIONS,
@@ -1444,6 +1470,7 @@ class OrganizationRepository {
})
).body?.hits?.hits || []
+ /*
const { maxScore, minScore } = organizationsToMerge.reduce(
(acc, organizationToMerge) => {
if (!acc.minScore || organizationToMerge._score < acc.minScore) {
@@ -1458,10 +1485,11 @@ class OrganizationRepository {
},
{ maxScore: null, minScore: null },
)
+ */
for (const organizationToMerge of organizationsToMerge) {
yieldChunk.push({
- similarity: normalizeScore(maxScore, minScore, organizationToMerge._score),
+ similarity: calculateSimilarity(organization, organizationToMerge),
organizations: [
organization._source.uuid_organizationId,
organizationToMerge._source.uuid_organizationId,
@@ -1541,7 +1569,7 @@ class OrganizationRepository {
organizations: [i, organizationToMergeResults[idx]],
similarity: orgs[idx].similarity,
}))
- return { rows: result, count: orgs[0].total_count / 2, limit, offset }
+ return { rows: result, count: orgs[0].total_count, limit, offset }
}
return { rows: [{ organizations: [], similarity: 0 }], count: 0, limit, offset }
diff --git a/frontend/src/integrations/devto/components/devto-connect-drawer.vue b/frontend/src/integrations/devto/components/devto-connect-drawer.vue
index 21e75689ba..c68b953856 100644
--- a/frontend/src/integrations/devto/components/devto-connect-drawer.vue
+++ b/frontend/src/integrations/devto/components/devto-connect-drawer.vue
@@ -454,7 +454,7 @@ const rules = {
const $externalResults = ref({});
-const $v = useVuelidate(rules, form, { $externalResults });
+const $v = useVuelidate(rules, form, { $externalResults, $stopPropagation: true });
watch(
() => props.integration,
diff --git a/frontend/src/modules/organization/components/form/organization-form-identities.vue b/frontend/src/modules/organization/components/form/organization-form-identities.vue
index 5bc10937bf..f3b562a69d 100644
--- a/frontend/src/modules/organization/components/form/organization-form-identities.vue
+++ b/frontend/src/modules/organization/components/form/organization-form-identities.vue
@@ -172,6 +172,7 @@ function findPlatform(platform) {
function onInputChange(newValue, key, value, index) {
model.value.identities[index] = {
...props.modelValue.identities[index],
+ name: newValue,
url: newValue.length ? `https://${value.urlPrefix}${newValue}` : null,
};
}
diff --git a/frontend/src/modules/organization/pages/organization-form-page.vue b/frontend/src/modules/organization/pages/organization-form-page.vue
index dbef85717b..29632ffd20 100644
--- a/frontend/src/modules/organization/pages/organization-form-page.vue
+++ b/frontend/src/modules/organization/pages/organization-form-page.vue
@@ -204,18 +204,17 @@ function getInitialModel(record) {
return JSON.parse(
JSON.stringify(
formSchema.initialValues({
+ ...(record || {}),
name: record ? record.name : '',
displayName: record ? record.displayName || record.name : '',
headline: record ? record.headline : '',
description: record ? record.description : '',
joinedAt: record ? record.joinedAt : '',
- employees: record ? record.employees : null,
- location: record ? record.location : null,
- website: record ? record.website : null,
identities: record ? [...record.identities.map((i) => ({
+ ...i,
platform: i.platform,
name: i.name,
- username: i.url ? i.url.split('/').at(-1) : '',
+ username: i.url ? i.url.split('/').at(-1) : null,
url: i.url,
}))] : [],
revenueRange: record ? record.revenueRange : {},
@@ -227,29 +226,6 @@ function getInitialModel(record) {
record && record.phoneNumbers?.length > 0
? record.phoneNumbers
: [''],
- type: record ? record.type : null,
- size: record ? record.size : null,
- industry: record ? record.industry : null,
- founded: record ? record.founded : null,
- profiles: record ? record.profiles : null,
- affiliatedProfiles: record ? record.affiliatedProfiles : null,
- allSubsidiaries: record ? record.allSubsidiaries : null,
- alternativeDomains: record ? record.alternativeDomains : null,
- alternativeNames: record ? record.alternativeNames : null,
- averageEmployeeTenure: record ? record.averageEmployeeTenure : null,
- averageTenureByLevel: record ? record.averageTenureByLevel : null,
- averageTenureByRole: record ? record.averageTenureByRole : null,
- directSubsidiaries: record ? record.directSubsidiaries : null,
- employeeChurnRate: record ? record.employeeChurnRate : null,
- employeeCountByCountry: record ? record.employeeCountByCountry : null,
- employeeCountByMonth: record ? record.employeeCountByMonth : null,
- employeeGrowthRate: record ? record.employeeGrowthRate : null,
- gicsSector: record ? record.gicsSector : null,
- grossAdditionsByMonth: record ? record.grossAdditionsByMonth : null,
- grossDeparturesByMonth: record ? record.grossDeparturesByMonth : null,
- immediateParent: record ? record.immediateParent : null,
- tags: record ? record.tags : null,
- ultimateParent: record ? record.ultimateParent : null,
}),
),
);
@@ -377,6 +353,7 @@ function onCancel() {
async function onSubmit() {
isFormSubmitting.value = true;
+
const data = {
manuallyCreated: true,
...formModel.value,
@@ -388,7 +365,8 @@ async function onSubmit() {
}
return acc;
}, []),
- identities: formModel.value.identities.filter((i) => i.username.length > 0).map((i) => ({
+ identities: formModel.value.identities.filter((i) => i.username?.length > 0 || i.organizationId).map((i) => ({
+ ...i,
platform: i.platform,
url: i.url,
name: i.name,
diff --git a/frontend/src/modules/settings/settings-pricing-plans.js b/frontend/src/modules/settings/settings-pricing-plans.js
index 3095413167..5c8f1a7ec8 100644
--- a/frontend/src/modules/settings/settings-pricing-plans.js
+++ b/frontend/src/modules/settings/settings-pricing-plans.js
@@ -53,7 +53,7 @@ export const plans = {
title: 'Scale',
description:
'Commercialize your open source product',
- price: '$950/year',
+ price: '$950/month',
priceInfo: 'annual payment',
featuresNote: 'Everything in Essential, plus:',
features: [
diff --git a/services/apps/data_sink_worker/src/bin/process-results.ts b/services/apps/data_sink_worker/src/bin/process-results.ts
index 12f3b79cd7..22e116ac48 100644
--- a/services/apps/data_sink_worker/src/bin/process-results.ts
+++ b/services/apps/data_sink_worker/src/bin/process-results.ts
@@ -1,4 +1,4 @@
-import { DB_CONFIG, REDIS_CONFIG, SQS_CONFIG } from '../conf'
+import { DB_CONFIG, REDIS_CONFIG, SQS_CONFIG, SENTIMENT_CONFIG } from '../conf'
import DataSinkRepository from '../repo/dataSink.repo'
import DataSinkService from '../service/dataSink.service'
import { DbStore, getDbConnection } from '@crowd/database'
@@ -6,6 +6,7 @@ import { getServiceTracer } from '@crowd/tracing'
import { getServiceLogger } from '@crowd/logging'
import { getRedisClient } from '@crowd/redis'
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, getSqsClient } from '@crowd/sqs'
+import { initializeSentimentAnalysis } from '@crowd/sentiment'
const tracer = getServiceTracer()
const log = getServiceLogger()
@@ -23,8 +24,12 @@ setImmediate(async () => {
const sqsClient = getSqsClient(SQS_CONFIG())
const redisClient = await getRedisClient(REDIS_CONFIG())
+ initializeSentimentAnalysis(SENTIMENT_CONFIG())
+
const nodejsWorkerEmitter = new NodejsWorkerEmitter(sqsClient, tracer, log)
+ await nodejsWorkerEmitter.init()
const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(sqsClient, tracer, log)
+ await searchSyncWorkerEmitter.init()
const dbConnection = await getDbConnection(DB_CONFIG())
const store = new DbStore(log, dbConnection)
diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts
index 0f75d82c6e..9b91160271 100644
--- a/services/apps/data_sink_worker/src/queue/index.ts
+++ b/services/apps/data_sink_worker/src/queue/index.ts
@@ -32,7 +32,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
}
override async processMessage(message: IQueueMessage): Promise {
- this.tracer.startActiveSpan('ProcessMessage', async (span: Span) => {
+ await this.tracer.startActiveSpan('ProcessMessage', async (span: Span) => {
try {
this.log.trace({ messageType: message.type }, 'Processing message!')
diff --git a/services/apps/data_sink_worker/src/repo/member.repo.ts b/services/apps/data_sink_worker/src/repo/member.repo.ts
index 6bc8060e92..45bc6a1f8f 100644
--- a/services/apps/data_sink_worker/src/repo/member.repo.ts
+++ b/services/apps/data_sink_worker/src/repo/member.repo.ts
@@ -152,6 +152,8 @@ export default class MemberRepository extends RepositoryBase {
},
})
+ const updatedAt = new Date()
+
const prepared = RepositoryBase.prepare(
{
...data,
@@ -159,19 +161,21 @@ export default class MemberRepository extends RepositoryBase {
data?.weakIdentities?.length > 0 && {
weakIdentities: JSON.stringify(data.weakIdentities),
}),
- updatedAt: new Date(),
+ updatedAt,
},
dynamicColumnSet,
)
const query = this.dbInstance.helpers.update(prepared, dynamicColumnSet)
- const condition = this.format('where id = $(id) and "tenantId" = $(tenantId)', {
- id,
- tenantId,
- })
- const result = await this.db().result(`${query} ${condition}`)
-
- this.checkUpdateRowCount(result.rowCount, 1)
+ const condition = this.format(
+ 'where id = $(id) and "tenantId" = $(tenantId) and "updatedAt" < $(updatedAt)',
+ {
+ id,
+ tenantId,
+ updatedAt,
+ },
+ )
+ await this.db().result(`${query} ${condition}`)
}
public async getIdentities(memberId: string, tenantId: string): Promise {
diff --git a/services/apps/data_sink_worker/src/repo/organization.repo.ts b/services/apps/data_sink_worker/src/repo/organization.repo.ts
index f4e02bc588..60df4a860d 100644
--- a/services/apps/data_sink_worker/src/repo/organization.repo.ts
+++ b/services/apps/data_sink_worker/src/repo/organization.repo.ts
@@ -269,6 +269,8 @@ export class OrganizationRepository extends RepositoryBase 0 && {
weakIdentities: JSON.stringify(data.weakIdentities),
}),
- updatedAt: new Date(),
+ updatedAt,
},
dynamicColumnSet,
)
const query = this.dbInstance.helpers.update(prepared, dynamicColumnSet)
- const condition = this.format('where id = $(id)', { id })
-
- const result = await this.db().result(`${query} ${condition}`)
+ const condition = this.format('where id = $(id) and "updatedAt" < $(updatedAt)', {
+ id,
+ updatedAt,
+ })
- this.checkUpdateRowCount(result.rowCount, 1)
+ await this.db().result(`${query} ${condition}`)
}
public async addIdentity(
diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts
index 7cd67fbf1f..5b153db0eb 100644
--- a/services/apps/data_sink_worker/src/service/member.service.ts
+++ b/services/apps/data_sink_worker/src/service/member.service.ts
@@ -341,7 +341,7 @@ export default class MemberService extends LoggerBase {
// first try finding the member using the identity
const identity = singleOrDefault(
member.identities,
- (i) => i.platform === platform && i.sourceId !== null,
+ (i) => i.platform === platform && i.sourceId !== undefined && i.sourceId !== null,
)
let dbMember = await txRepo.findMember(tenantId, segmentId, platform, identity.username)
@@ -356,7 +356,7 @@ export default class MemberService extends LoggerBase {
this.log.trace({ memberId: dbMember.id }, 'Found existing member.')
// set a record in membersSyncRemote to save the sourceId
- // we can't use member.attributes because of segments
+ // these rows will be used for outgoing data
if (member.attributes?.sourceId?.[platform]) {
await txRepo.addToSyncRemote(
dbMember.id,
diff --git a/services/apps/integration_data_worker/src/queue/index.ts b/services/apps/integration_data_worker/src/queue/index.ts
index df19f93750..084e6eea04 100644
--- a/services/apps/integration_data_worker/src/queue/index.ts
+++ b/services/apps/integration_data_worker/src/queue/index.ts
@@ -37,7 +37,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
}
override async processMessage(message: IQueueMessage): Promise {
- this.tracer.startActiveSpan('ProcessMessage', async (span: Span) => {
+ await this.tracer.startActiveSpan('ProcessMessage', async (span: Span) => {
try {
this.log.trace({ messageType: message.type }, 'Processing message!')
diff --git a/services/apps/integration_run_worker/src/queue/index.ts b/services/apps/integration_run_worker/src/queue/index.ts
index 3512d2e029..15ccb34da0 100644
--- a/services/apps/integration_run_worker/src/queue/index.ts
+++ b/services/apps/integration_run_worker/src/queue/index.ts
@@ -40,7 +40,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
}
override async processMessage(message: IQueueMessage): Promise {
- this.tracer.startActiveSpan('ProcessMessage', async (span: Span) => {
+ await this.tracer.startActiveSpan('ProcessMessage', async (span: Span) => {
try {
this.log.trace({ messageType: message.type }, 'Processing message!')
diff --git a/services/apps/integration_stream_worker/src/queue/index.ts b/services/apps/integration_stream_worker/src/queue/index.ts
index 7ea466f1af..e74f4212b9 100644
--- a/services/apps/integration_stream_worker/src/queue/index.ts
+++ b/services/apps/integration_stream_worker/src/queue/index.ts
@@ -41,7 +41,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
}
override async processMessage(message: IQueueMessage, receiptHandle: string): Promise {
- this.tracer.startActiveSpan('ProcessMessage', async (span: Span) => {
+ await this.tracer.startActiveSpan('ProcessMessage', async (span: Span) => {
try {
this.log.trace({ messageType: message.type }, 'Processing message!')
diff --git a/services/apps/integration_sync_worker/src/queue/index.ts b/services/apps/integration_sync_worker/src/queue/index.ts
index 8f39472e98..0d6ed5a083 100644
--- a/services/apps/integration_sync_worker/src/queue/index.ts
+++ b/services/apps/integration_sync_worker/src/queue/index.ts
@@ -43,7 +43,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
}
protected override async processMessage(message: T): Promise {
- this.tracer.startActiveSpan('ProcessMessage', async (span: Span) => {
+ await this.tracer.startActiveSpan('ProcessMessage', async (span: Span) => {
try {
this.log.trace({ messageType: message.type }, 'Processing message!')
diff --git a/services/apps/search_sync_worker/src/queue/index.ts b/services/apps/search_sync_worker/src/queue/index.ts
index 34b388cfc6..01465faf1c 100644
--- a/services/apps/search_sync_worker/src/queue/index.ts
+++ b/services/apps/search_sync_worker/src/queue/index.ts
@@ -108,7 +108,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
}
protected override async processMessage(message: T): Promise {
- this.tracer.startActiveSpan('ProcessMessage', async (span: Span) => {
+ await this.tracer.startActiveSpan('ProcessMessage', async (span: Span) => {
try {
this.log.trace({ messageType: message.type }, 'Processing message!')
diff --git a/services/apps/search_sync_worker/src/repo/organization.repo.ts b/services/apps/search_sync_worker/src/repo/organization.repo.ts
index 303739281a..52987e6a98 100644
--- a/services/apps/search_sync_worker/src/repo/organization.repo.ts
+++ b/services/apps/search_sync_worker/src/repo/organization.repo.ts
@@ -113,7 +113,8 @@ export class OrganizationRepository extends RepositoryBase {
ctx,
)
- const comments = response.json.data.things
+ const comments = response?.json?.data?.things
+
+ if (!comments) {
+ return
+ }
if (comments.length === 0) {
return
diff --git a/services/libs/redis/src/rateLimiter.ts b/services/libs/redis/src/rateLimiter.ts
index bab293e2aa..411b4b65e6 100644
--- a/services/libs/redis/src/rateLimiter.ts
+++ b/services/libs/redis/src/rateLimiter.ts
@@ -37,7 +37,7 @@ export class ConcurrentRequestLimiter implements IConcurrentRequestLimiter {
private readonly maxConcurrentRequests: number,
private readonly requestKey: string,
// cache key will be deleted after this time since last increment / decrement
- private readonly maxLockTimeSeconds = 30,
+ private readonly maxLockTimeSeconds = 50,
) {
this.cache = cache
this.maxConcurrentRequests = maxConcurrentRequests
@@ -45,7 +45,11 @@ export class ConcurrentRequestLimiter implements IConcurrentRequestLimiter {
this.maxLockTimeSeconds = maxLockTimeSeconds
}
- public async checkConcurrentRequestLimit(integrationId: string, retries = 200, sleepTimeMs = 50) {
+ public async checkConcurrentRequestLimit(
+ integrationId: string,
+ retries = 1000,
+ sleepTimeMs = 50,
+ ) {
const key = this.getRequestKey(integrationId)
let currentRequests: number
let canMakeRequest: boolean