From c0118a2d08b81475004a608781783a348f397781 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Thu, 20 Nov 2025 21:29:59 +0530 Subject: [PATCH 1/8] index readsets --- .../locking_tx_datastore/committed_state.rs | 9 ++ .../src/locking_tx_datastore/mut_tx.rs | 140 +++++++++++++++--- smoketests/tests/views.py | 26 +++- 3 files changed, 153 insertions(+), 22 deletions(-) diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index daeac6d4be1..6f7adf5ee41 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -93,6 +93,15 @@ impl CommittedState { pub(super) fn views_for_table_scan(&self, table_id: &TableId) -> impl Iterator { self.read_sets.views_for_table_scan(table_id) } + + /// Returns the views that perform a precise index seek on the given `row_ref` of `table_id` + pub fn views_for_index_seek<'a>( + &'a self, + table_id: &TableId, + row_ref: RowRef<'a>, + ) -> impl Iterator { + self.read_sets.views_for_index_seek(table_id, row_ref) + } } impl MemoryUsage for CommittedState { diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 8ef3f605628..31b6e79d1e5 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -56,14 +56,10 @@ use spacetimedb_schema::{ schema::{ColumnSchema, ConstraintSchema, IndexSchema, RowLevelSecuritySchema, SequenceSchema, TableSchema}, }; use spacetimedb_table::{ - blob_store::BlobStore, - indexes::{RowPointer, SquashedOffset}, - static_assert_size, - table::{ + blob_store::BlobStore, indexes::{RowPointer, SquashedOffset}, static_assert_size, table::{ BlobNumBytes, DuplicateError, IndexScanRangeIter, InsertError, RowRef, Table, TableAndIndex, UniqueConstraintViolation, - }, - table_index::TableIndex, + }, table_index::TableIndex }; use
std::{ sync::Arc, @@ -84,6 +80,7 @@ pub struct ViewCallInfo { #[derive(Default)] pub struct ViewReadSets { tables: IntMap, + indexes: IntMap, } impl MemoryUsage for ViewReadSets { @@ -102,7 +99,7 @@ impl ViewReadSets { } /// Record that a view performs a full scan of this table - fn insert_scan(&mut self, table_id: TableId, call: ViewCallInfo) { + fn insert_table_scan(&mut self, table_id: TableId, call: ViewCallInfo) { self.tables.entry(table_id).or_default().insert_scan(call); } @@ -112,6 +109,9 @@ impl ViewReadSets { readset.remove_view(view_id, sender); !readset.is_empty() }); + + // index cleanup + self.remove_view_from_indexes(view_id, sender); } /// Merge or union read sets together @@ -119,9 +119,68 @@ impl ViewReadSets { for (table_id, rs) in readset.tables { self.tables.entry(table_id).or_default().merge(rs); } + + self.merge_index_reads(readset.indexes); + } + + /// Record that a view reads a specific index key + pub fn insert_index_scan(&mut self, table_id: TableId, cols: ColList, proj: AlgebraicValue, call: ViewCallInfo) { + let col_map = self.indexes.entry(table_id).or_default(); + let key_map = col_map.entry(cols).or_default(); + key_map.entry(proj).or_default().insert(call); + } + + /// Returns the views that index seek on the given row pointer + pub fn views_for_index_seek<'a>( + &'a self, + table_id: &TableId, + row_ptr: RowRef<'a>, + ) -> impl Iterator { + self.indexes.get(table_id).into_iter().flat_map(move |table_sets| { + table_sets.iter().flat_map(move |(cols, av_sets)| { + row_ptr + .project(cols) + .ok() + .and_then(|k| av_sets.get(&k)) + .into_iter() + .flat_map(|views| views.iter()) + }) + }) + } + + // Remove all references to a view from the index read sets + fn remove_view_from_indexes(&mut self, view_id: ViewId, sender: Option) { + self.indexes.retain(|_, col_map| { + col_map.retain(|_, key_map| { + key_map.retain(|_, views| { + views.retain(|call| { + !(call.view_id == view_id && sender.as_ref().is_none_or(|s| call.sender.as_ref() == 
Some(s))) + }); + !views.is_empty() + }); + !key_map.is_empty() + }); + !col_map.is_empty() + }); + } + + /// Merge (union) another index reads into this one + fn merge_index_reads(&mut self, other: IntMap) { + for (table_id, other_col_map) in other { + let col_map = self.indexes.entry(table_id).or_default(); + for (cols, other_key_map) in other_col_map { + let key_map = col_map.entry(cols).or_default(); + for (key, other_views) in other_key_map { + key_map.entry(key).or_default().extend(other_views); + } + } + } } } +type IndexKeyReadSet = HashMap>; +type IndexColReadSet = HashMap; + /// A table-level read set for views #[derive(Default)] struct TableReadSet { @@ -192,7 +251,7 @@ impl MutTxId { /// Record that a view performs a table scan in this transaction's read set pub fn record_table_scan(&mut self, op: &FuncCallType, table_id: TableId) { if let FuncCallType::View(view) = op { - self.read_sets.insert_scan(table_id, view.clone()); + self.read_sets.insert_table_scan(table_id, view.clone()); } } @@ -201,28 +260,69 @@ impl MutTxId { &mut self, op: &FuncCallType, table_id: TableId, - _: IndexId, - _: Bound, - _: Bound, + index_id: IndexId, + lower: Bound, + upper: Bound, ) { - // TODO: Implement read set tracking for index scans - if let FuncCallType::View(view) = op { - self.read_sets.insert_scan(table_id, view.clone()); + let FuncCallType::View(view) = op else { + return; + }; + + // Check for precise index seek + if let (Bound::Included(low_val), Bound::Included(up_val)) = (&lower, &upper) { + if low_val == up_val { + // Fetch index metadata + let Some((_, idx, _)) = self.get_table_and_index(index_id) else { + return; + }; + + let cols = idx.index().indexed_columns.clone(); + self.read_sets + .insert_index_scan(table_id, cols, low_val.clone(), view.clone()); + return; + } } + + // Everything else is treated as a table scan + self.read_sets.insert_table_scan(table_id, view.clone()); } - /// Returns the views whose read sets overlaps with this transaction's write 
set - pub fn view_for_update(&self) -> impl Iterator { - self.tx_state + pub fn view_for_update(&self) -> impl Iterator + '_ { + let mut res = self + .tx_state .insert_tables .keys() .filter(|table_id| !self.tx_state.delete_tables.contains_key(table_id)) .chain(self.tx_state.delete_tables.keys()) .flat_map(|table_id| self.committed_state_write_lock.views_for_table_scan(table_id)) - .collect::>() - .into_iter() - } + .collect::>(); + + // Include views that perform precise index seeks. + // It is sufficient to only consider deleted tables, + // as deleted rows will cover all modification to existing rows in the committed state. + for (table_id, deleted_table) in &self.tx_state.delete_tables { + let (table, blob_store, _) = self + .committed_state_write_lock + .get_table_and_blob_store(*table_id) + .expect("table must exist in committed state for deleted table"); + + // Skip tables without indexes. + if table.indexes.is_empty() { + continue; + } + for ptr in deleted_table.iter() { + let Some(row_ref) = table.get_row_ref(blob_store, ptr) else { + continue; + }; + for view_call in self.committed_state_write_lock.views_for_index_seek(table_id, row_ref) { + res.insert(view_call); + } + } + } + + res.into_iter() + } /// Removes keys for `view_id` from the committed read set. /// Used for dropping views in an auto-migration. 
pub fn drop_view_from_committed_read_set(&mut self, view_id: ViewId) { diff --git a/smoketests/tests/views.py b/smoketests/tests/views.py index 9d8deaa90a5..fe74975a4cd 100644 --- a/smoketests/tests/views.py +++ b/smoketests/tests/views.py @@ -204,7 +204,7 @@ def test_a_view_materialization(self): logs = self.logs(100) self.assertEqual(logs.count(player_called_log), 1) - # insert to cause cache invalidation + # inserting new row should not trigger view invocation due to readsets self.spacetime( "sql", self.database_identity, @@ -214,7 +214,29 @@ def test_a_view_materialization(self): ) self.call_player_view() - #On third call, after invalidation, the view is evaluated again + #On third call, after inserting a row, the view is still cached + logs = self.logs(100) + self.assertEqual(logs.count(player_called_log), 1) + + # Updating the row that the view depends on should trigger re-evaluation + self.spacetime( + "sql", + self.database_identity, + """ +UPDATE player_state SET level = 9 WHERE id = 42; +""", + ) + + # Call the view again + self.spacetime( + "sql", + self.database_identity, + """ +Select * FROM player_state WHERE id = 42; +""", + ) + + # On fourth call, after updating the dependent row, the view is re-evaluated logs = self.logs(100) self.assertEqual(logs.count(player_called_log), 2) From 3bd98d7018ee56b2c6ed29c5ad923d92d63ea2ad Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Thu, 20 Nov 2025 22:21:02 +0530 Subject: [PATCH 2/8] include comment --- crates/datastore/src/locking_tx_datastore/mut_tx.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 31b6e79d1e5..084eafd4cef 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -287,9 +287,9 @@ impl MutTxId { self.read_sets.insert_table_scan(table_id, view.clone()); } + /// Returns the views whose read 
sets overlaps with this transaction's write set pub fn view_for_update(&self) -> impl Iterator + '_ { - let mut res = self - .tx_state + let mut res = self.tx_state .insert_tables .keys() .filter(|table_id| !self.tx_state.delete_tables.contains_key(table_id)) From d16bbc811c46216c64076d234335eb8778aff227 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Thu, 20 Nov 2025 22:29:30 +0530 Subject: [PATCH 3/8] fix static assert --- crates/datastore/src/locking_tx_datastore/mut_tx.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 084eafd4cef..7bc70bf0325 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -245,7 +245,7 @@ pub struct MutTxId { pub metrics: ExecutionMetrics, } -static_assert_size!(MutTxId, 432); +static_assert_size!(MutTxId, 464); impl MutTxId { /// Record that a view performs a table scan in this transaction's read set From a0b8ab8e4ae9c0968147a2e200fb6ab8dd2afd8c Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Thu, 20 Nov 2025 23:26:43 +0530 Subject: [PATCH 4/8] consider insert tables too and smoketest --- .../src/locking_tx_datastore/mut_tx.rs | 31 +++++++++++++--- smoketests/tests/views.py | 37 ++++++++++++++----- 2 files changed, 54 insertions(+), 14 deletions(-) diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 7bc70bf0325..86d086588ec 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -56,10 +56,14 @@ use spacetimedb_schema::{ schema::{ColumnSchema, ConstraintSchema, IndexSchema, RowLevelSecuritySchema, SequenceSchema, TableSchema}, }; use spacetimedb_table::{ - blob_store::BlobStore, indexes::{RowPointer, SquashedOffset}, static_assert_size, table::{ + blob_store::BlobStore,
indexes::{RowPointer, SquashedOffset}, + static_assert_size, + table::{ BlobNumBytes, DuplicateError, IndexScanRangeIter, InsertError, RowRef, Table, TableAndIndex, UniqueConstraintViolation, - }, table_index::TableIndex + }, + table_index::TableIndex, }; use std::{ sync::Arc, @@ -289,7 +293,8 @@ impl MutTxId { /// Returns the views whose read sets overlaps with this transaction's write set pub fn view_for_update(&self) -> impl Iterator + '_ { - let mut res = self.tx_state + let mut res = self + .tx_state .insert_tables .keys() .filter(|table_id| !self.tx_state.delete_tables.contains_key(table_id)) @@ -298,8 +303,6 @@ impl MutTxId { .collect::>(); // Include views that perform precise index seeks. - // It is sufficient to only consider deleted tables, - // as deleted rows will cover all modification to existing rows in the committed state. for (table_id, deleted_table) in &self.tx_state.delete_tables { let (table, blob_store, _) = self .committed_state_write_lock @@ -321,6 +324,24 @@ impl MutTxId { } } + for (table_id, inserted_table) in &self.tx_state.insert_tables { + let (table, blob_store, _) = self + .committed_state_write_lock + .get_table_and_blob_store(*table_id) + .expect("table must exist in committed state for inserted table"); + + // Skip tables without indexes. + if table.indexes.is_empty() { + continue; + } + + for row_ref in inserted_table.scan_rows(blob_store) { + for view_call in self.committed_state_write_lock.views_for_index_seek(table_id, row_ref) { + res.insert(view_call); + } + } + } + res.into_iter() } /// Removes keys for `view_id` from the committed read set. diff --git a/smoketests/tests/views.py b/smoketests/tests/views.py index fe74975a4cd..859f6a42fcd 100644 --- a/smoketests/tests/views.py +++ b/smoketests/tests/views.py @@ -190,19 +190,29 @@ def test_http_sql(self): # since it relies on log capturing starting from an empty log. 
def test_a_view_materialization(self): """This test asserts whether views are materialized correctly""" - self.insert_initial_data() + player_called_log = "player view called" + + # call view, with no data + self.assertSql("SELECT * FROM player", """\ + id | level +----+------- +""") + logs = self.logs(100) + self.assertEqual(logs.count(player_called_log), 1) - self.assertNotIn(player_called_log, self.logs(100)) + self.insert_initial_data() + # Should invoke view as data is inserted self.call_player_view() - #On first call, the view is evaluated - self.assertIn(player_called_log, self.logs(100)) + + logs = self.logs(100) + self.assertEqual(logs.count(player_called_log), 2) self.call_player_view() - #On second call, the view is cached + # the view is cached logs = self.logs(100) - self.assertEqual(logs.count(player_called_log), 1) + self.assertEqual(logs.count(player_called_log), 2) # inserting new row should not trigger view invocation due to readsets self.spacetime( @@ -214,9 +224,8 @@ def test_a_view_materialization(self): ) self.call_player_view() - #On third call, after inserting a row, the view is still cached logs = self.logs(100) - self.assertEqual(logs.count(player_called_log), 1) + self.assertEqual(logs.count(player_called_log), 2) # Updating the row that the view depends on should trigger re-evaluation self.spacetime( @@ -238,7 +247,17 @@ def test_a_view_materialization(self): # On fourth call, after updating the dependent row, the view is re-evaluated logs = self.logs(100) - self.assertEqual(logs.count(player_called_log), 2) + self.assertEqual(logs.count(player_called_log), 3) + + + # Updating it back for other tests to work + self.spacetime( + "sql", + self.database_identity, + """ +UPDATE player_state SET level = 7 WHERE id = 42; +""", + ) def test_query_anonymous_view_reducer(self): """Tests that anonymous views are updated for reducers""" From d64cec66b5df08114c1b44466894d3b4dc0bc8e6 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Fri, 21 Nov 
2025 17:06:10 +0530 Subject: [PATCH 5/8] multi col index smoketest --- smoketests/tests/views.py | 79 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/smoketests/tests/views.py b/smoketests/tests/views.py index 859f6a42fcd..439a9ac8771 100644 --- a/smoketests/tests/views.py +++ b/smoketests/tests/views.py @@ -109,6 +109,16 @@ class SqlViews(Smoketest): level: u64, } + +#[derive(Clone)] +#[spacetimedb::table(name = player_info, index(name=age_level_index, btree(columns = [age, level])))] +pub struct PlayerInfo { + #[primary_key] + id: u64, + age: u64, + level: u64, +} + #[spacetimedb::reducer] pub fn add_player_level(ctx: &ReducerContext, id: u64, level: u64) { ctx.db.player_level().insert(PlayerState { id, level }); @@ -141,6 +151,14 @@ class SqlViews(Smoketest): let second = PlayerState { id: 7, level: 3 }; vec![first, second] } + +#[spacetimedb::view(name = player_info_multi_index, public)] +pub fn player_info_view(ctx: &ViewContext) -> Option { + + log::info!("player_info called"); + ctx.db.player_info().age_level_index().filter((25u64, 7u64)).next() + +} """ def assertSql(self, sql, expected): @@ -259,6 +277,67 @@ def test_a_view_materialization(self): """, ) + def test_view_multi_index_materialization(self): + """This test asserts whether views using multi-column indexes are materialized correctly""" + + player_called_log = "player_info called" + + # call view, with no data + self.assertSql("SELECT * FROM player_info_multi_index", """\ + id | age | level +----+-----+------- +""") + + logs = self.logs(100) + self.assertEqual(logs.count(player_called_log), 1) + + # Insert data + self.spacetime( + "sql", + self.database_identity, + """\ +INSERT INTO player_info (id, age, level) VALUES (1, 25, 7); +""", + ) + + # Should invoke view as data is inserted + self.assertSql("SELECT * FROM player_info_multi_index", """\ + id | age | level +----+-----+------- + 1 | 25 | 7 +""") + logs = self.logs(100) +
self.assertEqual(logs.count(player_called_log), 2) + + + # Inserting a row that does not match should not trigger re-evaluation + self.spacetime( + "sql", + self.database_identity, + """\ +INSERT INTO player_info (id, age, level) VALUES (2, 30, 8); +""", + ) + + logs = self.logs(100) + self.assertEqual(logs.count(player_called_log), 2) + + # Updating the row that the view depends on should trigger re-evaluation + self.spacetime( + "sql", + self.database_identity, + """ +UPDATE player_info SET age = 26 WHERE id = 1; +""", + ) + logs = self.logs(100) + self.assertEqual(logs.count(player_called_log), 3) + self.assertSql("SELECT * FROM player_info_multi_index", """\ + id | age | level +----+-----+------- +""") + + def test_query_anonymous_view_reducer(self): """Tests that anonymous views are updated for reducers""" self.call("add_player_level", 0, 1) From 0495320ac7a837061cb5046d3616b5d432f17006 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Fri, 21 Nov 2025 16:10:39 +0530 Subject: [PATCH 6/8] remove view_name from CallViewInfo --- crates/core/src/host/module_host.rs | 10 ++++++++-- crates/core/src/host/wasm_common/module_host_actor.rs | 10 +++++----- crates/datastore/src/locking_tx_datastore/mut_tx.rs | 4 ++-- crates/schema/src/def.rs | 8 ++++++++ 4 files changed, 23 insertions(+), 9 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 54dbaeaf095..202bf2f10e0 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1684,12 +1684,18 @@ impl ModuleHost { for ViewCallInfo { view_id, table_id, - view_name, + fn_ptr, sender, } in out.tx.view_for_update().cloned().collect::>() { + let view_def = self + .info + .module_def + .get_view_by_id(fn_ptr, sender.is_none()) + .ok_or(ViewCallError::NoSuchView)?; + let result = self - .call_view(out.tx, &view_name, view_id, table_id, Nullary, caller, sender) + .call_view(out.tx, &view_def.name, view_id, table_id, Nullary, caller, sender) 
.await?; // Increment execution stats diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 2108fc782b4..90c7e28474d 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -1003,11 +1003,11 @@ impl InstanceCommon { .cloned() .map(|info| { let view_def = module_def - .view(&*info.view_name) - .unwrap_or_else(|| panic!("view `{}` not found", info.view_name)); + .get_view_by_id(info.fn_ptr, info.sender.is_none()) + .unwrap_or_else(|| panic!("view with fn_ptr `{}` not found", info.fn_ptr)); CallViewParams { - view_name: info.view_name, + view_name: view_def.name.clone().into(), view_id: info.view_id, table_id: info.table_id, fn_ptr: view_def.fn_ptr, @@ -1224,7 +1224,7 @@ impl InstanceOp for ViewOp<'_> { FuncCallType::View(ViewCallInfo { view_id: self.view_id, table_id: self.table_id, - view_name: self.name.to_owned().into_boxed_str(), + fn_ptr: self.fn_ptr, sender: Some(*self.sender), }) } @@ -1254,7 +1254,7 @@ impl InstanceOp for AnonymousViewOp<'_> { FuncCallType::View(ViewCallInfo { view_id: self.view_id, table_id: self.table_id, - view_name: self.name.to_owned().into_boxed_str(), + fn_ptr: self.fn_ptr, sender: None, }) } diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 86d086588ec..29947b0d4a9 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -42,7 +42,7 @@ use spacetimedb_lib::{ ConnectionId, Identity, }; use spacetimedb_primitives::{ - col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId, + col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewFnPtr, ViewId, }; use spacetimedb_sats::{ bsatn::{self, to_writer, DecodeError, Deserializer}, @@ -76,7 +76,7 @@ type 
DecodeResult = core::result::Result; pub struct ViewCallInfo { pub view_id: ViewId, pub table_id: TableId, - pub view_name: Box, + pub fn_ptr: ViewFnPtr, pub sender: Option, } diff --git a/crates/schema/src/def.rs b/crates/schema/src/def.rs index 6efb552c864..10c18b36691 100644 --- a/crates/schema/src/def.rs +++ b/crates/schema/src/def.rs @@ -276,6 +276,14 @@ impl ModuleDef { self.reducers.get_index(id.idx()).map(|(_, def)| def) } + /// Look up a view by its id, and whether it is anonymous. + pub fn get_view_by_id(&self, id: ViewFnPtr, is_anonymous: bool) -> Option<&ViewDef> { + self.views + .iter() + .find(|(_, def)| def.fn_ptr == id && def.is_anonymous == is_anonymous) + .map(|(_, def)| def) + } + /// Convenience method to look up a procedure, possibly by a string, returning its id as well. pub fn procedure_full>( &self, From ce6db335f2aceb0333bb48cf320864b18b769235 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Fri, 21 Nov 2025 18:14:22 +0530 Subject: [PATCH 7/8] refactoring --- .../src/locking_tx_datastore/mut_tx.rs | 145 +++++++++--------- 1 file changed, 69 insertions(+), 76 deletions(-) diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 29947b0d4a9..3c31964a8cd 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -84,7 +84,6 @@ pub struct ViewCallInfo { #[derive(Default)] pub struct ViewReadSets { tables: IntMap, - indexes: IntMap, } impl MemoryUsage for ViewReadSets { @@ -103,8 +102,8 @@ impl ViewReadSets { } /// Record that a view performs a full scan of this table - fn insert_table_scan(&mut self, table_id: TableId, call: ViewCallInfo) { - self.tables.entry(table_id).or_default().insert_scan(call); + pub fn insert_full_table_scan(&mut self, table_id: TableId, call: ViewCallInfo) { + self.tables.entry(table_id).or_default().insert_table_scan(call); } /// Removes keys for `view_id` from the read set @@ 
-113,9 +112,6 @@ impl ViewReadSets { readset.remove_view(view_id, sender); !readset.is_empty() }); - - // index cleanup - self.remove_view_from_indexes(view_id, sender); } /// Merge or union read sets together @@ -123,15 +119,12 @@ impl ViewReadSets { for (table_id, rs) in readset.tables { self.tables.entry(table_id).or_default().merge(rs); } - - self.merge_index_reads(readset.indexes); } /// Record that a view reads a specific index key pub fn insert_index_scan(&mut self, table_id: TableId, cols: ColList, proj: AlgebraicValue, call: ViewCallInfo) { - let col_map = self.indexes.entry(table_id).or_default(); - let key_map = col_map.entry(cols).or_default(); - key_map.entry(proj).or_default().insert(call); + let table_readset = self.tables.entry(table_id).or_default(); + table_readset.insert_index_scan(cols, proj, call); } /// Returns the views that index seek on the given row pointer @@ -140,45 +133,10 @@ impl ViewReadSets { table_id: &TableId, row_ptr: RowRef<'a>, ) -> impl Iterator { - self.indexes.get(table_id).into_iter().flat_map(move |table_sets| { - table_sets.iter().flat_map(move |(cols, av_sets)| { - row_ptr - .project(cols) - .ok() - .and_then(|k| av_sets.get(&k)) - .into_iter() - .flat_map(|views| views.iter()) - }) - }) - } - - // Remove all references to a view from the index read sets - fn remove_view_from_indexes(&mut self, view_id: ViewId, sender: Option) { - self.indexes.retain(|_, col_map| { - col_map.retain(|_, key_map| { - key_map.retain(|_, views| { - views.retain(|call| { - !(call.view_id == view_id && sender.as_ref().is_none_or(|s| call.sender.as_ref() == Some(s))) - }); - !views.is_empty() - }); - !key_map.is_empty() - }); - !col_map.is_empty() - }); - } - - /// Merge (union) another index reads into this one - fn merge_index_reads(&mut self, other: IntMap) { - for (table_id, other_col_map) in other { - let col_map = self.indexes.entry(table_id).or_default(); - for (cols, other_key_map) in other_col_map { - let key_map = 
col_map.entry(cols).or_default(); - for (key, other_views) in other_key_map { - key_map.entry(key).or_default().extend(other_views); - } - } - } + self.tables + .get(table_id) + .into_iter() + .flat_map(move |ts| ts.views_for_index_seek(row_ptr)) } } @@ -189,11 +147,12 @@ type IndexColReadSet = HashMap; #[derive(Default)] struct TableReadSet { table_scans: HashSet, + index_reads: IndexColReadSet, } impl TableReadSet { /// Record that this view performs a full scan of this read set's table - fn insert_scan(&mut self, call: ViewCallInfo) { + fn insert_table_scan(&mut self, call: ViewCallInfo) { self.table_scans.insert(call); } @@ -202,24 +161,60 @@ impl TableReadSet { self.table_scans.iter() } - /// Is this read set empty? + /// Record that a view reads a specific index key for this table + fn insert_index_scan(&mut self, cols: ColList, key: AlgebraicValue, call: ViewCallInfo) { + let key_map = self.index_reads.entry(cols).or_default(); + key_map.entry(key).or_default().insert(call); + } + + /// Returns the views that index seek on the given row pointer (for this table) + fn views_for_index_seek<'a>(&'a self, row_ptr: RowRef<'a>) -> impl Iterator { + self.index_reads.iter().flat_map(move |(cols, av_set)| { + row_ptr + .project(cols) + .ok() + .and_then(|av| av_set.get(&av)) + .into_iter() + .flat_map(|views| views.iter()) + }) + } + + /// Is this read set empty? 
(no table scans and no index reads) fn is_empty(&self) -> bool { - self.table_scans.is_empty() + self.table_scans.is_empty() && self.index_reads.is_empty() } /// Removes keys for `view_id` from the read set, optionally filtering by `sender` fn remove_view(&mut self, view_id: ViewId, sender: Option) { - if let Some(identity) = sender { - self.table_scans - .retain(|call| !(call.view_id == view_id && call.sender.as_ref() == Some(&identity))); - } else { - self.table_scans.retain(|call| call.view_id != view_id); - } + let matches_call = |call: &ViewCallInfo| { + call.view_id == view_id && sender.as_ref().is_none_or(|s| call.sender.as_ref() == Some(s)) + }; + + // Remove from table_scans + self.table_scans.retain(|call| !matches_call(call)); + + // Remove from index_reads + self.index_reads.retain(|_cols, key_map| { + key_map.retain(|_key, views| { + views.retain(|call| !matches_call(call)); + !views.is_empty() + }); + !key_map.is_empty() + }); } - /// Merge or union two read sets for this table - fn merge(&mut self, readset: TableReadSet) { - self.table_scans.extend(readset.table_scans); + /// Merge (union) another table read set into this one + fn merge(&mut self, other: TableReadSet) { + // merge table scans + self.table_scans.extend(other.table_scans); + + // merge index reads: for each cols, for each av, extend the view set + for (cols, other_av_set) in other.index_reads { + let av_set = self.index_reads.entry(cols).or_default(); + for (av, other_views) in other_av_set { + av_set.entry(av).or_default().extend(other_views); + } + } } } @@ -249,13 +244,13 @@ pub struct MutTxId { pub metrics: ExecutionMetrics, } -static_assert_size!(MutTxId, 464); +static_assert_size!(MutTxId, 432); impl MutTxId { /// Record that a view performs a table scan in this transaction's read set pub fn record_table_scan(&mut self, op: &FuncCallType, table_id: TableId) { if let FuncCallType::View(view) = op { - self.read_sets.insert_table_scan(table_id, view.clone()); + 
self.read_sets.insert_full_table_scan(table_id, view.clone()); } } @@ -288,7 +283,7 @@ impl MutTxId { } // Everything else is treated as a table scan - self.read_sets.insert_table_scan(table_id, view.clone()); + self.read_sets.insert_full_table_scan(table_id, view.clone()); } /// Returns the views whose read sets overlaps with this transaction's write set @@ -303,23 +298,25 @@ impl MutTxId { .collect::>(); // Include views that perform precise index seeks. + let mut process_views = |table_id: &_, row_ref| { + for view_call in self.committed_state_write_lock.views_for_index_seek(table_id, row_ref) { + res.insert(view_call); + } + }; + for (table_id, deleted_table) in &self.tx_state.delete_tables { let (table, blob_store, _) = self .committed_state_write_lock .get_table_and_blob_store(*table_id) .expect("table must exist in committed state for deleted table"); - // Skip tables without indexes. if table.indexes.is_empty() { continue; } for ptr in deleted_table.iter() { - let Some(row_ref) = table.get_row_ref(blob_store, ptr) else { - continue; - }; - for view_call in self.committed_state_write_lock.views_for_index_seek(table_id, row_ref) { - res.insert(view_call); + if let Some(row_ref) = table.get_row_ref(blob_store, ptr) { + process_views(table_id, row_ref); } } } @@ -330,18 +327,14 @@ impl MutTxId { .get_table_and_blob_store(*table_id) .expect("table must exist in committed state for inserted table"); - // Skip tables without indexes. if table.indexes.is_empty() { continue; } for row_ref in inserted_table.scan_rows(blob_store) { - for view_call in self.committed_state_write_lock.views_for_index_seek(table_id, row_ref) { - res.insert(view_call); - } + process_views(table_id, row_ref); } } - res.into_iter() } /// Removes keys for `view_id` from the committed read set. 
From 9d25ff298696a8320ea33b0d0d1a99ce981c8ad5 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Sat, 22 Nov 2025 12:38:14 +0530 Subject: [PATCH 8/8] smoketest comments address --- smoketests/tests/views.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/smoketests/tests/views.py b/smoketests/tests/views.py index 439a9ac8771..2b5a9de2e34 100644 --- a/smoketests/tests/views.py +++ b/smoketests/tests/views.py @@ -254,15 +254,6 @@ def test_a_view_materialization(self): """, ) - # Call the view again - self.spacetime( - "sql", - self.database_identity, - """ -Select * FROM player_state WHERE id = 42; -""", - ) - # On fourth call, after updating the dependent row, the view is re-evaluated logs = self.logs(100) self.assertEqual(logs.count(player_called_log), 3) @@ -315,7 +306,7 @@ def test_view_multi_index_materialization(self): "sql", self.database_identity, """\ -INSERT INTO player_info (id, age, level) VALUES (2, 30, 8); +INSERT INTO player_info (id, age, level) VALUES (2, 25, 8); """, )