diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 9f3dd5e0b..0451e72d1 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -257,48 +257,60 @@ async def _handle_post_request(self, ctx: RequestContext) -> None: message = ctx.session_message.message is_initialization = self._is_initialization_request(message) - async with ctx.client.stream( - "POST", - self.url, - json=message.model_dump(by_alias=True, mode="json", exclude_unset=True), - headers=headers, - ) as response: - if response.status_code == 202: - logger.debug("Received 202 Accepted") - return - - if response.status_code == 404: # pragma: no branch - if isinstance(message, JSONRPCRequest): # pragma: no branch - error_data = ErrorData(code=INVALID_REQUEST, message="Session terminated") - session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data)) - await ctx.read_stream_writer.send(session_message) - return + try: + async with ctx.client.stream( + "POST", + self.url, + json=message.model_dump(by_alias=True, mode="json", exclude_unset=True), + headers=headers, + ) as response: + if response.status_code == 202: + logger.debug("Received 202 Accepted") + return - if response.status_code >= 400: - if isinstance(message, JSONRPCRequest): - error_data = ErrorData(code=INTERNAL_ERROR, message="Server returned an error response") - session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data)) - await ctx.read_stream_writer.send(session_message) - return + if response.status_code >= 400: + # Read body for error detail + await response.aread() + body_text = response.text[:200] if response.text else "" + error_msg = ( + f"HTTP {response.status_code}: {body_text}" if body_text else f"HTTP {response.status_code}" + ) + if isinstance(message, JSONRPCRequest): + error_data = ErrorData( + code=INTERNAL_ERROR, + message=error_msg, + ) + session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data)) + await ctx.read_stream_writer.send(session_message) + return - if is_initialization: - self._maybe_extract_session_id_from_response(response) + if is_initialization: + self._maybe_extract_session_id_from_response(response) - # Per https://modelcontextprotocol.io/specification/2025-06-18/basic#notifications: - # The server MUST NOT send a response to notifications. + # Per https://modelcontextprotocol.io/specification/2025-06-18/basic#notifications: + # The server MUST NOT send a response to notifications. + if isinstance(message, JSONRPCRequest): + content_type = response.headers.get("content-type", "").lower() + if content_type.startswith("application/json"): + await self._handle_json_response( + response, ctx.read_stream_writer, is_initialization, request_id=message.id + ) + elif content_type.startswith("text/event-stream"): + await self._handle_sse_response(response, ctx, is_initialization) + else: + logger.error(f"Unexpected content type: {content_type}") + error_data = ErrorData(code=INVALID_REQUEST, message=f"Unexpected content type: {content_type}") + error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data)) + await ctx.read_stream_writer.send(error_msg) + except Exception as exc: + # Propagate connection/transport errors to the caller via the read stream + # so they don't hang waiting for a response that will never arrive. if isinstance(message, JSONRPCRequest): - content_type = response.headers.get("content-type", "").lower() - if content_type.startswith("application/json"): - await self._handle_json_response( - response, ctx.read_stream_writer, is_initialization, request_id=message.id - ) - elif content_type.startswith("text/event-stream"): - await self._handle_sse_response(response, ctx, is_initialization) - else: - logger.error(f"Unexpected content type: {content_type}") - error_data = ErrorData(code=INVALID_REQUEST, message=f"Unexpected content type: {content_type}") - error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data)) - await ctx.read_stream_writer.send(error_msg) + error_data = ErrorData(code=INTERNAL_ERROR, message=str(exc)) + session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data)) + with contextlib.suppress(Exception): + await ctx.read_stream_writer.send(session_message) + raise async def _handle_json_response( self,