@@ -5,7 +5,6 @@ use crate::subscription::ExecutionCounters;
55use crate :: util:: { asyncify, spawn_rayon} ;
66use crate :: worker_metrics:: WORKER_METRICS ;
77use anyhow:: { anyhow, Context } ;
8- use bytes:: Bytes ;
98use enum_map:: EnumMap ;
109use fs2:: FileExt ;
1110use spacetimedb_commitlog:: repo:: OnNewSegmentFn ;
@@ -38,17 +37,14 @@ use spacetimedb_durability as durability;
3837use spacetimedb_lib:: bsatn:: ToBsatn ;
3938use spacetimedb_lib:: db:: auth:: StAccess ;
4039use spacetimedb_lib:: db:: raw_def:: v9:: { btree, RawModuleDefV9Builder , RawSql } ;
41- use spacetimedb_lib:: de:: DeserializeSeed ;
4240use spacetimedb_lib:: st_var:: StVarValue ;
41+ use spacetimedb_lib:: ConnectionId ;
4342use spacetimedb_lib:: Identity ;
44- use spacetimedb_lib:: { bsatn, ConnectionId } ;
4543use spacetimedb_paths:: server:: { CommitLogDir , ReplicaDir , SnapshotsPath } ;
4644use spacetimedb_primitives:: * ;
4745use spacetimedb_sats:: algebraic_type:: fmt:: fmt_algebraic_type;
4846use spacetimedb_sats:: memory_usage:: MemoryUsage ;
49- use spacetimedb_sats:: {
50- AlgebraicType , AlgebraicTypeRef , AlgebraicValue , ProductType , ProductValue , Typespace , WithTypespace ,
51- } ;
47+ use spacetimedb_sats:: { AlgebraicType , AlgebraicValue , ProductType , ProductValue } ;
5248use spacetimedb_schema:: def:: { ModuleDef , TableDef , ViewDef } ;
5349use spacetimedb_schema:: schema:: {
5450 ColumnSchema , IndexSchema , RowLevelSecuritySchema , Schema , SequenceSchema , TableSchema ,
@@ -1593,29 +1589,24 @@ impl RelationalDB {
15931589 } )
15941590 }
15951591
1596- /// Write `bytes ` into a (sender) view's backing table.
1592+ /// Write `rows ` into a (sender) view's backing table.
15971593 ///
15981594 /// # Process
15991595 /// 1. Delete all rows for `sender` from the view's backing table
1600- /// 2. Deserialize `bytes`
1601- /// 3. Insert the new rows into the backing table
1596+ /// 2. Insert the new rows into the backing table
16021597 ///
16031598 /// # Arguments
16041599 /// * `tx` - Mutable transaction context
16051600 /// * `table_id` - The id of the view's backing table
16061601 /// * `sender` - The calling identity of the view being updated
1607- /// * `row_type` - Expected return type of the view
1608- /// * `bytes` - An array of product values (bsatn encoded)
1609- /// * `typespace` - Type information for deserialization
1602+ /// * `rows` - Product values to insert
16101603 #[ allow( clippy:: too_many_arguments) ]
16111604 pub fn materialize_view (
16121605 & self ,
16131606 tx : & mut MutTxId ,
16141607 table_id : TableId ,
16151608 sender : Identity ,
1616- row_type : AlgebraicTypeRef ,
1617- bytes : Bytes ,
1618- typespace : & Typespace ,
1609+ rows : Vec < ProductValue > ,
16191610 ) -> Result < ( ) , DBError > {
16201611 // Delete rows for `sender` from the backing table
16211612 let rows_to_delete = self
@@ -1624,91 +1615,69 @@ impl RelationalDB {
16241615 . collect :: < Vec < _ > > ( ) ;
16251616 self . delete ( tx, table_id, rows_to_delete) ;
16261617
1627- // Deserialize the return rows.
1628- // The return type is expected to be an array of products.
1629- let row_type = typespace. resolve ( row_type) ;
1630- let ret_type = AlgebraicType :: array ( row_type. ty ( ) . clone ( ) ) ;
1631- let seed = WithTypespace :: new ( typespace, & ret_type) ;
1632- let rows = seed
1633- . deserialize ( bsatn:: Deserializer :: new ( & mut & bytes[ ..] ) )
1634- . map_err ( |e| DatastoreError :: from ( ViewError :: DeserializeReturn ( e. to_string ( ) ) ) ) ?;
1635-
1636- // Insert new rows into the backing table
1637- for product in rows
1638- . into_array ( )
1639- . map_err ( |_| ViewError :: SerializeRow )
1640- . map_err ( DatastoreError :: from) ?
1641- . into_iter ( )
1642- {
1643- let product = product
1644- . into_product ( )
1645- . map_err ( |_| ViewError :: SerializeRow )
1646- . map_err ( DatastoreError :: from) ?;
1647- self . insert (
1648- tx,
1649- table_id,
1650- & ProductValue :: from_iter ( std:: iter:: once ( sender. into ( ) ) . chain ( product. elements ) )
1651- . to_bsatn_vec ( )
1652- . map_err ( |_| ViewError :: SerializeRow )
1653- . map_err ( DatastoreError :: from) ?,
1654- ) ?;
1655- }
1618+ self . write_view_rows ( tx, table_id, rows, Some ( sender) ) ?;
16561619
16571620 Ok ( ( ) )
16581621 }
16591622
1660- /// Write `bytes ` into an anonymous view's backing table.
1623+ /// Write `rows ` into an anonymous view's backing table.
16611624 ///
16621625 /// # Process
16631626 /// 1. Clear the view's backing table
1664- /// 2. Deserialize `bytes`
1665- /// 3. Insert the new rows into the backing table
1627+ /// 2. Insert the new rows into the backing table
16661628 ///
16671629 /// # Arguments
16681630 /// * `tx` - Mutable transaction context
16691631 /// * `table_id` - The id of the view's backing table
1670- /// * `row_type` - Expected return type of the view
1671- /// * `bytes` - An array of product values (bsatn encoded)
1672- /// * `typespace` - Type information for deserialization
1632+ /// * `rows` - Product values to insert
16731633 #[ allow( clippy:: too_many_arguments) ]
16741634 pub fn materialize_anonymous_view (
16751635 & self ,
16761636 tx : & mut MutTxId ,
16771637 table_id : TableId ,
1678- row_type : AlgebraicTypeRef ,
1679- bytes : Bytes ,
1680- typespace : & Typespace ,
1638+ rows : Vec < ProductValue > ,
16811639 ) -> Result < ( ) , DBError > {
16821640 // Clear entire backing table
16831641 self . clear_table ( tx, table_id) ?;
16841642
1685- // Deserialize the return rows.
1686- // The return type is expected to be an array of products.
1687- let row_type = typespace. resolve ( row_type) ;
1688- let ret_type = AlgebraicType :: array ( row_type. ty ( ) . clone ( ) ) ;
1689- let seed = WithTypespace :: new ( typespace, & ret_type) ;
1690- let rows = seed
1691- . deserialize ( bsatn:: Deserializer :: new ( & mut & bytes[ ..] ) )
1692- . map_err ( |e| DatastoreError :: from ( ViewError :: DeserializeReturn ( e. to_string ( ) ) ) ) ?;
1693-
1694- // Insert new rows into the backing table
1695- for product in rows
1696- . into_array ( )
1697- . map_err ( |_| ViewError :: SerializeRow )
1698- . map_err ( DatastoreError :: from) ?
1699- . into_iter ( )
1700- {
1701- self . insert (
1702- tx,
1703- table_id,
1704- & product
1705- . into_product ( )
1706- . map_err ( |_| ViewError :: SerializeRow )
1707- . map_err ( DatastoreError :: from) ?
1708- . to_bsatn_vec ( )
1709- . map_err ( |_| ViewError :: SerializeRow )
1710- . map_err ( DatastoreError :: from) ?,
1711- ) ?;
1643+ self . write_view_rows ( tx, table_id, rows, None ) ?;
1644+
1645+ Ok ( ( ) )
1646+ }
1647+
1648+ fn write_view_rows (
1649+ & self ,
1650+ tx : & mut MutTxId ,
1651+ table_id : TableId ,
1652+ rows : Vec < ProductValue > ,
1653+ sender : Option < Identity > ,
1654+ ) -> Result < ( ) , DBError > {
1655+ match sender {
1656+ Some ( sender) => {
1657+ for product in rows {
1658+ let value = ProductValue :: from_iter ( std:: iter:: once ( sender. into ( ) ) . chain ( product. elements ) ) ;
1659+ self . insert (
1660+ tx,
1661+ table_id,
1662+ & value
1663+ . to_bsatn_vec ( )
1664+ . map_err ( |_| ViewError :: SerializeRow )
1665+ . map_err ( DatastoreError :: from) ?,
1666+ ) ?;
1667+ }
1668+ }
1669+ None => {
1670+ for product in rows {
1671+ self . insert (
1672+ tx,
1673+ table_id,
1674+ & product
1675+ . to_bsatn_vec ( )
1676+ . map_err ( |_| ViewError :: SerializeRow )
1677+ . map_err ( DatastoreError :: from) ?,
1678+ ) ?;
1679+ }
1680+ }
17121681 }
17131682
17141683 Ok ( ( ) )
@@ -2431,6 +2400,7 @@ mod tests {
24312400 begin_tx, insert, make_snapshot, with_auto_commit, with_read_only, TestDB ,
24322401 } ;
24332402 use anyhow:: bail;
2403+ use bytes:: Bytes ;
24342404 use commitlog:: payload:: txdata;
24352405 use commitlog:: Commitlog ;
24362406 use durability:: EmptyHistory ;
@@ -2592,24 +2562,12 @@ mod tests {
25922562 Ok ( ( view_id, table_id, module_def. clone ( ) , view_def. clone ( ) ) )
25932563 }
25942564
2595- fn insert_view_row (
2596- stdb : & TestDB ,
2597- view_id : ViewId ,
2598- table_id : TableId ,
2599- typespace : & Typespace ,
2600- row_type : AlgebraicTypeRef ,
2601- sender : Identity ,
2602- v : u8 ,
2603- ) -> ResultTest < ( ) > {
2604- let to_bsatn = |pv : & ProductValue | {
2605- Bytes :: from ( bsatn:: to_vec ( & AlgebraicValue :: Array ( [ pv. clone ( ) ] . into ( ) ) ) . expect ( "bstan serialization failed" ) )
2606- } ;
2607-
2565+ fn insert_view_row ( stdb : & TestDB , view_id : ViewId , table_id : TableId , sender : Identity , v : u8 ) -> ResultTest < ( ) > {
26082566 let row_pv = |v : u8 | product ! [ v] ;
26092567
26102568 let mut tx = begin_mut_tx ( stdb) ;
26112569 tx. subscribe_view ( view_id, ArgId :: SENTINEL , sender) ?;
2612- stdb. materialize_view ( & mut tx, table_id, sender, row_type , to_bsatn ( & row_pv ( v) ) , typespace ) ?;
2570+ stdb. materialize_view ( & mut tx, table_id, sender, vec ! [ row_pv( v) ] ) ?;
26132571 stdb. commit_tx ( tx) ?;
26142572
26152573 Ok ( ( ) )
@@ -2633,13 +2591,11 @@ mod tests {
26332591 fn test_view_tables_are_ephemeral ( ) -> ResultTest < ( ) > {
26342592 let stdb = TestDB :: durable ( ) ?;
26352593
2636- let ( view_id, table_id, module_def, view_def) = setup_view ( & stdb) ?;
2637- let row_type = view_def. product_type_ref ;
2638- let typespace = module_def. typespace ( ) ;
2594+ let ( view_id, table_id, _, _) = setup_view ( & stdb) ?;
26392595
26402596 // Write some rows (reusing the same helper)
2641- insert_view_row ( & stdb, view_id, table_id, typespace , row_type , Identity :: ONE , 10 ) ?;
2642- insert_view_row ( & stdb, view_id, table_id, typespace , row_type , Identity :: ZERO , 20 ) ?;
2597+ insert_view_row ( & stdb, view_id, table_id, Identity :: ONE , 10 ) ?;
2598+ insert_view_row ( & stdb, view_id, table_id, Identity :: ZERO , 20 ) ?;
26432599
26442600 assert ! (
26452601 !project_views( & stdb, table_id, Identity :: ZERO ) . is_empty( ) ,
@@ -2668,14 +2624,12 @@ mod tests {
26682624 fn test_views ( ) -> ResultTest < ( ) > {
26692625 let stdb = TestDB :: durable ( ) ?;
26702626
2671- let ( view_id, table_id, module_def, view_def) = setup_view ( & stdb) ?;
2672- let row_type = view_def. product_type_ref ;
2673- let typespace = module_def. typespace ( ) ;
2627+ let ( view_id, table_id, _, _) = setup_view ( & stdb) ?;
26742628
26752629 let sender1 = Identity :: ONE ;
26762630
26772631 // Sender 1 insert
2678- insert_view_row ( & stdb, view_id, table_id, typespace , row_type , sender1, 42 ) ?;
2632+ insert_view_row ( & stdb, view_id, table_id, sender1, 42 ) ?;
26792633
26802634 assert_eq ! (
26812635 project_views( & stdb, table_id, sender1) [ 0 ] ,
@@ -2686,7 +2640,7 @@ mod tests {
26862640 // Sender 2 insert
26872641 let sender2 = Identity :: ZERO ;
26882642 let before_sender2 = Instant :: now ( ) ;
2689- insert_view_row ( & stdb, view_id, table_id, typespace , row_type , sender2, 84 ) ?;
2643+ insert_view_row ( & stdb, view_id, table_id, sender2, 84 ) ?;
26902644
26912645 assert_eq ! (
26922646 project_views( & stdb, table_id, sender2) [ 0 ] ,
@@ -2712,7 +2666,7 @@ mod tests {
27122666 stdb. commit_tx ( tx) ?;
27132667
27142668 // Reinsert after restart
2715- insert_view_row ( & stdb, view_id, table_id, typespace , row_type , sender2, 91 ) ?;
2669+ insert_view_row ( & stdb, view_id, table_id, sender2, 91 ) ?;
27162670 assert_eq ! (
27172671 project_views( & stdb, table_id, sender2) [ 0 ] ,
27182672 product![ 91u8 ] ,
0 commit comments