Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { AxiosError, AxiosRequestConfig } from 'axios'
import { RateLimitError } from '@crowd/types'
import { IProcessStreamContext } from '../../../types'

export const handleRedditError = (
err: AxiosError,
config: AxiosRequestConfig,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
input: any,
ctx: IProcessStreamContext,
) => {
const logger = ctx.log

let url = config.url
if (config.params) {
const queryParams: string[] = []
for (const [key, value] of Object.entries(config.params)) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
queryParams.push(`${key}=${encodeURIComponent(value as any)}`)
}

url = `${config.url}?${queryParams.join('&')}`
}

if (err && err.response && err.response.status === 429) {
logger.warn('Reddit API rate limit exceeded')
let rateLimitResetSeconds = 60

if (err.response.headers['x-ratelimit-reset']) {
rateLimitResetSeconds = parseInt(err.response.headers['x-ratelimit-reset'], 10)
}

return new RateLimitError(rateLimitResetSeconds, url, err)
}

logger.error(err, { input }, `Error while calling Reddit API URL: ${url}`)
return err
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { IProcessStreamContext } from '../../../types'
import { PlatformType } from '@crowd/types'
import { RedditGetCommentsInput, RedditCommentsResponse } from '../types'
import { timeout } from '@crowd/common'
import { getRateLimiter } from './handleRateLimit'
import { handleRedditError } from './errorHandler'

/**
* Get the comment tree of a post.
Expand All @@ -16,22 +16,18 @@ async function getComments(
input: RedditGetCommentsInput,
ctx: IProcessStreamContext,
): Promise<RedditCommentsResponse> {
let config: AxiosRequestConfig
try {
const rateLimiter = getRateLimiter(ctx)

ctx.log.info({ message: 'Fetching comments from a post in a sub-reddit', input })

// Wait for 1.5s for rate limits.
// eslint-disable-next-line no-promise-executor-return
await timeout(1500)

// Check if we can make a request - if not, it will throw a RateLimitError
await rateLimiter.checkRateLimit('getComments')

// Gett an access token from Nango
const accessToken = await getNangoToken(input.nangoId, PlatformType.REDDIT, ctx)

const config: AxiosRequestConfig = {
config = {
method: 'get',
url: `http://oauth.reddit.com/r/${input.subreddit}/comments/${input.postId}.json`,
params: {
Expand All @@ -42,14 +38,12 @@ async function getComments(
},
}

// we are going to make a request, so increment the rate limit
await rateLimiter.incrementRateLimit()

const response: RedditCommentsResponse = (await axios(config)).data
return response
} catch (err) {
ctx.log.error({ err, input }, 'Error while getting comments in subreddit')
throw err
const newErr = handleRedditError(err, config, input, ctx)
throw newErr
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { IProcessStreamContext } from '../../../types'
import { PlatformType } from '@crowd/types'
import { RedditMoreCommentsInput, RedditMoreCommentsResponse } from '../types'
import { timeout } from '@crowd/common'
import { getRateLimiter } from './handleRateLimit'
import { handleRedditError } from './errorHandler'

/**
* Expand a list of comment IDs into a comment tree.
Expand All @@ -17,22 +17,18 @@ async function getMoreComments(
input: RedditMoreCommentsInput,
ctx: IProcessStreamContext,
): Promise<RedditMoreCommentsResponse> {
let config: AxiosRequestConfig
try {
const rateLimiter = getRateLimiter(ctx)

ctx.log.info({ message: 'Fetching more comments from a sub-reddit', input })

// Wait for 1.5s for rate limits.
// eslint-disable-next-line no-promise-executor-return
await timeout(1500)

// Check if we can make a request - if not, it will throw a RateLimitError
await rateLimiter.checkRateLimit('getMoreComments')

// Gett an access token from Nango
const accessToken = await getNangoToken(input.nangoId, PlatformType.REDDIT, ctx)

const config: AxiosRequestConfig = {
config = {
method: 'get',
url: `http://oauth.reddit.com/api/morechildren?api_type=json`,
params: {
Expand All @@ -45,14 +41,12 @@ async function getMoreComments(
},
}

// we are going to make a request, so increment the rate limit
await rateLimiter.incrementRateLimit()

const response: RedditMoreCommentsResponse = (await axios(config)).data
return response
} catch (err) {
ctx.log.error({ err, input }, 'Error while getting more comments in subreddit')
throw err
const newErr = handleRedditError(err, config, input, ctx)
throw newErr
}
}

Expand Down
16 changes: 5 additions & 11 deletions services/libs/integrations/src/integrations/reddit/api/getPosts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { IProcessStreamContext } from '../../../types'
import { PlatformType } from '@crowd/types'
import { RedditGetPostsInput, RedditPostsResponse, REDDIT_MAX_RETROSPECT_IN_HOURS } from '../types'
import { timeout } from '@crowd/common'
import { getRateLimiter } from './handleRateLimit'
import { handleRedditError } from './errorHandler'

/**
* Get paginated posts from a subreddit
Expand All @@ -16,22 +16,18 @@ async function getPosts(
input: RedditGetPostsInput,
ctx: IProcessStreamContext,
): Promise<RedditPostsResponse> {
let config: AxiosRequestConfig
try {
const rateLimiter = getRateLimiter(ctx)

ctx.log.info({ message: 'Fetching posts from a sub-reddit', input })

// Wait for 1.5s for rate limits.
// eslint-disable-next-line no-promise-executor-return
await timeout(1500)

// Check if we can make a request - if not, it will throw a RateLimitError
await rateLimiter.checkRateLimit('getPosts')

// Gett an access token from Nango
const accessToken = await getNangoToken(input.nangoId, PlatformType.REDDIT, ctx)

const config: AxiosRequestConfig = {
config = {
method: 'get',
url: `http://oauth.reddit.com/r/${input.subreddit}/new.json`,
params: {
Expand All @@ -47,9 +43,6 @@ async function getPosts(
config.params.after = input.after
}

// we are going to make a request, so increment the rate limit
await rateLimiter.incrementRateLimit()

const response: RedditPostsResponse = (await axios(config)).data

// If ctx.onboarding is false, check the last post's date
Expand All @@ -69,7 +62,8 @@ async function getPosts(
return response
} catch (err) {
ctx.log.error({ err, input }, 'Error while getting posts in subreddit')
throw err
const newErr = handleRedditError(err, config, input, ctx)
throw newErr
}
}

Expand Down