diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 54dbaeaf095..202bf2f10e0 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1684,12 +1684,18 @@ impl ModuleHost { for ViewCallInfo { view_id, table_id, - view_name, + fn_ptr, sender, } in out.tx.view_for_update().cloned().collect::>() { + let view_def = self + .info + .module_def + .get_view_by_id(fn_ptr, sender.is_none()) + .ok_or(ViewCallError::NoSuchView)?; + let result = self - .call_view(out.tx, &view_name, view_id, table_id, Nullary, caller, sender) + .call_view(out.tx, &view_def.name, view_id, table_id, Nullary, caller, sender) .await?; // Increment execution stats diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 2108fc782b4..90c7e28474d 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -1003,11 +1003,11 @@ impl InstanceCommon { .cloned() .map(|info| { let view_def = module_def - .view(&*info.view_name) - .unwrap_or_else(|| panic!("view `{}` not found", info.view_name)); + .get_view_by_id(info.fn_ptr, info.sender.is_none()) + .unwrap_or_else(|| panic!("view with fn_ptr `{}` not found", info.fn_ptr)); CallViewParams { - view_name: info.view_name, + view_name: view_def.name.clone().into(), view_id: info.view_id, table_id: info.table_id, fn_ptr: view_def.fn_ptr, @@ -1224,7 +1224,7 @@ impl InstanceOp for ViewOp<'_> { FuncCallType::View(ViewCallInfo { view_id: self.view_id, table_id: self.table_id, - view_name: self.name.to_owned().into_boxed_str(), + fn_ptr: self.fn_ptr, sender: Some(*self.sender), }) } @@ -1254,7 +1254,7 @@ impl InstanceOp for AnonymousViewOp<'_> { FuncCallType::View(ViewCallInfo { view_id: self.view_id, table_id: self.table_id, - view_name: self.name.to_owned().into_boxed_str(), + fn_ptr: self.fn_ptr, sender: None, }) } diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index daeac6d4be1..6f7adf5ee41 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -93,6 +93,15 @@ impl CommittedState { pub(super) fn views_for_table_scan(&self, table_id: &TableId) -> impl Iterator { self.read_sets.views_for_table_scan(table_id) } + + /// Returns the views that perform an precise index seek on given `row_ref` of `table_id` + pub fn views_for_index_seek<'a>( + &'a self, + table_id: &TableId, + row_ref: RowRef<'a>, + ) -> impl Iterator { + self.read_sets.views_for_index_seek(table_id, row_ref) + } } impl MemoryUsage for CommittedState { diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 8ef3f605628..3c31964a8cd 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -42,7 +42,7 @@ use spacetimedb_lib::{ ConnectionId, Identity, }; use spacetimedb_primitives::{ - col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId, + col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewFnPtr, ViewId, }; use spacetimedb_sats::{ bsatn::{self, to_writer, DecodeError, Deserializer}, @@ -76,7 +76,7 @@ type DecodeResult = core::result::Result; pub struct ViewCallInfo { pub view_id: ViewId, pub table_id: TableId, - pub view_name: Box, + pub fn_ptr: ViewFnPtr, pub sender: Option, } @@ -102,8 +102,8 @@ impl ViewReadSets { } /// Record that a view performs a full scan of this table - fn insert_scan(&mut self, table_id: TableId, call: ViewCallInfo) { - self.tables.entry(table_id).or_default().insert_scan(call); + pub fn insert_full_table_scan(&mut self, table_id: TableId, call: ViewCallInfo) { + self.tables.entry(table_id).or_default().insert_table_scan(call); } /// Removes keys for `view_id` from the read set @@ -120,17 +120,39 @@ impl ViewReadSets { self.tables.entry(table_id).or_default().merge(rs); } } + + /// Record that a view reads a specific index key + pub fn insert_index_scan(&mut self, table_id: TableId, cols: ColList, proj: AlgebraicValue, call: ViewCallInfo) { + let table_readset = self.tables.entry(table_id).or_default(); + table_readset.insert_index_scan(cols, proj, call); + } + + /// Returns the views that index seek on the given row pointer + pub fn views_for_index_seek<'a>( + &'a self, + table_id: &TableId, + row_ptr: RowRef<'a>, + ) -> impl Iterator { + self.tables + .get(table_id) + .into_iter() + .flat_map(move |ts| ts.views_for_index_seek(row_ptr)) + } } +type IndexKeyReadSet = HashMap>; +type IndexColReadSet = HashMap; + /// A table-level read set for views #[derive(Default)] struct TableReadSet { table_scans: HashSet, + index_reads: IndexColReadSet, } impl TableReadSet { /// Record that this view performs a full scan of this read set's table - fn insert_scan(&mut self, call: ViewCallInfo) { + fn insert_table_scan(&mut self, call: ViewCallInfo) { self.table_scans.insert(call); } @@ -139,24 +161,60 @@ impl TableReadSet { self.table_scans.iter() } - /// Is this read set empty? + /// Record that a view reads a specific index key for this table + fn insert_index_scan(&mut self, cols: ColList, key: AlgebraicValue, call: ViewCallInfo) { + let key_map = self.index_reads.entry(cols).or_default(); + key_map.entry(key).or_default().insert(call); + } + + /// Returns the views that index seek on the given row pointer (for this table) + fn views_for_index_seek<'a>(&'a self, row_ptr: RowRef<'a>) -> impl Iterator { + self.index_reads.iter().flat_map(move |(cols, av_set)| { + row_ptr + .project(cols) + .ok() + .and_then(|av| av_set.get(&av)) + .into_iter() + .flat_map(|views| views.iter()) + }) + } + + /// Is this read set empty? (no table scans and no index reads) fn is_empty(&self) -> bool { - self.table_scans.is_empty() + self.table_scans.is_empty() && self.index_reads.is_empty() } /// Removes keys for `view_id` from the read set, optionally filtering by `sender` fn remove_view(&mut self, view_id: ViewId, sender: Option) { - if let Some(identity) = sender { - self.table_scans - .retain(|call| !(call.view_id == view_id && call.sender.as_ref() == Some(&identity))); - } else { - self.table_scans.retain(|call| call.view_id != view_id); - } + let matches_call = |call: &ViewCallInfo| { + call.view_id == view_id && sender.as_ref().is_none_or(|s| call.sender.as_ref() == Some(s)) + }; + + // Remove from table_scans + self.table_scans.retain(|call| !matches_call(call)); + + // Remove from index_reads + self.index_reads.retain(|_cols, key_map| { + key_map.retain(|_key, views| { + views.retain(|call| !matches_call(call)); + !views.is_empty() + }); + !key_map.is_empty() + }); } - /// Merge or union two read sets for this table - fn merge(&mut self, readset: TableReadSet) { - self.table_scans.extend(readset.table_scans); + /// Merge (union) another table read set into this one + fn merge(&mut self, other: TableReadSet) { + // merge table scans + self.table_scans.extend(other.table_scans); + + // merge index reads: for each cols, for each av, extend the view set + for (cols, other_av_set) in other.index_reads { + let av_set = self.index_reads.entry(cols).or_default(); + for (av, other_views) in other_av_set { + av_set.entry(av).or_default().extend(other_views); + } + } } } @@ -192,7 +250,7 @@ impl MutTxId { /// Record that a view performs a table scan in this transaction's read set pub fn record_table_scan(&mut self, op: &FuncCallType, table_id: TableId) { if let FuncCallType::View(view) = op { - self.read_sets.insert_scan(table_id, view.clone()); + self.read_sets.insert_full_table_scan(table_id, view.clone()); } } @@ -201,28 +259,84 @@ impl MutTxId { &mut self, op: &FuncCallType, table_id: TableId, - _: IndexId, - _: Bound, - _: Bound, + index_id: IndexId, + lower: Bound, + upper: Bound, ) { - // TODO: Implement read set tracking for index scans - if let FuncCallType::View(view) = op { - self.read_sets.insert_scan(table_id, view.clone()); + let FuncCallType::View(view) = op else { + return; + }; + + // Check for precise index seek + if let (Bound::Included(low_val), Bound::Included(up_val)) = (&lower, &upper) { + if low_val == up_val { + // Fetch index metadata + let Some((_, idx, _)) = self.get_table_and_index(index_id) else { + return; + }; + + let cols = idx.index().indexed_columns.clone(); + self.read_sets + .insert_index_scan(table_id, cols, low_val.clone(), view.clone()); + return; + } } + + // Everything else is treated as a table scan + self.read_sets.insert_full_table_scan(table_id, view.clone()); } /// Returns the views whose read sets overlaps with this transaction's write set - pub fn view_for_update(&self) -> impl Iterator { - self.tx_state + pub fn view_for_update(&self) -> impl Iterator + '_ { + let mut res = self + .tx_state .insert_tables .keys() .filter(|table_id| !self.tx_state.delete_tables.contains_key(table_id)) .chain(self.tx_state.delete_tables.keys()) .flat_map(|table_id| self.committed_state_write_lock.views_for_table_scan(table_id)) - .collect::>() - .into_iter() - } + .collect::>(); + + // Include views that perform precise index seeks. + let mut process_views = |table_id: &_, row_ref| { + for view_call in self.committed_state_write_lock.views_for_index_seek(table_id, row_ref) { + res.insert(view_call); + } + }; + for (table_id, deleted_table) in &self.tx_state.delete_tables { + let (table, blob_store, _) = self + .committed_state_write_lock + .get_table_and_blob_store(*table_id) + .expect("table must exist in committed state for deleted table"); + + if table.indexes.is_empty() { + continue; + } + + for ptr in deleted_table.iter() { + if let Some(row_ref) = table.get_row_ref(blob_store, ptr) { + process_views(table_id, row_ref); + } + } + } + + for (table_id, inserted_table) in &self.tx_state.insert_tables { + let (table, blob_store, _) = self + .committed_state_write_lock + .get_table_and_blob_store(*table_id) + .expect("table must exist in committed state for inserted table"); + + if table.indexes.is_empty() { + continue; + } + + for row_ref in inserted_table.scan_rows(blob_store) { + process_views(table_id, row_ref); + } + } + res.into_iter() + } /// Removes keys for `view_id` from the committed read set. /// Used for dropping views in an auto-migration. pub fn drop_view_from_committed_read_set(&mut self, view_id: ViewId) { diff --git a/crates/schema/src/def.rs b/crates/schema/src/def.rs index 6efb552c864..10c18b36691 100644 --- a/crates/schema/src/def.rs +++ b/crates/schema/src/def.rs @@ -276,6 +276,14 @@ impl ModuleDef { self.reducers.get_index(id.idx()).map(|(_, def)| def) } + /// Look up a view by its id, and whether it is anonymous. + pub fn get_view_by_id(&self, id: ViewFnPtr, is_anonymous: bool) -> Option<&ViewDef> { + self.views + .iter() + .find(|(_, def)| def.fn_ptr == id && def.is_anonymous == is_anonymous) + .map(|(_, def)| def) + } + /// Convenience method to look up a procedure, possibly by a string, returning its id as well. pub fn procedure_full>( &self, diff --git a/smoketests/tests/views.py b/smoketests/tests/views.py index 9d8deaa90a5..2b5a9de2e34 100644 --- a/smoketests/tests/views.py +++ b/smoketests/tests/views.py @@ -109,6 +109,16 @@ class SqlViews(Smoketest): level: u64, } + +#[derive(Clone)] +#[spacetimedb::table(name = player_info, index(name=age_level_index, btree(columns = [age, level])))] +pub struct PlayerInfo { + #[primary_key] + id: u64, + age: u64, + level: u64, +} + #[spacetimedb::reducer] pub fn add_player_level(ctx: &ReducerContext, id: u64, level: u64) { ctx.db.player_level().insert(PlayerState { id, level }); @@ -141,6 +151,14 @@ class SqlViews(Smoketest): let second = PlayerState { id: 7, level: 3 }; vec![first, second] } + +#[spacetimedb::view(name = player_info_multi_index, public)] +pub fn player_info_view(ctx: &ViewContext) -> Option { + + log::info!("player_info called"); + ctx.db.player_info().age_level_index().filter((25u64, 7u64)).next() + +} """ def assertSql(self, sql, expected): @@ -190,21 +208,31 @@ def test_http_sql(self): # since it relies on log capturing starting from an empty log. def test_a_view_materialization(self): """This test asserts whether views are materialized correctly""" - self.insert_initial_data() + player_called_log = "player view called" + + # call view, with no data + self.assertSql("SELECT * FROM player", """\ + id | level +----+------- +""") + logs = self.logs(100) + self.assertEqual(logs.count(player_called_log), 1) - self.assertNotIn(player_called_log, self.logs(100)) + self.insert_initial_data() + # Should invoke view as data is inserted self.call_player_view() - #On first call, the view is evaluated - self.assertIn(player_called_log, self.logs(100)) + + logs = self.logs(100) + self.assertEqual(logs.count(player_called_log), 2) self.call_player_view() - #On second call, the view is cached + # the view is cached logs = self.logs(100) - self.assertEqual(logs.count(player_called_log), 1) + self.assertEqual(logs.count(player_called_log), 2) - # insert to cause cache invalidation + # inserting new row should not trigger view invocation due to readsets self.spacetime( "sql", self.database_identity, @@ -214,10 +242,93 @@ def test_a_view_materialization(self): ) self.call_player_view() - #On third call, after invalidation, the view is evaluated again logs = self.logs(100) self.assertEqual(logs.count(player_called_log), 2) + # Updating the row that the view depends on should trigger re-evaluation + self.spacetime( + "sql", + self.database_identity, + """ +UPDATE player_state SET level = 9 WHERE id = 42; +""", + ) + + # On fourth call, after updating the dependent row, the view is re-evaluated + logs = self.logs(100) + self.assertEqual(logs.count(player_called_log), 3) + + + # Updating it back for other tests to work + self.spacetime( + "sql", + self.database_identity, + """ +UPDATE player_state SET level = 7 WHERE id = 42; +""", + ) + + def test_view_multi_index_materialization(self): + """This test asserts whether views using multi-column indexes are materialized correctly""" + + player_called_log = "player_info called" + + # call view, with no data + self.assertSql("SELECT * FROM player_info_multi_index", """\ + id | age | level +----+-----+------- +""") + + logs = self.logs(100) + self.assertEqual(logs.count(player_called_log), 1) + + # Insert data + self.spacetime( + "sql", + self.database_identity, + """\ +INSERT INTO player_info (id, age, level) VALUES (1, 25, 7); +""", + ) + + # Should invoke view as data is inserted + self.assertSql("SELECT * FROM player_info_multi_index", """\ + id | age | level +----+-----+------- + 1 | 25 | 7 +""") + logs = self.logs(100) + self.assertEqual(logs.count(player_called_log), 2) + + + # Inserting a row that does not match should not trigger re-evaluation + self.spacetime( + "sql", + self.database_identity, + """\ +INSERT INTO player_info (id, age, level) VALUES (2, 25, 8); +""", + ) + + logs = self.logs(100) + self.assertEqual(logs.count(player_called_log), 2) + + # Updating the row that the view depends on should trigger re-evaluation + self.spacetime( + "sql", + self.database_identity, + """ +UPDATE player_info SET age = 26 WHERE id = 1; +""", + ) + logs = self.logs(100) + self.assertEqual(logs.count(player_called_log), 3) + self.assertSql("SELECT * FROM player_info_multi_index", """\ + id | age | level +----+-----+------- +""") + + def test_query_anonymous_view_reducer(self): """Tests that anonymous views are updated for reducers""" self.call("add_player_level", 0, 1)