Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1684,12 +1684,18 @@ impl ModuleHost {
for ViewCallInfo {
view_id,
table_id,
view_name,
fn_ptr,
sender,
} in out.tx.view_for_update().cloned().collect::<Vec<_>>()
{
let view_def = self
.info
.module_def
.get_view_by_id(fn_ptr, sender.is_none())
.ok_or(ViewCallError::NoSuchView)?;

let result = self
.call_view(out.tx, &view_name, view_id, table_id, Nullary, caller, sender)
.call_view(out.tx, &view_def.name, view_id, table_id, Nullary, caller, sender)
.await?;

// Increment execution stats
Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,11 +1003,11 @@ impl InstanceCommon {
.cloned()
.map(|info| {
let view_def = module_def
.view(&*info.view_name)
.unwrap_or_else(|| panic!("view `{}` not found", info.view_name));
.get_view_by_id(info.fn_ptr, info.sender.is_none())
.unwrap_or_else(|| panic!("view with fn_ptr `{}` not found", info.fn_ptr));

CallViewParams {
view_name: info.view_name,
view_name: view_def.name.clone().into(),
view_id: info.view_id,
table_id: info.table_id,
fn_ptr: view_def.fn_ptr,
Expand Down Expand Up @@ -1224,7 +1224,7 @@ impl InstanceOp for ViewOp<'_> {
FuncCallType::View(ViewCallInfo {
view_id: self.view_id,
table_id: self.table_id,
view_name: self.name.to_owned().into_boxed_str(),
fn_ptr: self.fn_ptr,
sender: Some(*self.sender),
})
}
Expand Down Expand Up @@ -1254,7 +1254,7 @@ impl InstanceOp for AnonymousViewOp<'_> {
FuncCallType::View(ViewCallInfo {
view_id: self.view_id,
table_id: self.table_id,
view_name: self.name.to_owned().into_boxed_str(),
fn_ptr: self.fn_ptr,
sender: None,
})
}
Expand Down
9 changes: 9 additions & 0 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ impl CommittedState {
pub(super) fn views_for_table_scan(&self, table_id: &TableId) -> impl Iterator<Item = &ViewCallInfo> {
self.read_sets.views_for_table_scan(table_id)
}

/// Returns the views that perform an precise index seek on given `row_ref` of `table_id`
pub fn views_for_index_seek<'a>(
&'a self,
table_id: &TableId,
row_ref: RowRef<'a>,
) -> impl Iterator<Item = &'a ViewCallInfo> {
self.read_sets.views_for_index_seek(table_id, row_ref)
}
}

impl MemoryUsage for CommittedState {
Expand Down
170 changes: 142 additions & 28 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use spacetimedb_lib::{
ConnectionId, Identity,
};
use spacetimedb_primitives::{
col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId,
col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewFnPtr, ViewId,
};
use spacetimedb_sats::{
bsatn::{self, to_writer, DecodeError, Deserializer},
Expand Down Expand Up @@ -76,7 +76,7 @@ type DecodeResult<T> = core::result::Result<T, DecodeError>;
pub struct ViewCallInfo {
pub view_id: ViewId,
pub table_id: TableId,
pub view_name: Box<str>,
pub fn_ptr: ViewFnPtr,
pub sender: Option<Identity>,
}

Expand All @@ -102,8 +102,8 @@ impl ViewReadSets {
}

/// Record that a view performs a full scan of this table
fn insert_scan(&mut self, table_id: TableId, call: ViewCallInfo) {
self.tables.entry(table_id).or_default().insert_scan(call);
pub fn insert_full_table_scan(&mut self, table_id: TableId, call: ViewCallInfo) {
self.tables.entry(table_id).or_default().insert_table_scan(call);
}

/// Removes keys for `view_id` from the read set
Expand All @@ -120,17 +120,39 @@ impl ViewReadSets {
self.tables.entry(table_id).or_default().merge(rs);
}
}

/// Record that a view reads a specific index key
pub fn insert_index_scan(&mut self, table_id: TableId, cols: ColList, proj: AlgebraicValue, call: ViewCallInfo) {
let table_readset = self.tables.entry(table_id).or_default();
table_readset.insert_index_scan(cols, proj, call);
}

/// Returns the views that index seek on the given row pointer
pub fn views_for_index_seek<'a>(
&'a self,
table_id: &TableId,
row_ptr: RowRef<'a>,
) -> impl Iterator<Item = &'a ViewCallInfo> {
self.tables
.get(table_id)
.into_iter()
.flat_map(move |ts| ts.views_for_index_seek(row_ptr))
}
}

type IndexKeyReadSet = HashMap<AlgebraicValue, HashSet<ViewCallInfo>>;
type IndexColReadSet = HashMap<ColList, IndexKeyReadSet>;

/// A table-level read set for views
#[derive(Default)]
struct TableReadSet {
table_scans: HashSet<ViewCallInfo>,
index_reads: IndexColReadSet,
}

impl TableReadSet {
/// Record that this view performs a full scan of this read set's table
fn insert_scan(&mut self, call: ViewCallInfo) {
fn insert_table_scan(&mut self, call: ViewCallInfo) {
self.table_scans.insert(call);
}

Expand All @@ -139,24 +161,60 @@ impl TableReadSet {
self.table_scans.iter()
}

/// Is this read set empty?
/// Record that a view reads a specific index key for this table
fn insert_index_scan(&mut self, cols: ColList, key: AlgebraicValue, call: ViewCallInfo) {
let key_map = self.index_reads.entry(cols).or_default();
key_map.entry(key).or_default().insert(call);
}

/// Returns the views that index seek on the given row pointer (for this table)
fn views_for_index_seek<'a>(&'a self, row_ptr: RowRef<'a>) -> impl Iterator<Item = &'a ViewCallInfo> {
self.index_reads.iter().flat_map(move |(cols, av_set)| {
row_ptr
.project(cols)
.ok()
.and_then(|av| av_set.get(&av))
.into_iter()
.flat_map(|views| views.iter())
})
}

/// Is this read set empty? (no table scans and no index reads)
fn is_empty(&self) -> bool {
self.table_scans.is_empty()
self.table_scans.is_empty() && self.index_reads.is_empty()
}

/// Removes keys for `view_id` from the read set, optionally filtering by `sender`
fn remove_view(&mut self, view_id: ViewId, sender: Option<Identity>) {
if let Some(identity) = sender {
self.table_scans
.retain(|call| !(call.view_id == view_id && call.sender.as_ref() == Some(&identity)));
} else {
self.table_scans.retain(|call| call.view_id != view_id);
}
let matches_call = |call: &ViewCallInfo| {
call.view_id == view_id && sender.as_ref().is_none_or(|s| call.sender.as_ref() == Some(s))
};

// Remove from table_scans
self.table_scans.retain(|call| !matches_call(call));

// Remove from index_reads
self.index_reads.retain(|_cols, key_map| {
key_map.retain(|_key, views| {
views.retain(|call| !matches_call(call));
!views.is_empty()
});
!key_map.is_empty()
});
}

/// Merge or union two read sets for this table
fn merge(&mut self, readset: TableReadSet) {
self.table_scans.extend(readset.table_scans);
/// Merge (union) another table read set into this one
fn merge(&mut self, other: TableReadSet) {
// merge table scans
self.table_scans.extend(other.table_scans);

// merge index reads: for each cols, for each av, extend the view set
for (cols, other_av_set) in other.index_reads {
let av_set = self.index_reads.entry(cols).or_default();
for (av, other_views) in other_av_set {
av_set.entry(av).or_default().extend(other_views);
}
}
}
}

Expand Down Expand Up @@ -192,7 +250,7 @@ impl MutTxId {
/// Record that a view performs a table scan in this transaction's read set
pub fn record_table_scan(&mut self, op: &FuncCallType, table_id: TableId) {
if let FuncCallType::View(view) = op {
self.read_sets.insert_scan(table_id, view.clone());
self.read_sets.insert_full_table_scan(table_id, view.clone());
}
}

Expand All @@ -201,28 +259,84 @@ impl MutTxId {
&mut self,
op: &FuncCallType,
table_id: TableId,
_: IndexId,
_: Bound<AlgebraicValue>,
_: Bound<AlgebraicValue>,
index_id: IndexId,
lower: Bound<AlgebraicValue>,
upper: Bound<AlgebraicValue>,
) {
// TODO: Implement read set tracking for index scans
if let FuncCallType::View(view) = op {
self.read_sets.insert_scan(table_id, view.clone());
let FuncCallType::View(view) = op else {
return;
};

// Check for precise index seek
if let (Bound::Included(low_val), Bound::Included(up_val)) = (&lower, &upper) {
if low_val == up_val {
// Fetch index metadata
let Some((_, idx, _)) = self.get_table_and_index(index_id) else {
return;
};

let cols = idx.index().indexed_columns.clone();
self.read_sets
.insert_index_scan(table_id, cols, low_val.clone(), view.clone());
return;
}
}

// Everything else is treated as a table scan
self.read_sets.insert_full_table_scan(table_id, view.clone());
}

/// Returns the views whose read sets overlaps with this transaction's write set
pub fn view_for_update(&self) -> impl Iterator<Item = &ViewCallInfo> {
self.tx_state
pub fn view_for_update(&self) -> impl Iterator<Item = &ViewCallInfo> + '_ {
let mut res = self
.tx_state
.insert_tables
.keys()
.filter(|table_id| !self.tx_state.delete_tables.contains_key(table_id))
.chain(self.tx_state.delete_tables.keys())
.flat_map(|table_id| self.committed_state_write_lock.views_for_table_scan(table_id))
.collect::<HashSet<_>>()
.into_iter()
}
.collect::<HashSet<_>>();

// Include views that perform precise index seeks.
let mut process_views = |table_id: &_, row_ref| {
for view_call in self.committed_state_write_lock.views_for_index_seek(table_id, row_ref) {
res.insert(view_call);
}
};

for (table_id, deleted_table) in &self.tx_state.delete_tables {
let (table, blob_store, _) = self
.committed_state_write_lock
.get_table_and_blob_store(*table_id)
.expect("table must exist in committed state for deleted table");

if table.indexes.is_empty() {
continue;
}

for ptr in deleted_table.iter() {
if let Some(row_ref) = table.get_row_ref(blob_store, ptr) {
process_views(table_id, row_ref);
}
}
}

for (table_id, inserted_table) in &self.tx_state.insert_tables {
let (table, blob_store, _) = self
.committed_state_write_lock
.get_table_and_blob_store(*table_id)
.expect("table must exist in committed state for inserted table");

if table.indexes.is_empty() {
continue;
}

for row_ref in inserted_table.scan_rows(blob_store) {
process_views(table_id, row_ref);
}
}
res.into_iter()
}
/// Removes keys for `view_id` from the committed read set.
/// Used for dropping views in an auto-migration.
pub fn drop_view_from_committed_read_set(&mut self, view_id: ViewId) {
Expand Down
8 changes: 8 additions & 0 deletions crates/schema/src/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,14 @@ impl ModuleDef {
self.reducers.get_index(id.idx()).map(|(_, def)| def)
}

/// Look up a view by its id, and whether it is anonymous.
pub fn get_view_by_id(&self, id: ViewFnPtr, is_anonymous: bool) -> Option<&ViewDef> {
self.views
.iter()
.find(|(_, def)| def.fn_ptr == id && def.is_anonymous == is_anonymous)
.map(|(_, def)| def)
}

/// Convenience method to look up a procedure, possibly by a string, returning its id as well.
pub fn procedure_full<K: ?Sized + Hash + Equivalent<Identifier>>(
&self,
Expand Down
Loading
Loading