Skip to content

Commit d78877b

Browse files
committed
Fix new debounce run race
1 parent f3f7355 commit d78877b

File tree

2 files changed

+251
-12
lines changed

2 files changed

+251
-12
lines changed

internal-packages/run-engine/src/engine/systems/debounceSystem.ts

Lines changed: 122 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,74 @@ return 0
243243
return { claimed: false, existingRunId: existingValue };
244244
}
245245

246+
/**
247+
* Atomically claims the debounce key before returning "new".
248+
* This prevents the race condition where returning "new" without a claimId
249+
* allows registerDebouncedRun to do a plain SET that can overwrite another server's registration.
250+
*
251+
* This method is called when we've determined there's no valid existing run but need
252+
* to safely claim the key before creating a new one.
253+
*/
254+
private async claimKeyForNewRun({
255+
environmentId,
256+
taskIdentifier,
257+
debounce,
258+
tx,
259+
}: {
260+
environmentId: string;
261+
taskIdentifier: string;
262+
debounce: DebounceOptions;
263+
tx?: PrismaClientOrTransaction;
264+
}): Promise<DebounceResult> {
265+
const redisKey = this.getDebounceRedisKey(environmentId, taskIdentifier, debounce.key);
266+
const claimId = nanoid(16);
267+
268+
const claimResult = await this.claimDebounceKey({
269+
environmentId,
270+
taskIdentifier,
271+
debounceKey: debounce.key,
272+
claimId,
273+
ttlMs: CLAIM_TTL_MS,
274+
});
275+
276+
if (claimResult.claimed) {
277+
this.$.logger.debug("claimKeyForNewRun: claimed key, returning new", {
278+
debounceKey: debounce.key,
279+
taskIdentifier,
280+
environmentId,
281+
claimId,
282+
});
283+
return { status: "new", claimId };
284+
}
285+
286+
if (claimResult.existingRunId) {
287+
// Another server registered a run while we were processing - handle it
288+
this.$.logger.debug("claimKeyForNewRun: found existing run, handling it", {
289+
debounceKey: debounce.key,
290+
existingRunId: claimResult.existingRunId,
291+
});
292+
return await this.handleExistingRun({
293+
existingRunId: claimResult.existingRunId,
294+
redisKey,
295+
environmentId,
296+
taskIdentifier,
297+
debounce,
298+
tx,
299+
});
300+
}
301+
302+
// Another server is creating (pending state) - wait for it
303+
this.$.logger.debug("claimKeyForNewRun: key is pending, waiting for existing run", {
304+
debounceKey: debounce.key,
305+
});
306+
return await this.waitForExistingRun({
307+
environmentId,
308+
taskIdentifier,
309+
debounce,
310+
tx,
311+
});
312+
}
313+
246314
/**
247315
* Waits for another server to complete its claim and register a run ID.
248316
* Used when we detect a "pending" state, meaning another server has claimed
@@ -267,13 +335,18 @@ return 0
267335
const value = await this.redis.get(redisKey);
268336

269337
if (!value) {
270-
// Key expired or was deleted - return "new" to create fresh
271-
this.$.logger.debug("waitForExistingRun: key expired/deleted, returning new", {
338+
// Key expired or was deleted - atomically claim before returning "new"
339+
this.$.logger.debug("waitForExistingRun: key expired/deleted, claiming key", {
272340
redisKey,
273341
debounceKey: debounce.key,
274342
attempt: i + 1,
275343
});
276-
return { status: "new" };
344+
return await this.claimKeyForNewRun({
345+
environmentId,
346+
taskIdentifier,
347+
debounce,
348+
tx,
349+
});
277350
}
278351

279352
if (!value.startsWith("pending:")) {
@@ -287,6 +360,8 @@ return 0
287360
return await this.handleExistingRun({
288361
existingRunId: value,
289362
redisKey,
363+
environmentId,
364+
taskIdentifier,
290365
debounce,
291366
tx,
292367
});
@@ -314,12 +389,17 @@ return 0
314389
const deleteResult = await this.conditionallyDeletePendingKey(redisKey);
315390

316391
if (deleteResult.deleted) {
317-
// Key was pending (or didn't exist) - safe to create new run
318-
this.$.logger.debug("waitForExistingRun: stale pending key deleted, returning new", {
392+
// Key was pending (or didn't exist) - atomically claim before returning "new"
393+
this.$.logger.debug("waitForExistingRun: stale pending key deleted, claiming key", {
319394
redisKey,
320395
debounceKey: debounce.key,
321396
});
322-
return { status: "new" };
397+
return await this.claimKeyForNewRun({
398+
environmentId,
399+
taskIdentifier,
400+
debounce,
401+
tx,
402+
});
323403
}
324404

325405
// Key now has a valid run ID - the original server completed!
@@ -335,6 +415,8 @@ return 0
335415
return await this.handleExistingRun({
336416
existingRunId: deleteResult.existingRunId,
337417
redisKey,
418+
environmentId,
419+
taskIdentifier,
338420
debounce,
339421
tx,
340422
});
@@ -347,11 +429,15 @@ return 0
347429
private async handleExistingRun({
348430
existingRunId,
349431
redisKey,
432+
environmentId,
433+
taskIdentifier,
350434
debounce,
351435
tx,
352436
}: {
353437
existingRunId: string;
354438
redisKey: string;
439+
environmentId: string;
440+
taskIdentifier: string;
355441
debounce: DebounceOptions;
356442
tx?: PrismaClientOrTransaction;
357443
}): Promise<DebounceResult> {
@@ -369,9 +455,14 @@ return 0
369455
debounceKey: debounce.key,
370456
error,
371457
});
372-
// Clean up stale Redis key
458+
// Clean up stale Redis key and atomically claim before returning "new"
373459
await this.redis.del(redisKey);
374-
return { status: "new" };
460+
return await this.claimKeyForNewRun({
461+
environmentId,
462+
taskIdentifier,
463+
debounce,
464+
tx,
465+
});
375466
}
376467

377468
// Check if run is still in DELAYED status (or legacy RUN_CREATED for older runs)
@@ -381,9 +472,14 @@ return 0
381472
executionStatus: snapshot.executionStatus,
382473
debounceKey: debounce.key,
383474
});
384-
// Clean up Redis key since run is no longer debounceable
475+
// Clean up Redis key and atomically claim before returning "new"
385476
await this.redis.del(redisKey);
386-
return { status: "new" };
477+
return await this.claimKeyForNewRun({
478+
environmentId,
479+
taskIdentifier,
480+
debounce,
481+
tx,
482+
});
387483
}
388484

389485
// Get the run to check debounce metadata and createdAt
@@ -399,8 +495,14 @@ return 0
399495
existingRunId,
400496
debounceKey: debounce.key,
401497
});
498+
// Clean up stale Redis key and atomically claim before returning "new"
402499
await this.redis.del(redisKey);
403-
return { status: "new" };
500+
return await this.claimKeyForNewRun({
501+
environmentId,
502+
taskIdentifier,
503+
debounce,
504+
tx,
505+
});
404506
}
405507

406508
// Calculate new delay - parseNaturalLanguageDuration returns a Date (now + duration)
@@ -409,7 +511,13 @@ return 0
409511
this.$.logger.error("handleExistingRun: invalid delay duration", {
410512
delay: debounce.delay,
411513
});
412-
return { status: "new" };
514+
// Invalid delay but we still need to atomically claim before returning "new"
515+
return await this.claimKeyForNewRun({
516+
environmentId,
517+
taskIdentifier,
518+
debounce,
519+
tx,
520+
});
413521
}
414522

415523
// Check if max debounce duration would be exceeded
@@ -566,6 +674,8 @@ return 0
566674
return await this.handleExistingRun({
567675
existingRunId: claimResult.existingRunId,
568676
redisKey,
677+
environmentId,
678+
taskIdentifier,
569679
debounce,
570680
tx,
571681
});

internal-packages/run-engine/src/engine/tests/debounce.test.ts

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2041,5 +2041,134 @@ describe("RunEngine debounce", () => {
20412041
}
20422042
}
20432043
);
2044+
2045+
containerTest(
2046+
"waitForExistingRun: returns claimId when key expires during wait",
2047+
async ({ prisma, redisOptions }) => {
2048+
// This test verifies the fix for the race condition where waitForExistingRun
2049+
// returns { status: "new" } without a claimId. Without the fix:
2050+
// 1. Server A's pending claim expires
2051+
// 2. Server B's waitForExistingRun detects key is gone, returns { status: "new" } (no claimId)
2052+
// 3. Server C atomically claims the key and registers runId-C
2053+
// 4. Server B calls registerDebouncedRun without claimId, does plain SET, overwrites runId-C
2054+
//
2055+
// With the fix, step 2 atomically claims the key before returning, preventing step 4's overwrite.
2056+
2057+
const { createRedisClient } = await import("@internal/redis");
2058+
2059+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
2060+
2061+
const engine = new RunEngine({
2062+
prisma,
2063+
worker: {
2064+
redis: redisOptions,
2065+
workers: 1,
2066+
tasksPerWorker: 10,
2067+
pollIntervalMs: 100,
2068+
},
2069+
queue: {
2070+
redis: redisOptions,
2071+
},
2072+
runLock: {
2073+
redis: redisOptions,
2074+
},
2075+
machines: {
2076+
defaultMachine: "small-1x",
2077+
machines: {
2078+
"small-1x": {
2079+
name: "small-1x" as const,
2080+
cpu: 0.5,
2081+
memory: 0.5,
2082+
centsPerMs: 0.0001,
2083+
},
2084+
},
2085+
baseCostInCents: 0.0001,
2086+
},
2087+
debounce: {
2088+
maxDebounceDurationMs: 60_000,
2089+
},
2090+
tracer: trace.getTracer("test", "0.0.0"),
2091+
});
2092+
2093+
// Create a separate Redis client to simulate "another server" modifying keys directly
2094+
const simulatedServerRedis = createRedisClient({
2095+
...redisOptions,
2096+
keyPrefix: `${redisOptions.keyPrefix ?? ""}debounce:`,
2097+
});
2098+
2099+
try {
2100+
const taskIdentifier = "test-task";
2101+
const debounceKey = "wait-race-test-key";
2102+
const environmentId = authenticatedEnvironment.id;
2103+
2104+
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);
2105+
2106+
// Construct the Redis key (same format as DebounceSystem.getDebounceRedisKey)
2107+
const redisKey = `${environmentId}:${taskIdentifier}:${debounceKey}`;
2108+
2109+
// Step 1: Server A claims the key with a pending claim
2110+
const claimIdA = "claim-server-A";
2111+
await simulatedServerRedis.set(redisKey, `pending:${claimIdA}`, "PX", 60_000);
2112+
2113+
// Step 2: Delete the key to simulate Server A's claim expiring
2114+
await simulatedServerRedis.del(redisKey);
2115+
2116+
// Step 3: Server B calls handleDebounce - since key is gone, it should atomically claim
2117+
const debounceResult = await engine.debounceSystem.handleDebounce({
2118+
environmentId,
2119+
taskIdentifier,
2120+
debounce: {
2121+
key: debounceKey,
2122+
delay: "5s",
2123+
},
2124+
});
2125+
2126+
// Step 4: Verify result is { status: "new" } WITH a claimId
2127+
expect(debounceResult.status).toBe("new");
2128+
if (debounceResult.status === "new") {
2129+
expect(debounceResult.claimId).toBeDefined();
2130+
expect(typeof debounceResult.claimId).toBe("string");
2131+
expect(debounceResult.claimId!.length).toBeGreaterThan(0);
2132+
2133+
// Step 5: Verify the key now contains Server B's pending claim
2134+
const valueAfterB = await simulatedServerRedis.get(redisKey);
2135+
expect(valueAfterB).toBe(`pending:${debounceResult.claimId}`);
2136+
2137+
// Step 6: Server C tries to claim the same key - should fail
2138+
const claimIdC = "claim-server-C";
2139+
const claimResultC = await simulatedServerRedis.set(
2140+
redisKey,
2141+
`pending:${claimIdC}`,
2142+
"PX",
2143+
60_000,
2144+
"NX"
2145+
);
2146+
expect(claimResultC).toBeNull(); // NX fails because key exists
2147+
2148+
// Step 7: Server B registers its run using its claimId
2149+
const runIdB = "run_server_B";
2150+
const delayUntil = new Date(Date.now() + 60_000);
2151+
const registered = await engine.debounceSystem.registerDebouncedRun({
2152+
runId: runIdB,
2153+
environmentId,
2154+
taskIdentifier,
2155+
debounceKey,
2156+
delayUntil,
2157+
claimId: debounceResult.claimId,
2158+
});
2159+
2160+
// Step 8: Verify Server B's registration succeeded
2161+
expect(registered).toBe(true);
2162+
2163+
// Step 9: Verify Redis contains Server B's run ID
2164+
const finalValue = await simulatedServerRedis.get(redisKey);
2165+
expect(finalValue).toBe(runIdB);
2166+
}
2167+
} finally {
2168+
await simulatedServerRedis.quit();
2169+
await engine.quit();
2170+
}
2171+
}
2172+
);
20442173
});
20452174

0 commit comments

Comments
 (0)