Skip to content

Commit 4937fc1

Browse files
committed
fix: improve compatibility with non-compliant MCP servers (#413)
Addresses issues with servers like Shopify that violate MCP/HTTP specs: - Prioritize application/json in Accept headers to fix content-type issues - Handle non-compliant notification non-empty responses - Add status code validation and null safety improvements Resolves #406 Signed-off-by: Christian Tzolov <[email protected]>
1 parent 1e93776 commit 4937fc1

File tree

4 files changed

+37
-10
lines changed

4 files changed

+37
-10
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
3131
import io.modelcontextprotocol.spec.McpTransportStream;
3232
import io.modelcontextprotocol.util.Assert;
33+
import io.modelcontextprotocol.util.Utils;
3334
import reactor.core.Disposable;
3435
import reactor.core.publisher.Flux;
3536
import reactor.core.publisher.Mono;
@@ -244,7 +245,7 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
244245

245246
Disposable connection = webClient.post()
246247
.uri(this.endpoint)
247-
.accept(MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_JSON)
248+
.accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
248249
.headers(httpHeaders -> {
249250
transportSession.sessionId().ifPresent(id -> httpHeaders.add("mcp-session-id", id));
250251
})
@@ -287,7 +288,7 @@ else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
287288
logger.trace("Received response to POST for session {}", sessionRepresentation);
288289
// communicate to caller the message was delivered
289290
sink.success();
290-
return responseFlux(response);
291+
return directResponseFlux(message, response);
291292
}
292293
else {
293294
logger.warn("Unknown media type {} returned for POST in session {}", contentType,
@@ -384,14 +385,22 @@ private static String sessionIdOrPlaceholder(McpTransportSession<?> transportSes
384385
return transportSession.sessionId().orElse("[missing_session_id]");
385386
}
386387

387-
private Flux<McpSchema.JSONRPCMessage> responseFlux(ClientResponse response) {
388+
private Flux<McpSchema.JSONRPCMessage> directResponseFlux(McpSchema.JSONRPCMessage sentMessage,
389+
ClientResponse response) {
388390
return response.bodyToMono(String.class).<Iterable<McpSchema.JSONRPCMessage>>handle((responseMessage, s) -> {
389391
try {
390-
McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper,
391-
responseMessage);
392-
s.next(List.of(jsonRpcResponse));
392+
if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(responseMessage)) {
393+
logger.warn("Notification: {} received non-compliant response: {}", sentMessage, responseMessage);
394+
s.complete();
395+
}
396+
else {
397+
McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper,
398+
responseMessage);
399+
s.next(List.of(jsonRpcResponse));
400+
}
393401
}
394402
catch (IOException e) {
403+
// TODO: this should be a McpTransportError
395404
s.error(e);
396405
}
397406
}).flatMapIterable(Function.identity());

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -342,9 +342,9 @@ public String toString(McpSchema.JSONRPCMessage message) {
342342
}
343343
}
344344

345-
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sendMessage) {
345+
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
346346
return Mono.create(messageSink -> {
347-
logger.debug("Sending message {}", sendMessage);
347+
logger.debug("Sending message {}", sentMessage);
348348

349349
final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
350350
final McpTransportSession<Disposable> transportSession = this.activeSession.get();
@@ -355,10 +355,10 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sendMessage) {
355355
requestBuilder = requestBuilder.header("mcp-session-id", transportSession.sessionId().get());
356356
}
357357

358-
String jsonBody = this.toString(sendMessage);
358+
String jsonBody = this.toString(sentMessage);
359359

360360
HttpRequest request = requestBuilder.uri(Utils.resolveUri(this.baseUri, this.endpoint))
361-
.header("Accept", TEXT_EVENT_STREAM + ", " + APPLICATION_JSON)
361+
.header("Accept", APPLICATION_JSON + ", " + TEXT_EVENT_STREAM)
362362
.header("Content-Type", APPLICATION_JSON)
363363
.header("Cache-Control", "no-cache")
364364
.POST(HttpRequest.BodyPublishers.ofString(jsonBody))
@@ -436,10 +436,16 @@ else if (contentType.contains(TEXT_EVENT_STREAM)) {
436436
else if (contentType.contains(APPLICATION_JSON)) {
437437
messageSink.success();
438438
String data = ((ResponseSubscribers.AggregateResponseEvent) responseEvent).data();
439+
if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(data)) {
440+
logger.warn("Notification: {} received non-compliant response: {}", sentMessage, data);
441+
return Mono.empty();
442+
}
443+
439444
try {
440445
return Mono.just(McpSchema.deserializeJsonRpcMessage(objectMapper, data));
441446
}
442447
catch (IOException e) {
448+
// TODO: this should be a McpTransportError
443449
return Mono.error(e);
444450
}
445451
}

mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.reactivestreams.FlowAdapters;
1313
import org.reactivestreams.Subscription;
1414

15+
import io.modelcontextprotocol.spec.McpError;
1516
import reactor.core.publisher.BaseSubscriber;
1617
import reactor.core.publisher.FluxSink;
1718

@@ -135,6 +136,7 @@ protected void hookOnSubscribe(Subscription subscription) {
135136

136137
@Override
137138
protected void hookOnNext(String line) {
139+
138140
if (line.isEmpty()) {
139141
// Empty line means end of event
140142
if (this.eventBuilder.length() > 0) {
@@ -164,6 +166,13 @@ else if (line.startsWith("event:")) {
164166
this.currentEventType.set(matcher.group(1).trim());
165167
}
166168
}
169+
else {
170+
// If the response is not successful, emit an error
171+
// TODO: This should be a McpTransportError
172+
this.sink.error(new McpError(
173+
"Invalid SSE response. Status code: " + this.responseInfo.statusCode() + " Line: " + line));
174+
175+
}
167176
}
168177
}
169178

mcp/src/main/java/io/modelcontextprotocol/util/Utils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ public static boolean isEmpty(@Nullable Map<?, ?> map) {
6969
* base URL or URI is malformed
7070
*/
7171
public static URI resolveUri(URI baseUrl, String endpointUrl) {
72+
if (!Utils.hasText(endpointUrl)) {
73+
return baseUrl;
74+
}
7275
URI endpointUri = URI.create(endpointUrl);
7376
if (endpointUri.isAbsolute() && !isUnderBaseUri(baseUrl, endpointUri)) {
7477
throw new IllegalArgumentException("Absolute endpoint URL does not match the base URL.");

0 commit comments

Comments
 (0)