diff --git a/services/apps/integration_stream_worker/src/service/integrationStreamService.ts b/services/apps/integration_stream_worker/src/service/integrationStreamService.ts index 2743e7b6f3..f6d8896bc2 100644 --- a/services/apps/integration_stream_worker/src/service/integrationStreamService.ts +++ b/services/apps/integration_stream_worker/src/service/integrationStreamService.ts @@ -6,7 +6,7 @@ import { IProcessWebhookStreamContext, } from '@crowd/integrations' import { Logger, LoggerBase, getChildLogger } from '@crowd/logging' -import { RedisCache, RedisClient, RateLimiter } from '@crowd/redis' +import { RedisCache, RedisClient, RateLimiter, ConcurrentRequestLimiter } from '@crowd/redis' import { IntegrationDataWorkerEmitter, IntegrationRunWorkerEmitter, @@ -349,6 +349,9 @@ export default class IntegrationStreamService extends LoggerBase { getRateLimiter: (maxRequests: number, timeWindowSeconds: number, counterKey: string) => { return new RateLimiter(globalCache, maxRequests, timeWindowSeconds, counterKey) }, + getConcurrentRequestLimiter: (maxConcurrentRequests: number, counterKey: string) => { + return new ConcurrentRequestLimiter(globalCache, maxConcurrentRequests, counterKey) + }, } this.log.debug('Processing webhook stream!') @@ -520,6 +523,9 @@ export default class IntegrationStreamService extends LoggerBase { getRateLimiter: (maxRequests: number, timeWindowSeconds: number, counterKey: string) => { return new RateLimiter(globalCache, maxRequests, timeWindowSeconds, counterKey) }, + getConcurrentRequestLimiter: (maxConcurrentRequests: number, counterKey: string) => { + return new ConcurrentRequestLimiter(globalCache, maxConcurrentRequests, counterKey) + }, } this.log.debug('Processing stream!') diff --git a/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts b/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts index 971a56dbb0..21ae1001fb 100644 --- 
a/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts +++ b/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts @@ -2,7 +2,12 @@ import { graphql } from '@octokit/graphql' import { GraphQlQueryResponseData } from '@octokit/graphql/dist-types/types' import { GraphQlQueryResponse } from '@crowd/types' -import { RateLimitError } from '@crowd/types' +import { RateLimitError, IConcurrentRequestLimiter } from '@crowd/types' + +interface Limiter { + integrationId: string + concurrentRequestLimiter: IConcurrentRequestLimiter +} class BaseQuery { static BASE_URL = 'https://api.github.com/graphql' @@ -84,14 +89,24 @@ class BaseQuery { * @param beforeCursor Cursor to paginate records before it * @returns parsed graphQl result */ - async getSinglePage(beforeCursor: string): Promise<GraphQlQueryResponse> { + async getSinglePage(beforeCursor: string, limiter?: Limiter): Promise<GraphQlQueryResponse> { const paginatedQuery = BaseQuery.interpolate(this.query, { beforeCursor: BaseQuery.getPagination(beforeCursor), }) try { - const result = await this.graphQL(paginatedQuery) - return this.getEventData(result) + if (limiter) { + return limiter.concurrentRequestLimiter.processWithLimit( + limiter.integrationId, + async () => { + const result = await this.graphQL(paginatedQuery) + return this.getEventData(result) + }, + ) + } else { + const result = await this.graphQL(paginatedQuery) + return this.getEventData(result) + } } catch (err) { throw BaseQuery.processGraphQLError(err) } diff --git a/services/libs/integrations/src/integrations/github/processStream.ts b/services/libs/integrations/src/integrations/github/processStream.ts index fd0dff3af7..8a01148779 100644 --- a/services/libs/integrations/src/integrations/github/processStream.ts +++ b/services/libs/integrations/src/integrations/github/processStream.ts @@ -1,10 +1,14 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ // processStream.ts content import { singleOrDefault, timeout } from '@crowd/common' -import { 
GraphQlQueryResponse } from '@crowd/types' +import { GraphQlQueryResponse, IConcurrentRequestLimiter } from '@crowd/types' import { createAppAuth } from '@octokit/auth-app' import { AuthInterface } from '@octokit/auth-app/dist-types/types' -import { IProcessStreamContext, ProcessStreamHandler } from '../../types' +import { + IProcessStreamContext, + ProcessStreamHandler, + IProcessWebhookStreamContext, +} from '../../types' import DiscussionCommentsQuery from './api/graphql/discussionComments' import DiscussionsQuery from './api/graphql/discussions' import ForksQuery from './api/graphql/forks' @@ -40,6 +44,7 @@ import { const IS_TEST_ENV: boolean = process.env.NODE_ENV === 'test' let githubAuthenticator: AuthInterface | undefined = undefined +let concurrentRequestLimiter: IConcurrentRequestLimiter | undefined = undefined function getAuth(ctx: IProcessStreamContext): AuthInterface | undefined { const GITHUB_CONFIG = ctx.platformSettings as GithubPlatformSettings @@ -60,6 +65,18 @@ function getAuth(ctx: IProcessStreamContext): AuthInterface | undefined { return githubAuthenticator } +export function getConcurrentRequestLimiter( + ctx: IProcessStreamContext | IProcessWebhookStreamContext, +): IConcurrentRequestLimiter { + if (concurrentRequestLimiter === undefined) { + concurrentRequestLimiter = ctx.getConcurrentRequestLimiter( + 2, // max 2 concurrent requests + 'github-concurrent-request-limiter', + ) + } + return concurrentRequestLimiter +} + // const getTokenFromCache = async (ctx: IProcessStreamContext) => { // const key = 'github-token-cache' // const cache = ctx.integrationCache // this cache is tied up with integrationId @@ -219,7 +236,10 @@ const processRootStream: ProcessStreamHandler = async (ctx) => { try { // we don't need to get default 100 item per page, just 1 is enough to check if repo is available const stargazersQuery = new StargazersQuery(repo, ctx.integration.token, 1) - await stargazersQuery.getSinglePage('') + await 
stargazersQuery.getSinglePage('', { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }) repos.push(repo) } catch (e) { if (e.rateLimitResetSeconds) { @@ -263,7 +283,10 @@ const processRootStream: ProcessStreamHandler = async (ctx) => { const processStargazersStream: ProcessStreamHandler = async (ctx) => { const data = ctx.stream.data as GithubBasicStream const stargazersQuery = new StargazersQuery(data.repo, ctx.integration.token) - const result = await stargazersQuery.getSinglePage(data.page) + const result = await stargazersQuery.getSinglePage(data.page, { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }) result.data = result.data.filter((i) => (i as any).node?.login) // handle next page @@ -285,7 +308,10 @@ const processStargazersStream: ProcessStreamHandler = async (ctx) => { const processForksStream: ProcessStreamHandler = async (ctx) => { const data = ctx.stream.data as GithubBasicStream const forksQuery = new ForksQuery(data.repo, ctx.integration.token) - const result = await forksQuery.getSinglePage(data.page) + const result = await forksQuery.getSinglePage(data.page, { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }) // filter out activities without authors (such as bots) -- may not the case for forks, but filter out anyway result.data = result.data.filter((i) => (i as any).owner?.login) @@ -309,7 +335,10 @@ const processForksStream: ProcessStreamHandler = async (ctx) => { const processPullsStream: ProcessStreamHandler = async (ctx) => { const data = ctx.stream.data as GithubBasicStream const forksQuery = new PullRequestsQuery(data.repo, ctx.integration.token) - const result = await forksQuery.getSinglePage(data.page) + const result = await forksQuery.getSinglePage(data.page, { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }) // filter out 
activities without authors (such as bots) result.data = result.data.filter((i) => (i as any).author?.login) @@ -484,7 +513,10 @@ const processPullCommentsStream: ProcessStreamHandler = async (ctx) => { ctx.integration.token, ) - const result = await pullRequestCommentsQuery.getSinglePage(data.page) + const result = await pullRequestCommentsQuery.getSinglePage(data.page, { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }) result.data = result.data.filter((i) => (i as any).author?.login) // handle next page @@ -513,7 +545,10 @@ const processPullReviewThreadsStream: ProcessStreamHandler = async (ctx) => { ctx.integration.token, ) - const result = await pullRequestReviewThreadsQuery.getSinglePage(data.page) + const result = await pullRequestReviewThreadsQuery.getSinglePage(data.page, { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }) // handle next page await publishNextPageStream(ctx, result) @@ -541,7 +576,10 @@ const processPullReviewThreadCommentsStream: ProcessStreamHandler = async (ctx) ctx.integration.token, ) - const result = await pullRequestReviewThreadCommentsQuery.getSinglePage(data.page) + const result = await pullRequestReviewThreadCommentsQuery.getSinglePage(data.page, { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }) // filter out activities without authors (such as bots) result.data = result.data.filter((i) => (i as any).author?.login) @@ -574,7 +612,10 @@ export const processPullCommitsStream: ProcessStreamHandler = async (ctx) => { const pullRequestCommitsQuery = new PullRequestCommitsQuery(data.repo, pullRequestNumber, token) try { - result = await pullRequestCommitsQuery.getSinglePage(data.page) + result = await pullRequestCommitsQuery.getSinglePage(data.page, { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }) } catch (err) { 
ctx.log.warn( { @@ -589,7 +630,10 @@ export const processPullCommitsStream: ProcessStreamHandler = async (ctx) => { pullRequestNumber, ctx.integration.token, ) - result = await pullRequestCommitsQueryNoAdditions.getSinglePage(data.page) + result = await pullRequestCommitsQueryNoAdditions.getSinglePage(data.page, { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }) } // handle next page @@ -624,7 +668,10 @@ export const processPullCommitsStream: ProcessStreamHandler = async (ctx) => { const processIssuesStream: ProcessStreamHandler = async (ctx) => { const data = ctx.stream.data as GithubBasicStream const issuesQuery = new IssuesQuery(data.repo, ctx.integration.token) - const result = await issuesQuery.getSinglePage(data.page) + const result = await issuesQuery.getSinglePage(data.page, { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }) // filter out activities without authors (such as bots) result.data = result.data.filter((i) => (i as any).author?.login) @@ -683,7 +730,10 @@ const processIssueCommentsStream: ProcessStreamHandler = async (ctx) => { const data = ctx.stream.data as GithubBasicStream const issueNumber = data.issueNumber const issueCommentsQuery = new IssueCommentsQuery(data.repo, issueNumber, ctx.integration.token) - const result = await issueCommentsQuery.getSinglePage(data.page) + const result = await issueCommentsQuery.getSinglePage(data.page, { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }) result.data = result.data.filter((i) => (i as any).author?.login) // handle next page @@ -706,7 +756,10 @@ const processIssueCommentsStream: ProcessStreamHandler = async (ctx) => { const processDiscussionsStream: ProcessStreamHandler = async (ctx) => { const data = ctx.stream.data as GithubBasicStream const discussionsQuery = new DiscussionsQuery(data.repo, ctx.integration.token) - const result = 
await discussionsQuery.getSinglePage(data.page) + const result = await discussionsQuery.getSinglePage(data.page, { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }) result.data = result.data.filter((i) => (i as any).author?.login) @@ -746,7 +799,10 @@ const processDiscussionCommentsStream: ProcessStreamHandler = async (ctx) => { data.discussionNumber, ctx.integration.token, ) - const result = await discussionCommentsQuery.getSinglePage(data.page) + const result = await discussionCommentsQuery.getSinglePage(data.page, { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }) result.data = result.data.filter((i) => (i as any).author?.login) // handle next page diff --git a/services/libs/integrations/src/integrations/github/processWebhookStream.ts b/services/libs/integrations/src/integrations/github/processWebhookStream.ts index cc30327e1a..67aa2e8584 100644 --- a/services/libs/integrations/src/integrations/github/processWebhookStream.ts +++ b/services/libs/integrations/src/integrations/github/processWebhookStream.ts @@ -20,7 +20,11 @@ import getMember from './api/graphql/members' import { prepareMember } from './processStream' import TeamsQuery from './api/graphql/teams' import { GithubWebhookTeam } from './api/graphql/types' -import { processPullCommitsStream, getGithubToken } from './processStream' +import { + processPullCommitsStream, + getGithubToken, + getConcurrentRequestLimiter, +} from './processStream' const IS_TEST_ENV: boolean = process.env.NODE_ENV === 'test' @@ -198,7 +202,10 @@ const parseWebhookPullRequest = async (payload: any, ctx: IProcessWebhookStreamC // a team sent as reviewer, first we need to find members in this team const team: GithubWebhookTeam = payload.requested_team const token = await getGithubToken(ctx as IProcessStreamContext) - const teamMembers = await new TeamsQuery(team.node_id, token).getSinglePage('') + const teamMembers = await 
new TeamsQuery(team.node_id, token).getSinglePage('', { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }) for (const teamMember of teamMembers.data) { await parseWebhookPullRequestEvents({ ...payload, requested_reviewer: teamMember }, ctx) diff --git a/services/libs/integrations/src/types.ts b/services/libs/integrations/src/types.ts index 0fa8802b6e..62cff0b763 100644 --- a/services/libs/integrations/src/types.ts +++ b/services/libs/integrations/src/types.ts @@ -6,7 +6,13 @@ import { IAutomation, } from '@crowd/types' import { Logger } from '@crowd/logging' -import { ICache, IIntegration, IIntegrationStream, IRateLimiter } from '@crowd/types' +import { + ICache, + IIntegration, + IIntegrationStream, + IRateLimiter, + IConcurrentRequestLimiter, +} from '@crowd/types' import { IntegrationSyncWorkerEmitter } from '@crowd/sqs' import { IBatchOperationResult } from './integrations/premium/hubspot/api/types' @@ -71,6 +77,10 @@ export interface IProcessStreamContext extends IIntegrationContext { integrationCache: ICache getRateLimiter: (maxRequests: number, timeWindowSeconds: number, cacheKey: string) => IRateLimiter + getConcurrentRequestLimiter: ( + maxConcurrentRequests: number, + cacheKey: string, + ) => IConcurrentRequestLimiter } export interface IProcessWebhookStreamContext { @@ -99,6 +109,10 @@ export interface IProcessWebhookStreamContext { integrationCache: ICache getRateLimiter: (maxRequests: number, timeWindowSeconds: number, cacheKey: string) => IRateLimiter + getConcurrentRequestLimiter: ( + maxConcurrentRequests: number, + cacheKey: string, + ) => IConcurrentRequestLimiter } export interface IProcessDataContext extends IIntegrationContext { diff --git a/services/libs/redis/src/cache.ts b/services/libs/redis/src/cache.ts index 3b5a1eda87..9139d51fe1 100644 --- a/services/libs/redis/src/cache.ts +++ b/services/libs/redis/src/cache.ts @@ -58,6 +58,22 @@ export class RedisCache extends LoggerBase implements 
ICache { return result } + async decrement(key: string, decrementBy = 1, ttlSeconds?: number): Promise<number> { + const actualKey = this.prefixer(key) + + if (ttlSeconds !== undefined) { + const [decrResult] = await this.client + .multi() + .decrBy(actualKey, decrementBy) + .expire(actualKey, ttlSeconds) + .exec() + return decrResult as number + } + + const result = await this.client.decrBy(actualKey, decrementBy) + return result + } + public setIfNotExistsAlready(key: string, value: string): Promise<boolean> { const actualKey = this.prefixer(key) return this.client.setNX(actualKey, value) diff --git a/services/libs/redis/src/rateLimiter.ts b/services/libs/redis/src/rateLimiter.ts index 9fbc2965e6..b712b7b85e 100644 --- a/services/libs/redis/src/rateLimiter.ts +++ b/services/libs/redis/src/rateLimiter.ts @@ -1,4 +1,5 @@ -import { ICache, IRateLimiter, RateLimitError } from '@crowd/types' +import { ICache, IRateLimiter, RateLimitError, IConcurrentRequestLimiter } from '@crowd/types' +import { timeout } from '@crowd/common' export class RateLimiter implements IRateLimiter { constructor( @@ -28,3 +29,56 @@ export class RateLimiter implements IRateLimiter { await this.cache.increment(this.counterKey, 1, this.timeWindowSeconds) } } + +export class ConcurrentRequestLimiter implements IConcurrentRequestLimiter { + constructor( + private readonly cache: ICache, + private readonly maxConcurrentRequests: number, + private readonly requestKey: string, + ) { + this.cache = cache + this.maxConcurrentRequests = maxConcurrentRequests + this.requestKey = requestKey + } + + public async checkConcurrentRequestLimit(integrationId: string, retries = 5, sleepTimeMs = 1000) { + const key = this.getRequestKey(integrationId) + const value = await this.cache.get(key) + const currentRequests = value === null ? 
0 : parseInt(value) + const canMakeRequest = currentRequests < this.maxConcurrentRequests + + if (!canMakeRequest) { + if (retries > 0) { + await timeout(sleepTimeMs) + return this.checkConcurrentRequestLimit(integrationId, retries - 1, sleepTimeMs) + } else { + throw new Error(`Too many concurrent requests for integration ${integrationId}`) + } + } + } + + public async incrementConcurrentRequest(integrationId: string) { + const key = this.getRequestKey(integrationId) + await this.cache.increment(key, 1) + } + + public async decrementConcurrentRequest(integrationId: string) { + const key = this.getRequestKey(integrationId) + await this.cache.decrement(key, 1) + } + + public async processWithLimit<T>(integrationId: string, func: () => Promise<T>): Promise<T> { + await this.checkConcurrentRequestLimit(integrationId) + await this.incrementConcurrentRequest(integrationId) + + try { + return await func() + } finally { + await this.decrementConcurrentRequest(integrationId) + } + } + + private getRequestKey(integrationId: string) { + return `${this.requestKey}:${integrationId}` + } +} diff --git a/services/libs/types/src/caching.ts b/services/libs/types/src/caching.ts index dc4066a139..d5cc759584 100644 --- a/services/libs/types/src/caching.ts +++ b/services/libs/types/src/caching.ts @@ -3,9 +3,17 @@ export interface ICache { set(key: string, value: string, ttlSeconds: number): Promise<void> delete(key: string): Promise<number> increment(key: string, incrementBy?: number, ttlSeconds?: number): Promise<number> + decrement(key: string, decrementBy?: number, ttlSeconds?: number): Promise<number> } export interface IRateLimiter { checkRateLimit(endpoint: string): Promise<void> incrementRateLimit(): Promise<void> } + +export interface IConcurrentRequestLimiter { + checkConcurrentRequestLimit(integrationId: string): Promise<void> + incrementConcurrentRequest(integrationId: string): Promise<void> + decrementConcurrentRequest(integrationId: string): Promise<void> + processWithLimit<T>(integrationId: string, func: () => Promise<T>): Promise<T> +}