diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index d2536189d..43be8da03 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -440,6 +440,7 @@ def streamable_http_app( stateless_http: bool = False, event_store: EventStore | None = None, retry_interval: int | None = None, + session_idle_timeout: float | None = None, transport_security: TransportSecuritySettings | None = None, host: str = "127.0.0.1", auth: AuthSettings | None = None, @@ -461,6 +462,7 @@ def streamable_http_app( app=self, event_store=event_store, retry_interval=retry_interval, + session_idle_timeout=session_idle_timeout, json_response=json_response, stateless=stateless_http, security_settings=transport_security, diff --git a/src/mcp/server/mcpserver/server.py b/src/mcp/server/mcpserver/server.py index fdb69571d..141e6ec2d 100644 --- a/src/mcp/server/mcpserver/server.py +++ b/src/mcp/server/mcpserver/server.py @@ -1046,6 +1046,7 @@ def streamable_http_app( stateless_http: bool = False, event_store: EventStore | None = None, retry_interval: int | None = None, + session_idle_timeout: float | None = None, transport_security: TransportSecuritySettings | None = None, host: str = "127.0.0.1", ) -> Starlette: @@ -1056,6 +1057,7 @@ def streamable_http_app( stateless_http=stateless_http, event_store=event_store, retry_interval=retry_interval, + session_idle_timeout=session_idle_timeout, transport_security=transport_security, host=host, auth=self.settings.auth, diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index 220d46f9a..9f0320db1 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -7,6 +7,7 @@ """ import logging +import math import re from abc import ABC, abstractmethod from collections.abc import AsyncGenerator, Awaitable, Callable @@ -171,6 +172,7 @@ def __init__( ] = {} self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {} 
self._terminated = False + self._active_request_count = 0 # Idle timeout cancel scope; managed by the session manager. self.idle_scope: anyio.CancelScope | None = None
@@ -179,6 +181,23 @@ def is_terminated(self) -> bool:
         """Check if this transport has been explicitly terminated."""
         return self._terminated
 
+    def mark_request_started(self) -> None:
+        """Count one more in-flight HTTP request and suspend the idle deadline while it runs."""
+        self._active_request_count += 1
+        if self.idle_scope is not None:
+            self.idle_scope.deadline = math.inf  # an infinite deadline never fires
+
+    def mark_request_finished(self, idle_timeout_seconds: float | None) -> None:
+        """Count one request as done; when none remain, re-arm the deadline ``idle_timeout_seconds`` ahead."""
+        self._active_request_count = max(0, self._active_request_count - 1)  # clamp: unbalanced calls stay at 0
+        if (
+            idle_timeout_seconds is not None  # no timeout configured -> leave the deadline untouched
+            and self.idle_scope is not None
+            and self._active_request_count == 0  # other requests are still in flight otherwise
+            and not self._terminated  # never re-arm a transport that was explicitly terminated
+        ):
+            self.idle_scope.deadline = anyio.current_time() + idle_timeout_seconds
+
     def close_sse_stream(self, request_id: RequestId) -> None:
         """Close SSE connection for a specific request without terminating the stream.
 
diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index cec170082..657861b99 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -222,10 +222,14 @@ async def _handle_stateful_request(self, scope: Scope, receive: Receive, send: S await response(scope, receive, send) return logger.debug("Session already exists, handling request directly") - # Push back idle deadline on activity - if transport.idle_scope is not None and self.session_idle_timeout is not None: - transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout # pragma: no cover - await transport.handle_request(scope, receive, send) + # Suspend idle reaping for the duration of the request so an in-flight + # request is never counted as an idle session; the deadline is pushed + # forward when the last concurrent request completes. + transport.mark_request_started() + try: + await transport.handle_request(scope, receive, send) + finally: + transport.mark_request_finished(self.session_idle_timeout) return if request_mcp_session_id is None: @@ -251,7 +255,6 @@ async def _handle_stateful_request(self, scope: Scope, receive: Receive, send: S async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORED) -> None: async with http_transport.connect() as streams: read_stream, write_stream = streams - task_status.started() try: # Use a cancel scope for idle timeout — when the # deadline passes the scope cancels app.run() and @@ -262,6 +265,10 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE idle_scope.deadline = anyio.current_time() + self.session_idle_timeout http_transport.idle_scope = idle_scope + # Signal readiness only after idle_scope is attached so the + # first request (below) can suspend reaping without a race. 
+ task_status.started() + with idle_scope: await self.app.run( read_stream, @@ -297,7 +304,11 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE await self._task_group.start(run_server) # Handle the HTTP request and return the response - await http_transport.handle_request(scope, receive, send) + http_transport.mark_request_started() + try: + await http_transport.handle_request(scope, receive, send) + finally: + http_transport.mark_request_finished(self.session_idle_timeout) else: # Unknown or expired session ID - return 404 per MCP spec # TODO: Align error code once spec clarifies diff --git a/tests/server/test_streamable_http_manager.py b/tests/server/test_streamable_http_manager.py index f02e520ee..c37851c82 100644 --- a/tests/server/test_streamable_http_manager.py +++ b/tests/server/test_streamable_http_manager.py @@ -601,3 +601,75 @@ async def test_anonymous_session_accepts_anonymous_requests( session_id = await _open_session(manager, None) assert await _request_session(manager, session_id, None) != 404
+
+
+# ---------------------------------------------------------------------------
+# session_idle_timeout: API exposure + in-flight request protection (#2455)
+# ---------------------------------------------------------------------------
+
+
+def test_streamable_http_app_forwards_session_idle_timeout():
+    """The high-level streamable_http_app() API exposes session_idle_timeout.
+
+    Previously the parameter only existed on StreamableHTTPSessionManager, forcing
+    users to drop down to manual session-manager wiring to configure idle reaping.
+    """
+    app = Server("test-idle-expose")
+    app.streamable_http_app(session_idle_timeout=12.5)
+    assert app.session_manager.session_idle_timeout == 12.5
+
+
+def test_streamable_http_app_session_idle_timeout_defaults_to_none():
+    app = Server("test-idle-default")
+    app.streamable_http_app()
+    assert app.session_manager.session_idle_timeout is None  # omitted kwarg reaches the manager as None
+
+
+@pytest.mark.anyio
+async def test_mark_request_suspends_and_resumes_idle_reaping():
+    """An in-flight request pushes the idle deadline to infinity until it completes.
+
+    Regression for #2455: a request actively being processed must not be counted as
+    an idle session. With two overlapping requests the deadline stays at +inf until
+    the second one finishes; only then is it re-armed to a finite time in the future.
+    """
+    transport = StreamableHTTPServerTransport(mcp_session_id=None)
+    scope = anyio.CancelScope()
+    scope.deadline = 100.0  # arbitrary finite value; the calls below must overwrite it
+    transport.idle_scope = scope
+
+    # Two overlapping requests: deadline stays suspended until both finish.
+    transport.mark_request_started()
+    assert scope.deadline == float("inf")
+    transport.mark_request_started()
+    assert scope.deadline == float("inf")
+
+    transport.mark_request_finished(idle_timeout_seconds=30.0)
+    # Still one request in flight -> still suspended.
+    assert scope.deadline == float("inf")
+
+    transport.mark_request_finished(idle_timeout_seconds=30.0)
+    # Last request done -> deadline re-armed into the future.
+    assert scope.deadline != float("inf")
+    assert scope.deadline > anyio.current_time()
+
+
+@pytest.mark.anyio
+async def test_mark_request_finished_is_noop_without_idle_timeout():
+    """When no idle timeout is configured the deadline is left untouched."""
+    transport = StreamableHTTPServerTransport(mcp_session_id=None)
+    scope = anyio.CancelScope()  # default deadline is +inf
+    transport.idle_scope = scope
+
+    transport.mark_request_started()
+    transport.mark_request_finished(idle_timeout_seconds=None)
+    assert scope.deadline == float("inf")
+
+
+@pytest.mark.anyio
+async def test_mark_request_finished_does_not_underflow():
+    """Unbalanced finishes never drive the active-request counter negative."""
+    transport = StreamableHTTPServerTransport(mcp_session_id=None)
+    transport.mark_request_finished(idle_timeout_seconds=30.0)  # no matching mark_request_started()
+    transport.mark_request_finished(idle_timeout_seconds=30.0)
+    assert transport._active_request_count == 0