Skip to content

Commit a9d8d3f

Browse files
committed
improvement(webhooks): cut inline dispatch latency + add trigger-age instrumentation
- Skip the redundant second preprocessExecution pass for inline (Slack/generic) webhook execution: thread the route's already-resolved workflowRecord + executionTimeout into the in-process job (PreprocessedWebhookContext) so it reuses them instead of redoing ~4 DB reads. Scoped to the in-process inline call only; the persisted/polling Trigger.dev payload still re-validates fresh. - Add dispatch-latency / trigger-age instrumentation: capture webhook receipt time + Slack x-slack-request-timestamp at the route and log structured dispatchLatencyMs + triggerAgeMs, surfacing the pre-execution latency that per-block timings cannot see (Slack trigger_id expires at 3s). - Guard the effective-env fetch in verifyProviderAuth to only run when the handler verifies auth AND the providerConfig references env vars ({{VAR}}), avoiding a needless DB read/decrypt on the synchronous pre-ack path.
1 parent d9da544 commit a9d8d3f

3 files changed

Lines changed: 132 additions & 28 deletions

File tree

apps/sim/app/api/webhooks/trigger/[path]/route.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,15 @@ async function handleWebhookPost(
6969
request: NextRequest,
7070
context: { params: Promise<{ path: string }> }
7171
): Promise<NextResponse> {
72+
const receivedAt = Date.now()
73+
// Slack signs every interactive request with the originating interaction time.
74+
// Capturing it lets the executor surface the true trigger_id age (the window
75+
// that expires at 3s) instead of only the in-workflow block timings.
76+
const slackRequestTimestamp = request.headers.get('x-slack-request-timestamp')
77+
const triggerTimestampMs = slackRequestTimestamp
78+
? Number(slackRequestTimestamp) * 1000
79+
: undefined
80+
7281
const requestId = generateRequestId()
7382
const parsed = await parseRequest(webhookTriggerPostContract, request, context)
7483
if (!parsed.success) return parsed.response
@@ -200,6 +209,9 @@ async function handleWebhookPost(
200209
actorUserId: preprocessResult.actorUserId,
201210
executionId: preprocessResult.executionId,
202211
correlation: preprocessResult.correlation,
212+
receivedAt,
213+
triggerTimestampMs: Number.isFinite(triggerTimestampMs) ? triggerTimestampMs : undefined,
214+
preprocessed: preprocessResult.preprocessed,
203215
})
204216
responses.push(response)
205217
}

apps/sim/background/webhook-execution.ts

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
WebhookAttachmentProcessor,
1919
} from '@/lib/webhooks/attachment-processor'
2020
import { resolveWebhookRecordProviderConfig } from '@/lib/webhooks/env-resolver'
21+
import type { PreprocessedWebhookContext } from '@/lib/webhooks/processor'
2122
import { getProviderHandler } from '@/lib/webhooks/providers'
2223
import {
2324
executeWorkflowCore,
@@ -236,6 +237,12 @@ export type WebhookExecutionPayload = {
236237
blockId?: string
237238
workspaceId?: string
238239
credentialId?: string
240+
/** Epoch ms when the webhook HTTP request was first received (for dispatch-latency metrics). */
241+
webhookReceivedAt?: number
242+
/** Epoch ms of the originating provider interaction (e.g. Slack x-slack-request-timestamp). */
243+
triggerTimestampMs?: number
244+
/** Route-level preprocessing result; when present the job skips the redundant second pass. */
245+
preprocessed?: PreprocessedWebhookContext
239246
}
240247

241248
export async function executeWebhookJob(payload: WebhookExecutionPayload) {
@@ -350,25 +357,41 @@ async function executeWebhookJobInternal(
350357
requestId
351358
)
352359

353-
const preprocessResult = await preprocessExecution({
354-
workflowId: payload.workflowId,
355-
userId: payload.userId,
356-
triggerType: 'webhook',
357-
executionId,
358-
requestId,
359-
triggerData: { correlation },
360-
checkRateLimit: false,
361-
checkDeployment: false,
362-
skipUsageLimits: true,
363-
workspaceId: payload.workspaceId,
364-
loggingSession,
365-
})
360+
// The webhook route already ran full preprocessing (workflow validation,
361+
// billing actor, ban/usage/rate-limit gates, admission reservation) moments
362+
// ago and handed back the resolved record + timeout. Reuse it to skip a
363+
// redundant second pass (≈4 DB reads) on the latency-critical dispatch path.
364+
// Fall back to a fresh pass when the prefetched context is absent (e.g. other
365+
// callers, or a payload that predates this field).
366+
let workflowRecord: PreprocessedWebhookContext['workflowRecord'] | undefined
367+
let executionTimeout: PreprocessedWebhookContext['executionTimeout'] | undefined
368+
369+
if (payload.preprocessed) {
370+
workflowRecord = payload.preprocessed.workflowRecord
371+
executionTimeout = payload.preprocessed.executionTimeout
372+
} else {
373+
const preprocessResult = await preprocessExecution({
374+
workflowId: payload.workflowId,
375+
userId: payload.userId,
376+
triggerType: 'webhook',
377+
executionId,
378+
requestId,
379+
triggerData: { correlation },
380+
checkRateLimit: false,
381+
checkDeployment: false,
382+
skipUsageLimits: true,
383+
workspaceId: payload.workspaceId,
384+
loggingSession,
385+
})
366386

367-
if (!preprocessResult.success) {
368-
throw new Error(preprocessResult.error?.message || 'Preprocessing failed in background job')
387+
if (!preprocessResult.success) {
388+
throw new Error(preprocessResult.error?.message || 'Preprocessing failed in background job')
389+
}
390+
391+
workflowRecord = preprocessResult.workflowRecord
392+
executionTimeout = preprocessResult.executionTimeout
369393
}
370394

371-
const { workflowRecord, executionTimeout } = preprocessResult
372395
if (!workflowRecord) {
373396
throw new Error(`Workflow ${payload.workflowId} not found during preprocessing`)
374397
}
@@ -564,6 +587,23 @@ async function executeWebhookJobInternal(
564587

565588
const triggerInput = input || {}
566589

590+
// Surface the pre-execution latency that per-block timings cannot see: the
591+
// gap between webhook receipt and the first block running, and — for
592+
// trigger_id-bound providers like Slack — the true age of the interaction
593+
// against its 3s expiry window. Logged structured so it is queryable/alarmable.
594+
if (payload.webhookReceivedAt !== undefined || payload.triggerTimestampMs !== undefined) {
595+
const now = Date.now()
596+
logger.info(`[${requestId}] Webhook dispatch latency`, {
597+
workflowId: payload.workflowId,
598+
provider: payload.provider,
599+
dispatchLatencyMs:
600+
payload.webhookReceivedAt !== undefined ? now - payload.webhookReceivedAt : undefined,
601+
triggerAgeMs:
602+
payload.triggerTimestampMs !== undefined ? now - payload.triggerTimestampMs : undefined,
603+
reusedPreprocessing: Boolean(payload.preprocessed),
604+
})
605+
}
606+
567607
const snapshot = new ExecutionSnapshot(
568608
metadata,
569609
workflowRecord,

apps/sim/lib/webhooks/processor.ts

Lines changed: 64 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,37 @@ import { isPollingWebhookProvider } from '@/triggers/constants'
3232

3333
const logger = createLogger('WebhookProcessor')
3434

35+
/**
36+
* Preprocessing outputs resolved at the route level (active workflow record and
37+
* resolved execution timeout) that the background job can reuse to avoid a
38+
* redundant second {@link preprocessExecution} pass on the dispatch path.
39+
*/
40+
export interface PreprocessedWebhookContext {
41+
workflowRecord: typeof workflow.$inferSelect
42+
executionTimeout: { sync: number; async: number }
43+
}
44+
3545
export interface WebhookProcessorOptions {
3646
requestId: string
3747
path?: string
3848
webhookId?: string
3949
actorUserId?: string
4050
executionId?: string
4151
correlation?: AsyncExecutionCorrelation
52+
/** Epoch ms when the webhook HTTP request was first received (for dispatch-latency metrics). */
53+
receivedAt?: number
54+
/** Epoch ms of the originating provider interaction (e.g. Slack x-slack-request-timestamp). */
55+
triggerTimestampMs?: number
56+
/** Route-level preprocessing result reused by the background job. */
57+
preprocessed?: PreprocessedWebhookContext
4258
}
4359

4460
export interface WebhookPreprocessingResult {
4561
error: NextResponse | null
4662
actorUserId?: string
4763
executionId?: string
4864
correlation?: AsyncExecutionCorrelation
65+
preprocessed?: PreprocessedWebhookContext
4966
}
5067

5168
async function verifyCredentialSetBilling(credentialSetId: string): Promise<{
@@ -406,6 +423,16 @@ function resolveEnvVars(value: string, envVars: Record<string, string>): string
406423
return resolveEnvVarReferences(value, envVars) as string
407424
}
408425

426+
/** True when any string value in the provider config contains an env-var reference (`{{VAR}}`). */
427+
function providerConfigReferencesEnvVars(config: Record<string, unknown>): boolean {
428+
for (const value of Object.values(config)) {
429+
if (typeof value === 'string' && value.includes('{{')) {
430+
return true
431+
}
432+
}
433+
return false
434+
}
435+
409436
function resolveProviderConfigEnvVars(
410437
config: Record<string, unknown>,
411438
envVars: Record<string, string>
@@ -432,22 +459,28 @@ export async function verifyProviderAuth(
432459
rawBody: string,
433460
requestId: string
434461
): Promise<NextResponse | null> {
462+
const handler = getProviderHandler(foundWebhook.provider)
463+
const rawProviderConfig = (foundWebhook.providerConfig as Record<string, unknown>) || {}
464+
465+
// Only fetch + decrypt the effective env when there is auth to verify AND the
466+
// provider config actually references env vars ({{VAR}}). This avoids a DB read
467+
// and decryption on the synchronous pre-ack path for the common case.
435468
let decryptedEnvVars: Record<string, string> = {}
436-
try {
437-
decryptedEnvVars = await getEffectiveDecryptedEnv(
438-
foundWorkflow.userId,
439-
foundWorkflow.workspaceId
440-
)
441-
} catch (error) {
442-
logger.error(`[${requestId}] Failed to fetch environment variables`, {
443-
error,
444-
})
469+
if (handler.verifyAuth && providerConfigReferencesEnvVars(rawProviderConfig)) {
470+
try {
471+
decryptedEnvVars = await getEffectiveDecryptedEnv(
472+
foundWorkflow.userId,
473+
foundWorkflow.workspaceId
474+
)
475+
} catch (error) {
476+
logger.error(`[${requestId}] Failed to fetch environment variables`, {
477+
error,
478+
})
479+
}
445480
}
446481

447-
const rawProviderConfig = (foundWebhook.providerConfig as Record<string, unknown>) || {}
448482
const providerConfig = resolveProviderConfigEnvVars(rawProviderConfig, decryptedEnvVars)
449483

450-
const handler = getProviderHandler(foundWebhook.provider)
451484
if (handler.verifyAuth) {
452485
const authResult = await handler.verifyAuth({
453486
webhook: foundWebhook,
@@ -515,6 +548,13 @@ export async function checkWebhookPreprocessing(
515548
actorUserId: preprocessResult.actorUserId,
516549
executionId,
517550
correlation,
551+
preprocessed:
552+
preprocessResult.workflowRecord && preprocessResult.executionTimeout
553+
? {
554+
workflowRecord: preprocessResult.workflowRecord,
555+
executionTimeout: preprocessResult.executionTimeout,
556+
}
557+
: undefined,
518558
}
519559
} catch (preprocessError) {
520560
logger.error(`[${requestId}] Error during webhook preprocessing:`, preprocessError)
@@ -611,6 +651,10 @@ export async function queueWebhookExecution(
611651
blockId: foundWebhook.blockId,
612652
workspaceId: foundWorkflow.workspaceId,
613653
...(credentialId ? { credentialId } : {}),
654+
...(options.receivedAt !== undefined ? { webhookReceivedAt: options.receivedAt } : {}),
655+
...(options.triggerTimestampMs !== undefined
656+
? { triggerTimestampMs: options.triggerTimestampMs }
657+
: {}),
614658
}
615659

616660
const isPolling = isPollingWebhookProvider(payload.provider)
@@ -641,10 +685,18 @@ export async function queueWebhookExecution(
641685
`[${options.requestId}] Queued ${foundWebhook.provider} webhook execution ${jobId} via inline backend`
642686
)
643687

688+
// Reuse the route's preprocessing ONLY for this in-process execution, where
689+
// no time passes and no serialization occurs. The enqueued/persisted copy
690+
// (and the polling Trigger.dev path) deliberately omit it so any deferred
691+
// re-run still performs a fresh ban/archived re-validation.
692+
const inlinePayload = options.preprocessed
693+
? { ...payload, preprocessed: options.preprocessed }
694+
: payload
695+
644696
void (async () => {
645697
try {
646698
await jobQueue.startJob(jobId)
647-
const output = await executeWebhookJob(payload)
699+
const output = await executeWebhookJob(inlinePayload)
648700
await jobQueue.completeJob(jobId, output)
649701
} catch (error) {
650702
const errorMessage = toError(error).message

0 commit comments

Comments
 (0)