Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/bindings-typescript/src/server/sys.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ declare module 'spacetime:sys@1.0' {

declare module 'spacetime:sys@1.1' {
export type ModuleHooks = {
__call_view__(id: u32, sender: u256, args: Uint8Array): Uint8Array;
__call_view_anon__(id: u32, args: Uint8Array): Uint8Array;
__call_view__(id: u32, sender: u256, args: Uint8Array): Uint8Array | object;
__call_view_anon__(id: u32, args: Uint8Array): Uint8Array | object;
};

export function register_hooks(hooks: ModuleHooks);
Expand Down
2 changes: 2 additions & 0 deletions crates/codegen/examples/regen-typescript-moduledef.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use fs_err as fs;
use regex::Regex;
use spacetimedb_codegen::{generate, typescript, OutputFile};
use spacetimedb_lib::db::raw_def::v9::ViewResultHeader;
use spacetimedb_lib::{RawModuleDef, RawModuleDefV8};
use spacetimedb_schema::def::ModuleDef;
use std::path::Path;
Expand All @@ -22,6 +23,7 @@ macro_rules! regex_replace {
fn main() -> anyhow::Result<()> {
let module = RawModuleDefV8::with_builder(|module| {
module.add_type::<RawModuleDef>();
module.add_type::<ViewResultHeader>();
module.add_type::<spacetimedb_lib::http::Request>();
module.add_type::<spacetimedb_lib::http::Response>();
});
Expand Down
164 changes: 59 additions & 105 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::subscription::ExecutionCounters;
use crate::util::{asyncify, spawn_rayon};
use crate::worker_metrics::WORKER_METRICS;
use anyhow::{anyhow, Context};
use bytes::Bytes;
use enum_map::EnumMap;
use fs2::FileExt;
use spacetimedb_commitlog::repo::OnNewSegmentFn;
Expand Down Expand Up @@ -38,17 +37,14 @@ use spacetimedb_durability as durability;
use spacetimedb_lib::bsatn::ToBsatn;
use spacetimedb_lib::db::auth::StAccess;
use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql};
use spacetimedb_lib::de::DeserializeSeed;
use spacetimedb_lib::st_var::StVarValue;
use spacetimedb_lib::ConnectionId;
use spacetimedb_lib::Identity;
use spacetimedb_lib::{bsatn, ConnectionId};
use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath};
use spacetimedb_primitives::*;
use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type;
use spacetimedb_sats::memory_usage::MemoryUsage;
use spacetimedb_sats::{
AlgebraicType, AlgebraicTypeRef, AlgebraicValue, ProductType, ProductValue, Typespace, WithTypespace,
};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef};
use spacetimedb_schema::schema::{
ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema,
Expand Down Expand Up @@ -1593,29 +1589,24 @@ impl RelationalDB {
})
}

/// Write `bytes` into a (sender) view's backing table.
/// Write `rows` into a (sender) view's backing table.
///
/// # Process
/// 1. Delete all rows for `sender` from the view's backing table
/// 2. Deserialize `bytes`
/// 3. Insert the new rows into the backing table
/// 2. Insert the new rows into the backing table
///
/// # Arguments
/// * `tx` - Mutable transaction context
/// * `table_id` - The id of the view's backing table
/// * `sender` - The calling identity of the view being updated
/// * `row_type` - Expected return type of the view
/// * `bytes` - An array of product values (bsatn encoded)
/// * `typespace` - Type information for deserialization
/// * `rows` - Product values to insert
#[allow(clippy::too_many_arguments)]
pub fn materialize_view(
&self,
tx: &mut MutTxId,
table_id: TableId,
sender: Identity,
row_type: AlgebraicTypeRef,
bytes: Bytes,
typespace: &Typespace,
rows: Vec<ProductValue>,
) -> Result<(), DBError> {
// Delete rows for `sender` from the backing table
let rows_to_delete = self
Expand All @@ -1624,91 +1615,69 @@ impl RelationalDB {
.collect::<Vec<_>>();
self.delete(tx, table_id, rows_to_delete);

// Deserialize the return rows.
// The return type is expected to be an array of products.
let row_type = typespace.resolve(row_type);
let ret_type = AlgebraicType::array(row_type.ty().clone());
let seed = WithTypespace::new(typespace, &ret_type);
let rows = seed
.deserialize(bsatn::Deserializer::new(&mut &bytes[..]))
.map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?;

// Insert new rows into the backing table
for product in rows
.into_array()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?
.into_iter()
{
let product = product
.into_product()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?;
self.insert(
tx,
table_id,
&ProductValue::from_iter(std::iter::once(sender.into()).chain(product.elements))
.to_bsatn_vec()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?,
)?;
}
self.write_view_rows(tx, table_id, rows, Some(sender))?;

Ok(())
}

/// Write `bytes` into an anonymous view's backing table.
/// Write `rows` into an anonymous view's backing table.
///
/// # Process
/// 1. Clear the view's backing table
/// 2. Deserialize `bytes`
/// 3. Insert the new rows into the backing table
/// 2. Insert the new rows into the backing table
///
/// # Arguments
/// * `tx` - Mutable transaction context
/// * `table_id` - The id of the view's backing table
/// * `row_type` - Expected return type of the view
/// * `bytes` - An array of product values (bsatn encoded)
/// * `typespace` - Type information for deserialization
/// * `rows` - Product values to insert
#[allow(clippy::too_many_arguments)]
pub fn materialize_anonymous_view(
&self,
tx: &mut MutTxId,
table_id: TableId,
row_type: AlgebraicTypeRef,
bytes: Bytes,
typespace: &Typespace,
rows: Vec<ProductValue>,
) -> Result<(), DBError> {
// Clear entire backing table
self.clear_table(tx, table_id)?;

// Deserialize the return rows.
// The return type is expected to be an array of products.
let row_type = typespace.resolve(row_type);
let ret_type = AlgebraicType::array(row_type.ty().clone());
let seed = WithTypespace::new(typespace, &ret_type);
let rows = seed
.deserialize(bsatn::Deserializer::new(&mut &bytes[..]))
.map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?;

// Insert new rows into the backing table
for product in rows
.into_array()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?
.into_iter()
{
self.insert(
tx,
table_id,
&product
.into_product()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?
.to_bsatn_vec()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?,
)?;
self.write_view_rows(tx, table_id, rows, None)?;

Ok(())
}

fn write_view_rows(
&self,
tx: &mut MutTxId,
table_id: TableId,
rows: Vec<ProductValue>,
sender: Option<Identity>,
) -> Result<(), DBError> {
match sender {
Some(sender) => {
for product in rows {
let value = ProductValue::from_iter(std::iter::once(sender.into()).chain(product.elements));
self.insert(
tx,
table_id,
&value
.to_bsatn_vec()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?,
)?;
}
}
None => {
for product in rows {
self.insert(
tx,
table_id,
&product
.to_bsatn_vec()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?,
)?;
}
}
}

Ok(())
Expand Down Expand Up @@ -2431,6 +2400,7 @@ mod tests {
begin_tx, insert, make_snapshot, with_auto_commit, with_read_only, TestDB,
};
use anyhow::bail;
use bytes::Bytes;
use commitlog::payload::txdata;
use commitlog::Commitlog;
use durability::EmptyHistory;
Expand Down Expand Up @@ -2592,24 +2562,12 @@ mod tests {
Ok((view_id, table_id, module_def.clone(), view_def.clone()))
}

fn insert_view_row(
stdb: &TestDB,
view_id: ViewId,
table_id: TableId,
typespace: &Typespace,
row_type: AlgebraicTypeRef,
sender: Identity,
v: u8,
) -> ResultTest<()> {
let to_bsatn = |pv: &ProductValue| {
Bytes::from(bsatn::to_vec(&AlgebraicValue::Array([pv.clone()].into())).expect("bstan serialization failed"))
};

fn insert_view_row(stdb: &TestDB, view_id: ViewId, table_id: TableId, sender: Identity, v: u8) -> ResultTest<()> {
let row_pv = |v: u8| product![v];

let mut tx = begin_mut_tx(stdb);
tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?;
stdb.materialize_view(&mut tx, table_id, sender, row_type, to_bsatn(&row_pv(v)), typespace)?;
stdb.materialize_view(&mut tx, table_id, sender, vec![row_pv(v)])?;
stdb.commit_tx(tx)?;

Ok(())
Expand All @@ -2633,13 +2591,11 @@ mod tests {
fn test_view_tables_are_ephemeral() -> ResultTest<()> {
let stdb = TestDB::durable()?;

let (view_id, table_id, module_def, view_def) = setup_view(&stdb)?;
let row_type = view_def.product_type_ref;
let typespace = module_def.typespace();
let (view_id, table_id, _, _) = setup_view(&stdb)?;

// Write some rows (reusing the same helper)
insert_view_row(&stdb, view_id, table_id, typespace, row_type, Identity::ONE, 10)?;
insert_view_row(&stdb, view_id, table_id, typespace, row_type, Identity::ZERO, 20)?;
insert_view_row(&stdb, view_id, table_id, Identity::ONE, 10)?;
insert_view_row(&stdb, view_id, table_id, Identity::ZERO, 20)?;

assert!(
!project_views(&stdb, table_id, Identity::ZERO).is_empty(),
Expand Down Expand Up @@ -2668,14 +2624,12 @@ mod tests {
fn test_views() -> ResultTest<()> {
let stdb = TestDB::durable()?;

let (view_id, table_id, module_def, view_def) = setup_view(&stdb)?;
let row_type = view_def.product_type_ref;
let typespace = module_def.typespace();
let (view_id, table_id, _, _) = setup_view(&stdb)?;

let sender1 = Identity::ONE;

// Sender 1 insert
insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender1, 42)?;
insert_view_row(&stdb, view_id, table_id, sender1, 42)?;

assert_eq!(
project_views(&stdb, table_id, sender1)[0],
Expand All @@ -2686,7 +2640,7 @@ mod tests {
// Sender 2 insert
let sender2 = Identity::ZERO;
let before_sender2 = Instant::now();
insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender2, 84)?;
insert_view_row(&stdb, view_id, table_id, sender2, 84)?;

assert_eq!(
project_views(&stdb, table_id, sender2)[0],
Expand All @@ -2712,7 +2666,7 @@ mod tests {
stdb.commit_tx(tx)?;

// Reinsert after restart
insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender2, 91)?;
insert_view_row(&stdb, view_id, table_id, sender2, 91)?;
assert_eq!(
project_views(&stdb, table_id, sender2)[0],
product![91u8],
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/host/v8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ fn common_call<'scope, R, O, F>(
budget: FunctionBudget,
op: O,
call: F,
) -> ExecutionResult<Result<R, ExecutionError>>
) -> ExecutionResult<R, ExecutionError>
where
O: InstanceOp,
F: FnOnce(&mut PinScope<'scope, '_>, O) -> Result<R, ErrorOrException<ExceptionThrown>>,
Expand Down
8 changes: 5 additions & 3 deletions crates/core/src/host/v8/syscall/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use v8::{callback_scope, Context, FixedArray, Local, Module, PinScope};
use crate::host::v8::de::scratch_buf;
use crate::host::v8::error::{ErrorOrException, ExcResult, ExceptionThrown, Throwable, TypeError};
use crate::host::wasm_common::abi::parse_abi_version;
use crate::host::wasm_common::module_host_actor::{AnonymousViewOp, ProcedureOp, ReducerOp, ReducerResult, ViewOp};
use crate::host::wasm_common::module_host_actor::{
AnonymousViewOp, ProcedureOp, ReducerOp, ReducerResult, ViewOp, ViewReturnData,
};

mod hooks;
mod v1;
Expand Down Expand Up @@ -85,7 +87,7 @@ pub(super) fn call_call_view(
scope: &mut PinScope<'_, '_>,
hooks: &HookFunctions<'_>,
op: ViewOp<'_>,
) -> Result<Bytes, ErrorOrException<ExceptionThrown>> {
) -> Result<ViewReturnData, ErrorOrException<ExceptionThrown>> {
match hooks.abi {
AbiVersion::V1 => v1::call_call_view(scope, hooks, op),
}
Expand All @@ -98,7 +100,7 @@ pub(super) fn call_call_view_anon(
scope: &mut PinScope<'_, '_>,
hooks: &HookFunctions<'_>,
op: AnonymousViewOp<'_>,
) -> Result<Bytes, ErrorOrException<ExceptionThrown>> {
) -> Result<ViewReturnData, ErrorOrException<ExceptionThrown>> {
match hooks.abi {
AbiVersion::V1 => v1::call_call_view_anon(scope, hooks, op),
}
Expand Down
Loading