From 1a29299ff1ec8ceb30d4a2a084149430bf27b877 Mon Sep 17 00:00:00 2001 From: Richard Lee Date: Mon, 29 Jun 2026 20:36:31 -0500 Subject: [PATCH 1/3] bound rendezvous websocket liveness --- codex-rs/exec-server/src/lib.rs | 1 + .../exec-server/src/noise_relay/harness.rs | 195 +++++++++++++----- .../src/noise_relay/harness_tests.rs | 139 +++++++++++++ codex-rs/exec-server/src/relay.rs | 102 ++++++++- codex-rs/exec-server/src/relay_noise_tests.rs | 81 ++++++++ codex-rs/exec-server/src/remote.rs | 12 +- .../src/websocket_pong_watchdog.rs | 44 ++++ .../src/websocket_pong_watchdog_tests.rs | 34 +++ 8 files changed, 546 insertions(+), 62 deletions(-) create mode 100644 codex-rs/exec-server/src/noise_relay/harness_tests.rs create mode 100644 codex-rs/exec-server/src/websocket_pong_watchdog.rs create mode 100644 codex-rs/exec-server/src/websocket_pong_watchdog_tests.rs diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 0068c2cc1ac3..882c83230c7c 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -29,6 +29,7 @@ mod sandboxed_file_system; mod server; mod telemetry; mod trace_context; +mod websocket_pong_watchdog; use codex_exec_server_protocol as protocol; diff --git a/codex-rs/exec-server/src/noise_relay/harness.rs b/codex-rs/exec-server/src/noise_relay/harness.rs index 1989525224f6..a2ca66ac2039 100644 --- a/codex-rs/exec-server/src/noise_relay/harness.rs +++ b/codex-rs/exec-server/src/noise_relay/harness.rs @@ -6,6 +6,7 @@ //! normal `JsonRpcConnection`. Outbound JSON-RPC is framed and split into Noise //! records; inbound records are reordered before decryption and reassembly. +use futures::FutureExt; use futures::Sink; use futures::SinkExt; use futures::Stream; @@ -24,6 +25,7 @@ use crate::connection::CHANNEL_CAPACITY; use crate::connection::JsonRpcConnection; use crate::connection::JsonRpcConnectionEvent; use crate::connection::JsonRpcTransport; +use crate::connection::WEBSOCKET_KEEPALIVE_INTERVAL; use crate::noise_channel::InitiatorHandshake; use crate::noise_channel::NoiseChannelIdentity; use crate::noise_channel::NoiseChannelPublicKey; @@ -39,6 +41,9 @@ use crate::relay::decode_relay_message_frame; use crate::relay::encode_relay_message_frame; use crate::relay_proto::RelayData; use crate::relay_proto::RelayMessageFrame; +use crate::websocket_pong_watchdog::WEBSOCKET_PONG_TIMEOUT; +use crate::websocket_pong_watchdog::WEBSOCKET_PONG_TIMEOUT_REASON; +use crate::websocket_pong_watchdog::WebSocketPongWatchdog; /// Values that bind one harness websocket to the intended executor registration. /// @@ -58,6 +63,8 @@ pub(crate) struct NoiseHarnessConnectionArgs { // Preserve the availability signal while replacing attacker-controlled reason // text before it reaches disconnect diagnostics. const NOISE_RELAY_RESET_DISCONNECT_REASON: &str = "Noise relay stream reset"; +// Give a Pong already queued behind data a bounded chance to reach the reader. +const MAX_FRAMES_DRAINED_AFTER_PONG_DEADLINE: usize = 32; /// Adapt one harness rendezvous websocket into an authenticated JSON-RPC connection. /// @@ -112,8 +119,7 @@ where &incoming_tx, &disconnected_tx, format!("failed to start Noise relay handshake: {error}"), - ) - .await; + ); return; } }; @@ -147,8 +153,7 @@ where &incoming_tx, &disconnected_tx, "Noise relay websocket ended during handshake".to_string(), - ) - .await; + ); return; }; let message = match incoming_message { @@ -158,8 +163,7 @@ where &incoming_tx, &disconnected_tx, "Noise relay websocket received close frame during handshake".to_string(), - ) - .await; + ); return; } Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_)) => continue, @@ -168,8 +172,7 @@ where &incoming_tx, &disconnected_tx, "Noise relay transport expects binary protobuf frames".to_string(), - ) - .await; + ); return; } Err(error) => { @@ -179,8 +182,7 @@ where format!( "failed to read Noise relay websocket from {connection_label}: {error}" ), - ) - .await; + ); return; } }; @@ -191,8 +193,7 @@ where &incoming_tx, &disconnected_tx, format!("failed to parse Noise relay frame: {error}"), - ) - .await; + ); return; } }; @@ -209,8 +210,7 @@ where &incoming_tx, &disconnected_tx, format!("invalid Noise relay handshake response: {error}"), - ) - .await; + ); return; } }; @@ -228,8 +228,7 @@ where &incoming_tx, &disconnected_tx, format!("Noise relay handshake failed: {error}"), - ) - .await; + ); return; } } @@ -239,8 +238,7 @@ where &incoming_tx, &disconnected_tx, NOISE_RELAY_RESET_DISCONNECT_REASON.to_string(), - ) - .await; + ); return; } Ok( @@ -253,8 +251,7 @@ where &incoming_tx, &disconnected_tx, "Noise relay received data before handshake completion".to_string(), - ) - .await; + ); return; } } @@ -263,12 +260,23 @@ where // After the handshake, each relay sequence maps to exactly one Noise // transport record. Outbound records are encrypted once; inbound // records are reordered and deduplicated before decryption. + let mut websocket = websocket.peekable(); let mut next_outbound_seq = 0u32; let mut inbound_ciphertexts = OrderedCiphertextFrames::default(); let mut inbound_decoder = JsonRpcMessageDecoder::default(); + let mut keepalive = tokio::time::interval_at( + tokio::time::Instant::now() + WEBSOCKET_KEEPALIVE_INTERVAL, + WEBSOCKET_KEEPALIVE_INTERVAL, + ); + keepalive.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut pong_watchdog = WebSocketPongWatchdog::new(WEBSOCKET_PONG_TIMEOUT); + let pong_deadline = tokio::time::sleep(WEBSOCKET_PONG_TIMEOUT); + tokio::pin!(pong_deadline); + let mut drain_incoming_before_timeout = false; + let mut frames_drained_after_pong_deadline = 0usize; 'relay: loop { tokio::select! { - maybe_message = outgoing_rx.recv() => { + maybe_message = outgoing_rx.recv(), if !drain_incoming_before_timeout => { let Some(message) = maybe_message else { break; }; @@ -295,16 +303,60 @@ where } }; let frame = RelayMessageFrame::data(stream_id.clone(), seq, ciphertext); - if let Err(error) = websocket - .send(Message::Binary(encode_relay_message_frame(&frame).into())) - .await + if let Err(error) = send_websocket_message( + &mut websocket, + Message::Binary(encode_relay_message_frame(&frame).into()), + pong_watchdog.write_deadline(tokio::time::Instant::now()), + ) + .await { warn!("failed to write Noise relay websocket: {error}"); break 'relay; } } } + _ = &mut pong_deadline, if pong_watchdog.deadline().is_some() && !drain_incoming_before_timeout => { + if frames_drained_after_pong_deadline + < MAX_FRAMES_DRAINED_AFTER_PONG_DEADLINE + && std::pin::Pin::new(&mut websocket) + .peek() + .now_or_never() + .is_some() + { + frames_drained_after_pong_deadline += 1; + drain_incoming_before_timeout = true; + continue; + } + warn!( + noise_reason = WEBSOCKET_PONG_TIMEOUT_REASON, + "Noise harness rendezvous websocket disconnected" + ); + send_disconnected( + &incoming_tx, + &disconnected_tx, + WEBSOCKET_PONG_TIMEOUT_REASON.to_string(), + ); + return; + } + _ = keepalive.tick(), if pong_watchdog.deadline().is_none() => { + if let Err(error) = send_websocket_message( + &mut websocket, + Message::Ping(Vec::new().into()), + pong_watchdog.write_deadline(tokio::time::Instant::now()), + ) + .await + { + warn!("failed to write Noise relay keepalive ping: {error}"); + break; + } + pong_watchdog.ping_sent(tokio::time::Instant::now()); + frames_drained_after_pong_deadline = 0; + if let Some(deadline) = pong_watchdog.deadline() { + pong_deadline.as_mut().reset(deadline); + } + } incoming_message = websocket.next() => { + drain_incoming_before_timeout = false; let Some(incoming_message) = incoming_message else { break; }; @@ -313,7 +365,7 @@ where let frame = match decode_relay_message_frame(payload.as_ref()) { Ok(frame) => frame, Err(error) => { - send_malformed(&incoming_tx, error.to_string()).await; + send_malformed(&incoming_tx, error.to_string()); break; } }; @@ -325,7 +377,7 @@ where let data = match frame.into_data() { Ok(data) => data, Err(error) => { - send_malformed(&incoming_tx, error.to_string()).await; + send_malformed(&incoming_tx, error.to_string()); break; } }; @@ -334,22 +386,26 @@ where &mut transport, &mut inbound_decoder, data, + pong_watchdog.write_deadline(tokio::time::Instant::now()), &incoming_tx, ) .await { - send_malformed(&incoming_tx, error.to_string()).await; + if matches!(error, ExecServerError::Closed) { + break; + } + send_malformed(&incoming_tx, error.to_string()); break; } } Ok(RelayFrameBodyKind::Reset) => { - let _ = incoming_tx - .send(JsonRpcConnectionEvent::Disconnected { + let _ = incoming_tx.try_send( + JsonRpcConnectionEvent::Disconnected { reason: Some( NOISE_RELAY_RESET_DISCONNECT_REASON.to_string(), ), - }) - .await; + }, + ); break; } Ok( @@ -361,20 +417,22 @@ where send_malformed( &incoming_tx, "Noise relay received invalid post-handshake frame".to_string(), - ) - .await; + ); break; } } } Ok(Message::Close(_)) => break, - Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_)) => {} + Ok(Message::Pong(_)) => { + pong_watchdog.received_pong(); + frames_drained_after_pong_deadline = 0; + } + Ok(Message::Ping(_) | Message::Frame(_)) => {} Ok(Message::Text(_)) => { send_malformed( &incoming_tx, "Noise relay transport expects binary protobuf frames".to_string(), - ) - .await; + ); break; } Err(error) => { @@ -398,6 +456,22 @@ where } } +async fn send_websocket_message( + websocket: &mut T, + message: Message, + deadline: tokio::time::Instant, +) -> Result<(), String> +where + T: Sink + Unpin, + E: std::fmt::Display, +{ + match tokio::time::timeout_at(deadline, websocket.send(message)).await { + Ok(Ok(())) => Ok(()), + Ok(Err(error)) => Err(error.to_string()), + Err(_) => Err("websocket write timed out".to_string()), + } +} + /// Order and decrypt one relay frame, then emit any complete JSON-RPC messages. /// Relay records and JSON-RPC messages do not share boundaries, so reassembly /// happens after decryption. @@ -406,6 +480,7 @@ async fn receive_data( transport: &mut NoiseTransport, decoder: &mut JsonRpcMessageDecoder, data: RelayData, + delivery_deadline: tokio::time::Instant, incoming_tx: &mpsc::Sender, ) -> Result<(), ExecServerError> { // Ordering must happen before decryption because Noise transport nonces are @@ -419,30 +494,50 @@ async fn receive_data( // The authenticated byte stream can carry partial or multiple JSON-RPC // messages; emit only complete, successfully parsed messages. for message in decoder.push(&plaintext)? { - incoming_tx - .send(JsonRpcConnectionEvent::Message(message)) - .await - .map_err(|_| ExecServerError::Closed)?; + send_incoming_event( + incoming_tx, + JsonRpcConnectionEvent::Message(message), + delivery_deadline, + ) + .await?; } } Ok(()) } -async fn send_malformed(incoming_tx: &mpsc::Sender, reason: String) { - let _ = incoming_tx - .send(JsonRpcConnectionEvent::MalformedMessage { reason }) - .await; +async fn send_incoming_event( + incoming_tx: &mpsc::Sender, + event: JsonRpcConnectionEvent, + deadline: tokio::time::Instant, +) -> Result<(), ExecServerError> { + match tokio::time::timeout_at(deadline, incoming_tx.send(event)).await { + Ok(Ok(())) => Ok(()), + Ok(Err(_)) => Err(ExecServerError::Closed), + Err(_) => { + warn!( + noise_reason = "application_backpressure", + "Noise harness application event delivery timed out" + ); + Err(ExecServerError::Closed) + } + } +} + +fn send_malformed(incoming_tx: &mpsc::Sender, reason: String) { + let _ = incoming_tx.try_send(JsonRpcConnectionEvent::MalformedMessage { reason }); } -async fn send_disconnected( +fn send_disconnected( incoming_tx: &mpsc::Sender, disconnected_tx: &watch::Sender, reason: String, ) { let _ = disconnected_tx.send(true); - let _ = incoming_tx - .send(JsonRpcConnectionEvent::Disconnected { - reason: Some(reason), - }) - .await; + let _ = incoming_tx.try_send(JsonRpcConnectionEvent::Disconnected { + reason: Some(reason), + }); } + +#[cfg(test)] +#[path = "harness_tests.rs"] +mod tests; diff --git a/codex-rs/exec-server/src/noise_relay/harness_tests.rs b/codex-rs/exec-server/src/noise_relay/harness_tests.rs new file mode 100644 index 000000000000..bfd0562b858d --- /dev/null +++ b/codex-rs/exec-server/src/noise_relay/harness_tests.rs @@ -0,0 +1,139 @@ +use std::time::Duration; + +use anyhow::Context; +use anyhow::Result; +use futures::SinkExt; +use futures::StreamExt; +use pretty_assertions::assert_eq; +use tokio::net::TcpListener; +use tokio::sync::mpsc; +use tokio::time::Instant; +use tokio::time::timeout; +use tokio_tungstenite::accept_async; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; + +use super::*; +use crate::connection::JsonRpcConnectionEvent; +use crate::noise_channel::PendingResponderHandshake; + +const ENVIRONMENT_ID: &str = "environment-1"; +const EXECUTOR_REGISTRATION_ID: &str = "registration-1"; + +#[tokio::test] +async fn pong_keeps_harness_alive_until_peer_stops_responding() -> Result<()> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let websocket_url = format!("ws://{}", listener.local_addr()?); + let harness_connection = tokio::spawn(connect_async(websocket_url)); + let (socket, _peer_addr) = listener.accept().await?; + let mut executor_websocket = accept_async(socket).await?; + let (harness_websocket, _response) = harness_connection.await??; + + let executor_identity = NoiseChannelIdentity::generate()?; + let mut connection = noise_harness_connection_from_websocket( + harness_websocket, + NoiseHarnessConnectionArgs { + connection_label: "test rendezvous".to_string(), + environment_id: ENVIRONMENT_ID.to_string(), + executor_registration_id: EXECUTOR_REGISTRATION_ID.to_string(), + identity: NoiseChannelIdentity::generate()?, + responder_public_key: executor_identity.public_key(), + harness_key_authorization: "authorization".to_string(), + }, + ); + + let resume_message = timeout(Duration::from_secs(1), executor_websocket.next()) + .await? + .context("harness closed before sending resume")??; + let Message::Binary(resume_payload) = resume_message else { + anyhow::bail!("expected resume frame, got {resume_message:?}"); + }; + let resume = decode_relay_message_frame(resume_payload.as_ref())?; + assert_eq!(resume.validate()?, RelayFrameBodyKind::Resume); + + let handshake_message = timeout(Duration::from_secs(1), executor_websocket.next()) + .await? + .context("harness closed before sending handshake")??; + let Message::Binary(handshake_payload) = handshake_message else { + anyhow::bail!("expected handshake frame, got {handshake_message:?}"); + }; + let handshake = decode_relay_message_frame(handshake_payload.as_ref())?; + assert_eq!(handshake.stream_id, resume.stream_id); + let stream_id = handshake.stream_id.clone(); + let prologue = + noise_channel_prologue(ENVIRONMENT_ID, EXECUTOR_REGISTRATION_ID, stream_id.as_str()); + let pending = PendingResponderHandshake::read_request( + &executor_identity, + &prologue, + &handshake.into_handshake_payload()?, + )?; + let (_transport, response) = pending.complete()?; + let response = RelayMessageFrame::handshake(stream_id, response); + executor_websocket + .send(Message::Binary( + encode_relay_message_frame(&response).into(), + )) + .await?; + + let mut pings = 0; + while pings < 6 { + let message = timeout(Duration::from_secs(1), executor_websocket.next()) + .await? + .context("harness disconnected before six keepalive pings")??; + match message { + Message::Ping(payload) => { + executor_websocket.send(Message::Pong(payload)).await?; + pings += 1; + } + Message::Pong(_) | Message::Frame(_) => {} + message => anyhow::bail!("expected keepalive ping, got {message:?}"), + } + } + + // Keep non-Pong traffic flowing after responses stop. It must not defeat + // the bounded grace for a Pong already queued behind data. + let unrelated_frame = + encode_relay_message_frame(&RelayMessageFrame::resume("unrelated-stream".to_string())); + let traffic_task = tokio::spawn(async move { + loop { + if executor_websocket + .send(Message::Binary(unrelated_frame.clone().into())) + .await + .is_err() + { + break; + } + tokio::time::sleep(Duration::from_millis(5)).await; + } + }); + let event = timeout(Duration::from_secs(1), connection.incoming_rx.recv()).await?; + traffic_task.abort(); + let _ = traffic_task.await; + let Some(JsonRpcConnectionEvent::Disconnected { reason }) = event else { + anyhow::bail!("expected pong timeout, got {event:?}"); + }; + assert_eq!(reason.as_deref(), Some(WEBSOCKET_PONG_TIMEOUT_REASON)); + Ok(()) +} + +#[tokio::test] +async fn application_event_delivery_is_bounded() -> Result<()> { + let (incoming_tx, _incoming_rx) = mpsc::channel(1); + incoming_tx + .send(JsonRpcConnectionEvent::MalformedMessage { + reason: "fill queue".to_string(), + }) + .await?; + + let result = send_incoming_event( + &incoming_tx, + JsonRpcConnectionEvent::MalformedMessage { + reason: "blocked event".to_string(), + }, + Instant::now() + Duration::from_millis(10), + ) + .await; + + assert!(matches!(result, Err(ExecServerError::Closed))); + Ok(()) +} diff --git a/codex-rs/exec-server/src/relay.rs b/codex-rs/exec-server/src/relay.rs index d7e3d9c2a7fc..b8d477f5dbb1 100644 --- a/codex-rs/exec-server/src/relay.rs +++ b/codex-rs/exec-server/src/relay.rs @@ -41,6 +41,9 @@ use crate::relay_proto::RelayReset; use crate::relay_proto::RelayResume; use crate::relay_proto::relay_message_frame; use crate::server::ConnectionProcessor; +use crate::websocket_pong_watchdog::WEBSOCKET_PONG_TIMEOUT; +use crate::websocket_pong_watchdog::WEBSOCKET_PONG_TIMEOUT_REASON; +use crate::websocket_pong_watchdog::WebSocketPongWatchdog; const RELAY_MESSAGE_FRAME_VERSION: u32 = 1; const MAX_ACTIVE_NOISE_RELAY_STREAMS: usize = 128; @@ -49,6 +52,27 @@ const MAX_HARNESS_KEY_AUTHORIZATION_BYTES: usize = 4096; const MAX_PENDING_HANDSHAKE_VALIDATIONS: usize = 32; const HARNESS_KEY_VALIDATION_TIMEOUT: Duration = Duration::from_secs(10); +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub(crate) enum RendezvousDisconnectReason { + PeerClose, + ReadError, + WriteError, + PongTimeout, + LocalShutdown, +} + +impl RendezvousDisconnectReason { + pub(crate) fn as_str(self) -> &'static str { + match self { + Self::PeerClose => "peer_close", + Self::ReadError => "read_error", + Self::WriteError => "write_error", + Self::PongTimeout => WEBSOCKET_PONG_TIMEOUT_REASON, + Self::LocalShutdown => "local_shutdown", + } + } +} + #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub(crate) enum RelayFrameBodyKind { Data, @@ -448,7 +472,8 @@ pub(crate) async fn run_multiplexed_environment( executor_registration_id: String, identity: NoiseChannelIdentity, validator: V, -) where +) -> RendezvousDisconnectReason +where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, V: HarnessKeyValidator + Clone + 'static, { @@ -461,6 +486,7 @@ pub(crate) async fn run_multiplexed_environment( mpsc::channel::>(CHANNEL_CAPACITY); let (closed_stream_tx, mut closed_stream_rx) = mpsc::channel::(MAX_ACTIVE_NOISE_RELAY_STREAMS); + let (pong_tx, mut pong_rx) = mpsc::channel(1); // Use a separate writer so this loop never waits on the channel it drains. let mut physical_writer_task = tokio::spawn(async move { let mut keepalive = tokio::time::interval_at( @@ -468,19 +494,61 @@ pub(crate) async fn run_multiplexed_environment( WEBSOCKET_KEEPALIVE_INTERVAL, ); keepalive.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut pong_watchdog = WebSocketPongWatchdog::new(WEBSOCKET_PONG_TIMEOUT); + let pong_deadline = tokio::time::sleep(WEBSOCKET_PONG_TIMEOUT); + tokio::pin!(pong_deadline); loop { let message = tokio::select! { + pong = pong_rx.recv() => { + let Some(()) = pong else { + break RendezvousDisconnectReason::LocalShutdown; + }; + pong_watchdog.received_pong(); + continue; + } + _ = &mut pong_deadline, if pong_watchdog.deadline().is_some() => { + match pong_rx.try_recv() { + Ok(()) => { + pong_watchdog.received_pong(); + continue; + } + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { + break RendezvousDisconnectReason::PongTimeout; + } + Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { + break RendezvousDisconnectReason::LocalShutdown; + } + } + } + _ = keepalive.tick(), if pong_watchdog.deadline().is_none() => { + Message::Ping(Vec::new().into()) + } encoded = physical_outgoing_rx.recv() => { let Some(encoded) = encoded else { - break; + break RendezvousDisconnectReason::LocalShutdown; }; Message::Binary(encoded.into()) } - _ = keepalive.tick() => Message::Ping(Vec::new().into()), }; - if let Err(error) = websocket_sink.send(message).await { - warn!("Noise multiplexed environment websocket write failed: {error}"); - break; + let is_keepalive_ping = matches!(message, Message::Ping(_)); + let write_deadline = pong_watchdog.write_deadline(tokio::time::Instant::now()); + match tokio::time::timeout_at(write_deadline, websocket_sink.send(message)).await { + Ok(Ok(())) => { + if is_keepalive_ping { + pong_watchdog.ping_sent(tokio::time::Instant::now()); + if let Some(deadline) = pong_watchdog.deadline() { + pong_deadline.as_mut().reset(deadline); + } + } + } + Ok(Err(error)) => { + warn!("Noise multiplexed environment websocket write failed: {error}"); + break RendezvousDisconnectReason::WriteError; + } + Err(_) => { + warn!("Noise multiplexed environment websocket write timed out"); + break RendezvousDisconnectReason::WriteError; + } } } }); @@ -489,13 +557,18 @@ pub(crate) async fn run_multiplexed_environment( let mut validation_tasks: JoinSet = JoinSet::new(); let mut failed_handshakes = 0usize; let mut next_validation_id = 0u64; + let mut disconnect_reason = RendezvousDisconnectReason::LocalShutdown; loop { // Registry calls run separately so a slow check does not block the relay. let frame = tokio::select! { writer_result = &mut physical_writer_task => { - if let Err(error) = writer_result { - warn!("Noise multiplexed environment websocket writer failed: {error}"); + match writer_result { + Ok(reason) => disconnect_reason = reason, + Err(error) => { + warn!("Noise multiplexed environment websocket writer failed: {error}"); + disconnect_reason = RendezvousDisconnectReason::LocalShutdown; + } } break; } @@ -621,14 +694,22 @@ pub(crate) async fn run_multiplexed_environment( continue; } }, - Some(Ok(Message::Close(_))) | None => break, - Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => continue, + Some(Ok(Message::Close(_))) | None => { + disconnect_reason = RendezvousDisconnectReason::PeerClose; + break; + } + Some(Ok(Message::Pong(_))) => { + let _ = pong_tx.try_send(()); + continue; + } + Some(Ok(Message::Ping(_) | Message::Frame(_))) => continue, Some(Ok(Message::Text(_))) => { warn!("dropping non-binary Noise relay frame from harness"); continue; } Some(Err(error)) => { debug!("Noise multiplexed environment websocket read failed: {error}"); + disconnect_reason = RendezvousDisconnectReason::ReadError; break; } } @@ -802,6 +883,7 @@ pub(crate) async fn run_multiplexed_environment( physical_writer_task.abort(); let _ = physical_writer_task.await; } + disconnect_reason } /// Charge one failed authenticated-channel attempt to this physical relay. diff --git a/codex-rs/exec-server/src/relay_noise_tests.rs b/codex-rs/exec-server/src/relay_noise_tests.rs index 0020efe34766..704c48d9a1b2 100644 --- a/codex-rs/exec-server/src/relay_noise_tests.rs +++ b/codex-rs/exec-server/src/relay_noise_tests.rs @@ -17,6 +17,7 @@ use tokio_tungstenite::tungstenite::Message; use super::HarnessKeyValidator; use super::MAX_FAILED_NOISE_HANDSHAKES; use super::MAX_HARNESS_KEY_AUTHORIZATION_BYTES; +use super::RendezvousDisconnectReason; use super::run_multiplexed_environment; use crate::ExecServerError; use crate::ExecServerRuntimePaths; @@ -33,6 +34,86 @@ use crate::server::ConnectionProcessor; const ENVIRONMENT_ID: &str = "environment-1"; const EXECUTOR_REGISTRATION_ID: &str = "registration-1"; +#[tokio::test] +async fn missing_pong_disconnects_physical_relay() -> Result<()> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let websocket_url = format!("ws://{}", listener.local_addr()?); + let harness_connection = tokio::spawn(connect_async(websocket_url)); + let (socket, _peer_addr) = listener.accept().await?; + let environment_websocket = accept_async(socket).await?; + let (_harness_websocket, _response) = harness_connection.await??; + + let environment_task = tokio::spawn(run_multiplexed_environment( + environment_websocket, + ConnectionProcessor::new(ExecServerRuntimePaths::new( + std::env::current_exe()?, + /*codex_linux_sandbox_exe*/ None, + )?), + ENVIRONMENT_ID.to_string(), + EXECUTOR_REGISTRATION_ID.to_string(), + NoiseChannelIdentity::generate()?, + BlockingValidator { + calls: Arc::new(AtomicUsize::new(0)), + release: Arc::new(Notify::new()), + }, + )); + + assert_eq!( + timeout(Duration::from_secs(1), environment_task).await??, + RendezvousDisconnectReason::PongTimeout + ); + Ok(()) +} + +#[tokio::test] +async fn pong_keeps_physical_relay_connected() -> Result<()> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let websocket_url = format!("ws://{}", listener.local_addr()?); + let harness_connection = tokio::spawn(connect_async(websocket_url)); + let (socket, _peer_addr) = listener.accept().await?; + let environment_websocket = accept_async(socket).await?; + let (mut harness_websocket, _response) = harness_connection.await??; + + let environment_task = tokio::spawn(run_multiplexed_environment( + environment_websocket, + ConnectionProcessor::new(ExecServerRuntimePaths::new( + std::env::current_exe()?, + /*codex_linux_sandbox_exe*/ None, + )?), + ENVIRONMENT_ID.to_string(), + EXECUTOR_REGISTRATION_ID.to_string(), + NoiseChannelIdentity::generate()?, + BlockingValidator { + calls: Arc::new(AtomicUsize::new(0)), + release: Arc::new(Notify::new()), + }, + )); + + timeout(Duration::from_secs(1), async { + let mut pings = 0; + while pings < 6 { + match harness_websocket.next().await { + Some(Ok(Message::Ping(payload))) => { + harness_websocket.send(Message::Pong(payload)).await?; + pings += 1; + } + Some(Ok(Message::Pong(_) | Message::Frame(_))) => {} + Some(Ok(message)) => anyhow::bail!("expected keepalive ping, got {message:?}"), + Some(Err(error)) => return Err(error.into()), + None => anyhow::bail!("environment disconnected before six keepalive pings"), + } + } + Ok::<_, anyhow::Error>(()) + }) + .await??; + harness_websocket.close(None).await?; + assert_eq!( + timeout(Duration::from_secs(1), environment_task).await??, + RendezvousDisconnectReason::PeerClose + ); + Ok(()) +} + #[derive(Clone)] struct BlockingValidator { calls: Arc, diff --git a/codex-rs/exec-server/src/remote.rs b/codex-rs/exec-server/src/remote.rs index 58fcd8c6540c..8870e0efe98c 100644 --- a/codex-rs/exec-server/src/remote.rs +++ b/codex-rs/exec-server/src/remote.rs @@ -492,7 +492,7 @@ pub async fn run_remote_environment( noise_outcome = "ok", "Noise executor connected to rendezvous" ); - run_multiplexed_environment( + let disconnect_reason = run_multiplexed_environment( websocket, processor.clone(), response.environment_id.clone(), @@ -505,7 +505,15 @@ pub async fn run_remote_environment( }, ) .await; - config.telemetry.remote_reconnect("disconnected"); + info!( + noise_event = "rendezvous_connection", + noise_outcome = "disconnected", + noise_reason = disconnect_reason.as_str(), + "Noise executor disconnected from rendezvous" + ); + config + .telemetry + .remote_reconnect(disconnect_reason.as_str()); } Err(error) => { let registration_rejected = matches!( diff --git a/codex-rs/exec-server/src/websocket_pong_watchdog.rs b/codex-rs/exec-server/src/websocket_pong_watchdog.rs new file mode 100644 index 000000000000..b656772b3d27 --- /dev/null +++ b/codex-rs/exec-server/src/websocket_pong_watchdog.rs @@ -0,0 +1,44 @@ +use std::time::Duration; + +use tokio::time::Instant; + +#[cfg(test)] +pub(crate) const WEBSOCKET_PONG_TIMEOUT: Duration = Duration::from_millis(100); +#[cfg(not(test))] +pub(crate) const WEBSOCKET_PONG_TIMEOUT: Duration = Duration::from_secs(60); +pub(crate) const WEBSOCKET_PONG_TIMEOUT_REASON: &str = "pong_timeout"; + +/// Tracks whether a WebSocket peer has acknowledged a keepalive ping. +pub(crate) struct WebSocketPongWatchdog { + timeout: Duration, + deadline: Option, +} + +impl WebSocketPongWatchdog { + pub(crate) fn new(timeout: Duration) -> Self { + Self { + timeout, + deadline: None, + } + } + + pub(crate) fn ping_sent(&mut self, now: Instant) { + self.deadline.get_or_insert(now + self.timeout); + } + + pub(crate) fn deadline(&self) -> Option { + self.deadline + } + + pub(crate) fn write_deadline(&self, now: Instant) -> Instant { + self.deadline.unwrap_or(now + self.timeout) + } + + pub(crate) fn received_pong(&mut self) { + self.deadline = None; + } +} + +#[cfg(test)] +#[path = "websocket_pong_watchdog_tests.rs"] +mod tests; diff --git a/codex-rs/exec-server/src/websocket_pong_watchdog_tests.rs b/codex-rs/exec-server/src/websocket_pong_watchdog_tests.rs new file mode 100644 index 000000000000..2b15a0d95223 --- /dev/null +++ b/codex-rs/exec-server/src/websocket_pong_watchdog_tests.rs @@ -0,0 +1,34 @@ +use std::time::Duration; + +use pretty_assertions::assert_eq; +use tokio::time::Instant; + +use super::WebSocketPongWatchdog; + +#[test] +fn repeated_ping_does_not_extend_deadline() { + let started_at = Instant::now(); + let timeout = Duration::from_secs(2); + let mut watchdog = WebSocketPongWatchdog::new(timeout); + + watchdog.ping_sent(started_at); + watchdog.ping_sent(started_at + Duration::from_secs(1)); + assert_eq!( + watchdog.write_deadline(started_at + Duration::from_secs(1)), + started_at + timeout + ); + assert_eq!(watchdog.deadline(), Some(started_at + timeout)); +} + +#[test] +fn pong_starts_a_fresh_deadline() { + let started_at = Instant::now(); + let timeout = Duration::from_secs(2); + let mut watchdog = WebSocketPongWatchdog::new(timeout); + + watchdog.ping_sent(started_at); + watchdog.received_pong(); + assert_eq!(watchdog.deadline(), None); + watchdog.ping_sent(started_at + timeout); + assert_eq!(watchdog.deadline(), Some(started_at + timeout + timeout)); +} From 16ec4a28f6fff3dcd12f2d7a0f954026e2cf12bc Mon Sep 17 00:00:00 2001 From: Richard Lee Date: Mon, 29 Jun 2026 21:40:41 -0500 Subject: [PATCH 2/3] yield between fragmented Noise relay writes --- .../exec-server/src/noise_relay/harness.rs | 182 ++++++++--- .../src/noise_relay/harness_tests.rs | 303 ++++++++++++++++++ 2 files changed, 432 insertions(+), 53 deletions(-) diff --git a/codex-rs/exec-server/src/noise_relay/harness.rs b/codex-rs/exec-server/src/noise_relay/harness.rs index a2ca66ac2039..53a32c0ccacf 100644 --- a/codex-rs/exec-server/src/noise_relay/harness.rs +++ b/codex-rs/exec-server/src/noise_relay/harness.rs @@ -272,94 +272,148 @@ where let mut pong_watchdog = WebSocketPongWatchdog::new(WEBSOCKET_PONG_TIMEOUT); let pong_deadline = tokio::time::sleep(WEBSOCKET_PONG_TIMEOUT); tokio::pin!(pong_deadline); - let mut drain_incoming_before_timeout = false; + let mut pending_outbound: Option<(Vec, usize)> = None; + let mut force_incoming = false; let mut frames_drained_after_pong_deadline = 0usize; 'relay: loop { + if pong_watchdog.deadline().is_none() + && keepalive.tick().now_or_never().is_some() + { + if let Err(error) = send_keepalive_ping( + &mut websocket, + &mut pong_watchdog, + pong_deadline.as_mut(), + ) + .await + { + warn!("failed to write Noise relay keepalive ping: {error}"); + break; + } + frames_drained_after_pong_deadline = 0; + continue; + } + + let pong_deadline_expired = pong_watchdog + .deadline() + .is_some_and(|deadline| tokio::time::Instant::now() >= deadline); + if pong_deadline_expired && !force_incoming { + if frames_drained_after_pong_deadline + < MAX_FRAMES_DRAINED_AFTER_PONG_DEADLINE + && std::pin::Pin::new(&mut websocket) + .peek() + .now_or_never() + .is_some() + { + force_incoming = true; + } else { + warn!( + noise_reason = WEBSOCKET_PONG_TIMEOUT_REASON, + "Noise harness rendezvous websocket disconnected" + ); + send_disconnected( + &incoming_tx, + &disconnected_tx, + WEBSOCKET_PONG_TIMEOUT_REASON.to_string(), + ); + return; + } + } + + // A queued Pong must be observed before another fragment is written. + if !force_incoming + && pong_watchdog.deadline().is_some() + && pending_outbound.is_some() + && std::pin::Pin::new(&mut websocket) + .peek() + .now_or_never() + .is_some() + { + force_incoming = true; + } + tokio::select! { - maybe_message = outgoing_rx.recv(), if !drain_incoming_before_timeout => { + maybe_message = outgoing_rx.recv(), if pending_outbound.is_none() && !force_incoming && !pong_deadline_expired => { let Some(message) = maybe_message else { break; }; - let framed = match frame_jsonrpc_message(&message) { - Ok(framed) => framed, + pending_outbound = Some(match frame_jsonrpc_message(&message) { + Ok(framed) => (framed, 0), Err(error) => { warn!("failed to frame JSON-RPC payload for Noise relay: {error}"); break; } + }); + } + _ = std::future::ready(()), if pending_outbound.is_some() && !force_incoming && !pong_deadline_expired => { + let seq = match take_next_sequence(&mut next_outbound_seq) { + Ok(seq) => seq, + Err(error) => { + warn!("Noise relay sequence exhausted: {error}"); + break 'relay; + } }; - for plaintext_record in framed.chunks(NOISE_RECORD_PLAINTEXT_LEN) { - let seq = match take_next_sequence(&mut next_outbound_seq) { - Ok(seq) => seq, - Err(error) => { - warn!("Noise relay sequence exhausted: {error}"); - break 'relay; - } + let (ciphertext, next_offset, message_complete) = { + let Some((framed, offset)) = pending_outbound.as_ref() else { + continue; }; - let ciphertext = match transport.encrypt(plaintext_record) { + let next_offset = (*offset + NOISE_RECORD_PLAINTEXT_LEN).min(framed.len()); + let ciphertext = match transport.encrypt(&framed[*offset..next_offset]) { Ok(ciphertext) => ciphertext, Err(error) => { warn!("failed to encrypt JSON-RPC payload for Noise relay: {error}"); break 'relay; } }; - let frame = RelayMessageFrame::data(stream_id.clone(), seq, ciphertext); - if let Err(error) = send_websocket_message( - &mut websocket, - Message::Binary(encode_relay_message_frame(&frame).into()), - pong_watchdog.write_deadline(tokio::time::Instant::now()), - ) - .await - { - warn!("failed to write Noise relay websocket: {error}"); - break 'relay; - } - } - } - _ = &mut pong_deadline, if pong_watchdog.deadline().is_some() && !drain_incoming_before_timeout => { - if frames_drained_after_pong_deadline - < MAX_FRAMES_DRAINED_AFTER_PONG_DEADLINE - && std::pin::Pin::new(&mut websocket) - .peek() - .now_or_never() - .is_some() + (ciphertext, next_offset, next_offset == framed.len()) + }; + let frame = RelayMessageFrame::data(stream_id.clone(), seq, ciphertext); + // A Pong can arrive after the readiness check while this write owns the + // combined sink and stream. A single bounded record can therefore hit the + // deadline and disconnect with that Pong queued. Treat that as write + // backpressure; this loop yields only between records. + if let Err(error) = send_websocket_message( + &mut websocket, + Message::Binary(encode_relay_message_frame(&frame).into()), + pong_watchdog.write_deadline(tokio::time::Instant::now()), + ) + .await { - frames_drained_after_pong_deadline += 1; - drain_incoming_before_timeout = true; - continue; + warn!("failed to write Noise relay websocket: {error}"); + break 'relay; } - warn!( - noise_reason = WEBSOCKET_PONG_TIMEOUT_REASON, - "Noise harness rendezvous websocket disconnected" - ); - send_disconnected( - &incoming_tx, - &disconnected_tx, - WEBSOCKET_PONG_TIMEOUT_REASON.to_string(), - ); - return; + if message_complete { + pending_outbound = None; + } else if let Some((_framed, offset)) = pending_outbound.as_mut() { + *offset = next_offset; + } + } + _ = &mut pong_deadline, if pong_watchdog.deadline().is_some() && !force_incoming => { + continue; } _ = keepalive.tick(), if pong_watchdog.deadline().is_none() => { - if let Err(error) = send_websocket_message( + if let Err(error) = send_keepalive_ping( &mut websocket, - Message::Ping(Vec::new().into()), - pong_watchdog.write_deadline(tokio::time::Instant::now()), + &mut pong_watchdog, + pong_deadline.as_mut(), ) .await { warn!("failed to write Noise relay keepalive ping: {error}"); break; } - pong_watchdog.ping_sent(tokio::time::Instant::now()); frames_drained_after_pong_deadline = 0; - if let Some(deadline) = pong_watchdog.deadline() { - pong_deadline.as_mut().reset(deadline); - } } incoming_message = websocket.next() => { - drain_incoming_before_timeout = false; + force_incoming = false; let Some(incoming_message) = incoming_message else { break; }; + if pong_watchdog + .deadline() + .is_some_and(|deadline| tokio::time::Instant::now() >= deadline) + { + frames_drained_after_pong_deadline += 1; + } match incoming_message { Ok(Message::Binary(payload)) => { let frame = match decode_relay_message_frame(payload.as_ref()) { @@ -472,6 +526,28 @@ where } } +async fn send_keepalive_ping( + websocket: &mut T, + pong_watchdog: &mut WebSocketPongWatchdog, + pong_deadline: std::pin::Pin<&mut tokio::time::Sleep>, +) -> Result<(), String> +where + T: Sink + Unpin, + E: std::fmt::Display, +{ + send_websocket_message( + websocket, + Message::Ping(Vec::new().into()), + pong_watchdog.write_deadline(tokio::time::Instant::now()), + ) + .await?; + pong_watchdog.ping_sent(tokio::time::Instant::now()); + if let Some(deadline) = pong_watchdog.deadline() { + pong_deadline.reset(deadline); + } + Ok(()) +} + /// Order and decrypt one relay frame, then emit any complete JSON-RPC messages. /// Relay records and JSON-RPC messages do not share boundaries, so reassembly /// happens after decryption. diff --git a/codex-rs/exec-server/src/noise_relay/harness_tests.rs b/codex-rs/exec-server/src/noise_relay/harness_tests.rs index bfd0562b858d..ce26a8ca8c09 100644 --- a/codex-rs/exec-server/src/noise_relay/harness_tests.rs +++ b/codex-rs/exec-server/src/noise_relay/harness_tests.rs @@ -1,9 +1,20 @@ +use std::pin::Pin; +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::task::Context as TaskContext; +use std::task::Poll; use std::time::Duration; use anyhow::Context; use anyhow::Result; +use codex_exec_server_protocol::JSONRPCMessage; +use codex_exec_server_protocol::JSONRPCRequest; +use codex_exec_server_protocol::RequestId; +use futures::Sink; use futures::SinkExt; use futures::StreamExt; +use futures::channel::mpsc as futures_mpsc; use pretty_assertions::assert_eq; use tokio::net::TcpListener; use tokio::sync::mpsc; @@ -20,6 +31,94 @@ use crate::noise_channel::PendingResponderHandshake; const ENVIRONMENT_ID: &str = "environment-1"; const EXECUTOR_REGISTRATION_ID: &str = "registration-1"; +#[tokio::test] +async fn fragmented_writes_yield_to_keepalive_and_queued_pong() -> Result<()> { + let (connection, mut control, mut outbound_rx) = connected_controlled_harness().await?; + + connection + .outgoing_tx + .send(JSONRPCMessage::Request(JSONRPCRequest { + id: RequestId::Integer(1), + method: "large".to_string(), + params: Some(serde_json::json!({ + "payload": "x".repeat(NOISE_RECORD_PLAINTEXT_LEN * 3), + })), + trace: None, + })) + .await?; + + control.wait_for_blocked_write(/*expected*/ 1).await?; + tokio::time::sleep(WEBSOCKET_KEEPALIVE_INTERVAL + Duration::from_millis(10)).await; + control.grant_writes(/*count*/ 1); + let first_data = read_outbound_data(&mut outbound_rx).await?; + assert_eq!(first_data.seq, 0); + + control.wait_for_blocked_write(/*expected*/ 2).await?; + control.grant_writes(/*count*/ 1); + let Message::Ping(ping_payload) = timeout(Duration::from_secs(1), outbound_rx.next()) + .await? + .context("harness closed before sending keepalive")? + else { + anyhow::bail!("expected keepalive between fragmented writes"); + }; + + control.wait_for_blocked_write(/*expected*/ 3).await?; + control.send_inbound(Message::Pong(ping_payload))?; + tokio::time::sleep(WEBSOCKET_KEEPALIVE_INTERVAL + Duration::from_millis(10)).await; + control.grant_writes(/*count*/ 1); + let second_data = read_outbound_data(&mut outbound_rx).await?; + assert_eq!(second_data.seq, 1); + + control.wait_for_blocked_write(/*expected*/ 4).await?; + control.grant_writes(/*count*/ 1); + let next_message = timeout(Duration::from_secs(1), outbound_rx.next()) + .await? + .context("harness closed after receiving queued Pong")?; + assert!(matches!(next_message, Message::Ping(_))); + + for task in &connection.task_handles { + task.abort(); + } + Ok(()) +} + +#[tokio::test(flavor = "current_thread")] +async fn post_deadline_drain_stops_before_frame_33() -> Result<()> { + let (mut connection, mut control, mut outbound_rx) = connected_controlled_harness().await?; + + control.wait_for_blocked_write(/*expected*/ 1).await?; + control.grant_writes(/*count*/ 1); + let Message::Ping(ping_payload) = timeout(Duration::from_secs(1), outbound_rx.next()) + .await? + .context("harness closed before sending keepalive")? + else { + anyhow::bail!("expected keepalive ping"); + }; + let reads_before_deadline = control.inbound_reads(); + + let unrelated_frame = + encode_relay_message_frame(&RelayMessageFrame::resume("unrelated-stream".to_string())); + for _ in 0..MAX_FRAMES_DRAINED_AFTER_PONG_DEADLINE { + control.send_inbound(Message::Binary(unrelated_frame.clone().into()))?; + } + control.send_inbound(Message::Pong(ping_payload))?; + + // Keep the current-thread runtime from consuming the queued frames until the + // Pong deadline and every frame are ready together. + std::thread::sleep(WEBSOCKET_PONG_TIMEOUT + Duration::from_millis(10)); + + let event = timeout(Duration::from_secs(1), connection.incoming_rx.recv()).await?; + let Some(JsonRpcConnectionEvent::Disconnected { reason }) = event else { + anyhow::bail!("expected Pong timeout, got {event:?}"); + }; + assert_eq!(reason.as_deref(), Some(WEBSOCKET_PONG_TIMEOUT_REASON)); + assert_eq!( + control.inbound_reads() - reads_before_deadline, + MAX_FRAMES_DRAINED_AFTER_PONG_DEADLINE + ); + Ok(()) +} + #[tokio::test] async fn pong_keeps_harness_alive_until_peer_stops_responding() -> Result<()> { let listener = TcpListener::bind("127.0.0.1:0").await?; @@ -137,3 +236,207 @@ async fn application_event_delivery_is_bounded() -> Result<()> { assert!(matches!(result, Err(ExecServerError::Closed))); Ok(()) } + +async fn read_outbound_data( + outbound_rx: &mut futures_mpsc::UnboundedReceiver, +) -> Result { + let Message::Binary(payload) = timeout(Duration::from_secs(1), outbound_rx.next()) + .await? + .context("harness closed before sending data")? + else { + anyhow::bail!("expected relay data frame"); + }; + let frame = decode_relay_message_frame(payload.as_ref())?; + assert_eq!(frame.validate()?, RelayFrameBodyKind::Data); + frame.into_data().map_err(anyhow::Error::from) +} + +async fn connected_controlled_harness() -> Result<( + JsonRpcConnection, + ControlledWebSocketHandle, + futures_mpsc::UnboundedReceiver, +)> { + let (websocket, control, mut outbound_rx) = ControlledWebSocket::new(/*write_permits*/ 2); + let executor_identity = NoiseChannelIdentity::generate()?; + let connection = noise_harness_connection_from_websocket( + websocket, + NoiseHarnessConnectionArgs { + connection_label: "test rendezvous".to_string(), + environment_id: ENVIRONMENT_ID.to_string(), + executor_registration_id: EXECUTOR_REGISTRATION_ID.to_string(), + identity: NoiseChannelIdentity::generate()?, + responder_public_key: executor_identity.public_key(), + harness_key_authorization: "authorization".to_string(), + }, + ); + + let Message::Binary(resume_payload) = timeout(Duration::from_secs(1), outbound_rx.next()) + .await? + .context("harness closed before sending resume")? + else { + anyhow::bail!("expected resume frame"); + }; + let resume = decode_relay_message_frame(resume_payload.as_ref())?; + let Message::Binary(handshake_payload) = timeout(Duration::from_secs(1), outbound_rx.next()) + .await? + .context("harness closed before sending handshake")? + else { + anyhow::bail!("expected handshake frame"); + }; + let handshake = decode_relay_message_frame(handshake_payload.as_ref())?; + let stream_id = handshake.stream_id.clone(); + assert_eq!(stream_id, resume.stream_id); + let prologue = + noise_channel_prologue(ENVIRONMENT_ID, EXECUTOR_REGISTRATION_ID, stream_id.as_str()); + let pending = PendingResponderHandshake::read_request( + &executor_identity, + &prologue, + &handshake.into_handshake_payload()?, + )?; + let (_transport, response) = pending.complete()?; + control.send_inbound(Message::Binary( + encode_relay_message_frame(&RelayMessageFrame::handshake(stream_id, response)).into(), + ))?; + Ok((connection, control, outbound_rx)) +} + +struct ControlledWebSocket { + inbound_rx: futures_mpsc::UnboundedReceiver>, + outbound_tx: futures_mpsc::UnboundedSender, + write_permit_rx: futures_mpsc::UnboundedReceiver<()>, + blocked_write_tx: futures_mpsc::UnboundedSender, + write_waiting: bool, + blocked_writes: usize, + inbound_reads: Arc, +} + +struct ControlledWebSocketHandle { + inbound_tx: futures_mpsc::UnboundedSender>, + write_permit_tx: futures_mpsc::UnboundedSender<()>, + blocked_write_rx: futures_mpsc::UnboundedReceiver, + inbound_reads: Arc, +} + +impl ControlledWebSocket { + fn new( + write_permits: usize, + ) -> ( + Self, + ControlledWebSocketHandle, + futures_mpsc::UnboundedReceiver, + ) { + let (inbound_tx, inbound_rx) = futures_mpsc::unbounded(); + let (outbound_tx, outbound_rx) = futures_mpsc::unbounded(); + let (write_permit_tx, write_permit_rx) = futures_mpsc::unbounded(); + let (blocked_write_tx, blocked_write_rx) = futures_mpsc::unbounded(); + for _ in 0..write_permits { + write_permit_tx + .unbounded_send(()) + .expect("test write permit receiver should stay open"); + } + let inbound_reads = Arc::new(AtomicUsize::new(0)); + ( + Self { + inbound_rx, + outbound_tx, + write_permit_rx, + blocked_write_tx, + write_waiting: false, + blocked_writes: 0, + inbound_reads: Arc::clone(&inbound_reads), + }, + ControlledWebSocketHandle { + inbound_tx, + write_permit_tx, + blocked_write_rx, + inbound_reads, + }, + outbound_rx, + ) + } +} + +impl ControlledWebSocketHandle { + fn send_inbound(&self, message: Message) -> Result<()> { + self.inbound_tx + .unbounded_send(Ok(message)) + .map_err(anyhow::Error::from) + } + + fn grant_writes(&self, count: usize) { + for _ in 0..count { + self.write_permit_tx + .unbounded_send(()) + .expect("test write permit receiver should stay open"); + } + } + + fn inbound_reads(&self) -> usize { + self.inbound_reads.load(Ordering::Acquire) + } + + async fn wait_for_blocked_write(&mut self, expected: usize) -> Result<()> { + let actual = timeout(Duration::from_secs(1), self.blocked_write_rx.next()) + .await? + .context("websocket closed before blocking the expected write")?; + assert_eq!(actual, expected); + Ok(()) + } +} + +impl Sink for ControlledWebSocket { + type Error = std::convert::Infallible; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { + let this = self.get_mut(); + match Pin::new(&mut this.write_permit_rx).poll_next(cx) { + Poll::Ready(Some(())) => { + this.write_waiting = false; + Poll::Ready(Ok(())) + } + Poll::Ready(None) | Poll::Pending => { + if !this.write_waiting { + this.write_waiting = true; + this.blocked_writes += 1; + this.blocked_write_tx + .unbounded_send(this.blocked_writes) + .expect("test blocked-write receiver should stay open"); + } + Poll::Pending + } + } + } + + fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { + self.outbound_tx + .unbounded_send(item) + .expect("test outbound receiver should stay open"); + Ok(()) + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut TaskContext<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close( + self: Pin<&mut Self>, + _cx: &mut TaskContext<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } +} + +impl futures::Stream for ControlledWebSocket { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { + let result = Pin::new(&mut self.inbound_rx).poll_next(cx); + if matches!(result, Poll::Ready(Some(_))) { + self.inbound_reads.fetch_add(1, Ordering::Release); + } + result + } +} From 8de3682acaac6f1d19ff23cc2a48b4a20efcb81a Mon Sep 17 00:00:00 2001 From: Richard Lee Date: Mon, 29 Jun 2026 21:46:26 -0500 Subject: [PATCH 3/3] document Noise relay scheduling invariants --- codex-rs/exec-server/src/noise_relay/harness.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/codex-rs/exec-server/src/noise_relay/harness.rs b/codex-rs/exec-server/src/noise_relay/harness.rs index 53a32c0ccacf..9969a3e13000 100644 --- a/codex-rs/exec-server/src/noise_relay/harness.rs +++ b/codex-rs/exec-server/src/noise_relay/harness.rs @@ -272,10 +272,15 @@ where let mut pong_watchdog = WebSocketPongWatchdog::new(WEBSOCKET_PONG_TIMEOUT); let pong_deadline = tokio::time::sleep(WEBSOCKET_PONG_TIMEOUT); tokio::pin!(pong_deadline); + // Keep one framed message as a cursor. Sending one Noise record per loop + // creates a scheduling point for keepalive and inbound control frames + // without splitting the WebSocket reader and writer. let mut pending_outbound: Option<(Vec, usize)> = None; let mut force_incoming = false; let mut frames_drained_after_pong_deadline = 0usize; 'relay: loop { + // Consume a due tick before the always-ready record arm below can win + // another select iteration and postpone the keepalive. if pong_watchdog.deadline().is_none() && keepalive.tick().now_or_never().is_some() { @@ -296,6 +301,8 @@ where let pong_deadline_expired = pong_watchdog .deadline() .is_some_and(|deadline| tokio::time::Instant::now() >= deadline); + // After expiry, inspect only frames already queued. Forcing the peeked + // item through next() makes the 32-frame grace deterministic. if pong_deadline_expired && !force_incoming { if frames_drained_after_pong_deadline < MAX_FRAMES_DRAINED_AFTER_PONG_DEADLINE @@ -319,7 +326,8 @@ where } } - // A queued Pong must be observed before another fragment is written. + // While a Pong is outstanding, drain already-queued inbound traffic + // before the next fragment so a queued Pong cannot sit behind writes. if !force_incoming && pong_watchdog.deadline().is_some() && pending_outbound.is_some() @@ -408,6 +416,9 @@ where let Some(incoming_message) = incoming_message else { break; }; + // Count each completed read after expiry. If only the deadline arm + // advanced this counter, reads won by a simultaneously ready incoming + // arm would not count toward the 32-frame cap. if pong_watchdog .deadline() .is_some_and(|deadline| tokio::time::Instant::now() >= deadline) @@ -541,6 +552,8 @@ where pong_watchdog.write_deadline(tokio::time::Instant::now()), ) .await?; + // Start the response clock after the Ping flushes; waiting for sink capacity + // is governed by the write deadline above. pong_watchdog.ping_sent(tokio::time::Instant::now()); if let Some(deadline) = pong_watchdog.deadline() { pong_deadline.reset(deadline);