Skip to content

Commit 3924aa4

Browse files
d-csclaude
andauthored
feat(redis-worker,webapp): mollifier buffer extensions + snapshot type (#3752)
## Summary Buffer-side data layer used by the rest of the mollifier phase-3 stack. - `buffer.ts` gains entry inspection (`getEntry`), idempotency lookup (`lookupIdempotency`), in-place snapshot mutation (`mutateSnapshot`), and dwell tracking. All atomic via Lua. - `mollifierSnapshot.server.ts`: shared `MollifierSnapshot` type plus (de)serialise helpers. - Drops the entry-TTL config and its env var. The drainer is the recovery mechanism; an entry that survives the drainer should surface as a stale-sweep alert, not silently TTL away. Adds methods to the buffer interface; nothing consumes them yet. Subsequent PRs in the stack wire trigger-time mollify, read-fallback, and mutation paths against this surface. ## Test plan - [x] \`pnpm run typecheck --filter webapp\` passes - [x] \`pnpm run test --filter @trigger.dev/redis-worker packages/redis-worker/src/mollifier/buffer.test.ts\` passes --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 181d9ba commit 3924aa4

17 files changed

Lines changed: 2631 additions & 185 deletions
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/redis-worker": minor
3+
---
4+
5+
Mollifier buffer extensions: idempotency dedup, an atomic `mutateSnapshot` API, metadata CAS, claim primitives, and a `MollifierSnapshot` type. The buffer's Redis client now reconnects with jittered backoff so a fleet of clients doesn't stampede Redis in lockstep after a blip.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/redis-worker": patch
3+
---
4+
5+
Pipeline the per-entry `HGETALL` fetches in `MollifierBuffer.listEntriesForEnv`. The previous serial implementation issued one Redis round-trip per runId returned by `LRANGE`, which dominated stale-sweep wall-time at any meaningful backlog (at the sweep's default maxCount=1000, this is ~1000 RTTs per env per pass). Behaviour is unchanged — entries are still skipped when the entry hash has been torn down by a concurrent drainer ack/fail between the LRANGE and the HGETALL.

apps/webapp/app/env.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1095,7 +1095,6 @@ const EnvironmentSchema = z
10951095
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
10961096
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
10971097
TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50),
1098-
TRIGGER_MOLLIFIER_ENTRY_TTL_S: z.coerce.number().int().positive().default(600),
10991098
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
11001099
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
11011100
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -374,13 +374,13 @@ export class RunEngineTriggerTaskService {
374374

375375
const payloadPacket = await this.payloadProcessor.process(triggerRequest);
376376

377-
// Phase 1 dual-write: if the org has the mollifier feature flag
377+
// Dual-write: if the org has the mollifier feature flag
378378
// enabled and the per-env trip evaluator says divert, write the
379379
// canonical replay payload to the buffer AND continue through
380380
// engine.trigger as normal. The buffer entry is an audit/preview
381381
// copy; the drainer's no-op handler consumes it to prove the
382-
// dequeue mechanism works. Phase 2 will replace engine.trigger
383-
// (below) with a synthesised 200 response and rely on the
382+
// dequeue mechanism works. A later change replaces engine.trigger
383+
// (below) with a synthesised 200 response and relies on the
384384
// drainer to perform the Postgres write via replay.
385385
if (mollifierOutcome?.action === "mollify") {
386386
const buffer = this.getMollifierBuffer();
@@ -430,8 +430,8 @@ export class RunEngineTriggerTaskService {
430430
});
431431
} catch (err) {
432432
// Fail-open: buffer write must never block the customer's
433-
// trigger. engine.trigger below is the primary write path
434-
// in Phase 1 — the customer still gets a valid run.
433+
// trigger. engine.trigger below is still the primary write
434+
// path here — the customer still gets a valid run.
435435
logger.error("mollifier.buffer_accept_failed", {
436436
runId: runFriendlyId,
437437
envId: environment.id,

apps/webapp/app/v3/mollifier/bufferedTriggerPayload.server.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@ import type { TriggerTaskRequestBody } from "@trigger.dev/core/v3";
22
import type { TriggerTaskServiceOptions } from "~/v3/services/triggerTask.server";
33

44
// Canonical payload shape written to the mollifier buffer when the gate
5-
// decides to mollify a trigger. Phase 1 ALSO calls engine.trigger directly
6-
// (dual-write) so this is currently an audit/preview record. Phase 2 will
7-
// make the buffer the primary write path: the drainer's handler will read
8-
// this payload and replay it through engine.trigger to create the run in
9-
// Postgres, and read-fallback endpoints will synthesise a Run view from it
10-
// while it is still QUEUED.
5+
// decides to mollify a trigger. At this stage the call site ALSO calls
6+
// engine.trigger directly (dual-write), so this is currently an
7+
// audit/preview record. A later change makes the buffer the primary write
8+
// path: the drainer's handler reads this payload and replays it through
9+
// engine.trigger to create the run in Postgres, and read-fallback
10+
// endpoints synthesise a Run view from it while it is still QUEUED.
1111
//
12-
// CONTRACT: this shape must contain everything needed for Phase 2's
13-
// drainer-replay to reconstruct an equivalent engine.trigger call. Phase 1
14-
// emits it to logs; Phase 2 will serialise it into Redis and rebuild it on
15-
// the drain side. Keep it serialisable — no functions, no class instances.
12+
// CONTRACT: this shape must contain everything the drainer-replay needs to
13+
// reconstruct an equivalent engine.trigger call. Today it is emitted to
14+
// logs; later it is serialised into Redis and rebuilt on the drain side.
15+
// Keep it serialisable — no functions, no class instances.
1616
export type BufferedTriggerPayload = {
1717
runFriendlyId: string;
1818

apps/webapp/app/v3/mollifier/mollifierBuffer.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ function initializeMollifierBuffer(): MollifierBuffer {
2222
enableAutoPipelining: true,
2323
...(env.TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
2424
},
25-
entryTtlSeconds: env.TRIGGER_MOLLIFIER_ENTRY_TTL_S,
2625
});
2726
}
2827

apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,11 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
6868
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
6969
});
7070

71-
// Phase 1 handler: no-op ack. The trigger has ALREADY been written to
72-
// Postgres via engine.trigger (dual-write at the call site). Popping +
73-
// acking here proves the dequeue mechanism works end-to-end without
74-
// duplicating the work. Phase 2 will replace this with an engine.trigger
75-
// replay that performs the actual Postgres write.
71+
// No-op ack handler: the trigger has ALREADY been written to Postgres
72+
// via engine.trigger (dual-write at the call site). Popping + acking
73+
// here proves the dequeue mechanism works end-to-end without duplicating
74+
// the work. A later change replaces this with an engine.trigger replay
75+
// that performs the actual Postgres write.
7676
const drainer = new MollifierDrainer<BufferedTriggerPayload>({
7777
buffer,
7878
handler: async (input) => {

apps/webapp/app/v3/mollifier/mollifierGate.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ export type GateDependencies = {
7373
};
7474

7575
// `options` is a thunk so env reads happen per-evaluation, not at module load.
76-
// Don't "simplify" to a plain object — Phase 2 dynamic config relies on the
76+
// Don't "simplify" to a plain object — dynamic config relies on the
7777
// gate observing whichever env values are live at trigger time.
7878
const defaultEvaluator = createRealTripEvaluator({
7979
getBuffer: () => getMollifierBuffer(),
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import { serialiseSnapshot, deserialiseSnapshot } from "@trigger.dev/redis-worker";
2+
3+
// MollifierSnapshot is the JSON-serialisable shape of the input that would be
4+
// passed to engine.trigger(). The drainer deserialises and replays it.
5+
// Kept as Record<string, unknown> at this layer — the engine.trigger call site
6+
// casts it to the engine's typed input. This keeps the mollifier subdirectory
7+
// from depending on @internal/run-engine internals.
8+
export type MollifierSnapshot = Record<string, unknown>;
9+
10+
export function serialiseMollifierSnapshot(input: MollifierSnapshot): string {
11+
return serialiseSnapshot(input);
12+
}
13+
14+
export function deserialiseMollifierSnapshot(serialised: string): MollifierSnapshot {
15+
return deserialiseSnapshot<MollifierSnapshot>(serialised);
16+
}

apps/webapp/app/v3/mollifier/mollifierTripEvaluator.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ export function createRealTripEvaluator(deps: CreateRealTripEvaluatorDeps): Trip
3535
} catch (err) {
3636
// Deliberate: no error counter here. Shadow mode means a silent miss is
3737
// harmless — fail-open is the safe direction. The error log + Sentry
38-
// capture is sufficient operability for Phase 1. Revisit in Phase 2
39-
// when buffer writes are the primary path and a missed evaluation has cost.
38+
// capture is sufficient operability while this runs in shadow mode. Revisit
39+
// once buffer writes are the primary path and a missed evaluation has cost.
4040
logger.error("mollifier trip evaluator: fail-open on error", {
4141
envId: inputs.envId,
4242
err: err instanceof Error ? err.message : String(err),

0 commit comments

Comments
 (0)