Skip to content

Commit 582c846

Browse files
authored
Views: index readsets (#3706)
# Description of Changes Precise index readsets fixes #clockworklabs/SpacetimeDBPrivate#2118 # API and ABI breaking changes NA # Expected complexity level and risk 2.5 Potential to regress performance. # Testing Updated smoketests.
1 parent d913998 commit 582c846

File tree

6 files changed

+291
-43
lines changed

6 files changed

+291
-43
lines changed

crates/core/src/host/module_host.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1684,12 +1684,18 @@ impl ModuleHost {
16841684
for ViewCallInfo {
16851685
view_id,
16861686
table_id,
1687-
view_name,
1687+
fn_ptr,
16881688
sender,
16891689
} in out.tx.view_for_update().cloned().collect::<Vec<_>>()
16901690
{
1691+
let view_def = self
1692+
.info
1693+
.module_def
1694+
.get_view_by_id(fn_ptr, sender.is_none())
1695+
.ok_or(ViewCallError::NoSuchView)?;
1696+
16911697
let result = self
1692-
.call_view(out.tx, &view_name, view_id, table_id, Nullary, caller, sender)
1698+
.call_view(out.tx, &view_def.name, view_id, table_id, Nullary, caller, sender)
16931699
.await?;
16941700

16951701
// Increment execution stats

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1003,11 +1003,11 @@ impl InstanceCommon {
10031003
.cloned()
10041004
.map(|info| {
10051005
let view_def = module_def
1006-
.view(&*info.view_name)
1007-
.unwrap_or_else(|| panic!("view `{}` not found", info.view_name));
1006+
.get_view_by_id(info.fn_ptr, info.sender.is_none())
1007+
.unwrap_or_else(|| panic!("view with fn_ptr `{}` not found", info.fn_ptr));
10081008

10091009
CallViewParams {
1010-
view_name: info.view_name,
1010+
view_name: view_def.name.clone().into(),
10111011
view_id: info.view_id,
10121012
table_id: info.table_id,
10131013
fn_ptr: view_def.fn_ptr,
@@ -1224,7 +1224,7 @@ impl InstanceOp for ViewOp<'_> {
12241224
FuncCallType::View(ViewCallInfo {
12251225
view_id: self.view_id,
12261226
table_id: self.table_id,
1227-
view_name: self.name.to_owned().into_boxed_str(),
1227+
fn_ptr: self.fn_ptr,
12281228
sender: Some(*self.sender),
12291229
})
12301230
}
@@ -1254,7 +1254,7 @@ impl InstanceOp for AnonymousViewOp<'_> {
12541254
FuncCallType::View(ViewCallInfo {
12551255
view_id: self.view_id,
12561256
table_id: self.table_id,
1257-
view_name: self.name.to_owned().into_boxed_str(),
1257+
fn_ptr: self.fn_ptr,
12581258
sender: None,
12591259
})
12601260
}

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,15 @@ impl CommittedState {
9393
pub(super) fn views_for_table_scan(&self, table_id: &TableId) -> impl Iterator<Item = &ViewCallInfo> {
9494
self.read_sets.views_for_table_scan(table_id)
9595
}
96+
97+
/// Returns the views that perform an precise index seek on given `row_ref` of `table_id`
98+
pub fn views_for_index_seek<'a>(
99+
&'a self,
100+
table_id: &TableId,
101+
row_ref: RowRef<'a>,
102+
) -> impl Iterator<Item = &'a ViewCallInfo> {
103+
self.read_sets.views_for_index_seek(table_id, row_ref)
104+
}
96105
}
97106

98107
impl MemoryUsage for CommittedState {

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 142 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use spacetimedb_lib::{
4242
ConnectionId, Identity,
4343
};
4444
use spacetimedb_primitives::{
45-
col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId,
45+
col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewFnPtr, ViewId,
4646
};
4747
use spacetimedb_sats::{
4848
bsatn::{self, to_writer, DecodeError, Deserializer},
@@ -76,7 +76,7 @@ type DecodeResult<T> = core::result::Result<T, DecodeError>;
7676
pub struct ViewCallInfo {
7777
pub view_id: ViewId,
7878
pub table_id: TableId,
79-
pub view_name: Box<str>,
79+
pub fn_ptr: ViewFnPtr,
8080
pub sender: Option<Identity>,
8181
}
8282

@@ -102,8 +102,8 @@ impl ViewReadSets {
102102
}
103103

104104
/// Record that a view performs a full scan of this table
105-
fn insert_scan(&mut self, table_id: TableId, call: ViewCallInfo) {
106-
self.tables.entry(table_id).or_default().insert_scan(call);
105+
pub fn insert_full_table_scan(&mut self, table_id: TableId, call: ViewCallInfo) {
106+
self.tables.entry(table_id).or_default().insert_table_scan(call);
107107
}
108108

109109
/// Removes keys for `view_id` from the read set
@@ -120,17 +120,39 @@ impl ViewReadSets {
120120
self.tables.entry(table_id).or_default().merge(rs);
121121
}
122122
}
123+
124+
/// Record that a view reads a specific index key
125+
pub fn insert_index_scan(&mut self, table_id: TableId, cols: ColList, proj: AlgebraicValue, call: ViewCallInfo) {
126+
let table_readset = self.tables.entry(table_id).or_default();
127+
table_readset.insert_index_scan(cols, proj, call);
128+
}
129+
130+
/// Returns the views that index seek on the given row pointer
131+
pub fn views_for_index_seek<'a>(
132+
&'a self,
133+
table_id: &TableId,
134+
row_ptr: RowRef<'a>,
135+
) -> impl Iterator<Item = &'a ViewCallInfo> {
136+
self.tables
137+
.get(table_id)
138+
.into_iter()
139+
.flat_map(move |ts| ts.views_for_index_seek(row_ptr))
140+
}
123141
}
124142

143+
type IndexKeyReadSet = HashMap<AlgebraicValue, HashSet<ViewCallInfo>>;
144+
type IndexColReadSet = HashMap<ColList, IndexKeyReadSet>;
145+
125146
/// A table-level read set for views
126147
#[derive(Default)]
127148
struct TableReadSet {
128149
table_scans: HashSet<ViewCallInfo>,
150+
index_reads: IndexColReadSet,
129151
}
130152

131153
impl TableReadSet {
132154
/// Record that this view performs a full scan of this read set's table
133-
fn insert_scan(&mut self, call: ViewCallInfo) {
155+
fn insert_table_scan(&mut self, call: ViewCallInfo) {
134156
self.table_scans.insert(call);
135157
}
136158

@@ -139,24 +161,60 @@ impl TableReadSet {
139161
self.table_scans.iter()
140162
}
141163

142-
/// Is this read set empty?
164+
/// Record that a view reads a specific index key for this table
165+
fn insert_index_scan(&mut self, cols: ColList, key: AlgebraicValue, call: ViewCallInfo) {
166+
let key_map = self.index_reads.entry(cols).or_default();
167+
key_map.entry(key).or_default().insert(call);
168+
}
169+
170+
/// Returns the views that index seek on the given row pointer (for this table)
171+
fn views_for_index_seek<'a>(&'a self, row_ptr: RowRef<'a>) -> impl Iterator<Item = &'a ViewCallInfo> {
172+
self.index_reads.iter().flat_map(move |(cols, av_set)| {
173+
row_ptr
174+
.project(cols)
175+
.ok()
176+
.and_then(|av| av_set.get(&av))
177+
.into_iter()
178+
.flat_map(|views| views.iter())
179+
})
180+
}
181+
182+
/// Is this read set empty? (no table scans and no index reads)
143183
fn is_empty(&self) -> bool {
144-
self.table_scans.is_empty()
184+
self.table_scans.is_empty() && self.index_reads.is_empty()
145185
}
146186

147187
/// Removes keys for `view_id` from the read set, optionally filtering by `sender`
148188
fn remove_view(&mut self, view_id: ViewId, sender: Option<Identity>) {
149-
if let Some(identity) = sender {
150-
self.table_scans
151-
.retain(|call| !(call.view_id == view_id && call.sender.as_ref() == Some(&identity)));
152-
} else {
153-
self.table_scans.retain(|call| call.view_id != view_id);
154-
}
189+
let matches_call = |call: &ViewCallInfo| {
190+
call.view_id == view_id && sender.as_ref().is_none_or(|s| call.sender.as_ref() == Some(s))
191+
};
192+
193+
// Remove from table_scans
194+
self.table_scans.retain(|call| !matches_call(call));
195+
196+
// Remove from index_reads
197+
self.index_reads.retain(|_cols, key_map| {
198+
key_map.retain(|_key, views| {
199+
views.retain(|call| !matches_call(call));
200+
!views.is_empty()
201+
});
202+
!key_map.is_empty()
203+
});
155204
}
156205

157-
/// Merge or union two read sets for this table
158-
fn merge(&mut self, readset: TableReadSet) {
159-
self.table_scans.extend(readset.table_scans);
206+
/// Merge (union) another table read set into this one
207+
fn merge(&mut self, other: TableReadSet) {
208+
// merge table scans
209+
self.table_scans.extend(other.table_scans);
210+
211+
// merge index reads: for each cols, for each av, extend the view set
212+
for (cols, other_av_set) in other.index_reads {
213+
let av_set = self.index_reads.entry(cols).or_default();
214+
for (av, other_views) in other_av_set {
215+
av_set.entry(av).or_default().extend(other_views);
216+
}
217+
}
160218
}
161219
}
162220

@@ -192,7 +250,7 @@ impl MutTxId {
192250
/// Record that a view performs a table scan in this transaction's read set
193251
pub fn record_table_scan(&mut self, op: &FuncCallType, table_id: TableId) {
194252
if let FuncCallType::View(view) = op {
195-
self.read_sets.insert_scan(table_id, view.clone());
253+
self.read_sets.insert_full_table_scan(table_id, view.clone());
196254
}
197255
}
198256

@@ -201,28 +259,84 @@ impl MutTxId {
201259
&mut self,
202260
op: &FuncCallType,
203261
table_id: TableId,
204-
_: IndexId,
205-
_: Bound<AlgebraicValue>,
206-
_: Bound<AlgebraicValue>,
262+
index_id: IndexId,
263+
lower: Bound<AlgebraicValue>,
264+
upper: Bound<AlgebraicValue>,
207265
) {
208-
// TODO: Implement read set tracking for index scans
209-
if let FuncCallType::View(view) = op {
210-
self.read_sets.insert_scan(table_id, view.clone());
266+
let FuncCallType::View(view) = op else {
267+
return;
268+
};
269+
270+
// Check for precise index seek
271+
if let (Bound::Included(low_val), Bound::Included(up_val)) = (&lower, &upper) {
272+
if low_val == up_val {
273+
// Fetch index metadata
274+
let Some((_, idx, _)) = self.get_table_and_index(index_id) else {
275+
return;
276+
};
277+
278+
let cols = idx.index().indexed_columns.clone();
279+
self.read_sets
280+
.insert_index_scan(table_id, cols, low_val.clone(), view.clone());
281+
return;
282+
}
211283
}
284+
285+
// Everything else is treated as a table scan
286+
self.read_sets.insert_full_table_scan(table_id, view.clone());
212287
}
213288

214289
/// Returns the views whose read sets overlaps with this transaction's write set
215-
pub fn view_for_update(&self) -> impl Iterator<Item = &ViewCallInfo> {
216-
self.tx_state
290+
pub fn view_for_update(&self) -> impl Iterator<Item = &ViewCallInfo> + '_ {
291+
let mut res = self
292+
.tx_state
217293
.insert_tables
218294
.keys()
219295
.filter(|table_id| !self.tx_state.delete_tables.contains_key(table_id))
220296
.chain(self.tx_state.delete_tables.keys())
221297
.flat_map(|table_id| self.committed_state_write_lock.views_for_table_scan(table_id))
222-
.collect::<HashSet<_>>()
223-
.into_iter()
224-
}
298+
.collect::<HashSet<_>>();
299+
300+
// Include views that perform precise index seeks.
301+
let mut process_views = |table_id: &_, row_ref| {
302+
for view_call in self.committed_state_write_lock.views_for_index_seek(table_id, row_ref) {
303+
res.insert(view_call);
304+
}
305+
};
225306

307+
for (table_id, deleted_table) in &self.tx_state.delete_tables {
308+
let (table, blob_store, _) = self
309+
.committed_state_write_lock
310+
.get_table_and_blob_store(*table_id)
311+
.expect("table must exist in committed state for deleted table");
312+
313+
if table.indexes.is_empty() {
314+
continue;
315+
}
316+
317+
for ptr in deleted_table.iter() {
318+
if let Some(row_ref) = table.get_row_ref(blob_store, ptr) {
319+
process_views(table_id, row_ref);
320+
}
321+
}
322+
}
323+
324+
for (table_id, inserted_table) in &self.tx_state.insert_tables {
325+
let (table, blob_store, _) = self
326+
.committed_state_write_lock
327+
.get_table_and_blob_store(*table_id)
328+
.expect("table must exist in committed state for inserted table");
329+
330+
if table.indexes.is_empty() {
331+
continue;
332+
}
333+
334+
for row_ref in inserted_table.scan_rows(blob_store) {
335+
process_views(table_id, row_ref);
336+
}
337+
}
338+
res.into_iter()
339+
}
226340
/// Removes keys for `view_id` from the committed read set.
227341
/// Used for dropping views in an auto-migration.
228342
pub fn drop_view_from_committed_read_set(&mut self, view_id: ViewId) {

crates/schema/src/def.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,14 @@ impl ModuleDef {
276276
self.reducers.get_index(id.idx()).map(|(_, def)| def)
277277
}
278278

279+
/// Look up a view by its id, and whether it is anonymous.
280+
pub fn get_view_by_id(&self, id: ViewFnPtr, is_anonymous: bool) -> Option<&ViewDef> {
281+
self.views
282+
.iter()
283+
.find(|(_, def)| def.fn_ptr == id && def.is_anonymous == is_anonymous)
284+
.map(|(_, def)| def)
285+
}
286+
279287
/// Convenience method to look up a procedure, possibly by a string, returning its id as well.
280288
pub fn procedure_full<K: ?Sized + Hash + Equivalent<Identifier>>(
281289
&self,

0 commit comments

Comments
 (0)