Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions packages/core/src/reconnecting-framed-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,53 @@ describe('createReconnectingFramedStream', () => {
expect(calls).toEqual([0, 2]);
});

it('retries the reopen itself against the budget instead of failing fatally', async () => {
// After 2 frames the connection drops (read error → reconnect at index 2).
// The first *reopen* attempt also fails — the server is briefly
// unavailable during the reconnect window. That transient failure of the
// reopen is the exact blip this wrapper exists to survive, so it must be
// counted against the budget and retried, not treated as fatal. The
// second reopen succeeds and the stream completes.
const calls: number[] = [];
let reopenAttempts = 0;
const world = {
streams: {
get: vi.fn(
async (_runId: string, _name: string, startIndex?: number) => {
const idx = startIndex ?? 0;
calls.push(idx);
if (idx === 0) {
return scriptedStream([
{ kind: 'value', value: payloadFrame(1) },
{ kind: 'value', value: payloadFrame(2) },
{ kind: 'error', err: new Error('max-duration abort') },
]);
}
// Reopen at index 2: throw on the first attempt, succeed on the next.
reopenAttempts++;
if (reopenAttempts === 1) {
throw new Error('reopen failed: server briefly unavailable');
}
return scriptedStream([
{ kind: 'value', value: payloadFrame(3) },
{ kind: 'close' },
]);
}
),
},
} as unknown as World;
setWorld(world);

const stream = createReconnectingFramedStream(RUN_ID, 's', 0);
const chunks = await readAll(stream);

// The failed reopen did not surface to the consumer; the stream recovered.
expect(chunks).toEqual([payloadFrame(1), payloadFrame(2), payloadFrame(3)]);
// index 0 once, then index 2 twice — the failed reopen and the retry both
// resume from the same position.
expect(calls).toEqual([0, 2, 2]);
});

it('respects an initial non-zero startIndex on reconnect', async () => {
const { world, calls } = makeWorldWithScriptedStreams({
10: () =>
Expand Down
44 changes: 31 additions & 13 deletions packages/core/src/serialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -460,26 +460,44 @@ export function createReconnectingFramedStream(
}

async function reconnect(): Promise<void> {
reconnectCount++;
totalReconnectCount++;
if (reconnectCount > FRAMED_STREAM_MAX_RECONNECTS) {
throw new Error(
`Stream "${name}" exceeded maximum reconnection attempts (${FRAMED_STREAM_MAX_RECONNECTS})`
);
}
if (totalReconnectCount > FRAMED_STREAM_MAX_TOTAL_RECONNECTS) {
throw new Error(
`Stream "${name}" exceeded maximum total reconnection attempts (${FRAMED_STREAM_MAX_TOTAL_RECONNECTS})`
);
}
if (reader) {
await reader.cancel().catch(() => {});
reader = undefined;
}
// Advance the resume position past the frames already delivered, then
// drop any partial-frame bytes — the reopened connection re-sends from a
// frame boundary at the new index.
currentStartIndex += consumedFrames;
consumedFrames = 0;
buffer = new Uint8Array(0);
await connect();

// Retry the reopen itself against the reconnect budget. A transient
// failure of connect() — the server briefly unavailable during the
// reconnect window — is the exact blip this wrapper exists to survive, so
// count it against the budget and try again rather than treating it as
// fatal. Only budget exhaustion (a server that stays down) terminates the
// stream.
for (;;) {
reconnectCount++;
totalReconnectCount++;
if (reconnectCount > FRAMED_STREAM_MAX_RECONNECTS) {
throw new Error(
`Stream "${name}" exceeded maximum reconnection attempts (${FRAMED_STREAM_MAX_RECONNECTS})`
);
}
if (totalReconnectCount > FRAMED_STREAM_MAX_TOTAL_RECONNECTS) {
throw new Error(
`Stream "${name}" exceeded maximum total reconnection attempts (${FRAMED_STREAM_MAX_TOTAL_RECONNECTS})`
);
}
try {
await connect();
return;
} catch {
// Reopen failed transiently; loop to retry, counting against the
// budget so a server that never recovers still terminates the stream.
}
}
}

return new ReadableStream<Uint8Array>({
Expand Down
Loading