Skip to content

Commit cc0eaff

Browse files
committed
index readsets
1 parent 6b43769 commit cc0eaff

File tree

4 files changed

+154
-18
lines changed

4 files changed

+154
-18
lines changed

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,15 @@ impl CommittedState {
9393
pub(super) fn views_for_table_scan(&self, table_id: &TableId) -> impl Iterator<Item = &ViewCallInfo> {
9494
self.read_sets.views_for_table_scan(table_id)
9595
}
96+
97+
/// Returns the views that perform an precise index seek on given `row_ref` of `table_id`
98+
pub fn views_for_index_seek<'a>(
99+
&'a self,
100+
table_id: &TableId,
101+
row_ref: RowRef<'a>,
102+
) -> impl Iterator<Item = &'a ViewCallInfo> {
103+
self.read_sets.views_for_index_seek(table_id, row_ref)
104+
}
96105
}
97106

98107
impl MemoryUsage for CommittedState {

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 119 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ use spacetimedb_schema::{
5858
use spacetimedb_table::{
5959
blob_store::BlobStore,
6060
indexes::{RowPointer, SquashedOffset},
61-
static_assert_size,
6261
table::{
6362
BlobNumBytes, DuplicateError, IndexScanRangeIter, InsertError, RowRef, Table, TableAndIndex,
6463
UniqueConstraintViolation,
@@ -84,6 +83,7 @@ pub struct ViewCallInfo {
8483
#[derive(Default)]
8584
pub struct ViewReadSets {
8685
tables: IntMap<TableId, TableReadSet>,
86+
indexes: IntMap<TableId, IndexColReadSet>,
8787
}
8888

8989
impl MemoryUsage for ViewReadSets {
@@ -102,7 +102,7 @@ impl ViewReadSets {
102102
}
103103

104104
/// Record that a view performs a full scan of this table
105-
fn insert_scan(&mut self, table_id: TableId, call: ViewCallInfo) {
105+
fn insert_table_scan(&mut self, table_id: TableId, call: ViewCallInfo) {
106106
self.tables.entry(table_id).or_default().insert_scan(call);
107107
}
108108

@@ -112,16 +112,78 @@ impl ViewReadSets {
112112
readset.remove_view(view_id, sender);
113113
!readset.is_empty()
114114
});
115+
116+
// index cleanup
117+
self.remove_view_from_indexes(view_id, sender);
115118
}
116119

117120
/// Merge or union read sets together
118121
pub fn merge(&mut self, readset: Self) {
119122
for (table_id, rs) in readset.tables {
120123
self.tables.entry(table_id).or_default().merge(rs);
121124
}
125+
126+
self.merge_index_reads(readset.indexes);
127+
}
128+
129+
/// Record that a view reads a specific index key
130+
pub fn insert_index_scan(&mut self, table_id: TableId, cols: ColList, proj: AlgebraicValue, call: ViewCallInfo) {
131+
let col_map = self.indexes.entry(table_id).or_default();
132+
let key_map = col_map.entry(cols).or_default();
133+
key_map.entry(proj).or_default().insert(call);
134+
}
135+
136+
/// Returns the views that index seek on the given row pointer
137+
pub fn views_for_index_seek<'a>(
138+
&'a self,
139+
table_id: &TableId,
140+
row_ptr: RowRef<'a>,
141+
) -> impl Iterator<Item = &'a ViewCallInfo> {
142+
self.indexes.get(table_id).into_iter().flat_map(move |table_sets| {
143+
table_sets.iter().flat_map(move |(cols, av_sets)| {
144+
row_ptr
145+
.project(cols)
146+
.ok()
147+
.and_then(|k| av_sets.get(&k))
148+
.into_iter()
149+
.flat_map(|views| views.iter())
150+
})
151+
})
152+
}
153+
154+
// Remove all references to a view from the index read sets
155+
fn remove_view_from_indexes(&mut self, view_id: ViewId, sender: Option<Identity>) {
156+
self.indexes.retain(|_, col_map| {
157+
col_map.retain(|_, key_map| {
158+
key_map.retain(|_, views| {
159+
views.retain(|call| {
160+
!(call.view_id == view_id && sender.as_ref().is_none_or(|s| call.sender.as_ref() == Some(s)))
161+
});
162+
!views.is_empty()
163+
});
164+
!key_map.is_empty()
165+
});
166+
!col_map.is_empty()
167+
});
168+
}
169+
170+
/// Merge (union) another index reads into this one
171+
fn merge_index_reads(&mut self, other: IntMap<TableId, IndexColReadSet>) {
172+
for (table_id, other_col_map) in other {
173+
let col_map = self.indexes.entry(table_id).or_default();
174+
for (cols, other_key_map) in other_col_map {
175+
let key_map = col_map.entry(cols).or_default();
176+
for (key, other_views) in other_key_map {
177+
key_map.entry(key).or_default().extend(other_views);
178+
}
179+
}
180+
}
122181
}
123182
}
124183

184+
type IndexKeyReadSet = HashMap<AlgebraicValue, HashSet<ViewCallInfo>>;
185+
type IndexColReadSet = HashMap<ColList, IndexKeyReadSet>;
186+
125187
/// A table-level read set for views
126188
#[derive(Default)]
127189
struct TableReadSet {
@@ -186,13 +248,13 @@ pub struct MutTxId {
186248
pub metrics: ExecutionMetrics,
187249
}
188250

189-
static_assert_size!(MutTxId, 432);
251+
//static_assert_size!(MutTxId, 432);
190252

191253
impl MutTxId {
192254
/// Record that a view performs a table scan in this transaction's read set
193255
pub fn record_table_scan(&mut self, op: &FuncCallType, table_id: TableId) {
194256
if let FuncCallType::View(view) = op {
195-
self.read_sets.insert_scan(table_id, view.clone());
257+
self.read_sets.insert_table_scan(table_id, view.clone());
196258
}
197259
}
198260

@@ -201,28 +263,69 @@ impl MutTxId {
201263
&mut self,
202264
op: &FuncCallType,
203265
table_id: TableId,
204-
_: IndexId,
205-
_: Bound<AlgebraicValue>,
206-
_: Bound<AlgebraicValue>,
266+
index_id: IndexId,
267+
lower: Bound<AlgebraicValue>,
268+
upper: Bound<AlgebraicValue>,
207269
) {
208-
// TODO: Implement read set tracking for index scans
209-
if let FuncCallType::View(view) = op {
210-
self.read_sets.insert_scan(table_id, view.clone());
270+
let FuncCallType::View(view) = op else {
271+
return;
272+
};
273+
274+
// If lower and upper are equal (point lookup), attempt precise tracking
275+
if let (Bound::Included(low_val), Bound::Included(up_val)) = (&lower, &upper) {
276+
if low_val == up_val {
277+
// Fetch index metadata
278+
let Some((_, idx, _)) = self.get_table_and_index(index_id) else {
279+
return;
280+
};
281+
282+
let cols = idx.index().indexed_columns.clone();
283+
self.read_sets
284+
.insert_index_scan(table_id, cols, low_val.clone(), view.clone());
285+
return;
286+
}
211287
}
288+
289+
// Everything else is treated as a table scan
290+
self.read_sets.insert_table_scan(table_id, view.clone());
212291
}
213292

214-
/// Returns the views whose read sets overlaps with this transaction's write set
215-
pub fn view_for_update(&self) -> impl Iterator<Item = &ViewCallInfo> {
216-
self.tx_state
293+
pub fn view_for_update(&self) -> impl Iterator<Item = &ViewCallInfo> + '_ {
294+
let mut res = self
295+
.tx_state
217296
.insert_tables
218297
.keys()
219298
.filter(|table_id| !self.tx_state.delete_tables.contains_key(table_id))
220299
.chain(self.tx_state.delete_tables.keys())
221300
.flat_map(|table_id| self.committed_state_write_lock.views_for_table_scan(table_id))
222-
.collect::<HashSet<_>>()
223-
.into_iter()
224-
}
301+
.collect::<HashSet<_>>();
302+
303+
// Include views that perform precise index seeks.
304+
// It is sufficient to only consider deleted tables,
305+
// as deleted rows will cover all modification to existing rows in the committed state.
306+
for (table_id, deleted_table) in &self.tx_state.delete_tables {
307+
let (table, blob_store, _) = self
308+
.committed_state_write_lock
309+
.get_table_and_blob_store(*table_id)
310+
.expect("table must exist in committed state for deleted table");
311+
312+
// Skip tables without indexes.
313+
if table.indexes.is_empty() {
314+
continue;
315+
}
225316

317+
for ptr in deleted_table.iter() {
318+
let Some(row_ref) = table.get_row_ref(blob_store, ptr) else {
319+
continue;
320+
};
321+
for view_call in self.committed_state_write_lock.views_for_index_seek(table_id, row_ref) {
322+
res.insert(view_call);
323+
}
324+
}
325+
}
326+
327+
res.into_iter()
328+
}
226329
/// Removes keys for `view_id` from the committed read set.
227330
/// Used for dropping views in an auto-migration.
228331
pub fn drop_view_from_committed_read_set(&mut self, view_id: ViewId) {

modules/quickstart-chat/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ pub struct User {
1111
#[spacetimedb::table(name = message, public)]
1212
pub struct Message {
1313
sender: Identity,
14+
#[index(btree)]
1415
sent: Timestamp,
16+
#[index(btree)]
1517
text: String,
1618
}
1719

smoketests/tests/views.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ def test_a_view_materialization(self):
204204
logs = self.logs(100)
205205
self.assertEqual(logs.count(player_called_log), 1)
206206

207-
# insert to cause cache invalidation
207+
# inserting new row should not trigger view invocation due to readsets
208208
self.spacetime(
209209
"sql",
210210
self.database_identity,
@@ -214,7 +214,29 @@ def test_a_view_materialization(self):
214214
)
215215

216216
self.call_player_view()
217-
#On third call, after invalidation, the view is evaluated again
217+
#On third call, after inserting a row, the view is still cached
218+
logs = self.logs(100)
219+
self.assertEqual(logs.count(player_called_log), 1)
220+
221+
# Updating the row that the view depends on should trigger re-evaluation
222+
self.spacetime(
223+
"sql",
224+
self.database_identity,
225+
"""
226+
UPDATE player_state SET level = 9 WHERE id = 42;
227+
""",
228+
)
229+
230+
# Call the view again
231+
self.spacetime(
232+
"sql",
233+
self.database_identity,
234+
"""
235+
Select * FROM player_state WHERE id = 42;
236+
""",
237+
)
238+
239+
#On fourth call, after updating the dependent row, the view is re-evaluated
218240
logs = self.logs(100)
219241
self.assertEqual(logs.count(player_called_log), 2)
220242

0 commit comments

Comments
 (0)