Skip to content

Commit c0118a2

Browse files
committed
index readsets
1 parent 6b43769 commit c0118a2

File tree

3 files changed

+153
-22
lines changed

3 files changed

+153
-22
lines changed

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,15 @@ impl CommittedState {
9393
pub(super) fn views_for_table_scan(&self, table_id: &TableId) -> impl Iterator<Item = &ViewCallInfo> {
9494
self.read_sets.views_for_table_scan(table_id)
9595
}
96+
97+
/// Returns the views that perform an precise index seek on given `row_ref` of `table_id`
98+
pub fn views_for_index_seek<'a>(
99+
&'a self,
100+
table_id: &TableId,
101+
row_ref: RowRef<'a>,
102+
) -> impl Iterator<Item = &'a ViewCallInfo> {
103+
self.read_sets.views_for_index_seek(table_id, row_ref)
104+
}
96105
}
97106

98107
impl MemoryUsage for CommittedState {

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 120 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,10 @@ use spacetimedb_schema::{
5656
schema::{ColumnSchema, ConstraintSchema, IndexSchema, RowLevelSecuritySchema, SequenceSchema, TableSchema},
5757
};
5858
use spacetimedb_table::{
59-
blob_store::BlobStore,
60-
indexes::{RowPointer, SquashedOffset},
61-
static_assert_size,
62-
table::{
59+
blob_store::BlobStore, indexes::{RowPointer, SquashedOffset}, static_assert_size, table::{
6360
BlobNumBytes, DuplicateError, IndexScanRangeIter, InsertError, RowRef, Table, TableAndIndex,
6461
UniqueConstraintViolation,
65-
},
66-
table_index::TableIndex,
62+
}, table_index::TableIndex
6763
};
6864
use std::{
6965
sync::Arc,
@@ -84,6 +80,7 @@ pub struct ViewCallInfo {
8480
#[derive(Default)]
8581
pub struct ViewReadSets {
8682
tables: IntMap<TableId, TableReadSet>,
83+
indexes: IntMap<TableId, IndexColReadSet>,
8784
}
8885

8986
impl MemoryUsage for ViewReadSets {
@@ -102,7 +99,7 @@ impl ViewReadSets {
10299
}
103100

104101
/// Record that a view performs a full scan of this table
105-
fn insert_scan(&mut self, table_id: TableId, call: ViewCallInfo) {
102+
fn insert_table_scan(&mut self, table_id: TableId, call: ViewCallInfo) {
106103
self.tables.entry(table_id).or_default().insert_scan(call);
107104
}
108105

@@ -112,16 +109,78 @@ impl ViewReadSets {
112109
readset.remove_view(view_id, sender);
113110
!readset.is_empty()
114111
});
112+
113+
// index cleanup
114+
self.remove_view_from_indexes(view_id, sender);
115115
}
116116

117117
/// Merge or union read sets together
118118
pub fn merge(&mut self, readset: Self) {
119119
for (table_id, rs) in readset.tables {
120120
self.tables.entry(table_id).or_default().merge(rs);
121121
}
122+
123+
self.merge_index_reads(readset.indexes);
124+
}
125+
126+
/// Record that a view reads a specific index key
127+
pub fn insert_index_scan(&mut self, table_id: TableId, cols: ColList, proj: AlgebraicValue, call: ViewCallInfo) {
128+
let col_map = self.indexes.entry(table_id).or_default();
129+
let key_map = col_map.entry(cols).or_default();
130+
key_map.entry(proj).or_default().insert(call);
131+
}
132+
133+
/// Returns the views that index seek on the given row pointer
134+
pub fn views_for_index_seek<'a>(
135+
&'a self,
136+
table_id: &TableId,
137+
row_ptr: RowRef<'a>,
138+
) -> impl Iterator<Item = &'a ViewCallInfo> {
139+
self.indexes.get(table_id).into_iter().flat_map(move |table_sets| {
140+
table_sets.iter().flat_map(move |(cols, av_sets)| {
141+
row_ptr
142+
.project(cols)
143+
.ok()
144+
.and_then(|k| av_sets.get(&k))
145+
.into_iter()
146+
.flat_map(|views| views.iter())
147+
})
148+
})
149+
}
150+
151+
// Remove all references to a view from the index read sets
152+
fn remove_view_from_indexes(&mut self, view_id: ViewId, sender: Option<Identity>) {
153+
self.indexes.retain(|_, col_map| {
154+
col_map.retain(|_, key_map| {
155+
key_map.retain(|_, views| {
156+
views.retain(|call| {
157+
!(call.view_id == view_id && sender.as_ref().is_none_or(|s| call.sender.as_ref() == Some(s)))
158+
});
159+
!views.is_empty()
160+
});
161+
!key_map.is_empty()
162+
});
163+
!col_map.is_empty()
164+
});
165+
}
166+
167+
/// Merge (union) another index reads into this one
168+
fn merge_index_reads(&mut self, other: IntMap<TableId, IndexColReadSet>) {
169+
for (table_id, other_col_map) in other {
170+
let col_map = self.indexes.entry(table_id).or_default();
171+
for (cols, other_key_map) in other_col_map {
172+
let key_map = col_map.entry(cols).or_default();
173+
for (key, other_views) in other_key_map {
174+
key_map.entry(key).or_default().extend(other_views);
175+
}
176+
}
177+
}
122178
}
123179
}
124180

181+
type IndexKeyReadSet = HashMap<AlgebraicValue, HashSet<ViewCallInfo>>;
182+
type IndexColReadSet = HashMap<ColList, IndexKeyReadSet>;
183+
125184
/// A table-level read set for views
126185
#[derive(Default)]
127186
struct TableReadSet {
@@ -192,7 +251,7 @@ impl MutTxId {
192251
/// Record that a view performs a table scan in this transaction's read set
193252
pub fn record_table_scan(&mut self, op: &FuncCallType, table_id: TableId) {
194253
if let FuncCallType::View(view) = op {
195-
self.read_sets.insert_scan(table_id, view.clone());
254+
self.read_sets.insert_table_scan(table_id, view.clone());
196255
}
197256
}
198257

@@ -201,28 +260,69 @@ impl MutTxId {
201260
&mut self,
202261
op: &FuncCallType,
203262
table_id: TableId,
204-
_: IndexId,
205-
_: Bound<AlgebraicValue>,
206-
_: Bound<AlgebraicValue>,
263+
index_id: IndexId,
264+
lower: Bound<AlgebraicValue>,
265+
upper: Bound<AlgebraicValue>,
207266
) {
208-
// TODO: Implement read set tracking for index scans
209-
if let FuncCallType::View(view) = op {
210-
self.read_sets.insert_scan(table_id, view.clone());
267+
let FuncCallType::View(view) = op else {
268+
return;
269+
};
270+
271+
// Check for precise index seek
272+
if let (Bound::Included(low_val), Bound::Included(up_val)) = (&lower, &upper) {
273+
if low_val == up_val {
274+
// Fetch index metadata
275+
let Some((_, idx, _)) = self.get_table_and_index(index_id) else {
276+
return;
277+
};
278+
279+
let cols = idx.index().indexed_columns.clone();
280+
self.read_sets
281+
.insert_index_scan(table_id, cols, low_val.clone(), view.clone());
282+
return;
283+
}
211284
}
285+
286+
// Everything else is treated as a table scan
287+
self.read_sets.insert_table_scan(table_id, view.clone());
212288
}
213289

214-
/// Returns the views whose read sets overlaps with this transaction's write set
215-
pub fn view_for_update(&self) -> impl Iterator<Item = &ViewCallInfo> {
216-
self.tx_state
290+
pub fn view_for_update(&self) -> impl Iterator<Item = &ViewCallInfo> + '_ {
291+
let mut res = self
292+
.tx_state
217293
.insert_tables
218294
.keys()
219295
.filter(|table_id| !self.tx_state.delete_tables.contains_key(table_id))
220296
.chain(self.tx_state.delete_tables.keys())
221297
.flat_map(|table_id| self.committed_state_write_lock.views_for_table_scan(table_id))
222-
.collect::<HashSet<_>>()
223-
.into_iter()
224-
}
298+
.collect::<HashSet<_>>();
299+
300+
// Include views that perform precise index seeks.
301+
// It is sufficient to only consider deleted tables,
302+
// as deleted rows will cover all modification to existing rows in the committed state.
303+
for (table_id, deleted_table) in &self.tx_state.delete_tables {
304+
let (table, blob_store, _) = self
305+
.committed_state_write_lock
306+
.get_table_and_blob_store(*table_id)
307+
.expect("table must exist in committed state for deleted table");
308+
309+
// Skip tables without indexes.
310+
if table.indexes.is_empty() {
311+
continue;
312+
}
225313

314+
for ptr in deleted_table.iter() {
315+
let Some(row_ref) = table.get_row_ref(blob_store, ptr) else {
316+
continue;
317+
};
318+
for view_call in self.committed_state_write_lock.views_for_index_seek(table_id, row_ref) {
319+
res.insert(view_call);
320+
}
321+
}
322+
}
323+
324+
res.into_iter()
325+
}
226326
/// Removes keys for `view_id` from the committed read set.
227327
/// Used for dropping views in an auto-migration.
228328
pub fn drop_view_from_committed_read_set(&mut self, view_id: ViewId) {

smoketests/tests/views.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ def test_a_view_materialization(self):
204204
logs = self.logs(100)
205205
self.assertEqual(logs.count(player_called_log), 1)
206206

207-
# insert to cause cache invalidation
207+
# inserting new row should not trigger view invocation due to readsets
208208
self.spacetime(
209209
"sql",
210210
self.database_identity,
@@ -214,7 +214,29 @@ def test_a_view_materialization(self):
214214
)
215215

216216
self.call_player_view()
217-
#On third call, after invalidation, the view is evaluated again
217+
#On third call, after inserting a row, the view is still cached
218+
logs = self.logs(100)
219+
self.assertEqual(logs.count(player_called_log), 1)
220+
221+
# Updating the row that the view depends on should trigger re-evaluation
222+
self.spacetime(
223+
"sql",
224+
self.database_identity,
225+
"""
226+
UPDATE player_state SET level = 9 WHERE id = 42;
227+
""",
228+
)
229+
230+
# Call the view again
231+
self.spacetime(
232+
"sql",
233+
self.database_identity,
234+
"""
235+
Select * FROM player_state WHERE id = 42;
236+
""",
237+
)
238+
239+
# On fourth call, after updating the dependent row, the view is re-evaluated
218240
logs = self.logs(100)
219241
self.assertEqual(logs.count(player_called_log), 2)
220242

0 commit comments

Comments
 (0)