diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index d149c437b59..2e2914fb670 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -50,7 +50,7 @@ use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue} use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef}; use spacetimedb_schema::reducer_name::ReducerName; use spacetimedb_schema::schema::{ - ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema, + ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema, VIEW_ARG_HASH_COL, }; use spacetimedb_schema::table_name::TableName; use spacetimedb_snapshot::{DynSnapshotRepo, ReconstructedSnapshot, SnapshotError, SnapshotRepository}; @@ -1559,10 +1559,10 @@ impl RelationalDB { Ok(None) } - /// Write `rows` into a (sender) view's backing table. + /// Write `rows` into a sender-scoped view's backing table. /// /// # Process - /// 1. Delete all rows for `sender` from the view's backing table + /// 1. Delete all rows for `sender`'s implicit argument hash from the view's backing table /// 2. Insert the new rows into the backing table /// /// # Arguments @@ -1578,39 +1578,24 @@ impl RelationalDB { sender: Identity, rows: Vec, ) -> Result<(), DBError> { - // Delete rows for `sender` from the backing table - let rows_to_delete = self - .iter_by_col_eq_mut(tx, table_id, ColId(0), &sender.into())? - .map(|res| res.pointer()) - .collect::>(); - self.delete(tx, table_id, rows_to_delete); - - self.write_view_rows(tx, table_id, rows, Some(sender))?; - - Ok(()) + let arg_hash = MutTxId::view_arg_hash(sender); + self.materialize_view_arg_hash(tx, table_id, arg_hash, rows) } - /// Write `rows` into an anonymous view's backing table. - /// - /// # Process - /// 1. Clear the view's backing table - /// 2. Insert the new rows into the backing table - /// - /// # Arguments - /// * `tx` - Mutable transaction context - /// * `table_id` - The id of the view's backing table - /// * `rows` - Product values to insert - #[allow(clippy::too_many_arguments)] - pub fn materialize_anonymous_view( + fn materialize_view_arg_hash( &self, tx: &mut MutTxId, table_id: TableId, + arg_hash: AlgebraicValue, rows: Vec, ) -> Result<(), DBError> { - // Clear entire backing table - self.clear_table(tx, table_id)?; + let rows_to_delete = self + .iter_by_col_eq_mut(tx, table_id, VIEW_ARG_HASH_COL, &arg_hash)? + .map(|res| res.pointer()) + .collect::>(); + self.delete(tx, table_id, rows_to_delete); - self.write_view_rows(tx, table_id, rows, None)?; + self.write_view_rows(tx, table_id, rows, &arg_hash)?; Ok(()) } @@ -1626,10 +1611,11 @@ impl RelationalDB { view_call: ViewCallInfo, rows: Vec, ) -> Result<(), DBError> { - match view_call.sender { - Some(sender) => self.materialize_view(tx, table_id, sender, rows)?, - None => self.materialize_anonymous_view(tx, table_id, rows)?, - } + let arg_hash = match view_call.sender { + Some(sender) => MutTxId::view_arg_hash(sender), + None => MutTxId::anonymous_view_arg_hash(), + }; + self.materialize_view_arg_hash(tx, table_id, arg_hash, rows)?; tx.replace_view_read_set(view_call); Ok(()) @@ -1640,34 +1626,18 @@ impl RelationalDB { tx: &mut MutTxId, table_id: TableId, rows: Vec, - sender: Option, + arg_hash: &AlgebraicValue, ) -> Result<(), DBError> { - match sender { - Some(sender) => { - for product in rows { - let value = ProductValue::from_iter(std::iter::once(sender.into()).chain(product.elements)); - self.insert( - tx, - table_id, - &value - .to_bsatn_vec() - .map_err(|_| ViewError::SerializeRow) - .map_err(DatastoreError::from)?, - )?; - } - } - None => { - for product in rows { - self.insert( - tx, - table_id, - &product - .to_bsatn_vec() - .map_err(|_| ViewError::SerializeRow) - .map_err(DatastoreError::from)?, - )?; - } - } + for product in rows { + let value = ProductValue::from_iter(std::iter::once(arg_hash.clone()).chain(product.elements)); + self.insert( + tx, + table_id, + &value + .to_bsatn_vec() + .map_err(|_| ViewError::SerializeRow) + .map_err(DatastoreError::from)?, + )?; } Ok(()) @@ -2256,8 +2226,8 @@ pub mod tests_utils { row: ProductValue, ) -> Result, DBError> { let meta_cols = match sender { - Some(identity) => vec![identity.into()], - None => vec![], + Some(identity) => vec![MutTxId::view_arg_hash(identity)], + None => vec![MutTxId::anonymous_view_arg_hash()], }; let cols = meta_cols.into_iter().chain(row.elements); let row = ProductValue::from_iter(cols); @@ -2512,8 +2482,9 @@ mod tests { fn project_views(stdb: &TestDB, table_id: TableId, sender: Identity) -> Vec { let tx = begin_tx(stdb); + let arg_hash = MutTxId::view_arg_hash(sender); - stdb.iter_by_col_eq(&tx, table_id, 0, &sender.into()) + stdb.iter_by_col_eq(&tx, table_id, VIEW_ARG_HASH_COL, &arg_hash) .unwrap() .map(|row| { let pv = row.to_product_value(); @@ -2526,10 +2497,16 @@ mod tests { fn project_anonymous_views(stdb: &TestDB, table_id: TableId) -> Vec { let tx = begin_tx(stdb); + let arg_hash = MutTxId::anonymous_view_arg_hash(); - stdb.iter(&tx, table_id) + stdb.iter_by_col_eq(&tx, table_id, VIEW_ARG_HASH_COL, &arg_hash) .unwrap() - .map(|row| row.to_product_value()) + .map(|row| { + let pv = row.to_product_value(); + ProductValue { + elements: pv.elements.iter().skip(1).cloned().collect(), + } + }) .collect() } @@ -2737,7 +2714,12 @@ mod tests { let mut tx = begin_mut_tx(&stdb); tx.subscribe_view(view_id, ArgId::SENTINEL, stale_sender)?; tx.subscribe_view(view_id, ArgId::SENTINEL, live_sender)?; - stdb.materialize_anonymous_view(&mut tx, table_id, vec![product![42u8]])?; + stdb.materialize_view_call( + &mut tx, + table_id, + ViewCallInfo { view_id, sender: None }, + vec![product![42u8]], + )?; stdb.commit_tx(tx)?; let mut tx = begin_mut_tx(&stdb); @@ -2786,7 +2768,12 @@ mod tests { let mut tx = begin_mut_tx(&stdb); tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?; - stdb.materialize_anonymous_view(&mut tx, table_id, vec![product![42u8]])?; + stdb.materialize_view_call( + &mut tx, + table_id, + ViewCallInfo { view_id, sender: None }, + vec![product![42u8]], + )?; stdb.commit_tx(tx)?; let mut tx = begin_mut_tx(&stdb); diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 2207470d467..bda490dab74 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -3309,7 +3309,7 @@ impl ModuleHost { let delta_tx = DeltaTx::from(tx); let params = ExecutionParams::from_auth(auth); let plan_fragments = optimized.iter(); - let (rows, _, metrics) = if returns_view_table && num_private_cols > 0 { + let (rows, _, metrics) = if returns_view_table { execute_plan_for_view::(plan_fragments, num_cols, num_private_cols, &delta_tx, ¶ms, rlb_pool) } else { execute_plan::(optimized.iter(), &delta_tx, ¶ms, rlb_pool) diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index be77cc2af35..d43cf96662d 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -2040,6 +2040,7 @@ mod tests { use spacetimedb_client_api_messages::energy::FunctionBudget; use spacetimedb_client_api_messages::websocket::{common::RowListLen as _, v1 as ws_v1, v2 as ws_v2}; use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap}; + use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::system_tables::{StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID}; use spacetimedb_execution::dml::MutDatastore; use spacetimedb_lib::bsatn::ToBsatn; @@ -2536,7 +2537,8 @@ mod tests { subs.remove_subscriber(client_id_for_b); // Delete the backing row and verify the surviving subscriber still receives the view delta. - let _ = commit_tx(&db, &subs, [(view_table_id, product![id_for_a, 7_u8])], [])?; + let arg_hash = MutTxId::view_arg_hash(id_for_a); + let _ = commit_tx(&db, &subs, [(view_table_id, product![arg_hash, 7_u8])], [])?; let schema = ProductType::from([AlgebraicType::U8]); assert_v2_tx_update_for_table( diff --git a/crates/datastore/src/error.rs b/crates/datastore/src/error.rs index 508c80858bf..596c0ca26de 100644 --- a/crates/datastore/src/error.rs +++ b/crates/datastore/src/error.rs @@ -57,8 +57,6 @@ pub enum ViewError { TypeMismatchOption, #[error("expected ProductValue in view result")] ExpectedProduct, - #[error("failed to serialize view arguments")] - SerializeArgs, } #[derive(Error, Debug)] diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 3383e9a7fb3..8325058aa10 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -43,8 +43,9 @@ use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore, Row}; use spacetimedb_lib::{ db::raw_def::v9::RawSql, db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP}, + empty_view_arg_hash_value, metrics::ExecutionMetrics, - ConnectionId, Identity, Timestamp, + sender_view_arg_hash_value, ConnectionId, Identity, Timestamp, }; use spacetimedb_primitives::{ col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId, @@ -57,7 +58,10 @@ use spacetimedb_schema::{ def::{ModuleDef, ViewColumnDef, ViewDef, ViewParamDef}, identifier::Identifier, reducer_name::ReducerName, - schema::{ColumnSchema, ConstraintSchema, IndexSchema, RowLevelSecuritySchema, SequenceSchema, TableSchema}, + schema::{ + ColumnSchema, ConstraintSchema, IndexSchema, RowLevelSecuritySchema, SequenceSchema, TableSchema, + VIEW_ARG_HASH_COL, + }, table_name::TableName, }; use spacetimedb_table::{ @@ -2369,6 +2373,16 @@ impl<'a, I: Iterator>> Iterator for FilterDeleted<'a, I> { } impl MutTxId { + /// Returns the hash value for an anonymous view's empty arguments. + pub fn anonymous_view_arg_hash() -> AlgebraicValue { + empty_view_arg_hash_value() + } + + /// Returns the hash value for a sender-scoped view's arguments. + pub fn view_arg_hash(sender: Identity) -> AlgebraicValue { + sender_view_arg_hash_value(sender) + } + /// Does this caller have an entry for `view_id` in `st_view_sub`? pub fn is_view_materialized(&self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result { use StViewSubFields::*; @@ -2542,14 +2556,15 @@ impl MutTxId { } = self.lookup_st_view(view_id)?; let table_id = table_id.expect("views have backing table"); - if is_anonymous { - if !self.has_other_st_view_sub_entries(view_id, sub_row_ptr)? { - self.clear_table(table_id)?; - self.drop_view_from_committed_read_set(view_id); - } - } else { + let drop_materialization = !is_anonymous || !self.has_other_st_view_sub_entries(view_id, sub_row_ptr)?; + if drop_materialization { + let arg_hash = if is_anonymous { + Self::anonymous_view_arg_hash() + } else { + Self::view_arg_hash(sender) + }; let rows_to_delete = self - .iter_by_col_eq(table_id, 0, &sender.into())? + .iter_by_col_eq(table_id, VIEW_ARG_HASH_COL, &arg_hash)? .map(|res| res.pointer()) .collect::>(); @@ -2557,7 +2572,11 @@ impl MutTxId { self.delete(table_id, row_ptr)?; } - self.drop_view_with_sender_from_committed_read_set(view_id, sender); + if is_anonymous { + self.drop_view_from_committed_read_set(view_id); + } else { + self.drop_view_with_sender_from_committed_read_set(view_id, sender); + } } // Finally, delete the subscription row diff --git a/crates/datastore/src/locking_tx_datastore/state_view.rs b/crates/datastore/src/locking_tx_datastore/state_view.rs index d1d484fe363..3d20e5e50d9 100644 --- a/crates/datastore/src/locking_tx_datastore/state_view.rs +++ b/crates/datastore/src/locking_tx_datastore/state_view.rs @@ -7,9 +7,9 @@ use crate::system_tables::{ StConnectionCredentialsFields, StConnectionCredentialsRow, StConstraintFields, StConstraintRow, StEventTableFields, StEventTableRow, StIndexAccessorFields, StIndexAccessorRow, StIndexFields, StIndexRow, StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableAccessorFields, StTableAccessorRow, StTableFields, - StTableRow, StViewFields, StViewParamFields, StViewRow, SystemTable, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ID, + StTableRow, StViewFields, StViewRow, SystemTable, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ID, ST_CONNECTION_CREDENTIALS_ID, ST_CONSTRAINT_ID, ST_EVENT_TABLE_ID, ST_INDEX_ACCESSOR_ID, ST_INDEX_ID, - ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ACCESSOR_ID, ST_TABLE_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, + ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ACCESSOR_ID, ST_TABLE_ID, ST_VIEW_ID, }; use anyhow::anyhow; use core::ops::RangeBounds; @@ -240,14 +240,9 @@ pub trait StateView { .map(|mut iter| { iter.next().map(|row| -> Result<_> { let row = StViewRow::try_from(row)?; - let has_args = self - .iter_by_col_eq(ST_VIEW_PARAM_ID, StViewParamFields::ViewId, &row.view_id.into())? - .next() - .is_some(); Ok(ViewDefInfo { view_id: row.view_id, - has_args, is_anonymous: row.is_anonymous, }) }) diff --git a/crates/execution/src/lib.rs b/crates/execution/src/lib.rs index f0aeeb11dfc..34d02317d1f 100644 --- a/crates/execution/src/lib.rs +++ b/crates/execution/src/lib.rs @@ -1,14 +1,15 @@ use anyhow::Result; use core::hash::{Hash, Hasher}; use core::ops::RangeBounds; -use spacetimedb_lib::{identity::AuthCtx, query::Delta, AlgebraicType, Identity}; -use spacetimedb_physical_plan::plan::{ParamResolver, ProjectField, TupleField}; +use spacetimedb_lib::{hash_sender_view_args, identity::AuthCtx, query::Delta, AlgebraicType, Identity}; +use spacetimedb_physical_plan::plan::{ + ParamResolver, ParamSlot, ProjectField, TupleField, PARAM_SENDER, PARAM_VIEW_ARG_HASH, +}; use spacetimedb_primitives::{ColList, IndexId, TableId}; use spacetimedb_sats::bsatn::{BufReservedFill, EncodeError, ToBsatn}; use spacetimedb_sats::buffer::BufWriter; use spacetimedb_sats::product_value::InvalidFieldError; -use spacetimedb_sats::{impl_serialize, AlgebraicValue, ProductValue}; -use spacetimedb_sql_parser::ast::Parameter; +use spacetimedb_sats::{impl_serialize, u256, AlgebraicValue, ProductValue}; use spacetimedb_table::{static_assert_size, table::RowRef}; pub mod dml; @@ -17,11 +18,15 @@ pub mod pipelined; #[derive(Debug, Clone, Copy)] pub struct ExecutionParams { sender: Identity, + view_arg_hash: u256, } impl ExecutionParams { pub fn from_sender(sender: Identity) -> Self { - Self { sender } + Self { + sender, + view_arg_hash: hash_sender_view_args(sender).to_u256(), + } } pub fn from_auth(auth: &AuthCtx) -> Self { @@ -30,11 +35,14 @@ impl ExecutionParams { } impl ParamResolver for ExecutionParams { - fn resolve_param(&self, param: Parameter, ty: &AlgebraicType) -> AlgebraicValue { + fn resolve_param(&self, param: ParamSlot, ty: &AlgebraicType) -> AlgebraicValue { match param { - Parameter::Sender if ty.is_identity() => self.sender.into(), - Parameter::Sender if ty.is_bytes() => AlgebraicValue::Bytes(self.sender.to_be_byte_array().into()), - Parameter::Sender => panic!("unsupported type for :sender: {ty:?}"), + PARAM_SENDER if ty.is_identity() => self.sender.into(), + PARAM_SENDER if ty.is_bytes() => AlgebraicValue::Bytes(self.sender.to_be_byte_array().into()), + PARAM_SENDER => panic!("unsupported type for :sender: {ty:?}"), + PARAM_VIEW_ARG_HASH if matches!(ty, AlgebraicType::U256) => AlgebraicValue::U256(self.view_arg_hash.into()), + PARAM_VIEW_ARG_HASH => panic!("unsupported type for view arg hash: {ty:?}"), + ParamSlot(slot) => panic!("unknown physical plan parameter slot: {slot}"), } } } diff --git a/crates/lib/src/lib.rs b/crates/lib/src/lib.rs index 2b20be8c685..aedded78ad1 100644 --- a/crates/lib/src/lib.rs +++ b/crates/lib/src/lib.rs @@ -23,6 +23,7 @@ pub mod query; pub mod scheduler; pub mod st_var; pub mod version; +pub mod view_args; pub mod type_def { pub use spacetimedb_sats::{AlgebraicType, ProductType, ProductTypeElement, SumType}; @@ -47,6 +48,10 @@ pub use spacetimedb_sats::__make_register_reftype; pub use spacetimedb_sats::{self as sats, bsatn, buffer, de, ser}; pub use spacetimedb_sats::{AlgebraicType, ProductType, ProductTypeElement, SumType}; pub use spacetimedb_sats::{AlgebraicValue, ProductValue}; +pub use view_args::{ + empty_view_arg_hash_value, hash_empty_view_args, hash_sender_view_args, hash_view_args, sender_view_arg_hash_value, + VIEW_ARGS_HASH_DOMAIN, +}; pub const MODULE_ABI_MAJOR_VERSION: u16 = 10; diff --git a/crates/lib/src/view_args.rs b/crates/lib/src/view_args.rs new file mode 100644 index 00000000000..81414d94337 --- /dev/null +++ b/crates/lib/src/view_args.rs @@ -0,0 +1,33 @@ +use crate::{Hash, Identity}; +use spacetimedb_sats::{bsatn::ToBsatn, AlgebraicValue, ProductValue}; + +pub const VIEW_ARGS_HASH_DOMAIN: &[u8] = b"spacetimedb::view::args::v1\0"; + +pub fn hash_view_args(args_bsatn: &[u8]) -> Hash { + let mut hasher = blake3::Hasher::new(); + hasher.update(VIEW_ARGS_HASH_DOMAIN); + hasher.update(args_bsatn); + Hash::from_byte_array(*hasher.finalize().as_bytes()) +} + +pub fn hash_empty_view_args() -> Hash { + let args_bsatn = ProductValue::default() + .to_bsatn_vec() + .expect("empty view args should serialize"); + hash_view_args(&args_bsatn) +} + +pub fn hash_sender_view_args(sender: Identity) -> Hash { + let args_bsatn = ProductValue::from_iter([sender.into()]) + .to_bsatn_vec() + .expect("sender view args should serialize"); + hash_view_args(&args_bsatn) +} + +pub fn empty_view_arg_hash_value() -> AlgebraicValue { + AlgebraicValue::U256(hash_empty_view_args().to_u256().into()) +} + +pub fn sender_view_arg_hash_value(sender: Identity) -> AlgebraicValue { + AlgebraicValue::U256(hash_sender_view_args(sender).to_u256().into()) +} diff --git a/crates/physical-plan/src/compile.rs b/crates/physical-plan/src/compile.rs index 9cb8d6435bb..322416d7ccb 100644 --- a/crates/physical-plan/src/compile.rs +++ b/crates/physical-plan/src/compile.rs @@ -3,10 +3,12 @@ use crate::dml::{DeletePlan, MutationPlan, UpdatePlan}; use crate::plan::{ HashJoin, Label, PhysicalExpr, PhysicalPlan, ProjectListPlan, ProjectPlan, Semi, TableScan, TupleField, + PARAM_SENDER, }; use spacetimedb_data_structures::map::HashMap; use spacetimedb_expr::expr::{Expr, FieldProject, LeftDeepJoin, ProjectList, ProjectName, RelExpr, Relvar}; use spacetimedb_expr::statement::DML; +use spacetimedb_sql_parser::ast::Parameter; pub trait VarLabel { fn label(&mut self, name: &str) -> Label; @@ -21,7 +23,7 @@ fn compile_expr(expr: Expr, var: &mut impl VarLabel) -> PhysicalExpr { PhysicalExpr::BinOp(op, a, b) } Expr::Value(v, _) => PhysicalExpr::Value(v), - Expr::Param(param, ty) => PhysicalExpr::Param(param, ty), + Expr::Param(Parameter::Sender, ty) => PhysicalExpr::Param(PARAM_SENDER, ty), Expr::Field(proj) => PhysicalExpr::Field(compile_field_project(var, proj)), } } diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index cc1872f7955..3dfbd82d80c 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -6,10 +6,12 @@ use spacetimedb_expr::{ expr::{AggType, CollectViews}, StatementSource, }; -use spacetimedb_lib::{query::Delta, sats::size_of::SizeOf, AlgebraicType, AlgebraicValue, ProductValue}; +use spacetimedb_lib::{ + empty_view_arg_hash_value, query::Delta, sats::size_of::SizeOf, AlgebraicType, AlgebraicValue, ProductValue, +}; use spacetimedb_primitives::{ColId, ColOrCols, ColSet, IndexId, TableId, ViewId}; -use spacetimedb_schema::schema::{IndexSchema, TableSchema}; -use spacetimedb_sql_parser::ast::{BinOp, LogOp, Parameter}; +use spacetimedb_schema::schema::{IndexSchema, TableSchema, VIEW_ARG_HASH_COL}; +use spacetimedb_sql_parser::ast::{BinOp, LogOp}; use spacetimedb_table::table::RowRef; use std::{ borrow::Cow, @@ -32,9 +34,16 @@ pub struct Label(pub usize); /// Concrete variable bindings are supplied at runtime /// and implement this minimal interface. pub trait ParamResolver { - fn resolve_param(&self, param: Parameter, ty: &AlgebraicType) -> AlgebraicValue; + fn resolve_param(&self, param: ParamSlot, ty: &AlgebraicType) -> AlgebraicValue; } +/// A runtime parameter slot in a physical plan. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ParamSlot(pub u16); + +pub const PARAM_SENDER: ParamSlot = ParamSlot(0); +pub const PARAM_VIEW_ARG_HASH: ParamSlot = ParamSlot(1); + /// Physical plans always terminate with a projection. /// This type of projection returns row ids. /// @@ -541,8 +550,8 @@ impl PhysicalPlan { Ok(optimized) } - /// If a view is not anonymous, its backing table has a `sender` column. - /// This column tracks which rows belong to which caller. + /// If a view has private arguments, its backing table has an `arg_hash` column. + /// This column tracks which rows belong to which argument tuple. /// /// As a result, queries over such views cannot read the entire backing table. /// They must only select the rows corresponding to the caller of the query. @@ -553,24 +562,31 @@ impl PhysicalPlan { /// SELECT * FROM my_view /// ``` /// - /// becomes + /// becomes the equivalent of /// ```sql - /// SELECT * FROM my_view WHERE sender = :sender + /// SELECT * FROM my_view WHERE arg_hash = /// ``` fn expand_views(self) -> Self { match self { - Self::TableScan(scan, label) if scan.schema.is_view() && !scan.schema.is_anonymous_view() => Self::Filter( - Box::new(Self::TableScan(scan, label)), - PhysicalExpr::BinOp( - BinOp::Eq, - Box::new(PhysicalExpr::Param(Parameter::Sender, AlgebraicType::identity())), - Box::new(PhysicalExpr::Field(TupleField { - label, - label_pos: None, - field_pos: 0, - })), - ), - ), + Self::TableScan(scan, label) if scan.schema.is_view() => { + let arg_hash = if scan.schema.is_anonymous_view() { + PhysicalExpr::Value(empty_view_arg_hash_value()) + } else { + PhysicalExpr::Param(PARAM_VIEW_ARG_HASH, AlgebraicType::U256) + }; + Self::Filter( + Box::new(Self::TableScan(scan, label)), + PhysicalExpr::BinOp( + BinOp::Eq, + Box::new(arg_hash), + Box::new(PhysicalExpr::Field(TupleField { + label, + label_pos: None, + field_pos: VIEW_ARG_HASH_COL.idx(), + })), + ), + ) + } Self::IxJoin( IxJoin { lhs, @@ -1392,7 +1408,7 @@ pub enum PhysicalExpr { /// A constant algebraic value Value(AlgebraicValue), /// A runtime parameter. - Param(Parameter, AlgebraicType), + Param(ParamSlot, AlgebraicType), /// A field projection expression Field(TupleField), } diff --git a/crates/schema/src/schema.rs b/crates/schema/src/schema.rs index 7eac68d79f1..4079d98a70a 100644 --- a/crates/schema/src/schema.rs +++ b/crates/schema/src/schema.rs @@ -55,15 +55,10 @@ pub trait Schema: Sized { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct ViewDefInfo { pub view_id: ViewId, - pub has_args: bool, pub is_anonymous: bool, } -impl ViewDefInfo { - pub fn num_private_cols(&self) -> usize { - (if self.is_anonymous { 0 } else { 1 }) + (if self.has_args { 1 } else { 0 }) - } -} +pub const VIEW_ARG_HASH_COL: ColId = ColId(0); /// A wrapper around a [`TableSchema`] for views. #[derive(Debug, Clone, PartialEq, Eq)] @@ -99,7 +94,7 @@ impl TableOrViewSchema { } /// Returns the [`TableSchema`] of the underlying datastore table. - /// For views, this schema will include the internal `sender` and `arg_id` columns. + /// For views, this schema will include the internal `arg_hash` column when present. pub fn inner(&self) -> Arc { self.inner.clone() } @@ -109,32 +104,10 @@ impl TableOrViewSchema { /// The [`ColId`]s in this list do not necessarily correspond to their position in this list. /// Rather they correspond to the position of the column in the physical datastore table. /// This is important since this method may not return all columns recorded in the datastore. - /// For views in particular it will not include the internal `sender` and `arg_id` columns. + /// For views in particular it will not include the internal `arg_hash` column. /// Hence columns in this list should be looked up by their [`ColId`] - not their position. pub fn public_columns(&self) -> &[ColumnSchema] { - match self.view_info { - Some(ViewDefInfo { - has_args: true, - is_anonymous: false, - .. - }) => &self.inner.columns[2..], - Some(ViewDefInfo { - has_args: true, - is_anonymous: true, - .. - }) => &self.inner.columns[1..], - Some(ViewDefInfo { - has_args: false, - is_anonymous: false, - .. - }) => &self.inner.columns[1..], - Some(ViewDefInfo { - has_args: false, - is_anonymous: true, - .. - }) - | None => &self.inner.columns, - } + &self.inner.columns[self.inner.num_private_cols()..] } /// Check if the `col_name` exist on this [`TableOrViewSchema`] @@ -297,10 +270,7 @@ impl TableSchema { /// How many private columns does this table have? /// Will only be non-zero in the case of views. pub fn num_private_cols(&self) -> usize { - self.view_info - .as_ref() - .map(|view_info| view_info.num_private_cols()) - .unwrap_or_default() + usize::from(self.is_view()) } /// Update the table id of this schema. @@ -759,9 +729,7 @@ impl TableSchema { let ViewDef { name, is_public, - is_anonymous, primary_key, - param_columns, return_columns, .. } = view_def; @@ -783,16 +751,10 @@ impl TableSchema { StAccess::Private }; - let view_info = ViewDefInfo { - view_id: ViewId::SENTINEL, - has_args: !param_columns.is_empty(), - is_anonymous: *is_anonymous, - }; - TableSchema::new( TableId::SENTINEL, TableName::new(name.clone()), - Some(view_info), + None, columns, vec![], vec![], @@ -820,25 +782,33 @@ impl TableSchema { /// fn my_view(ctx: &ViewContext, x: u32, y: u32) -> Vec { ... } /// /// #[view(accessor = my_anonymous_view, public)] - /// fn my_anonymous_view(ctx: &AnonymousViewContext, x: u32, y: u32) -> Vec { ... } + /// fn my_anonymous_view(ctx: &AnonymousViewContext) -> Vec { ... } + /// + /// #[view(accessor = my_parameterized_anonymous_view, public)] + /// fn my_parameterized_anonymous_view(ctx: &AnonymousViewContext, x: u32, y: u32) -> Vec { ... } /// ``` /// /// The above views are materialized with the following schema: /// /// my_view: /// - /// | sender | arg_id | a | b | - /// |----------------|--------|-----|-----| - /// | (some = 0x...) | u64 | u32 | u32 | + /// | arg_hash | a | b | + /// |----------|-----|-----| + /// | u256 | u32 | u32 | /// /// my_anonymous_view: /// - /// | sender | arg_id | a | b | - /// |-------------|--------|-----|-----| - /// | (none = ()) | u64 | u32 | u32 | + /// | arg_hash | a | b | + /// |----------|-----|-----| + /// | u256 | u32 | u32 | /// - /// Note, `sender` and `arg_id` are internal columns not defined by the module, - /// where `arg_id` is a foreign key into `st_view_arg`. + /// my_parameterized_anonymous_view: + /// + /// | arg_hash | a | b | + /// |----------|-----|-----| + /// | u256 | u32 | u32 | + /// + /// Note, `arg_hash` is an internal column not defined by the module. pub fn from_view_def_for_datastore(module_def: &ModuleDef, view_def: &ViewDef) -> Self { module_def.expect_contains(view_def); @@ -847,41 +817,26 @@ impl TableSchema { is_public, is_anonymous, primary_key, - param_columns, return_columns, accessor_name, .. } = view_def; - let n = return_columns.len() + 2; - let mut columns = Vec::with_capacity(n); - let mut meta_cols = 0; - - let mut push_column = |name: &'static str, col_type| { - meta_cols += 1; - columns.push(ColumnSchema { - table_id: TableId::SENTINEL, - col_pos: columns.len().into(), - col_name: Identifier::new_assume_valid(name.into()), - col_type, - alias: None, - }); - }; - - if !is_anonymous { - push_column("sender", AlgebraicType::identity()); - } - - if !param_columns.is_empty() { - push_column("arg_id", AlgebraicType::U64); - } + let mut columns = Vec::with_capacity(return_columns.len() + 1); + columns.push(ColumnSchema { + table_id: TableId::SENTINEL, + col_pos: VIEW_ARG_HASH_COL, + col_name: Identifier::new_assume_valid("arg_hash".into()), + col_type: AlgebraicType::U256, + alias: None, + }); columns.extend( return_columns .iter() .map(|def| ColumnSchema::from_view_column_def(module_def, def)) .enumerate() - .map(|(i, schema)| (ColId::from(meta_cols + i), schema)) + .map(|(i, schema)| (ColId::from(1 + i), schema)) .map(|(col_pos, schema)| ColumnSchema { col_pos, ..schema }), ); @@ -895,50 +850,22 @@ impl TableSchema { RawIdentifier::new(format!("{name}_{cols_name}_key")) }; - let mut indexes = match meta_cols { - 1 => vec![IndexSchema { - index_id: IndexId::SENTINEL, - table_id: TableId::SENTINEL, - index_name: make_index_name(&col_list![0]), - index_algorithm: IndexAlgorithm::BTree(col_list![0].into()), - alias: None, - }], - 2 => vec![IndexSchema { - index_id: IndexId::SENTINEL, - table_id: TableId::SENTINEL, - index_name: make_index_name(&col_list![0, 1]), - index_algorithm: IndexAlgorithm::BTree(col_list![0, 1].into()), - alias: None, - }], - _ => vec![], - }; + let arg_hash_cols = col_list![VIEW_ARG_HASH_COL]; + let mut indexes = vec![IndexSchema { + index_id: IndexId::SENTINEL, + table_id: TableId::SENTINEL, + index_name: make_index_name(&arg_hash_cols), + index_algorithm: IndexAlgorithm::BTree(arg_hash_cols.into()), + alias: None, + }]; let mut constraints = vec![]; let view_primary_key = (module_def.raw_module_def_version() == RawModuleDefVersion::V10) - .then_some(primary_key.map(|pk| ColId::from(meta_cols + pk.idx()))) + .then_some(primary_key.map(|pk| ColId::from(1 + pk.idx()))) .flatten(); - if *is_anonymous { - if let Some(pk_col) = view_primary_key { - let cols = col_list![pk_col]; - constraints.push(ConstraintSchema { - table_id: TableId::SENTINEL, - constraint_id: ConstraintId::SENTINEL, - constraint_name: make_constraint_name(&cols), - data: ConstraintData::Unique(UniqueConstraintData { - columns: ColSet::from(cols.clone()), - }), - }); - indexes.push(IndexSchema { - index_id: IndexId::SENTINEL, - table_id: TableId::SENTINEL, - index_name: make_index_name(&cols), - index_algorithm: IndexAlgorithm::BTree(cols.into()), - alias: None, - }); - } - } else if let Some(pk_col) = view_primary_key { - let cols = col_list![ColId(0), pk_col]; + if let Some(pk_col) = view_primary_key { + let cols = col_list![VIEW_ARG_HASH_COL, pk_col]; constraints.push(ConstraintSchema { table_id: TableId::SENTINEL, constraint_id: ConstraintId::SENTINEL, @@ -964,7 +891,6 @@ impl TableSchema { let view_info = ViewDefInfo { view_id: ViewId::SENTINEL, - has_args: !param_columns.is_empty(), is_anonymous: *is_anonymous, }; @@ -979,7 +905,7 @@ impl TableSchema { StTableType::User, table_access, None, - if *is_anonymous { view_primary_key } else { None }, + None, false, Some(accessor_name.clone()), )