diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index 295574606e2..8dba5287aca 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -63,6 +63,14 @@ enum SchedulerMessage { function_name: String, args: FunctionArgs, }, + /// Re-enqueue an interval procedure after its spawned run finished. The spawned task + /// can't touch the actor's queue, so it sends the reschedule back through this message. + Reschedule { + id: ScheduledFunctionId, + function_name: Arc, + at: Timestamp, + real_at: Instant, + }, } #[derive(Clone)] @@ -72,13 +80,15 @@ pub struct Scheduler { pub struct SchedulerStarter { rx: mpsc::UnboundedReceiver>, + /// Sender clone for the [`SchedulerActor`], so spawned procedure tasks can reschedule. + tx: mpsc::UnboundedSender>, db: Arc, } impl Scheduler { pub fn open(db: Arc) -> (Self, SchedulerStarter) { let (tx, rx) = mpsc::unbounded_channel(); - (Scheduler { tx }, SchedulerStarter { rx, db }) + (Scheduler { tx: tx.clone() }, SchedulerStarter { rx, tx, db }) } } @@ -149,6 +159,7 @@ impl SchedulerStarter { tokio::spawn( SchedulerActor { rx: self.rx, + tx: self.tx, queue, key_map, module_host: module_host.downgrade(), @@ -271,6 +282,8 @@ impl Scheduler { struct SchedulerActor { rx: mpsc::UnboundedReceiver>, + /// Used by spawned procedure tasks to re-enqueue interval reschedules. + tx: mpsc::UnboundedSender>, queue: DelayQueue, key_map: FxHashMap, module_host: WeakModuleHost, @@ -348,19 +361,7 @@ impl SchedulerActor { effective_at, real_at, } => { - // Incase of row update, remove the existing entry from queue first - if let Some(key) = self.key_map.get(&id) { - self.queue.remove(key); - } - let key = self.queue.insert_at( - QueueItem::Id { - id, - function_name, - at: effective_at, - }, - real_at, - ); - self.key_map.insert(id, key); + self.enqueue_scheduled(id, function_name, effective_at, real_at); } SchedulerMessage::ScheduleImmediate { function_name, args } => { self.queue.insert( @@ -368,7 +369,38 @@ impl SchedulerActor { Duration::ZERO, ); } + SchedulerMessage::Reschedule { + id, + function_name, + at, + real_at, + } => { + self.enqueue_scheduled(id, function_name, at, real_at); + } + } + } + + /// Insert (or replace) the queued entry for `id`, used by both `Schedule` and `Reschedule`. + fn enqueue_scheduled( + &mut self, + id: ScheduledFunctionId, + function_name: Arc, + at: Timestamp, + real_at: Instant, + ) { + // Incase of row update, remove the existing entry from queue first + if let Some(key) = self.key_map.get(&id) { + self.queue.remove(key); } + let key = self.queue.insert_at( + QueueItem::Id { + id, + function_name, + at, + }, + real_at, + ); + self.key_map.insert(id, key); } async fn handle_queued(&mut self, id: Expired) { @@ -386,26 +418,65 @@ impl SchedulerActor { }; let params = ScheduledFunctionParams(item.clone()); - let result = match params.kind(module_host.info()) { + match params.kind(module_host.info()) { + // Procedures can run for a long time (e.g. `sleep_until`) and execute on their own + // instance pool, so awaiting them inline here would starve every other due function. + // Spawn instead and keep draining the queue; interval reschedules come back via + // `SchedulerMessage::Reschedule`. (Scheduled procedures thus run concurrently rather + // than one-at-a-time; the datastore's serializable isolation still applies.) ScheduledFunctionKind::Procedure => { - panic::AssertUnwindSafe(module_host.call_scheduled_procedure(params)) - .catch_unwind() - .await + let tx = self.tx.clone(); + tokio::spawn(async move { + // Catch panics so a faulty procedure logs instead of taking down the task. + let result = panic::AssertUnwindSafe(module_host.call_scheduled_procedure(params)) + .catch_unwind() + .await; + let result = match result { + Ok(result) => result, + Err(_) => { + log::warn!("scheduled function panicked"); + return; + } + }; + if let Ok(CallScheduledFunctionResult { + reschedule: Some(Reschedule { at_ts, at_real }), + }) = result + { + if let QueueItem::Id { id, function_name, .. } = item { + let _ = tx.send(MsgOrExit::Msg(SchedulerMessage::Reschedule { + id, + function_name, + at: at_ts, + real_at: at_real, + })); + } + } + }); } + // Reducers can't yield and run on the main executor, so await inline to keep order. ScheduledFunctionKind::Reducer => { - panic::AssertUnwindSafe(module_host.call_scheduled_reducer(params)) + let result = panic::AssertUnwindSafe(module_host.call_scheduled_reducer(params)) .catch_unwind() - .await - } - }; - let result = match result { - Ok(result) => result, - Err(_) => { - log::warn!("scheduled function panicked"); - return; + .await; + let result = match result { + Ok(result) => result, + Err(_) => { + log::warn!("scheduled function panicked"); + return; + } + }; + self.handle_result(item, result); } - }; + } + } + /// Handle the result of an inline (reducer) scheduled call, re-enqueueing it if it + /// is an interval schedule. + fn handle_result( + &mut self, + item: QueueItem, + result: Result, + ) { match result { // If the module already exited, leave the `ScheduledFunction` in // the database for when the module restarts. diff --git a/sdks/rust/tests/procedure-concurrency-client/src/test_handlers.rs b/sdks/rust/tests/procedure-concurrency-client/src/test_handlers.rs index fa67bc11692..5dcd6357ff4 100644 --- a/sdks/rust/tests/procedure-concurrency-client/src/test_handlers.rs +++ b/sdks/rust/tests/procedure-concurrency-client/src/test_handlers.rs @@ -13,8 +13,8 @@ pub async fn dispatch(test: &str, db_name: &str) { "procedure-concurrent-with-scheduled-reducer" => { exec_procedure_concurrent_with_scheduled_reducer(db_name).await } - "scheduled-procedure-scheduled-reducer-not-interleaved" => { - exec_scheduled_procedure_scheduled_reducer_not_interleaved(db_name).await + "scheduled-procedure-scheduled-reducer-interleaves" => { + exec_scheduled_procedure_scheduled_reducer_interleaves(db_name).await } _ => panic!("Unknown test: {test}"), } @@ -468,9 +468,15 @@ async fn exec_procedure_concurrent_with_scheduled_reducer(db_name: &str) { test_counter.wait_for_all().await; } -/// Like [`exec_procedure_reducer_same_client_not_interleaved`], but with the scheduler instead of a client. -/// Tracks a behavior that we'd like to change. -async fn exec_scheduled_procedure_scheduled_reducer_not_interleaved(db_name: &str) { +/// Like [`exec_procedure_concurrent_with_scheduled_reducer`], but the procedure is itself +/// scheduled rather than called by a client. +/// +/// A long-running scheduled procedure (which sleeps between two inserts) must NOT starve a +/// scheduled reducer whose deadline falls during the procedure's sleep: the reducer should +/// run in between, giving insertion order `scheduled_procedure_before < scheduled_reducer < +/// scheduled_procedure_after`. (This used to be serialized — the procedure ran to completion +/// before the reducer — until the scheduler began dispatching procedures concurrently.) +async fn exec_scheduled_procedure_scheduled_reducer_interleaves(db_name: &str) { let test_counter = TestCounter::new(); let sub_applied_nothing_result = test_counter.add_test("on_subscription_applied_nothing"); let mut reducer_callback_result = Some(test_counter.add_test("schedule_procedure_then_reducer_callback")); @@ -526,8 +532,8 @@ async fn exec_scheduled_procedure_scheduled_reducer_not_interleaved(db_name: &st #[allow(clippy::redundant_closure_call)] (|| { anyhow::ensure!( - before < after && after < scheduled_reducer, - "Expected scheduled procedure insertion order scheduled_procedure_before < scheduled_procedure_after < scheduled_reducer, got {before} < {after} < {scheduled_reducer}" + before < scheduled_reducer && scheduled_reducer < after, + "Expected scheduled procedure/reducer to interleave: scheduled_procedure_before < scheduled_reducer < scheduled_procedure_after, got {before} < {scheduled_reducer} < {after}" ); Ok(()) })(), diff --git a/sdks/rust/tests/test.rs b/sdks/rust/tests/test.rs index 0714033956f..d2c9d2aa2d1 100644 --- a/sdks/rust/tests/test.rs +++ b/sdks/rust/tests/test.rs @@ -544,14 +544,13 @@ mod rust_procedure_concurrency { make_test("procedure-concurrent-with-scheduled-reducer").run() } - /// Test that the scheduler has only a single active execution slot, - /// which can be occupied by a long-running or suspended procedure. - /// - /// We're not attached to this behavior, and in fact it should be changed. - /// At that time, this test should be altered to demonstrate that the execution is interleaved. + /// A long-running scheduled procedure must not starve a scheduled reducer that comes + /// due during the procedure's sleep: see issue + /// https://github.com/clockworklabs/SpacetimeDB/issues/4954 . The reducer should + /// interleave (`before < scheduled_reducer < after`). #[test] - fn scheduled_procedure_scheduled_reducer_not_interleaved() { - make_test("scheduled-procedure-scheduled-reducer-not-interleaved").run() + fn scheduled_procedure_scheduled_reducer_interleaves() { + make_test("scheduled-procedure-scheduled-reducer-interleaves").run() } }