From 77b7e4b041c4e2b930eea38a913a4890d304e328 Mon Sep 17 00:00:00 2001 From: yaowubarbara <113857460+yaowubarbara@users.noreply.github.com> Date: Thu, 5 Mar 2026 23:17:43 +0800 Subject: [PATCH 1/3] fix(stdio): allow configurable memory stream buffer size in stdio_server Add read_stream_buffer_size and write_stream_buffer_size parameters to stdio_server() to allow decoupling the stdin reader from the message processor. With the default buffer_size=0, the reader blocks on send() until the processor consumes the message, causing the server to become unresponsive during slow operations. A non-zero buffer allows the reader to queue messages ahead, preventing ping timeouts and request starvation. Closes #1333 --- src/mcp/server/stdio.py | 11 ++++-- tests/server/test_stdio.py | 75 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 3 deletions(-) diff --git a/src/mcp/server/stdio.py b/src/mcp/server/stdio.py index e526bab56..28b540c8c 100644 --- a/src/mcp/server/stdio.py +++ b/src/mcp/server/stdio.py @@ -30,7 +30,12 @@ async def run_server(): @asynccontextmanager -async def stdio_server(stdin: anyio.AsyncFile[str] | None = None, stdout: anyio.AsyncFile[str] | None = None): +async def stdio_server( + stdin: anyio.AsyncFile[str] | None = None, + stdout: anyio.AsyncFile[str] | None = None, + read_stream_buffer_size: int = 0, + write_stream_buffer_size: int = 0, +): """Server transport for stdio: this communicates with an MCP client by reading from the current process' stdin and writing to stdout. """ @@ -49,8 +54,8 @@ async def stdio_server(stdin: anyio.AsyncFile[str] | None = None, stdout: anyio. write_stream: MemoryObjectSendStream[SessionMessage] write_stream_reader: MemoryObjectReceiveStream[SessionMessage] - read_stream_writer, read_stream = anyio.create_memory_object_stream(0) - write_stream, write_stream_reader = anyio.create_memory_object_stream(0) + read_stream_writer, read_stream = anyio.create_memory_object_stream(read_stream_buffer_size) + write_stream, write_stream_reader = anyio.create_memory_object_stream(write_stream_buffer_size) async def stdin_reader(): try: diff --git a/tests/server/test_stdio.py b/tests/server/test_stdio.py index 9a7ddaab4..9b3809365 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -59,3 +59,78 @@ async def test_stdio_server(): assert len(received_responses) == 2 assert received_responses[0] == JSONRPCRequest(jsonrpc="2.0", id=3, method="ping") assert received_responses[1] == JSONRPCResponse(jsonrpc="2.0", id=4, result={}) + + +@pytest.mark.anyio +async def test_stdio_server_with_buffer_size(): + """Test that stdio_server works with configurable buffer sizes.""" + stdin = io.StringIO() + stdout = io.StringIO() + + messages = [ + JSONRPCRequest(jsonrpc="2.0", id=1, method="ping"), + JSONRPCRequest(jsonrpc="2.0", id=2, method="ping"), + JSONRPCRequest(jsonrpc="2.0", id=3, method="ping"), + ] + + for message in messages: + stdin.write(message.model_dump_json(by_alias=True, exclude_none=True) + "\n") + stdin.seek(0) + + async with stdio_server( + stdin=anyio.AsyncFile(stdin), + stdout=anyio.AsyncFile(stdout), + read_stream_buffer_size=5, + write_stream_buffer_size=5, + ) as (read_stream, write_stream): + received_messages: list[JSONRPCMessage] = [] + async with read_stream: + async for message in read_stream: + if isinstance(message, Exception): + raise message + received_messages.append(message.message) + if len(received_messages) == 3: + break + + assert len(received_messages) == 3 + for i, msg in enumerate(received_messages, 1): + assert msg == JSONRPCRequest(jsonrpc="2.0", id=i, method="ping") + + +@pytest.mark.anyio +async def test_stdio_server_buffered_does_not_block_reader(): + """Test that a non-zero buffer allows stdin_reader to continue reading + even when the consumer is slow to process messages. + + With buffer_size=0, the reader blocks on send() until the consumer calls + receive(). With buffer_size>0, the reader can queue messages ahead. + """ + stdin = io.StringIO() + stdout = io.StringIO() + + num_messages = 5 + for i in range(1, num_messages + 1): + msg = JSONRPCRequest(jsonrpc="2.0", id=i, method="ping") + stdin.write(msg.model_dump_json(by_alias=True, exclude_none=True) + "\n") + stdin.seek(0) + + async with stdio_server( + stdin=anyio.AsyncFile(stdin), + stdout=anyio.AsyncFile(stdout), + read_stream_buffer_size=num_messages, + ) as (read_stream, write_stream): + # Give the reader time to buffer all messages + await anyio.sleep(0.1) + + received: list[JSONRPCMessage] = [] + async with read_stream: + async for message in read_stream: + if isinstance(message, Exception): + raise message + received.append(message.message) + # Simulate slow processing + await anyio.sleep(0.01) + if len(received) == num_messages: + break + + assert len(received) == num_messages From 084ac9e9567422b95d0766f4992261dd987c438c Mon Sep 17 00:00:00 2001 From: yaowubarbara <113857460+yaowubarbara@users.noreply.github.com> Date: Thu, 5 Mar 2026 23:26:12 +0800 Subject: [PATCH 2/3] fix: rename unused write_stream to _write_stream in tests --- tests/server/test_stdio.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/server/test_stdio.py b/tests/server/test_stdio.py index 9b3809365..288f64a31 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -82,7 +82,7 @@ async def test_stdio_server_with_buffer_size(): stdout=anyio.AsyncFile(stdout), read_stream_buffer_size=5, write_stream_buffer_size=5, - ) as (read_stream, write_stream): + ) as (read_stream, _write_stream): received_messages: list[JSONRPCMessage] = [] async with read_stream: async for message in read_stream: @@ -118,7 +118,7 @@ async def test_stdio_server_buffered_does_not_block_reader(): stdin=anyio.AsyncFile(stdin), stdout=anyio.AsyncFile(stdout), read_stream_buffer_size=num_messages, - ) as (read_stream, write_stream): + ) as (read_stream, _write_stream): # Give the reader time to buffer all messages await anyio.sleep(0.1) From 349bdf397ccc781c9912043a98771981c903d034 Mon Sep 17 00:00:00 2001 From: yaowubarbara <113857460+yaowubarbara@users.noreply.github.com> Date: Fri, 6 Mar 2026 11:10:34 +0800 Subject: [PATCH 3/3] fix: close write_stream in tests to prevent task group hang The two new buffer tests did not close write_stream, causing stdout_writer to block indefinitely on write_stream_reader iteration. This prevented the task group from exiting, hanging the test until CI timeout. --- tests/server/test_stdio.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/server/test_stdio.py b/tests/server/test_stdio.py index 288f64a31..f688eaa5b 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -82,7 +82,7 @@ async def test_stdio_server_with_buffer_size(): stdout=anyio.AsyncFile(stdout), read_stream_buffer_size=5, write_stream_buffer_size=5, - ) as (read_stream, _write_stream): + ) as (read_stream, write_stream): received_messages: list[JSONRPCMessage] = [] async with read_stream: async for message in read_stream: @@ -95,6 +95,7 @@ async def test_stdio_server_with_buffer_size(): assert len(received_messages) == 3 for i, msg in enumerate(received_messages, 1): assert msg == JSONRPCRequest(jsonrpc="2.0", id=i, method="ping") + await write_stream.aclose() @pytest.mark.anyio @@ -118,7 +119,7 @@ async def test_stdio_server_buffered_does_not_block_reader(): stdin=anyio.AsyncFile(stdin), stdout=anyio.AsyncFile(stdout), read_stream_buffer_size=num_messages, - ) as (read_stream, _write_stream): + ) as (read_stream, write_stream): # Give the reader time to buffer all messages await anyio.sleep(0.1) @@ -134,3 +135,4 @@ async def test_stdio_server_buffered_does_not_block_reader(): break assert len(received) == num_messages + await write_stream.aclose()