diff --git a/src/mcp/client/stdio.py b/src/mcp/client/stdio.py index 902dc8576..38dc645c0 100644 --- a/src/mcp/client/stdio.py +++ b/src/mcp/client/stdio.py @@ -153,12 +153,24 @@ async def stdout_reader(): message = types.jsonrpc_message_adapter.validate_json(line, by_name=False) except Exception as exc: # pragma: no cover logger.exception("Failed to parse JSONRPC message from server") - await read_stream_writer.send(exc) + try: + await read_stream_writer.send(exc) + except (anyio.ClosedResourceError, anyio.BrokenResourceError): + return continue session_message = SessionMessage(message) - await read_stream_writer.send(session_message) - except anyio.ClosedResourceError: # pragma: lax no cover + try: + await read_stream_writer.send(session_message) + except (anyio.ClosedResourceError, anyio.BrokenResourceError): + # The caller exited the stdio_client context while the + # subprocess was still writing; the reader stream has been + # closed (ClosedResourceError) or the closure raced our + # in-flight send (BrokenResourceError). Either way there + # is nowhere to deliver this message, so shut down + # cleanly. Fixes #1960. + return + except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: lax no cover await anyio.lowlevel.checkpoint() async def stdin_writer(): diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index 06e2cba4b..9e87aac22 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -37,6 +37,45 @@ async def test_stdio_context_manager_exiting(): pass +@pytest.mark.anyio +async def test_stdio_client_exits_cleanly_while_server_still_writing(): + """Regression test for #1960. + + Exiting the ``stdio_client`` context while the subprocess is still writing to + stdout used to surface ``anyio.BrokenResourceError`` through the task group + (as an ``ExceptionGroup``). The ``finally`` block closes + ``read_stream_writer`` while the background ``stdout_reader`` task is + mid-``send``. + + The fix makes ``stdout_reader`` catch both ``ClosedResourceError`` and + ``BrokenResourceError`` and return cleanly, so exiting the context is a + no-op no matter what the subprocess is doing. + """ + # A server that emits a large burst of valid JSON-RPC notifications without + # ever reading stdin. When we exit the context below, the subprocess is + # still in the middle of that burst, which is the exact shape of the race. + noisy_script = textwrap.dedent( + """ + import sys + for i in range(1000): + sys.stdout.write( + '{"jsonrpc":"2.0","method":"notifications/message",' + '"params":{"level":"info","data":"line ' + str(i) + '"}}\\n' + ) + sys.stdout.flush() + """ + ) + + server_params = StdioServerParameters(command=sys.executable, args=["-c", noisy_script]) + + # The ``async with`` must complete without an ``ExceptionGroup`` / + # ``BrokenResourceError`` propagating. ``anyio.fail_after`` prevents a + # regression from hanging CI. + with anyio.fail_after(5.0): + async with stdio_client(server_params) as (_, _): + pass + + @pytest.mark.anyio @pytest.mark.skipif(tee is None, reason="could not find tee command") async def test_stdio_client():