From dffb89fb5d27b041c553ee63a6f44b3bd42daef3 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Thu, 13 Nov 2025 12:23:14 -0800 Subject: [PATCH 01/16] Add types for the ViewReturnValue --- crates/lib/src/db/raw_def/v9.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/crates/lib/src/db/raw_def/v9.rs b/crates/lib/src/db/raw_def/v9.rs index bc6fc5ba77f..e2d32418d74 100644 --- a/crates/lib/src/db/raw_def/v9.rs +++ b/crates/lib/src/db/raw_def/v9.rs @@ -484,6 +484,25 @@ pub struct RawViewDefV9 { pub return_type: AlgebraicType, } +#[derive(Debug, Clone, SpacetimeType)] +// TODO: Is there a module-only crate I can use? +#[sats(crate = crate)] +pub enum ViewReturnValue { + RowData, + RawSql(String), + ParameterizedQuery(ParameterizedQuery), +} + +#[derive(Debug, Clone, SpacetimeType)] +// TODO: Is there a module-only crate I can use? +#[sats(crate = crate)] +pub struct ParameterizedQuery { + pub template: String, + pub parameter_values: Vec, + // This is optional to support parameter inference in the future. + pub parameter_types: Option, +} + /// A reducer definition. #[derive(Debug, Clone, SpacetimeType)] #[sats(crate = crate)] From 65310462eab15c85f09c58068c3f26e5243c3495 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Thu, 13 Nov 2025 15:37:29 -0800 Subject: [PATCH 02/16] About to regenerate --- .../bindings-typescript/src/server/runtime.ts | 6 +- .../bindings-typescript/src/server/sys.d.ts | 4 +- crates/core/src/host/v8/syscall/v1.rs | 74 +++++++++++++++++-- crates/lib/src/db/raw_def/v9.rs | 2 +- 4 files changed, 76 insertions(+), 10 deletions(-) diff --git a/crates/bindings-typescript/src/server/runtime.ts b/crates/bindings-typescript/src/server/runtime.ts index f39e6bdb034..891e0d7dff0 100644 --- a/crates/bindings-typescript/src/server/runtime.ts +++ b/crates/bindings-typescript/src/server/runtime.ts @@ -237,7 +237,8 @@ export const hooks_v1_1: import('spacetime:sys@1.1').ModuleHooks = { const ret = fn(ctx, args); const retBuf = new BinaryWriter(returnTypeBaseSize); AlgebraicType.serializeValue(retBuf, returnType, ret, MODULE_DEF.typespace); - return retBuf.getBuffer(); + // return retBuf.getBuffer(); + return { 'data': retBuf.getBuffer() }; }, __call_view_anon__(id, argsBuf) { const { fn, params, returnType, returnTypeBaseSize } = ANON_VIEWS[id]; @@ -255,7 +256,8 @@ export const hooks_v1_1: import('spacetime:sys@1.1').ModuleHooks = { const ret = fn(ctx, args); const retBuf = new BinaryWriter(returnTypeBaseSize); AlgebraicType.serializeValue(retBuf, returnType, ret, MODULE_DEF.typespace); - return retBuf.getBuffer(); + // return retBuf.getBuffer(); + return { 'data': retBuf.getBuffer()}; }, }; diff --git a/crates/bindings-typescript/src/server/sys.d.ts b/crates/bindings-typescript/src/server/sys.d.ts index 8a102453d03..253f9de0d1b 100644 --- a/crates/bindings-typescript/src/server/sys.d.ts +++ b/crates/bindings-typescript/src/server/sys.d.ts @@ -69,8 +69,8 @@ declare module 'spacetime:sys@1.0' { declare module 'spacetime:sys@1.1' { export type ModuleHooks = { - __call_view__(id: u32, sender: u256, args: Uint8Array): Uint8Array; - __call_view_anon__(id: u32, args: Uint8Array): Uint8Array; + __call_view__(id: u32, sender: u256, args: Uint8Array): Uint8Array | Object; + __call_view_anon__(id: u32, args: Uint8Array): Uint8Array | Object; }; export function register_hooks(hooks: ModuleHooks); diff --git a/crates/core/src/host/v8/syscall/v1.rs b/crates/core/src/host/v8/syscall/v1.rs index aa3abccde20..3d89acf7ff0 100644 --- a/crates/core/src/host/v8/syscall/v1.rs +++ b/crates/core/src/host/v8/syscall/v1.rs @@ -19,6 +19,8 @@ use crate::host::wasm_common::{err_to_errno_and_log, RowIterIdx, TimingSpan, Tim use crate::host::AbiCall; use anyhow::Context; use bytes::Bytes; +use log::info; +use spacetimedb_lib::db::raw_def::v9::ViewReturnValue; use spacetimedb_lib::{bsatn, ConnectionId, Identity, RawModuleDef}; use spacetimedb_primitives::{errno, ColId, IndexId, ReducerId, TableId, ViewFnPtr}; use spacetimedb_sats::Serialize; @@ -421,6 +423,7 @@ pub(super) fn call_call_view( hooks: &HookFunctions<'_>, op: ViewOp<'_>, ) -> Result> { + info!("call_call_view"); let fun = hooks.call_view.context("`__call_view__` was never defined")?; let ViewOp { @@ -441,8 +444,39 @@ pub(super) fn call_call_view( // Call the function. let ret = call_free_fun(scope, fun, args)?; - // Deserialize the user result. - let ret = cast!(scope, ret, v8::Uint8Array, "bytes return from `__call_view__`").map_err(|e| e.throw(scope))?; + if ret.is_typed_array() && ret.is_uint8_array() { + log::info!("Returning with the original version"); + // This is the original format, which just returns the raw bytes. + let ret = + cast!(scope, ret, v8::Uint8Array, "bytes return from `__call_view_anon__`").map_err(|e| e.throw(scope))?; + let bytes = ret.get_contents(&mut []); + let modified_return_value = ViewReturnValue::RowData(bytes.to_vec()); + + let as_expected = bsatn::to_vec(&modified_return_value).map_err(|e| ErrorOrException::Err(e.into()))?; + + let b = Bytes::from_owner(as_expected); + // We are pretending this was sent with the new format. + return Ok(b); + + // TODO: We should do something to convert this into the new result format. + // return Ok(Bytes::copy_from_slice(bytes)) + }; + + log::info!("Using the object version"); + + // let ret = v8::Local::::try_from(ret)?; + let ret = cast!(scope, ret, v8::Object, "object return from `__call_view_anon__`").map_err(|e| e.throw(scope))?; + + let data_key = v8::String::new(scope, "data").unwrap(); + let data_val = ret.get(scope, data_key.into()).unwrap(); + + let ret = cast!( + scope, + data_val, + v8::Uint8Array, + "bytes in the `data` field returned from `__call_view_anon__`" + ) + .map_err(|e| e.throw(scope))?; let bytes = ret.get_contents(&mut []); Ok(Bytes::copy_from_slice(bytes)) @@ -454,6 +488,7 @@ pub(super) fn call_call_view_anon( hooks: &HookFunctions<'_>, op: AnonymousViewOp<'_>, ) -> Result> { + info!("call_call_view_anon"); let fun = hooks.call_view_anon.context("`__call_view__` was never defined")?; let AnonymousViewOp { @@ -472,9 +507,38 @@ pub(super) fn call_call_view_anon( // Call the function. let ret = call_free_fun(scope, fun, args)?; - // Deserialize the user result. - let ret = - cast!(scope, ret, v8::Uint8Array, "bytes return from `__call_view_anon__`").map_err(|e| e.throw(scope))?; + if ret.is_typed_array() && ret.is_uint8_array() { + log::info!("Returning with the original version"); + // This is the original format, which just returns the raw bytes. + let ret = + cast!(scope, ret, v8::Uint8Array, "bytes return from `__call_view_anon__`").map_err(|e| e.throw(scope))?; + let bytes = ret.get_contents(&mut []); + + // TODO: We should do something to convert this into the new result format. + return Ok(Bytes::copy_from_slice(bytes)); + }; + + log::info!("Using the object version"); + + // let ret = v8::Local::::try_from(ret)?; + let ret = cast!( + scope, + ret, + v8::Object, + "bytes or object return from `__call_view_anon__`" + ) + .map_err(|e| e.throw(scope))?; + + let data_key = v8::String::new(scope, "data_v1").unwrap(); + let data_val = ret.get(scope, data_key.into()).unwrap(); + + let ret = cast!( + scope, + data_val, + v8::Uint8Array, + "bytes in the `data` field returned from `__call_view_anon__`" + ) + .map_err(|e| e.throw(scope))?; let bytes = ret.get_contents(&mut []); Ok(Bytes::copy_from_slice(bytes)) diff --git a/crates/lib/src/db/raw_def/v9.rs b/crates/lib/src/db/raw_def/v9.rs index e2d32418d74..719d4d708a9 100644 --- a/crates/lib/src/db/raw_def/v9.rs +++ b/crates/lib/src/db/raw_def/v9.rs @@ -488,7 +488,7 @@ pub struct RawViewDefV9 { // TODO: Is there a module-only crate I can use? #[sats(crate = crate)] pub enum ViewReturnValue { - RowData, + RowData(Vec), RawSql(String), ParameterizedQuery(ParameterizedQuery), } From 12a0cb25ff64c8864feea7303733d1725a60cbe7 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Fri, 14 Nov 2025 15:46:35 -0800 Subject: [PATCH 03/16] Change ViewReturnValue to ViewResultHeader, so dynamically encoded types can follow --- .../examples/regen-typescript-moduledef.rs | 2 ++ crates/lib/src/db/raw_def/v9.rs | 18 ++++++++++++------ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/crates/codegen/examples/regen-typescript-moduledef.rs b/crates/codegen/examples/regen-typescript-moduledef.rs index b47162d15f3..971212ee224 100644 --- a/crates/codegen/examples/regen-typescript-moduledef.rs +++ b/crates/codegen/examples/regen-typescript-moduledef.rs @@ -4,6 +4,7 @@ use fs_err as fs; use regex::Regex; use spacetimedb_codegen::{generate, typescript, OutputFile}; +use spacetimedb_lib::db::raw_def::v9::ViewResultHeader; use spacetimedb_lib::{RawModuleDef, RawModuleDefV8}; use spacetimedb_schema::def::ModuleDef; use std::path::Path; @@ -20,6 +21,7 @@ macro_rules! regex_replace { fn main() -> anyhow::Result<()> { let module = RawModuleDefV8::with_builder(|module| { module.add_type::(); + module.add_type::(); }); let dir = &Path::new(concat!( diff --git a/crates/lib/src/db/raw_def/v9.rs b/crates/lib/src/db/raw_def/v9.rs index 719d4d708a9..1af309155a1 100644 --- a/crates/lib/src/db/raw_def/v9.rs +++ b/crates/lib/src/db/raw_def/v9.rs @@ -485,20 +485,26 @@ pub struct RawViewDefV9 { } #[derive(Debug, Clone, SpacetimeType)] -// TODO: Is there a module-only crate I can use? #[sats(crate = crate)] -pub enum ViewReturnValue { - RowData(Vec), +pub enum ViewResultHeader { + // This means the row data will follow, as a bsatn-encoded Vec. + // We could make RowData contain an Vec of the bytes, but that forces us to make an extra copy when we serialize and + // when we deserialize. + RowData, + // This means we the view wants to return the results of the sql query. RawSql(String), + // This means we the view wants to return the results of a sql query. + // Any query parameters will follow the header. ParameterizedQuery(ParameterizedQuery), } #[derive(Debug, Clone, SpacetimeType)] -// TODO: Is there a module-only crate I can use? +// A header for a parameterized query. This should be followed by a bsatn encoding of any parameters. #[sats(crate = crate)] -pub struct ParameterizedQuery { +pub struct ParameterizedQueryHeader { + // The sql query template. Add details on parameter syntax when it is supported. pub template: String, - pub parameter_values: Vec, + // If set, these are the types of the parameters. // This is optional to support parameter inference in the future. pub parameter_types: Option, } From a76cc17df7c9c6b668155169625ac06dcc921407 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Tue, 18 Nov 2025 09:38:15 -0800 Subject: [PATCH 04/16] Add new view return type --- crates/core/src/host/v8/mod.rs | 2 +- crates/core/src/host/v8/syscall/mod.rs | 7 +- crates/core/src/host/v8/syscall/v1.rs | 41 ++++------ .../src/host/wasm_common/module_host_actor.rs | 74 ++++++++++++++++--- .../core/src/host/wasmtime/wasmtime_module.rs | 5 +- crates/lib/src/db/raw_def/v9.rs | 2 +- 6 files changed, 87 insertions(+), 44 deletions(-) diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 3e962fa4853..2dffbb8dea1 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -664,7 +664,7 @@ fn common_call<'scope, R, O, F>( budget: FunctionBudget, op: O, call: F, -) -> ExecutionResult> +) -> ExecutionResult where O: InstanceOp, F: FnOnce(&mut PinScope<'scope, '_>, O) -> Result>, diff --git a/crates/core/src/host/v8/syscall/mod.rs b/crates/core/src/host/v8/syscall/mod.rs index 25afa0f4eac..b59291e3875 100644 --- a/crates/core/src/host/v8/syscall/mod.rs +++ b/crates/core/src/host/v8/syscall/mod.rs @@ -1,11 +1,10 @@ -use bytes::Bytes; use spacetimedb_lib::{RawModuleDef, VersionTuple}; use v8::{callback_scope, Context, FixedArray, Local, Module, PinScope}; use crate::host::v8::de::scratch_buf; use crate::host::v8::error::{ErrorOrException, ExcResult, ExceptionThrown, Throwable, TypeError}; use crate::host::wasm_common::abi::parse_abi_version; -use crate::host::wasm_common::module_host_actor::{AnonymousViewOp, ReducerOp, ReducerResult, ViewOp}; +use crate::host::wasm_common::module_host_actor::{AnonymousViewOp, ReducerOp, ReducerResult, ViewOp, ViewReturnData}; mod hooks; mod v1; @@ -84,7 +83,7 @@ pub(super) fn call_call_view( scope: &mut PinScope<'_, '_>, hooks: &HookFunctions<'_>, op: ViewOp<'_>, -) -> Result> { +) -> Result> { match hooks.abi { AbiVersion::V1 => v1::call_call_view(scope, hooks, op), } @@ -97,7 +96,7 @@ pub(super) fn call_call_view_anon( scope: &mut PinScope<'_, '_>, hooks: &HookFunctions<'_>, op: AnonymousViewOp<'_>, -) -> Result> { +) -> Result> { match hooks.abi { AbiVersion::V1 => v1::call_call_view_anon(scope, hooks, op), } diff --git a/crates/core/src/host/v8/syscall/v1.rs b/crates/core/src/host/v8/syscall/v1.rs index 3d89acf7ff0..7bae33dc138 100644 --- a/crates/core/src/host/v8/syscall/v1.rs +++ b/crates/core/src/host/v8/syscall/v1.rs @@ -14,13 +14,13 @@ use crate::host::v8::{ TerminationError, Throwable, }; use crate::host::wasm_common::instrumentation::span; -use crate::host::wasm_common::module_host_actor::{AnonymousViewOp, ReducerOp, ReducerResult, ViewOp}; +use crate::host::wasm_common::module_host_actor::{ + AnonymousViewOp, ReducerOp, ReducerResult, ViewOp, ViewResultFormat, ViewReturnData, +}; use crate::host::wasm_common::{err_to_errno_and_log, RowIterIdx, TimingSpan, TimingSpanIdx}; use crate::host::AbiCall; use anyhow::Context; use bytes::Bytes; -use log::info; -use spacetimedb_lib::db::raw_def::v9::ViewReturnValue; use spacetimedb_lib::{bsatn, ConnectionId, Identity, RawModuleDef}; use spacetimedb_primitives::{errno, ColId, IndexId, ReducerId, TableId, ViewFnPtr}; use spacetimedb_sats::Serialize; @@ -422,8 +422,7 @@ pub(super) fn call_call_view( scope: &mut PinScope<'_, '_>, hooks: &HookFunctions<'_>, op: ViewOp<'_>, -) -> Result> { - info!("call_call_view"); +) -> Result> { let fun = hooks.call_view.context("`__call_view__` was never defined")?; let ViewOp { @@ -444,29 +443,20 @@ pub(super) fn call_call_view( // Call the function. let ret = call_free_fun(scope, fun, args)?; + // The original version returned a byte array with the encoded rows. if ret.is_typed_array() && ret.is_uint8_array() { - log::info!("Returning with the original version"); // This is the original format, which just returns the raw bytes. let ret = cast!(scope, ret, v8::Uint8Array, "bytes return from `__call_view_anon__`").map_err(|e| e.throw(scope))?; let bytes = ret.get_contents(&mut []); - let modified_return_value = ViewReturnValue::RowData(bytes.to_vec()); - - let as_expected = bsatn::to_vec(&modified_return_value).map_err(|e| ErrorOrException::Err(e.into()))?; - - let b = Bytes::from_owner(as_expected); - // We are pretending this was sent with the new format. - return Ok(b); - // TODO: We should do something to convert this into the new result format. - // return Ok(Bytes::copy_from_slice(bytes)) + return Ok(ViewReturnData::from_raw_rows(Bytes::copy_from_slice(bytes))); }; - log::info!("Using the object version"); - - // let ret = v8::Local::::try_from(ret)?; + // The newer version returns an object with a `data` field containing the bytes. let ret = cast!(scope, ret, v8::Object, "object return from `__call_view_anon__`").map_err(|e| e.throw(scope))?; + // TODO: don't use unwrap. let data_key = v8::String::new(scope, "data").unwrap(); let data_val = ret.get(scope, data_key.into()).unwrap(); @@ -479,7 +469,7 @@ pub(super) fn call_call_view( .map_err(|e| e.throw(scope))?; let bytes = ret.get_contents(&mut []); - Ok(Bytes::copy_from_slice(bytes)) + Ok(ViewReturnData::with_header(Bytes::copy_from_slice(bytes))) } /// Calls the `__call_view_anon__` function `fun`. @@ -487,8 +477,7 @@ pub(super) fn call_call_view_anon( scope: &mut PinScope<'_, '_>, hooks: &HookFunctions<'_>, op: AnonymousViewOp<'_>, -) -> Result> { - info!("call_call_view_anon"); +) -> Result> { let fun = hooks.call_view_anon.context("`__call_view__` was never defined")?; let AnonymousViewOp { @@ -508,14 +497,16 @@ pub(super) fn call_call_view_anon( let ret = call_free_fun(scope, fun, args)?; if ret.is_typed_array() && ret.is_uint8_array() { - log::info!("Returning with the original version"); // This is the original format, which just returns the raw bytes. let ret = cast!(scope, ret, v8::Uint8Array, "bytes return from `__call_view_anon__`").map_err(|e| e.throw(scope))?; let bytes = ret.get_contents(&mut []); - // TODO: We should do something to convert this into the new result format. - return Ok(Bytes::copy_from_slice(bytes)); + // We are pretending this was sent with the new format. + return Ok(ViewReturnData::new( + ViewResultFormat::Rows, + Bytes::copy_from_slice(bytes), + )); }; log::info!("Using the object version"); @@ -541,7 +532,7 @@ pub(super) fn call_call_view_anon( .map_err(|e| e.throw(scope))?; let bytes = ret.get_contents(&mut []); - Ok(Bytes::copy_from_slice(bytes)) + Ok(ViewReturnData::with_header(Bytes::copy_from_slice(bytes))) } /// Calls the registered `__describe_module__` function hook. diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 30d5819a303..1447d6e9041 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -153,17 +153,57 @@ pub enum ExecutionError { } #[derive(derive_more::AsRef)] -pub struct ExecutionResult { +pub struct ExecutionResult { #[as_ref] pub stats: ExecutionStats, - pub call_result: T, + pub call_result: Result, } -pub type ReducerExecuteResult = ExecutionResult>; +// pub type ReducerExecuteResult = ExecutionResult>; +pub type ReducerExecuteResult = ExecutionResult<(), ExecutionError>; + +// What format is the view using? +// In the initial version of views, they returned the rows directly. +// Views can now return multiple formats (rows or queries), so we use this to determine how to interpret the view result. +#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] +pub enum ViewResultFormat { + // This view returns a Vec of rows (bsatn encoded). + Rows, + // This view returns a ViewResultHeader, potentially followed by more data. + HeaderFirst, +} + +pub struct ViewReturnData { + // How the bytes returned by the view should be interpreted. + pub format: ViewResultFormat, + // The actual bytes returned by the view. + pub data: Bytes, +} -pub type ViewExecuteResult = ExecutionResult>; +impl ViewReturnData { + pub fn new(format: ViewResultFormat, data: Bytes) -> Self { + Self { format, data } + } + + pub fn from_raw_rows(data: Bytes) -> Self { + Self { + format: ViewResultFormat::Rows, + data, + } + } + + pub fn with_header(data: Bytes) -> Self { + Self { + format: ViewResultFormat::HeaderFirst, + data, + } + } +} -pub type ProcedureExecuteResult = ExecutionResult>; +// pub type ViewExecuteResult = ExecutionResult; +pub type ViewExecuteResult = ExecutionResult; + +pub type ProcedureExecuteResult = ExecutionResult; pub struct WasmModuleHostActor { module: T::InstancePre, @@ -914,21 +954,33 @@ impl InstanceCommon { } // Materialize anonymous view (Ok(bytes), None) => { - stdb.materialize_anonymous_view(&mut tx, table_id, row_type, bytes, self.info.module_def.typespace()) - .inspect_err(|err| { - log::error!("Fatal error materializing view `{view_name}`: {err}"); - }) - .expect("Fatal error materializing view"); + if bytes.format != ViewResultFormat::Rows { + unimplemented!("View returned a non-rows format"); + } + stdb.materialize_anonymous_view( + &mut tx, + table_id, + row_type, + bytes.data, + self.info.module_def.typespace(), + ) + .inspect_err(|err| { + log::error!("Fatal error materializing view `{view_name}`: {err}"); + }) + .expect("Fatal error materializing view"); ViewOutcome::Success } // Materialize sender view (Ok(bytes), Some(sender)) => { + if bytes.format != ViewResultFormat::Rows { + unimplemented!("View returned a non-rows format"); + } stdb.materialize_view( &mut tx, table_id, sender, row_type, - bytes, + bytes.data, self.info.module_def.typespace(), ) .inspect_err(|err| { diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 23647ddc773..01726d8e81f 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -9,6 +9,7 @@ use crate::host::instance_env::{InstanceEnv, TxSlot}; use crate::host::module_common::run_describer; use crate::host::wasm_common::module_host_actor::{ AnonymousViewOp, DescribeError, ExecutionError, ExecutionStats, InitializationError, InstanceOp, ViewOp, + ViewReturnData, }; use crate::host::wasm_common::*; use crate::replica_context::ReplicaContext; @@ -452,7 +453,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let call_result = call_result .map_err(ExecutionError::Trap) .and_then(|code| handle_result_sink_code(code, result_bytes)) - .map(|r| r.into()); + .map(|r| ViewReturnData::new(module_host_actor::ViewResultFormat::Rows, r.into())); module_host_actor::ViewExecuteResult { stats, call_result } } @@ -491,7 +492,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let call_result = call_result .map_err(ExecutionError::Trap) .and_then(|code| handle_result_sink_code(code, result_bytes)) - .map(|r| r.into()); + .map(|r| ViewReturnData::new(module_host_actor::ViewResultFormat::Rows, r.into())); module_host_actor::ViewExecuteResult { stats, call_result } } diff --git a/crates/lib/src/db/raw_def/v9.rs b/crates/lib/src/db/raw_def/v9.rs index 1af309155a1..0741d70b7d4 100644 --- a/crates/lib/src/db/raw_def/v9.rs +++ b/crates/lib/src/db/raw_def/v9.rs @@ -495,7 +495,7 @@ pub enum ViewResultHeader { RawSql(String), // This means we the view wants to return the results of a sql query. // Any query parameters will follow the header. - ParameterizedQuery(ParameterizedQuery), + ParameterizedQuery(ParameterizedQueryHeader), } #[derive(Debug, Clone, SpacetimeType)] From 0bffa5348e8c2aaf3efd2f60bdc0925d8e4ed036 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Tue, 18 Nov 2025 09:45:47 -0800 Subject: [PATCH 05/16] Change view functions to return new format --- .../core/src/host/wasmtime/wasmtime_module.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 01726d8e81f..7ce6e66084d 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -110,6 +110,17 @@ fn handle_result_sink_code(code: i32, result: Vec) -> Result, Execut } } +/// Handle the return code from a view function using a result sink. +/// For views, we treat the return code 2 as a successful return using the header format. +fn handle_view_result_sink_code(code: i32, result: Vec) -> Result { + match code { + 0 => Ok(ViewReturnData::from_raw_rows(result.into())), + 2 => Ok(ViewReturnData::with_header(result.into())), + CALL_FAILURE => Err(ExecutionError::User(string_from_utf8_lossy_owned(result).into())), + _ => Err(ExecutionError::Recoverable(anyhow::anyhow!("unknown return code"))), + } +} + const CALL_FAILURE: i32 = HOST_CALL_FAILURE.get() as i32; /// Invoke `typed_func` and assert that it doesn't yield. @@ -452,8 +463,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let call_result = call_result .map_err(ExecutionError::Trap) - .and_then(|code| handle_result_sink_code(code, result_bytes)) - .map(|r| ViewReturnData::new(module_host_actor::ViewResultFormat::Rows, r.into())); + .and_then(|code| handle_view_result_sink_code(code, result_bytes)); module_host_actor::ViewExecuteResult { stats, call_result } } @@ -491,8 +501,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let call_result = call_result .map_err(ExecutionError::Trap) - .and_then(|code| handle_result_sink_code(code, result_bytes)) - .map(|r| ViewReturnData::new(module_host_actor::ViewResultFormat::Rows, r.into())); + .and_then(|code| handle_view_result_sink_code(code, result_bytes)); module_host_actor::ViewExecuteResult { stats, call_result } } From 609279693ceb32e664d92771662ab993b040c397 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Tue, 18 Nov 2025 10:38:51 -0800 Subject: [PATCH 06/16] Revert a change to the ts runtime --- crates/bindings-typescript/src/server/runtime.ts | 6 ++---- crates/core/src/host/wasm_common/module_host_actor.rs | 1 - 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/crates/bindings-typescript/src/server/runtime.ts b/crates/bindings-typescript/src/server/runtime.ts index 891e0d7dff0..f39e6bdb034 100644 --- a/crates/bindings-typescript/src/server/runtime.ts +++ b/crates/bindings-typescript/src/server/runtime.ts @@ -237,8 +237,7 @@ export const hooks_v1_1: import('spacetime:sys@1.1').ModuleHooks = { const ret = fn(ctx, args); const retBuf = new BinaryWriter(returnTypeBaseSize); AlgebraicType.serializeValue(retBuf, returnType, ret, MODULE_DEF.typespace); - // return retBuf.getBuffer(); - return { 'data': retBuf.getBuffer() }; + return retBuf.getBuffer(); }, __call_view_anon__(id, argsBuf) { const { fn, params, returnType, returnTypeBaseSize } = ANON_VIEWS[id]; @@ -256,8 +255,7 @@ export const hooks_v1_1: import('spacetime:sys@1.1').ModuleHooks = { const ret = fn(ctx, args); const retBuf = new BinaryWriter(returnTypeBaseSize); AlgebraicType.serializeValue(retBuf, returnType, ret, MODULE_DEF.typespace); - // return retBuf.getBuffer(); - return { 'data': retBuf.getBuffer()}; + return retBuf.getBuffer(); }, }; diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 1447d6e9041..fa2bbdd24a7 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -200,7 +200,6 @@ impl ViewReturnData { } } -// pub type ViewExecuteResult = ExecutionResult; pub type ViewExecuteResult = ExecutionResult; pub type ProcedureExecuteResult = ExecutionResult; From 90b34d04f959fa94d90902b60880a5a999cadff8 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Tue, 18 Nov 2025 10:58:22 -0800 Subject: [PATCH 07/16] Avoid some unwrap calls --- crates/core/src/host/v8/syscall/v1.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/crates/core/src/host/v8/syscall/v1.rs b/crates/core/src/host/v8/syscall/v1.rs index 7bae33dc138..5175d7c5d95 100644 --- a/crates/core/src/host/v8/syscall/v1.rs +++ b/crates/core/src/host/v8/syscall/v1.rs @@ -456,9 +456,12 @@ pub(super) fn call_call_view( // The newer version returns an object with a `data` field containing the bytes. let ret = cast!(scope, ret, v8::Object, "object return from `__call_view_anon__`").map_err(|e| e.throw(scope))?; - // TODO: don't use unwrap. - let data_key = v8::String::new(scope, "data").unwrap(); - let data_val = ret.get(scope, data_key.into()).unwrap(); + let Some(data_key) = v8::String::new(scope, "data") else { + return Err(anyhow::anyhow!("error creating a v8 string")); + }; + let Some(data_val) = ret.get(scope, data_key.into()) else { + return Err(anyhow::anyhow!("data key not found in return object")); + }; let ret = cast!( scope, @@ -509,9 +512,6 @@ pub(super) fn call_call_view_anon( )); }; - log::info!("Using the object version"); - - // let ret = v8::Local::::try_from(ret)?; let ret = cast!( scope, ret, @@ -520,8 +520,12 @@ pub(super) fn call_call_view_anon( ) .map_err(|e| e.throw(scope))?; - let data_key = v8::String::new(scope, "data_v1").unwrap(); - let data_val = ret.get(scope, data_key.into()).unwrap(); + let Some(data_key) = v8::String::new(scope, "data") else { + return Err(anyhow::anyhow!("error creating a v8 string")); + }; + let Some(data_val) = ret.get(scope, data_key.into()) else { + return Err(anyhow::anyhow!("data key not found in return object")); + }; let ret = cast!( scope, From 5dbdbd50a16c183d2022a28176e6099c816583e7 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Wed, 19 Nov 2025 10:17:44 -0800 Subject: [PATCH 08/16] Cleanup --- crates/core/src/host/v8/syscall/v1.rs | 27 ++++---- .../src/host/wasm_common/module_host_actor.rs | 65 +++++-------------- .../core/src/host/wasmtime/wasmtime_module.rs | 4 +- 3 files changed, 30 insertions(+), 66 deletions(-) diff --git a/crates/core/src/host/v8/syscall/v1.rs b/crates/core/src/host/v8/syscall/v1.rs index 5175d7c5d95..7853f105269 100644 --- a/crates/core/src/host/v8/syscall/v1.rs +++ b/crates/core/src/host/v8/syscall/v1.rs @@ -14,9 +14,7 @@ use crate::host::v8::{ TerminationError, Throwable, }; use crate::host::wasm_common::instrumentation::span; -use crate::host::wasm_common::module_host_actor::{ - AnonymousViewOp, ReducerOp, ReducerResult, ViewOp, ViewResultFormat, ViewReturnData, -}; +use crate::host::wasm_common::module_host_actor::{AnonymousViewOp, ReducerOp, ReducerResult, ViewOp, ViewReturnData}; use crate::host::wasm_common::{err_to_errno_and_log, RowIterIdx, TimingSpan, TimingSpanIdx}; use crate::host::AbiCall; use anyhow::Context; @@ -450,17 +448,19 @@ pub(super) fn call_call_view( cast!(scope, ret, v8::Uint8Array, "bytes return from `__call_view_anon__`").map_err(|e| e.throw(scope))?; let bytes = ret.get_contents(&mut []); - return Ok(ViewReturnData::from_raw_rows(Bytes::copy_from_slice(bytes))); + return Ok(ViewReturnData::Rows(Bytes::copy_from_slice(bytes))); }; // The newer version returns an object with a `data` field containing the bytes. let ret = cast!(scope, ret, v8::Object, "object return from `__call_view_anon__`").map_err(|e| e.throw(scope))?; let Some(data_key) = v8::String::new(scope, "data") else { - return Err(anyhow::anyhow!("error creating a v8 string")); + return Err(ErrorOrException::Err(anyhow::anyhow!("error creating a v8 string"))); }; let Some(data_val) = ret.get(scope, data_key.into()) else { - return Err(anyhow::anyhow!("data key not found in return object")); + return Err(ErrorOrException::Err(anyhow::anyhow!( + "data key not found in return object" + ))); }; let ret = cast!( @@ -472,7 +472,7 @@ pub(super) fn call_call_view( .map_err(|e| e.throw(scope))?; let bytes = ret.get_contents(&mut []); - Ok(ViewReturnData::with_header(Bytes::copy_from_slice(bytes))) + Ok(ViewReturnData::HeaderFirst(Bytes::copy_from_slice(bytes))) } /// Calls the `__call_view_anon__` function `fun`. @@ -506,10 +506,7 @@ pub(super) fn call_call_view_anon( let bytes = ret.get_contents(&mut []); // We are pretending this was sent with the new format. - return Ok(ViewReturnData::new( - ViewResultFormat::Rows, - Bytes::copy_from_slice(bytes), - )); + return Ok(ViewReturnData::Rows(Bytes::copy_from_slice(bytes))); }; let ret = cast!( @@ -521,10 +518,12 @@ pub(super) fn call_call_view_anon( .map_err(|e| e.throw(scope))?; let Some(data_key) = v8::String::new(scope, "data") else { - return Err(anyhow::anyhow!("error creating a v8 string")); + return Err(ErrorOrException::Err(anyhow::anyhow!("error creating a v8 string"))); }; let Some(data_val) = ret.get(scope, data_key.into()) else { - return Err(anyhow::anyhow!("data key not found in return object")); + return Err(ErrorOrException::Err(anyhow::anyhow!( + "data key not found in return object" + ))); }; let ret = cast!( @@ -536,7 +535,7 @@ pub(super) fn call_call_view_anon( .map_err(|e| e.throw(scope))?; let bytes = ret.get_contents(&mut []); - Ok(ViewReturnData::with_header(Bytes::copy_from_slice(bytes))) + Ok(ViewReturnData::HeaderFirst(Bytes::copy_from_slice(bytes))) } /// Calls the registered `__describe_module__` function hook. diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index f0a1d183191..887c92118ca 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -159,42 +159,13 @@ pub struct ExecutionResult { // pub type ReducerExecuteResult = ExecutionResult>; pub type ReducerExecuteResult = ExecutionResult<(), ExecutionError>; -// What format is the view using? -// In the initial version of views, they returned the rows directly. -// Views can now return multiple formats (rows or queries), so we use this to determine how to interpret the view result. -#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] -pub enum ViewResultFormat { +// The original version of views used a different return format (it returned the rows directly). +// The newer version uses ViewReturnData to represent the different formats. +pub enum ViewReturnData { // This view returns a Vec of rows (bsatn encoded). - Rows, + Rows(Bytes), // This view returns a ViewResultHeader, potentially followed by more data. - HeaderFirst, -} - -pub struct ViewReturnData { - // How the bytes returned by the view should be interpreted. - pub format: ViewResultFormat, - // The actual bytes returned by the view. - pub data: Bytes, -} - -impl ViewReturnData { - pub fn new(format: ViewResultFormat, data: Bytes) -> Self { - Self { format, data } - } - - pub fn from_raw_rows(data: Bytes) -> Self { - Self { - format: ViewResultFormat::Rows, - data, - } - } - - pub fn with_header(data: Bytes) -> Self { - Self { - format: ViewResultFormat::HeaderFirst, - data, - } - } + HeaderFirst(Bytes), } pub type ViewExecuteResult = ExecutionResult; @@ -1019,33 +990,27 @@ impl InstanceCommon { } // Materialize anonymous view (Ok(bytes), None) => { - if bytes.format != ViewResultFormat::Rows { + let ViewReturnData::Rows(bytes) = bytes else { unimplemented!("View returned a non-rows format"); - } - stdb.materialize_anonymous_view( - &mut tx, - table_id, - row_type, - bytes.data, - self.info.module_def.typespace(), - ) - .inspect_err(|err| { - log::error!("Fatal error materializing view `{view_name}`: {err}"); - }) - .expect("Fatal error materializing view"); + }; + stdb.materialize_anonymous_view(&mut tx, table_id, row_type, bytes, self.info.module_def.typespace()) + .inspect_err(|err| { + log::error!("Fatal error materializing view `{view_name}`: {err}"); + }) + .expect("Fatal error materializing view"); ViewOutcome::Success } // Materialize sender view (Ok(bytes), Some(sender)) => { - if bytes.format != ViewResultFormat::Rows { + let ViewReturnData::Rows(bytes) = bytes else { unimplemented!("View returned a non-rows format"); - } + }; stdb.materialize_view( &mut tx, table_id, sender, row_type, - bytes.data, + bytes, self.info.module_def.typespace(), ) .inspect_err(|err| { diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 7ce6e66084d..74d78b0c226 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -114,8 +114,8 @@ fn handle_result_sink_code(code: i32, result: Vec) -> Result, Execut /// For views, we treat the return code 2 as a successful return using the header format. fn handle_view_result_sink_code(code: i32, result: Vec) -> Result { match code { - 0 => Ok(ViewReturnData::from_raw_rows(result.into())), - 2 => Ok(ViewReturnData::with_header(result.into())), + 0 => Ok(ViewReturnData::Rows(result.into())), + 2 => Ok(ViewReturnData::HeaderFirst(result.into())), CALL_FAILURE => Err(ExecutionError::User(string_from_utf8_lossy_owned(result).into())), _ => Err(ExecutionError::Recoverable(anyhow::anyhow!("unknown return code"))), } From 6b7c03b91dd7dc684e27370c9a4873382c6a5444 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Wed, 19 Nov 2025 10:20:51 -0800 Subject: [PATCH 09/16] use lower case object --- crates/bindings-typescript/src/server/sys.d.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bindings-typescript/src/server/sys.d.ts b/crates/bindings-typescript/src/server/sys.d.ts index 253f9de0d1b..2b34ef3197f 100644 --- a/crates/bindings-typescript/src/server/sys.d.ts +++ b/crates/bindings-typescript/src/server/sys.d.ts @@ -69,8 +69,8 @@ declare module 'spacetime:sys@1.0' { declare module 'spacetime:sys@1.1' { export type ModuleHooks = { - __call_view__(id: u32, sender: u256, args: Uint8Array): Uint8Array | Object; - __call_view_anon__(id: u32, args: Uint8Array): Uint8Array | Object; + __call_view__(id: u32, sender: u256, args: Uint8Array): Uint8Array | object; + __call_view_anon__(id: u32, args: Uint8Array): Uint8Array | object; }; export function register_hooks(hooks: ModuleHooks); From ee5b0422d76363304d555289800b7a177eaea891 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Thu, 20 Nov 2025 14:58:57 -0800 Subject: [PATCH 10/16] Combine some more code between the view paths --- crates/core/src/db/relational_db.rs | 160 +++++++----------- .../src/host/wasm_common/module_host_actor.rs | 61 +++++-- 2 files changed, 101 insertions(+), 120 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 5e9fd5a864c..e4c537a8cbf 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -5,7 +5,6 @@ use crate::subscription::ExecutionCounters; use crate::util::{asyncify, spawn_rayon}; use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, Context}; -use bytes::Bytes; use enum_map::EnumMap; use fs2::FileExt; use spacetimedb_commitlog::repo::OnNewSegmentFn; @@ -41,13 +40,13 @@ use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql}; use spacetimedb_lib::de::DeserializeSeed; use spacetimedb_lib::st_var::StVarValue; use spacetimedb_lib::Identity; -use spacetimedb_lib::{bsatn, ConnectionId}; +use spacetimedb_lib::ConnectionId; use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath}; use spacetimedb_primitives::*; use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type; use spacetimedb_sats::memory_usage::MemoryUsage; use spacetimedb_sats::{ - AlgebraicType, AlgebraicTypeRef, AlgebraicValue, ProductType, ProductValue, Typespace, WithTypespace, + AlgebraicType, AlgebraicValue, ProductType, ProductValue, }; use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef}; use spacetimedb_schema::schema::{ @@ -1593,29 +1592,24 @@ impl RelationalDB { }) } - /// Write `bytes` into a (sender) view's backing table. + /// Write `rows` into a (sender) view's backing table. /// /// # Process /// 1. Delete all rows for `sender` from the view's backing table - /// 2. Deserialize `bytes` - /// 3. Insert the new rows into the backing table + /// 2. Insert the new rows into the backing table /// /// # Arguments /// * `tx` - Mutable transaction context /// * `table_id` - The id of the view's backing table /// * `sender` - The calling identity of the view being updated - /// * `row_type` - Expected return type of the view - /// * `bytes` - An array of product values (bsatn encoded) - /// * `typespace` - Type information for deserialization + /// * `rows` - Product values to insert #[allow(clippy::too_many_arguments)] pub fn materialize_view( &self, tx: &mut MutTxId, table_id: TableId, sender: Identity, - row_type: AlgebraicTypeRef, - bytes: Bytes, - typespace: &Typespace, + rows: Vec, ) -> Result<(), DBError> { // Delete rows for `sender` from the backing table let rows_to_delete = self @@ -1624,91 +1618,69 @@ impl RelationalDB { .collect::>(); self.delete(tx, table_id, rows_to_delete); - // Deserialize the return rows. - // The return type is expected to be an array of products. - let row_type = typespace.resolve(row_type); - let ret_type = AlgebraicType::array(row_type.ty().clone()); - let seed = WithTypespace::new(typespace, &ret_type); - let rows = seed - .deserialize(bsatn::Deserializer::new(&mut &bytes[..])) - .map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?; - - // Insert new rows into the backing table - for product in rows - .into_array() - .map_err(|_| ViewError::SerializeRow) - .map_err(DatastoreError::from)? - .into_iter() - { - let product = product - .into_product() - .map_err(|_| ViewError::SerializeRow) - .map_err(DatastoreError::from)?; - self.insert( - tx, - table_id, - &ProductValue::from_iter(std::iter::once(sender.into()).chain(product.elements)) - .to_bsatn_vec() - .map_err(|_| ViewError::SerializeRow) - .map_err(DatastoreError::from)?, - )?; - } + self.write_view_rows(tx, table_id, rows, Some(sender))?; Ok(()) } - /// Write `bytes` into an anonymous view's backing table. + /// Write `rows` into an anonymous view's backing table. /// /// # Process /// 1. Clear the view's backing table - /// 2. Deserialize `bytes` - /// 3. Insert the new rows into the backing table + /// 2. Insert the new rows into the backing table /// /// # Arguments /// * `tx` - Mutable transaction context /// * `table_id` - The id of the view's backing table - /// * `row_type` - Expected return type of the view - /// * `bytes` - An array of product values (bsatn encoded) - /// * `typespace` - Type information for deserialization + /// * `rows` - Product values to insert #[allow(clippy::too_many_arguments)] pub fn materialize_anonymous_view( &self, tx: &mut MutTxId, table_id: TableId, - row_type: AlgebraicTypeRef, - bytes: Bytes, - typespace: &Typespace, + rows: Vec, ) -> Result<(), DBError> { // Clear entire backing table self.clear_table(tx, table_id)?; - // Deserialize the return rows. - // The return type is expected to be an array of products. - let row_type = typespace.resolve(row_type); - let ret_type = AlgebraicType::array(row_type.ty().clone()); - let seed = WithTypespace::new(typespace, &ret_type); - let rows = seed - .deserialize(bsatn::Deserializer::new(&mut &bytes[..])) - .map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?; - - // Insert new rows into the backing table - for product in rows - .into_array() - .map_err(|_| ViewError::SerializeRow) - .map_err(DatastoreError::from)? - .into_iter() - { - self.insert( - tx, - table_id, - &product - .into_product() - .map_err(|_| ViewError::SerializeRow) - .map_err(DatastoreError::from)? - .to_bsatn_vec() - .map_err(|_| ViewError::SerializeRow) - .map_err(DatastoreError::from)?, - )?; + self.write_view_rows(tx, table_id, rows, None)?; + + Ok(()) + } + + fn write_view_rows( + &self, + tx: &mut MutTxId, + table_id: TableId, + rows: Vec, + sender: Option, + ) -> Result<(), DBError> { + match sender { + Some(sender) => { + for product in rows { + let value = ProductValue::from_iter(std::iter::once(sender.into()).chain(product.elements)); + self.insert( + tx, + table_id, + &value + .to_bsatn_vec() + .map_err(|_| ViewError::SerializeRow) + .map_err(DatastoreError::from)?, + )?; + } + } + None => { + for product in rows { + self.insert( + tx, + table_id, + &product + .to_bsatn_vec() + .map_err(|_| ViewError::SerializeRow) + .map_err(DatastoreError::from)?, + )?; + } + } } Ok(()) @@ -2592,24 +2564,12 @@ mod tests { Ok((view_id, table_id, module_def.clone(), view_def.clone())) } - fn insert_view_row( - stdb: &TestDB, - view_id: ViewId, - table_id: TableId, - typespace: &Typespace, - row_type: AlgebraicTypeRef, - sender: Identity, - v: u8, - ) -> ResultTest<()> { - let to_bsatn = |pv: &ProductValue| { - Bytes::from(bsatn::to_vec(&AlgebraicValue::Array([pv.clone()].into())).expect("bstan serialization failed")) - }; - + fn insert_view_row(stdb: &TestDB, view_id: ViewId, table_id: TableId, sender: Identity, v: u8) -> ResultTest<()> { let row_pv = |v: u8| product![v]; let mut tx = begin_mut_tx(stdb); tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?; - stdb.materialize_view(&mut tx, table_id, sender, row_type, to_bsatn(&row_pv(v)), typespace)?; + stdb.materialize_view(&mut tx, table_id, sender, vec![row_pv(v)])?; stdb.commit_tx(tx)?; Ok(()) @@ -2633,13 +2593,11 @@ mod tests { fn test_view_tables_are_ephemeral() -> ResultTest<()> { let stdb = TestDB::durable()?; - let (view_id, table_id, module_def, view_def) = setup_view(&stdb)?; - let row_type = view_def.product_type_ref; - let typespace = module_def.typespace(); + let (view_id, table_id, _, _) = setup_view(&stdb)?; // Write some rows (reusing the same helper) - insert_view_row(&stdb, view_id, table_id, typespace, row_type, Identity::ONE, 10)?; - insert_view_row(&stdb, view_id, table_id, typespace, row_type, Identity::ZERO, 20)?; + insert_view_row(&stdb, view_id, table_id, Identity::ONE, 10)?; + insert_view_row(&stdb, view_id, table_id, Identity::ZERO, 20)?; assert!( !project_views(&stdb, table_id, Identity::ZERO).is_empty(), @@ -2668,14 +2626,12 @@ mod tests { fn test_views() -> ResultTest<()> { let stdb = TestDB::durable()?; - let (view_id, table_id, module_def, view_def) = setup_view(&stdb)?; - let row_type = view_def.product_type_ref; - let typespace = module_def.typespace(); + let (view_id, table_id, _, _) = setup_view(&stdb)?; let sender1 = Identity::ONE; // Sender 1 insert - insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender1, 42)?; + insert_view_row(&stdb, view_id, table_id, sender1, 42)?; assert_eq!( project_views(&stdb, table_id, sender1)[0], @@ -2686,7 +2642,7 @@ mod tests { // Sender 2 insert let sender2 = Identity::ZERO; let before_sender2 = Instant::now(); - insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender2, 84)?; + insert_view_row(&stdb, view_id, table_id, sender2, 84)?; assert_eq!( project_views(&stdb, table_id, sender2)[0], @@ -2712,7 +2668,7 @@ mod tests { stdb.commit_tx(tx)?; // Reinsert after restart - insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender2, 91)?; + insert_view_row(&stdb, view_id, table_id, sender2, 91)?; assert_eq!( project_views(&stdb, table_id, sender2)[0], product![91u8], diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index e2c9aeab04d..6a561eec143 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -3,6 +3,7 @@ use super::*; use crate::client::ClientActorId; use crate::database_logger; use crate::energy::{EnergyMonitor, FunctionBudget, FunctionFingerprint}; +use crate::error::DBError; use crate::host::host_controller::CallProcedureReturn; use crate::host::instance_env::{InstanceEnv, TxSlot}; use crate::host::module_common::{build_common_module_from_raw, ModuleCommon}; @@ -30,6 +31,7 @@ use core::time::Duration; use prometheus::{Histogram, IntCounter, IntGauge}; use spacetimedb_auth::identity::ConnectionAuthCtx; use spacetimedb_datastore::db_metrics::DB_METRICS; +use spacetimedb_datastore::error::{DatastoreError, ViewError}; use spacetimedb_datastore::execution_context::{self, ReducerContext, Workload}; use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo}; use spacetimedb_datastore::traits::{IsolationLevel, Program}; @@ -39,6 +41,7 @@ use spacetimedb_lib::de::DeserializeSeed; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::{bsatn, ConnectionId, RawModuleDef, Timestamp}; use spacetimedb_primitives::{ProcedureId, TableId, ViewFnPtr, ViewId}; +use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, ProductValue, Typespace, WithTypespace}; use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError}; use spacetimedb_schema::def::{ModuleDef, ViewDef}; use std::sync::Arc; @@ -101,6 +104,35 @@ impl EnergyStats { } } +fn deserialize_view_rows( + row_type: AlgebraicTypeRef, + bytes: Bytes, + typespace: &Typespace, +) -> Result, DBError> { + // The return type is expected to be an array of products. + let row_type = typespace.resolve(row_type); + let ret_type = AlgebraicType::array(row_type.ty().clone()); + let seed = WithTypespace::new(typespace, &ret_type); + let rows = seed + .deserialize(bsatn::Deserializer::new(&mut &bytes[..])) + .map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string()))) + .map_err(DBError::from)?; + + rows.into_array() + .map_err(|_| ViewError::SerializeRow) + .map_err(DatastoreError::from) + .map_err(DBError::from)? + .into_iter() + .map(|product| { + product + .into_product() + .map_err(|_| ViewError::SerializeRow) + .map_err(DatastoreError::from) + .map_err(DBError::from) + }) + .collect() +} + pub struct ExecutionTimings { pub total_duration: Duration, pub wasm_instance_env_call_times: CallTimes, @@ -960,35 +992,28 @@ impl InstanceCommon { inst.log_traceback("view", &view_name, &anyhow::anyhow!(err)); self.handle_outer_error(&result.stats.energy, &view_name).into() } - // Materialize anonymous view - (Ok(bytes), None) => { + (Ok(bytes), sender) => { let ViewReturnData::Rows(bytes) = bytes else { unimplemented!("View returned a non-rows format"); }; - stdb.materialize_anonymous_view(&mut tx, table_id, row_type, bytes, self.info.module_def.typespace()) + + let typespace = self.info.module_def.typespace(); + let rows = deserialize_view_rows(row_type, bytes, typespace) .inspect_err(|err| { log::error!("Fatal error materializing view `{view_name}`: {err}"); }) .expect("Fatal error materializing view"); - ViewOutcome::Success - } - // Materialize sender view - (Ok(bytes), Some(sender)) => { - let ViewReturnData::Rows(bytes) = bytes else { - unimplemented!("View returned a non-rows format"); + + let res = match sender { + Some(sender) => stdb.materialize_view(&mut tx, table_id, sender, rows), + None => stdb.materialize_anonymous_view(&mut tx, table_id, rows), }; - stdb.materialize_view( - &mut tx, - table_id, - sender, - row_type, - bytes, - self.info.module_def.typespace(), - ) - .inspect_err(|err| { + + res.inspect_err(|err| { log::error!("Fatal error materializing view `{view_name}`: {err}"); }) .expect("Fatal error materializing view"); + ViewOutcome::Success } }; From 0954489fd995d36f9af402a6bd0dff7c35e3b595 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Thu, 20 Nov 2025 16:18:36 -0800 Subject: [PATCH 11/16] Parse sooner --- crates/core/src/db/relational_db.rs | 6 +- .../src/host/wasm_common/module_host_actor.rs | 63 +++++++++++++++++-- 2 files changed, 59 insertions(+), 10 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index e4c537a8cbf..cd2d0e9ec24 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -39,15 +39,13 @@ use spacetimedb_lib::db::auth::StAccess; use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql}; use spacetimedb_lib::de::DeserializeSeed; use spacetimedb_lib::st_var::StVarValue; -use spacetimedb_lib::Identity; use spacetimedb_lib::ConnectionId; +use spacetimedb_lib::Identity; use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath}; use spacetimedb_primitives::*; use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type; use spacetimedb_sats::memory_usage::MemoryUsage; -use spacetimedb_sats::{ - AlgebraicType, AlgebraicValue, ProductType, ProductValue, -}; +use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue}; use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef}; use spacetimedb_schema::schema::{ ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema, diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 6a561eec143..50cdd84090e 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -25,7 +25,8 @@ use crate::subscription::module_subscription_actor::commit_and_broadcast_event; use crate::subscription::module_subscription_manager::TransactionOffset; use crate::util::prometheus_handle::{HistogramExt, TimerGuard}; use crate::worker_metrics::WORKER_METRICS; -use bytes::Bytes; +use anyhow::Context; +use bytes::{Buf, Bytes}; use core::future::Future; use core::time::Duration; use prometheus::{Histogram, IntCounter, IntGauge}; @@ -36,12 +37,12 @@ use spacetimedb_datastore::execution_context::{self, ReducerContext, Workload}; use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo}; use spacetimedb_datastore::traits::{IsolationLevel, Program}; use spacetimedb_lib::buffer::DecodeError; -use spacetimedb_lib::db::raw_def::v9::Lifecycle; +use spacetimedb_lib::db::raw_def::v9::{Lifecycle, ViewResultHeader}; use spacetimedb_lib::de::DeserializeSeed; use spacetimedb_lib::identity::AuthCtx; -use spacetimedb_lib::{bsatn, ConnectionId, RawModuleDef, Timestamp}; +use spacetimedb_lib::{bsatn, ConnectionId, ProductType, RawModuleDef, Timestamp}; use spacetimedb_primitives::{ProcedureId, TableId, ViewFnPtr, ViewId}; -use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, ProductValue, Typespace, WithTypespace}; +use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, Deserialize, ProductValue, Typespace, WithTypespace}; use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError}; use spacetimedb_schema::def::{ModuleDef, ViewDef}; use std::sync::Arc; @@ -197,6 +198,49 @@ pub enum ViewReturnData { HeaderFirst(Bytes), } +pub enum ViewResult { + Rows(Bytes), + RawSql(String), + ParameterizedQuery { + query: String, + parameters: Bytes, + parameter_types: Option, + }, +} + +impl ViewResult { + pub fn from_return_data(data: ViewReturnData) -> Result { + match data { + ViewReturnData::Rows(bytes) => Ok(ViewResult::Rows(bytes)), + ViewReturnData::HeaderFirst(bytes) => { + let mut reader = &bytes[..]; + let header = { + let deserializer = bsatn::Deserializer::new(&mut reader); + ViewResultHeader::deserialize(deserializer) + .context("failed to deserialize ViewResultHeader from view return data")? + }; + match header { + ViewResultHeader::RawSql(query) => Ok(ViewResult::RawSql(query)), + ViewResultHeader::RowData => { + let at = bytes.len() - reader.remaining(); + let remaining_bytes = bytes.slice(at..); + Ok(ViewResult::Rows(remaining_bytes)) + } + ViewResultHeader::ParameterizedQuery(header) => { + let at = bytes.len() - reader.remaining(); + let remaining_bytes = bytes.slice(at..); + Ok(ViewResult::ParameterizedQuery { + query: header.template, + parameters: remaining_bytes, + parameter_types: header.parameter_types, + }) + } + } + } + } + } +} + pub type ViewExecuteResult = ExecutionResult; pub type ProcedureExecuteResult = ExecutionResult; @@ -992,8 +1036,15 @@ impl InstanceCommon { inst.log_traceback("view", &view_name, &anyhow::anyhow!(err)); self.handle_outer_error(&result.stats.energy, &view_name).into() } - (Ok(bytes), sender) => { - let ViewReturnData::Rows(bytes) = bytes else { + (Ok(raw), sender) => { + // TODO: This shouldn't be able to cause fatal errors. + let result = ViewResult::from_return_data(raw) + .inspect_err(|err| { + log::error!("Fatal error parsing result for view `{view_name}`: {err}"); + }) + .expect("Fatal error parsing view result"); + + let ViewResult::Rows(bytes) = result else { unimplemented!("View returned a non-rows format"); }; From 773dcf2429ae4468252d31d29d1a9aa5aae189dc Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Fri, 21 Nov 2025 10:28:36 -0800 Subject: [PATCH 12/16] Add query running --- crates/core/src/db/relational_db.rs | 8 +- .../src/host/wasm_common/module_host_actor.rs | 128 ++++++++++++++++-- 2 files changed, 125 insertions(+), 11 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index cd2d0e9ec24..bda63f980ed 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1,6 +1,7 @@ use crate::db::MetricsRecorderQueue; use crate::error::{DBError, DatabaseError, RestoreSnapshotError}; use crate::messages::control_db::HostType; +use crate::sql::ast::SchemaViewer; use crate::subscription::ExecutionCounters; use crate::util::{asyncify, spawn_rayon}; use crate::worker_metrics::WORKER_METRICS; @@ -17,7 +18,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; use spacetimedb_datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView, }; -use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId}; +use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId, ViewCallInfo}; use spacetimedb_datastore::system_tables::{ system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID, }; @@ -34,15 +35,19 @@ use spacetimedb_datastore::{ traits::TxData, }; use spacetimedb_durability as durability; +use spacetimedb_execution::pipelined::PipelinedProject; use spacetimedb_lib::bsatn::ToBsatn; use spacetimedb_lib::db::auth::StAccess; use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql}; use spacetimedb_lib::de::DeserializeSeed; +use spacetimedb_lib::identity::AuthCtx; +use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::st_var::StVarValue; use spacetimedb_lib::ConnectionId; use spacetimedb_lib::Identity; use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath}; use spacetimedb_primitives::*; +use spacetimedb_query::compile_subscription; use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type; use spacetimedb_sats::memory_usage::MemoryUsage; use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue}; @@ -1683,6 +1688,7 @@ impl RelationalDB { Ok(()) } + } #[allow(unused)] #[derive(Clone)] diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 50cdd84090e..aba7d862b5a 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -21,11 +21,12 @@ use crate::identity::Identity; use crate::messages::control_db::HostType; use crate::module_host_context::ModuleCreationContextLimited; use crate::replica_context::ReplicaContext; +use crate::sql::ast::SchemaViewer; use crate::subscription::module_subscription_actor::commit_and_broadcast_event; use crate::subscription::module_subscription_manager::TransactionOffset; use crate::util::prometheus_handle::{HistogramExt, TimerGuard}; use crate::worker_metrics::WORKER_METRICS; -use anyhow::Context; +use anyhow::{bail, ensure, Context}; use bytes::{Buf, Bytes}; use core::future::Future; use core::time::Duration; @@ -36,15 +37,19 @@ use spacetimedb_datastore::error::{DatastoreError, ViewError}; use spacetimedb_datastore::execution_context::{self, ReducerContext, Workload}; use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo}; use spacetimedb_datastore::traits::{IsolationLevel, Program}; +use spacetimedb_execution::pipelined::PipelinedProject; use spacetimedb_lib::buffer::DecodeError; use spacetimedb_lib::db::raw_def::v9::{Lifecycle, ViewResultHeader}; use spacetimedb_lib::de::DeserializeSeed; use spacetimedb_lib::identity::AuthCtx; +use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::{bsatn, ConnectionId, ProductType, RawModuleDef, Timestamp}; use spacetimedb_primitives::{ProcedureId, TableId, ViewFnPtr, ViewId}; +use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type; use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, Deserialize, ProductValue, Typespace, WithTypespace}; use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError}; use spacetimedb_schema::def::{ModuleDef, ViewDef}; +use spacetimedb_subscription::SubscriptionPlan; use std::sync::Arc; use tracing::span::EnteredSpan; @@ -986,6 +991,12 @@ impl InstanceCommon { } = params; let _outer_span = start_call_function_span(&view_name, &caller, None); + let func_call_type = FuncCallType::View(ViewCallInfo { + view_id, + table_id, + view_name: view_name.clone(), + sender: sender.clone(), + }); let mut tx_slot = inst.tx_slot(); let (mut tx, result) = tx_slot.set(tx, || { @@ -1043,17 +1054,54 @@ impl InstanceCommon { log::error!("Fatal error parsing result for view `{view_name}`: {err}"); }) .expect("Fatal error parsing view result"); - - let ViewResult::Rows(bytes) = result else { - unimplemented!("View returned a non-rows format"); - }; - let typespace = self.info.module_def.typespace(); - let rows = deserialize_view_rows(row_type, bytes, typespace) - .inspect_err(|err| { - log::error!("Fatal error materializing view `{view_name}`: {err}"); + let row_product_type = typespace + .resolve(row_type) + .ty() + .clone() + .into_product() + .inspect_err(|t| { + log::error!("Fatal error resolving row type for view `{view_name}`: {t:?}"); }) - .expect("Fatal error materializing view"); + .expect("Fatal error resolving row type for view"); + + let rows = match result { + ViewResult::Rows(bytes) => { + let rows = deserialize_view_rows(row_type, bytes, typespace) + .inspect_err(|err| { + log::error!("Fatal error materializing view `{view_name}`: {err}"); + }) + .expect("Fatal error materializing view"); + rows + } + ViewResult::RawSql(query) => self + .run_query_for_view( + &mut tx, + &query, + &row_product_type, + &ViewCallInfo { + view_id, + table_id, + view_name: view_name.clone(), + sender: sender.clone(), + }, + ) + .inspect_err(|err| { + log::error!("Fatal error executing raw SQL for view `{view_name}`: {err}"); + }) + .expect("Fatal error executing raw SQL for view"), + _ => unimplemented!("Parameterized query is not supported"), + }; + // let ViewResult::Rows(bytes) = result else { + // unimplemented!("View returned a non-rows format"); + // }; + + // let typespace = self.info.module_def.typespace(); + // let rows = deserialize_view_rows(row_type, bytes, typespace) + // .inspect_err(|err| { + // log::error!("Fatal error materializing view `{view_name}`: {err}"); + // }) + // .expect("Fatal error materializing view"); let res = match sender { Some(sender) => stdb.materialize_view(&mut tx, table_id, sender, rows), @@ -1080,6 +1128,66 @@ impl InstanceCommon { (res, trapped) } + fn run_query_for_view( + &self, + tx: &mut MutTxId, + the_query: &str, + expected_row_type: &ProductType, + call_info: &ViewCallInfo, + ) -> anyhow::Result> { + if the_query.trim().is_empty() { + return Ok(Vec::new()); + } + + // Views bypass RLS. + let auth = AuthCtx::for_current(self.info.database_identity); + let schema_view = SchemaViewer::new(&*tx, &auth); + + // Compile to subscription plans. + let (plans, has_params) = SubscriptionPlan::compile(the_query, &schema_view, &auth)?; + ensure!( + !has_params, + "parameterized SQL is not supported for view materialization yet" + ); + + // Validate shape and disallow views-on-views. + for plan in &plans { + let phys = plan.optimized_physical_plan(); + let Some(source_schema) = phys.return_table() else { + bail!("query does not return plain table rows"); + }; + if phys.reads_from_view(true) || phys.reads_from_view(false) { + bail!("view definition cannot read from other views"); + } + if source_schema.row_type != *expected_row_type { + bail!( + "query returns `{}` but view expects `{}`", + fmt_algebraic_type(&AlgebraicType::Product(source_schema.row_type.clone())), + fmt_algebraic_type(&AlgebraicType::Product(expected_row_type.clone())), + ); + } + } + + let op = FuncCallType::View(call_info.clone()); + let mut metrics = ExecutionMetrics::default(); + let mut rows = Vec::new(); + + for plan in plans { + // Track read sets for all tables involved in this plan. + // TODO(jsdt): This means we will rerun the view and query for any change to these tables, so we should optimize this asap. + for table_id in plan.table_ids() { + tx.record_table_scan(&op, table_id); + } + + let pipelined = PipelinedProject::from(plan.optimized_physical_plan().clone()); + pipelined.execute(&*tx, &mut metrics, &mut |row| { + rows.push(row.to_product_value()); + Ok(()) + })?; + } + + Ok(rows) + } /// A [`MutTxId`] knows which views must be updated (re-evaluated). /// This method re-evaluates them and updates their backing tables. pub(crate) fn call_views_with_tx( From 89a45b3bbd964333f475d6537ff486010d5f78b6 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Fri, 21 Nov 2025 13:23:53 -0800 Subject: [PATCH 13/16] Handle errors running view queries --- crates/core/src/db/relational_db.rs | 9 +- .../src/host/wasm_common/module_host_actor.rs | 119 +++++++----------- 2 files changed, 48 insertions(+), 80 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index bda63f980ed..a800f1f8e2c 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1,7 +1,6 @@ use crate::db::MetricsRecorderQueue; use crate::error::{DBError, DatabaseError, RestoreSnapshotError}; use crate::messages::control_db::HostType; -use crate::sql::ast::SchemaViewer; use crate::subscription::ExecutionCounters; use crate::util::{asyncify, spawn_rayon}; use crate::worker_metrics::WORKER_METRICS; @@ -18,7 +17,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; use spacetimedb_datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView, }; -use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId, ViewCallInfo}; +use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId}; use spacetimedb_datastore::system_tables::{ system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID, }; @@ -35,19 +34,14 @@ use spacetimedb_datastore::{ traits::TxData, }; use spacetimedb_durability as durability; -use spacetimedb_execution::pipelined::PipelinedProject; use spacetimedb_lib::bsatn::ToBsatn; use spacetimedb_lib::db::auth::StAccess; use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql}; -use spacetimedb_lib::de::DeserializeSeed; -use spacetimedb_lib::identity::AuthCtx; -use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::st_var::StVarValue; use spacetimedb_lib::ConnectionId; use spacetimedb_lib::Identity; use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath}; use spacetimedb_primitives::*; -use spacetimedb_query::compile_subscription; use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type; use spacetimedb_sats::memory_usage::MemoryUsage; use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue}; @@ -1688,7 +1682,6 @@ impl RelationalDB { Ok(()) } - } #[allow(unused)] #[derive(Clone)] diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index aba7d862b5a..0a21306f99f 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -26,7 +26,7 @@ use crate::subscription::module_subscription_actor::commit_and_broadcast_event; use crate::subscription::module_subscription_manager::TransactionOffset; use crate::util::prometheus_handle::{HistogramExt, TimerGuard}; use crate::worker_metrics::WORKER_METRICS; -use anyhow::{bail, ensure, Context}; +use anyhow::{anyhow, bail, ensure, Context}; use bytes::{Buf, Bytes}; use core::future::Future; use core::time::Duration; @@ -991,12 +991,6 @@ impl InstanceCommon { } = params; let _outer_span = start_call_function_span(&view_name, &caller, None); - let func_call_type = FuncCallType::View(ViewCallInfo { - view_id, - table_id, - view_name: view_name.clone(), - sender: sender.clone(), - }); let mut tx_slot = inst.tx_slot(); let (mut tx, result) = tx_slot.set(tx, || { @@ -1037,7 +1031,7 @@ impl InstanceCommon { let trapped = matches!(result.call_result, Err(ExecutionError::Trap(_))); - let outcome = match (result.call_result, sender) { + let outcome: ViewOutcome = match (result.call_result, sender) { (Err(ExecutionError::Recoverable(err) | ExecutionError::Trap(err)), _) => { inst.log_traceback("view", &view_name, &err); self.handle_outer_error(&result.stats.energy, &view_name).into() @@ -1048,72 +1042,52 @@ impl InstanceCommon { self.handle_outer_error(&result.stats.energy, &view_name).into() } (Ok(raw), sender) => { - // TODO: This shouldn't be able to cause fatal errors. - let result = ViewResult::from_return_data(raw) - .inspect_err(|err| { - log::error!("Fatal error parsing result for view `{view_name}`: {err}"); - }) - .expect("Fatal error parsing view result"); - let typespace = self.info.module_def.typespace(); - let row_product_type = typespace - .resolve(row_type) - .ty() - .clone() - .into_product() - .inspect_err(|t| { - log::error!("Fatal error resolving row type for view `{view_name}`: {t:?}"); - }) - .expect("Fatal error resolving row type for view"); - - let rows = match result { - ViewResult::Rows(bytes) => { - let rows = deserialize_view_rows(row_type, bytes, typespace) - .inspect_err(|err| { - log::error!("Fatal error materializing view `{view_name}`: {err}"); - }) - .expect("Fatal error materializing view"); - rows - } - ViewResult::RawSql(query) => self - .run_query_for_view( - &mut tx, - &query, - &row_product_type, - &ViewCallInfo { - view_id, - table_id, - view_name: view_name.clone(), - sender: sender.clone(), - }, - ) - .inspect_err(|err| { - log::error!("Fatal error executing raw SQL for view `{view_name}`: {err}"); - }) - .expect("Fatal error executing raw SQL for view"), - _ => unimplemented!("Parameterized query is not supported"), - }; - // let ViewResult::Rows(bytes) = result else { - // unimplemented!("View returned a non-rows format"); - // }; - - // let typespace = self.info.module_def.typespace(); - // let rows = deserialize_view_rows(row_type, bytes, typespace) - // .inspect_err(|err| { - // log::error!("Fatal error materializing view `{view_name}`: {err}"); - // }) - // .expect("Fatal error materializing view"); - - let res = match sender { - Some(sender) => stdb.materialize_view(&mut tx, table_id, sender, rows), - None => stdb.materialize_anonymous_view(&mut tx, table_id, rows), - }; + // This is wrapped in a closure to simplify error handling. + let outcome: Result = (|| { + let result = ViewResult::from_return_data(raw).context("Error parsing view result")?; + let typespace = self.info.module_def.typespace(); + let row_product_type = typespace + .resolve(row_type) + .ty() + .clone() + .into_product() + .map_err(|_| anyhow!("Error resolving row type for view"))?; + + let rows = match result { + ViewResult::Rows(bytes) => deserialize_view_rows(row_type, bytes, typespace) + .context("Error deserializing rows returned by view".to_string())?, + ViewResult::RawSql(query) => self + .run_query_for_view( + &mut tx, + &query, + &row_product_type, + &ViewCallInfo { + view_id, + table_id, + view_name: view_name.clone(), + sender, + }, + ) + .context("Error executing raw SQL returned by view".to_string())?, + _ => anyhow::bail!("View returned an unsupported format"), + }; + + let res = match sender { + Some(sender) => stdb.materialize_view(&mut tx, table_id, sender, rows), + None => stdb.materialize_anonymous_view(&mut tx, table_id, rows), + }; - res.inspect_err(|err| { - log::error!("Fatal error materializing view `{view_name}`: {err}"); - }) - .expect("Fatal error materializing view"); + res.context("Error materializing view")?; - ViewOutcome::Success + Ok(ViewOutcome::Success) + })(); + match outcome { + Ok(outcome) => outcome, + Err(err) => { + log::error!("Error materializing view `{view_name}`: {err:?}"); + ViewOutcome::Failed(format!("Error materializing view `{view_name}`: {err}")) + } + } } }; @@ -1135,6 +1109,7 @@ impl InstanceCommon { expected_row_type: &ProductType, call_info: &ViewCallInfo, ) -> anyhow::Result> { + log::info!("Materializing view `{}` with query: {}", call_info.view_name, the_query); if the_query.trim().is_empty() { return Ok(Vec::new()); } From 719274d2909d14a9fe38584d76b5107ede1a5a01 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Tue, 25 Nov 2025 10:18:53 -0800 Subject: [PATCH 14/16] Remove parameterized query type --- .../src/host/wasm_common/module_host_actor.rs | 17 ++--------------- crates/lib/src/db/raw_def/v9.rs | 16 ++-------------- 2 files changed, 4 insertions(+), 29 deletions(-) diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 68c1e2fc543..a00d7426256 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -203,14 +203,11 @@ pub enum ViewReturnData { HeaderFirst(Bytes), } +// A view result after processing the return header. pub enum ViewResult { + // The rows are encoded as a bsatn array of products. Rows(Bytes), RawSql(String), - ParameterizedQuery { - query: String, - parameters: Bytes, - parameter_types: Option, - }, } impl ViewResult { @@ -231,15 +228,6 @@ impl ViewResult { let remaining_bytes = bytes.slice(at..); Ok(ViewResult::Rows(remaining_bytes)) } - ViewResultHeader::ParameterizedQuery(header) => { - let at = bytes.len() - reader.remaining(); - let remaining_bytes = bytes.slice(at..); - Ok(ViewResult::ParameterizedQuery { - query: header.template, - parameters: remaining_bytes, - parameter_types: header.parameter_types, - }) - } } } } @@ -1081,7 +1069,6 @@ impl InstanceCommon { }, ) .context("Error executing raw SQL returned by view".to_string())?, - _ => anyhow::bail!("View returned an unsupported format"), }; let res = match sender { diff --git a/crates/lib/src/db/raw_def/v9.rs b/crates/lib/src/db/raw_def/v9.rs index a47b0705a18..3037aba8471 100644 --- a/crates/lib/src/db/raw_def/v9.rs +++ b/crates/lib/src/db/raw_def/v9.rs @@ -537,20 +537,8 @@ pub enum ViewResultHeader { RowData, // This means we the view wants to return the results of the sql query. RawSql(String), - // This means we the view wants to return the results of a sql query. - // Any query parameters will follow the header. - ParameterizedQuery(ParameterizedQueryHeader), -} - -#[derive(Debug, Clone, SpacetimeType)] -// A header for a parameterized query. This should be followed by a bsatn encoding of any parameters. -#[sats(crate = crate)] -pub struct ParameterizedQueryHeader { - // The sql query template. Add details on parameter syntax when it is supported. - pub template: String, - // If set, these are the types of the parameters. - // This is optional to support parameter inference in the future. - pub parameter_types: Option, + // We can add an option for parameterized queries later, + // which would make it easier to cache query plans on the host side. } /// A reducer definition. From 5c25c53cc545f89f88fed330bda8a7ce04faa947 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Tue, 25 Nov 2025 11:00:41 -0800 Subject: [PATCH 15/16] Add import --- crates/core/src/db/relational_db.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index a800f1f8e2c..77a75a83be5 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -2400,6 +2400,7 @@ mod tests { begin_tx, insert, make_snapshot, with_auto_commit, with_read_only, TestDB, }; use anyhow::bail; + use bytes::Bytes; use commitlog::payload::txdata; use commitlog::Commitlog; use durability::EmptyHistory; From a7954934d9235eb3c8bc5259f86ba5ab755a9cda Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Fri, 28 Nov 2025 14:14:45 -0800 Subject: [PATCH 16/16] Update comments --- crates/core/src/host/wasm_common/module_host_actor.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index b1cfbefcbae..81a641e2514 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -1108,6 +1108,9 @@ impl InstanceCommon { (res, trapped) } + /// Compiles and runs a query that was returned from a view. + /// This tracks read dependencies for the view. + /// Note that this doesn't modify the resulting rows in any way. fn run_query_for_view( &self, tx: &mut MutTxId, @@ -1119,7 +1122,7 @@ impl InstanceCommon { return Ok(Vec::new()); } - // Views bypass RLS. + // Views bypass RLS, since views should enforce their own access control procedurally. let auth = AuthCtx::for_current(self.info.database_identity); let schema_view = SchemaViewer::new(&*tx, &auth);