Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ impl CommittedState {
pub(super) fn views_for_table_scan(&self, table_id: &TableId) -> impl Iterator<Item = &ViewCallInfo> {
self.read_sets.views_for_table_scan(table_id)
}

/// Returns the views that perform a precise index seek on the given `row_ref` of `table_id`.
pub fn views_for_index_seek<'a>(
&'a self,
table_id: &TableId,
row_ref: RowRef<'a>,
) -> impl Iterator<Item = &'a ViewCallInfo> {
// Pure delegation: the lookup itself is done by the read sets held in `self.read_sets`.
self.read_sets.views_for_index_seek(table_id, row_ref)
}
}

impl MemoryUsage for CommittedState {
Expand Down
149 changes: 135 additions & 14 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub struct ViewCallInfo {
#[derive(Default)]
pub struct ViewReadSets {
/// Per-table read sets; views that perform a full scan of a table are recorded here.
tables: IntMap<TableId, TableReadSet>,
/// Per-table index read sets: indexed columns -> index key -> views that read that key.
indexes: IntMap<TableId, IndexColReadSet>,
}

impl MemoryUsage for ViewReadSets {
Expand All @@ -102,7 +103,7 @@ impl ViewReadSets {
}

/// Record that a view performs a full scan of this table
fn insert_scan(&mut self, table_id: TableId, call: ViewCallInfo) {
fn insert_table_scan(&mut self, table_id: TableId, call: ViewCallInfo) {
self.tables.entry(table_id).or_default().insert_scan(call);
}

Expand All @@ -112,16 +113,78 @@ impl ViewReadSets {
readset.remove_view(view_id, sender);
!readset.is_empty()
});

// index cleanup
self.remove_view_from_indexes(view_id, sender);
}

/// Merge or union read sets together
pub fn merge(&mut self, readset: Self) {
for (table_id, rs) in readset.tables {
self.tables.entry(table_id).or_default().merge(rs);
}

self.merge_index_reads(readset.indexes);
}

/// Registers `call` as a reader of the index key `proj` over the columns `cols` of `table_id`.
///
/// Missing intermediate maps (per table, per column list, per key) are created on demand.
pub fn insert_index_scan(&mut self, table_id: TableId, cols: ColList, proj: AlgebraicValue, call: ViewCallInfo) {
    self.indexes
        .entry(table_id)
        .or_default()
        .entry(cols)
        .or_default()
        .entry(proj)
        .or_default()
        .insert(call);
}

/// Returns the views whose recorded index seeks match the given row of `table_id`.
///
/// For every column list with recorded index reads on `table_id`, the row is
/// projected onto those columns and the resulting key is looked up; each view
/// call stored under that key is yielded. Column lists for which the
/// projection fails are skipped. Results are not de-duplicated.
pub fn views_for_index_seek<'a>(
    &'a self,
    table_id: &TableId,
    row_ptr: RowRef<'a>,
) -> impl Iterator<Item = &'a ViewCallInfo> {
    // Resolved eagerly so the returned iterator does not borrow `table_id`.
    let per_table = self.indexes.get(table_id);

    per_table
        .into_iter()
        .flatten()
        .filter_map(move |(cols, by_key)| {
            let key = row_ptr.project(cols).ok()?;
            by_key.get(&key)
        })
        .flatten()
}

// Remove all references to a view from the index read sets
fn remove_view_from_indexes(&mut self, view_id: ViewId, sender: Option<Identity>) {
self.indexes.retain(|_, col_map| {
col_map.retain(|_, key_map| {
key_map.retain(|_, views| {
views.retain(|call| {
!(call.view_id == view_id && sender.as_ref().is_none_or(|s| call.sender.as_ref() == Some(s)))
});
!views.is_empty()
});
!key_map.is_empty()
});
!col_map.is_empty()
});
}

/// Unions the index read sets in `other` into this one.
///
/// View calls recorded under the same table, column list and key are combined
/// into a single set.
fn merge_index_reads(&mut self, other: IntMap<TableId, IndexColReadSet>) {
    for (table_id, incoming_cols) in other {
        let by_cols = self.indexes.entry(table_id).or_default();
        for (cols, incoming_keys) in incoming_cols {
            let by_key = by_cols.entry(cols).or_default();
            incoming_keys
                .into_iter()
                .for_each(|(key, calls)| by_key.entry(key).or_default().extend(calls));
        }
    }
}
}

/// Maps an index key to the set of view calls that read that key.
type IndexKeyReadSet = HashMap<AlgebraicValue, HashSet<ViewCallInfo>>;
/// Maps a list of indexed columns to the keys that were read through those columns.
type IndexColReadSet = HashMap<ColList, IndexKeyReadSet>;

/// A table-level read set for views
#[derive(Default)]
struct TableReadSet {
Expand Down Expand Up @@ -186,13 +249,13 @@ pub struct MutTxId {
pub metrics: ExecutionMetrics,
}

static_assert_size!(MutTxId, 432);
static_assert_size!(MutTxId, 464);

impl MutTxId {
/// Record that a view performs a table scan in this transaction's read set
pub fn record_table_scan(&mut self, op: &FuncCallType, table_id: TableId) {
if let FuncCallType::View(view) = op {
self.read_sets.insert_scan(table_id, view.clone());
self.read_sets.insert_table_scan(table_id, view.clone());
}
}

Expand All @@ -201,28 +264,86 @@ impl MutTxId {
&mut self,
op: &FuncCallType,
table_id: TableId,
_: IndexId,
_: Bound<AlgebraicValue>,
_: Bound<AlgebraicValue>,
index_id: IndexId,
lower: Bound<AlgebraicValue>,
upper: Bound<AlgebraicValue>,
) {
// TODO: Implement read set tracking for index scans
if let FuncCallType::View(view) = op {
self.read_sets.insert_scan(table_id, view.clone());
let FuncCallType::View(view) = op else {
return;
};

// Check for precise index seek
if let (Bound::Included(low_val), Bound::Included(up_val)) = (&lower, &upper) {
if low_val == up_val {
// Fetch index metadata
let Some((_, idx, _)) = self.get_table_and_index(index_id) else {
return;
};

let cols = idx.index().indexed_columns.clone();
self.read_sets
.insert_index_scan(table_id, cols, low_val.clone(), view.clone());
return;
}
}

// Everything else is treated as a table scan
self.read_sets.insert_table_scan(table_id, view.clone());
}

/// Returns the views whose read sets overlaps with this transaction's write set
pub fn view_for_update(&self) -> impl Iterator<Item = &ViewCallInfo> {
self.tx_state
pub fn view_for_update(&self) -> impl Iterator<Item = &ViewCallInfo> + '_ {
let mut res = self
.tx_state
.insert_tables
.keys()
.filter(|table_id| !self.tx_state.delete_tables.contains_key(table_id))
.chain(self.tx_state.delete_tables.keys())
.flat_map(|table_id| self.committed_state_write_lock.views_for_table_scan(table_id))
.collect::<HashSet<_>>()
.into_iter()
}
.collect::<HashSet<_>>();

// Include views that perform precise index seeks.
for (table_id, deleted_table) in &self.tx_state.delete_tables {
let (table, blob_store, _) = self
.committed_state_write_lock
.get_table_and_blob_store(*table_id)
.expect("table must exist in committed state for deleted table");

// Skip tables without indexes.
if table.indexes.is_empty() {
continue;
}

for ptr in deleted_table.iter() {
let Some(row_ref) = table.get_row_ref(blob_store, ptr) else {
continue;
};
for view_call in self.committed_state_write_lock.views_for_index_seek(table_id, row_ref) {
res.insert(view_call);
}
}
}

for (table_id, inserted_table) in &self.tx_state.insert_tables {
let (table, blob_store, _) = self
.committed_state_write_lock
.get_table_and_blob_store(*table_id)
.expect("table must exist in committed state for inserted table");

// Skip tables without indexes.
if table.indexes.is_empty() {
continue;
}

for row_ref in inserted_table.scan_rows(blob_store) {
for view_call in self.committed_state_write_lock.views_for_index_seek(table_id, row_ref) {
res.insert(view_call);
}
}
}

res.into_iter()
}
/// Removes keys for `view_id` from the committed read set.
/// Used for dropping views in an auto-migration.
pub fn drop_view_from_committed_read_set(&mut self, view_id: ViewId) {
Expand Down
57 changes: 49 additions & 8 deletions smoketests/tests/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,21 +190,31 @@ def test_http_sql(self):
# since it relies on log capturing starting from an empty log.
def test_a_view_materialization(self):
"""This test asserts whether views are materialized correctly"""
self.insert_initial_data()

player_called_log = "player view called"

# call view, with no data
self.assertSql("SELECT * FROM player", """\
id | level
----+-------
""")
logs = self.logs(100)
self.assertEqual(logs.count(player_called_log), 1)

self.assertNotIn(player_called_log, self.logs(100))
self.insert_initial_data()

# Should invoke view as data is inserted
self.call_player_view()
#On first call, the view is evaluated
self.assertIn(player_called_log, self.logs(100))

logs = self.logs(100)
self.assertEqual(logs.count(player_called_log), 2)

self.call_player_view()
#On second call, the view is cached
# the view is cached
logs = self.logs(100)
self.assertEqual(logs.count(player_called_log), 1)
self.assertEqual(logs.count(player_called_log), 2)

# insert to cause cache invalidation
# inserting new row should not trigger view invocation due to readsets
self.spacetime(
"sql",
self.database_identity,
Expand All @@ -214,10 +224,41 @@ def test_a_view_materialization(self):
)

self.call_player_view()
#On third call, after invalidation, the view is evaluated again
logs = self.logs(100)
self.assertEqual(logs.count(player_called_log), 2)

# Updating the row that the view depends on should trigger re-evaluation
self.spacetime(
"sql",
self.database_identity,
"""
UPDATE player_state SET level = 9 WHERE id = 42;
""",
)

# Call the view again
self.spacetime(
"sql",
self.database_identity,
"""
Select * FROM player_state WHERE id = 42;
""",
)

# On fourth call, after updating the dependent row, the view is re-evaluated
logs = self.logs(100)
self.assertEqual(logs.count(player_called_log), 3)


# Updating it back for other tests to work
self.spacetime(
"sql",
self.database_identity,
"""
UPDATE player_state SET level = 7 WHERE id = 42;
""",
)

def test_query_anonymous_view_reducer(self):
"""Tests that anonymous views are updated for reducers"""
self.call("add_player_level", 0, 1)
Expand Down
Loading