Skip to content

Commit c81fcc8

Browse files
author
Jay Hemnani
committed
test: add transport-layer 404 handling tests for session recovery
Add two new tests to cover the HTTP transport layer's handling of 404 responses in streamable_http.py: - test_streamable_http_transport_404_sends_session_expired: Tests that HTTP 404 response on non-init requests sends SESSION_EXPIRED error - test_streamable_http_transport_404_on_init_sends_terminated: Tests that HTTP 404 on initialization request sends session terminated error These tests use httpx.MockTransport to simulate server responses and ensure the _send_session_expired_error method and 404 handling logic in StreamableHTTPTransport are properly covered. Github-Issue:#1676
1 parent 3b52b1b commit c81fcc8

File tree

1 file changed

+204
-0
lines changed

1 file changed

+204
-0
lines changed

tests/client/test_session_recovery.py

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,3 +581,207 @@ async def mock_server():
581581
assert tool_params_received[0] == tool_params_received[1]
582582
assert tool_params_received[0]["name"] == "important_tool"
583583
assert tool_params_received[0]["arguments"] == {"key": "sensitive_value", "count": 42}
584+
585+
586+
@pytest.mark.anyio
587+
async def test_streamable_http_transport_404_sends_session_expired():
588+
"""Test that HTTP transport converts 404 response to SESSION_EXPIRED error.
589+
590+
This tests the transport layer directly to ensure the 404 -> SESSION_EXPIRED
591+
conversion happens correctly in streamable_http.py.
592+
"""
593+
import json
594+
595+
import httpx
596+
597+
from mcp.client.streamable_http import StreamableHTTPTransport
598+
599+
# Track requests to simulate different responses
600+
request_count = 0
601+
602+
def mock_handler(request: httpx.Request) -> httpx.Response:
603+
nonlocal request_count
604+
request_count += 1
605+
606+
if request_count == 1:
607+
# First request (initialize) - return success with SSE
608+
init_response = {
609+
"jsonrpc": "2.0",
610+
"id": 0,
611+
"result": {
612+
"protocolVersion": LATEST_PROTOCOL_VERSION,
613+
"capabilities": {},
614+
"serverInfo": {"name": "mock", "version": "0.1.0"},
615+
},
616+
}
617+
sse_data = f"event: message\ndata: {json.dumps(init_response)}\n\n"
618+
return httpx.Response(
619+
200,
620+
content=sse_data.encode(),
621+
headers={
622+
"Content-Type": "text/event-stream",
623+
"mcp-session-id": "test-session-123",
624+
},
625+
)
626+
else:
627+
# Second request - return 404 (session not found)
628+
return httpx.Response(404, content=b"Session not found")
629+
630+
transport = httpx.MockTransport(mock_handler)
631+
http_client = httpx.AsyncClient(transport=transport)
632+
633+
# Create the transport
634+
streamable_transport = StreamableHTTPTransport("http://example.com/mcp")
635+
636+
# Set up streams - read_send needs to accept SessionMessage | Exception
637+
read_send, read_receive = anyio.create_memory_object_stream[SessionMessage | Exception](10)
638+
write_send, write_receive = anyio.create_memory_object_stream[SessionMessage](10)
639+
640+
# Send an initialization request
641+
init_request = JSONRPCMessage(
642+
JSONRPCRequest(
643+
jsonrpc="2.0",
644+
id=0,
645+
method="initialize",
646+
params={
647+
"protocolVersion": LATEST_PROTOCOL_VERSION,
648+
"capabilities": {},
649+
"clientInfo": {"name": "test", "version": "0.1.0"},
650+
},
651+
)
652+
)
653+
654+
try:
655+
async with anyio.create_task_group() as tg:
656+
get_stream_started = False
657+
658+
def start_get_stream() -> None:
659+
nonlocal get_stream_started
660+
get_stream_started = True
661+
662+
async def run_post_writer():
663+
await streamable_transport.post_writer(
664+
http_client,
665+
write_receive,
666+
read_send,
667+
write_send,
668+
start_get_stream,
669+
tg,
670+
)
671+
672+
tg.start_soon(run_post_writer)
673+
674+
# Send init request
675+
await write_send.send(SessionMessage(init_request))
676+
677+
# Get init response
678+
received = await read_receive.receive()
679+
assert isinstance(received, SessionMessage)
680+
response = received
681+
assert isinstance(response.message.root, JSONRPCResponse)
682+
683+
# Now send a tool call request (will get 404)
684+
tool_request = JSONRPCMessage(
685+
JSONRPCRequest(
686+
jsonrpc="2.0",
687+
id=1,
688+
method="tools/call",
689+
params={"name": "test_tool", "arguments": {}},
690+
)
691+
)
692+
await write_send.send(SessionMessage(tool_request))
693+
694+
# Should receive SESSION_EXPIRED error
695+
received = await read_receive.receive()
696+
assert isinstance(received, SessionMessage)
697+
error_response = received
698+
assert isinstance(error_response.message.root, JSONRPCError)
699+
assert error_response.message.root.error.code == SESSION_EXPIRED
700+
701+
# Verify session_id was cleared
702+
assert streamable_transport.session_id is None
703+
704+
tg.cancel_scope.cancel()
705+
finally:
706+
# Proper cleanup of all streams
707+
await write_send.aclose()
708+
await write_receive.aclose()
709+
await read_send.aclose()
710+
await read_receive.aclose()
711+
await http_client.aclose()
712+
713+
714+
@pytest.mark.anyio
715+
async def test_streamable_http_transport_404_on_init_sends_terminated():
716+
"""Test that 404 on initialization request sends session terminated error.
717+
718+
When the server returns 404 for an initialization request, it means the
719+
session truly doesn't exist (not expired), so we send a different error.
720+
"""
721+
import httpx
722+
723+
from mcp.client.streamable_http import StreamableHTTPTransport
724+
725+
def mock_handler(request: httpx.Request) -> httpx.Response:
726+
# Return 404 for initialization request
727+
return httpx.Response(404, content=b"Session not found")
728+
729+
transport = httpx.MockTransport(mock_handler)
730+
http_client = httpx.AsyncClient(transport=transport)
731+
732+
streamable_transport = StreamableHTTPTransport("http://example.com/mcp")
733+
734+
# Set up streams - read_send needs to accept SessionMessage | Exception
735+
read_send, read_receive = anyio.create_memory_object_stream[SessionMessage | Exception](10)
736+
write_send, write_receive = anyio.create_memory_object_stream[SessionMessage](10)
737+
738+
init_request = JSONRPCMessage(
739+
JSONRPCRequest(
740+
jsonrpc="2.0",
741+
id=0,
742+
method="initialize",
743+
params={
744+
"protocolVersion": LATEST_PROTOCOL_VERSION,
745+
"capabilities": {},
746+
"clientInfo": {"name": "test", "version": "0.1.0"},
747+
},
748+
)
749+
)
750+
751+
try:
752+
async with anyio.create_task_group() as tg:
753+
754+
def start_get_stream() -> None:
755+
pass # No-op for this test
756+
757+
async def run_post_writer():
758+
await streamable_transport.post_writer(
759+
http_client,
760+
write_receive,
761+
read_send,
762+
write_send,
763+
start_get_stream,
764+
tg,
765+
)
766+
767+
tg.start_soon(run_post_writer)
768+
769+
# Send init request
770+
await write_send.send(SessionMessage(init_request))
771+
772+
# Should receive session terminated error (not expired)
773+
received = await read_receive.receive()
774+
assert isinstance(received, SessionMessage)
775+
error_response = received
776+
assert isinstance(error_response.message.root, JSONRPCError)
777+
# For initialization 404, we send session terminated, not expired
778+
assert error_response.message.root.error.code == 32600 # Session terminated
779+
780+
tg.cancel_scope.cancel()
781+
finally:
782+
# Proper cleanup of all streams
783+
await write_send.aclose()
784+
await write_receive.aclose()
785+
await read_send.aclose()
786+
await read_receive.aclose()
787+
await http_client.aclose()

0 commit comments

Comments
 (0)