diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs
index cf096839d47..3be12f26cc4 100644
--- a/crates/core/src/host/v8/mod.rs
+++ b/crates/core/src/host/v8/mod.rs
@@ -31,6 +31,7 @@ use crate::host::{ModuleHost, ReducerCallError, ReducerCallResult, Scheduler};
 use crate::module_host_context::ModuleCreationContext;
 use crate::replica_context::ReplicaContext;
 use crate::subscription::module_subscription_manager::TransactionOffset;
+use crate::util::adaptive_recv::AdaptiveUnboundedReceiver;
 use crate::util::jobs::{AllocatedJobCore, CorePinner, LoadBalanceOnDropGuard};
 use crate::worker_metrics::WORKER_METRICS;
 use core::any::type_name;
@@ -53,8 +54,9 @@ use std::cell::Cell;
 use std::panic::AssertUnwindSafe;
 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::sync::{Arc, LazyLock};
+use std::time::Duration;
 use std::time::Instant;
-use tokio::sync::{oneshot, Mutex as AsyncMutex, Notify};
+use tokio::sync::{mpsc, oneshot, Mutex as AsyncMutex, Notify};
 use tracing::Instrument;
 use v8::script_compiler::{compile_module, Source};
 use v8::{
@@ -114,6 +116,8 @@ static V8_RUNTIME_GLOBAL: LazyLock<V8RuntimeInner> = LazyLock::new(V8RuntimeInne
 static NEXT_JS_INSTANCE_ID: AtomicU64 = AtomicU64::new(1);
 const REDUCER_ARGS_BUFFER_SIZE: usize = 4_096;
 pub(crate) const V8_WORKER_KIND_INSTANCE_LANE: &str = "instance_lane";
+const JS_REPLY_RELAY_BASELINE_LINGER: Duration = Duration::from_micros(25);
+const JS_REPLY_RELAY_MAX_LINGER: Duration = Duration::from_micros(100);
 
 thread_local! {
     // Note, `on_module_thread` runs host closures on a single JS module thread.
@@ -163,6 +167,37 @@ impl JsWorkerKind {
     }
 }
 
+trait JsReplyTask {
+    fn run(self: Box<Self>);
+}
+
+impl<F> JsReplyTask for F
+where
+    F: FnOnce() + Send + 'static,
+{
+    fn run(self: Box<Self>) {
+        (*self)();
+    }
+}
+
+type JsReplyTaskBox = Box<dyn JsReplyTask + Send>;
+type JsReplyRelayTx = mpsc::UnboundedSender<JsReplyTaskBox>;
+
+/// Spawns a worker for relaying replies to client connection handlers.
+/// When the JS worker thread finishes executing a request, it pushes a
+/// reply to this worker which then forwards it on to the correct
+/// client connection handler.
+fn spawn_js_reply_relay() -> JsReplyRelayTx {
+    let (tx, rx) = mpsc::unbounded_channel::<JsReplyTaskBox>();
+    tokio::spawn(async move {
+        let mut rx = AdaptiveUnboundedReceiver::new(rx, JS_REPLY_RELAY_BASELINE_LINGER, JS_REPLY_RELAY_MAX_LINGER);
+        while let Some(task) = rx.recv().await {
+            task.run();
+        }
+    });
+    tx
+}
+
 pub(crate) fn assert_not_on_js_module_thread(label: &str) {
     ON_JS_MODULE_THREAD.with(|entered| {
         assert!(
@@ -230,6 +265,7 @@ impl V8RuntimeInner {
                 .v8_instance_lane_queue_length
                 .with_label_values(&database_identity),
         );
+        let lane_reply_relay = spawn_js_reply_relay();
 
         // Validate/create the module and spawn the first instance.
         let mcc = Either::Right(mcc);
@@ -243,6 +279,7 @@ impl V8RuntimeInner {
             heap_policy,
             JsWorkerKind::InstanceLane,
             lane_queue.clone(),
+            Some(lane_reply_relay.clone()),
         )
         .await?;
         let module = JsModule {
@@ -252,6 +289,7 @@ impl V8RuntimeInner {
             core_pinner,
             heap_policy,
             lane_queue,
+            lane_reply_relay,
         };
 
         Ok(ModuleWithInstance::Js { module, init_inst })
@@ -266,6 +304,7 @@ pub struct JsModule {
     core_pinner: CorePinner,
     heap_policy: V8HeapPolicyConfig,
     lane_queue: Arc<JsWorkerQueue>,
+    lane_reply_relay: JsReplyRelayTx,
 }
 
 impl JsModule {
@@ -281,7 +320,12 @@ impl JsModule {
         self.common.info().clone()
     }
 
-    async fn create_instance_with_queue(&self, request_queue: Arc<JsWorkerQueue>) -> JsInstance {
+    async fn create_instance_with_queue(
+        &self,
+        request_queue: Arc<JsWorkerQueue>,
+        worker_kind: JsWorkerKind,
+        reply_relay: Option<JsReplyRelayTx>,
+    ) -> JsInstance {
         let program = self.program.clone();
         let common = self.common.clone();
         let load_balance_guard = self.load_balance_guard.clone();
@@ -295,8 +339,9 @@ impl JsModule {
             load_balance_guard,
             core_pinner,
             heap_policy,
-            JsWorkerKind::Pooled,
+            worker_kind,
             request_queue,
+            reply_relay,
         )
         .await
         .expect("`spawn_instance_worker` should succeed when passed `ModuleCommon`");
@@ -307,11 +352,17 @@ impl JsModule {
         // We use a rendezvous channel for pooled instances, because they are checked
         // out one request at a time and subsequently returned to the pool, unlike the
        // long lived instance used for executing reducers.
-        self.create_instance_with_queue(JsWorkerQueue::bounded(0)).await
+        self.create_instance_with_queue(JsWorkerQueue::bounded(0), JsWorkerKind::Pooled, None)
+            .await
     }
 
     async fn create_lane_instance(&self) -> JsInstance {
-        self.create_instance_with_queue(self.lane_queue.clone()).await
+        self.create_instance_with_queue(
+            self.lane_queue.clone(),
+            JsWorkerKind::InstanceLane,
+            Some(self.lane_reply_relay.clone()),
+        )
+        .await
     }
 }
 
@@ -806,9 +857,22 @@ enum JsWorkerRequest {
 
 static_assert_size!(CallReducerParams, 192);
 
-fn send_worker_reply<T>(ctx: &str, reply_tx: JsReplyTx<T>, value: T) {
-    if reply_tx.send(value).is_err() {
-        log::error!("should have receiver for `{ctx}` response");
+fn send_worker_reply<T>(reply_relay: Option<&JsReplyRelayTx>, ctx: &'static str, reply_tx: JsReplyTx<T>, value: T)
+where
+    T: Send + 'static,
+{
+    let send_reply = move || {
+        if reply_tx.send(value).is_err() {
+            log::error!("should have receiver for `{ctx}` response");
+        }
+    };
+
+    if let Some(reply_relay) = reply_relay {
+        if reply_relay.send(Box::new(send_reply)).is_err() {
+            log::error!("reply relay dropped while handling `{ctx}` response");
+        }
+    } else {
+        send_reply();
     }
 }
 
@@ -1324,6 +1388,7 @@ async fn spawn_instance_worker(
     heap_policy: V8HeapPolicyConfig,
     worker_kind: JsWorkerKind,
     request_queue: Arc<JsWorkerQueue>,
+    reply_relay: Option<JsReplyRelayTx>,
 ) -> anyhow::Result<(ModuleCommon, JsInstance)> {
     // This one-shot channel is used for initial startup error handling within the thread.
     let (result_tx, result_rx) = oneshot::channel();
@@ -1437,14 +1502,14 @@ async fn spawn_instance_worker(
                     policy,
                 } => {
                     let res = instance_common.update_database(program, old_module_info, policy, &mut inst);
-                    send_worker_reply("update_database", reply_tx, res);
+                    send_worker_reply(reply_relay.as_ref(), "update_database", reply_tx, res);
                 }
                 JsWorkerRequest::CallReducer { reply_tx, params } => {
                     let (res, trapped) = call_reducer(None, params);
                     if trapped {
                         worker_state_in_thread.mark_trapped();
                     }
-                    send_worker_reply("call_reducer", reply_tx, res);
+                    send_worker_reply(reply_relay.as_ref(), "call_reducer", reply_tx, res);
                     should_exit = trapped;
                 }
                 JsWorkerRequest::CallView { reply_tx, cmd } => {
@@ -1452,7 +1517,7 @@ async fn spawn_instance_worker(
                     if trapped {
                         worker_state_in_thread.mark_trapped();
                     }
-                    send_worker_reply("call_view", reply_tx, res);
+                    send_worker_reply(reply_relay.as_ref(), "call_view", reply_tx, res);
                     should_exit = trapped;
                 }
                 JsWorkerRequest::CallProcedure { reply_tx, params } => {
@@ -1463,12 +1528,12 @@ async fn spawn_instance_worker(
                     if trapped {
                         worker_state_in_thread.mark_trapped();
                     }
-                    send_worker_reply("call_procedure", reply_tx, res);
+                    send_worker_reply(reply_relay.as_ref(), "call_procedure", reply_tx, res);
                     should_exit = trapped;
                 }
                 JsWorkerRequest::ClearAllClients(reply_tx) => {
                     let res = instance_common.clear_all_clients();
-                    send_worker_reply("clear_all_clients", reply_tx, res);
+                    send_worker_reply(reply_relay.as_ref(), "clear_all_clients", reply_tx, res);
                 }
                 JsWorkerRequest::CallIdentityConnected {
                     reply_tx,
@@ -1481,7 +1546,7 @@ async fn spawn_instance_worker(
                     if trapped {
                         worker_state_in_thread.mark_trapped();
                     }
-                    send_worker_reply("call_identity_connected", reply_tx, res);
+                    send_worker_reply(reply_relay.as_ref(), "call_identity_connected", reply_tx, res);
                     should_exit = trapped;
                 }
                 JsWorkerRequest::CallIdentityDisconnected {
@@ -1500,7 +1565,7 @@ async fn spawn_instance_worker(
                     if trapped {
                         worker_state_in_thread.mark_trapped();
                     }
-                    send_worker_reply("call_identity_disconnected", reply_tx, res);
+                    send_worker_reply(reply_relay.as_ref(), "call_identity_disconnected", reply_tx, res);
                     should_exit = trapped;
                 }
                 JsWorkerRequest::DisconnectClient { reply_tx, client_id } => {
@@ -1509,7 +1574,7 @@ async fn spawn_instance_worker(
                     if trapped {
                         worker_state_in_thread.mark_trapped();
                     }
-                    send_worker_reply("disconnect_client", reply_tx, res);
+                    send_worker_reply(reply_relay.as_ref(), "disconnect_client", reply_tx, res);
                     should_exit = trapped;
                 }
                 JsWorkerRequest::InitDatabase { reply_tx, program } => {
@@ -1518,7 +1583,7 @@ async fn spawn_instance_worker(
                     if trapped {
                         worker_state_in_thread.mark_trapped();
                     }
-                    send_worker_reply("init_database", reply_tx, res);
+                    send_worker_reply(reply_relay.as_ref(), "init_database", reply_tx, res);
                     should_exit = trapped;
                 }
                 JsWorkerRequest::CallScheduledFunction { reply_tx, params } => {
@@ -1529,7 +1594,7 @@ async fn spawn_instance_worker(
                     if trapped {
                         worker_state_in_thread.mark_trapped();
                     }
-                    send_worker_reply("call_scheduled_function", reply_tx, res);
+                    send_worker_reply(reply_relay.as_ref(), "call_scheduled_function", reply_tx, res);
                     should_exit = trapped;
                 }
             }
diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs
index 7fbfbbd9242..4109bb1239f 100644
--- a/crates/core/src/subscription/module_subscription_manager.rs
+++ b/crates/core/src/subscription/module_subscription_manager.rs
@@ -10,6 +10,7 @@ use crate::host::module_host::{DatabaseTableUpdate, ModuleEvent, UpdatesRelValue
 use crate::subscription::delta::eval_delta;
 use crate::subscription::row_list_builder_pool::{BsatnRowListBuilderPool, JsonRowListBuilderFakePool};
 use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource};
+use crate::util::adaptive_recv::AdaptiveUnboundedReceiver;
 use crate::worker_metrics::WORKER_METRICS;
 type V2EvalUpdatesResult = (Vec, Vec<(SubscriptionIdV2, Box)>, ExecutionMetrics);
 use core::mem;
@@ -37,6 +38,7 @@ use std::collections::BTreeMap;
 use std::fmt::Debug;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
+use std::time::Duration;
 use tokio::sync::{mpsc, oneshot};
 
 /// Clients are uniquely identified by their Identity and ConnectionId.
@@ -1715,7 +1717,7 @@ impl SendWorkerClient {
 /// See comment on the `send_worker_tx` field in [`SubscriptionManager`] for motivation.
 struct SendWorker {
     /// Receiver end of the [`SubscriptionManager`]'s `send_worker_tx` channel.
-    rx: mpsc::UnboundedReceiver<SendWorkerMessage>,
+    rx: AdaptiveUnboundedReceiver<SendWorkerMessage>,
 
     /// `subscription_send_queue_length` metric labeled for this database's `Identity`.
     ///
@@ -1756,6 +1758,12 @@ impl Drop for SendWorker {
 }
 
 impl SendWorker {
+    // Keep the worker warm briefly after handling a message so bursts do not
+    // pay a park/unpark cost on every enqueue, while still parking quickly
+    // once traffic goes quiet.
+    const BASELINE_LINGER: Duration = Duration::from_micros(25);
+    const MAX_LINGER: Duration = Duration::from_micros(500);
+
     fn is_client_dropped_or_cancelled(&self, client_id: &ClientId) -> bool {
         self.clients
             .get(client_id)
@@ -1814,7 +1822,7 @@ impl SendWorker {
         database_identity_to_clean_up_metric: Option<Identity>,
     ) -> Self {
         Self {
-            rx,
+            rx: AdaptiveUnboundedReceiver::new(rx, Self::BASELINE_LINGER, Self::MAX_LINGER),
             queue_length_metric,
             clients: Default::default(),
             database_identity_to_clean_up_metric,
diff --git a/crates/core/src/util/adaptive_recv.rs b/crates/core/src/util/adaptive_recv.rs
new file mode 100644
index 00000000000..1b72adaa2ec
--- /dev/null
+++ b/crates/core/src/util/adaptive_recv.rs
@@ -0,0 +1,150 @@
+use std::time::Duration;
+
+use tokio::sync::mpsc;
+use tokio::time::sleep;
+
+/// Receives from a Tokio unbounded channel with an adaptive linger policy.
+///
+/// This helper is intended for single-consumer background workers that want
+/// to avoid parking on `recv()` after every message during bursty traffic.
+///
+/// The receiver has two modes - hot and cold. In cold mode it blocks on
+/// `recv()` until the next message arrives. In hot mode it prefers to stay
+/// awake, so after receiving a message, it will drain the channel, sleep for
+/// a short period (linger), and only then poll the channel again. This keeps
+/// the receiver off `recv()` during the linger window, so producers can enqueue
+/// more work without waking a parked task.
+///
+/// The linger policy is as follows: If work is present when a linger window
+/// expires, double the window up to `max_linger`. If a linger window expires
+/// and the queue is still empty, reset the window to `baseline_linger`.
+///
+/// Note, messages returned immediately by `try_recv()` do not count as hits,
+/// and do not double the linger window.
+#[derive(Debug)]
+pub struct AdaptiveUnboundedReceiver<T> {
+    rx: mpsc::UnboundedReceiver<T>,
+    linger: AdaptiveLinger,
+    is_hot: bool,
+}
+
+impl<T> AdaptiveUnboundedReceiver<T> {
+    /// Create an adaptive receiver around a Tokio unbounded channel.
+    ///
+    /// `baseline_linger` is the linger window used after a cold wakeup or any
+    /// linger miss. `max_linger` caps how far the linger window may grow after
+    /// repeated linger hits.
+    ///
+    /// This constructor does not spawn any tasks and does not alter the
+    /// channel's ordering semantics. It only configures how aggressively the
+    /// consumer stays awake after work arrives.
+    pub fn new(rx: mpsc::UnboundedReceiver<T>, baseline_linger: Duration, max_linger: Duration) -> Self {
+        Self {
+            rx,
+            linger: AdaptiveLinger::new(baseline_linger, max_linger),
+            is_hot: false,
+        }
+    }
+
+    /// Receive the next message while adapting how aggressively we linger
+    /// before parking again.
+    ///
+    /// Once a worker has been woken up by one message, subsequent calls try to
+    /// stay on the hot path:
+    ///
+    /// 1. Drain any already-queued work immediately with `try_recv()`
+    /// 2. If the queue is empty, sleep for the current linger window
+    /// 3. When the sleep fires, poll the queue again with `try_recv()`
+    /// 4. On a linger hit, double the window and continue lingering
+    /// 5. On a linger miss, reset the window to the baseline and park on `recv()`
+    ///
+    /// This trades a small amount of hot-path latency for lower wake overhead.
+    /// While the receiver is hot, senders enqueue into the channel without
+    /// waking a parked `recv()` future.
+    pub async fn recv(&mut self) -> Option<T> {
+        loop {
+            if !self.is_hot {
+                let message = self.rx.recv().await?;
+                self.is_hot = true;
+                return Some(message);
+            }
+
+            match self.rx.try_recv() {
+                Ok(message) => return Some(message),
+                Err(mpsc::error::TryRecvError::Disconnected) => return None,
+                Err(mpsc::error::TryRecvError::Empty) => {}
+            }
+
+            let linger = self.linger.current();
+            if linger.is_zero() {
+                self.cool_down();
+                continue;
+            }
+
+            sleep(linger).await;
+
+            match self.rx.try_recv() {
+                Ok(message) => {
+                    self.linger.on_hit();
+                    return Some(message);
+                }
+                Err(mpsc::error::TryRecvError::Disconnected) => return None,
+                Err(mpsc::error::TryRecvError::Empty) => {
+                    self.cool_down();
+                }
+            }
+        }
+    }
+
+    /// Return the receiver to its cold state after a linger miss.
+    ///
+    /// The next call to [`Self::recv`] will block on the underlying channel
+    /// instead of continuing to linger, and the linger policy is reset to its
+    /// baseline window.
+    fn cool_down(&mut self) {
+        self.is_hot = false;
+        self.linger.on_miss();
+    }
+}
+
+#[derive(Debug)]
+struct AdaptiveLinger {
+    baseline: Duration,
+    current: Duration,
+    max: Duration,
+}
+
+impl AdaptiveLinger {
+    /// Create a linger policy with a baseline window and an upper bound.
+    ///
+    /// `baseline` is the window restored after any linger miss. `max` caps how
+    /// far the window may grow after repeated linger hits.
+    fn new(baseline: Duration, max: Duration) -> Self {
+        assert!(
+            baseline <= max,
+            "baseline linger ({baseline:?}) must not exceed max linger ({max:?})"
+        );
+        Self {
+            baseline,
+            current: baseline,
+            max,
+        }
+    }
+
+    /// Return the current linger window.
+    fn current(&self) -> Duration {
+        self.current
+    }
+
+    /// Record a linger hit by growing the next linger window.
+    ///
+    /// The window doubles on each hit until it reaches `self.max`.
+    fn on_hit(&mut self) {
+        self.current = self.current.saturating_mul(2).min(self.max);
+    }
+
+    /// Record a linger miss by resetting to the baseline window.
+    fn on_miss(&mut self) {
+        self.current = self.baseline;
+    }
+}
diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs
index 691a27dec98..dafa83dea44 100644
--- a/crates/core/src/util/mod.rs
+++ b/crates/core/src/util/mod.rs
@@ -6,6 +6,7 @@ use tracing::Span;
 
 pub mod prometheus_handle;
 
+pub mod adaptive_recv;
 pub mod jobs;
 pub mod notify_once;
 pub mod thread_scheduling;
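
Not part of the patch above -- an illustrative usage sketch. It mirrors how `spawn_js_reply_relay` and `SendWorker` drive the new `AdaptiveUnboundedReceiver`: wrap the channel's receiver once, then loop on `recv()`. The `spawn_task_worker` name and the 25us/200us linger values are assumptions chosen for the example, not values taken from the diff.

use std::time::Duration;

use tokio::sync::mpsc;

use crate::util::adaptive_recv::AdaptiveUnboundedReceiver;

/// Illustrative only: run boxed closures sent over an unbounded channel,
/// staying "hot" briefly after each message so a burst of sends does not
/// pay a park/unpark round trip per task.
fn spawn_task_worker() -> mpsc::UnboundedSender<Box<dyn FnOnce() + Send>> {
    let (tx, rx) = mpsc::unbounded_channel::<Box<dyn FnOnce() + Send>>();
    tokio::spawn(async move {
        // Linger 25us after a message; back off up to 200us while traffic
        // keeps arriving, then park on `recv()` once the queue stays empty.
        let mut rx = AdaptiveUnboundedReceiver::new(rx, Duration::from_micros(25), Duration::from_micros(200));
        // `recv()` yields `None` once every sender has been dropped.
        while let Some(task) = rx.recv().await {
            task();
        }
    });
    tx
}

With the `SendWorker` constants above (baseline 25us, max 500us), consecutive linger hits grow the window 25 -> 50 -> 100 -> 200 -> 400 -> 500us, and the first miss snaps it back to 25us before the worker parks again.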