Skip to content

Commit e08fb02

Browse files
perf(trigger): cap concurrency on background DB tasks (#5231)
* perf(trigger): cap concurrency on background DB tasks * test(trigger): update schedule concurrency assertion to 30
1 parent d9da544 commit e08fb02

7 files changed

Lines changed: 39 additions & 3 deletions

File tree

apps/sim/background/async-execution-correlation.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ describe('async execution correlation fallbacks', () => {
5656
expect(scheduleExecutionTaskOptions).toMatchObject({
5757
queue: {
5858
name: 'schedule-execution',
59-
concurrencyLimit: 50,
59+
concurrencyLimit: 30,
6060
},
6161
})
6262
})
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import { env, envNumber } from '@/lib/core/config/env'
2+
3+
/** Per-task Trigger.dev concurrency caps. Bound heavy DB tasks so unbounded fan-out can't saturate the pool. */
4+
5+
export const WORKFLOW_EXECUTION_CONCURRENCY_LIMIT = envNumber(
6+
env.WORKFLOW_EXECUTION_CONCURRENCY_LIMIT,
7+
75,
8+
{ min: 1, integer: true }
9+
)
10+
11+
export const WEBHOOK_EXECUTION_CONCURRENCY_LIMIT = envNumber(
12+
env.WEBHOOK_EXECUTION_CONCURRENCY_LIMIT,
13+
75,
14+
{ min: 1, integer: true }
15+
)
16+
17+
export const RESUME_EXECUTION_CONCURRENCY_LIMIT = envNumber(
18+
env.RESUME_EXECUTION_CONCURRENCY_LIMIT,
19+
50,
20+
{ min: 1, integer: true }
21+
)

apps/sim/background/resume-execution.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { withCascadeLock } from '@/lib/table/cascade-lock'
66
import { isExecCancelled } from '@/lib/table/deps'
77
import type { RowData, RowExecutionMetadata } from '@/lib/table/types'
88
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
9+
import { RESUME_EXECUTION_CONCURRENCY_LIMIT } from '@/background/concurrency-limits'
910

1011
const logger = createLogger('TriggerResumeExecution')
1112

@@ -351,5 +352,8 @@ export const resumeExecutionTask = task({
351352
retry: {
352353
maxAttempts: 1,
353354
},
355+
queue: {
356+
concurrencyLimit: RESUME_EXECUTION_CONCURRENCY_LIMIT,
357+
},
354358
run: executeResumeJob,
355359
})

apps/sim/background/webhook-execution.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import {
2626
import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence'
2727
import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
2828
import { resolveOAuthAccountId } from '@/app/api/auth/oauth/utils'
29+
import { WEBHOOK_EXECUTION_CONCURRENCY_LIMIT } from '@/background/concurrency-limits'
2930
import { getBlock } from '@/blocks'
3031
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
3132
import type { ExecutionMetadata } from '@/executor/execution/types'
@@ -683,5 +684,8 @@ export const webhookExecution = task({
683684
retry: {
684685
maxAttempts: 1,
685686
},
687+
queue: {
688+
concurrencyLimit: WEBHOOK_EXECUTION_CONCURRENCY_LIMIT,
689+
},
686690
run: async (payload: WebhookExecutionPayload) => executeWebhookJob(payload),
687691
})

apps/sim/background/workflow-execution.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
wasExecutionFinalizedByCore,
1414
} from '@/lib/workflows/executor/execution-core'
1515
import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence'
16+
import { WORKFLOW_EXECUTION_CONCURRENCY_LIMIT } from '@/background/concurrency-limits'
1617
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
1718
import type { ExecutionMetadata } from '@/executor/execution/types'
1819
import { hasExecutionResult } from '@/executor/utils/errors'
@@ -206,5 +207,8 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
206207
export const workflowExecutionTask = task({
207208
id: 'workflow-execution',
208209
machine: 'medium-1x',
210+
queue: {
211+
concurrencyLimit: WORKFLOW_EXECUTION_CONCURRENCY_LIMIT,
212+
},
209213
run: executeWorkflowJob,
210214
})

apps/sim/lib/core/config/env.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,10 @@ export const env = createEnv({
206206
TRIGGER_DEV_ENABLED: z.boolean().optional(), // Toggle to enable/disable Trigger.dev for async jobs
207207
CRON_SECRET: z.string().optional(), // Secret for authenticating cron job requests
208208
JOB_RETENTION_DAYS: z.string().optional().default('1'), // Days to retain job logs/data
209-
SCHEDULE_EXECUTION_CONCURRENCY_LIMIT: z.string().optional().default('50'),
209+
SCHEDULE_EXECUTION_CONCURRENCY_LIMIT: z.string().optional().default('30'),
210+
WORKFLOW_EXECUTION_CONCURRENCY_LIMIT: z.string().optional().default('75'),
211+
WEBHOOK_EXECUTION_CONCURRENCY_LIMIT: z.string().optional().default('75'),
212+
RESUME_EXECUTION_CONCURRENCY_LIMIT: z.string().optional().default('50'),
210213
SCHEDULE_ENQUEUE_BUDGET_MULTIPLIER: z.string().optional().default('2'),
211214
SCHEDULE_JITTER_MAX_MS: z.string().optional().default('30000'),
212215
SCHEDULE_INFRA_RETRY_BASE_MS: z.string().optional().default('60000'),

apps/sim/lib/workflows/schedules/execution-limits.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ export const SCHEDULE_EXECUTION_QUEUE_NAME = 'schedule-execution'
44

55
export const SCHEDULE_EXECUTION_CONCURRENCY_LIMIT = envNumber(
66
env.SCHEDULE_EXECUTION_CONCURRENCY_LIMIT,
7-
50,
7+
30,
88
{ min: 1, integer: true }
99
)
1010

0 commit comments

Comments
 (0)