Skip to content

Commit 2fdd93c

Browse files
committed
improvement(webhooks): cut inline dispatch latency + add trigger-age instrumentation
- Skip the redundant second preprocessExecution pass for inline (Slack/generic) webhook execution: thread the route's already-resolved workflowRecord + executionTimeout into the in-process job (PreprocessedWebhookContext) so it reuses them instead of redoing ~4 DB reads. Scoped to the in-process inline call only; the persisted/polling Trigger.dev payload still re-validates fresh. - Add dispatch-latency / trigger-age instrumentation: capture webhook receipt time + Slack x-slack-request-timestamp at the route and log structured dispatchLatencyMs + triggerAgeMs, surfacing the pre-execution latency that per-block timings cannot see (Slack trigger_id expires at 3s). - Guard the effective-env fetch in verifyProviderAuth to only run when the handler verifies auth AND the providerConfig references env vars ({{VAR}}), avoiding a needless DB read/decrypt on the synchronous pre-ack path.
1 parent d9da544 commit 2fdd93c

3 files changed

Lines changed: 142 additions & 28 deletions

File tree

apps/sim/app/api/webhooks/trigger/[path]/route.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,17 @@ async function handleWebhookPost(
6969
request: NextRequest,
7070
context: { params: Promise<{ path: string }> }
7171
): Promise<NextResponse> {
72+
const receivedAt = Date.now()
73+
/**
74+
* Slack signs every interactive request with the originating interaction time.
75+
* Capturing it lets the executor surface the true trigger_id age (the window
76+
* that expires at 3s) instead of only the in-workflow block timings.
77+
*/
78+
const slackRequestTimestamp = request.headers.get('x-slack-request-timestamp')
79+
const triggerTimestampMs = slackRequestTimestamp
80+
? Number(slackRequestTimestamp) * 1000
81+
: undefined
82+
7283
const requestId = generateRequestId()
7384
const parsed = await parseRequest(webhookTriggerPostContract, request, context)
7485
if (!parsed.success) return parsed.response
@@ -200,6 +211,9 @@ async function handleWebhookPost(
200211
actorUserId: preprocessResult.actorUserId,
201212
executionId: preprocessResult.executionId,
202213
correlation: preprocessResult.correlation,
214+
receivedAt,
215+
triggerTimestampMs: Number.isFinite(triggerTimestampMs) ? triggerTimestampMs : undefined,
216+
preprocessed: preprocessResult.preprocessed,
203217
})
204218
responses.push(response)
205219
}

apps/sim/background/webhook-execution.ts

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
WebhookAttachmentProcessor,
1919
} from '@/lib/webhooks/attachment-processor'
2020
import { resolveWebhookRecordProviderConfig } from '@/lib/webhooks/env-resolver'
21+
import type { PreprocessedWebhookContext } from '@/lib/webhooks/processor'
2122
import { getProviderHandler } from '@/lib/webhooks/providers'
2223
import {
2324
executeWorkflowCore,
@@ -236,6 +237,12 @@ export type WebhookExecutionPayload = {
236237
blockId?: string
237238
workspaceId?: string
238239
credentialId?: string
240+
/** Epoch ms when the webhook HTTP request was first received (for dispatch-latency metrics). */
241+
webhookReceivedAt?: number
242+
/** Epoch ms of the originating provider interaction (e.g. Slack x-slack-request-timestamp). */
243+
triggerTimestampMs?: number
244+
/** Route-level preprocessing result; when present the job skips the redundant second pass. */
245+
preprocessed?: PreprocessedWebhookContext
239246
}
240247

241248
export async function executeWebhookJob(payload: WebhookExecutionPayload) {
@@ -350,25 +357,43 @@ async function executeWebhookJobInternal(
350357
requestId
351358
)
352359

353-
const preprocessResult = await preprocessExecution({
354-
workflowId: payload.workflowId,
355-
userId: payload.userId,
356-
triggerType: 'webhook',
357-
executionId,
358-
requestId,
359-
triggerData: { correlation },
360-
checkRateLimit: false,
361-
checkDeployment: false,
362-
skipUsageLimits: true,
363-
workspaceId: payload.workspaceId,
364-
loggingSession,
365-
})
360+
/**
361+
* The webhook route already ran full preprocessing (workflow validation,
362+
* billing actor, ban/usage/rate-limit gates, admission reservation) moments
363+
* ago and handed back the resolved record + timeout. Reuse it to skip a
364+
* redundant second pass (≈4 DB reads) on the latency-critical dispatch path.
365+
* Fall back to a fresh pass when the prefetched context is absent (e.g. other
366+
* callers, or a payload that predates this field).
367+
*/
368+
let workflowRecord: PreprocessedWebhookContext['workflowRecord'] | undefined
369+
let executionTimeout: PreprocessedWebhookContext['executionTimeout'] | undefined
370+
371+
if (payload.preprocessed) {
372+
workflowRecord = payload.preprocessed.workflowRecord
373+
executionTimeout = payload.preprocessed.executionTimeout
374+
} else {
375+
const preprocessResult = await preprocessExecution({
376+
workflowId: payload.workflowId,
377+
userId: payload.userId,
378+
triggerType: 'webhook',
379+
executionId,
380+
requestId,
381+
triggerData: { correlation },
382+
checkRateLimit: false,
383+
checkDeployment: false,
384+
skipUsageLimits: true,
385+
workspaceId: payload.workspaceId,
386+
loggingSession,
387+
})
366388

367-
if (!preprocessResult.success) {
368-
throw new Error(preprocessResult.error?.message || 'Preprocessing failed in background job')
389+
if (!preprocessResult.success) {
390+
throw new Error(preprocessResult.error?.message || 'Preprocessing failed in background job')
391+
}
392+
393+
workflowRecord = preprocessResult.workflowRecord
394+
executionTimeout = preprocessResult.executionTimeout
369395
}
370396

371-
const { workflowRecord, executionTimeout } = preprocessResult
372397
if (!workflowRecord) {
373398
throw new Error(`Workflow ${payload.workflowId} not found during preprocessing`)
374399
}
@@ -564,6 +589,25 @@ async function executeWebhookJobInternal(
564589

565590
const triggerInput = input || {}
566591

592+
/**
593+
* Surface the pre-execution latency that per-block timings cannot see: the
594+
* gap between webhook receipt and the first block running, and — for
595+
* trigger_id-bound providers like Slack — the true age of the interaction
596+
* against its 3s expiry window. Logged structured so it is queryable/alarmable.
597+
*/
598+
if (payload.webhookReceivedAt !== undefined || payload.triggerTimestampMs !== undefined) {
599+
const now = Date.now()
600+
logger.info(`[${requestId}] Webhook dispatch latency`, {
601+
workflowId: payload.workflowId,
602+
provider: payload.provider,
603+
dispatchLatencyMs:
604+
payload.webhookReceivedAt !== undefined ? now - payload.webhookReceivedAt : undefined,
605+
triggerAgeMs:
606+
payload.triggerTimestampMs !== undefined ? now - payload.triggerTimestampMs : undefined,
607+
reusedPreprocessing: Boolean(payload.preprocessed),
608+
})
609+
}
610+
567611
const snapshot = new ExecutionSnapshot(
568612
metadata,
569613
workflowRecord,

apps/sim/lib/webhooks/processor.ts

Lines changed: 68 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,37 @@ import { isPollingWebhookProvider } from '@/triggers/constants'
3232

3333
const logger = createLogger('WebhookProcessor')
3434

35+
/**
36+
* Preprocessing outputs resolved at the route level (active workflow record and
37+
* resolved execution timeout) that the background job can reuse to avoid a
38+
* redundant second {@link preprocessExecution} pass on the dispatch path.
39+
*/
40+
export interface PreprocessedWebhookContext {
41+
workflowRecord: typeof workflow.$inferSelect
42+
executionTimeout: { sync: number; async: number }
43+
}
44+
3545
export interface WebhookProcessorOptions {
3646
requestId: string
3747
path?: string
3848
webhookId?: string
3949
actorUserId?: string
4050
executionId?: string
4151
correlation?: AsyncExecutionCorrelation
52+
/** Epoch ms when the webhook HTTP request was first received (for dispatch-latency metrics). */
53+
receivedAt?: number
54+
/** Epoch ms of the originating provider interaction (e.g. Slack x-slack-request-timestamp). */
55+
triggerTimestampMs?: number
56+
/** Route-level preprocessing result reused by the background job. */
57+
preprocessed?: PreprocessedWebhookContext
4258
}
4359

4460
export interface WebhookPreprocessingResult {
4561
error: NextResponse | null
4662
actorUserId?: string
4763
executionId?: string
4864
correlation?: AsyncExecutionCorrelation
65+
preprocessed?: PreprocessedWebhookContext
4966
}
5067

5168
async function verifyCredentialSetBilling(credentialSetId: string): Promise<{
@@ -406,6 +423,16 @@ function resolveEnvVars(value: string, envVars: Record<string, string>): string
406423
return resolveEnvVarReferences(value, envVars) as string
407424
}
408425

426+
/** True when any string value in the provider config contains an env-var reference (`{{VAR}}`). */
427+
function providerConfigReferencesEnvVars(config: Record<string, unknown>): boolean {
428+
for (const value of Object.values(config)) {
429+
if (typeof value === 'string' && value.includes('{{')) {
430+
return true
431+
}
432+
}
433+
return false
434+
}
435+
409436
function resolveProviderConfigEnvVars(
410437
config: Record<string, unknown>,
411438
envVars: Record<string, string>
@@ -432,22 +459,30 @@ export async function verifyProviderAuth(
432459
rawBody: string,
433460
requestId: string
434461
): Promise<NextResponse | null> {
462+
const handler = getProviderHandler(foundWebhook.provider)
463+
const rawProviderConfig = (foundWebhook.providerConfig as Record<string, unknown>) || {}
464+
465+
/**
466+
* Only fetch + decrypt the effective env when there is auth to verify AND the
467+
* provider config actually references env vars (`{{VAR}}`). This avoids a DB
468+
* read and decryption on the synchronous pre-ack path for the common case.
469+
*/
435470
let decryptedEnvVars: Record<string, string> = {}
436-
try {
437-
decryptedEnvVars = await getEffectiveDecryptedEnv(
438-
foundWorkflow.userId,
439-
foundWorkflow.workspaceId
440-
)
441-
} catch (error) {
442-
logger.error(`[${requestId}] Failed to fetch environment variables`, {
443-
error,
444-
})
471+
if (handler.verifyAuth && providerConfigReferencesEnvVars(rawProviderConfig)) {
472+
try {
473+
decryptedEnvVars = await getEffectiveDecryptedEnv(
474+
foundWorkflow.userId,
475+
foundWorkflow.workspaceId
476+
)
477+
} catch (error) {
478+
logger.error(`[${requestId}] Failed to fetch environment variables`, {
479+
error,
480+
})
481+
}
445482
}
446483

447-
const rawProviderConfig = (foundWebhook.providerConfig as Record<string, unknown>) || {}
448484
const providerConfig = resolveProviderConfigEnvVars(rawProviderConfig, decryptedEnvVars)
449485

450-
const handler = getProviderHandler(foundWebhook.provider)
451486
if (handler.verifyAuth) {
452487
const authResult = await handler.verifyAuth({
453488
webhook: foundWebhook,
@@ -515,6 +550,13 @@ export async function checkWebhookPreprocessing(
515550
actorUserId: preprocessResult.actorUserId,
516551
executionId,
517552
correlation,
553+
preprocessed:
554+
preprocessResult.workflowRecord && preprocessResult.executionTimeout
555+
? {
556+
workflowRecord: preprocessResult.workflowRecord,
557+
executionTimeout: preprocessResult.executionTimeout,
558+
}
559+
: undefined,
518560
}
519561
} catch (preprocessError) {
520562
logger.error(`[${requestId}] Error during webhook preprocessing:`, preprocessError)
@@ -611,6 +653,10 @@ export async function queueWebhookExecution(
611653
blockId: foundWebhook.blockId,
612654
workspaceId: foundWorkflow.workspaceId,
613655
...(credentialId ? { credentialId } : {}),
656+
...(options.receivedAt !== undefined ? { webhookReceivedAt: options.receivedAt } : {}),
657+
...(options.triggerTimestampMs !== undefined
658+
? { triggerTimestampMs: options.triggerTimestampMs }
659+
: {}),
614660
}
615661

616662
const isPolling = isPollingWebhookProvider(payload.provider)
@@ -641,10 +687,20 @@ export async function queueWebhookExecution(
641687
`[${options.requestId}] Queued ${foundWebhook.provider} webhook execution ${jobId} via inline backend`
642688
)
643689

690+
/**
691+
* Reuse the route's preprocessing ONLY for this in-process execution, where
692+
* no time passes and no serialization occurs. The enqueued/persisted copy
693+
* (and the polling Trigger.dev path) deliberately omit it so any deferred
694+
* re-run still performs a fresh ban/archived re-validation.
695+
*/
696+
const inlinePayload = options.preprocessed
697+
? { ...payload, preprocessed: options.preprocessed }
698+
: payload
699+
644700
void (async () => {
645701
try {
646702
await jobQueue.startJob(jobId)
647-
const output = await executeWebhookJob(payload)
703+
const output = await executeWebhookJob(inlinePayload)
648704
await jobQueue.completeJob(jobId, output)
649705
} catch (error) {
650706
const errorMessage = toError(error).message

0 commit comments

Comments
 (0)