Skip to content

Commit de49007

Browse files
committed
Allow disabling local SSE to allow for pub/sub
1 parent c71c55b commit de49007

File tree

2 files changed

+38
-5
lines changed

2 files changed

+38
-5
lines changed

src/server/streamableHttp.ts

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import { IncomingMessage, ServerResponse } from "node:http";
22
import { Transport } from "../shared/transport.js";
3-
import { MessageExtraInfo, RequestInfo, isInitializeRequest, isJSONRPCError, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema, RequestId, SUPPORTED_PROTOCOL_VERSIONS, DEFAULT_NEGOTIATED_PROTOCOL_VERSION } from "../types.js";
3+
import { MessageExtraInfo, RequestInfo, isInitializeRequest, isJSONRPCError, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema, RequestId, ServerEvents, SUPPORTED_PROTOCOL_VERSIONS, DEFAULT_NEGOTIATED_PROTOCOL_VERSION, EventListener } from "../types.js";
44
import getRawBody from "raw-body";
55
import contentType from "content-type";
66
import { randomUUID } from "node:crypto";
77
import { AuthInfo } from "./auth/types.js";
8+
import { EventEmitter } from "node:events";
89

910
const MAXIMUM_MESSAGE_SIZE = "4mb";
1011

@@ -46,6 +47,13 @@ export interface StreamableHTTPServerTransportOptions {
4647
*/
4748
sessionIdGenerator: (() => string) | undefined;
4849

50+
/**
51+
* Disabling local SSE means that the transport will not automatically send SSE messages to the local handler.
52+
* You need to implement your own pub/sub mechanism to handle SSE messages by listening to the `responseSse` event
53+
* and emitting them to the `sse` for any events that match the transport sessionId
54+
*/
55+
disableLocalSse?: boolean;
56+
4957
/**
5058
* A callback for session initialization events
5159
* This is called when the server initializes a new session.
@@ -149,6 +157,8 @@ export class StreamableHTTPServerTransport implements Transport {
149157
private _allowedHosts?: string[];
150158
private _allowedOrigins?: string[];
151159
private _enableDnsRebindingProtection: boolean;
160+
readonly events = new EventEmitter<ServerEvents>();
161+
152162

153163
sessionId?: string;
154164
onclose?: () => void;
@@ -168,6 +178,10 @@ export class StreamableHTTPServerTransport implements Transport {
168178
this.sessionId = options.sessionId;
169179
this._initialized = true; // Assume initialized if session ID is provided
170180
}
181+
if (!options.disableLocalSse) {
182+
// If we are not disabling local SSE, we pipe see responses to the local handler
183+
this.events.on('responseSse', (data) => this.events.emit('sse', data));
184+
}
171185
}
172186

173187
/**
@@ -307,11 +321,19 @@ export class StreamableHTTPServerTransport implements Transport {
307321
// otherwise the client will just wait for the first message
308322
res.writeHead(200, headers).flushHeaders();
309323

310-
// Assign the response to the standalone SSE stream
324+
// Write any message matching the sessionId to the SSE stream
325+
const listener: EventListener<ServerEvents['sse']> = ({ sessionId, message, eventId }) => {
326+
if (sessionId === this.sessionId) {
327+
this.writeSSEEvent(res, message, eventId);
328+
}
329+
};
330+
this.events.on('sse', listener);
331+
311332
this._streamMapping.set(this._standaloneSseStreamId, res);
312333
// Set up close handler for client disconnects
313334
res.on("close", () => {
314335
this._streamMapping.delete(this._standaloneSseStreamId);
336+
this.events.removeListener('sse', listener);
315337
});
316338
}
317339

@@ -469,7 +491,7 @@ export class StreamableHTTPServerTransport implements Transport {
469491

470492
// If we have a session ID and an onsessioninitialized handler, call it immediately
471493
// This is needed in cases where the server needs to keep track of multiple sessions
472-
if (this.sessionId&& this._onsessioninitialized) {
494+
if (this.sessionId && this._onsessioninitialized) {
473495
await Promise.resolve(this._onsessioninitialized(this.sessionId));
474496
}
475497

@@ -690,8 +712,11 @@ export class StreamableHTTPServerTransport implements Transport {
690712
eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message);
691713
}
692714

693-
// Send the message to the standalone SSE stream
694-
this.writeSSEEvent(standaloneSse, message, eventId);
715+
// We emit the responseSse event. If disableLocalSse is set to true this message will not be automatically sent to any local SSE handler
716+
// You can listen for the responseSse and emit to a pub/sub. You can also listen for sse events from pubs/sub and emit them
717+
// to events['sse'] to send them to the local SSE handler.
718+
this.events.emit('responseSse',{sessionId: this.sessionId, message, eventId});
719+
695720
return;
696721
}
697722

src/types.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1623,3 +1623,11 @@ export type ClientResult = Infer<typeof ClientResultSchema>;
16231623
export type ServerRequest = Infer<typeof ServerRequestSchema>;
16241624
export type ServerNotification = Infer<typeof ServerNotificationSchema>;
16251625
export type ServerResult = Infer<typeof ServerResultSchema>;
1626+
1627+
/* Server events */
1628+
export type ServerEvents = {
1629+
responseSse: [{ sessionId?: string, message: JSONRPCMessage, eventId?: string }];
1630+
sse: [{ sessionId?: string, message: JSONRPCMessage, eventId?: string }];
1631+
}
1632+
1633+
export type EventListener<K extends any[]> = (...args: K) => void;

0 commit comments

Comments
 (0)