diff --git a/crates/bench/benches/subscription.rs b/crates/bench/benches/subscription.rs index ebd8e83e35c..49ce21489e2 100644 --- a/crates/bench/benches/subscription.rs +++ b/crates/bench/benches/subscription.rs @@ -11,7 +11,7 @@ use spacetimedb_bench::database::BenchDatabase as _; use spacetimedb_bench::spacetime_raw::SpacetimeRaw; use spacetimedb_client_api_messages::websocket::v1::BsatnFormat; use spacetimedb_datastore::execution_context::Workload; -use spacetimedb_execution::pipelined::PipelinedProject; +use spacetimedb_execution::{pipelined::PipelinedProject, ExecutionParams}; use spacetimedb_primitives::{col_list, TableId}; use spacetimedb_query::compile_subscription; use spacetimedb_sats::{bsatn, product, AlgebraicType, AlgebraicValue}; @@ -113,9 +113,10 @@ fn eval(c: &mut Criterion) { let auth = AuthCtx::for_testing(); let schema_viewer = &SchemaViewer::new(&tx, &auth); let (plans, table_id, table_name, _) = compile_subscription(sql, schema_viewer, &auth).unwrap(); + let params = ExecutionParams::from_auth(&auth); let plans = plans .into_iter() - .map(|plan| plan.optimize(&auth).unwrap()) + .map(|plan| plan.optimize().unwrap()) .map(PipelinedProject::from) .collect::>(); let tx = DeltaTx::from(&tx); @@ -126,6 +127,7 @@ fn eval(c: &mut Criterion) { table_id, table_name.clone(), &tx, + ¶ms, TableUpdateType::Subscribe, &bsatn_rlb_pool, )); diff --git a/crates/core/src/estimation.rs b/crates/core/src/estimation.rs index ced68fc5689..c7d3a29d148 100644 --- a/crates/core/src/estimation.rs +++ b/crates/core/src/estimation.rs @@ -152,7 +152,7 @@ mod tests { .map(|(plans, ..)| plans) .expect("failed to compile sql query") .into_iter() - .map(|plan| plan.optimize(&auth).expect("failed to optimize sql query")) + .map(|plan| plan.optimize().expect("failed to optimize sql query")) .map(|plan| row_estimate(&tx, &plan)) .sum() } @@ -166,7 +166,7 @@ mod tests { .map(|(plans, ..)| plans) .expect("failed to compile sql query") .into_iter() - .map(|plan| plan.optimize(&auth).expect("failed to optimize sql query")) + .map(|plan| plan.optimize().expect("failed to optimize sql query")) .map(|plan| estimate_rows_scanned(&tx, plan.physical_plan())) .sum() } diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index aa6344265c4..2207470d467 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -52,6 +52,7 @@ use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo}; use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData}; pub use spacetimedb_durability::{DurabilityExited, DurableOffset}; use spacetimedb_execution::pipelined::PipelinedProject; +use spacetimedb_execution::ExecutionParams; use spacetimedb_execution::RelValue; use spacetimedb_expr::expr::CollectViews; use spacetimedb_lib::db::raw_def::v9::Lifecycle; @@ -3278,7 +3279,7 @@ impl ModuleHost { // Optimize each fragment. let optimized = plans .into_iter() - .map(|plan| plan.optimize(auth)) + .map(|plan| plan.optimize()) .collect::, _>>()?; check_row_limit( @@ -3306,10 +3307,12 @@ impl ModuleHost { let table_name = table_name.into(); let delta_tx = DeltaTx::from(tx); + let params = ExecutionParams::from_auth(auth); + let plan_fragments = optimized.iter(); let (rows, _, metrics) = if returns_view_table && num_private_cols > 0 { - execute_plan_for_view::(optimized.iter(), num_cols, num_private_cols, &delta_tx, rlb_pool) + execute_plan_for_view::(plan_fragments, num_cols, num_private_cols, &delta_tx, ¶ms, rlb_pool) } else { - execute_plan::(optimized.iter(), &delta_tx, rlb_pool) + execute_plan::(optimized.iter(), &delta_tx, ¶ms, rlb_pool) } .context("One-off queries are not allowed to modify the database")?; diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 97d7a3e7fc5..e502e0e1efc 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -39,6 +39,7 @@ use spacetimedb_datastore::error::{DatastoreError, ViewError}; use spacetimedb_datastore::execution_context::{self, ReducerContext, Workload}; use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo}; use spacetimedb_datastore::traits::{IsolationLevel, Program}; +use spacetimedb_execution::ExecutionParams; use spacetimedb_lib::buffer::DecodeError; use spacetimedb_lib::db::raw_def::v9::{Lifecycle, ViewResultHeader}; use spacetimedb_lib::de::DeserializeSeed; @@ -206,6 +207,8 @@ pub(crate) fn run_query_for_view( let mut metrics = ExecutionMetrics::default(); let mut rows = Vec::new(); + let params = ExecutionParams::from_auth(&auth); + for plan in plans { // Track read sets for all tables involved in this plan. // TODO(jsdt): This means we will rerun the view and query for any change to these tables, so we should optimize this asap. @@ -213,7 +216,7 @@ pub(crate) fn run_query_for_view( tx.record_table_scan(&op, table_id); } - plan.base_plan().execute(&*tx, &mut metrics, &mut |row| { + plan.base_plan().execute(&*tx, ¶ms, &mut metrics, &mut |row| { rows.push(row.to_product_value()); Ok(()) })?; diff --git a/crates/core/src/subscription/mod.rs b/crates/core/src/subscription/mod.rs index 70565a82b1e..bd296b6f9f9 100644 --- a/crates/core/src/subscription/mod.rs +++ b/crates/core/src/subscription/mod.rs @@ -12,6 +12,7 @@ use spacetimedb_datastore::{ }; use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore, Row}; use spacetimedb_lib::{metrics::ExecutionMetrics, Identity}; +use spacetimedb_physical_plan::plan::ParamResolver; use spacetimedb_primitives::{ColList, TableId}; use spacetimedb_sats::bsatn::ToBsatn; use spacetimedb_sats::Serialize; @@ -102,6 +103,7 @@ pub fn execute_plan_for_view<'p, F>( num_cols: usize, num_private_cols: usize, tx: &(impl Datastore + DeltaStore), + params: &impl ParamResolver, rlb_pool: &impl RowListBuilderSource, ) -> Result<(F::List, u64, ExecutionMetrics)> where @@ -110,7 +112,7 @@ where build_list_with_executor(rlb_pool, |metrics, add| { let col_list = ColList::from_iter(num_private_cols..num_cols); for fragment in plan_fragments { - fragment.execute(tx, metrics, &mut |row| match row { + fragment.execute(tx, params, metrics, &mut |row| match row { Row::Ptr(ptr) => add(ptr.project_product(&col_list)?), Row::Ref(val) => add(val.project_product(&col_list)?), })?; @@ -123,6 +125,7 @@ where pub fn execute_plan<'p, F>( plan_fragments: impl IntoIterator, tx: &(impl Datastore + DeltaStore), + params: &impl ParamResolver, rlb_pool: &impl RowListBuilderSource, ) -> Result<(F::List, u64, ExecutionMetrics)> where @@ -130,7 +133,7 @@ where { build_list_with_executor(rlb_pool, |metrics, add| { for fragment in plan_fragments { - fragment.execute(tx, metrics, add)?; + fragment.execute(tx, params, metrics, add)?; } Ok(()) }) @@ -208,6 +211,7 @@ pub fn collect_table_update_for_view<'p, Tx, F>( table_id: TableId, table_name: TableName, tx: &Tx, + params: &impl ParamResolver, update_type: TableUpdateType, rlb_pool: &impl RowListBuilderSource, ) -> Result<(ws_v1::TableUpdate, ExecutionMetrics)> @@ -215,7 +219,7 @@ where Tx: Datastore + DeltaStore, F: BuildableWebsocketFormat, { - execute_plan_for_view::(plan_fragments, num_cols, num_private_cols, tx, rlb_pool).map( + execute_plan_for_view::(plan_fragments, num_cols, num_private_cols, tx, params, rlb_pool).map( |(rows, num_rows, metrics)| table_update_from_rows(rows, num_rows, metrics, table_id, table_name, update_type), ) } @@ -226,13 +230,14 @@ pub fn collect_table_update<'p, F>( table_id: TableId, table_name: TableName, tx: &(impl Datastore + DeltaStore), + params: &impl ParamResolver, update_type: TableUpdateType, rlb_pool: &impl RowListBuilderSource, ) -> Result<(ws_v1::TableUpdate, ExecutionMetrics)> where F: BuildableWebsocketFormat, { - execute_plan::(plan_fragments, tx, rlb_pool).map(|(rows, num_rows, metrics)| { + execute_plan::(plan_fragments, tx, params, rlb_pool).map(|(rows, num_rows, metrics)| { table_update_from_rows(rows, num_rows, metrics, table_id, table_name, update_type) }) } @@ -265,6 +270,7 @@ pub fn execute_plans( table_id, table_name.clone(), tx, + plan.params(), update_type, rlb_pool, )? @@ -274,6 +280,7 @@ pub fn execute_plans( table_id, table_name.clone(), tx, + plan.params(), update_type, rlb_pool, )? diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index dca5810e13d..be77cc2af35 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -35,6 +35,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId}; use spacetimedb_datastore::traits::{IsolationLevel, TxData}; use spacetimedb_durability::TxOffset; +use spacetimedb_execution::ExecutionParams; use spacetimedb_expr::expr::CollectViews; use spacetimedb_lib::identity::RequestId; use spacetimedb_lib::metrics::ExecutionMetrics; @@ -467,6 +468,7 @@ impl ModuleSubscriptions { .unwrap_or_default(); let tx = DeltaTx::from(tx); + let params = ExecutionParams::from_sender(sender.id.identity); // TODO: See the comment on `collect_table_update_for_view`. // The following view and non-view branches should be merged together, @@ -479,6 +481,7 @@ impl ModuleSubscriptions { table_id, table_name.clone(), &tx, + ¶ms, update_type, &self.bsatn_rlb_pool, ) @@ -488,6 +491,7 @@ impl ModuleSubscriptions { table_id, table_name.clone(), &tx, + ¶ms, update_type, &self.bsatn_rlb_pool, ) @@ -499,6 +503,7 @@ impl ModuleSubscriptions { table_id, table_name, &tx, + ¶ms, update_type, &JsonRowListBuilderFakePool, ) @@ -508,6 +513,7 @@ impl ModuleSubscriptions { table_id, table_name, &tx, + ¶ms, update_type, &JsonRowListBuilderFakePool, ) diff --git a/crates/execution/src/dml.rs b/crates/execution/src/dml.rs index baa78e1e0e6..59c621fe8c3 100644 --- a/crates/execution/src/dml.rs +++ b/crates/execution/src/dml.rs @@ -1,6 +1,7 @@ use anyhow::Result; use spacetimedb_lib::{metrics::ExecutionMetrics, AlgebraicValue, ProductValue}; use spacetimedb_physical_plan::dml::{DeletePlan, InsertPlan, MutationPlan, UpdatePlan}; +use spacetimedb_physical_plan::plan::ParamResolver; use spacetimedb_primitives::{ColId, TableId}; use spacetimedb_sats::size_of::SizeOf; @@ -30,11 +31,16 @@ impl From for MutExecutor { } impl MutExecutor { - pub fn execute(&self, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<()> { + pub fn execute( + &self, + tx: &mut Tx, + params: &impl ParamResolver, + metrics: &mut ExecutionMetrics, + ) -> Result<()> { match self { Self::Insert(exec) => exec.execute(tx, metrics), - Self::Delete(exec) => exec.execute(tx, metrics), - Self::Update(exec) => exec.execute(tx, metrics), + Self::Delete(exec) => exec.execute(tx, params, metrics), + Self::Update(exec) => exec.execute(tx, params, metrics), } } } @@ -84,10 +90,15 @@ impl From for DeleteExecutor { } impl DeleteExecutor { - fn execute(&self, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<()> { + fn execute( + &self, + tx: &mut Tx, + params: &impl ParamResolver, + metrics: &mut ExecutionMetrics, + ) -> Result<()> { // TODO: Delete by row id instead of product value let mut deletes = vec![]; - self.filter.execute(tx, metrics, &mut |row| { + self.filter.execute(tx, params, metrics, &mut |row| { deletes.push(row.to_product_value()); Ok(()) })?; @@ -122,9 +133,14 @@ impl From for UpdateExecutor { } impl UpdateExecutor { - fn execute(&self, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<()> { + fn execute( + &self, + tx: &mut Tx, + params: &impl ParamResolver, + metrics: &mut ExecutionMetrics, + ) -> Result<()> { let mut deletes = vec![]; - self.filter.execute(tx, metrics, &mut |row| { + self.filter.execute(tx, params, metrics, &mut |row| { deletes.push(row.to_product_value()); Ok(()) })?; diff --git a/crates/execution/src/lib.rs b/crates/execution/src/lib.rs index a4f259b6885..f0aeeb11dfc 100644 --- a/crates/execution/src/lib.rs +++ b/crates/execution/src/lib.rs @@ -1,18 +1,44 @@ use anyhow::Result; use core::hash::{Hash, Hasher}; use core::ops::RangeBounds; -use spacetimedb_lib::query::Delta; -use spacetimedb_physical_plan::plan::{ProjectField, TupleField}; +use spacetimedb_lib::{identity::AuthCtx, query::Delta, AlgebraicType, Identity}; +use spacetimedb_physical_plan::plan::{ParamResolver, ProjectField, TupleField}; use spacetimedb_primitives::{ColList, IndexId, TableId}; use spacetimedb_sats::bsatn::{BufReservedFill, EncodeError, ToBsatn}; use spacetimedb_sats::buffer::BufWriter; use spacetimedb_sats::product_value::InvalidFieldError; use spacetimedb_sats::{impl_serialize, AlgebraicValue, ProductValue}; +use spacetimedb_sql_parser::ast::Parameter; use spacetimedb_table::{static_assert_size, table::RowRef}; pub mod dml; pub mod pipelined; +#[derive(Debug, Clone, Copy)] +pub struct ExecutionParams { + sender: Identity, +} + +impl ExecutionParams { + pub fn from_sender(sender: Identity) -> Self { + Self { sender } + } + + pub fn from_auth(auth: &AuthCtx) -> Self { + Self::from_sender(auth.caller()) + } +} + +impl ParamResolver for ExecutionParams { + fn resolve_param(&self, param: Parameter, ty: &AlgebraicType) -> AlgebraicValue { + match param { + Parameter::Sender if ty.is_identity() => self.sender.into(), + Parameter::Sender if ty.is_bytes() => AlgebraicValue::Bytes(self.sender.to_be_byte_array().into()), + Parameter::Sender => panic!("unsupported type for :sender: {ty:?}"), + } + } +} + pub trait Datastore { /// Iterator type for table scans type TableIter<'a>: Iterator> + 'a diff --git a/crates/execution/src/pipelined.rs b/crates/execution/src/pipelined.rs index edaeaef8b29..373519ba672 100644 --- a/crates/execution/src/pipelined.rs +++ b/crates/execution/src/pipelined.rs @@ -8,8 +8,8 @@ use itertools::Either; use spacetimedb_expr::expr::AggType; use spacetimedb_lib::{metrics::ExecutionMetrics, query::Delta, sats::size_of::SizeOf, AlgebraicValue, ProductValue}; use spacetimedb_physical_plan::plan::{ - HashJoin, IndexProbe, IxJoin, IxScan, PhysicalExpr, PhysicalPlan, ProjectField, ProjectListPlan, ProjectPlan, Semi, - TableScan, TupleField, + HashJoin, IndexProbe, IxJoin, IxScan, ParamResolver, PhysicalExpr, PhysicalPlan, ProjectField, ProjectListPlan, + ProjectPlan, Semi, TableScan, TupleField, }; use spacetimedb_primitives::{ColList, IndexId, TableId}; use spacetimedb_sats::product; @@ -81,6 +81,7 @@ impl ProjectListExecutor { pub fn execute( &self, tx: &Tx, + params: &impl ParamResolver, metrics: &mut ExecutionMetrics, f: &mut dyn FnMut(ProductValue) -> Result<()>, ) -> Result<()> { @@ -89,7 +90,7 @@ impl ProjectListExecutor { match self { Self::Name(plans) => { for plan in plans { - plan.execute(tx, metrics, &mut |row| { + plan.execute(tx, params, metrics, &mut |row| { n += 1; let row = row.to_product_value(); bytes_scanned += row.size_of(); @@ -99,7 +100,7 @@ impl ProjectListExecutor { } Self::View(plans) => { for plan in plans { - plan.execute(tx, metrics, &mut |row| { + plan.execute(tx, params, metrics, &mut |row| { n += 1; f(row) })?; @@ -107,7 +108,7 @@ impl ProjectListExecutor { } Self::List(plans, fields) => { for plan in plans { - plan.execute(tx, metrics, &mut |t| { + plan.execute(tx, params, metrics, &mut |t| { n += 1; let row = ProductValue::from_iter(fields.iter().map(|field| t.project(field))); bytes_scanned += row.size_of(); @@ -116,7 +117,7 @@ impl ProjectListExecutor { } } Self::Limit(plan, limit) => { - plan.execute(tx, metrics, &mut |row| { + plan.execute(tx, params, metrics, &mut |row| { n += 1; if n <= *limit as usize { f(row)?; @@ -136,7 +137,7 @@ impl ProjectListExecutor { n += tx.row_count(table_scan.table) as usize; } _ => { - plan.execute(tx, metrics, &mut |_| { + plan.execute(tx, params, metrics, &mut |_| { n += 1; Ok(()) })?; @@ -180,12 +181,13 @@ impl ViewProject { pub fn execute( &self, tx: &Tx, + params: &impl ParamResolver, metrics: &mut ExecutionMetrics, f: &mut dyn FnMut(ProductValue) -> Result<()>, ) -> Result<()> { let mut n = 0; let mut bytes_scanned = 0; - self.inner.execute(tx, metrics, &mut |row| match row { + self.inner.execute(tx, params, metrics, &mut |row| match row { Row::Ptr(ptr) => { n += 1; let col_list = ColList::from_iter(self.num_private_cols..self.num_cols); @@ -243,6 +245,7 @@ impl PipelinedProject { pub fn execute<'a, Tx: Datastore + DeltaStore>( &self, tx: &'a Tx, + params: &impl ParamResolver, metrics: &mut ExecutionMetrics, f: &mut dyn FnMut(Row<'a>) -> Result<()>, ) -> Result<()> { @@ -252,7 +255,7 @@ impl PipelinedProject { // No explicit projection. // This means the input does not return tuples. // It returns either row ids or product values. - plan.execute(tx, metrics, &mut |t| { + plan.execute(tx, params, metrics, &mut |t| { n += 1; if let Tuple::Row(row) = t { f(row)?; @@ -263,7 +266,7 @@ impl PipelinedProject { Self::Some(plan, i) => { // The contrary is true for explicit projections. // They return a tuple of row ids or product values. - plan.execute(tx, metrics, &mut |t| { + plan.execute(tx, params, metrics, &mut |t| { n += 1; if let Some(row) = t.select(*i) { f(row)?; @@ -384,17 +387,18 @@ impl PipelinedExecutor { pub fn execute<'a, Tx: Datastore + DeltaStore>( &self, tx: &'a Tx, + params: &impl ParamResolver, metrics: &mut ExecutionMetrics, f: &mut dyn FnMut(Tuple<'a>) -> Result<()>, ) -> Result<()> { match self { Self::TableScan(scan) => scan.execute(tx, metrics, f), - Self::IxScan(scan) => scan.execute(tx, metrics, f), - Self::IxJoin(join) => join.execute(tx, metrics, f), - Self::HashJoin(join) => join.execute(tx, metrics, f), - Self::NLJoin(join) => join.execute(tx, metrics, f), - Self::Filter(filter) => filter.execute(tx, metrics, f), - Self::Limit(limit) => limit.execute(tx, metrics, f), + Self::IxScan(scan) => scan.execute(tx, params, metrics, f), + Self::IxJoin(join) => join.execute(tx, params, metrics, f), + Self::HashJoin(join) => join.execute(tx, params, metrics, f), + Self::NLJoin(join) => join.execute(tx, params, metrics, f), + Self::Filter(filter) => filter.execute(tx, params, metrics, f), + Self::Limit(limit) => limit.execute(tx, params, metrics, f), } } } @@ -502,15 +506,6 @@ enum EvaluatedIndexProbe { Range(Bound, Bound), } -impl From for EvaluatedIndexProbe { - fn from(probe: IndexProbe) -> Self { - match probe { - IndexProbe::Point(point) => Self::Point(eval_static_probe(point)), - IndexProbe::Range(lower, upper) => Self::Range(eval_static_bound(lower), eval_static_bound(upper)), - } - } -} - /// A pipelined executor for scanning an index. #[derive(Debug)] pub struct PipelinedIxScan { @@ -520,7 +515,7 @@ pub struct PipelinedIxScan { pub index_id: IndexId, pub limit: Option, source: IndexSource, - probe: EvaluatedIndexProbe, + probe: IndexProbe, } impl From for PipelinedIxScan { @@ -537,7 +532,7 @@ impl From for PipelinedIxScan { index_id, limit, source: IndexSource::from_delta(delta), - probe: probe.into(), + probe, } } } @@ -550,20 +545,34 @@ impl ProjectField for NoRow { } } -fn eval_static_probe(expr: PhysicalExpr) -> AlgebraicValue { - expr.eval_with_metrics(&NoRow, &mut 0).into_owned() +fn eval_static_probe(expr: &PhysicalExpr, params: &impl ParamResolver) -> AlgebraicValue { + expr.eval_with_params(&NoRow, params, &mut 0).into_owned() } -fn eval_static_bound(bound: Bound) -> Bound { +fn eval_static_bound(bound: &Bound, params: &impl ParamResolver) -> Bound { match bound { - Bound::Included(expr) => Bound::Included(eval_static_probe(expr)), - Bound::Excluded(expr) => Bound::Excluded(eval_static_probe(expr)), + Bound::Included(expr) => Bound::Included(eval_static_probe(expr, params)), + Bound::Excluded(expr) => Bound::Excluded(eval_static_probe(expr, params)), Bound::Unbounded => Bound::Unbounded, } } -fn eval_probe(expr: &PhysicalExpr, row: &impl ProjectField, bytes_scanned: &mut usize) -> AlgebraicValue { - expr.eval_with_metrics(row, bytes_scanned).into_owned() +fn eval_index_probe(probe: &IndexProbe, params: &impl ParamResolver) -> EvaluatedIndexProbe { + match probe { + IndexProbe::Point(point) => EvaluatedIndexProbe::Point(eval_static_probe(point, params)), + IndexProbe::Range(lower, upper) => { + EvaluatedIndexProbe::Range(eval_static_bound(lower, params), eval_static_bound(upper, params)) + } + } +} + +fn eval_probe( + expr: &PhysicalExpr, + row: &impl ProjectField, + params: &impl ParamResolver, + bytes_scanned: &mut usize, +) -> AlgebraicValue { + expr.eval_with_params(row, params, bytes_scanned).into_owned() } fn for_each_index_scan_row<'a, Tx: Datastore + DeltaStore>( @@ -683,18 +692,12 @@ impl PipelinedIxScan { pub fn execute<'a, Tx: Datastore + DeltaStore>( &self, tx: &'a Tx, + params: &impl ParamResolver, metrics: &mut ExecutionMetrics, f: &mut dyn FnMut(Tuple<'a>) -> Result<()>, ) -> Result<()> { - let n = for_each_index_scan_row( - tx, - self.source, - self.table_id, - self.index_id, - &self.probe, - self.limit, - f, - )?; + let probe = eval_index_probe(&self.probe, params); + let n = for_each_index_scan_row(tx, self.source, self.table_id, self.index_id, &probe, self.limit, f)?; metrics.index_seeks += 1; metrics.rows_scanned += n; Ok(()) @@ -728,6 +731,7 @@ impl PipelinedIxJoin { pub fn execute<'a, Tx: Datastore + DeltaStore>( &self, tx: &'a Tx, + params: &impl ParamResolver, metrics: &mut ExecutionMetrics, f: &mut dyn FnMut(Tuple<'a>) -> Result<()>, ) -> Result<()> { @@ -737,12 +741,12 @@ impl PipelinedIxJoin { let iter_rhs = |u: &Tuple, bytes_scanned: &mut usize, f: &mut dyn FnMut(Tuple<'a>) -> Result<()>| -> Result<()> { - let key = eval_probe(&self.probe, u, bytes_scanned); + let key = eval_probe(&self.probe, u, params, bytes_scanned); for_each_index_point(tx, self.source, self.rhs_table, self.rhs_index, &key, f) }; let probe_rhs = |u: &Tuple, bytes_scanned: &mut usize| -> Result<_> { - let key = eval_probe(&self.probe, u, bytes_scanned); + let key = eval_probe(&self.probe, u, params, bytes_scanned); first_index_point(tx, self.source, self.rhs_table, self.rhs_index, &key) }; @@ -750,7 +754,7 @@ impl PipelinedIxJoin { (true, Semi::Lhs) => { // Should we evaluate the lhs tuple? // Probe the index to see if there is a matching row. - self.lhs.execute(tx, metrics, &mut |u| { + self.lhs.execute(tx, params, metrics, &mut |u| { n += 1; index_seeks += 1; if probe_rhs(&u, &mut bytes_scanned)?.is_some() { @@ -761,7 +765,7 @@ impl PipelinedIxJoin { } (true, Semi::Rhs) => { // Probe the index and evaluate the matching rhs row - self.lhs.execute(tx, metrics, &mut |u| { + self.lhs.execute(tx, params, metrics, &mut |u| { n += 1; index_seeks += 1; if let Some(v) = probe_rhs(&u, &mut bytes_scanned)? { @@ -772,7 +776,7 @@ impl PipelinedIxJoin { } (true, Semi::All) => { // Probe the index and evaluate the matching rhs row - self.lhs.execute(tx, metrics, &mut |u| { + self.lhs.execute(tx, params, metrics, &mut |u| { n += 1; index_seeks += 1; if let Some(v) = probe_rhs(&u, &mut bytes_scanned)? { @@ -784,7 +788,7 @@ impl PipelinedIxJoin { (false, Semi::Lhs) => { // How many times should we evaluate the lhs tuple? // Probe the index for the number of matching rows. - self.lhs.execute(tx, metrics, &mut |u| { + self.lhs.execute(tx, params, metrics, &mut |u| { n += 1; index_seeks += 1; iter_rhs(&u, &mut bytes_scanned, &mut |_| f(u.clone()))?; @@ -793,7 +797,7 @@ impl PipelinedIxJoin { } (false, Semi::Rhs) => { // Probe the index and evaluate the matching rhs rows - self.lhs.execute(tx, metrics, &mut |u| { + self.lhs.execute(tx, params, metrics, &mut |u| { n += 1; index_seeks += 1; iter_rhs(&u, &mut bytes_scanned, f)?; @@ -802,7 +806,7 @@ impl PipelinedIxJoin { } (false, Semi::All) => { // Probe the index and evaluate the matching rhs rows - self.lhs.execute(tx, metrics, &mut |u| { + self.lhs.execute(tx, params, metrics, &mut |u| { n += 1; index_seeks += 1; iter_rhs(&u, &mut bytes_scanned, &mut |v| f(u.clone().join(v)))?; @@ -839,6 +843,7 @@ impl BlockingHashJoin { pub fn execute<'a, Tx: Datastore + DeltaStore>( &self, tx: &'a Tx, + params: &impl ParamResolver, metrics: &mut ExecutionMetrics, f: &mut dyn FnMut(Tuple<'a>) -> Result<()>, ) -> Result<()> { @@ -854,7 +859,7 @@ impl BlockingHashJoin { semijoin: Semi::Lhs, } => { let mut rhs_table = HashSet::new(); - rhs.execute(tx, metrics, &mut |v| { + rhs.execute(tx, params, metrics, &mut |v| { rhs_table.insert(project(&v, rhs_field, &mut bytes_scanned)); Ok(()) })?; @@ -862,7 +867,7 @@ impl BlockingHashJoin { // How many rows did we pull from the rhs? n += rhs_table.len(); - lhs.execute(tx, metrics, &mut |u| { + lhs.execute(tx, params, metrics, &mut |u| { n += 1; if rhs_table.contains(&project(&u, lhs_field, &mut bytes_scanned)) { f(u)?; @@ -879,7 +884,7 @@ impl BlockingHashJoin { semijoin: Semi::Rhs, } => { let mut rhs_table = HashMap::new(); - rhs.execute(tx, metrics, &mut |v| { + rhs.execute(tx, params, metrics, &mut |v| { rhs_table.insert(project(&v, rhs_field, &mut bytes_scanned), v); Ok(()) })?; @@ -887,7 +892,7 @@ impl BlockingHashJoin { // How many rows did we pull from the rhs? n += rhs_table.len(); - lhs.execute(tx, metrics, &mut |u| { + lhs.execute(tx, params, metrics, &mut |u| { n += 1; if let Some(v) = rhs_table.get(&project(&u, lhs_field, &mut bytes_scanned)) { f(v.clone())?; @@ -904,7 +909,7 @@ impl BlockingHashJoin { semijoin: Semi::All, } => { let mut rhs_table = HashMap::new(); - rhs.execute(tx, metrics, &mut |v| { + rhs.execute(tx, params, metrics, &mut |v| { rhs_table.insert(project(&v, rhs_field, &mut bytes_scanned), v); Ok(()) })?; @@ -912,7 +917,7 @@ impl BlockingHashJoin { // How many rows did we pull from the rhs? n += rhs_table.len(); - lhs.execute(tx, metrics, &mut |u| { + lhs.execute(tx, params, metrics, &mut |u| { n += 1; if let Some(v) = rhs_table.get(&project(&u, lhs_field, &mut bytes_scanned)) { f(u.clone().join(v.clone()))?; @@ -929,7 +934,7 @@ impl BlockingHashJoin { semijoin: Semi::Lhs, } => { let mut rhs_table = HashMap::new(); - rhs.execute(tx, metrics, &mut |v| { + rhs.execute(tx, params, metrics, &mut |v| { n += 1; rhs_table .entry(project(&v, rhs_field, &mut bytes_scanned)) @@ -937,7 +942,7 @@ impl BlockingHashJoin { .or_insert(1); Ok(()) })?; - lhs.execute(tx, metrics, &mut |u| { + lhs.execute(tx, params, metrics, &mut |u| { n += 1; if let Some(n) = rhs_table.get(&project(&u, lhs_field, &mut bytes_scanned)).copied() { for _ in 0..n { @@ -956,7 +961,7 @@ impl BlockingHashJoin { semijoin: Semi::Rhs, } => { let mut rhs_table: HashMap> = HashMap::new(); - rhs.execute(tx, metrics, &mut |v| { + rhs.execute(tx, params, metrics, &mut |v| { n += 1; let key = project(&v, rhs_field, &mut bytes_scanned); if let Some(tuples) = rhs_table.get_mut(&key) { @@ -966,7 +971,7 @@ impl BlockingHashJoin { } Ok(()) })?; - lhs.execute(tx, metrics, &mut |u| { + lhs.execute(tx, params, metrics, &mut |u| { n += 1; if let Some(rhs_tuples) = rhs_table.get(&project(&u, lhs_field, &mut bytes_scanned)) { for v in rhs_tuples { @@ -985,7 +990,7 @@ impl BlockingHashJoin { semijoin: Semi::All, } => { let mut rhs_table: HashMap> = HashMap::new(); - rhs.execute(tx, metrics, &mut |v| { + rhs.execute(tx, params, metrics, &mut |v| { n += 1; let key = project(&v, rhs_field, &mut bytes_scanned); if let Some(tuples) = rhs_table.get_mut(&key) { @@ -995,7 +1000,7 @@ impl BlockingHashJoin { } Ok(()) })?; - lhs.execute(tx, metrics, &mut |u| { + lhs.execute(tx, params, metrics, &mut |u| { n += 1; if let Some(rhs_tuples) = rhs_table.get(&project(&u, lhs_field, &mut bytes_scanned)) { for v in rhs_tuples { @@ -1030,11 +1035,12 @@ impl BlockingNLJoin { pub fn execute<'a, Tx: Datastore + DeltaStore>( &self, tx: &'a Tx, + params: &impl ParamResolver, metrics: &mut ExecutionMetrics, f: &mut dyn FnMut(Tuple<'a>) -> Result<()>, ) -> Result<()> { let mut rhs = vec![]; - self.rhs.execute(tx, metrics, &mut |v| { + self.rhs.execute(tx, params, metrics, &mut |v| { rhs.push(v); Ok(()) })?; @@ -1042,7 +1048,7 @@ impl BlockingNLJoin { // How many rows did we pull from the rhs? let mut n = rhs.len(); - self.lhs.execute(tx, metrics, &mut |u| { + self.lhs.execute(tx, params, metrics, &mut |u| { n += 1; for v in rhs.iter() { f(u.clone().join(v.clone()))?; @@ -1071,14 +1077,15 @@ impl PipelinedFilter { pub fn execute<'a, Tx: Datastore + DeltaStore>( &self, tx: &'a Tx, + params: &impl ParamResolver, metrics: &mut ExecutionMetrics, f: &mut dyn FnMut(Tuple<'a>) -> Result<()>, ) -> Result<()> { let mut n = 0; let mut bytes_scanned = 0; - self.input.execute(tx, metrics, &mut |t| { + self.input.execute(tx, params, metrics, &mut |t| { n += 1; - if self.expr.eval_bool_with_metrics(&t, &mut bytes_scanned) { + if self.expr.eval_bool_with_params(&t, params, &mut bytes_scanned) { f(t)?; } Ok(()) @@ -1106,11 +1113,12 @@ impl PipelinedLimit { pub fn execute<'a, Tx: Datastore + DeltaStore>( &self, tx: &'a Tx, + params: &impl ParamResolver, metrics: &mut ExecutionMetrics, f: &mut dyn FnMut(Tuple<'a>) -> Result<()>, ) -> Result<()> { let mut n = 0; - self.input.execute(tx, metrics, &mut |t| { + self.input.execute(tx, params, metrics, &mut |t| { n += 1; if n <= self.limit as usize { f(t)?; diff --git a/crates/expr/src/check.rs b/crates/expr/src/check.rs index 01e151f4357..ca3bb0304f2 100644 --- a/crates/expr/src/check.rs +++ b/crates/expr/src/check.rs @@ -157,10 +157,9 @@ impl TypeChecker for SubChecker { } /// Parse and type check a subscription query -pub fn parse_and_type_sub(sql: &str, tx: &impl SchemaView, auth: &AuthCtx) -> TypingResult<(ProjectName, bool)> { +pub fn parse_and_type_sub(sql: &str, tx: &impl SchemaView, _auth: &AuthCtx) -> TypingResult<(ProjectName, bool)> { let ast = parse_subscription(sql)?; let has_param = ast.has_parameter(); - let ast = ast.resolve_sender(auth.caller()); expect_table_type(SubChecker::type_ast(ast, tx)?).map(|plan| (plan, has_param)) } diff --git a/crates/expr/src/expr.rs b/crates/expr/src/expr.rs index c9753e5c195..d0534fd9475 100644 --- a/crates/expr/src/expr.rs +++ b/crates/expr/src/expr.rs @@ -3,7 +3,7 @@ use spacetimedb_lib::{query::Delta, AlgebraicType, AlgebraicValue}; use spacetimedb_primitives::{TableId, ViewId}; use spacetimedb_sats::raw_identifier::RawIdentifier; use spacetimedb_schema::{identifier::Identifier, schema::TableOrViewSchema}; -use spacetimedb_sql_parser::ast::{BinOp, LogOp}; +use spacetimedb_sql_parser::ast::{BinOp, LogOp, Parameter}; use std::sync::Arc; pub trait CollectViews { @@ -383,6 +383,8 @@ pub enum Expr { LogOp(LogOp, Box, Box), /// A typed literal expression Value(AlgebraicValue, AlgebraicType), + /// A typed runtime parameter. + Param(Parameter, AlgebraicType), /// A field projection Field(FieldProject), } @@ -396,7 +398,7 @@ impl Expr { a.visit(f); b.visit(f); } - Self::Value(..) | Self::Field(..) => {} + Self::Value(..) | Self::Param(..) | Self::Field(..) => {} } } @@ -408,7 +410,7 @@ impl Expr { a.visit_mut(f); b.visit_mut(f); } - Self::Value(..) | Self::Field(..) => {} + Self::Value(..) | Self::Param(..) | Self::Field(..) => {} } } @@ -426,7 +428,7 @@ impl Expr { pub fn ty(&self) -> &AlgebraicType { match self { Self::BinOp(..) | Self::LogOp(..) => &AlgebraicType::Bool, - Self::Value(_, ty) | Self::Field(FieldProject { ty, .. }) => ty, + Self::Value(_, ty) | Self::Param(_, ty) | Self::Field(FieldProject { ty, .. }) => ty, } } } diff --git a/crates/expr/src/lib.rs b/crates/expr/src/lib.rs index 2a6f02f5a21..e27a3b735e3 100644 --- a/crates/expr/src/lib.rs +++ b/crates/expr/src/lib.rs @@ -19,7 +19,7 @@ use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type; use spacetimedb_sats::algebraic_value::ser::ValueSerializer; use spacetimedb_sats::uuid::Uuid; use spacetimedb_schema::schema::ColumnSchema; -use spacetimedb_sql_parser::ast::{self, BinOp, ProjectElem, SqlExpr, SqlIdent, SqlLiteral}; +use spacetimedb_sql_parser::ast::{self, BinOp, Parameter, ProjectElem, SqlExpr, SqlIdent, SqlLiteral}; use spacetimedb_sql_parser::parser::recursion; use std::{ops::Deref, str::FromStr}; @@ -99,6 +99,14 @@ fn _type_expr(vars: &Relvars, expr: SqlExpr, expected: Option<&AlgebraicType>, d parse(&v, ty).map_err(|_| InvalidLiteral::new(v.into_string(), ty))?, ty.clone(), )), + (SqlExpr::Param(Parameter::Sender), expected) => { + let ty = match expected { + Some(ty) if ty.is_identity() || ty.is_bytes() => ty.clone(), + Some(ty) => return Err(UnexpectedType::new(&AlgebraicType::identity(), ty).into()), + None => AlgebraicType::identity(), + }; + Ok(Expr::Param(Parameter::Sender, ty)) + } (SqlExpr::Field(SqlIdent(table), SqlIdent(field)), expected) => { let table_type = vars.deref().get(&*table).ok_or_else(|| Unresolved::var(&table))?; let ColumnSchema { col_pos, col_type, .. } = table_type @@ -123,7 +131,9 @@ fn _type_expr(vars: &Relvars, expr: SqlExpr, expected: Option<&AlgebraicType>, d let b = _type_expr(vars, *b, Some(&AlgebraicType::Bool), depth + 1)?; Ok(Expr::LogOp(op, Box::new(a), Box::new(b))) } - (SqlExpr::Bin(a, b, op), None | Some(AlgebraicType::Bool)) if matches!(&*a, SqlExpr::Lit(_)) => { + (SqlExpr::Bin(a, b, op), None | Some(AlgebraicType::Bool)) + if matches!(&*a, SqlExpr::Lit(_) | SqlExpr::Param(_)) => + { let b = _type_expr(vars, *b, None, depth + 1)?; let a = _type_expr(vars, *a, Some(b.ty()), depth + 1)?; if !op_supports_type(op, a.ty()) { @@ -140,9 +150,8 @@ fn _type_expr(vars: &Relvars, expr: SqlExpr, expected: Option<&AlgebraicType>, d Ok(Expr::BinOp(op, Box::new(a), Box::new(b))) } (SqlExpr::Bin(..) | SqlExpr::Log(..), Some(ty)) => Err(UnexpectedType::new(&AlgebraicType::Bool, ty).into()), - // Both unqualified names as well as parameters are syntactic constructs. - // Unqualified names are qualified and parameters are resolved before type checking. - (SqlExpr::Var(_) | SqlExpr::Param(_), _) => unreachable!(), + // Unqualified names are syntactic constructs and are qualified before type checking. + (SqlExpr::Var(_), _) => unreachable!(), } } diff --git a/crates/expr/src/rls.rs b/crates/expr/src/rls.rs index 7382bd5e630..e2bca3b7bd2 100644 --- a/crates/expr/src/rls.rs +++ b/crates/expr/src/rls.rs @@ -483,7 +483,7 @@ mod tests { def::ModuleDef, schema::{Schema, TableOrViewSchema, TableSchema}, }; - use spacetimedb_sql_parser::ast::BinOp; + use spacetimedb_sql_parser::ast::{BinOp, Parameter}; use crate::{ check::{parse_and_type_sub, test_utils::build_module_def, SchemaView}, @@ -602,7 +602,7 @@ mod tests { field: 0, ty: AlgebraicType::identity(), })), - Box::new(Expr::Value(Identity::ONE.into(), AlgebraicType::identity())) + Box::new(Expr::Param(Parameter::Sender, AlgebraicType::identity())) ) ), "users".into() @@ -649,7 +649,7 @@ mod tests { field: 0, ty: AlgebraicType::identity(), })), - Box::new(Expr::Value(Identity::ONE.into(), AlgebraicType::identity())), + Box::new(Expr::Param(Parameter::Sender, AlgebraicType::identity())), ), )), Expr::BinOp( @@ -700,7 +700,7 @@ mod tests { field: 0, ty: AlgebraicType::identity(), })), - Box::new(Expr::Value(Identity::ONE.into(), AlgebraicType::identity())), + Box::new(Expr::Param(Parameter::Sender, AlgebraicType::identity())), ), )), Expr::BinOp( diff --git a/crates/expr/src/statement.rs b/crates/expr/src/statement.rs index b7422dd031c..8083d813706 100644 --- a/crates/expr/src/statement.rs +++ b/crates/expr/src/statement.rs @@ -450,8 +450,8 @@ impl TypeChecker for SqlChecker { } } -pub fn parse_and_type_sql(sql: &str, tx: &impl SchemaView, auth: &AuthCtx) -> TypingResult { - match parse_sql(sql)?.resolve_sender(auth.caller()) { +pub fn parse_and_type_sql(sql: &str, tx: &impl SchemaView, _auth: &AuthCtx) -> TypingResult { + match parse_sql(sql)? { SqlAst::Select(ast) => Ok(Statement::Select(SqlChecker::type_ast(ast, tx)?)), SqlAst::Insert(insert) => Ok(Statement::DML(DML::Insert(type_insert(insert, tx)?))), SqlAst::Delete(delete) => Ok(Statement::DML(DML::Delete(type_delete(delete, tx)?))), diff --git a/crates/physical-plan/src/compile.rs b/crates/physical-plan/src/compile.rs index 1bda4fd65cd..9cb8d6435bb 100644 --- a/crates/physical-plan/src/compile.rs +++ b/crates/physical-plan/src/compile.rs @@ -21,6 +21,7 @@ fn compile_expr(expr: Expr, var: &mut impl VarLabel) -> PhysicalExpr { PhysicalExpr::BinOp(op, a, b) } Expr::Value(v, _) => PhysicalExpr::Value(v), + Expr::Param(param, ty) => PhysicalExpr::Param(param, ty), Expr::Field(proj) => PhysicalExpr::Field(compile_field_project(var, proj)), } } diff --git a/crates/physical-plan/src/dml.rs b/crates/physical-plan/src/dml.rs index f6a0aa1aa73..54a975591c3 100644 --- a/crates/physical-plan/src/dml.rs +++ b/crates/physical-plan/src/dml.rs @@ -5,7 +5,7 @@ use spacetimedb_expr::{ expr::{ProjectName, RelExpr, Relvar}, statement::{TableDelete, TableInsert, TableUpdate}, }; -use spacetimedb_lib::{identity::AuthCtx, AlgebraicValue, ProductValue}; +use spacetimedb_lib::{AlgebraicValue, ProductValue}; use spacetimedb_primitives::ColId; use spacetimedb_schema::schema::TableOrViewSchema; @@ -20,11 +20,11 @@ pub enum MutationPlan { impl MutationPlan { /// Optimizes the filters in updates and deletes - pub fn optimize(self, auth: &AuthCtx) -> Result { + pub fn optimize(self) -> Result { match self { Self::Insert(..) => Ok(self), - Self::Delete(plan) => Ok(Self::Delete(plan.optimize(auth)?)), - Self::Update(plan) => Ok(Self::Update(plan.optimize(auth)?)), + Self::Delete(plan) => Ok(Self::Delete(plan.optimize()?)), + Self::Update(plan) => Ok(Self::Update(plan.optimize()?)), } } } @@ -51,9 +51,9 @@ pub struct DeletePlan { impl DeletePlan { /// Optimize the filter part of the delete - fn optimize(self, auth: &AuthCtx) -> Result { + fn optimize(self) -> Result { let Self { table, filter } = self; - let filter = filter.optimize(auth)?; + let filter = filter.optimize()?; Ok(Self { table, filter }) } @@ -85,9 +85,9 @@ pub struct UpdatePlan { impl UpdatePlan { /// Optimize the filter part of the update - fn optimize(self, auth: &AuthCtx) -> Result { + fn optimize(self) -> Result { let Self { table, columns, filter } = self; - let filter = filter.optimize(auth)?; + let filter = filter.optimize()?; Ok(Self { columns, table, filter }) } diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index c20a8e0237f..cc1872f7955 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -6,10 +6,10 @@ use spacetimedb_expr::{ expr::{AggType, CollectViews}, StatementSource, }; -use spacetimedb_lib::{identity::AuthCtx, query::Delta, sats::size_of::SizeOf, AlgebraicValue, ProductValue}; +use spacetimedb_lib::{query::Delta, sats::size_of::SizeOf, AlgebraicType, AlgebraicValue, ProductValue}; use spacetimedb_primitives::{ColId, ColOrCols, ColSet, IndexId, TableId, ViewId}; use spacetimedb_schema::schema::{IndexSchema, TableSchema}; -use spacetimedb_sql_parser::ast::{BinOp, LogOp}; +use spacetimedb_sql_parser::ast::{BinOp, LogOp, Parameter}; use spacetimedb_table::table::RowRef; use std::{ borrow::Cow, @@ -26,6 +26,15 @@ use crate::rules::{ #[derive(Debug, Clone, Copy, PartialEq, Eq, From)] pub struct Label(pub usize); +/// Resolves formal parameters during expression evaluation. +/// +/// Physical plans store parameter slots. +/// Concrete variable bindings are supplied at runtime +/// and implement this minimal interface. +pub trait ParamResolver { + fn resolve_param(&self, param: Parameter, ty: &AlgebraicType) -> AlgebraicValue; +} + /// Physical plans always terminate with a projection. /// This type of projection returns row ids. /// @@ -79,11 +88,11 @@ impl CollectViews for ProjectPlan { } impl ProjectPlan { - pub fn optimize(self, auth: &AuthCtx) -> Result { + pub fn optimize(self) -> Result { match self { - Self::None(plan) => Ok(Self::None(plan.optimize(auth, vec![])?)), + Self::None(plan) => Ok(Self::None(plan.optimize(vec![])?)), Self::Name(plan, label, _) => { - let plan = plan.optimize(auth, vec![label])?; + let plan = plan.optimize(vec![label])?; let n = plan.nfields(); let pos = plan.position(&label); Ok(match n { @@ -164,15 +173,13 @@ pub enum ProjectListPlan { } impl ProjectListPlan { - pub fn optimize(self, auth: &AuthCtx) -> Result { + pub fn optimize(self) -> Result { match self { Self::Name(plan) => Ok(Self::Name( - plan.into_iter() - .map(|plan| plan.optimize(auth)) - .collect::>()?, + plan.into_iter().map(|plan| plan.optimize()).collect::>()?, )), Self::Limit(plan, n) => { - let mut limit = Self::Limit(Box::new(plan.optimize(auth)?), n); + let mut limit = Self::Limit(Box::new(plan.optimize()?), n); // Merge a limit with a scan if possible if PushLimit::matches(&limit).is_some() { limit = PushLimit::rewrite(limit, ())?; @@ -181,7 +188,7 @@ impl ProjectListPlan { } Self::Agg(plan, agg_type) => Ok(Self::Agg( plan.into_iter() - .map(|plan| plan.optimize(auth, vec![])) + .map(|plan| plan.optimize(vec![])) .collect::>()?, agg_type, )), @@ -191,7 +198,7 @@ impl ProjectListPlan { // Collect the names of the relvars let labels = fields.iter().map(|field| field.label).collect(); // Optimize each plan - let optimized_plan = plan.optimize(auth, labels)?; + let optimized_plan = plan.optimize(labels)?; // Compute the position of each relvar referenced in the projection for TupleField { label, label_pos, .. } in &mut fields { *label_pos = optimized_plan.position(label); @@ -473,9 +480,9 @@ impl PhysicalPlan { /// 3. Turn filters into index scans if possible /// 4. Determine index and semijoins /// 5. Compute positions for tuple labels - pub fn optimize(self, auth: &AuthCtx, reqs: Vec