Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/graceful-transport-close.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@modelcontextprotocol/client': patch
---

Wait for in-flight requests to complete before aborting on close(), preventing Undici/OpenTelemetry from marking successful responses as aborted.
19 changes: 19 additions & 0 deletions packages/client/src/client/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export class SSEClientTransport implements Transport {
private _fetch?: FetchLike;
private _fetchWithInit: FetchLike;
private _protocolVersion?: string;
private _pendingRequests = new Set<Promise<void>>();

onclose?: () => void;
onerror?: (error: Error) => void;
Expand Down Expand Up @@ -238,6 +239,14 @@ export class SSEClientTransport implements Transport {
}

async close(): Promise<void> {
// Wait for in-flight requests to complete before aborting to prevent
// Undici/OpenTelemetry from marking successful responses as aborted.
// Uses a timeout to avoid hanging if a request is stuck.
if (this._pendingRequests.size > 0) {
const timeout = new Promise<void>(resolve => setTimeout(resolve, 2000));
await Promise.race([Promise.allSettled(this._pendingRequests), timeout]);
}

this._abortController?.abort();
this._eventSource?.close();
this.onclose?.();
Expand All @@ -248,6 +257,13 @@ export class SSEClientTransport implements Transport {
throw new SdkError(SdkErrorCode.NotConnected, 'Not connected');
}

// Track this request so close() can wait for it to finish
let resolve: () => void;
const requestPromise = new Promise<void>(r => {
resolve = r;
});
this._pendingRequests.add(requestPromise);

try {
const headers = await this._commonHeaders();
headers.set('content-type', 'application/json');
Expand Down Expand Up @@ -290,6 +306,9 @@ export class SSEClientTransport implements Transport {
} catch (error) {
this.onerror?.(error as Error);
throw error;
} finally {
resolve!();
this._pendingRequests.delete(requestPromise);
}
}

Expand Down
20 changes: 20 additions & 0 deletions packages/client/src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ export class StreamableHTTPClientTransport implements Transport {
private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping.
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
private _reconnectionTimeout?: ReturnType<typeof setTimeout>;
private _pendingRequests = new Set<Promise<void>>();

onclose?: () => void;
onerror?: (error: Error) => void;
Expand Down Expand Up @@ -451,6 +452,15 @@ export class StreamableHTTPClientTransport implements Transport {
clearTimeout(this._reconnectionTimeout);
this._reconnectionTimeout = undefined;
}

// Wait for in-flight requests to complete before aborting to prevent
// Undici/OpenTelemetry from marking successful responses as aborted.
// Uses a timeout to avoid hanging if a request is stuck.
if (this._pendingRequests.size > 0) {
const timeout = new Promise<void>(resolve => setTimeout(resolve, 2000));
await Promise.race([Promise.allSettled(this._pendingRequests), timeout]);
}

this._abortController?.abort();
this.onclose?.();
}
Expand All @@ -459,6 +469,13 @@ export class StreamableHTTPClientTransport implements Transport {
message: JSONRPCMessage | JSONRPCMessage[],
options?: { resumptionToken?: string; onresumptiontoken?: (token: string) => void }
): Promise<void> {
// Track this request so close() can wait for it to finish
let resolve: () => void;
const requestPromise = new Promise<void>(r => {
resolve = r;
});
this._pendingRequests.add(requestPromise);

try {
const { resumptionToken, onresumptiontoken } = options || {};

Expand Down Expand Up @@ -620,6 +637,9 @@ export class StreamableHTTPClientTransport implements Transport {
} catch (error) {
this.onerror?.(error as Error);
throw error;
} finally {
resolve!();
this._pendingRequests.delete(requestPromise);
}
}

Expand Down
83 changes: 83 additions & 0 deletions packages/client/test/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1659,4 +1659,87 @@ describe('StreamableHTTPClientTransport', () => {
});
});
});

describe('graceful close', () => {
it('should wait for in-flight requests before aborting', async () => {
await transport.start();

// Track when abort is called
const abortController = (transport as any)._abortController as AbortController;
let abortCalledAt = 0;
const origAbort = abortController.abort.bind(abortController);
abortController.abort = (...args: any[]) => {
abortCalledAt = Date.now();
return origAbort(...args);
};

// Create a slow fetch that takes 100ms to resolve
let fetchResolvedAt = 0;
(globalThis.fetch as Mock).mockImplementationOnce(() => {
return new Promise(resolve => {
setTimeout(() => {
fetchResolvedAt = Date.now();
resolve({
ok: true,
status: 202,
headers: new Headers(),
text: () => Promise.resolve('')
});
}, 100);
});
});

const message: JSONRPCMessage = {
jsonrpc: '2.0',
method: 'test',
params: {},
id: 'test-graceful'
};

// Start send (don't await) and immediately close
const sendPromise = transport.send(message);
const closePromise = transport.close();

await Promise.allSettled([sendPromise, closePromise]);

// abort() should have been called AFTER the fetch resolved
expect(fetchResolvedAt).toBeGreaterThan(0);
expect(abortCalledAt).toBeGreaterThanOrEqual(fetchResolvedAt);
});

it('should abort after timeout even if requests are still pending', async () => {
await transport.start();

// Create a fetch that never resolves on its own, but rejects on abort
(globalThis.fetch as Mock).mockImplementationOnce((_url: string, init?: RequestInit) => {
return new Promise((_resolve, reject) => {
if (init?.signal) {
init.signal.addEventListener('abort', () => {
reject(new DOMException('The operation was aborted', 'AbortError'));
});
}
});
});

const message: JSONRPCMessage = {
jsonrpc: '2.0',
method: 'test',
params: {},
id: 'test-timeout'
};

// Start send (don't await) and immediately close
const sendPromise = transport.send(message).catch(() => {});

const start = Date.now();
await transport.close();
const elapsed = Date.now() - start;

// close() should complete within the 2s timeout (with some margin)
expect(elapsed).toBeLessThan(3000);
expect(elapsed).toBeGreaterThanOrEqual(1900);

await sendPromise;
}, 10000);
});
});
Loading