Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/abort-signal-replay-ordering.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/core': patch
---

Fix a race where an `AbortController` aborted from a step was not reflected in a `controller.signal` passed to a subsequent step. The step now commits the abort's durable hook event before completing, and the workflow's suspension waits for the abort to land before serializing downstream step arguments.
122 changes: 120 additions & 2 deletions packages/core/src/abort-controller-step.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
*/

import { FatalError } from '@workflow/errors';
import { describe, expect, it, vi, beforeEach } from 'vitest';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import {
dehydrateStepArguments,
hydrateStepArguments,
} from './serialization.js';
import { contextStorage, type StepContext } from './step/context-storage.js';
import { ABORT_HOOK_TOKEN, ABORT_STREAM_NAME } from './symbols.js';
import { contextStorage } from './step/context-storage.js';

// ============================================================================
// Mocks
Expand Down Expand Up @@ -580,3 +584,117 @@ describe('AbortSignal deserialized in step context', () => {
});
});
});

/**
* Exercises the REAL `reviveAbortController` (via `hydrateStepArguments`) rather
* than the mock copy above, to lock in where a step-initiated abort routes its
* two async operations.
*
* Regression: a step that aborts a controller must record the durable
* `hook_received` event BEFORE it completes. If that hook resume is flushed in
* the background (`ctx.ops`), the workflow continuation enqueued by
* `step_completed` can advance past the abort — dispatching a later step with a
* stale, non-aborted `signal` — before the event exists (the abortFromStep E2E
* flake). The hook resume must land in `ctx.preCompletionOps`, which the step
* handler awaits before writing `step_completed`. The real-time stream write
* (which reaches an in-flight sibling) stays in the background `ctx.ops`.
*/
describe('step-initiated abort: durable hook resume is committed before completion', () => {
beforeEach(() => {
mockResumeHook.mockClear();
mockStreamReads.readResults.clear();
mockStreamReads.writeLog = [];
mockStreamReads.closeLog = [];
});

async function reviveControllerInStep(): Promise<{
controller: AbortController;
ops: Promise<void>[];
preCompletionOps: Promise<void>[];
}> {
// A controller carrying the abort symbols serializes through the workflow
// reducer with a streamName + hookToken — the shape a step receives.
const source = new AbortController();
(source as any)[ABORT_STREAM_NAME] = 'strm_pre_completion_abort';
(source as any)[ABORT_HOOK_TOKEN] = 'abrt_pre_completion';
(source.signal as any)[ABORT_STREAM_NAME] = 'strm_pre_completion_abort';
(source.signal as any)[ABORT_HOOK_TOKEN] = 'abrt_pre_completion';

const dehydrated = await dehydrateStepArguments(
[source],
'wrun_test',
undefined
);

const ops: Promise<void>[] = [];
const preCompletionOps: Promise<void>[] = [];
const store: StepContext = {
stepMetadata: {
stepName: 'aborter',
stepId: 'step_test',
stepStartedAt: new Date(),
attempt: 1,
},
workflowMetadata: {
workflowName: 'wf',
workflowRunId: 'wrun_test',
workflowStartedAt: new Date(),
features: { encryption: false },
},
ops,
preCompletionOps,
encryptionKey: undefined,
};

const controller = await contextStorage.run(store, async () => {
const [revived] = (await hydrateStepArguments(
dehydrated,
'wrun_test',
undefined,
ops
)) as [AbortController];
return revived;
});

return { controller, ops, preCompletionOps };
}

it('routes the hook resume to preCompletionOps, not the background ops', async () => {
const { controller, preCompletionOps } = await reviveControllerInStep();
expect(controller.signal.aborted).toBe(false);

const store: StepContext = {
stepMetadata: {
stepName: 'aborter',
stepId: 'step_test',
stepStartedAt: new Date(),
attempt: 1,
},
workflowMetadata: {
workflowName: 'wf',
workflowRunId: 'wrun_test',
workflowStartedAt: new Date(),
features: { encryption: false },
},
ops: [],
preCompletionOps,
encryptionKey: undefined,
};

// abort() reads the step context at call time.
contextStorage.run(store, () => controller.abort('aborted from step'));

expect(controller.signal.aborted).toBe(true);
// The durable hook resume is queued for pre-completion draining.
expect(preCompletionOps.length).toBe(1);

// Draining preCompletionOps (what the step handler awaits before
// step_completed) actually fires the resume with the correct payload.
await Promise.all(preCompletionOps);
expect(mockResumeHook).toHaveBeenCalledTimes(1);
expect(mockResumeHook).toHaveBeenCalledWith('abrt_pre_completion', {
aborted: true,
reason: 'aborted from step',
});
});
});
167 changes: 167 additions & 0 deletions packages/core/src/abort-replay-ordering.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/**
* Regression test for the abort-signal replay-ordering flake.
*
* E2E `abortFromStepWorkflow: step abort cancels an in-flight sibling step`
* intermittently observed `stepSawAborted === false`: a workflow aborts a
* controller from one step, then — after the parallel work settles — passes
* `controller.signal` to a *subsequent* step (`checkSignalState`), which read
* `aborted: false`.
*
* Root cause: the workflow VM's controller is aborted when the events consumer
* processes the `hook_received` event, but `_setAborted` is deferred behind
* `await hydrateStepReturnValue(...)` (async reason decrypt/deserialize) on the
* promiseQueue. Unlike step-result and hook-payload deliveries, the abort
* delivery did NOT participate in `ctx.pendingDeliveries`, so `scheduleWhenIdle`
* — which the suspension handler uses to decide when to dehydrate queued step
* arguments — could fire while the abort was still in flight. The downstream
* step's `controller.signal` argument was then serialized with `aborted: false`.
* Because the hydration latency (decryption) varies run-to-run, the test flaked.
*
* The fix bumps `pendingDeliveries` around the abort delivery, holding the
* idle/suspension gate until `_setAborted` lands. These tests inject hydration
* latency that outlasts a macrotask, so a regression (no counter) is caught
* deterministically rather than depending on real decryption timing.
*/

import type { Event } from '@workflow/world';
import * as nanoid from 'nanoid';
import { monotonicFactory } from 'ulid';
import { describe, expect, it, vi } from 'vitest';
import { EventsConsumer } from './events-consumer.js';
import {
scheduleWhenIdle,
type WorkflowOrchestratorContext,
} from './private.js';
import { createContext } from './vm/index.js';
import { createCreateAbortController } from './workflow/abort-controller.js';

// Simulate the production reason-payload decryption gap: every abort reason
// hydration is delayed past a macrotask boundary. Only the read side is
// slowed — `dehydrateStepReturnValue` (used to build the test payload) keeps
// its real implementation via the spread of `actual`.
const HYDRATE_DELAY_MS = 50;
vi.mock('./serialization.js', async (importOriginal) => {
const actual = await importOriginal<typeof import('./serialization.js')>();
return {
...actual,
hydrateStepReturnValue: async (
...args: Parameters<typeof actual.hydrateStepReturnValue>
) => {
await new Promise((resolve) => setTimeout(resolve, HYDRATE_DELAY_MS));
return actual.hydrateStepReturnValue(...args);
},
};
});

// Imported after the mock declaration; vi.mock is hoisted so this still
// resolves to the mocked module.
const { dehydrateStepReturnValue } = await import('./serialization.js');

let ctx: WorkflowOrchestratorContext;

function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext {
const context = createContext({
seed: 'test-abort-replay-ordering',
fixedTimestamp: 1714857600000,
});
const ulid = monotonicFactory(() => context.globalThis.Math.random());
const workflowStartedAt = context.globalThis.Date.now();
return {
runId: 'wrun_test',
encryptionKey: undefined,
globalThis: context.globalThis,
eventsConsumer: new EventsConsumer(events, {
onUnconsumedEvent: () => {},
getPromiseQueue: () => ctx.promiseQueue,
}),
invocationsQueue: new Map(),
generateUlid: () => ulid(workflowStartedAt),
generateNanoid: nanoid.customRandom(nanoid.urlAlphabet, 21, (size) =>
new Uint8Array(size).map(() => 256 * context.globalThis.Math.random())
),
onWorkflowError: vi.fn(),
promiseQueue: Promise.resolve(),
pendingDeliveries: 0,
};
}

/**
* Probe a same-seeded context to discover the deterministic correlationId and
* token the abort hook will use, so we can author the matching hook_received
* event before the controller subscribes.
*/
function probeAbortHook(): { correlationId: string; token: string } {
const probeCtx = setupWorkflowContext([]);
const ProbeAbortController = createCreateAbortController(probeCtx);
new ProbeAbortController();
const item = [...probeCtx.invocationsQueue.values()].find(
(i) => i.type === 'hook'
);
if (!item || item.type !== 'hook') {
throw new Error('expected probe abort hook item');
}
return { correlationId: item.correlationId, token: item.token };
}

async function makeAbortReceipt(): Promise<Event> {
const { correlationId, token } = probeAbortHook();
const ops: Promise<unknown>[] = [];
const payload = await dehydrateStepReturnValue(
{ reason: 'aborted from step' },
'wrun_test',
undefined,
ops
);
return {
eventId: 'evnt_abort',
runId: 'wrun_test',
eventType: 'hook_received',
correlationId,
eventData: { token, payload },
createdAt: new Date(),
};
}

describe('abort signal replay ordering', () => {
it('holds scheduleWhenIdle until the abort reason hydration lands', async () => {
const receipt = await makeAbortReceipt();
ctx = setupWorkflowContext([receipt]);

const AbortController = createCreateAbortController(ctx);
const controller = new AbortController();

// scheduleWhenIdle is exactly what the suspension handler uses to gate
// dehydration of queued step arguments. Whatever `aborted` reads here is
// what a step dispatched right after the abort would serialize.
const captured = new Promise<boolean>((resolve) => {
scheduleWhenIdle(ctx, () => resolve(controller.signal.aborted));
});

// Pre-fix: the abort delivery was invisible to pendingDeliveries, so the
// idle gate fired before the ~50ms hydration completed and captured false.
await expect(captured).resolves.toBe(true);
expect(controller.signal.aborted).toBe(true);
expect(controller.signal.reason).toBe('aborted from step');
expect(ctx.pendingDeliveries).toBe(0);
});

it('counts the in-flight abort as a pending delivery while it hydrates', async () => {
const receipt = await makeAbortReceipt();
ctx = setupWorkflowContext([receipt]);

const AbortController = createCreateAbortController(ctx);
const controller = new AbortController();

// Let the events consumer's process.nextTick run so hook_received is
// consumed, but not long enough for the injected hydration delay to elapse.
await new Promise((resolve) => process.nextTick(resolve));

expect(ctx.pendingDeliveries).toBe(1);
expect(controller.signal.aborted).toBe(false);

// After the queue drains, the abort has landed and the counter is released.
await ctx.promiseQueue;
expect(ctx.pendingDeliveries).toBe(0);
expect(controller.signal.aborted).toBe(true);
});
});
6 changes: 4 additions & 2 deletions packages/core/src/private.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,10 @@ export interface WorkflowOrchestratorContext {
promiseQueue: Promise<void>;
/**
* Counter of in-flight async data delivery operations (step result
* hydration, hook payload hydration). Suspensions must wait for this
* to reach 0 before firing, to avoid preempting data delivery.
* hydration, hook payload hydration, abort signal hydration). Suspensions
* must wait for this to reach 0 before firing, to avoid preempting data
* delivery — e.g. dehydrating a step's arguments while an abort that should
* be reflected in those arguments is still hydrating its reason.
*/
pendingDeliveries: number;
/**
Expand Down
23 changes: 23 additions & 0 deletions packages/core/src/runtime/step-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,11 @@ export async function executeStep(
return { type: 'failed' };
}

// Ops that must be durably committed before step completion (e.g. a
// step-initiated abort's hook_received event). See StepContext. Declared
// outside the try so the failure path below can also drain them.
const preCompletionOps: Promise<void>[] = [];

try {
const attempt = step.attempt;

Expand Down Expand Up @@ -336,6 +341,7 @@ export async function executeStep(
},
workflowDeploymentId: params.workflowDeploymentId,
ops,
preCompletionOps,
closureVars: hydratedInput.closureVars,
encryptionKey,
},
Expand Down Expand Up @@ -402,6 +408,15 @@ export async function executeStep(
]);
}

// Commit must-be-durable ops (e.g. a step-initiated abort's
// hook_received event) before writing step_completed, so any workflow
// continuation triggered by that event observes the abort rather than
// racing it. These ops swallow their own errors, so awaiting only
// enforces ordering. See StepContext.preCompletionOps.
if (preCompletionOps.length > 0) {
await Promise.all(preCompletionOps);
}

// Create step_completed event
let stepCompleted409 = false;
await world.events
Expand Down Expand Up @@ -451,6 +466,14 @@ export async function executeStep(
// and queue a continuation so waitUntil can flush them.
return { type: 'completed', hasPendingOps: !opsSettled };
} catch (err: unknown) {
// Order any must-be-durable ops (e.g. a step-initiated abort's
// hook_received event) ahead of step_failed too — a step that aborts and
// then throws must still have the abort recorded before the failure
// continuation observes it. See StepContext.preCompletionOps.
if (preCompletionOps.length > 0) {
await Promise.all(preCompletionOps);
}

const effectiveErr = promoteAbortErrorToFatal(err);

const normalizedError = await normalizeUnknownError(effectiveErr);
Expand Down
Loading
Loading