diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 0ec7a1f0577..cacf7ab0557 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -9,6 +9,7 @@ use crate::auth::{ SpacetimeIdentityToken, }; use crate::routes::subscribe::generate_random_connection_id; +use crate::util::serde::humantime_duration; pub use crate::util::{ByteStringBody, NameOrIdentity}; use crate::{ log_and_500, Action, Authorization, ControlStateDelegate, DatabaseDef, DatabaseResetDef, Host, MaybeMisdirected, @@ -20,13 +21,14 @@ use axum::response::{ErrorResponse, IntoResponse}; use axum::routing::MethodRouter; use axum::Extension; use axum_extra::TypedHeader; +use derive_more::From; use futures::TryStreamExt; use http::StatusCode; use log::{info, warn}; use serde::Deserialize; use spacetimedb::auth::identity::ConnectionAuthCtx; use spacetimedb::database_logger::DatabaseLogger; -use spacetimedb::host::module_host::ClientConnectedError; +use spacetimedb::host::module_host::{ClientConnectedError, DurabilityExited}; use spacetimedb::host::{CallResult, UpdateDatabaseResult}; use spacetimedb::host::{FunctionArgs, MigratePlanResult}; use spacetimedb::host::{ModuleHost, ReducerOutcome}; @@ -44,6 +46,9 @@ use spacetimedb_lib::{sats, AlgebraicValue, Hash, ProductValue, Timestamp}; use spacetimedb_schema::auto_migrate::{ MigrationPolicy as SchemaMigrationPolicy, MigrationToken, PrettyPrintStyle as AutoMigratePrettyPrintStyle, }; +use tokio::sync::oneshot; +use tokio::time::error::Elapsed; +use tokio::time::timeout; use super::subscribe::{handle_websocket, HasWebSocketOptions}; @@ -695,8 +700,33 @@ pub struct PublishDatabaseQueryParams { parent: Option, #[serde(alias = "org")] organization: Option, + /// Duration to wait for a database update to become confirmed (i.e. durable). + /// + /// The value is parsed via the `humantime` crate, e.g. "1m", "23s", "5min". + /// + /// If not given, defaults to [default_update_confirmation_timeout]. + /// The maximum timeout is capped by [MAX_UPDATE_CONFIRMATION_TIMEOUT]. + /// + /// The parameter has no effect when creating a new database. + #[serde(with = "humantime_duration", default = "default_update_confirmation_timeout")] + update_confirmation_timeout: Duration, +} + +/// Default timeout for a database update to become confirmed / durable. +/// +/// Currently, the value is 5s. +const fn default_update_confirmation_timeout() -> Duration { + Duration::from_secs(5) } +/// Maximum timeout for a database update to become confirmed / durable. +/// +/// If a replication group doesn't converge within this time span, it is +/// probably not making progress at all. +/// +/// Currently, the value is 5min. +const MAX_UPDATE_CONFIRMATION_TIMEOUT: Duration = Duration::from_secs(5 * 60); + pub async fn publish( State(ctx): State, Path(PublishDatabaseParams { name_or_identity }): Path, @@ -708,6 +738,7 @@ pub async fn publish( host_type, parent, organization, + update_confirmation_timeout: confirmation_timeout, }): Query, Extension(auth): Extension, program_bytes: Bytes, @@ -823,6 +854,13 @@ pub async fn publish( .await .map_err(log_and_500)?; + let success = || { + axum::Json(PublishResult::Success { + domain: db_name.cloned(), + database_identity, + op: publish_op, + }) + }; match maybe_updated { Some(UpdateDatabaseResult::AutoMigrateError(errs)) => { Err(bad_request(format!("Database update rejected: {errs}").into())) @@ -830,16 +868,58 @@ pub async fn publish( Some(UpdateDatabaseResult::ErrorExecutingMigration(err)) => Err(bad_request( format!("Failed to create or update the database: {err}").into(), )), - None - | Some( - UpdateDatabaseResult::NoUpdateNeeded - | UpdateDatabaseResult::UpdatePerformed - | UpdateDatabaseResult::UpdatePerformedWithClientDisconnect, - ) => Ok(axum::Json(PublishResult::Success { - domain: db_name.cloned(), - database_identity, - op: publish_op, - })), + None | Some(UpdateDatabaseResult::NoUpdateNeeded) => Ok(success()), + Some( + UpdateDatabaseResult::UpdatePerformed { + tx_offset, + durable_offset, + } + | UpdateDatabaseResult::UpdatePerformedWithClientDisconnect { + tx_offset, + durable_offset, + }, + ) => { + timeout(confirmation_timeout.min(MAX_UPDATE_CONFIRMATION_TIMEOUT), async { + let tx_offset = tx_offset.await?; + if let Some(mut durable_offset) = durable_offset { + durable_offset.wait_for(tx_offset).await?; + } + + Ok::<_, UpdateConfirmationError>(()) + }) + .await + .map_err(Into::into) + .flatten()?; + + Ok(success()) + } + } +} + +#[derive(From)] +enum UpdateConfirmationError { + Cancelled(oneshot::error::RecvError), + Crashed(DurabilityExited), + Timeout(Elapsed), +} + +impl From for ErrorResponse { + fn from(e: UpdateConfirmationError) -> Self { + match e { + UpdateConfirmationError::Cancelled(_) => ( + StatusCode::SERVICE_UNAVAILABLE, + "Database update failed: transaction was cancelled", + ), + UpdateConfirmationError::Crashed(_) => ( + StatusCode::SERVICE_UNAVAILABLE, + "Database update failed: database crashed while waiting for transaction confirmation", + ), + UpdateConfirmationError::Timeout(_) => ( + StatusCode::GATEWAY_TIMEOUT, + "Database update failed: timeout waiting for transaction confirmation", + ), + } + .into() } } diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 9a22c0e97d4..0abf4c5579a 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -1200,7 +1200,7 @@ impl Host { // Only replace the module + scheduler if the update succeeded. // Otherwise, we want the database to continue running with the old state. match update_result { - UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => { + UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed { .. } => { self.scheduler = scheduler; scheduler_starter.start(&module)?; let old_module = self.module.send_replace(module); @@ -1208,7 +1208,7 @@ impl Host { } // In this case, we need to disconnect all clients connected to the old module - UpdateDatabaseResult::UpdatePerformedWithClientDisconnect => { + UpdateDatabaseResult::UpdatePerformedWithClientDisconnect { .. } => { // Replace the module first, so that new clients get the new module. let old_watcher = std::mem::replace(&mut self.module, watch::Sender::new(module.clone())); diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 6ed0f0ec97c..88c45a3f867 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -23,6 +23,7 @@ use crate::sql::ast::SchemaViewer; use crate::sql::execute::SqlResult; use crate::sql::parser::RowLevelExpr; use crate::subscription::module_subscription_actor::ModuleSubscriptions; +pub use crate::subscription::module_subscription_manager::TransactionOffset; use crate::subscription::tx::DeltaTx; use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource}; use crate::subscription::{execute_plan, execute_plan_for_view}; @@ -48,7 +49,7 @@ use spacetimedb_datastore::error::DatastoreError; use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo}; use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData}; -use spacetimedb_durability::DurableOffset; +pub use spacetimedb_durability::{DurabilityExited, DurableOffset}; use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject}; use spacetimedb_execution::RelValue; use spacetimedb_expr::expr::CollectViews; @@ -911,8 +912,20 @@ pub struct WeakModuleHost { #[derive(Debug)] pub enum UpdateDatabaseResult { NoUpdateNeeded, - UpdatePerformed, - UpdatePerformedWithClientDisconnect, + UpdatePerformed { + /// The transaction offset of the successful database update. + tx_offset: TransactionOffset, + /// The durable transaction offset of the database. + /// `None` if the database is in-memory only. + durable_offset: Option, + }, + UpdatePerformedWithClientDisconnect { + /// The transaction offset of the successful database update. + tx_offset: TransactionOffset, + /// The durable transaction offset of the database. + /// `None` if the database is in-memory only. + durable_offset: Option, + }, AutoMigrateError(Box>), ErrorExecutingMigration(anyhow::Error), } @@ -921,9 +934,9 @@ impl UpdateDatabaseResult { pub fn was_successful(&self) -> bool { matches!( self, - UpdateDatabaseResult::UpdatePerformed + UpdateDatabaseResult::UpdatePerformed { .. } | UpdateDatabaseResult::NoUpdateNeeded - | UpdateDatabaseResult::UpdatePerformedWithClientDisconnect + | UpdateDatabaseResult::UpdatePerformedWithClientDisconnect { .. } ) } } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index a711e0f18dc..3b709fc433e 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -23,7 +23,7 @@ use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; use crate::sql::ast::SchemaViewer; use crate::sql::execute::run_with_instance; -use crate::subscription::module_subscription_actor::commit_and_broadcast_event; +use crate::subscription::module_subscription_actor::{commit_and_broadcast_event, CommitAndBroadcastEventSuccess}; use crate::subscription::module_subscription_manager::TransactionOffset; use crate::util::prometheus_handle::{HistogramExt, TimerGuard}; use crate::worker_metrics::WORKER_METRICS; @@ -33,6 +33,7 @@ use core::future::Future; use core::time::Duration; use prometheus::{Histogram, IntCounter, IntGauge}; use spacetimedb_auth::identity::ConnectionAuthCtx; +use spacetimedb_client_api_messages::energy::EnergyQuanta; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::error::{DatastoreError, ViewError}; use spacetimedb_datastore::execution_context::{self, ReducerContext, Workload}; @@ -617,8 +618,6 @@ impl InstanceCommon { let auth_ctx = AuthCtx::for_current(replica_ctx.database.owner_identity); let res = crate::db::update::update_database(stdb, &mut tx, auth_ctx, plan, system_logger); - let mut energy_quanta_used = FunctionBudget::ZERO; - let mut host_execution_duration = Duration::ZERO; match res { Err(e) => { @@ -631,14 +630,42 @@ impl InstanceCommon { Ok(res) => { system_logger.info("Database updated"); log::info!("Database updated, {} host-type={}", stdb.database_identity(), host_type); + + let succeed = |info: Arc, + energy_quanta_used: EnergyQuanta, + host_execution_duration: Duration, + tx: MutTxId| + -> TransactionOffset { + let event = ModuleEvent { + timestamp: Timestamp::now(), + caller_identity: info.owner_identity, + caller_connection_id: None, + function_call: ModuleFunctionCall::update(), + status: EventStatus::Committed(DatabaseUpdate::default()), + reducer_return_value: None, + energy_quanta_used, + host_execution_duration, + request_id: None, + timer: None, + }; + let CommitAndBroadcastEventSuccess { tx_offset, .. } = + commit_and_broadcast_event(&info.subscriptions, None, event, tx); + + tx_offset + }; + let durable_offset = stdb.durable_tx_offset(); + let res: UpdateDatabaseResult = match res { - crate::db::update::UpdateResult::Success => UpdateDatabaseResult::UpdatePerformed, + crate::db::update::UpdateResult::Success => { + let tx_offset = succeed(self.info.clone(), FunctionBudget::ZERO.into(), Duration::ZERO, tx); + UpdateDatabaseResult::UpdatePerformed { + tx_offset, + durable_offset, + } + } crate::db::update::UpdateResult::EvaluateSubscribedViews => { let (out, trapped) = self.evaluate_subscribed_views(tx, inst)?; tx = out.tx; - energy_quanta_used = out.energy_used; - host_execution_duration = out.total_duration; - if trapped || out.outcome != ViewOutcome::Success { let msg = match trapped { true => "Trapped while evaluating views during database update".to_string(), @@ -648,35 +675,26 @@ impl InstanceCommon { ), }; + let (_, tx_metrics, reducer) = stdb.rollback_mut_tx(tx); + stdb.report_mut_tx_metrics(reducer, tx_metrics, None); UpdateDatabaseResult::ErrorExecutingMigration(anyhow::anyhow!(msg)) } else { - UpdateDatabaseResult::UpdatePerformed + let tx_offset = succeed(self.info.clone(), out.energy_used.into(), out.total_duration, tx); + UpdateDatabaseResult::UpdatePerformed { + tx_offset, + durable_offset, + } } } crate::db::update::UpdateResult::RequiresClientDisconnect => { - UpdateDatabaseResult::UpdatePerformedWithClientDisconnect + let tx_offset = succeed(self.info.clone(), FunctionBudget::ZERO.into(), Duration::ZERO, tx); + UpdateDatabaseResult::UpdatePerformedWithClientDisconnect { + tx_offset, + durable_offset, + } } }; - if res.was_successful() { - let event = ModuleEvent { - timestamp: Timestamp::now(), - caller_identity: self.info.owner_identity, - caller_connection_id: None, - function_call: ModuleFunctionCall::update(), - status: EventStatus::Committed(DatabaseUpdate::default()), - reducer_return_value: None, - energy_quanta_used: energy_quanta_used.into(), - host_execution_duration, - request_id: None, - timer: None, - }; - //TODO: Return back event in `UpdateDatabaseResult`? - let _ = commit_and_broadcast_event(&self.info.subscriptions, None, event, tx); - } else { - let (_, tx_metrics, reducer) = stdb.rollback_mut_tx(tx); - stdb.report_mut_tx_metrics(reducer, tx_metrics, None); - } Ok(res) } } diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs index 2b2aadcbbfb..8bdcbb3b922 100644 --- a/crates/durability/src/lib.rs +++ b/crates/durability/src/lib.rs @@ -31,7 +31,7 @@ pub struct DurabilityExited; /// The handle is valid for as long as the [`Durability`] instance it was /// obtained from is live, i.e. able to persist transactions. When the instance /// shuts down or crashes, methods will return errors of type [`DurabilityExited`]. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct DurableOffset { // TODO: `watch::Receiver::wait_for` will hold a shared lock until all // subscribers have seen the current value. Although it may skip entries, diff --git a/crates/standalone/src/control_db.rs b/crates/standalone/src/control_db.rs index 8333695a42a..887d1aeefba 100644 --- a/crates/standalone/src/control_db.rs +++ b/crates/standalone/src/control_db.rs @@ -167,6 +167,7 @@ impl ControlDb { let identity_bytes = database_identity.to_byte_array(); let tree = self.db.open_tree("dns")?; tree.insert(domain.to_lowercase(), &identity_bytes)?; + tree.flush()?; let tree = self.db.open_tree("reverse_dns")?; match tree.get(identity_bytes)? { @@ -179,6 +180,7 @@ impl ControlDb { tree.insert(identity_bytes, serde_json::to_string(&vec![&domain])?.as_bytes())?; } } + tree.flush()?; Ok(InsertDomainResult::Success { domain, @@ -272,6 +274,10 @@ impl ControlDb { } rev_tx.insert(&database_identity_bytes, serde_json::to_vec(domain_names).unwrap())?; + dns_tx.flush(); + tld_tx.flush(); + rev_tx.flush(); + Ok::<_, ConflictableTransactionError>(()) }); @@ -309,6 +315,7 @@ impl ControlDb { } None => { tree.insert(key, &owner_identity.to_byte_array())?; + tree.flush()?; Ok(RegisterTldResult::Success { domain: tld }) } } @@ -372,9 +379,11 @@ impl ControlDb { let buf = sled::IVec::from(compat::Database::from(database).to_vec()?); tree.insert(key, buf.clone())?; + tree.flush()?; let tree = self.db.open_tree("database")?; tree.insert(id.to_be_bytes(), buf)?; + tree.flush()?; Ok(id) } @@ -387,9 +396,11 @@ impl ControlDb { let tree = self.db.open_tree("database_by_identity")?; let buf = sled::IVec::from(compat::Database::from(database).to_vec()?); tree.insert(stored_database.database_identity.to_be_byte_array(), buf.clone())?; + tree.flush()?; let tree = self.db.open_tree("database")?; tree.insert(stored_database.id.to_be_bytes(), buf)?; + tree.flush()?; Ok(()) } @@ -404,6 +415,7 @@ impl ControlDb { tree_by_identity.remove(&key[..])?; tree.remove(id.to_be_bytes())?; + tree.flush()?; return Ok(Some(id)); } @@ -466,6 +478,7 @@ impl ControlDb { let buf = bsatn::to_vec(&replica).unwrap(); tree.insert(id.to_be_bytes(), buf)?; + tree.flush()?; Ok(id) } @@ -473,6 +486,7 @@ impl ControlDb { pub fn delete_replica(&self, id: u64) -> Result<()> { let tree = self.db.open_tree("replica")?; tree.remove(id.to_be_bytes())?; + tree.flush()?; Ok(()) } @@ -509,6 +523,7 @@ impl ControlDb { let buf = bsatn::to_vec(&node).unwrap(); tree.insert(id.to_be_bytes(), buf)?; + tree.flush()?; Ok(id) } @@ -519,12 +534,14 @@ impl ControlDb { let buf = bsatn::to_vec(&node)?; tree.insert(node.id.to_be_bytes(), buf)?; + tree.flush()?; Ok(()) } pub fn _delete_node(&self, id: u64) -> Result<()> { let tree = self.db.open_tree("node")?; tree.remove(id.to_be_bytes())?; + tree.flush()?; Ok(()) } @@ -580,6 +597,7 @@ impl ControlDb { pub fn set_energy_balance(&self, identity: Identity, energy_balance: energy::EnergyBalance) -> Result<()> { let tree = self.db.open_tree("energy_budget")?; tree.insert(identity.to_byte_array(), &energy_balance.get().to_be_bytes())?; + tree.flush()?; Ok(()) }