diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index d8cd4884bcc..d11c4acd51d 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -989,6 +989,10 @@ impl RelationalDB { Ok(self.inner.alter_table_access_mut_tx(tx, name, access)?) } + pub(crate) fn alter_table_event_flag(&self, tx: &mut MutTx, name: &str, is_event: bool) -> Result<(), DBError> { + Ok(self.inner.alter_table_event_flag_mut_tx(tx, name, is_event)?) + } + pub(crate) fn alter_table_primary_key( &self, tx: &mut MutTx, diff --git a/crates/core/src/db/update.rs b/crates/core/src/db/update.rs index 6e7db31f7ba..31c5b496b7c 100644 --- a/crates/core/src/db/update.rs +++ b/crates/core/src/db/update.rs @@ -282,6 +282,27 @@ fn auto_migrate_database( let table_def = plan.new.stored_in_table_def(&table_name.clone().into()).unwrap(); stdb.alter_table_access(tx, table_name, table_def.table_access.into())?; } + spacetimedb_schema::auto_migrate::AutoMigrateStep::ChangeEventFlag(table_name) => { + let table_def: &TableDef = plan.new.expect_lookup(table_name); + let table_id = stdb + .table_id_from_name_mut(tx, table_name)? + .expect("ChangeEventFlag references a table that should exist"); + + // Pre-validate: flipping is only safe when the table has no committed rows. + if stdb.table_row_count_mut(tx, table_id).unwrap_or(0) > 0 { + anyhow::bail!( + "Cannot change `event` flag on table `{table_name}`: table contains data. \ + Clear the table's rows (e.g. via a reducer) before toggling the `event` annotation." + ); + } + + log!( + logger, + "Changing `event` flag on table `{table_name}` to `{}`", + table_def.is_event + ); + stdb.alter_table_event_flag(tx, table_name, table_def.is_event)?; + } spacetimedb_schema::auto_migrate::AutoMigrateStep::ChangePrimaryKey(table_name) => { let table_def = plan.new.stored_in_table_def(&table_name.clone().into()).unwrap(); log!(logger, "Changing primary key for table `{table_name}`"); @@ -339,6 +360,10 @@ mod test { db::relational_db::tests_utils::{begin_mut_tx, insert, TestDB}, host::module_host::create_table_from_def, }; + use pretty_assertions::assert_matches; + use spacetimedb_datastore::locking_tx_datastore::test_helpers::{ + assert_is_event_state, check_table_event_flag_altered, + }; use spacetimedb_datastore::locking_tx_datastore::PendingSchemaChange; use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, TableAccess}; use spacetimedb_sats::{product, AlgebraicType, AlgebraicType::U64}; @@ -580,4 +605,86 @@ mod test { ); Ok(()) } + + /// Build a minimal v10 module with a single user table `events` whose + /// `is_event` flag matches `is_event`. + fn single_event_table_module_v10(is_event: bool) -> ModuleDef { + use spacetimedb_lib::db::raw_def::v10::RawModuleDefV10Builder; + + let mut builder = RawModuleDefV10Builder::new(); + builder + .build_table_with_new_type("events", [("id", U64)], true) + .with_event(is_event) + .finish(); + builder + .finish() + .try_into() + .expect("should be a valid v10 module definition") + } + + /// Create a non-event `events` table from the schema of `single_event_table_module_v10(false)` + /// in a fresh tx, commit it, and return the `TableId`. Leaves the table empty. + fn setup_events_table(stdb: &TestDB, module: &ModuleDef) -> anyhow::Result { + let mut tx = begin_mut_tx(stdb); + for def in module.tables() { + create_table_from_def(stdb, &mut tx, module, def)?; + } + let table_id = stdb + .table_id_from_name_mut(&tx, "events")? + .expect("table should exist"); + stdb.commit_tx(tx)?; + Ok(table_id) + } + + #[test] + fn change_event_flag_empty_table_succeeds() -> anyhow::Result<()> { + let auth_ctx = AuthCtx::for_testing(); + let stdb = TestDB::durable()?; + + let old = single_event_table_module_v10(false); + let new = single_event_table_module_v10(true); + let table_id = setup_events_table(&stdb, &old)?; + + let mut tx = begin_mut_tx(&stdb); + assert_is_event_state(&tx, table_id, false); + + let plan = ponder_migrate(&old, &new)?; + let res = update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)?; + + assert!( + matches!(res, UpdateResult::RequiresClientDisconnect), + "flipping the `event` flag should disconnect clients" + ); + assert_is_event_state(&tx, table_id, true); + check_table_event_flag_altered(&tx, table_id, true); + Ok(()) + } + + #[test] + fn change_event_flag_nonempty_table_fails() -> anyhow::Result<()> { + let auth_ctx = AuthCtx::for_testing(); + let stdb = TestDB::durable()?; + + let old = single_event_table_module_v10(false); + let new = single_event_table_module_v10(true); + let table_id = setup_events_table(&stdb, &old)?; + + // Insert a row in a separate tx so the pre-flip table state is committed. + let mut tx = begin_mut_tx(&stdb); + insert(&stdb, &mut tx, table_id, &product![42u64])?; + stdb.commit_tx(tx)?; + + let mut tx = begin_mut_tx(&stdb); + let plan = ponder_migrate(&old, &new)?; + let err = update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger) + .err() + .expect("flipping `is_event` on a non-empty table should fail"); + assert!( + err.to_string().contains("contains data"), + "error should mention that the table contains data, got: {err}" + ); + assert_is_event_state(&tx, table_id, false); + assert_eq!(tx.pending_schema_changes(), []); + Ok(()) + } } diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index c479be085d9..a048fd6173b 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -734,6 +734,12 @@ impl CommittedState { let table = self.tables.get_mut(&table_id)?; table.with_mut_schema(|s| s.table_access = access); } + // A table's `is_event` flag was changed. Change back to the old one. + TableAlterEventFlag(table_id, old_is_event) => { + let table = self.tables.get_mut(&table_id)?; + assert_eq!(table.num_rows(), 0); + table.with_mut_schema(|s| s.is_event = old_is_event); + } // A table's primary key was changed. Change back to the old one. TableAlterPrimaryKey(table_id, old_pk) => { let table = self.tables.get_mut(&table_id)?; diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 9d25dc144a8..e6fd87fe790 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -280,6 +280,14 @@ impl Locking { tx.alter_table_access(table_id, access) } + pub fn alter_table_event_flag_mut_tx(&self, tx: &mut MutTxId, name: &str, is_event: bool) -> Result<()> { + let table_id = self + .table_id_from_name_mut_tx(tx, name)? + .ok_or_else(|| TableError::NotFound(name.into()))?; + + tx.alter_table_event_flag(table_id, is_event) + } + pub fn alter_table_primary_key_mut_tx( &self, tx: &mut MutTxId, @@ -968,11 +976,15 @@ fn metadata_from_row(row: RowRef<'_>) -> Result { pub(crate) mod tests { use super::*; use crate::error::IndexError; + use crate::locking_tx_datastore::test_helpers::{ + assert_is_event_state, check_table_event_flag_altered, st_event_table_has_row, + }; use crate::locking_tx_datastore::tx_state::PendingSchemaChange; use crate::system_tables::{ system_tables, StColumnRow, StConnectionCredentialsFields, StConstraintData, StConstraintFields, - StConstraintRow, StEventTableFields, StIndexAlgorithm, StIndexFields, StIndexRow, StRowLevelSecurityFields, - StScheduledFields, StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, + StConstraintRow, StEventTableFields, StEventTableRow, StIndexAlgorithm, StIndexFields, StIndexRow, + StRowLevelSecurityFields, StScheduledFields, StSequenceFields, StSequenceRow, StTableRow, StVarFields, + StViewArgFields, StViewFields, ST_CLIENT_ID, ST_CLIENT_NAME, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ACCESSOR_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, ST_EVENT_TABLE_ID, ST_EVENT_TABLE_NAME, ST_INDEX_ACCESSOR_ID, ST_INDEX_ACCESSOR_NAME, ST_INDEX_ID, @@ -3141,6 +3153,137 @@ pub(crate) mod tests { Ok(()) } + #[test] + fn test_alter_table_event_flag_non_event_to_event() -> ResultTest<()> { + // Create a non-event table. + let (datastore, tx, table_id) = setup_table()?; + commit(&datastore, tx)?; + + // Flip `is_event` from `false` to `true`. + let mut tx = begin_mut_tx(&datastore); + assert_is_event_state(&tx, table_id, false); + assert!( + !st_event_table_has_row(&datastore, &tx, table_id), + "fresh non-event table must not have a row in `st_event_table`" + ); + + tx.alter_table_event_flag(table_id, true)?; + check_table_event_flag_altered(&tx, table_id, true); + assert_is_event_state(&tx, table_id, true); + assert!( + st_event_table_has_row(&datastore, &tx, table_id), + "after flipping to event, `st_event_table` should have the row" + ); + + let tx_data = commit(&datastore, tx)?; + // Flipping to event inserts one row into `st_event_table` + // and does not touch the user table's row data. + let expected_row = ProductValue::from(StEventTableRow { table_id }); + assert_eq!( + tx_data.inserts_for_table(ST_EVENT_TABLE_ID), + Some(&[expected_row][..]), + ); + assert_eq!(tx_data.inserts_for_table(table_id), None); + assert_eq!(tx_data.deletes_for_table(table_id), None); + + // After commit, the schema should reflect the flipped flag + // and `st_event_table` should contain the row. + let tx = begin_mut_tx(&datastore); + assert_is_event_state(&tx, table_id, true); + assert!( + st_event_table_has_row(&datastore, &tx, table_id), + "after commit, `st_event_table` should have the row" + ); + Ok(()) + } + + #[test] + fn test_alter_table_event_flag_event_to_non_event() -> ResultTest<()> { + // Create an event table. + let (datastore, tx, table_id) = setup_event_table()?; + commit(&datastore, tx)?; + + // Sanity check: `st_event_table` should have the row. + let mut tx = begin_mut_tx(&datastore); + assert_is_event_state(&tx, table_id, true); + assert!( + st_event_table_has_row(&datastore, &tx, table_id), + "event table should have a row in `st_event_table`" + ); + + // Flip `is_event` from `true` to `false`. + tx.alter_table_event_flag(table_id, false)?; + check_table_event_flag_altered(&tx, table_id, false); + assert_is_event_state(&tx, table_id, false); + assert!( + !st_event_table_has_row(&datastore, &tx, table_id), + "after flipping to non-event, `st_event_table` should not have the row" + ); + + let tx_data = commit(&datastore, tx)?; + // Flipping away from event deletes one row from `st_event_table` + // and does not touch the user table's row data. + let expected_row = ProductValue::from(StEventTableRow { table_id }); + assert_eq!( + tx_data.deletes_for_table(ST_EVENT_TABLE_ID), + Some(&[expected_row][..]), + ); + assert_eq!(tx_data.inserts_for_table(table_id), None); + assert_eq!(tx_data.deletes_for_table(table_id), None); + + // After commit, the schema should reflect the flipped flag + // and `st_event_table` should NOT contain the row. + let tx = begin_mut_tx(&datastore); + assert_is_event_state(&tx, table_id, false); + assert!( + !st_event_table_has_row(&datastore, &tx, table_id), + "after commit, `st_event_table` should not have the row" + ); + Ok(()) + } + + #[test] + fn test_alter_table_event_flag_rollback_reverts_live_state_and_st_event_table() -> ResultTest<()> { + // Create a non-event table. + let (datastore, tx, table_id) = setup_table()?; + commit(&datastore, tx)?; + + // Start a new tx, flip, check pending change, then rollback. + let mut tx = begin_mut_tx(&datastore); + assert!(!st_event_table_has_row(&datastore, &tx, table_id)); + + tx.alter_table_event_flag(table_id, true)?; + check_table_event_flag_altered(&tx, table_id, true); + // The in-tx view must reflect the flip. + assert_is_event_state(&tx, table_id, true); + assert!( + st_event_table_has_row(&datastore, &tx, table_id), + "after flipping within the tx, `st_event_table` should have the row" + ); + let _ = datastore.rollback_mut_tx(tx); + + // After rollback, the schema and `st_event_table` should be back to pre-state. + let tx = begin_mut_tx(&datastore); + assert_eq!(tx.pending_schema_changes(), []); + assert_is_event_state(&tx, table_id, false); + assert!( + !st_event_table_has_row(&datastore, &tx, table_id), + "rollback should revert the `st_event_table` row" + ); + Ok(()) + } + + #[test] + fn test_alter_table_event_flag_idempotent_no_pending_change() -> ResultTest<()> { + let (datastore, tx, table_id) = setup_table()?; + commit(&datastore, tx)?; + + let mut tx = begin_mut_tx(&datastore); + tx.alter_table_event_flag(table_id, false)?; + assert_eq!(tx.pending_schema_changes(), []); + Ok(()) + } + #[test] fn test_alter_table_row_type_rejects_some_bad_changes() -> ResultTest<()> { let datastore = get_datastore()?; diff --git a/crates/datastore/src/locking_tx_datastore/mod.rs b/crates/datastore/src/locking_tx_datastore/mod.rs index 8f77b462bdd..aac824bc1aa 100644 --- a/crates/datastore/src/locking_tx_datastore/mod.rs +++ b/crates/datastore/src/locking_tx_datastore/mod.rs @@ -15,6 +15,8 @@ pub use tx::{NumDistinctValues, TxId}; mod tx_state; #[cfg(any(test, feature = "test"))] pub use tx_state::PendingSchemaChange; +#[cfg(any(test, feature = "test"))] +pub mod test_helpers; use parking_lot::{ lock_api::{ArcMutexGuard, ArcRwLockReadGuard, ArcRwLockWriteGuard}, diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index d73f794e029..4c32c9c92f1 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -20,7 +20,7 @@ use crate::{ error::{IndexError, SequenceError, TableError}, system_tables::{ with_sys_table_buf, StClientFields, StClientRow, StColumnAccessorFields, StColumnAccessorRow, StColumnFields, - StColumnRow, StConstraintFields, StConstraintRow, StEventTableRow, StFields as _, StIndexAccessorFields, + StColumnRow, StConstraintFields, StConstraintRow, StEventTableFields, StEventTableRow, StFields as _, StIndexAccessorFields, StIndexAccessorRow, StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableAccessorFields, StTableAccessorRow, StTableFields, StTableRow, SystemTable, ST_CLIENT_ID, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID, @@ -691,8 +691,7 @@ impl MutTxId { // Insert into st_event_table if this is an event table. if is_event { - let row = StEventTableRow { table_id }; - self.insert_via_serialize_bsatn(ST_EVENT_TABLE_ID, &row)?; + self.insert_st_event_table_row(table_id)?; } // Create the indexes for the table. @@ -1082,6 +1081,50 @@ impl MutTxId { Ok(()) } + /// Change the `is_event` flag of the table identified by `table_id`. + /// + /// Updates both the in-memory schema and the `st_event_table` system table. + /// This is a breaking change for subscribed clients (the committed state + /// semantics of the table flip), so callers must arrange a `DisconnectAllUsers`. + pub(crate) fn alter_table_event_flag(&mut self, table_id: TableId, is_event: bool) -> Result<()> { + // Write to the table in the tx state (and clone into commit state). + let ((tx_table, ..), (commit_table, ..)) = self.get_or_create_insert_table_mut(table_id)?; + let old_is_event = tx_table.get_schema().is_event; + if old_is_event == is_event { + // Idempotent no-op; do not record a pending change or it would confuse rollback. + return Ok(()); + } + tx_table.with_mut_schema_and_clone(commit_table, |s| s.is_event = is_event); + + // Update `st_event_table`. + if is_event { + self.insert_st_event_table_row(table_id)?; + } else { + self.delete_st_event_table_row(table_id)?; + } + + // Remember the pending change so we can undo if necessary. + self.push_schema_change(PendingSchemaChange::TableAlterEventFlag(table_id, old_is_event)); + + Ok(()) + } + + /// Inserts a row into `st_event_table` marking `table_id` as an event table. + fn insert_st_event_table_row(&mut self, table_id: TableId) -> Result<()> { + let row = StEventTableRow { table_id }; + self.insert_via_serialize_bsatn(ST_EVENT_TABLE_ID, &row)?; + Ok(()) + } + + /// Drops the row in `st_event_table` for this `table_id`. + fn delete_st_event_table_row(&mut self, table_id: TableId) -> Result<()> { + self.delete_col_eq( + ST_EVENT_TABLE_ID, + StEventTableFields::TableId.col_id(), + &table_id.into(), + ) + } + /// Change the primary key of the table identified by `table_id`. /// /// Updates both the in-memory schema and the `st_table` system table. diff --git a/crates/datastore/src/locking_tx_datastore/replay.rs b/crates/datastore/src/locking_tx_datastore/replay.rs index cf0fd357f39..2f9a4e9cff8 100644 --- a/crates/datastore/src/locking_tx_datastore/replay.rs +++ b/crates/datastore/src/locking_tx_datastore/replay.rs @@ -5,8 +5,8 @@ use crate::error::{DatastoreError, IndexError, TableError, ViewError}; use crate::locking_tx_datastore::state_view::{iter_st_column_for_table, StateView}; use crate::system_tables::{ is_built_in_meta_row, table_id_is_reserved, StColumnRow, StConstraintData, StConstraintRow, StFields as _, - StIndexRow, StSequenceFields, StTableFields, StTableRow, StViewRow, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, - ST_SEQUENCE_ID, ST_TABLE_ID, ST_VIEW_ARG_ID, ST_VIEW_ID, ST_VIEW_SUB_ID, + StIndexRow, StSequenceFields, StTableFields, StTableRow, StViewRow, ST_COLUMN_ID, ST_CONSTRAINT_ID, + ST_EVENT_TABLE_ID, ST_INDEX_ID, ST_SEQUENCE_ID, ST_TABLE_ID, ST_VIEW_ARG_ID, ST_VIEW_ID, ST_VIEW_SUB_ID, }; use anyhow::{anyhow, Context}; use core::cell::RefMut; @@ -762,6 +762,14 @@ impl<'cs> ReplayCommittedState<'cs> { self.st_column_changed(referenced_table_id)?; } + if table_id == ST_EVENT_TABLE_ID { + // An `st_event_table` row was inserted; flip `is_event=true` + // on the referenced table's cached schema. + // The `table_id` is the first (and only) field in `StEventTableRow`. + let referenced_table_id = Self::read_table_id(row); + self.reschema_table_for_st_event_table_update(referenced_table_id, true); + } + Ok(()) } @@ -807,6 +815,20 @@ impl<'cs> ReplayCommittedState<'cs> { Ok(()) } + /// Update the in-memory table structure's `is_event` flag in response to + /// replay of an `st_event_table` mutation. + fn reschema_table_for_st_event_table_update(&mut self, table_id: TableId, is_event: bool) { + // We only need to update if we've already constructed the in-memory table structure. + // If we haven't yet, then `self.get_table_and_blob_store_or_create` will see the correct schema + // (via `schema_for_table_raw`'s live `st_event_table` lookup) when it eventually runs. + if let Ok((table, ..)) = self.get_table_and_blob_store_mut(table_id) { + assert_eq!(table.num_rows(), 0); + table.with_mut_schema(|schema| { + schema.is_event = is_event; + }); + } + } + /// Mark all `st_column` rows which refer to the same column as `st_column_row` /// other than the one at `row_pointer` as outdated /// by storing them in [`Self::replay_columns_to_ignore`]. @@ -936,6 +958,14 @@ impl<'cs> ReplayCommittedState<'cs> { self.replay_columns_to_ignore.remove(&row_ptr); } + if table_id == ST_EVENT_TABLE_ID { + // An `st_event_table` row was deleted; flip `is_event=false` + // on the referenced table's cached schema. + // The `table_id` is the first (and only) field in `StEventTableRow`. + let referenced_table_id = Self::read_table_id(row); + self.reschema_table_for_st_event_table_update(referenced_table_id, false); + } + Ok(()) } diff --git a/crates/datastore/src/locking_tx_datastore/test_helpers.rs b/crates/datastore/src/locking_tx_datastore/test_helpers.rs new file mode 100644 index 00000000000..99e7fb1dd41 --- /dev/null +++ b/crates/datastore/src/locking_tx_datastore/test_helpers.rs @@ -0,0 +1,42 @@ +//! Test-only helpers shared between the datastore's internal tests and +//! downstream-crate tests (e.g. `spacetimedb-core`'s `update.rs`). +//! +//! These are gated by `#[cfg(any(test, feature = "test"))]` and re-exported +//! from `locking_tx_datastore::mod` so they are reachable from other crates +//! that enable the `test` feature. + +use super::datastore::Locking; +use super::state_view::StateView as _; +use super::tx_state::PendingSchemaChange; +use super::MutTxId; +use crate::system_tables::{StEventTableFields, ST_EVENT_TABLE_ID}; +use crate::traits::MutTxDatastore as _; +use spacetimedb_primitives::TableId; + +/// Asserts that the live schema's `is_event` flag for `table_id` equals `expected`. +pub fn assert_is_event_state(tx: &MutTxId, table_id: TableId, expected: bool) { + let actual = tx + .get_schema(table_id) + .map(|s| s.is_event) + .expect("schema should exist"); + assert_eq!(actual, expected, "expected table {table_id:?} is_event={expected}"); +} + +/// Returns whether `st_event_table` contains a row referencing `table_id`. +pub fn st_event_table_has_row(datastore: &Locking, tx: &MutTxId, table_id: TableId) -> bool { + datastore + .iter_by_col_eq_mut_tx(tx, ST_EVENT_TABLE_ID, StEventTableFields::TableId, &table_id.into()) + .expect("st_event_table lookup should succeed") + .next() + .is_some() +} + +/// Asserts that `tx.pending_schema_changes()` contains exactly one +/// `TableAlterEventFlag` change for `table_id` recording the old value +/// (i.e. the value just before we altered to `state`). +pub fn check_table_event_flag_altered(tx: &MutTxId, table_id: TableId, state: bool) { + assert_eq!( + tx.pending_schema_changes(), + [PendingSchemaChange::TableAlterEventFlag(table_id, !state)] + ); +} diff --git a/crates/datastore/src/locking_tx_datastore/tx_state.rs b/crates/datastore/src/locking_tx_datastore/tx_state.rs index 4e712bcf684..7d13180647c 100644 --- a/crates/datastore/src/locking_tx_datastore/tx_state.rs +++ b/crates/datastore/src/locking_tx_datastore/tx_state.rs @@ -118,6 +118,9 @@ pub enum PendingSchemaChange { /// The access of the table with [`TableId`] was changed. /// The old access was stored. TableAlterAccess(TableId, StAccess), + /// The `is_event` flag of the table with [`TableId`] was changed. + /// The old value is stored. + TableAlterEventFlag(TableId, bool), /// The row type of the table with [`TableId`] was changed. /// The old column schemas was stored. /// Only non-representational row-type changes are allowed here, @@ -148,6 +151,7 @@ impl MemoryUsage for PendingSchemaChange { Self::TableRemoved(table_id, table) => table_id.heap_usage() + table.heap_usage(), Self::TableAdded(table_id) => table_id.heap_usage(), Self::TableAlterAccess(table_id, st_access) => table_id.heap_usage() + st_access.heap_usage(), + Self::TableAlterEventFlag(table_id, old_is_event) => table_id.heap_usage() + old_is_event.heap_usage(), Self::TableAlterRowType(table_id, column_schemas) => table_id.heap_usage() + column_schemas.heap_usage(), Self::TableAlterPrimaryKey(table_id, pk) => table_id.heap_usage() + pk.heap_usage(), Self::ConstraintRemoved(table_id, constraint_schema) => { diff --git a/crates/schema/src/auto_migrate.rs b/crates/schema/src/auto_migrate.rs index 6d169a902cc..af1585e082b 100644 --- a/crates/schema/src/auto_migrate.rs +++ b/crates/schema/src/auto_migrate.rs @@ -301,6 +301,13 @@ pub enum AutoMigrateStep<'def> { /// Change the access of a table. ChangeAccess(::Key<'def>), + /// Toggle the `is_event` flag of a table. The table must be empty at apply time. + /// + /// This is a breaking change for subscribed clients (the committed-state + /// semantics of the table flip), so this step is always accompanied by a + /// `DisconnectAllUsers`. + ChangeEventFlag(::Key<'def>), + /// Change the primary key of a table. /// /// This updates the `table_primary_key` field in `st_table` @@ -434,9 +441,6 @@ pub enum AutoMigrateError { type2: TableType, }, - #[error("Changing the event flag of table {table} requires a manual migration")] - ChangeTableEventFlag { table: Identifier }, - #[error( "Changing the accessor name on index {index} from {old_accessor:?} to {new_accessor:?} requires a manual migration" )] @@ -674,14 +678,12 @@ fn auto_migrate_table<'def>(plan: &mut AutoMigratePlan<'def>, old: &'def TableDe } .into()) }; - let event_ok: Result<()> = if old.is_event == new.is_event { - Ok(()) - } else { - Err(AutoMigrateError::ChangeTableEventFlag { - table: old.name.clone(), - } - .into()) - }; + if old.is_event != new.is_event { + // Flipping `is_event` changes committed-state semantics; + // clients must reconnect after the migration. + plan.ensure_disconnect_all_users(); + plan.steps.push(AutoMigrateStep::ChangeEventFlag(key)); + } if old.table_access != new.table_access { plan.steps.push(AutoMigrateStep::ChangeAccess(key)); } @@ -760,8 +762,8 @@ fn auto_migrate_table<'def>(plan: &mut AutoMigratePlan<'def>, old: &'def TableDe }) .collect_all_errors::>(); - let ((), (), ProductMonoid(Any(row_type_changed), Any(columns_added))) = - (type_ok, event_ok, columns_ok).combine_errors()?; + let ((), ProductMonoid(Any(row_type_changed), Any(columns_added))) = + (type_ok, columns_ok).combine_errors()?; // If we're adding a column, we'll rewrite the whole table. // That makes any `ChangeColumns` moot, so we can skip it. @@ -2391,31 +2393,60 @@ mod tests { } #[test] - fn test_change_event_flag_rejected() { - // non-event → event - let old = create_v10_module_def(|builder| { - builder - .build_table_with_new_type("Events", ProductType::from([("id", AlgebraicType::U64)]), true) - .finish(); - }); - let new = create_v10_module_def(|builder| { - builder - .build_table_with_new_type("events", ProductType::from([("id", AlgebraicType::U64)]), true) - .with_event(true) - .finish(); - }); + fn test_change_event_flag_produces_step() { + let build = |is_event: bool| { + create_v10_module_def(|builder| { + builder + .build_table_with_new_type("events", ProductType::from([("id", AlgebraicType::U64)]), true) + .with_event(is_event) + .finish(); + }) + }; + let assert_flip = |old_is_event: bool, new_is_event: bool| { + let old = build(old_is_event); + let new = build(new_is_event); + let events_key = expect_identifier("events"); + let plan = ponder_auto_migrate(&old, &new) + .expect("toggling `is_event` on an empty table should succeed"); + assert_eq!( + plan.steps, + &[ + AutoMigrateStep::ChangeEventFlag(&events_key), + AutoMigrateStep::DisconnectAllUsers, + ], + ); + assert!(plan.disconnects_all_users(), "{plan:#?}"); + }; + assert_flip(false, true); + assert_flip(true, false); + } - let result = ponder_auto_migrate(&old, &new); - expect_error_matching!( - result, - AutoMigrateError::ChangeTableEventFlag { table } => &table[..] == "events" - ); + #[test] + fn test_change_event_flag_does_not_produce_orphan_sub_object_steps() { + // Flipping `is_event` must not trigger spurious index/constraint/sequence diff steps + // for a table whose only change is the `is_event` annotation. + let build = |is_event: bool| { + create_v10_module_def(|builder| { + builder + .build_table_with_new_type("events", ProductType::from([("id", AlgebraicType::U64)]), true) + .with_unique_constraint(0) + .with_index(btree(0), "events_id_idx", "events_id_idx") + .with_primary_key(0) + .with_event(is_event) + .finish(); + }) + }; - // event → non-event (reverse direction) - let result = ponder_auto_migrate(&new, &old); - expect_error_matching!( - result, - AutoMigrateError::ChangeTableEventFlag { table } => &table[..] == "events" + let events_key = expect_identifier("events"); + let old = build(false); + let new = build(true); + let plan = ponder_auto_migrate(&old, &new).expect("toggling `is_event` on an empty table should succeed"); + assert_eq!( + plan.steps, + &[ + AutoMigrateStep::ChangeEventFlag(&events_key), + AutoMigrateStep::DisconnectAllUsers, + ], ); } diff --git a/crates/schema/src/auto_migrate/formatter.rs b/crates/schema/src/auto_migrate/formatter.rs index 1f7377209bf..cc16a4ee938 100644 --- a/crates/schema/src/auto_migrate/formatter.rs +++ b/crates/schema/src/auto_migrate/formatter.rs @@ -75,6 +75,10 @@ fn format_step( let access_info = extract_access_change_info(*table, plan)?; f.format_change_access(&access_info) } + AutoMigrateStep::ChangeEventFlag(table) => { + let event_info = extract_event_flag_change_info(*table, plan)?; + f.format_change_event_flag(&event_info) + } AutoMigrateStep::ChangePrimaryKey(table) => { let old_table = plan.old.lookup_expect::(*table); let new_table = plan.new.lookup_expect::(*table); @@ -156,6 +160,7 @@ pub trait MigrationFormatter { fn format_constraint(&mut self, constraint_info: &ConstraintInfo, action: Action) -> io::Result<()>; fn format_sequence(&mut self, sequence_info: &SequenceInfo, action: Action) -> io::Result<()>; fn format_change_access(&mut self, access_info: &AccessChangeInfo) -> io::Result<()>; + fn format_change_event_flag(&mut self, event_info: &EventFlagChangeInfo) -> io::Result<()>; fn format_change_primary_key( &mut self, table_name: &str, @@ -235,6 +240,12 @@ pub struct AccessChangeInfo { pub new_access: TableAccess, } +#[derive(Debug, Clone, PartialEq)] +pub struct EventFlagChangeInfo { + pub table_name: Identifier, + pub new_is_event: bool, +} + #[derive(Debug, Clone, PartialEq)] pub struct ScheduleInfo { pub table_name: Identifier, @@ -513,6 +524,20 @@ fn extract_access_change_info( }) } +fn extract_event_flag_change_info( + table: ::Key<'_>, + plan: &super::AutoMigratePlan, +) -> Result { + let table_def = plan.new.table(table).ok_or_else(|| FormattingErrors::TableNotFound { + table: table.to_string().into(), + })?; + + Ok(EventFlagChangeInfo { + table_name: table_def.name.clone(), + new_is_event: table_def.is_event, + }) +} + fn extract_schedule_info( schedule_table: ::Key<'_>, module_def: &ModuleDef, diff --git a/crates/schema/src/auto_migrate/termcolor_formatter.rs b/crates/schema/src/auto_migrate/termcolor_formatter.rs index 31807d14b30..f08532d70a8 100644 --- a/crates/schema/src/auto_migrate/termcolor_formatter.rs +++ b/crates/schema/src/auto_migrate/termcolor_formatter.rs @@ -10,8 +10,8 @@ use crate::auto_migrate::formatter::ViewInfo; use crate::identifier::Identifier; use super::formatter::{ - AccessChangeInfo, Action, ColumnChange, ColumnChanges, ConstraintInfo, IndexInfo, MigrationFormatter, NewColumns, - RlsInfo, ScheduleInfo, SequenceInfo, TableInfo, + AccessChangeInfo, Action, ColumnChange, ColumnChanges, ConstraintInfo, EventFlagChangeInfo, IndexInfo, + MigrationFormatter, NewColumns, RlsInfo, ScheduleInfo, SequenceInfo, TableInfo, }; /// Color scheme for consistent formatting @@ -325,6 +325,20 @@ impl MigrationFormatter for TermColorFormatter { self.buffer.write_all(b")\n") } + fn format_change_event_flag(&mut self, e: &EventFlagChangeInfo) -> io::Result<()> { + let direction = if e.new_is_event { + "non-event → event" + } else { + "event → non-event" + }; + self.write_action_prefix(&Action::Changed)?; + self.buffer.write_all(b" event flag for table ")?; + self.write_colored(&e.table_name, Some(self.colors.table_name), true)?; + self.buffer.write_all(b" (")?; + self.write_colored(direction, Some(self.colors.access), false)?; + self.buffer.write_all(b")\n") + } + fn format_change_primary_key( &mut self, table_name: &str,