diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 4cd0af60869..d17a0224797 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -10,7 +10,7 @@ rust-version.workspace = true workspace = true [dependencies] -tokio = { workspace = true, optional = true } +tokio.workspace = true async-task = { version = "4.4", default-features = false, optional = true } spin = { version = "0.9", default-features = false, features = ["mutex", "spin_mutex"], optional = true } libc = { version = "0.2", optional = true } @@ -20,5 +20,5 @@ futures.workspace = true [features] default = ["tokio"] -tokio = ["dep:tokio"] +tokio = [] simulation = ["dep:async-task", "dep:spin", "dep:libc"] diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index eaed2f35f46..8093ca61ca4 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -1,3 +1,11 @@ +#[cfg(all(feature = "tokio", feature = "simulation"))] +compile_error!( + "spacetimedb-runtime requires exactly one runtime backend: enable either `tokio` or `simulation`, not both" +); + +#[cfg(not(any(feature = "tokio", feature = "simulation")))] +compile_error!("spacetimedb-runtime requires exactly one runtime backend: enable either `tokio` or `simulation`"); + #[cfg(feature = "simulation")] extern crate alloc; @@ -15,8 +23,19 @@ pub mod sim; #[cfg(feature = "simulation")] pub mod sim_std; -#[cfg(feature = "tokio")] pub type TokioHandle = tokio::runtime::Handle; +pub type TokioRuntime = tokio::runtime::Runtime; +pub type TokioRuntimeBuilder = tokio::runtime::Builder; + +// We intentionally re-export `tokio::sync` even when the simulation backend is +// selected. Async and non-blocking synchronization operations are +// executor-agnostic, so driving them from the deterministic simulation runtime +// remains deterministic. +// +// Callers must avoid APIs that block or park OS threads on their own, such as +// `blocking_send`, because those semantics are outside the simulation runtime's +// deterministic scheduler. +pub use tokio::sync; #[derive(Clone)] pub enum Handle { @@ -74,6 +93,15 @@ enum JoinErrorInner { Simulation(sim::JoinError), } +#[cfg(feature = "tokio")] +impl From for AbortHandle { + fn from(handle: tokio::task::AbortHandle) -> Self { + Self { + inner: AbortHandleInner::Tokio(handle), + } + } +} + impl AbortHandle { pub fn abort(&self) { match &self.inner { @@ -81,8 +109,6 @@ impl AbortHandle { AbortHandleInner::Tokio(handle) => handle.abort(), #[cfg(feature = "simulation")] AbortHandleInner::Simulation(handle) => handle.abort(), - #[cfg(not(any(feature = "tokio", feature = "simulation")))] - _ => unreachable!("runtime abort handle has no enabled backend"), } } } @@ -100,16 +126,10 @@ impl JoinErrorInner { impl fmt::Display for JoinError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - #[cfg(not(any(feature = "tokio", feature = "simulation")))] - let _ = f; - #[cfg(any(feature = "tokio", feature = "simulation"))] - return self.inner.fmt(f); - #[cfg(not(any(feature = "tokio", feature = "simulation")))] - unreachable!("runtime join error has no enabled backend") + self.inner.fmt(f) } } -#[cfg(any(feature = "tokio", feature = "simulation"))] impl std::error::Error for JoinError {} impl JoinHandleInner { @@ -160,8 +180,6 @@ impl Future for JoinHandle { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - #[cfg(not(any(feature = "tokio", feature = "simulation")))] - let _ = cx; match self.inner.poll_result(cx) { Poll::Ready(Ok(output)) => { self.inner = JoinHandleInner::Detached(PhantomData); @@ -197,17 +215,30 @@ impl fmt::Display for RuntimeTimeout { } } -#[cfg(any(feature = "tokio", feature = "simulation"))] impl std::error::Error for RuntimeTimeout {} -#[cfg(feature = "tokio")] impl Handle { pub fn tokio(handle: TokioHandle) -> Self { - Self::Tokio(handle) + #[cfg(feature = "tokio")] + { + Self::Tokio(handle) + } + #[cfg(not(feature = "tokio"))] + { + let _ = handle; + panic!("spacetimedb-runtime tokio handle requested without the `tokio` backend enabled") + } } pub fn tokio_current() -> Self { - Self::tokio(TokioHandle::current()) + #[cfg(feature = "tokio")] + { + Self::tokio(TokioHandle::current()) + } + #[cfg(not(feature = "tokio"))] + { + panic!("spacetimedb-runtime current tokio handle requested without the `tokio` backend enabled") + } } } @@ -220,8 +251,6 @@ impl Handle { impl Handle { pub fn spawn(&self, future: impl Future + Send + 'static) -> JoinHandle { - #[cfg(not(any(feature = "tokio", feature = "simulation")))] - let _ = future; match self { #[cfg(feature = "tokio")] Self::Tokio(handle) => JoinHandle { @@ -231,8 +260,6 @@ impl Handle { Self::Simulation(handle) => JoinHandle { inner: JoinHandleInner::Simulation(handle.spawn_on(sim::NodeId::MAIN, future)), }, - #[cfg(not(any(feature = "tokio", feature = "simulation")))] - _ => unreachable!("runtime dispatch has no enabled backend"), } } @@ -241,8 +268,6 @@ impl Handle { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - #[cfg(not(any(feature = "tokio", feature = "simulation")))] - let _ = &f; match self { #[cfg(feature = "tokio")] Self::Tokio(_) => tokio::task::spawn_blocking(f) @@ -261,8 +286,6 @@ impl Handle { .spawn_on(sim::NodeId::MAIN, async move { f() }) .await .expect("simulation spawn_blocking task should not be cancelled"), - #[cfg(not(any(feature = "tokio", feature = "simulation")))] - _ => unreachable!("runtime dispatch has no enabled backend"), } } @@ -271,8 +294,6 @@ impl Handle { timeout_after: Duration, future: impl Future, ) -> Result { - #[cfg(not(any(feature = "tokio", feature = "simulation")))] - let _ = (timeout_after, future); match self { #[cfg(feature = "tokio")] Self::Tokio(_) => tokio::time::timeout(timeout_after, future) @@ -280,8 +301,15 @@ impl Handle { .map_err(|_| RuntimeTimeout), #[cfg(feature = "simulation")] Self::Simulation(handle) => handle.timeout(timeout_after, future).await.map_err(|_| RuntimeTimeout), - #[cfg(not(any(feature = "tokio", feature = "simulation")))] - _ => unreachable!("runtime dispatch has no enabled backend"), + } + } + + pub async fn sleep(&self, duration: Duration) { + match self { + #[cfg(feature = "tokio")] + Self::Tokio(_) => tokio::time::sleep(duration).await, + #[cfg(feature = "simulation")] + Self::Simulation(handle) => handle.sleep(duration).await, } } }