diff --git a/crates/rmcp/src/transport/async_rw.rs b/crates/rmcp/src/transport/async_rw.rs index 2ef0aae2..62ab86b4 100644 --- a/crates/rmcp/src/transport/async_rw.rs +++ b/crates/rmcp/src/transport/async_rw.rs @@ -143,15 +143,30 @@ where Ok(Some(msg)) => return Some(msg), Ok(None) => continue, Err(JsonRpcMessageCodecError::Serde(e)) => { - tracing::debug!("Parse error on incoming message: {e}"); - let mut write = self.write.lock().await; - let framed = write.as_mut()?; - let response = TxJsonRpcMessage::::error( - ErrorData::parse_error("Parse error", None), - None, - ); - if framed.send(response).await.is_err() { - return None; + match e.classify() { + serde_json::error::Category::Syntax | serde_json::error::Category::Eof => { + // The input isn't valid JSON, so there's no message id to correlate a + // response to, and replying to invalid data can trigger an error storm + // if the peer echoes the response back as more invalid data. This + // matches the other official MCP SDKs, which ignore unparsable input. + // See https://github.com/modelcontextprotocol/rust-sdk/issues/938 + tracing::debug!("Ignoring unparsable incoming message: {e}"); + } + serde_json::error::Category::Data | serde_json::error::Category::Io => { + // Valid JSON that doesn't match the expected message shape is a real + // protocol error rather than unparsable input, so surface it with a + // response instead of silently dropping it. + tracing::debug!("Protocol error on incoming message: {e}"); + let mut write = self.write.lock().await; + let framed = write.as_mut()?; + let response = TxJsonRpcMessage::::error( + ErrorData::parse_error("Parse error", None), + None, + ); + if framed.send(response).await.is_err() { + return None; + } + } } } Err(e) => { @@ -618,8 +633,8 @@ mod test { #[cfg(feature = "server")] #[tokio::test] - async fn receive_recovers_from_parse_error() { - use tokio::io::AsyncWriteExt; + async fn receive_ignores_parse_error() { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::{RoleServer, transport::Transport}; @@ -638,28 +653,70 @@ mod test { .await .unwrap(); + // The unparsable line is skipped and the next valid message is still yielded. let received = transport .receive() .await - .expect("transport should recover and yield the next valid message"); + .expect("transport should skip the invalid line and yield the next valid message"); + assert_eq!( + serde_json::to_value(&received).unwrap()["method"], + "notifications/initialized", + ); + + // No response is sent back for the unparsable message (issue #938). Dropping the + // transport closes its write side, so the peer reads to EOF and should see no bytes. + drop(transport); + let mut reply_buf = Vec::new(); + client_r.read_to_end(&mut reply_buf).await.unwrap(); + assert!( + reply_buf.is_empty(), + "expected no response to an unparsable message, got: {}", + String::from_utf8_lossy(&reply_buf), + ); + } + + #[cfg(feature = "server")] + #[tokio::test] + async fn receive_responds_to_protocol_error() { + use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; + + use crate::{RoleServer, transport::Transport}; + + let (server_io, client_io) = tokio::io::duplex(4096); + let (server_r, server_w) = tokio::io::split(server_io); + let (client_r, mut client_w) = tokio::io::split(client_io); + + let mut transport = AsyncRwTransport::::new(server_r, server_w); + + // Well-formed JSON that does not match the JSON-RPC message shape, followed by a + // valid notification. Unlike unparsable bytes, this is a protocol error: the + // transport should reply to it and still yield the next valid message. + client_w + .write_all( + b"{\"foo\":\"bar\"}\n{\"jsonrpc\":\"2.0\",\"method\":\"notifications/initialized\"}\n", + ) + .await + .unwrap(); - // Read one line back from the peer side and parse as JSON. + let received = transport.receive().await.expect( + "transport should reply to the protocol error and yield the next valid message", + ); + assert_eq!( + serde_json::to_value(&received).unwrap()["method"], + "notifications/initialized", + ); + + // A protocol error gets an error response back (id omitted since it can't be read). let mut reply_buf = Vec::new(); - let mut peer = tokio::io::BufReader::new(&mut client_r); + let mut peer = BufReader::new(client_r); peer.read_until(b'\n', &mut reply_buf).await.unwrap(); let reply: serde_json::Value = serde_json::from_slice(&reply_buf).unwrap(); - - // Per MCP 2025-11-25: id is omitted when the server can't read the request id. assert_eq!( reply, serde_json::json!({ "jsonrpc": "2.0", "error": {"code": -32700, "message": "Parse error"}, - }) - ); - assert_eq!( - serde_json::to_value(&received).unwrap()["method"], - "notifications/initialized", + }), ); } }