Skip to content

Commit 3bbee8c

Browse files
committed
Add an option for choosing the debounce mode, trailing or leading, with default leading
1 parent b383a73 commit 3bbee8c

File tree

8 files changed

+938
-9
lines changed

8 files changed

+938
-9
lines changed

docs/triggering.mdx

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -861,14 +861,37 @@ The `debounce` option accepts:
861861

862862
- `key` - A unique string to identify the debounce group (scoped to the task)
863863
- `delay` - Duration string specifying how long to delay (e.g., "5s", "1m", "30s")
864+
- `mode` - Optional. Controls which trigger's data is used: `"leading"` (default) or `"trailing"`
864865

865866
**How it works:**
866867

867868
1. First trigger with a debounce key creates a new delayed run
868869
2. Subsequent triggers with the same key (while the run is still delayed) push the execution time further
869-
3. Once no new triggers occur within the delay duration, the run executes with the **first** payload
870+
3. Once no new triggers occur within the delay duration, the run executes
870871
4. After the run starts executing, a new trigger with the same key will create a new run
871872

873+
**Leading vs Trailing mode:**
874+
875+
By default, debounce uses **leading mode** - the run executes with data from the **first** trigger.
876+
877+
With **trailing mode**, each subsequent trigger updates the run's data (payload, metadata, tags, maxAttempts, maxDuration, and machine), so the run executes with data from the **last** trigger:
878+
879+
```ts
880+
// Leading mode (default): runs with first payload
881+
await myTask.trigger({ count: 1 }, { debounce: { key: "user-123", delay: "5s" } });
882+
await myTask.trigger({ count: 2 }, { debounce: { key: "user-123", delay: "5s" } });
883+
// After 5 seconds, runs with { count: 1 }
884+
885+
// Trailing mode: runs with last payload
886+
await myTask.trigger({ count: 1 }, { debounce: { key: "user-123", delay: "5s", mode: "trailing" } });
887+
await myTask.trigger({ count: 2 }, { debounce: { key: "user-123", delay: "5s", mode: "trailing" } });
888+
// After 5 seconds, runs with { count: 2 }
889+
```
890+
891+
Use **trailing mode** when you want to process the most recent data, such as:
892+
- Saving the latest version of a document after edits stop
893+
- Processing the final state after a series of rapid updates
894+
872895
**With `triggerAndWait`:**
873896

874897
When using `triggerAndWait` with debounce, the parent run blocks on the existing debounced run if one exists:

internal-packages/run-engine/src/engine/index.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,22 @@ export class RunEngine {
457457
const debounceResult = await this.debounceSystem.handleDebounce({
458458
environmentId: environment.id,
459459
taskIdentifier,
460-
debounce,
460+
debounce:
461+
debounce.mode === "trailing"
462+
? {
463+
...debounce,
464+
updateData: {
465+
payload,
466+
payloadType,
467+
metadata,
468+
metadataType,
469+
tags,
470+
maxAttempts,
471+
maxDurationInSeconds,
472+
machine,
473+
},
474+
}
475+
: debounce,
461476
tx: prisma,
462477
});
463478

internal-packages/run-engine/src/engine/systems/debounceSystem.ts

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,18 @@ import { DelayedRunSystem } from "./delayedRunSystem.js";
1616
export type DebounceOptions = {
1717
key: string;
1818
delay: string;
19+
mode?: "leading" | "trailing";
20+
/** When mode: "trailing", these fields will be used to update the existing run */
21+
updateData?: {
22+
payload: string;
23+
payloadType: string;
24+
metadata?: string;
25+
metadataType?: string;
26+
tags?: { id: string; name: string }[];
27+
maxAttempts?: number;
28+
maxDurationInSeconds?: number;
29+
machine?: string;
30+
};
1931
};
2032

2133
export type DebounceSystemOptions = {
@@ -439,9 +451,24 @@ return { 0, value }
439451
);
440452
}
441453

454+
// Update run data when mode is "trailing"
455+
let updatedRun = existingRun;
456+
if (debounce.mode === "trailing" && debounce.updateData) {
457+
updatedRun = await this.#updateRunForTrailingMode({
458+
runId: existingRunId,
459+
updateData: debounce.updateData,
460+
tx: prisma,
461+
});
462+
463+
this.$.logger.debug("handleExistingRun: updated run data for trailing mode", {
464+
existingRunId,
465+
debounceKey: debounce.key,
466+
});
467+
}
468+
442469
return {
443470
status: "existing",
444-
run: existingRun,
471+
run: updatedRun,
445472
waitpoint: existingRun.associatedWaitpoint,
446473
};
447474
});
@@ -639,6 +666,75 @@ return { 0, value }
639666
});
640667
}
641668

669+
/**
670+
* Updates a run's data for trailing mode debounce.
671+
* Updates: payload, metadata, tags, maxAttempts, maxDurationInSeconds, machinePreset
672+
*/
673+
async #updateRunForTrailingMode({
674+
runId,
675+
updateData,
676+
tx,
677+
}: {
678+
runId: string;
679+
updateData: NonNullable<DebounceOptions["updateData"]>;
680+
tx?: PrismaClientOrTransaction;
681+
}): Promise<TaskRun & { associatedWaitpoint: Waitpoint | null }> {
682+
const prisma = tx ?? this.$.prisma;
683+
684+
// Build the update object
685+
const updatePayload: {
686+
payload: string;
687+
payloadType: string;
688+
metadata?: string;
689+
metadataType?: string;
690+
maxAttempts?: number;
691+
maxDurationInSeconds?: number;
692+
machinePreset?: string;
693+
runTags?: string[];
694+
tags?: {
695+
set: { id: string }[];
696+
};
697+
} = {
698+
payload: updateData.payload,
699+
payloadType: updateData.payloadType,
700+
};
701+
702+
if (updateData.metadata !== undefined) {
703+
updatePayload.metadata = updateData.metadata;
704+
updatePayload.metadataType = updateData.metadataType ?? "application/json";
705+
}
706+
707+
if (updateData.maxAttempts !== undefined) {
708+
updatePayload.maxAttempts = updateData.maxAttempts;
709+
}
710+
711+
if (updateData.maxDurationInSeconds !== undefined) {
712+
updatePayload.maxDurationInSeconds = updateData.maxDurationInSeconds;
713+
}
714+
715+
if (updateData.machine !== undefined) {
716+
updatePayload.machinePreset = updateData.machine;
717+
}
718+
719+
// Handle tags update - replace existing tags
720+
if (updateData.tags !== undefined) {
721+
updatePayload.runTags = updateData.tags.map((t) => t.name);
722+
updatePayload.tags = {
723+
set: updateData.tags.map((t) => ({ id: t.id })),
724+
};
725+
}
726+
727+
const updatedRun = await prisma.taskRun.update({
728+
where: { id: runId },
729+
data: updatePayload,
730+
include: {
731+
associatedWaitpoint: true,
732+
},
733+
});
734+
735+
return updatedRun;
736+
}
737+
642738
async quit(): Promise<void> {
643739
await this.redis.quit();
644740
}

0 commit comments

Comments
 (0)