Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
import { graphql } from '@octokit/graphql'
import { GraphQlQueryResponseData } from '@octokit/graphql/dist-types/types'
import { GraphQlQueryResponse } from '@crowd/types'
import { RateLimitError } from '@crowd/types'
import { RateLimitError, IConcurrentRequestLimiter } from '@crowd/types'

// Pairs an integration id with the concurrency limiter that throttles its
// GitHub GraphQL requests; passed optionally into getSinglePage so queries
// can run through IConcurrentRequestLimiter.processWithLimit.
interface Limiter {
  // Id of the integration whose in-flight request counter should be used.
  integrationId: string
  // Shared limiter instance (see getConcurrentRequestLimiter in processStream.ts).
  concurrentRequestLimiter: IConcurrentRequestLimiter
}

class BaseQuery {
static BASE_URL = 'https://api.github.com/graphql'
Expand Down Expand Up @@ -84,14 +89,24 @@ class BaseQuery {
* @param beforeCursor Cursor to paginate records before it
* @returns parsed graphQl result
*/
async getSinglePage(beforeCursor: string): Promise<GraphQlQueryResponse> {
async getSinglePage(beforeCursor: string, limiter?: Limiter): Promise<GraphQlQueryResponse> {
const paginatedQuery = BaseQuery.interpolate(this.query, {
beforeCursor: BaseQuery.getPagination(beforeCursor),
})

try {
const result = await this.graphQL(paginatedQuery)
return this.getEventData(result)
if (limiter) {
return limiter.concurrentRequestLimiter.processWithLimit(
limiter.integrationId,
async () => {
const result = await this.graphQL(paginatedQuery)
return this.getEventData(result)
},
)
} else {
const result = await this.graphQL(paginatedQuery)
return this.getEventData(result)
}
} catch (err) {
throw BaseQuery.processGraphQLError(err)
}
Expand Down
78 changes: 65 additions & 13 deletions services/libs/integrations/src/integrations/github/processStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ import {
Repo,
Repos,
} from './types'
import { ConcurrentRequestLimiter } from '@crowd/redis'

const IS_TEST_ENV: boolean = process.env.NODE_ENV === 'test'

let githubAuthenticator: AuthInterface | undefined = undefined
let concurrentRequestLimiter: ConcurrentRequestLimiter | undefined = undefined

function getAuth(ctx: IProcessStreamContext): AuthInterface | undefined {
const GITHUB_CONFIG = ctx.platformSettings as GithubPlatformSettings
Expand All @@ -60,6 +62,17 @@ function getAuth(ctx: IProcessStreamContext): AuthInterface | undefined {
return githubAuthenticator
}

/**
 * Returns the module-wide ConcurrentRequestLimiter, creating it lazily on the
 * first call (at most 2 simultaneous GitHub requests per integration).
 *
 * NOTE: the instance is cached at module scope, so the cache handle from the
 * first caller's ctx is reused for all subsequent calls.
 */
export function getConcurrentRequestLimiter(ctx: IProcessStreamContext): ConcurrentRequestLimiter {
  if (concurrentRequestLimiter !== undefined) {
    return concurrentRequestLimiter
  }

  concurrentRequestLimiter = new ConcurrentRequestLimiter(
    ctx.cache,
    2, // max 2 concurrent requests
    'github-concurrent-request-limiter',
  )
  return concurrentRequestLimiter
}

// const getTokenFromCache = async (ctx: IProcessStreamContext) => {
// const key = 'github-token-cache'
// const cache = ctx.integrationCache // this cache is tied up with integrationId
Expand Down Expand Up @@ -219,7 +232,10 @@ const processRootStream: ProcessStreamHandler = async (ctx) => {
try {
// we don't need to get default 100 item per page, just 1 is enough to check if repo is available
const stargazersQuery = new StargazersQuery(repo, ctx.integration.token, 1)
await stargazersQuery.getSinglePage('')
await stargazersQuery.getSinglePage('', {
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
integrationId: ctx.integration.id,
})
repos.push(repo)
} catch (e) {
if (e.rateLimitResetSeconds) {
Expand Down Expand Up @@ -263,7 +279,10 @@ const processRootStream: ProcessStreamHandler = async (ctx) => {
const processStargazersStream: ProcessStreamHandler = async (ctx) => {
const data = ctx.stream.data as GithubBasicStream
const stargazersQuery = new StargazersQuery(data.repo, ctx.integration.token)
const result = await stargazersQuery.getSinglePage(data.page)
const result = await stargazersQuery.getSinglePage(data.page, {
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
integrationId: ctx.integration.id,
})
result.data = result.data.filter((i) => (i as any).node?.login)

// handle next page
Expand All @@ -285,7 +304,10 @@ const processStargazersStream: ProcessStreamHandler = async (ctx) => {
const processForksStream: ProcessStreamHandler = async (ctx) => {
const data = ctx.stream.data as GithubBasicStream
const forksQuery = new ForksQuery(data.repo, ctx.integration.token)
const result = await forksQuery.getSinglePage(data.page)
const result = await forksQuery.getSinglePage(data.page, {
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
integrationId: ctx.integration.id,
})

// filter out activities without authors (such as bots) -- may not the case for forks, but filter out anyway
result.data = result.data.filter((i) => (i as any).owner?.login)
Expand All @@ -309,7 +331,10 @@ const processForksStream: ProcessStreamHandler = async (ctx) => {
const processPullsStream: ProcessStreamHandler = async (ctx) => {
const data = ctx.stream.data as GithubBasicStream
const forksQuery = new PullRequestsQuery(data.repo, ctx.integration.token)
const result = await forksQuery.getSinglePage(data.page)
const result = await forksQuery.getSinglePage(data.page, {
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
integrationId: ctx.integration.id,
})

// filter out activities without authors (such as bots)
result.data = result.data.filter((i) => (i as any).author?.login)
Expand Down Expand Up @@ -484,7 +509,10 @@ const processPullCommentsStream: ProcessStreamHandler = async (ctx) => {
ctx.integration.token,
)

const result = await pullRequestCommentsQuery.getSinglePage(data.page)
const result = await pullRequestCommentsQuery.getSinglePage(data.page, {
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
integrationId: ctx.integration.id,
})
result.data = result.data.filter((i) => (i as any).author?.login)

// handle next page
Expand Down Expand Up @@ -513,7 +541,10 @@ const processPullReviewThreadsStream: ProcessStreamHandler = async (ctx) => {
ctx.integration.token,
)

const result = await pullRequestReviewThreadsQuery.getSinglePage(data.page)
const result = await pullRequestReviewThreadsQuery.getSinglePage(data.page, {
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
integrationId: ctx.integration.id,
})

// handle next page
await publishNextPageStream(ctx, result)
Expand Down Expand Up @@ -541,7 +572,10 @@ const processPullReviewThreadCommentsStream: ProcessStreamHandler = async (ctx)
ctx.integration.token,
)

const result = await pullRequestReviewThreadCommentsQuery.getSinglePage(data.page)
const result = await pullRequestReviewThreadCommentsQuery.getSinglePage(data.page, {
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
integrationId: ctx.integration.id,
})

// filter out activities without authors (such as bots)
result.data = result.data.filter((i) => (i as any).author?.login)
Expand Down Expand Up @@ -574,7 +608,10 @@ export const processPullCommitsStream: ProcessStreamHandler = async (ctx) => {
const pullRequestCommitsQuery = new PullRequestCommitsQuery(data.repo, pullRequestNumber, token)

try {
result = await pullRequestCommitsQuery.getSinglePage(data.page)
result = await pullRequestCommitsQuery.getSinglePage(data.page, {
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
integrationId: ctx.integration.id,
})
} catch (err) {
ctx.log.warn(
{
Expand All @@ -589,7 +626,10 @@ export const processPullCommitsStream: ProcessStreamHandler = async (ctx) => {
pullRequestNumber,
ctx.integration.token,
)
result = await pullRequestCommitsQueryNoAdditions.getSinglePage(data.page)
result = await pullRequestCommitsQueryNoAdditions.getSinglePage(data.page, {
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
integrationId: ctx.integration.id,
})
}

// handle next page
Expand Down Expand Up @@ -624,7 +664,10 @@ export const processPullCommitsStream: ProcessStreamHandler = async (ctx) => {
const processIssuesStream: ProcessStreamHandler = async (ctx) => {
const data = ctx.stream.data as GithubBasicStream
const issuesQuery = new IssuesQuery(data.repo, ctx.integration.token)
const result = await issuesQuery.getSinglePage(data.page)
const result = await issuesQuery.getSinglePage(data.page, {
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
integrationId: ctx.integration.id,
})

// filter out activities without authors (such as bots)
result.data = result.data.filter((i) => (i as any).author?.login)
Expand Down Expand Up @@ -683,7 +726,10 @@ const processIssueCommentsStream: ProcessStreamHandler = async (ctx) => {
const data = ctx.stream.data as GithubBasicStream
const issueNumber = data.issueNumber
const issueCommentsQuery = new IssueCommentsQuery(data.repo, issueNumber, ctx.integration.token)
const result = await issueCommentsQuery.getSinglePage(data.page)
const result = await issueCommentsQuery.getSinglePage(data.page, {
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
integrationId: ctx.integration.id,
})
result.data = result.data.filter((i) => (i as any).author?.login)

// handle next page
Expand All @@ -706,7 +752,10 @@ const processIssueCommentsStream: ProcessStreamHandler = async (ctx) => {
const processDiscussionsStream: ProcessStreamHandler = async (ctx) => {
const data = ctx.stream.data as GithubBasicStream
const discussionsQuery = new DiscussionsQuery(data.repo, ctx.integration.token)
const result = await discussionsQuery.getSinglePage(data.page)
const result = await discussionsQuery.getSinglePage(data.page, {
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
integrationId: ctx.integration.id,
})

result.data = result.data.filter((i) => (i as any).author?.login)

Expand Down Expand Up @@ -746,7 +795,10 @@ const processDiscussionCommentsStream: ProcessStreamHandler = async (ctx) => {
data.discussionNumber,
ctx.integration.token,
)
const result = await discussionCommentsQuery.getSinglePage(data.page)
const result = await discussionCommentsQuery.getSinglePage(data.page, {
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
integrationId: ctx.integration.id,
})
result.data = result.data.filter((i) => (i as any).author?.login)

// handle next page
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ import getMember from './api/graphql/members'
import { prepareMember } from './processStream'
import TeamsQuery from './api/graphql/teams'
import { GithubWebhookTeam } from './api/graphql/types'
import { processPullCommitsStream, getGithubToken } from './processStream'
import {
processPullCommitsStream,
getGithubToken,
getConcurrentRequestLimiter,
} from './processStream'

const IS_TEST_ENV: boolean = process.env.NODE_ENV === 'test'

Expand Down Expand Up @@ -198,7 +202,10 @@ const parseWebhookPullRequest = async (payload: any, ctx: IProcessWebhookStreamC
// a team sent as reviewer, first we need to find members in this team
const team: GithubWebhookTeam = payload.requested_team
const token = await getGithubToken(ctx as IProcessStreamContext)
const teamMembers = await new TeamsQuery(team.node_id, token).getSinglePage('')
const teamMembers = await new TeamsQuery(team.node_id, token).getSinglePage('', {
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx as IProcessStreamContext),
integrationId: ctx.integration.id,
})

for (const teamMember of teamMembers.data) {
await parseWebhookPullRequestEvents({ ...payload, requested_reviewer: teamMember }, ctx)
Expand Down
16 changes: 16 additions & 0 deletions services/libs/redis/src/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,22 @@ export class RedisCache extends LoggerBase implements ICache {
return result
}

/**
 * Decreases the numeric value stored at `key` by `decrementBy` (default 1)
 * and returns the resulting value. When `ttlSeconds` is given, the decrement
 * and the expiry refresh are issued together in one MULTI transaction.
 */
async decrement(key: string, decrementBy = 1, ttlSeconds?: number): Promise<number> {
  const prefixedKey = this.prefixer(key)

  if (ttlSeconds === undefined) {
    return this.client.decrBy(prefixedKey, decrementBy)
  }

  // DECRBY + EXPIRE as a single transaction; the first reply is DECRBY's result.
  const replies = await this.client
    .multi()
    .decrBy(prefixedKey, decrementBy)
    .expire(prefixedKey, ttlSeconds)
    .exec()
  return replies[0] as number
}

public setIfNotExistsAlready(key: string, value: string): Promise<boolean> {
const actualKey = this.prefixer(key)
return this.client.setNX(actualKey, value)
Expand Down
54 changes: 54 additions & 0 deletions services/libs/redis/src/rateLimiter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ICache, IRateLimiter, RateLimitError } from '@crowd/types'
import { timeout } from '@crowd/common'

export class RateLimiter implements IRateLimiter {
constructor(
Expand Down Expand Up @@ -28,3 +29,56 @@ export class RateLimiter implements IRateLimiter {
await this.cache.increment(this.counterKey, 1, this.timeWindowSeconds)
}
}

/**
 * Limits the number of simultaneously in-flight requests per integration by
 * keeping a counter in the shared cache under `<requestKey>:<integrationId>`.
 *
 * NOTE(review): sibling RateLimiter declares `implements IRateLimiter`; for
 * consistency this class should declare `implements IConcurrentRequestLimiter`
 * (requires adding it to the '@crowd/types' import) — confirm and add.
 */
export class ConcurrentRequestLimiter {
  constructor(
    // TS parameter properties assign these automatically; the original's
    // manual `this.x = x` statements were redundant and have been removed.
    private readonly cache: ICache,
    private readonly maxConcurrentRequests: number,
    private readonly requestKey: string,
  ) {}

  /**
   * Resolves once the integration is below its concurrency limit, polling
   * every `sleepTimeMs`. Throws after `retries` unsuccessful polls.
   *
   * NOTE(review): the check and the later increment are separate cache calls,
   * so two concurrent callers can both pass the check and briefly exceed the
   * limit — acceptable as a soft cap, but not a strict guarantee.
   */
  public async checkConcurrentRequestLimit(integrationId: string, retries = 5, sleepTimeMs = 1000) {
    const key = this.getRequestKey(integrationId)
    const value = await this.cache.get(key)
    // Explicit radix: bare parseInt is a lint/footgun hazard.
    const currentRequests = value === null ? 0 : parseInt(value, 10)
    const canMakeRequest = currentRequests < this.maxConcurrentRequests

    if (!canMakeRequest) {
      if (retries > 0) {
        await timeout(sleepTimeMs)
        return this.checkConcurrentRequestLimit(integrationId, retries - 1, sleepTimeMs)
      } else {
        throw new Error(`Too many concurrent requests for integration ${integrationId}`)
      }
    }
  }

  /** Reserves one request slot for the integration. */
  public async incrementConcurrentRequest(integrationId: string) {
    const key = this.getRequestKey(integrationId)
    await this.cache.increment(key, 1)
  }

  /** Releases one request slot for the integration. */
  public async decrementConcurrentRequest(integrationId: string) {
    const key = this.getRequestKey(integrationId)
    await this.cache.decrement(key, 1)
  }

  /**
   * Waits for a free slot, runs `func`, and always releases the slot — the
   * `finally` covers the case where `func` throws.
   *
   * NOTE(review): the counter is stored without a TTL, so a process that dies
   * between increment and decrement leaks a slot permanently; consider passing
   * a ttlSeconds safety net to cache.increment.
   */
  public async processWithLimit<T>(integrationId: string, func: () => Promise<T>): Promise<T> {
    await this.checkConcurrentRequestLimit(integrationId)
    await this.incrementConcurrentRequest(integrationId)

    try {
      return await func()
    } finally {
      await this.decrementConcurrentRequest(integrationId)
    }
  }

  /** Cache key holding this integration's in-flight request counter. */
  private getRequestKey(integrationId: string) {
    return `${this.requestKey}:${integrationId}`
  }
}
8 changes: 8 additions & 0 deletions services/libs/types/src/caching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,17 @@ export interface ICache {
set(key: string, value: string, ttlSeconds: number): Promise<void>
delete(key: string): Promise<number>
increment(key: string, incrementBy?: number, ttlSeconds?: number): Promise<number>
decrement(key: string, decrementBy?: number, ttlSeconds?: number): Promise<number>
}

// Fixed-window rate limiting: callers check before issuing a request and
// record the request afterwards.
export interface IRateLimiter {
  // Rejects when the current window's budget is exhausted — presumably with
  // RateLimitError; confirm against the RateLimiter implementation.
  checkRateLimit(endpoint: string): Promise<void>
  // Records one consumed request in the current window.
  incrementRateLimit(): Promise<void>
}

// Caps the number of simultaneously in-flight requests per integration
// (implemented by ConcurrentRequestLimiter in @crowd/redis).
export interface IConcurrentRequestLimiter {
  // Resolves once the integration is below its concurrency limit.
  checkConcurrentRequestLimit(integrationId: string): Promise<void>
  // Reserves one concurrent-request slot for the integration.
  incrementConcurrentRequest(integrationId: string): Promise<void>
  // Releases one concurrent-request slot for the integration.
  decrementConcurrentRequest(integrationId: string): Promise<void>
  // Convenience wrapper: wait for a slot, run `func`, always release the slot.
  processWithLimit<T>(integrationId: string, func: () => Promise<T>): Promise<T>
}