Skip to content

Commit 602e59d

Browse files
committed
fix(client): surface streamable http transport errors
1 parent cf110e3 commit 602e59d

2 files changed

Lines changed: 115 additions & 5 deletions

File tree

src/mcp/client/streamable_http.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -468,11 +468,19 @@ async def _handle_message(session_message: SessionMessage) -> None:
468468
read_stream_writer=read_stream_writer,
469469
)
470470

471-
async def handle_request_async():
472-
if is_resumption:
473-
await self._handle_resumption_request(ctx)
474-
else:
475-
await self._handle_post_request(ctx)
471+
async def handle_request_async() -> None:
472+
try:
473+
if is_resumption:
474+
await self._handle_resumption_request(ctx)
475+
else:
476+
await self._handle_post_request(ctx)
477+
except httpx.HTTPError as exc:
478+
logger.exception("Transport error handling request")
479+
if isinstance(message, JSONRPCRequest):
480+
error_data = ErrorData(code=INTERNAL_ERROR, message=f"Transport error: {exc}")
481+
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
482+
with contextlib.suppress(anyio.BrokenResourceError, anyio.ClosedResourceError):
483+
await read_stream_writer.send(error_msg)
476484

477485
# If this is a request, start a new task to handle it
478486
if isinstance(message, JSONRPCRequest):
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import json
2+
from typing import cast
3+
4+
import anyio
5+
import httpx
6+
import pytest
7+
8+
from mcp import ClientSession
9+
from mcp.client.session_group import ClientSessionGroup, StreamableHttpParameters
10+
from mcp.client.streamable_http import streamable_http_client
11+
from mcp.shared.exceptions import MCPError
12+
from mcp.types import LATEST_PROTOCOL_VERSION, RootsListChangedNotification
13+
14+
pytestmark = pytest.mark.anyio
15+
16+
17+
def _contains_cancel_scope_error(exc: BaseException) -> bool:
18+
if isinstance(exc, RuntimeError) and "Attempted to exit cancel scope" in str(exc):
19+
return True
20+
21+
raw_grouped_exceptions = getattr(exc, "exceptions", ())
22+
if isinstance(raw_grouped_exceptions, tuple) and raw_grouped_exceptions:
23+
grouped_exceptions = cast(tuple[BaseException, ...], raw_grouped_exceptions)
24+
return any(_contains_cancel_scope_error(inner) for inner in grouped_exceptions)
25+
26+
return any(_contains_cancel_scope_error(inner) for inner in (exc.__cause__, exc.__context__) if inner is not None)
27+
28+
29+
def test_contains_cancel_scope_error_follows_exception_tree() -> None:
30+
cancel_scope_error = RuntimeError("Attempted to exit cancel scope in a different task than it was entered in")
31+
wrapped = RuntimeError("wrapped")
32+
wrapped.__cause__ = cancel_scope_error
33+
34+
assert _contains_cancel_scope_error(wrapped)
35+
36+
37+
def test_contains_cancel_scope_error_follows_grouped_exceptions() -> None:
38+
cancel_scope_error = RuntimeError("Attempted to exit cancel scope in a different task than it was entered in")
39+
40+
class DummyGroup(Exception):
41+
def __init__(self) -> None:
42+
self.exceptions = (cancel_scope_error,)
43+
44+
assert _contains_cancel_scope_error(DummyGroup())
45+
46+
47+
async def test_session_group_streamable_http_connect_error_is_catchable(
48+
monkeypatch: pytest.MonkeyPatch,
49+
) -> None:
50+
async def raise_connect_error(request: httpx.Request) -> httpx.Response:
51+
raise httpx.ConnectError("server unavailable", request=request)
52+
53+
def mock_http_client(
54+
headers: dict[str, str] | None = None,
55+
timeout: httpx.Timeout | None = None,
56+
auth: httpx.Auth | None = None,
57+
) -> httpx.AsyncClient:
58+
return httpx.AsyncClient(
59+
auth=auth,
60+
headers=headers,
61+
timeout=timeout,
62+
transport=httpx.MockTransport(raise_connect_error),
63+
)
64+
65+
monkeypatch.setattr("mcp.client.session_group.create_mcp_http_client", mock_http_client)
66+
67+
async with ClientSessionGroup() as group:
68+
with anyio.fail_after(5), pytest.raises(MCPError) as exc_info:
69+
await group.connect_to_server(StreamableHttpParameters(url="http://example.invalid/mcp"))
70+
71+
assert "Transport error: server unavailable" in exc_info.value.error.message
72+
assert not _contains_cancel_scope_error(exc_info.value)
73+
74+
75+
async def test_streamable_http_notification_transport_error_does_not_crash() -> None:
76+
async def handle_request(request: httpx.Request) -> httpx.Response:
77+
data = json.loads(request.content)
78+
if data.get("method") == "initialize":
79+
return httpx.Response(
80+
200,
81+
headers={"content-type": "application/json"},
82+
json={
83+
"jsonrpc": "2.0",
84+
"id": data["id"],
85+
"result": {
86+
"protocolVersion": LATEST_PROTOCOL_VERSION,
87+
"capabilities": {},
88+
"serverInfo": {"name": "mock-server", "version": "1.0.0"},
89+
},
90+
},
91+
)
92+
93+
raise httpx.ConnectError("notification failed", request=request)
94+
95+
async with (
96+
httpx.AsyncClient(transport=httpx.MockTransport(handle_request)) as http_client,
97+
streamable_http_client("http://example.invalid/mcp", http_client=http_client) as (read_stream, write_stream),
98+
ClientSession(read_stream, write_stream) as session,
99+
):
100+
await session.initialize()
101+
await session.send_notification(RootsListChangedNotification(method="notifications/roots/list_changed"))
102+
await anyio.sleep(0)

0 commit comments

Comments
 (0)