Merge branch 'feature/organizations-merge-suggestions' of github.com:CrowdDotDev/crowd.dev into feature/organizations-merge-suggestions

epipav committed Oct 2, 2023
2 parents 7561ddf + cbaef7b commit 85def5b
Showing 8 changed files with 200 additions and 24 deletions.

@@ -6,7 +6,7 @@ import {
   IProcessWebhookStreamContext,
 } from '@crowd/integrations'
 import { Logger, LoggerBase, getChildLogger } from '@crowd/logging'
-import { RedisCache, RedisClient, RateLimiter } from '@crowd/redis'
+import { RedisCache, RedisClient, RateLimiter, ConcurrentRequestLimiter } from '@crowd/redis'
 import {
   IntegrationDataWorkerEmitter,
   IntegrationRunWorkerEmitter,
@@ -349,6 +349,9 @@ export default class IntegrationStreamService extends LoggerBase {
       getRateLimiter: (maxRequests: number, timeWindowSeconds: number, counterKey: string) => {
         return new RateLimiter(globalCache, maxRequests, timeWindowSeconds, counterKey)
       },
+      getConcurrentRequestLimiter: (maxConcurrentRequests: number, counterKey: string) => {
+        return new ConcurrentRequestLimiter(globalCache, maxConcurrentRequests, counterKey)
+      },
     }

     this.log.debug('Processing webhook stream!')
@@ -520,6 +523,9 @@ export default class IntegrationStreamService extends LoggerBase {
       getRateLimiter: (maxRequests: number, timeWindowSeconds: number, counterKey: string) => {
         return new RateLimiter(globalCache, maxRequests, timeWindowSeconds, counterKey)
       },
+      getConcurrentRequestLimiter: (maxConcurrentRequests: number, counterKey: string) => {
+        return new ConcurrentRequestLimiter(globalCache, maxConcurrentRequests, counterKey)
+      },
     }

     this.log.debug('Processing stream!')
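
The `ConcurrentRequestLimiter` class itself is not part of this diff; the integration code below depends only on the `IConcurrentRequestLimiter` interface from `@crowd/types`. A minimal sketch of that interface, inferred from the call sites in the GraphQL base query and processStream.ts later in this commit (the real declaration may carry more members):

```ts
// Sketch inferred from usage below; the actual declaration lives in
// @crowd/types and may differ in details.
interface IConcurrentRequestLimiter {
  // Run `fn` once a concurrency slot is free for `key` (here an integration id),
  // releasing the slot when the returned promise settles.
  processWithLimit<T>(key: string, fn: () => Promise<T>): Promise<T>
}
```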

@@ -2,7 +2,12 @@
 import { graphql } from '@octokit/graphql'
 import { GraphQlQueryResponseData } from '@octokit/graphql/dist-types/types'
 import { GraphQlQueryResponse } from '@crowd/types'
-import { RateLimitError } from '@crowd/types'
+import { RateLimitError, IConcurrentRequestLimiter } from '@crowd/types'
+
+interface Limiter {
+  integrationId: string
+  concurrentRequestLimiter: IConcurrentRequestLimiter
+}

 class BaseQuery {
   static BASE_URL = 'https://api.github.com/graphql'
@@ -84,14 +89,24 @@ class BaseQuery {
    * @param beforeCursor Cursor to paginate records before it
    * @returns parsed graphQl result
    */
-  async getSinglePage(beforeCursor: string): Promise<GraphQlQueryResponse> {
+  async getSinglePage(beforeCursor: string, limiter?: Limiter): Promise<GraphQlQueryResponse> {
     const paginatedQuery = BaseQuery.interpolate(this.query, {
       beforeCursor: BaseQuery.getPagination(beforeCursor),
     })

     try {
-      const result = await this.graphQL(paginatedQuery)
-      return this.getEventData(result)
+      if (limiter) {
+        return limiter.concurrentRequestLimiter.processWithLimit(
+          limiter.integrationId,
+          async () => {
+            const result = await this.graphQL(paginatedQuery)
+            return this.getEventData(result)
+          },
+        )
+      } else {
+        const result = await this.graphQL(paginatedQuery)
+        return this.getEventData(result)
+      }
     } catch (err) {
       throw BaseQuery.processGraphQLError(err)
     }
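
The limited and unlimited branches of `getSinglePage` above duplicate the fetch-and-parse step; an equivalent, slightly tighter formulation (a sketch, not the committed code) hoists it into one closure:

```ts
// Sketch: same behavior as the if/else branch above.
const run = async (): Promise<GraphQlQueryResponse> =>
  this.getEventData(await this.graphQL(paginatedQuery))

return limiter
  ? limiter.concurrentRequestLimiter.processWithLimit(limiter.integrationId, run)
  : run()
```

Either way, errors still propagate to the surrounding `catch` and are normalized by `BaseQuery.processGraphQLError`.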
86 changes: 71 additions & 15 deletions services/libs/integrations/src/integrations/github/processStream.ts
@@ -1,10 +1,14 @@
 /* eslint-disable @typescript-eslint/no-explicit-any */
 // processStream.ts content
 import { singleOrDefault, timeout } from '@crowd/common'
-import { GraphQlQueryResponse } from '@crowd/types'
+import { GraphQlQueryResponse, IConcurrentRequestLimiter } from '@crowd/types'
 import { createAppAuth } from '@octokit/auth-app'
 import { AuthInterface } from '@octokit/auth-app/dist-types/types'
-import { IProcessStreamContext, ProcessStreamHandler } from '../../types'
+import {
+  IProcessStreamContext,
+  ProcessStreamHandler,
+  IProcessWebhookStreamContext,
+} from '../../types'
 import DiscussionCommentsQuery from './api/graphql/discussionComments'
 import DiscussionsQuery from './api/graphql/discussions'
 import ForksQuery from './api/graphql/forks'
@@ -40,6 +44,7 @@ import {
 const IS_TEST_ENV: boolean = process.env.NODE_ENV === 'test'

 let githubAuthenticator: AuthInterface | undefined = undefined
+let concurrentRequestLimiter: IConcurrentRequestLimiter | undefined = undefined

 function getAuth(ctx: IProcessStreamContext): AuthInterface | undefined {
   const GITHUB_CONFIG = ctx.platformSettings as GithubPlatformSettings
@@ -60,6 +65,18 @@ function getAuth(ctx: IProcessStreamContext): AuthInterface | undefined {
   return githubAuthenticator
 }

+export function getConcurrentRequestLimiter(
+  ctx: IProcessStreamContext | IProcessWebhookStreamContext,
+): IConcurrentRequestLimiter {
+  if (concurrentRequestLimiter === undefined) {
+    concurrentRequestLimiter = ctx.getConcurrentRequestLimiter(
+      2, // max 2 concurrent requests
+      'github-concurrent-request-limiter',
+    )
+  }
+  return concurrentRequestLimiter
+}
+
 // const getTokenFromCache = async (ctx: IProcessStreamContext) => {
 //   const key = 'github-token-cache'
 //   const cache = ctx.integrationCache // this cache is tied up with integrationId
@@ -219,7 +236,10 @@ const processRootStream: ProcessStreamHandler = async (ctx) => {
     try {
       // we don't need the default 100 items per page, just 1 is enough to check if the repo is available
       const stargazersQuery = new StargazersQuery(repo, ctx.integration.token, 1)
-      await stargazersQuery.getSinglePage('')
+      await stargazersQuery.getSinglePage('', {
+        concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
+        integrationId: ctx.integration.id,
+      })
       repos.push(repo)
     } catch (e) {
       if (e.rateLimitResetSeconds) {
@@ -263,7 +283,10 @@
 const processStargazersStream: ProcessStreamHandler = async (ctx) => {
   const data = ctx.stream.data as GithubBasicStream
   const stargazersQuery = new StargazersQuery(data.repo, ctx.integration.token)
-  const result = await stargazersQuery.getSinglePage(data.page)
+  const result = await stargazersQuery.getSinglePage(data.page, {
+    concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
+    integrationId: ctx.integration.id,
+  })
   result.data = result.data.filter((i) => (i as any).node?.login)

   // handle next page
@@ -285,7 +308,10 @@ const processStargazersStream: ProcessStreamHandler = async (ctx) => {
 const processForksStream: ProcessStreamHandler = async (ctx) => {
   const data = ctx.stream.data as GithubBasicStream
   const forksQuery = new ForksQuery(data.repo, ctx.integration.token)
-  const result = await forksQuery.getSinglePage(data.page)
+  const result = await forksQuery.getSinglePage(data.page, {
+    concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
+    integrationId: ctx.integration.id,
+  })

   // filter out activities without authors (such as bots) -- may not be the case for forks, but filter out anyway
   result.data = result.data.filter((i) => (i as any).owner?.login)
@@ -309,7 +335,10 @@ const processForksStream: ProcessStreamHandler = async (ctx) => {
 const processPullsStream: ProcessStreamHandler = async (ctx) => {
   const data = ctx.stream.data as GithubBasicStream
   const forksQuery = new PullRequestsQuery(data.repo, ctx.integration.token)
-  const result = await forksQuery.getSinglePage(data.page)
+  const result = await forksQuery.getSinglePage(data.page, {
+    concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
+    integrationId: ctx.integration.id,
+  })

   // filter out activities without authors (such as bots)
   result.data = result.data.filter((i) => (i as any).author?.login)
@@ -484,7 +513,10 @@ const processPullCommentsStream: ProcessStreamHandler = async (ctx) => {
     ctx.integration.token,
   )

-  const result = await pullRequestCommentsQuery.getSinglePage(data.page)
+  const result = await pullRequestCommentsQuery.getSinglePage(data.page, {
+    concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
+    integrationId: ctx.integration.id,
+  })
   result.data = result.data.filter((i) => (i as any).author?.login)

   // handle next page
@@ -513,7 +545,10 @@ const processPullReviewThreadsStream: ProcessStreamHandler = async (ctx) => {
     ctx.integration.token,
   )

-  const result = await pullRequestReviewThreadsQuery.getSinglePage(data.page)
+  const result = await pullRequestReviewThreadsQuery.getSinglePage(data.page, {
+    concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
+    integrationId: ctx.integration.id,
+  })

   // handle next page
   await publishNextPageStream(ctx, result)
@@ -541,7 +576,10 @@ const processPullReviewThreadCommentsStream: ProcessStreamHandler = async (ctx) => {
     ctx.integration.token,
   )

-  const result = await pullRequestReviewThreadCommentsQuery.getSinglePage(data.page)
+  const result = await pullRequestReviewThreadCommentsQuery.getSinglePage(data.page, {
+    concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
+    integrationId: ctx.integration.id,
+  })

   // filter out activities without authors (such as bots)
   result.data = result.data.filter((i) => (i as any).author?.login)
@@ -574,7 +612,10 @@ export const processPullCommitsStream: ProcessStreamHandler = async (ctx) => {
   const pullRequestCommitsQuery = new PullRequestCommitsQuery(data.repo, pullRequestNumber, token)

   try {
-    result = await pullRequestCommitsQuery.getSinglePage(data.page)
+    result = await pullRequestCommitsQuery.getSinglePage(data.page, {
+      concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
+      integrationId: ctx.integration.id,
+    })
   } catch (err) {
     ctx.log.warn(
       {
@@ -589,7 +630,10 @@ export const processPullCommitsStream: ProcessStreamHandler = async (ctx) => {
       pullRequestNumber,
       ctx.integration.token,
     )
-    result = await pullRequestCommitsQueryNoAdditions.getSinglePage(data.page)
+    result = await pullRequestCommitsQueryNoAdditions.getSinglePage(data.page, {
+      concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
+      integrationId: ctx.integration.id,
+    })
   }

   // handle next page
@@ -624,7 +668,10 @@ export const processPullCommitsStream: ProcessStreamHandler = async (ctx) => {
 const processIssuesStream: ProcessStreamHandler = async (ctx) => {
   const data = ctx.stream.data as GithubBasicStream
   const issuesQuery = new IssuesQuery(data.repo, ctx.integration.token)
-  const result = await issuesQuery.getSinglePage(data.page)
+  const result = await issuesQuery.getSinglePage(data.page, {
+    concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
+    integrationId: ctx.integration.id,
+  })

   // filter out activities without authors (such as bots)
   result.data = result.data.filter((i) => (i as any).author?.login)
@@ -683,7 +730,10 @@ const processIssueCommentsStream: ProcessStreamHandler = async (ctx) => {
   const data = ctx.stream.data as GithubBasicStream
   const issueNumber = data.issueNumber
   const issueCommentsQuery = new IssueCommentsQuery(data.repo, issueNumber, ctx.integration.token)
-  const result = await issueCommentsQuery.getSinglePage(data.page)
+  const result = await issueCommentsQuery.getSinglePage(data.page, {
+    concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
+    integrationId: ctx.integration.id,
+  })
   result.data = result.data.filter((i) => (i as any).author?.login)

   // handle next page
@@ -706,7 +756,10 @@ const processIssueCommentsStream: ProcessStreamHandler = async (ctx) => {
 const processDiscussionsStream: ProcessStreamHandler = async (ctx) => {
   const data = ctx.stream.data as GithubBasicStream
   const discussionsQuery = new DiscussionsQuery(data.repo, ctx.integration.token)
-  const result = await discussionsQuery.getSinglePage(data.page)
+  const result = await discussionsQuery.getSinglePage(data.page, {
+    concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
+    integrationId: ctx.integration.id,
+  })

   result.data = result.data.filter((i) => (i as any).author?.login)

@@ -746,7 +799,10 @@ const processDiscussionCommentsStream: ProcessStreamHandler = async (ctx) => {
     data.discussionNumber,
     ctx.integration.token,
   )
-  const result = await discussionCommentsQuery.getSinglePage(data.page)
+  const result = await discussionCommentsQuery.getSinglePage(data.page, {
+    concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
+    integrationId: ctx.integration.id,
+  })
   result.data = result.data.filter((i) => (i as any).author?.login)

   // handle next page
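
Note that `getConcurrentRequestLimiter` memoizes the limiter in a module-level variable, so all GitHub stream handlers in a worker process share one instance created with a budget of two concurrent requests; the budget is presumably tracked per integration, since every call site passes `ctx.integration.id` as the `processWithLimit` key. A small illustration of the caching behavior (hypothetical usage, assuming a populated `ctx`):

```ts
// Only the first call constructs the limiter; later calls reuse it.
const a = getConcurrentRequestLimiter(ctx)
const b = getConcurrentRequestLimiter(ctx)
console.assert(a === b, 'one limiter per worker process')
```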

@@ -20,7 +20,11 @@ import getMember from './api/graphql/members'
 import { prepareMember } from './processStream'
 import TeamsQuery from './api/graphql/teams'
 import { GithubWebhookTeam } from './api/graphql/types'
-import { processPullCommitsStream, getGithubToken } from './processStream'
+import {
+  processPullCommitsStream,
+  getGithubToken,
+  getConcurrentRequestLimiter,
+} from './processStream'

 const IS_TEST_ENV: boolean = process.env.NODE_ENV === 'test'

@@ -198,7 +202,10 @@ const parseWebhookPullRequest = async (payload: any, ctx: IProcessWebhookStreamContext
     // a team sent as reviewer, first we need to find members in this team
     const team: GithubWebhookTeam = payload.requested_team
     const token = await getGithubToken(ctx as IProcessStreamContext)
-    const teamMembers = await new TeamsQuery(team.node_id, token).getSinglePage('')
+    const teamMembers = await new TeamsQuery(team.node_id, token).getSinglePage('', {
+      concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
+      integrationId: ctx.integration.id,
+    })

     for (const teamMember of teamMembers.data) {
       await parseWebhookPullRequestEvents({ ...payload, requested_reviewer: teamMember }, ctx)
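
`getConcurrentRequestLimiter` is typed against the union `IProcessStreamContext | IProcessWebhookStreamContext`, which is why this webhook handler can pass its `ctx` straight through, while `getGithubToken` still needs the `as IProcessStreamContext` cast. Both context interfaces expose the same factory, as the types.ts hunks below show; a type-level sketch of the overlap:

```ts
// Sketch: the union accepts either context shape without casting.
declare const streamCtx: IProcessStreamContext
declare const webhookCtx: IProcessWebhookStreamContext

getConcurrentRequestLimiter(streamCtx) // ok
getConcurrentRequestLimiter(webhookCtx) // ok, no `as` cast required
```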
16 changes: 15 additions & 1 deletion services/libs/integrations/src/types.ts
@@ -6,7 +6,13 @@ import {
   IAutomation,
 } from '@crowd/types'
 import { Logger } from '@crowd/logging'
-import { ICache, IIntegration, IIntegrationStream, IRateLimiter } from '@crowd/types'
+import {
+  ICache,
+  IIntegration,
+  IIntegrationStream,
+  IRateLimiter,
+  IConcurrentRequestLimiter,
+} from '@crowd/types'

 import { IntegrationSyncWorkerEmitter } from '@crowd/sqs'
 import { IBatchOperationResult } from './integrations/premium/hubspot/api/types'
@@ -71,6 +77,10 @@ export interface IProcessStreamContext extends IIntegrationContext {
   integrationCache: ICache

   getRateLimiter: (maxRequests: number, timeWindowSeconds: number, cacheKey: string) => IRateLimiter
+  getConcurrentRequestLimiter: (
+    maxConcurrentRequests: number,
+    cacheKey: string,
+  ) => IConcurrentRequestLimiter
 }

 export interface IProcessWebhookStreamContext {
@@ -99,6 +109,10 @@ export interface IProcessWebhookStreamContext {
   integrationCache: ICache

   getRateLimiter: (maxRequests: number, timeWindowSeconds: number, cacheKey: string) => IRateLimiter
+  getConcurrentRequestLimiter: (
+    maxConcurrentRequests: number,
+    cacheKey: string,
+  ) => IConcurrentRequestLimiter
 }

 export interface IProcessDataContext extends IIntegrationContext {
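
The factory signatures are now declared twice, once per context interface. A possible follow-up (hypothetical, not part of this commit) would hoist the shared members into one interface that both contexts extend:

```ts
// Hypothetical refactor sketch; the interface name is invented for illustration.
interface ILimiterFactories {
  getRateLimiter: (maxRequests: number, timeWindowSeconds: number, cacheKey: string) => IRateLimiter
  getConcurrentRequestLimiter: (
    maxConcurrentRequests: number,
    cacheKey: string,
  ) => IConcurrentRequestLimiter
}
```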
16 changes: 16 additions & 0 deletions services/libs/redis/src/cache.ts
@@ -58,6 +58,22 @@ export class RedisCache extends LoggerBase implements ICache {
     return result
   }

+  async decrement(key: string, decrementBy = 1, ttlSeconds?: number): Promise<number> {
+    const actualKey = this.prefixer(key)
+
+    if (ttlSeconds !== undefined) {
+      const [decrResult] = await this.client
+        .multi()
+        .decrBy(actualKey, decrementBy)
+        .expire(actualKey, ttlSeconds)
+        .exec()
+      return decrResult as number
+    }
+
+    const result = await this.client.decrBy(actualKey, decrementBy)
+    return result
+  }
+
   public setIfNotExistsAlready(key: string, value: string): Promise<boolean> {
     const actualKey = this.prefixer(key)
     return this.client.setNX(actualKey, value)
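
The new `decrement` mirrors the cache's existing `increment` and supplies the primitive a Redis-backed semaphore needs: bump a counter when a request starts, decrement it when the request settles. A rough sketch of how `ConcurrentRequestLimiter` could be built on this pair (an illustration of the idea, not the actual `@crowd/redis` implementation; the retry delay and TTL values are invented, and `increment` is assumed to have the signature symmetric to `decrement` above):

```ts
// Minimal cache surface this sketch needs; RedisCache provides decrement as
// added above, and increment is assumed symmetric.
interface CounterCache {
  increment(key: string, by?: number, ttlSeconds?: number): Promise<number>
  decrement(key: string, by?: number, ttlSeconds?: number): Promise<number>
}

// Illustrative only: the real ConcurrentRequestLimiter may differ in key
// layout, waiting strategy, and TTL handling.
class SketchConcurrentRequestLimiter {
  constructor(
    private readonly cache: CounterCache,
    private readonly maxConcurrent: number,
    private readonly keyPrefix: string,
  ) {}

  async processWithLimit<T>(key: string, fn: () => Promise<T>): Promise<T> {
    const counterKey = `${this.keyPrefix}:${key}`
    // Optimistically take a slot; if over the limit, give it back and retry.
    // The TTL guards against leaked slots if a process dies mid-request.
    for (;;) {
      const inFlight = await this.cache.increment(counterKey, 1, 60)
      if (inFlight <= this.maxConcurrent) break
      await this.cache.decrement(counterKey, 1, 60)
      await new Promise((resolve) => setTimeout(resolve, 100))
    }
    try {
      return await fn()
    } finally {
      await this.cache.decrement(counterKey, 1, 60)
    }
  }
}
```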