Skip to content

Optimize and standardize the readSSEStream function handling #204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

wangchaodeyuzhou
Copy link
Contributor

@wangchaodeyuzhou wangchaodeyuzhou commented Apr 24, 2025

Summary by CodeRabbit

  • Refactor
    • Improved the handling of Server-Sent Events (SSE) streams for more reliable and consistent event processing.
    • Unified event handling to use structured event types, simplifying event management and reducing potential errors.
    • Enhanced code consistency and maintainability by delegating SSE parsing to a dedicated helper function.
  • Chores
    • Updated the .gitignore file to exclude Visual Studio Code workspace settings from version control.

Copy link
Contributor

coderabbitai bot commented Apr 24, 2025

Walkthrough

The changes introduce a refactored approach to Server-Sent Events (SSE) stream handling in the client transport layer. A new helper file implements a structured SSE event parser and stream reader, replacing previous manual parsing logic in the SSE and StreamableHTTP client implementations. Method signatures are updated to use a unified event struct, and context handling is improved for cancellation support. Imports are streamlined to remove unused modules, and HTTP method constants are standardized. Additionally, the .gitignore file is updated to exclude Visual Studio Code settings.

Changes

File(s) Change Summary
.gitignore Added .vscode to the ignore list to exclude Visual Studio Code workspace settings from version control.
client/transport/sse.go Refactored SSE reading to use a new ReadSSEStream helper function; updated method signatures to accept context and use an event struct; standardized HTTP method constants; removed unused imports.
client/transport/sse_helper.go Introduced new file defining the sseEvent struct and the ReadSSEStream function for structured, line-oriented SSE stream parsing with context support and callback handling.
client/transport/streamable_http.go Updated SSE handling to use the new event struct and helper function; refactored handler signatures and removed manual parsing logic and unused imports.

📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0ecbcae and 43c273b.

📒 Files selected for processing (1)
  • client/transport/sse.go (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • client/transport/sse.go
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🔭 Outside diff range comments (1)
server/server_test.go (1)

212-228: ⚠️ Potential issue

Invalid for … range <int> syntax – tests will not compile

range can only iterate over arrays, slices, maps, strings, and channels.
Statements like

for i := range 5 { … }
for range 10 { … }

are illegal and will cause a compile-time cannot range over <int> error, breaking the entire test suite.

Fix all integer loops by switching to a classic for i := 0; i < N; i++ { … } form (or create a helper slice if you really want range semantics):

- for i := range 5 {
+ for i := 0; i < 5; i++ {

- for range 10 {
+ for i := 0; i < 10; i++ {

Apply the change at every occurrence (lines 212–218, 221–228, 530–535).

Also applies to: 530-536

🧹 Nitpick comments (9)
mcp/types.go (1)

49-63: Well-defined notification method constants added.

The addition of notification method constants for resources, prompts, and tools list changes follows a consistent naming convention and includes proper documentation with URLs.

However, there's one inconsistency in the URL at line 62:

-// https://spec.modelcontextprotocol.io/specification/2024-11-05/server/tools/list_changed/
+// https://modelcontextprotocol.io/specification/2024-11-05/server/tools/list_changed/

This URL still uses the old domain pattern, while all others have been updated.

client/inprocess.go (2)

11-12: Consider supporting client options for more flexible configuration.

While the implementation is correct, this function doesn't pass any optional configuration options to NewClient, which accepts variadic ClientOption parameters. Consider modifying the signature to support these options.

-func NewInProcessClient(server *server.MCPServer) (*Client, error) {
-	inProcessTransport := transport.NewInProcessTransport(server)
-	return NewClient(inProcessTransport), nil
+func NewInProcessClient(server *server.MCPServer, options ...ClientOption) (*Client, error) {
+	inProcessTransport := transport.NewInProcessTransport(server)
+	return NewClient(inProcessTransport, options...), nil
}

11-12: Return value simplification opportunity.

The function signature returns an error, but it always returns nil. Consider either removing the error return or identifying potential error conditions that could occur during client creation.

-func NewInProcessClient(server *server.MCPServer) (*Client, error) {
-	inProcessTransport := transport.NewInProcessTransport(server)
-	return NewClient(inProcessTransport), nil
+func NewInProcessClient(server *server.MCPServer) *Client {
+	inProcessTransport := transport.NewInProcessTransport(server)
+	return NewClient(inProcessTransport)
}
client/inprocess_test.go (1)

85-139: Heavy duplication – extract a reusable initClient helper

Each sub-test repeats ~25 lines to create, start, initialise, and defer-close a client. Duplication makes tests harder to maintain and hides the intent of the individual scenarios.

+func newStartedClient(t *testing.T, srv *server.MCPServer) *MCPClient {
+    t.Helper()
+    client, err := NewInProcessClient(srv)
+    require.NoError(t, err)
+    require.NoError(t, client.Start(context.Background()))
+
+    initReq := mcp.InitializeRequest{
+        Params: mcp.InitializeParams{
+            ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION,
+            ClientInfo: mcp.Implementation{Name: "test-client", Version: "1.0.0"},
+        },
+    }
+    _, err = client.Initialize(context.Background(), initReq)
+    require.NoError(t, err)
+    return client
+}

Then inside each sub-test:

cli := newStartedClient(t, mcpServer)
defer cli.Close()

This keeps the focus on the action under test and future changes (e.g. protocol version bumps) only need one edit.

client/transport/streamable_http.go (2)

206-229: Content-Type equality check too strict – breaks on ; charset=utf-8

Many servers (including Go’s net/http) return
Content-Type: text/event-stream; charset=utf-8.

An exact equality check fails to recognise this. Use strings.HasPrefix:

-import "text/event-stream"
+strings.HasPrefix(ct, "text/event-stream")

Same applies to application/json. This small tweak avoids subtle 415 errors when the server includes a charset parameter.


243-273: Potential goroutine leak if final JSON-RPC response never arrives

handleSSEResponse exits after receiving a message with a non-nil ID.
If the server streams notifications forever and never sends the final response
(allowed by spec for “fire-and-forget” requests), the goroutine and the
readSSE loop linger until the HTTP connection closes.

To be defensive, allow the caller to set a per-request timeout or add a guard:

select {
case response := <-responseChan:
    return response, nil
case <-ctx.Done():
    return nil, ctx.Err()
case <-time.After(maxStreamDuration):
    cancel()
    return nil, context.DeadlineExceeded
}

(Or document that the server must close the stream with a response.)

client/transport/sse.go (1)

218-218: Improved use of HTTP method constants.

Replacing string literals like "POST" with the constant http.MethodPost enhances code maintainability and reduces the risk of typos.

Consider also updating the HTTP method in line 80 to use http.MethodGet instead of the string literal "GET" for consistency.

-req, err := http.NewRequestWithContext(ctx, "GET", c.baseURL.String(), nil)
+req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL.String(), nil)

Also applies to: 296-296

client/transport/inprocess.go (2)

37-47: Consider adding context cancellation check.

The SendRequest method should respect context cancellation when calling HandleMessage and during the unmarshaling process.

 func (c *InProcessTransport) SendRequest(ctx context.Context, request JSONRPCRequest) (*JSONRPCResponse, error) {
 	requestBytes, err := json.Marshal(request)
 	if err != nil {
 		return nil, fmt.Errorf("failed to marshal request: %w", err)
 	}
 	requestBytes = append(requestBytes, '\n')
 
+	// Check if context is cancelled before proceeding
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
+	}
+
 	respMessage := c.server.HandleMessage(ctx, requestBytes)
 	respByte, err := json.Marshal(respMessage)
 	if err != nil {
 		return nil, fmt.Errorf("failed to marshal response message: %w", err)
 	}
+
+	// Check context again before unmarshaling
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
+	}
+
 	rpcResp := JSONRPCResponse{}
 	err = json.Unmarshal(respByte, &rpcResp)
 	if err != nil {
 		return nil, fmt.Errorf("failed to unmarshal response message: %w", err)
 	}

51-60: Consider adding context cancellation check in SendNotification.

Similar to the SendRequest method, the SendNotification should respect context cancellation before calling HandleMessage.

 func (c *InProcessTransport) SendNotification(ctx context.Context, notification mcp.JSONRPCNotification) error {
 	notificationBytes, err := json.Marshal(notification)
 	if err != nil {
 		return fmt.Errorf("failed to marshal notification: %w", err)
 	}
 	notificationBytes = append(notificationBytes, '\n')
+
+	// Check if context is cancelled before proceeding
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	default:
+	}
+
 	c.server.HandleMessage(ctx, notificationBytes)
 
 	return nil
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between edda393 and 873706a.

📒 Files selected for processing (15)
  • .gitignore (1 hunks)
  • client/client.go (4 hunks)
  • client/inprocess.go (1 hunks)
  • client/inprocess_test.go (1 hunks)
  • client/stdio.go (1 hunks)
  • client/stdio_test.go (1 hunks)
  • client/transport/inprocess.go (1 hunks)
  • client/transport/sse.go (5 hunks)
  • client/transport/sse_helper.go (1 hunks)
  • client/transport/stdio.go (1 hunks)
  • client/transport/streamable_http.go (2 hunks)
  • mcp/types.go (2 hunks)
  • server/server.go (7 hunks)
  • server/server_test.go (6 hunks)
  • server/sse.go (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (7)
client/stdio.go (2)
client/client.go (1)
  • Client (16-25)
client/transport/stdio.go (1)
  • Stdio (20-34)
client/inprocess.go (2)
client/client.go (2)
  • Client (16-25)
  • NewClient (44-54)
client/transport/inprocess.go (1)
  • NewInProcessTransport (20-24)
client/stdio_test.go (1)
client/stdio.go (1)
  • GetStderr (34-43)
server/server.go (1)
mcp/types.go (5)
  • MethodNotificationResourcesListChanged (52-52)
  • MethodNotificationPromptsListChanged (58-58)
  • MethodNotificationToolsListChanged (62-62)
  • Params (118-118)
  • RESOURCE_NOT_FOUND (245-245)
client/transport/sse.go (2)
client/transport/sse_helper.go (2)
  • ReadSSEStream (17-60)
  • SSEEvent (11-14)
mcp/types.go (1)
  • JSONRPCNotification (206-209)
client/transport/streamable_http.go (2)
client/transport/sse_helper.go (2)
  • SSEEvent (11-14)
  • ReadSSEStream (17-60)
mcp/types.go (1)
  • JSONRPCNotification (206-209)
client/transport/inprocess.go (1)
mcp/types.go (1)
  • JSONRPCNotification (206-209)
🔇 Additional comments (34)
.gitignore (1)

3-4: Adding .vscode to .gitignore is a good practice.

Including .vscode alongside .idea ensures that both Visual Studio Code and JetBrains IDE settings remain local and don't get committed to version control.

mcp/types.go (10)

15-16: Correctly updated documentation URL.

The URL has been updated from "spec.modelcontextprotocol.io" to "modelcontextprotocol.io" which appears to be the current domain for the specification.


19-20: Correctly updated documentation URL.

The URL has been updated from "spec.modelcontextprotocol.io" to "modelcontextprotocol.io" which appears to be the current domain for the specification.


23-24: Correctly updated documentation URL.

The URL has been updated from "spec.modelcontextprotocol.io" to "modelcontextprotocol.io" which appears to be the current domain for the specification.


27-28: Correctly updated documentation URL.

The URL has been updated from "spec.modelcontextprotocol.io" to "modelcontextprotocol.io" which appears to be the current domain for the specification.


31-32: Correctly updated documentation URL.

The URL has been updated from "spec.modelcontextprotocol.io" to "modelcontextprotocol.io" which appears to be the current domain for the specification.


35-36: Correctly updated documentation URL.

The URL has been updated from "spec.modelcontextprotocol.io" to "modelcontextprotocol.io" which appears to be the current domain for the specification.


39-40: Correctly updated documentation URL.

The URL has been updated from "spec.modelcontextprotocol.io" to "modelcontextprotocol.io" which appears to be the current domain for the specification.


43-44: Correctly updated documentation URL.

The URL has been updated from "spec.modelcontextprotocol.io" to "modelcontextprotocol.io" which appears to be the current domain for the specification.


47-48: Correctly updated documentation URL.

The URL has been updated from "spec.modelcontextprotocol.io" to "modelcontextprotocol.io" which appears to be the current domain for the specification.


243-246: Added appropriate error code for resource not found scenario.

The addition of RESOURCE_NOT_FOUND error code with value -32002 is appropriate and follows the error code convention established for the MCP protocol.

client/transport/stdio.go (1)

217-220: Added nil check for robust error handling.

The addition of this nil check prevents potential runtime panics by ensuring the stdin pipe is initialized before attempting to use it. This is a good defensive programming practice that matches the existing check in the SendRequest method.

client/stdio_test.go (1)

50-56: Improved error handling in test goroutine.

The test now properly checks if GetStderr returns a valid stderr stream before attempting to use it. This aligns with the updated signature of GetStderr which now returns a boolean success flag in addition to the reader.

This change prevents potential runtime errors when the client transport is not of type *transport.Stdio.

client/inprocess.go (1)

8-12: Good implementation of the in-process client connection.

The NewInProcessClient function provides a clean way to directly connect to an MCP server in the same process without network overhead, which is excellent for testing or cases where network communication isn't needed.

client/stdio.go (1)

34-42: Excellent safety improvement for type assertion.

This change prevents potential panics by safely handling incompatible transport types. The updated function signature with the boolean return value provides better error handling and makes the API more robust.

server/sse.go (2)

182-183: Good simplification of the test server creation.

The code now passes options directly to NewSSEServer instead of applying them in a separate loop, making the implementation cleaner and more maintainable.


324-325: Essential improvement for resource cleanup.

Adding this case to handle session termination ensures the SSE handler exits promptly when the session is marked as done, preventing potential goroutine leaks and unnecessary processing.

server/server.go (8)

422-427: Good implementation of conditional resource change notifications.

This enhancement properly notifies initialized clients when resources change, but only if the server has declared the listChanged capability, following the notification protocol specification.


460-464: Consistent implementation of resource template change notifications.

This notification logic for resource templates follows the same pattern as for resources, maintaining consistency in the codebase while adhering to the protocol specification.


480-484: Good implementation of prompt list change notifications.

This change ensures clients are properly notified when the prompts list changes, contingent on server capabilities.


506-510: Properly implemented tool list change notifications.

This notification logic for tools changes follows the correct pattern, ensuring clients are informed when available tools are modified.


516-516: Good map capacity pre-allocation optimization.

Pre-allocating the map with the expected capacity is a small but effective optimization that can reduce memory reallocations.


529-533: Consistent notification handling for tool deletions.

This ensures clients are notified when tools are removed, maintaining consistency with the addition notification logic.


741-741: Good pre-allocation optimization for arguments map.

Pre-sizing the map based on the expected number of elements improves performance by reducing memory reallocations.


764-764: Appropriate error code update for resource not found scenarios.

Using the specific mcp.RESOURCE_NOT_FOUND error code provides more precise error information to clients compared to a generic error.

client/transport/sse.go (3)

126-129: Good refactoring to extract SSE stream reading logic.

The readSSE method has been simplified by delegating the complex parsing logic to the ReadSSEStream helper function. This improves separation of concerns and makes the code more maintainable.


134-135: Clean implementation of the SSE event handling.

The change to use a structured SSEEvent type instead of separate parameters makes the code more robust and easier to understand. The event data is now accessed through the struct fields, which provides better encapsulation.


105-105: Correct update to pass context to readSSE.

The call to readSSE has been updated to include the context parameter, ensuring proper propagation of cancellation signals and deadlines.

client/client.go (5)

19-25: Well-structured client fields with clear separation of capabilities.

The renamed serverCapabilities field and the addition of clientCapabilities provide a clear distinction between the client's and server's capabilities. This makes the code more maintainable and easier to understand.


27-34: Good implementation of functional options pattern.

The ClientOption type and WithClientCapabilities function provide a clean and extensible way to configure the client. This pattern allows for flexible initialization and can be easily extended with additional options in the future.


44-54: Enhanced client constructor with optional configuration.

The updated NewClient function now accepts and applies functional options, which improves the API's flexibility while maintaining backward compatibility.


151-153: Clear update to store server capabilities.

The comment and assignment are now properly aligned to reflect the name change from capabilities to serverCapabilities, which enhances code readability.


426-434: Good addition of capability getter methods.

The new GetServerCapabilities and GetClientCapabilities methods provide a clean interface for accessing the capabilities, following good encapsulation practices.

client/transport/inprocess.go (1)

1-71: Well-designed in-process transport implementation.

This new transport enables direct, network-free communication with an MCP server instance in the same process. The implementation follows the Interface contract correctly and provides thread-safe notification handling.

Key strengths:

  • Efficient direct server communication
  • Proper error handling during marshaling/unmarshaling
  • Thread-safe notification handling with mutex protection
  • Clean and minimal implementation of required interfaces

Comment on lines 62 to 66
func (c *InProcessTransport) SetNotificationHandler(handler func(notification mcp.JSONRPCNotification)) {
c.notifyMu.Lock()
defer c.notifyMu.Unlock()
c.onNotification = handler
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handler function is set but never called.

The SetNotificationHandler method sets the notification handler, but there's no code in this transport implementation that calls this handler when notifications are received from the server.

You need to implement a mechanism to receive notifications from the server and call the registered handler. Consider adding a method that the server can call to push notifications to the transport, or implement a notification polling mechanism.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🔭 Outside diff range comments (1)
client/client.go (1)

134-139: 💡 Verification agent

🧩 Analysis chain

Potentially breaking wire-format change – please verify server expectations

Changing the JSON tag from "capabilities" to "serverCapabilities" alters the field name that the server receives.
If the server side has not been updated in lock-step, initialization will silently fail with a “missing required field” error.

Run a quick grep against the server to confirm which key it expects, e.g.:

If the server still expects capabilities, either:

-        Capabilities    mcp.ClientCapabilities `json:"serverCapabilities"`
+        Capabilities    mcp.ClientCapabilities `json:"capabilities"`

or introduce two fields (one omitted) until both ends are migrated.
Let me know if you’d like a concrete compatibility patch.


🏁 Script executed:

#!/bin/bash
# Look for occurrences of "capabilities" and "serverCapabilities" in server-side Go files
rg -n '"capabilities"\s*:`' -A2
rg -n '"serverCapabilities"\s*:`' -A2

Length of output: 73


🏁 Script executed:

#!/bin/bash
# Search for JSON tags for capabilities in server-side Go files
rg -n 'json:"capabilities' -A2
rg -n 'json:"serverCapabilities' -A2

Length of output: 680


Fix JSON tag on Capabilities to match server expectations

The server’s types (mcp/types.go:291,304) still use json:"capabilities". Using json:"serverCapabilities" on the client will cause the server to see a missing required field.

Please update the client struct in client/client.go:

--- client/client.go
@@ Lines 134-139
-       Capabilities    mcp.ClientCapabilities `json:"serverCapabilities"`
+       Capabilities    mcp.ClientCapabilities `json:"capabilities"`
🧹 Nitpick comments (1)
client/transport/sse.go (1)

224-235: Guarantee response-map cleanup with defer

deleteResponseChan() is invoked on several error paths, but not if httpClient.Do succeeds and later
returns a non-2xx status (resp.StatusCode check).
Placing the cleanup in a defer right after the map insertion removes this subtlety:

-// Register response channel
-responseChan := make(chan *JSONRPCResponse, 1)
-c.mu.Lock()
-c.responses[request.ID] = responseChan
-c.mu.Unlock()
-deleteResponseChan := func() {
+responseChan := make(chan *JSONRPCResponse, 1)
+c.mu.Lock()
+c.responses[request.ID] = responseChan
+c.mu.Unlock()
+defer func() {                    // ensure map entry is always removed
     c.mu.Lock()
     delete(c.responses, request.ID)
     c.mu.Unlock()
-}
+}()

Reduces the risk of leaking entries on new future code paths.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 082d2e3 and 3b9f746.

📒 Files selected for processing (5)
  • client/client.go (1 hunks)
  • client/interface.go (1 hunks)
  • client/transport/sse.go (5 hunks)
  • mcp/types.go (1 hunks)
  • server/server.go (2 hunks)
✅ Files skipped from review due to trivial changes (3)
  • mcp/types.go
  • client/interface.go
  • server/server.go
🧰 Additional context used
🧬 Code Graph Analysis (2)
client/client.go (1)
mcp/types.go (1)
  • ClientCapabilities (323-333)
client/transport/sse.go (4)
client/transport/sse_helper.go (1)
  • ReadSSEStream (17-61)
mcp/types.go (1)
  • JSONRPCNotification (206-209)
client/transport/interface.go (1)
  • JSONRPCResponse (36-45)
testdata/mockstdio_server.go (1)
  • JSONRPCResponse (18-26)
🪛 GitHub Check: test
client/transport/sse.go

[failure] 134-134:
undefined: SSEEvent

🪛 GitHub Actions: go
client/transport/sse.go

[error] 134-134: undefined: SSEEvent

🔇 Additional comments (1)
client/transport/sse.go (1)

105-108: Context propagation looks good

Passing ctx into readSSE enables fast shutdown via cancelSSEStream().
Nice improvement over the previous infinite read loop. 👍

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🔭 Outside diff range comments (1)
client/transport/sse.go (1)

134-148: 🛠️ Refactor suggestion

⚠️ Potential issue

Guard against double-closing endpointChan

If the server (or a reconnect) accidentally emits more than one endpoint event, the second call to
close(c.endpointChan) will panic and bring the whole client down.
A very small race window also exists between the time the channel is closed and code that is still selecting
on it (e.g. Start)—a second close would crash before the started flag is set, making the failure hard to reproduce.

A simple sync.Once or an atomic flag avoids this class of bugs while preserving the one-shot semantics:

@@ func (c *SSE) handleSSEEvent(evt sseEvent) {
  case "endpoint":
+     // Ensure the endpoint channel is closed only once.
+     var closed bool
+     if !c.started.Load() { // fast path – guard common case without lock
+         closed = false
+     }
+
      endpoint, err := c.baseURL.Parse(evt.data)
      if err != nil {
          fmt.Printf("Error parsing endpoint URL: %v\n", err)
          return
      }
@@
-     c.endpoint = endpoint
-     close(c.endpointChan)
+     c.endpoint = endpoint
+
+     // Close safely.
+     if !closed {
+         select {
+         case <-c.endpointChan: // already closed
+         default:
+             close(c.endpointChan)
+         }
+     }

(Using sync.Once at struct level is even cleaner.)

🧹 Nitpick comments (3)
client/transport/sse.go (3)

151-160: Avoid double unmarshal of the same JSON blob

evt.data is unmarshalled twice: first into baseMessage, then again into
mcp.JSONRPCNotification when baseMessage.ID == nil.
Parsing once into a generic struct (or using json.RawMessage inside
baseMessage) removes redundant work and GC pressure:

-        var baseMessage JSONRPCResponse
-        if err := json.Unmarshal([]byte(evt.data), &baseMessage); err != nil {
+        var baseMessage struct {
+            JSONRPC string           `json:"jsonrpc"`
+            ID      *int64           `json:"id"`
+            Method  string           `json:"method,omitempty"`
+            Params  json.RawMessage  `json:"params,omitempty"`
+            Result  json.RawMessage  `json:"result,omitempty"`
+            Error   json.RawMessage  `json:"error,omitempty"`
+        }
+        if err := json.Unmarshal([]byte(evt.data), &baseMessage); err != nil {
             fmt.Printf("Error unmarshaling message: %v\n", err)
             return
         }
@@
-            var notification mcp.JSONRPCNotification
-            if err := json.Unmarshal([]byte(evt.data), &notification); err != nil {
-                return
-            }
+            var notification mcp.JSONRPCNotification
+            if err := json.Unmarshal(evt.data, &notification); err != nil {
+                return
+            }

This keeps the hot path tight while still yielding correctly-typed notifications.


206-234: Minor race-window: register response channel after ensuring HTTP request will be attempted

The response channel is added to c.responses before the HTTP request is issued.
If the request creation (http.NewRequestWithContext) fails, the tidy-up path does
not remove the channel, leaving an entry that will never be fulfilled or cleaned until
Close() is called.

A tiny rearrangement eliminates the leak:

-// Marshal request
-requestBytes, err := json.Marshal(request)
-...
-// Create HTTP request
-req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.endpoint.String(), bytes.NewReader(requestBytes))
-...
-// Register response channel
-responseChan := make(chan *JSONRPCResponse, 1)
-c.mu.Lock()
-c.responses[request.ID] = responseChan
-c.mu.Unlock()
+// Marshal request
+requestBytes, err := json.Marshal(request)
+...
+// Create HTTP request **first**
+req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.endpoint.String(), bytes.NewReader(requestBytes))
+...
+// Only register the response channel if we have a valid *req*
+responseChan := make(chan *JSONRPCResponse, 1)
+c.mu.Lock()
+c.responses[request.ID] = responseChan
+c.mu.Unlock()

It’s a one-liner change but prevents a leaked map entry on early failure.


105-108: Consider handling premature readSSE exit to trigger reconnect or shutdown

readSSE is started in its own goroutine, but if the helper returns an error
(e.g. network drop) the goroutine silently exits after printing to stdout.
Depending on the desired UX you might want to:

  1. attempt an automatic reconnect, or
  2. propagate the error to the caller via a channel / callback so higher layers can decide.

Right now, the transport stays in a half-open state (started is true, but the
stream is gone) and subsequent SendRequest calls will stall indefinitely.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3b9f746 and 3b8139d.

📒 Files selected for processing (1)
  • client/transport/sse.go (5 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
client/transport/sse.go (4)
client/transport/sse_helper.go (1)
  • ReadSSEStream (17-61)
mcp/types.go (1)
  • JSONRPCNotification (206-209)
client/transport/interface.go (1)
  • JSONRPCResponse (36-45)
testdata/mockstdio_server.go (1)
  • JSONRPCResponse (18-26)

}
}(reader)

scanner := bufio.NewScanner(reader)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scanner has a MaxScanTokenSize (64K). I may prefer the original ReadString as it's robuster

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. i have changed

scanner has a MaxScanTokenSize (64K). I may prefer the original ReadString as it's robuster

@@ -245,20 +243,20 @@ func (c *StreamableHTTP) handleSSEResponse(ctx context.Context, reader io.ReadCl
// only close responseChan after readingSSE()
defer close(responseChan)

c.readSSE(ctx, reader, func(event, data string) {
c.readSSE(ctx, reader, func(evt sseEvent) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is omitting two letters really necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to unify readSSEStream

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

emm, I think sseEvent is more scalable

Is omitting two letters really necessary?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean evt -> event

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean evt -> event
I know, I mean
Using a sseEvent struct instead of two separate parameters (event, data) is intentional for scalability, clarity, and future extensibility. Here’s why:

  1. Extensibility:
    If we want to support additional SSE fields such as id, retry, or timestamp in the future, we can easily add them into the struct without changing the function signature or breaking existing code.

  2. Optional Fields:
    Structs naturally support optional fields — meaning we can selectively fill only the necessary data for each event without forcing all parameters to be provided. This provides flexibility and keeps the event handling code clean.

  3. Readability:
    Grouping related fields into a struct improves readability and reduces the risk of mistakes like wrong parameter ordering. For example:

    onEvent(sseEvent{event: "message", data: "hello"})

    is much clearer than:

    onEvent("message", "hello")
  4. Consistency:
    It aligns with the design of ReadSSEStream and keeps the API surface consistent across all SSE-related functions, making the codebase easier to maintain and reason about.

Although event is just two letters longer than evt, using clear naming and structured types greatly improves long-term maintainability and collaboration.

@wangchaodeyuzhou wangchaodeyuzhou requested a review from leavez April 27, 2025 03:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants