Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
29 commits
Select commit. Hold shift + click to select a range.
f3866c3
increase max concurrency in data-worker
garrrikkotua Nov 6, 2023
b5d4613
temp remove locks in data-sink worker
garrrikkotua Nov 6, 2023
94a89e8
disable transactions in data sink worker
garrrikkotua Nov 6, 2023
a14b868
prevent organization update if previous update happened less than 1 minute ago
garrrikkotua Nov 6, 2023
06d0578
increase max concurrent processing for data-sink from 3 to 4
garrrikkotua Nov 7, 2023
2917c85
increase max con processing to 7
garrrikkotua Nov 7, 2023
ef45a5f
decrease to 5
garrrikkotua Nov 7, 2023
5f17f37
back to 4
garrrikkotua Nov 7, 2023
9caff93
process old jobs async
garrrikkotua Nov 7, 2023
1020201
add enum for tenants plans
garrrikkotua Nov 7, 2023
cf361c9
adjust settings
garrrikkotua Nov 7, 2023
5f97865
change old results logic
garrrikkotua Nov 7, 2023
9010862
more concurrent version of processOldResults
garrrikkotua Nov 7, 2023
69fd30d
fix logic in processOldResults
garrrikkotua Nov 7, 2023
ff48d27
small fixes
garrrikkotua Nov 7, 2023
db687bd
Merge branch 'main' into improve/optimize-data-workers
garrrikkotua Nov 7, 2023
7a7e530
remove lock and load more results
garrrikkotua Nov 7, 2023
f96c87e
bring lock back
garrrikkotua Nov 7, 2023
c309104
increase results to load
garrrikkotua Nov 7, 2023
97b16ff
some optimizations
garrrikkotua Nov 7, 2023
c72dd38
bring messages back
garrrikkotua Nov 7, 2023
c856e15
disable transactions for the process results job
garrrikkotua Nov 7, 2023
6467638
use for update skip locked instead of redis lock
Nov 8, 2023
5f6dc52
decrease results to load and check more often
garrrikkotua Nov 8, 2023
2c0ba3c
fix and simplify results query
garrrikkotua Nov 8, 2023
efd6e63
increase checks frequency
garrrikkotua Nov 8, 2023
3f7f121
remove locks completely
garrrikkotua Nov 8, 2023
ab38390
trying to speed up
garrrikkotua Nov 8, 2023
ff64142
fix issue when github org name = @
garrrikkotua Nov 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 28 additions & 12 deletions services/apps/data_sink_worker/src/jobs/processOldResults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { RedisClient, processWithLock } from '@crowd/redis'
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/sqs'
import { Client as TemporalClient } from '@crowd/temporal'

const MAX_CONCURRENT_PROMISES = 3

export const processOldResultsJob = async (
dbConn: DbConnection,
redis: RedisClient,
Expand All @@ -30,13 +32,13 @@ export const processOldResultsJob = async (

const loadNextBatch = async (): Promise<string[]> => {
return await processWithLock(redis, 'process-old-results', 5 * 60, 3 * 60, async () => {
const resultIds = await repo.getOldResultsToProcess(5)
const resultIds = await repo.getOldResultsToProcess(MAX_CONCURRENT_PROMISES)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would load like 10 at once and process them MAX_CONCURRENT_PROMISES at a time.

await repo.touchUpdatedAt(resultIds)
return resultIds
})
}

// load 5 oldest results and try process them
// load 3 oldest results and try process them
let resultsToProcess = await loadNextBatch()

let successCount = 0
Expand All @@ -45,20 +47,34 @@ export const processOldResultsJob = async (
while (resultsToProcess.length > 0) {
log.info(`Detected ${resultsToProcess.length} old results rows to process!`)

const promises = []
for (const resultId of resultsToProcess) {
try {
const result = await service.processResult(resultId)
if (result) {
successCount++
} else {
errorCount++
}
} catch (err) {
log.error(err, 'Failed to process result!')
errorCount++
promises.push(
service
.processResult(resultId)
.then((result) => {
if (result) {
successCount++
} else {
errorCount++
}
})
.catch((err) => {
log.error(err, 'Failed to process result!')
errorCount++
}),
)

if (promises.length >= MAX_CONCURRENT_PROMISES) {
await Promise.all(promises)
promises.length = 0
}
}

if (promises.length > 0) {
await Promise.all(promises)
}

log.info(`Processed ${successCount} old results successfully and ${errorCount} with errors.`)

resultsToProcess = await loadNextBatch()
Expand Down
2 changes: 1 addition & 1 deletion services/apps/data_sink_worker/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { Client as TemporalClient, getTemporalClient } from '@crowd/temporal'
const tracer = getServiceTracer()
const log = getServiceLogger()

const MAX_CONCURRENT_PROCESSING = 3
const MAX_CONCURRENT_PROCESSING = 4
const PROCESSING_INTERVAL_MINUTES = 5

setImmediate(async () => {
Expand Down
2 changes: 1 addition & 1 deletion services/apps/data_sink_worker/src/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
this.log.trace({ messageType: message.type }, 'Processing message!')

const service = new DataSinkService(
new DbStore(this.log, this.dbConn),
new DbStore(this.log, this.dbConn, undefined, false),
this.nodejsWorkerEmitter,
this.searchSyncWorkerEmitter,
this.redisClient,
Expand Down
16 changes: 9 additions & 7 deletions services/apps/data_sink_worker/src/repo/dataSink.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ export default class DataSinkRepository extends RepositoryBase<DataSinkRepositor
try {
const results = await this.db().any(
`
select id
from integration.results
where state in ($(pendingState), $(processingState))
and "updatedAt" < now() - interval '1 hour'
order by case when "webhookId" is not null then 0 else 1 end,
"webhookId" asc,
"updatedAt" desc
select r.id
from integration.results r
inner join tenants t on t.id = r."tenantId"
where r.state in ($(pendingState), $(processingState))
and r."updatedAt" < now() - interval '1 hour'
order by case when t."plan" in ('Scale', 'Growth') then 0 else 1 end,
Comment thread
themarolt marked this conversation as resolved.
Outdated
case when r."webhookId" is not null then 0 else 1 end,
r."webhookId" asc,
r."updatedAt" desc
limit ${limit};
`,
{
Expand Down
5 changes: 3 additions & 2 deletions services/apps/data_sink_worker/src/repo/organization.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
},
})
const updatedAt = new Date()
const oneMinuteAgo = new Date(updatedAt.getTime() - 60 * 1000)
const prepared = RepositoryBase.prepare(
{
...data,
Expand All @@ -472,9 +473,9 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
)

const query = this.dbInstance.helpers.update(prepared, dynamicColumnSet)
const condition = this.format('where id = $(id) and "updatedAt" < $(updatedAt)', {
const condition = this.format('where id = $(id) and "updatedAt" <= $(oneMinuteAgo)', {
id,
updatedAt,
oneMinuteAgo,
})

await this.db().result(`${query} ${condition}`)
Expand Down
20 changes: 19 additions & 1 deletion services/apps/data_sink_worker/src/service/activity.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,32 @@ import IntegrationRepository from '../repo/integration.repo'
import GithubReposRepository from '../repo/githubRepos.repo'
import MemberAffiliationService from './memberAffiliation.service'
import { RedisClient } from '@crowd/redis'
import { acquireLock, releaseLock } from '@crowd/redis'
// import { acquireLock, releaseLock } from '@crowd/redis'
import { Unleash, isFeatureEnabled } from '@crowd/feature-flags'
import { Client as TemporalClient, WorkflowIdReusePolicy } from '@crowd/temporal'
import { TEMPORAL_CONFIG } from '../conf'

const MEMBER_LOCK_EXPIRE_AFTER = 10 * 60 // 10 minutes
const MEMBER_LOCK_TIMEOUT_AFTER = 5 * 60 // 5 minutes

const acquireLock = async (
redisClient: RedisClient,
lockKey: string,
lockReason: string,
lockExpireAfter: number,
lockTimeoutAfter: number,
): Promise<void> => {
// empty
}

const releaseLock = async (
redisClient: RedisClient,
lockKey: string,
lockReason: string,
): Promise<void> => {
// empty
}

export default class ActivityService extends LoggerBase {
private readonly conversationService: ConversationService

Expand Down
2 changes: 1 addition & 1 deletion services/apps/integration_data_worker/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { processOldDataJob } from './jobs/processOldData'
const tracer = getServiceTracer()
const log = getServiceLogger()

const MAX_CONCURRENT_PROCESSING = 2
const MAX_CONCURRENT_PROCESSING = 3
const PROCESSING_INTERVAL_MINUTES = 5

setImmediate(async () => {
Expand Down
5 changes: 5 additions & 0 deletions services/libs/database/src/dbStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export class DbStore extends LoggerBase {
parentLog?: Logger,
private readonly dbConnection?: DbConnection,
private readonly dbTransaction?: DbTransaction,
private readonly withTransactions: boolean = true,
) {
super(parentLog, { transactional: dbTransaction !== undefined })

Expand Down Expand Up @@ -51,6 +52,10 @@ export class DbStore extends LoggerBase {
public transactionally<T>(inTransaction: (store: DbStore) => Promise<T>): Promise<T> {
this.checkValid()

if (!this.withTransactions) {
return inTransaction(this)
}

if (this.isTransaction()) {
this.log.debug('Using an existing transaction!')
return inTransaction(this)
Expand Down