diff --git a/backend/src/database/migrations/U1692786381__unique-activity-index.sql b/backend/src/database/migrations/U1692786381__unique-activity-index.sql new file mode 100644 index 0000000000..1c5fa7cd74 --- /dev/null +++ b/backend/src/database/migrations/U1692786381__unique-activity-index.sql @@ -0,0 +1 @@ +DROP INDEX CONCURRENTLY IF EXISTS activities_tenant_segment_source_id_idx; diff --git a/backend/src/database/migrations/U1692800745__index-subprojects-by-tenant.sql b/backend/src/database/migrations/U1692800745__index-subprojects-by-tenant.sql new file mode 100644 index 0000000000..7fca8ad6b2 --- /dev/null +++ b/backend/src/database/migrations/U1692800745__index-subprojects-by-tenant.sql @@ -0,0 +1 @@ +DROP INDEX IF EXISTS segments_tenant_subprojects; diff --git a/backend/src/database/migrations/U1692803447__conversation-slug-index.sql b/backend/src/database/migrations/U1692803447__conversation-slug-index.sql new file mode 100644 index 0000000000..cb087c4ca8 --- /dev/null +++ b/backend/src/database/migrations/U1692803447__conversation-slug-index.sql @@ -0,0 +1 @@ +DROP INDEX CONCURRENTLY IF EXISTS conversations_tenant_segment_slug; diff --git a/backend/src/database/migrations/V1692786381__unique-activity-index.sql b/backend/src/database/migrations/V1692786381__unique-activity-index.sql new file mode 100644 index 0000000000..234df4ead3 --- /dev/null +++ b/backend/src/database/migrations/V1692786381__unique-activity-index.sql @@ -0,0 +1 @@ +CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS activities_tenant_segment_source_id_idx ON activities ("tenantId", "segmentId", "sourceId") WHERE "deletedAt" IS NULL; diff --git a/backend/src/database/migrations/V1692800745__index-subprojects-by-tenant.sql b/backend/src/database/migrations/V1692800745__index-subprojects-by-tenant.sql new file mode 100644 index 0000000000..4a7fedc9cf --- /dev/null +++ b/backend/src/database/migrations/V1692800745__index-subprojects-by-tenant.sql @@ -0,0 +1 @@ +CREATE INDEX CONCURRENTLY IF NOT EXISTS segments_tenant_subprojects ON segments ("tenantId") WHERE "grandparentSlug" IS NOT NULL AND "parentSlug" IS NOT NULL; diff --git a/backend/src/database/migrations/V1692803447__conversation-slug-index.sql b/backend/src/database/migrations/V1692803447__conversation-slug-index.sql new file mode 100644 index 0000000000..01b7d13031 --- /dev/null +++ b/backend/src/database/migrations/V1692803447__conversation-slug-index.sql @@ -0,0 +1 @@ +CREATE INDEX CONCURRENTLY IF NOT EXISTS conversations_tenant_segment_slug ON conversations ("tenantId", "segmentId", MD5(slug)); diff --git a/backend/src/database/repositories/incomingWebhookRepository.ts b/backend/src/database/repositories/incomingWebhookRepository.ts index ee9ee1e777..7a2fee8161 100644 --- a/backend/src/database/repositories/incomingWebhookRepository.ts +++ b/backend/src/database/repositories/incomingWebhookRepository.ts @@ -237,6 +237,7 @@ export default class IncomingWebhookRepository extends RepositoryBase< from "incomingWebhooks" where state = :pending and "createdAt" < now() - interval '1 hour' + and type not in ('GITHUB', 'DISCORD') limit ${perPage} offset ${(page - 1) * perPage}; ` diff --git a/backend/src/serverless/integrations/services/webhookProcessor.ts b/backend/src/serverless/integrations/services/webhookProcessor.ts index 4b99858fdf..14566dc601 100644 --- a/backend/src/serverless/integrations/services/webhookProcessor.ts +++ b/backend/src/serverless/integrations/services/webhookProcessor.ts @@ -44,7 +44,7 @@ export class WebhookProcessor extends LoggerBase { integrationId: webhook.integrationId, }) - logger.info('Webhook found!') + logger.debug('Webhook found!') if (!(force === true) && webhook.state !== WebhookState.PENDING) { logger.error({ state: webhook.state }, 'Webhook is not in pending state!') @@ -55,6 +55,9 @@ export class WebhookProcessor extends LoggerBase { userContext.log = logger const integration = await IntegrationRepository.findById(webhook.integrationId, userContext) + if (integration.platform === 'github' || integration.platform === 'discord') { + return + } const segment = await new SegmentRepository(userContext).findById(integration.segmentId) userContext.currentSegments = [segment] diff --git a/backend/src/serverless/utils/nodeWorkerSQS.ts b/backend/src/serverless/utils/nodeWorkerSQS.ts index 31bacc5d31..6144a4aed1 100644 --- a/backend/src/serverless/utils/nodeWorkerSQS.ts +++ b/backend/src/serverless/utils/nodeWorkerSQS.ts @@ -71,7 +71,7 @@ export const sendNodeWorkerMessage = async ( DelaySeconds: delay, } - log.info( + log.debug( { messageType: body.type, body, diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index d589a0865f..b27af043f1 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -23,7 +23,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver { parentLog: Logger, maxConcurrentProcessing: number, ) { - super(client, DATA_SINK_WORKER_QUEUE_SETTINGS, maxConcurrentProcessing, parentLog, true) + super(client, DATA_SINK_WORKER_QUEUE_SETTINGS, maxConcurrentProcessing, parentLog) } override async processMessage(message: IQueueMessage): Promise { diff --git a/services/apps/data_sink_worker/src/repo/activity.repo.ts b/services/apps/data_sink_worker/src/repo/activity.repo.ts index c62c3a4cd4..86849f5ca7 100644 --- a/services/apps/data_sink_worker/src/repo/activity.repo.ts +++ b/services/apps/data_sink_worker/src/repo/activity.repo.ts @@ -40,7 +40,11 @@ export default class ActivityRepository extends RepositoryBase { } public async update(id: string, tenantId: string, data: IDbMemberUpdateData): Promise { + const keys = Object.keys(data) + keys.push('updatedAt') + // construct custom column set + const dynamicColumnSet = new this.dbInstance.helpers.ColumnSet(keys, { + table: { + table: 'members', + }, + }) + const prepared = RepositoryBase.prepare( { ...data, - weakIdentities: JSON.stringify(data.weakIdentities || []), updatedAt: new Date(), }, - this.updateMemberColumnSet, + dynamicColumnSet, ) - const query = this.dbInstance.helpers.update(prepared, this.updateMemberColumnSet) + const query = this.dbInstance.helpers.update(prepared, dynamicColumnSet) const condition = this.format('where id = $(id) and "tenantId" = $(tenantId)', { id, diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index ea56034eef..8894de2be7 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -77,11 +77,8 @@ export default class DataSinkService extends LoggerBase { return } - this.log.debug('Marking result as in progress.') - await this.repo.markResultInProgress(resultId) - if (resultInfo.runId) { - await this.repo.touchRun(resultInfo.runId) - } + // this.log.debug('Marking result as in progress.') + // await this.repo.markResultInProgress(resultId) try { const data = resultInfo.data @@ -170,10 +167,6 @@ export default class DataSinkService extends LoggerBase { log: this.log, frameworkVersion: 'new', }) - } finally { - if (resultInfo.runId) { - await this.repo.touchRun(resultInfo.runId) - } } } } diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 922fadb976..e8f9fd555b 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -154,15 +154,19 @@ export default class MemberService extends LoggerBase { if (!isObjectEmpty(toUpdate)) { this.log.debug({ memberId: id }, 'Updating a member!') - await txRepo.update(id, tenantId, { - emails: toUpdate.emails || original.emails, - joinedAt: toUpdate.joinedAt || original.joinedAt, - attributes: toUpdate.attributes || original.attributes, - weakIdentities: toUpdate.weakIdentities || original.weakIdentities, - // leave this one empty if nothing changed - we are only adding up new identities not removing them - identities: toUpdate.identities, - displayName: toUpdate.displayName || original.displayName, - }) + + const dateToUpdate = Object.entries(toUpdate).reduce((acc, [key, value]) => { + if (key === 'identities') { + return acc + } + + if (value) { + acc[key] = value + } + return acc + }, {} as IDbMemberUpdateData) + + await txRepo.update(id, tenantId, dateToUpdate) await txRepo.addToSegment(id, tenantId, segmentId) updated = true @@ -272,6 +276,12 @@ export default class MemberService extends LoggerBase { await this.store.transactionally(async (txStore) => { const txRepo = new MemberRepository(txStore, this.log) const txIntegrationRepo = new IntegrationRepository(txStore, this.log) + const txService = new MemberService( + txStore, + this.nodejsWorkerEmitter, + this.searchSyncWorkerEmitter, + this.log, + ) const dbIntegration = await txIntegrationRepo.findById(integrationId) const segmentId = dbIntegration.segmentId @@ -300,7 +310,7 @@ export default class MemberService extends LoggerBase { ) } - await this.update( + await txService.update( dbMember.id, tenantId, segmentId, @@ -374,9 +384,12 @@ export default class MemberService extends LoggerBase { const newDate = member.joinedAt const oldDate = new Date(dbMember.joinedAt) - if (newDate.getTime() !== oldDate.getTime()) { - // pick the oldest - joinedAt = newDate < oldDate ? newDate.toISOString() : oldDate.toISOString() + if (oldDate <= newDate) { + // we already have the oldest date in the db, so we don't need to update it + joinedAt = undefined + } else { + // we have a new date and it's older, so we need to update it + joinedAt = newDate.toISOString() } } diff --git a/services/apps/data_sink_worker/src/service/settings.repo.ts b/services/apps/data_sink_worker/src/service/settings.repo.ts index b05d17ae99..a1264e77fb 100644 --- a/services/apps/data_sink_worker/src/service/settings.repo.ts +++ b/services/apps/data_sink_worker/src/service/settings.repo.ts @@ -62,39 +62,64 @@ export default class SettingsRepository extends RepositoryBase { - const result = await this.db().result( - ` - update segments - set "activityChannels" = - case - -- If platform exists, and channel does not exist, add it - when "activityChannels" ? $(platform) - and not ($(channel) = any (select jsonb_array_elements_text("activityChannels" -> $(platform)))) then - jsonb_set( - "activityChannels", - array [$(platform)::text], - "activityChannels" -> $(platform) || jsonb_build_array($(channel)) - ) - -- If platform does not exist, create it - when not ("activityChannels" ? $(platform)) or "activityChannels" is null then - coalesce("activityChannels", '{}'::jsonb) || - jsonb_build_object($(platform), jsonb_build_array($(channel))) - -- Else, do nothing - else - "activityChannels" - end - where "tenantId" = $(tenantId) - and id = $(segmentId) - - `, + const existingData = await this.db().oneOrNone( + `select "activityChannels" from "segments" where "tenantId" = $(tenantId) and id = $(segmentId)`, { tenantId, segmentId, - platform, - channel, }, ) - this.checkUpdateRowCount(result.rowCount, 1) + if (existingData) { + const channels = existingData.activityChannels + + if (channels && channels[platform] && channels[platform].includes(channel)) { + return + } else { + await this.db().result( + ` + update segments + set "activityChannels" = + case + -- If platform exists, and channel does not exist, add it + when "activityChannels" ? $(platform) + and not ($(channel) = any (select jsonb_array_elements_text("activityChannels" -> $(platform)))) then + jsonb_set( + "activityChannels", + array [$(platform)::text], + "activityChannels" -> $(platform) || jsonb_build_array($(channel)) + ) + -- If platform does not exist, create it + when not ("activityChannels" ? $(platform)) or "activityChannels" is null then + coalesce("activityChannels", '{}'::jsonb) || + jsonb_build_object($(platform), jsonb_build_array($(channel))) + -- Else, do nothing + else + "activityChannels" + end + where "tenantId" = $(tenantId) + and id = $(segmentId) + and case + -- If platform exists, and channel does not exist, add it + when "activityChannels" ? $(platform) + and not ($(channel) = any (select jsonb_array_elements_text("activityChannels" -> $(platform)))) then + 1 + -- If platform does not exist, create it + when not ("activityChannels" ? $(platform)) or "activityChannels" is null then + 1 + -- Else, do nothing + else + 0 + end = 1 + `, + { + tenantId, + segmentId, + platform, + channel, + }, + ) + } + } } } diff --git a/services/apps/integration_data_worker/src/repo/integrationData.repo.ts b/services/apps/integration_data_worker/src/repo/integrationData.repo.ts index 8ec81a27e9..e2045ea743 100644 --- a/services/apps/integration_data_worker/src/repo/integrationData.repo.ts +++ b/services/apps/integration_data_worker/src/repo/integrationData.repo.ts @@ -198,16 +198,11 @@ export default class IntegrationDataRepository extends RepositoryBase { + public async deleteData(dataId: string): Promise { const result = await this.db().result( - `update integration."apiData" - set state = $(state), - "processedAt" = now(), - "updatedAt" = now() - where id = $(dataId)`, + `delete from integration."apiData" where id = $(dataId)`, { dataId, - state: IntegrationStreamDataState.PROCESSED, }, ) diff --git a/services/apps/integration_data_worker/src/service/integrationDataService.ts b/services/apps/integration_data_worker/src/service/integrationDataService.ts index 4e13c22528..854cdd130c 100644 --- a/services/apps/integration_data_worker/src/service/integrationDataService.ts +++ b/services/apps/integration_data_worker/src/service/integrationDataService.ts @@ -171,17 +171,19 @@ export default class IntegrationDataService extends LoggerBase { : undefined, } - this.log.debug('Marking data as in progress!') - await this.repo.markDataInProgress(dataId) - if (dataInfo.runId) { - await this.repo.touchRun(dataInfo.runId) - } + // this.log.debug('Marking data as in progress!') + // await this.repo.markDataInProgress(dataId) + + // TODO we might need that later to check for stuck runs + // if (dataInfo.runId) { + // await this.repo.touchRun(dataInfo.runId) + // } this.log.debug('Processing data!') try { await integrationService.processData(context) this.log.debug('Finished processing data!') - await this.repo.markDataProcessed(dataId) + await this.repo.deleteData(dataId) } catch (err) { this.log.error(err, 'Error while processing stream!') await this.triggerDataError( @@ -232,11 +234,14 @@ export default class IntegrationDataService extends LoggerBase { frameworkVersion: 'new', }) } - } finally { - if (dataInfo.runId) { - await this.repo.touchRun(dataInfo.runId) - } } + + // TODO we might need that later to check for stuck runs + // finally { + // if (dataInfo.runId) { + // await this.repo.touchRun(dataInfo.runId) + // } + // } } private async publishCustom( @@ -253,7 +258,12 @@ export default class IntegrationDataService extends LoggerBase { type, data: entity, }) - await this.dataSinkWorkerEmitter.triggerResultProcessing(tenantId, platform, resultId) + await this.dataSinkWorkerEmitter.triggerResultProcessing( + tenantId, + platform, + resultId, + resultId, + ) } catch (err) { await this.triggerDataError( dataId, @@ -278,7 +288,12 @@ export default class IntegrationDataService extends LoggerBase { type: IntegrationResultType.ACTIVITY, data: activity, }) - await this.dataSinkWorkerEmitter.triggerResultProcessing(tenantId, platform, resultId) + await this.dataSinkWorkerEmitter.triggerResultProcessing( + tenantId, + platform, + resultId, + activity.sourceId, + ) } catch (err) { await this.triggerDataError( dataId, diff --git a/services/apps/integration_run_worker/package.json b/services/apps/integration_run_worker/package.json index 5610e1f57d..2bd57af993 100644 --- a/services/apps/integration_run_worker/package.json +++ b/services/apps/integration_run_worker/package.json @@ -15,7 +15,8 @@ "format-check": "./node_modules/.bin/prettier --check .", "tsc-check": "./node_modules/.bin/tsc --noEmit", "script:onboard-integration": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/onboard-integration.ts", - "script:process-repo": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-repo.ts" + "script:process-repo": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-repo.ts", + "script:trigger-stream-processed": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/trigger-stream-processed.ts" }, "dependencies": { "@crowd/common": "file:../../libs/common", diff --git a/services/apps/integration_run_worker/src/bin/onboard-integration.ts b/services/apps/integration_run_worker/src/bin/onboard-integration.ts index bf1d9f8972..d7724d407b 100644 --- a/services/apps/integration_run_worker/src/bin/onboard-integration.ts +++ b/services/apps/integration_run_worker/src/bin/onboard-integration.ts @@ -47,6 +47,7 @@ setImmediate(async () => { integration.id, true, ) + process.exit(0) } else { log.error({ integrationId }, 'Integration not found!') process.exit(1) diff --git a/services/apps/integration_run_worker/src/bin/trigger-stream-processed.ts b/services/apps/integration_run_worker/src/bin/trigger-stream-processed.ts new file mode 100644 index 0000000000..4f0e34b16b --- /dev/null +++ b/services/apps/integration_run_worker/src/bin/trigger-stream-processed.ts @@ -0,0 +1,25 @@ +import { SQS_CONFIG } from '@/conf' +import { getServiceLogger } from '@crowd/logging' +import { IntegrationRunWorkerEmitter, getSqsClient } from '@crowd/sqs' +import { StreamProcessedQueueMessage } from '@crowd/types' + +const log = getServiceLogger() + +const processArguments = process.argv.slice(2) + +if (processArguments.length !== 1) { + log.error('Expected 1 argument: streamIds') + process.exit(1) +} + +const runIds = processArguments[0].split(',') + +setImmediate(async () => { + const sqsClient = getSqsClient(SQS_CONFIG()) + const emitter = new IntegrationRunWorkerEmitter(sqsClient, log) + await emitter.init() + + for (const runId of runIds) { + await emitter.sendMessage(runId, new StreamProcessedQueueMessage(runId)) + } +}) diff --git a/services/apps/integration_run_worker/src/service/integrationRunService.ts b/services/apps/integration_run_worker/src/service/integrationRunService.ts index 979d86c8d4..f1598b838c 100644 --- a/services/apps/integration_run_worker/src/service/integrationRunService.ts +++ b/services/apps/integration_run_worker/src/service/integrationRunService.ts @@ -454,7 +454,8 @@ export default class IntegrationRunService extends LoggerBase { this.log.debug('Marking run as in progress!') await this.repo.markRunInProgress(runId) - await this.repo.touchRun(runId) + // TODO we might need that later to check for stuck runs + // await this.repo.touchRun(runId) this.log.info('Generating streams!') try { @@ -469,9 +470,11 @@ export default class IntegrationRunService extends LoggerBase { undefined, err, ) - } finally { - await this.repo.touchRun(runId) } + // TODO we might need that later to check for stuck runs + // finally { + // await this.repo.touchRun(runId) + // } } private async updateIntegrationSettings(runId: string, settings: unknown): Promise { diff --git a/services/apps/integration_stream_worker/src/service/integrationStreamService.ts b/services/apps/integration_stream_worker/src/service/integrationStreamService.ts index 0bae955596..ad52fb34a4 100644 --- a/services/apps/integration_stream_worker/src/service/integrationStreamService.ts +++ b/services/apps/integration_stream_worker/src/service/integrationStreamService.ts @@ -488,7 +488,8 @@ export default class IntegrationStreamService extends LoggerBase { this.log.debug('Marking stream as in progress!') await this.repo.markStreamInProgress(streamId) - await this.repo.touchRun(streamInfo.runId) + // TODO we might need that later to check for stuck runs + // await this.repo.touchRun(streamInfo.runId) this.log.debug('Processing stream!') try { @@ -505,7 +506,8 @@ export default class IntegrationStreamService extends LoggerBase { err, ) } finally { - await this.repo.touchRun(streamInfo.runId) + // TODO we might need that later to check for stuck runs + // await this.repo.touchRun(streamInfo.runId) await this.runWorkerEmitter.streamProcessed( streamInfo.tenantId, streamInfo.integrationType, diff --git a/services/libs/conversations/src/repo/conversation.repo.ts b/services/libs/conversations/src/repo/conversation.repo.ts index fe43300d0d..859a9340b2 100644 --- a/services/libs/conversations/src/repo/conversation.repo.ts +++ b/services/libs/conversations/src/repo/conversation.repo.ts @@ -82,7 +82,7 @@ export class ConversationRepository extends RepositoryBase { const results = await this.db().any( - `select id from conversations where "tenantId" = $(tenantId) and slug = $(slug) and "segmentId" = $(segmentId)`, + `select id from conversations where "tenantId" = $(tenantId) and MD5(slug) = MD5($(slug)) and "segmentId" = $(segmentId)`, { tenantId, slug, diff --git a/services/libs/database/src/connection.ts b/services/libs/database/src/connection.ts index 79d9db7cfb..6ed4700867 100644 --- a/services/libs/database/src/connection.ts +++ b/services/libs/database/src/connection.ts @@ -59,6 +59,7 @@ export const getDbConnection = (config: IDatabaseConfig, maxPoolSize?: number): dbConnection = dbInstance({ ...config, max: maxPoolSize || 5, + query_timeout: 10000, }) return dbConnection diff --git a/services/libs/integrations/src/integrations/devto/index.ts b/services/libs/integrations/src/integrations/devto/index.ts index 776923087b..9febdf8ead 100644 --- a/services/libs/integrations/src/integrations/devto/index.ts +++ b/services/libs/integrations/src/integrations/devto/index.ts @@ -8,7 +8,7 @@ import { PlatformType } from '@crowd/types' const descriptor: IIntegrationDescriptor = { type: PlatformType.DEVTO, memberAttributes: DEVTO_MEMBER_ATTRIBUTES, - checkEvery: 20, + checkEvery: 30, generateStreams, processStream, diff --git a/services/libs/integrations/src/integrations/slack/index.ts b/services/libs/integrations/src/integrations/slack/index.ts index 7ed5820950..0772c657a3 100644 --- a/services/libs/integrations/src/integrations/slack/index.ts +++ b/services/libs/integrations/src/integrations/slack/index.ts @@ -8,7 +8,7 @@ import { PlatformType } from '@crowd/types' const descriptor: IIntegrationDescriptor = { type: PlatformType.SLACK, memberAttributes: SLACK_MEMBER_ATTRIBUTES, - checkEvery: 20, + checkEvery: 30, generateStreams, processStream, processData, diff --git a/services/libs/sqs/src/client.ts b/services/libs/sqs/src/client.ts index 69815b06fd..9c761c0536 100644 --- a/services/libs/sqs/src/client.ts +++ b/services/libs/sqs/src/client.ts @@ -11,7 +11,7 @@ import { SendMessageRequest, SendMessageResult, } from '@aws-sdk/client-sqs' -import { IS_DEV_ENV, IS_STAGING_ENV } from '@crowd/common' +import { IS_DEV_ENV, IS_STAGING_ENV, timeout } from '@crowd/common' import { getServiceChildLogger } from '@crowd/logging' import { ISqsClientConfig, SqsClient, SqsMessage } from './types' @@ -63,7 +63,11 @@ export const receiveMessage = async ( return [] } catch (err) { - if (err.message === 'We encountered an internal error. Please try again.') { + if ( + err.message === 'We encountered an internal error. Please try again.' || + err.message === 'Request is throttled.' || + err.message === 'Queue Throttled' + ) { return [] } @@ -74,20 +78,59 @@ export const receiveMessage = async ( export const deleteMessage = async ( client: SqsClient, params: DeleteMessageRequest, + retry = 0, ): Promise => { - await client.send(new DeleteMessageCommand(params)) + try { + await client.send(new DeleteMessageCommand(params)) + } catch (err) { + if ( + (err.message === 'Request is throttled.' || err.message === 'Queue Throttled') && + retry < 5 + ) { + await timeout(1000) + return await deleteMessage(client, params, retry + 1) + } + + throw err + } } export const sendMessage = async ( client: SqsClient, params: SendMessageRequest, + retry = 0, ): Promise => { - return client.send(new SendMessageCommand(params)) + try { + return client.send(new SendMessageCommand(params)) + } catch (err) { + if ( + (err.message === 'Request is throttled.' || err.message === 'Queue Throttled') && + retry < 5 + ) { + await timeout(1000) + return await sendMessage(client, params, retry + 1) + } + + throw err + } } export const sendMessagesBulk = async ( client: SqsClient, params: SendMessageBatchRequest, + retry = 0, ): Promise => { - return client.send(new SendMessageBatchCommand(params)) + try { + return client.send(new SendMessageBatchCommand(params)) + } catch (err) { + if ( + (err.message === 'Request is throttled.' || err.message === 'Queue Throttled') && + retry < 5 + ) { + await timeout(1000) + return await sendMessagesBulk(client, params, retry + 1) + } + + throw err + } } diff --git a/services/libs/sqs/src/instances/dataSinkWorker.ts b/services/libs/sqs/src/instances/dataSinkWorker.ts index 6c4722b67a..42105d77bc 100644 --- a/services/libs/sqs/src/instances/dataSinkWorker.ts +++ b/services/libs/sqs/src/instances/dataSinkWorker.ts @@ -9,7 +9,12 @@ export class DataSinkWorkerEmitter extends SqsQueueEmitter { super(client, DATA_SINK_WORKER_QUEUE_SETTINGS, parentLog) } - public async triggerResultProcessing(tenantId: string, platform: string, resultId: string) { - await this.sendMessage(resultId, new ProcessIntegrationResultQueueMessage(resultId), resultId) + public async triggerResultProcessing( + tenantId: string, + platform: string, + resultId: string, + sourceId: string, + ) { + await this.sendMessage(sourceId, new ProcessIntegrationResultQueueMessage(resultId), resultId) } } diff --git a/services/libs/sqs/src/instances/integrationRunWorker.ts b/services/libs/sqs/src/instances/integrationRunWorker.ts index eeea831a53..d66b572c42 100644 --- a/services/libs/sqs/src/instances/integrationRunWorker.ts +++ b/services/libs/sqs/src/instances/integrationRunWorker.ts @@ -47,6 +47,6 @@ export class IntegrationRunWorkerEmitter extends SqsQueueEmitter { } public async streamProcessed(tenantId: string, platform: string, runId: string): Promise { - await this.sendMessage(runId, new StreamProcessedQueueMessage(runId)) + await this.sendMessage(runId, new StreamProcessedQueueMessage(runId), runId) } } diff --git a/services/libs/sqs/src/queue.ts b/services/libs/sqs/src/queue.ts index 04b3358e50..30b4683e4c 100644 --- a/services/libs/sqs/src/queue.ts +++ b/services/libs/sqs/src/queue.ts @@ -172,16 +172,24 @@ export abstract class SqsQueueReceiver extends SqsQueueBase { protected abstract processMessage(data: IQueueMessage): Promise private async receiveMessage(): Promise { - const params: ReceiveMessageRequest = { - QueueUrl: this.getQueueUrl(), - } + try { + const params: ReceiveMessageRequest = { + QueueUrl: this.getQueueUrl(), + } - return receiveMessage( - this.sqsClient, - params, - this.visibilityTimeoutSeconds, - this.receiveMessageCount, - ) + return receiveMessage( + this.sqsClient, + params, + this.visibilityTimeoutSeconds, + this.receiveMessageCount, + ) + } catch (err) { + if (err.message === 'Request is throttled.') { + return [] + } + + throw err + } } private async deleteMessage(receiptHandle: string): Promise {