Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions crates/bench/benches/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use spacetimedb_bench::database::BenchDatabase as _;
use spacetimedb_bench::spacetime_raw::SpacetimeRaw;
use spacetimedb_client_api_messages::websocket::v1::BsatnFormat;
use spacetimedb_datastore::execution_context::Workload;
use spacetimedb_execution::pipelined::PipelinedProject;
use spacetimedb_execution::{pipelined::PipelinedProject, ExecutionParams};
use spacetimedb_primitives::{col_list, TableId};
use spacetimedb_query::compile_subscription;
use spacetimedb_sats::{bsatn, product, AlgebraicType, AlgebraicValue};
Expand Down Expand Up @@ -113,9 +113,10 @@ fn eval(c: &mut Criterion) {
let auth = AuthCtx::for_testing();
let schema_viewer = &SchemaViewer::new(&tx, &auth);
let (plans, table_id, table_name, _) = compile_subscription(sql, schema_viewer, &auth).unwrap();
let params = ExecutionParams::from_auth(&auth);
let plans = plans
.into_iter()
.map(|plan| plan.optimize(&auth).unwrap())
.map(|plan| plan.optimize().unwrap())
.map(PipelinedProject::from)
.collect::<Vec<_>>();
let tx = DeltaTx::from(&tx);
Expand All @@ -126,6 +127,7 @@ fn eval(c: &mut Criterion) {
table_id,
table_name.clone(),
&tx,
&params,
TableUpdateType::Subscribe,
&bsatn_rlb_pool,
));
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/estimation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ mod tests {
.map(|(plans, ..)| plans)
.expect("failed to compile sql query")
.into_iter()
.map(|plan| plan.optimize(&auth).expect("failed to optimize sql query"))
.map(|plan| plan.optimize().expect("failed to optimize sql query"))
.map(|plan| row_estimate(&tx, &plan))
.sum()
}
Expand All @@ -166,7 +166,7 @@ mod tests {
.map(|(plans, ..)| plans)
.expect("failed to compile sql query")
.into_iter()
.map(|plan| plan.optimize(&auth).expect("failed to optimize sql query"))
.map(|plan| plan.optimize().expect("failed to optimize sql query"))
.map(|plan| estimate_rows_scanned(&tx, plan.physical_plan()))
.sum()
}
Expand Down
9 changes: 6 additions & 3 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo};
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
pub use spacetimedb_durability::{DurabilityExited, DurableOffset};
use spacetimedb_execution::pipelined::PipelinedProject;
use spacetimedb_execution::ExecutionParams;
use spacetimedb_execution::RelValue;
use spacetimedb_expr::expr::CollectViews;
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
Expand Down Expand Up @@ -3278,7 +3279,7 @@ impl ModuleHost {
// Optimize each fragment.
let optimized = plans
.into_iter()
.map(|plan| plan.optimize(auth))
.map(|plan| plan.optimize())
.collect::<Result<Vec<_>, _>>()?;

check_row_limit(
Expand Down Expand Up @@ -3306,10 +3307,12 @@ impl ModuleHost {

let table_name = table_name.into();
let delta_tx = DeltaTx::from(tx);
let params = ExecutionParams::from_auth(auth);
let plan_fragments = optimized.iter();
let (rows, _, metrics) = if returns_view_table && num_private_cols > 0 {
execute_plan_for_view::<F>(optimized.iter(), num_cols, num_private_cols, &delta_tx, rlb_pool)
execute_plan_for_view::<F>(plan_fragments, num_cols, num_private_cols, &delta_tx, &params, rlb_pool)
} else {
execute_plan::<F>(optimized.iter(), &delta_tx, rlb_pool)
execute_plan::<F>(optimized.iter(), &delta_tx, &params, rlb_pool)
}
.context("One-off queries are not allowed to modify the database")?;

Expand Down
5 changes: 4 additions & 1 deletion crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use spacetimedb_datastore::error::{DatastoreError, ViewError};
use spacetimedb_datastore::execution_context::{self, ReducerContext, Workload};
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo};
use spacetimedb_datastore::traits::{IsolationLevel, Program};
use spacetimedb_execution::ExecutionParams;
use spacetimedb_lib::buffer::DecodeError;
use spacetimedb_lib::db::raw_def::v9::{Lifecycle, ViewResultHeader};
use spacetimedb_lib::de::DeserializeSeed;
Expand Down Expand Up @@ -206,14 +207,16 @@ pub(crate) fn run_query_for_view(
let mut metrics = ExecutionMetrics::default();
let mut rows = Vec::new();

let params = ExecutionParams::from_auth(&auth);

for plan in plans {
// Track read sets for all tables involved in this plan.
// TODO(jsdt): This means we will rerun the view and query for any change to these tables, so we should optimize this asap.
for table_id in plan.table_ids() {
tx.record_table_scan(&op, table_id);
}

plan.base_plan().execute(&*tx, &mut metrics, &mut |row| {
plan.base_plan().execute(&*tx, &params, &mut metrics, &mut |row| {
rows.push(row.to_product_value());
Ok(())
})?;
Expand Down
15 changes: 11 additions & 4 deletions crates/core/src/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use spacetimedb_datastore::{
};
use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore, Row};
use spacetimedb_lib::{metrics::ExecutionMetrics, Identity};
use spacetimedb_physical_plan::plan::ParamResolver;
use spacetimedb_primitives::{ColList, TableId};
use spacetimedb_sats::bsatn::ToBsatn;
use spacetimedb_sats::Serialize;
Expand Down Expand Up @@ -102,6 +103,7 @@ pub fn execute_plan_for_view<'p, F>(
num_cols: usize,
num_private_cols: usize,
tx: &(impl Datastore + DeltaStore),
params: &impl ParamResolver,
rlb_pool: &impl RowListBuilderSource<F>,
) -> Result<(F::List, u64, ExecutionMetrics)>
where
Expand All @@ -110,7 +112,7 @@ where
build_list_with_executor(rlb_pool, |metrics, add| {
let col_list = ColList::from_iter(num_private_cols..num_cols);
for fragment in plan_fragments {
fragment.execute(tx, metrics, &mut |row| match row {
fragment.execute(tx, params, metrics, &mut |row| match row {
Row::Ptr(ptr) => add(ptr.project_product(&col_list)?),
Row::Ref(val) => add(val.project_product(&col_list)?),
})?;
Expand All @@ -123,14 +125,15 @@ where
pub fn execute_plan<'p, F>(
plan_fragments: impl IntoIterator<Item = &'p PipelinedProject>,
tx: &(impl Datastore + DeltaStore),
params: &impl ParamResolver,
rlb_pool: &impl RowListBuilderSource<F>,
) -> Result<(F::List, u64, ExecutionMetrics)>
where
F: BuildableWebsocketFormat,
{
build_list_with_executor(rlb_pool, |metrics, add| {
for fragment in plan_fragments {
fragment.execute(tx, metrics, add)?;
fragment.execute(tx, params, metrics, add)?;
}
Ok(())
})
Expand Down Expand Up @@ -208,14 +211,15 @@ pub fn collect_table_update_for_view<'p, Tx, F>(
table_id: TableId,
table_name: TableName,
tx: &Tx,
params: &impl ParamResolver,
update_type: TableUpdateType,
rlb_pool: &impl RowListBuilderSource<F>,
) -> Result<(ws_v1::TableUpdate<F>, ExecutionMetrics)>
where
Tx: Datastore + DeltaStore,
F: BuildableWebsocketFormat,
{
execute_plan_for_view::<F>(plan_fragments, num_cols, num_private_cols, tx, rlb_pool).map(
execute_plan_for_view::<F>(plan_fragments, num_cols, num_private_cols, tx, params, rlb_pool).map(
|(rows, num_rows, metrics)| table_update_from_rows(rows, num_rows, metrics, table_id, table_name, update_type),
)
}
Expand All @@ -226,13 +230,14 @@ pub fn collect_table_update<'p, F>(
table_id: TableId,
table_name: TableName,
tx: &(impl Datastore + DeltaStore),
params: &impl ParamResolver,
update_type: TableUpdateType,
rlb_pool: &impl RowListBuilderSource<F>,
) -> Result<(ws_v1::TableUpdate<F>, ExecutionMetrics)>
where
F: BuildableWebsocketFormat,
{
execute_plan::<F>(plan_fragments, tx, rlb_pool).map(|(rows, num_rows, metrics)| {
execute_plan::<F>(plan_fragments, tx, params, rlb_pool).map(|(rows, num_rows, metrics)| {
table_update_from_rows(rows, num_rows, metrics, table_id, table_name, update_type)
})
}
Expand Down Expand Up @@ -265,6 +270,7 @@ pub fn execute_plans<F: BuildableWebsocketFormat>(
table_id,
table_name.clone(),
tx,
plan.params(),
update_type,
rlb_pool,
)?
Expand All @@ -274,6 +280,7 @@ pub fn execute_plans<F: BuildableWebsocketFormat>(
table_id,
table_name.clone(),
tx,
plan.params(),
update_type,
rlb_pool,
)?
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
use spacetimedb_datastore::traits::{IsolationLevel, TxData};
use spacetimedb_durability::TxOffset;
use spacetimedb_execution::ExecutionParams;
use spacetimedb_expr::expr::CollectViews;
use spacetimedb_lib::identity::RequestId;
use spacetimedb_lib::metrics::ExecutionMetrics;
Expand Down Expand Up @@ -467,6 +468,7 @@ impl ModuleSubscriptions {
.unwrap_or_default();

let tx = DeltaTx::from(tx);
let params = ExecutionParams::from_sender(sender.id.identity);

// TODO: See the comment on `collect_table_update_for_view`.
// The following view and non-view branches should be merged together,
Expand All @@ -479,6 +481,7 @@ impl ModuleSubscriptions {
table_id,
table_name.clone(),
&tx,
&params,
update_type,
&self.bsatn_rlb_pool,
)
Expand All @@ -488,6 +491,7 @@ impl ModuleSubscriptions {
table_id,
table_name.clone(),
&tx,
&params,
update_type,
&self.bsatn_rlb_pool,
)
Expand All @@ -499,6 +503,7 @@ impl ModuleSubscriptions {
table_id,
table_name,
&tx,
&params,
update_type,
&JsonRowListBuilderFakePool,
)
Expand All @@ -508,6 +513,7 @@ impl ModuleSubscriptions {
table_id,
table_name,
&tx,
&params,
update_type,
&JsonRowListBuilderFakePool,
)
Expand Down
30 changes: 23 additions & 7 deletions crates/execution/src/dml.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::Result;
use spacetimedb_lib::{metrics::ExecutionMetrics, AlgebraicValue, ProductValue};
use spacetimedb_physical_plan::dml::{DeletePlan, InsertPlan, MutationPlan, UpdatePlan};
use spacetimedb_physical_plan::plan::ParamResolver;
use spacetimedb_primitives::{ColId, TableId};
use spacetimedb_sats::size_of::SizeOf;

Expand Down Expand Up @@ -30,11 +31,16 @@ impl From<MutationPlan> for MutExecutor {
}

impl MutExecutor {
pub fn execute<Tx: MutDatastore>(&self, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<()> {
pub fn execute<Tx: MutDatastore>(
&self,
tx: &mut Tx,
params: &impl ParamResolver,
metrics: &mut ExecutionMetrics,
) -> Result<()> {
match self {
Self::Insert(exec) => exec.execute(tx, metrics),
Self::Delete(exec) => exec.execute(tx, metrics),
Self::Update(exec) => exec.execute(tx, metrics),
Self::Delete(exec) => exec.execute(tx, params, metrics),
Self::Update(exec) => exec.execute(tx, params, metrics),
}
}
}
Expand Down Expand Up @@ -84,10 +90,15 @@ impl From<DeletePlan> for DeleteExecutor {
}

impl DeleteExecutor {
fn execute<Tx: MutDatastore>(&self, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<()> {
fn execute<Tx: MutDatastore>(
&self,
tx: &mut Tx,
params: &impl ParamResolver,
metrics: &mut ExecutionMetrics,
) -> Result<()> {
// TODO: Delete by row id instead of product value
let mut deletes = vec![];
self.filter.execute(tx, metrics, &mut |row| {
self.filter.execute(tx, params, metrics, &mut |row| {
deletes.push(row.to_product_value());
Ok(())
})?;
Expand Down Expand Up @@ -122,9 +133,14 @@ impl From<UpdatePlan> for UpdateExecutor {
}

impl UpdateExecutor {
fn execute<Tx: MutDatastore>(&self, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<()> {
fn execute<Tx: MutDatastore>(
&self,
tx: &mut Tx,
params: &impl ParamResolver,
metrics: &mut ExecutionMetrics,
) -> Result<()> {
let mut deletes = vec![];
self.filter.execute(tx, metrics, &mut |row| {
self.filter.execute(tx, params, metrics, &mut |row| {
deletes.push(row.to_product_value());
Ok(())
})?;
Expand Down
30 changes: 28 additions & 2 deletions crates/execution/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,44 @@
use anyhow::Result;
use core::hash::{Hash, Hasher};
use core::ops::RangeBounds;
use spacetimedb_lib::query::Delta;
use spacetimedb_physical_plan::plan::{ProjectField, TupleField};
use spacetimedb_lib::{identity::AuthCtx, query::Delta, AlgebraicType, Identity};
use spacetimedb_physical_plan::plan::{ParamResolver, ProjectField, TupleField};
use spacetimedb_primitives::{ColList, IndexId, TableId};
use spacetimedb_sats::bsatn::{BufReservedFill, EncodeError, ToBsatn};
use spacetimedb_sats::buffer::BufWriter;
use spacetimedb_sats::product_value::InvalidFieldError;
use spacetimedb_sats::{impl_serialize, AlgebraicValue, ProductValue};
use spacetimedb_sql_parser::ast::Parameter;
use spacetimedb_table::{static_assert_size, table::RowRef};

pub mod dml;
pub mod pipelined;

#[derive(Debug, Clone, Copy)]
pub struct ExecutionParams {
sender: Identity,
}

impl ExecutionParams {
pub fn from_sender(sender: Identity) -> Self {
Self { sender }
}

pub fn from_auth(auth: &AuthCtx) -> Self {
Self::from_sender(auth.caller())
}
}

impl ParamResolver for ExecutionParams {
fn resolve_param(&self, param: Parameter, ty: &AlgebraicType) -> AlgebraicValue {
match param {
Parameter::Sender if ty.is_identity() => self.sender.into(),
Parameter::Sender if ty.is_bytes() => AlgebraicValue::Bytes(self.sender.to_be_byte_array().into()),
Parameter::Sender => panic!("unsupported type for :sender: {ty:?}"),
}
}
}

pub trait Datastore {
/// Iterator type for table scans
type TableIter<'a>: Iterator<Item = RowRef<'a>> + 'a
Expand Down
Loading
Loading