Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 53 additions & 66 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue}
use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef};
use spacetimedb_schema::reducer_name::ReducerName;
use spacetimedb_schema::schema::{
ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema,
ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema, VIEW_ARG_HASH_COL,
};
use spacetimedb_schema::table_name::TableName;
use spacetimedb_snapshot::{DynSnapshotRepo, ReconstructedSnapshot, SnapshotError, SnapshotRepository};
Expand Down Expand Up @@ -1559,10 +1559,10 @@ impl RelationalDB {
Ok(None)
}

/// Write `rows` into a (sender) view's backing table.
/// Write `rows` into a sender-scoped view's backing table.
///
/// # Process
/// 1. Delete all rows for `sender` from the view's backing table
/// 1. Delete all rows for `sender`'s implicit argument hash from the view's backing table
/// 2. Insert the new rows into the backing table
///
/// # Arguments
Expand All @@ -1578,39 +1578,24 @@ impl RelationalDB {
sender: Identity,
rows: Vec<ProductValue>,
) -> Result<(), DBError> {
// Delete rows for `sender` from the backing table
let rows_to_delete = self
.iter_by_col_eq_mut(tx, table_id, ColId(0), &sender.into())?
.map(|res| res.pointer())
.collect::<Vec<_>>();
self.delete(tx, table_id, rows_to_delete);

self.write_view_rows(tx, table_id, rows, Some(sender))?;

Ok(())
let arg_hash = MutTxId::view_arg_hash(sender);
self.materialize_view_arg_hash(tx, table_id, arg_hash, rows)
}

/// Write `rows` into an anonymous view's backing table.
///
/// # Process
/// 1. Clear the view's backing table
/// 2. Insert the new rows into the backing table
///
/// # Arguments
/// * `tx` - Mutable transaction context
/// * `table_id` - The id of the view's backing table
/// * `rows` - Product values to insert
#[allow(clippy::too_many_arguments)]
pub fn materialize_anonymous_view(
fn materialize_view_arg_hash(
&self,
tx: &mut MutTxId,
table_id: TableId,
arg_hash: AlgebraicValue,
rows: Vec<ProductValue>,
) -> Result<(), DBError> {
// Clear entire backing table
self.clear_table(tx, table_id)?;
let rows_to_delete = self
.iter_by_col_eq_mut(tx, table_id, VIEW_ARG_HASH_COL, &arg_hash)?
.map(|res| res.pointer())
.collect::<Vec<_>>();
self.delete(tx, table_id, rows_to_delete);

self.write_view_rows(tx, table_id, rows, None)?;
self.write_view_rows(tx, table_id, rows, &arg_hash)?;

Ok(())
}
Expand All @@ -1626,10 +1611,11 @@ impl RelationalDB {
view_call: ViewCallInfo,
rows: Vec<ProductValue>,
) -> Result<(), DBError> {
match view_call.sender {
Some(sender) => self.materialize_view(tx, table_id, sender, rows)?,
None => self.materialize_anonymous_view(tx, table_id, rows)?,
}
let arg_hash = match view_call.sender {
Some(sender) => MutTxId::view_arg_hash(sender),
None => MutTxId::anonymous_view_arg_hash(),
};
self.materialize_view_arg_hash(tx, table_id, arg_hash, rows)?;
tx.replace_view_read_set(view_call);

Ok(())
Expand All @@ -1640,34 +1626,18 @@ impl RelationalDB {
tx: &mut MutTxId,
table_id: TableId,
rows: Vec<ProductValue>,
sender: Option<Identity>,
arg_hash: &AlgebraicValue,
) -> Result<(), DBError> {
match sender {
Some(sender) => {
for product in rows {
let value = ProductValue::from_iter(std::iter::once(sender.into()).chain(product.elements));
self.insert(
tx,
table_id,
&value
.to_bsatn_vec()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?,
)?;
}
}
None => {
for product in rows {
self.insert(
tx,
table_id,
&product
.to_bsatn_vec()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?,
)?;
}
}
for product in rows {
let value = ProductValue::from_iter(std::iter::once(arg_hash.clone()).chain(product.elements));
self.insert(
tx,
table_id,
&value
.to_bsatn_vec()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?,
)?;
}

Ok(())
Expand Down Expand Up @@ -2256,8 +2226,8 @@ pub mod tests_utils {
row: ProductValue,
) -> Result<RowRef<'a>, DBError> {
let meta_cols = match sender {
Some(identity) => vec![identity.into()],
None => vec![],
Some(identity) => vec![MutTxId::view_arg_hash(identity)],
None => vec![MutTxId::anonymous_view_arg_hash()],
};
let cols = meta_cols.into_iter().chain(row.elements);
let row = ProductValue::from_iter(cols);
Expand Down Expand Up @@ -2512,8 +2482,9 @@ mod tests {

fn project_views(stdb: &TestDB, table_id: TableId, sender: Identity) -> Vec<ProductValue> {
let tx = begin_tx(stdb);
let arg_hash = MutTxId::view_arg_hash(sender);

stdb.iter_by_col_eq(&tx, table_id, 0, &sender.into())
stdb.iter_by_col_eq(&tx, table_id, VIEW_ARG_HASH_COL, &arg_hash)
.unwrap()
.map(|row| {
let pv = row.to_product_value();
Expand All @@ -2526,10 +2497,16 @@ mod tests {

fn project_anonymous_views(stdb: &TestDB, table_id: TableId) -> Vec<ProductValue> {
let tx = begin_tx(stdb);
let arg_hash = MutTxId::anonymous_view_arg_hash();

stdb.iter(&tx, table_id)
stdb.iter_by_col_eq(&tx, table_id, VIEW_ARG_HASH_COL, &arg_hash)
.unwrap()
.map(|row| row.to_product_value())
.map(|row| {
let pv = row.to_product_value();
ProductValue {
elements: pv.elements.iter().skip(1).cloned().collect(),
}
})
.collect()
}

Expand Down Expand Up @@ -2737,7 +2714,12 @@ mod tests {
let mut tx = begin_mut_tx(&stdb);
tx.subscribe_view(view_id, ArgId::SENTINEL, stale_sender)?;
tx.subscribe_view(view_id, ArgId::SENTINEL, live_sender)?;
stdb.materialize_anonymous_view(&mut tx, table_id, vec![product![42u8]])?;
stdb.materialize_view_call(
&mut tx,
table_id,
ViewCallInfo { view_id, sender: None },
vec![product![42u8]],
)?;
stdb.commit_tx(tx)?;

let mut tx = begin_mut_tx(&stdb);
Expand Down Expand Up @@ -2786,7 +2768,12 @@ mod tests {

let mut tx = begin_mut_tx(&stdb);
tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?;
stdb.materialize_anonymous_view(&mut tx, table_id, vec![product![42u8]])?;
stdb.materialize_view_call(
&mut tx,
table_id,
ViewCallInfo { view_id, sender: None },
vec![product![42u8]],
)?;
stdb.commit_tx(tx)?;

let mut tx = begin_mut_tx(&stdb);
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3309,7 +3309,7 @@ impl ModuleHost {
let delta_tx = DeltaTx::from(tx);
let params = ExecutionParams::from_auth(auth);
let plan_fragments = optimized.iter();
let (rows, _, metrics) = if returns_view_table && num_private_cols > 0 {
let (rows, _, metrics) = if returns_view_table {
execute_plan_for_view::<F>(plan_fragments, num_cols, num_private_cols, &delta_tx, &params, rlb_pool)
} else {
execute_plan::<F>(optimized.iter(), &delta_tx, &params, rlb_pool)
Expand Down
4 changes: 3 additions & 1 deletion crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,7 @@ mod tests {
use spacetimedb_client_api_messages::energy::FunctionBudget;
use spacetimedb_client_api_messages::websocket::{common::RowListLen as _, v1 as ws_v1, v2 as ws_v2};
use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap};
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
use spacetimedb_datastore::system_tables::{StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID};
use spacetimedb_execution::dml::MutDatastore;
use spacetimedb_lib::bsatn::ToBsatn;
Expand Down Expand Up @@ -2536,7 +2537,8 @@ mod tests {
subs.remove_subscriber(client_id_for_b);

// Delete the backing row and verify the surviving subscriber still receives the view delta.
let _ = commit_tx(&db, &subs, [(view_table_id, product![id_for_a, 7_u8])], [])?;
let arg_hash = MutTxId::view_arg_hash(id_for_a);
let _ = commit_tx(&db, &subs, [(view_table_id, product![arg_hash, 7_u8])], [])?;

let schema = ProductType::from([AlgebraicType::U8]);
assert_v2_tx_update_for_table(
Expand Down
2 changes: 0 additions & 2 deletions crates/datastore/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ pub enum ViewError {
TypeMismatchOption,
#[error("expected ProductValue in view result")]
ExpectedProduct,
#[error("failed to serialize view arguments")]
SerializeArgs,
}

#[derive(Error, Debug)]
Expand Down
39 changes: 29 additions & 10 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore, Row};
use spacetimedb_lib::{
db::raw_def::v9::RawSql,
db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP},
empty_view_arg_hash_value,
metrics::ExecutionMetrics,
ConnectionId, Identity, Timestamp,
sender_view_arg_hash_value, ConnectionId, Identity, Timestamp,
};
use spacetimedb_primitives::{
col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId,
Expand All @@ -57,7 +58,10 @@ use spacetimedb_schema::{
def::{ModuleDef, ViewColumnDef, ViewDef, ViewParamDef},
identifier::Identifier,
reducer_name::ReducerName,
schema::{ColumnSchema, ConstraintSchema, IndexSchema, RowLevelSecuritySchema, SequenceSchema, TableSchema},
schema::{
ColumnSchema, ConstraintSchema, IndexSchema, RowLevelSecuritySchema, SequenceSchema, TableSchema,
VIEW_ARG_HASH_COL,
},
table_name::TableName,
};
use spacetimedb_table::{
Expand Down Expand Up @@ -2369,6 +2373,16 @@ impl<'a, I: Iterator<Item = RowRef<'a>>> Iterator for FilterDeleted<'a, I> {
}

impl MutTxId {
/// Returns the hash value for an anonymous view's empty arguments.
pub fn anonymous_view_arg_hash() -> AlgebraicValue {
empty_view_arg_hash_value()
}

/// Returns the hash value for a sender-scoped view's arguments.
pub fn view_arg_hash(sender: Identity) -> AlgebraicValue {
sender_view_arg_hash_value(sender)
}

/// Does this caller have an entry for `view_id` in `st_view_sub`?
pub fn is_view_materialized(&self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<bool> {
use StViewSubFields::*;
Expand Down Expand Up @@ -2542,22 +2556,27 @@ impl MutTxId {
} = self.lookup_st_view(view_id)?;
let table_id = table_id.expect("views have backing table");

if is_anonymous {
if !self.has_other_st_view_sub_entries(view_id, sub_row_ptr)? {
self.clear_table(table_id)?;
self.drop_view_from_committed_read_set(view_id);
}
} else {
let drop_materialization = !is_anonymous || !self.has_other_st_view_sub_entries(view_id, sub_row_ptr)?;
if drop_materialization {
let arg_hash = if is_anonymous {
Self::anonymous_view_arg_hash()
} else {
Self::view_arg_hash(sender)
};
let rows_to_delete = self
.iter_by_col_eq(table_id, 0, &sender.into())?
.iter_by_col_eq(table_id, VIEW_ARG_HASH_COL, &arg_hash)?
.map(|res| res.pointer())
.collect::<Vec<_>>();

for row_ptr in rows_to_delete {
self.delete(table_id, row_ptr)?;
}

self.drop_view_with_sender_from_committed_read_set(view_id, sender);
if is_anonymous {
self.drop_view_from_committed_read_set(view_id);
} else {
self.drop_view_with_sender_from_committed_read_set(view_id, sender);
}
}

// Finally, delete the subscription row
Expand Down
9 changes: 2 additions & 7 deletions crates/datastore/src/locking_tx_datastore/state_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use crate::system_tables::{
StConnectionCredentialsFields, StConnectionCredentialsRow, StConstraintFields, StConstraintRow, StEventTableFields,
StEventTableRow, StIndexAccessorFields, StIndexAccessorRow, StIndexFields, StIndexRow, StScheduledFields,
StScheduledRow, StSequenceFields, StSequenceRow, StTableAccessorFields, StTableAccessorRow, StTableFields,
StTableRow, StViewFields, StViewParamFields, StViewRow, SystemTable, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ID,
StTableRow, StViewFields, StViewRow, SystemTable, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ID,
ST_CONNECTION_CREDENTIALS_ID, ST_CONSTRAINT_ID, ST_EVENT_TABLE_ID, ST_INDEX_ACCESSOR_ID, ST_INDEX_ID,
ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ACCESSOR_ID, ST_TABLE_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID,
ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ACCESSOR_ID, ST_TABLE_ID, ST_VIEW_ID,
};
use anyhow::anyhow;
use core::ops::RangeBounds;
Expand Down Expand Up @@ -240,14 +240,9 @@ pub trait StateView {
.map(|mut iter| {
iter.next().map(|row| -> Result<_> {
let row = StViewRow::try_from(row)?;
let has_args = self
.iter_by_col_eq(ST_VIEW_PARAM_ID, StViewParamFields::ViewId, &row.view_id.into())?
.next()
.is_some();

Ok(ViewDefInfo {
view_id: row.view_id,
has_args,
is_anonymous: row.is_anonymous,
})
})
Expand Down
26 changes: 17 additions & 9 deletions crates/execution/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use anyhow::Result;
use core::hash::{Hash, Hasher};
use core::ops::RangeBounds;
use spacetimedb_lib::{identity::AuthCtx, query::Delta, AlgebraicType, Identity};
use spacetimedb_physical_plan::plan::{ParamResolver, ProjectField, TupleField};
use spacetimedb_lib::{hash_sender_view_args, identity::AuthCtx, query::Delta, AlgebraicType, Identity};
use spacetimedb_physical_plan::plan::{
ParamResolver, ParamSlot, ProjectField, TupleField, PARAM_SENDER, PARAM_VIEW_ARG_HASH,
};
use spacetimedb_primitives::{ColList, IndexId, TableId};
use spacetimedb_sats::bsatn::{BufReservedFill, EncodeError, ToBsatn};
use spacetimedb_sats::buffer::BufWriter;
use spacetimedb_sats::product_value::InvalidFieldError;
use spacetimedb_sats::{impl_serialize, AlgebraicValue, ProductValue};
use spacetimedb_sql_parser::ast::Parameter;
use spacetimedb_sats::{impl_serialize, u256, AlgebraicValue, ProductValue};
use spacetimedb_table::{static_assert_size, table::RowRef};

pub mod dml;
Expand All @@ -17,11 +18,15 @@ pub mod pipelined;
#[derive(Debug, Clone, Copy)]
pub struct ExecutionParams {
sender: Identity,
view_arg_hash: u256,
}

impl ExecutionParams {
pub fn from_sender(sender: Identity) -> Self {
Self { sender }
Self {
sender,
view_arg_hash: hash_sender_view_args(sender).to_u256(),
}
}

pub fn from_auth(auth: &AuthCtx) -> Self {
Expand All @@ -30,11 +35,14 @@ impl ExecutionParams {
}

impl ParamResolver for ExecutionParams {
fn resolve_param(&self, param: Parameter, ty: &AlgebraicType) -> AlgebraicValue {
fn resolve_param(&self, param: ParamSlot, ty: &AlgebraicType) -> AlgebraicValue {
match param {
Parameter::Sender if ty.is_identity() => self.sender.into(),
Parameter::Sender if ty.is_bytes() => AlgebraicValue::Bytes(self.sender.to_be_byte_array().into()),
Parameter::Sender => panic!("unsupported type for :sender: {ty:?}"),
PARAM_SENDER if ty.is_identity() => self.sender.into(),
PARAM_SENDER if ty.is_bytes() => AlgebraicValue::Bytes(self.sender.to_be_byte_array().into()),
PARAM_SENDER => panic!("unsupported type for :sender: {ty:?}"),
PARAM_VIEW_ARG_HASH if matches!(ty, AlgebraicType::U256) => AlgebraicValue::U256(self.view_arg_hash.into()),
PARAM_VIEW_ARG_HASH => panic!("unsupported type for view arg hash: {ty:?}"),
ParamSlot(slot) => panic!("unknown physical plan parameter slot: {slot}"),
}
}
}
Expand Down
Loading
Loading