From 36b2651dcd1d0ffbb0d279c0f5f70bcab557a22b Mon Sep 17 00:00:00 2001 From: zaidoon Date: Sat, 15 Nov 2025 03:06:25 -0500 Subject: [PATCH 01/17] Expose Ingestion API with Inversion of Control --- src/abstract.rs | 28 +--- src/any_tree.rs | 62 ++++++- src/blob_tree/ingest.rs | 190 ++++++++++++++++++++++ src/blob_tree/mod.rs | 114 +------------ src/error.rs | 3 + src/lib.rs | 5 +- src/tree/ingest.rs | 81 +++++++-- src/tree/inner.rs | 2 +- src/tree/mod.rs | 102 ++++++------ tests/ingestion_api.rs | 298 ++++++++++++++++++++++++++++++++++ tests/ingestion_invariants.rs | 153 +++++++++++++++++ tests/tree_bulk_ingest.rs | 102 ++++++------ 12 files changed, 887 insertions(+), 253 deletions(-) create mode 100644 src/blob_tree/ingest.rs create mode 100644 tests/ingestion_api.rs create mode 100644 tests/ingestion_invariants.rs diff --git a/src/abstract.rs b/src/abstract.rs index 6b1bbf6e..4a45cd00 100644 --- a/src/abstract.rs +++ b/src/abstract.rs @@ -3,9 +3,10 @@ // (found in the LICENSE-* files in the repository) use crate::{ - iter_guard::IterGuardImpl, table::Table, version::Version, vlog::BlobFile, AnyTree, BlobTree, - Config, Guard, InternalValue, KvPair, Memtable, SeqNo, SequenceNumberCounter, TableId, Tree, - UserKey, UserValue, + blob_tree::FragmentationMap, compaction::CompactionStrategy, config::TreeType, + iter_guard::IterGuardImpl, table::Table, tree::inner::MemtableId, version::Version, + vlog::BlobFile, AnyTree, BlobTree, Config, Guard, InternalValue, KvPair, Memtable, SeqNo, + TableId, Tree, TreeId, UserKey, UserValue, }; use std::{ ops::RangeBounds, @@ -137,27 +138,6 @@ pub trait AbstractTree { index: Option>, ) -> Box + Send + 'static>; - /// Ingests a sorted stream of key-value pairs into the tree. - /// - /// Can only be called on a new fresh, empty tree. - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - /// - /// # Panics - /// - /// Panics if the tree is **not** initially empty. - /// - /// Will panic if the input iterator is not sorted in ascending order. - #[doc(hidden)] - fn ingest( - &self, - iter: impl Iterator, - seqno_generator: &SequenceNumberCounter, - visible_seqno: &SequenceNumberCounter, - ) -> crate::Result<()>; - /// Returns the approximate number of tombstones in the tree. fn tombstone_count(&self) -> u64; diff --git a/src/any_tree.rs b/src/any_tree.rs index 29cfbaf6..e39a35ea 100644 --- a/src/any_tree.rs +++ b/src/any_tree.rs @@ -2,7 +2,10 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use crate::{BlobTree, Tree}; +use crate::{ + blob_tree::ingest::BlobIngestion, tree::ingest::Ingestion, BlobTree, SeqNo, Tree, UserKey, + UserValue, +}; use enum_dispatch::enum_dispatch; /// May be a standard [`Tree`] or a [`BlobTree`] @@ -15,3 +18,60 @@ pub enum AnyTree { /// Key-value separated LSM-tree, see [`BlobTree`] Blob(BlobTree), } + +/// Unified ingestion builder over `AnyTree` +// Keep zero allocations and direct dispatch; boxing introduces heap indirection and `dyn` adds virtual dispatch. +// Ingestion calls use `&mut self` in tight loops; the active variant is stable and branch prediction makes the match cheap. +// Allowing this lint preserves hot-path performance at the cost of a larger enum size. +#[allow(clippy::large_enum_variant)] +pub enum AnyIngestion<'a> { + /// Ingestion for a standard LSM-tree + Standard(Ingestion<'a>), + /// Ingestion for a [`BlobTree`] with KV separation + Blob(BlobIngestion<'a>), +} + +impl<'a> AnyIngestion<'a> { + #[must_use] + /// Sets the sequence number used for subsequent writes + pub fn with_seqno(self, seqno: SeqNo) -> Self { + match self { + AnyIngestion::Standard(i) => AnyIngestion::Standard(i.with_seqno(seqno)), + AnyIngestion::Blob(b) => AnyIngestion::Blob(b.with_seqno(seqno)), + } + } + + /// Writes a key-value pair + pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> { + match self { + AnyIngestion::Standard(i) => i.write(key, value), + AnyIngestion::Blob(b) => b.write(key, value), + } + } + + /// Writes a tombstone for a key + pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> { + match self { + AnyIngestion::Standard(i) => i.write_tombstone(key), + AnyIngestion::Blob(b) => b.write_tombstone(key), + } + } + + /// Finalizes ingestion and registers created tables (and blob files if present) + pub fn finish(self) -> crate::Result<()> { + match self { + AnyIngestion::Standard(i) => i.finish(), + AnyIngestion::Blob(b) => b.finish(), + } + } +} + +impl AnyTree { + /// Starts an ingestion for any tree type (standard or blob) + pub fn ingestion(&self) -> crate::Result> { + match self { + AnyTree::Standard(t) => Ok(AnyIngestion::Standard(Ingestion::new(t)?)), + AnyTree::Blob(b) => Ok(AnyIngestion::Blob(BlobIngestion::new(b)?)), + } + } +} diff --git a/src/blob_tree/ingest.rs b/src/blob_tree/ingest.rs new file mode 100644 index 00000000..c0b457a3 --- /dev/null +++ b/src/blob_tree/ingest.rs @@ -0,0 +1,190 @@ +use crate::{ + blob_tree::handle::BlobIndirection, + file::BLOBS_FOLDER, + table::Table, + tree::ingest::Ingestion as TableIngestion, + vlog::{BlobFileWriter, ValueHandle}, + SeqNo, UserKey, UserValue, +}; + +/// Bulk ingestion for BlobTree +/// +/// Items NEED to be added in ascending key order. +/// +/// Uses table ingestion for the index and a blob file writer for large +/// values so both streams advance together. +pub struct BlobIngestion<'a> { + tree: &'a crate::BlobTree, + pub(crate) table: TableIngestion<'a>, + pub(crate) blob: BlobFileWriter, + seqno: SeqNo, + separation_threshold: u32, + last_key: Option, +} + +impl<'a> BlobIngestion<'a> { + /// Creates a new ingestion. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn new(tree: &'a crate::BlobTree) -> crate::Result { + let kv = tree + .index + .config + .kv_separation_opts + .as_ref() + .expect("kv separation options should exist"); + + let blob_file_size = kv.file_target_size; + + let table = TableIngestion::new(&tree.index)?; + let blob = BlobFileWriter::new( + tree.index.0.blob_file_id_counter.clone(), + blob_file_size, + tree.index.config.path.join(BLOBS_FOLDER), + )? + .use_compression(kv.compression); + + let separation_threshold = kv.separation_threshold; + + Ok(Self { + tree, + table, + blob, + seqno: 0, + separation_threshold, + last_key: None, + }) + } + + /// Sets the ingestion seqno. + #[must_use] + pub fn with_seqno(mut self, seqno: SeqNo) -> Self { + self.seqno = seqno; + self.table = self.table.with_seqno(seqno); + self + } + + /// Writes a key-value pair. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> { + // Check order before any blob I/O to avoid partial writes on failure + if let Some(prev) = &self.last_key { + assert!( + key > *prev, + "next key in ingestion must be greater than last key" + ); + } + + #[allow(clippy::cast_possible_truncation)] + let value_size = value.len() as u32; + + if value_size >= self.separation_threshold { + let offset = self.blob.offset(); + let blob_file_id = self.blob.blob_file_id(); + let on_disk_size = self.blob.write(&key, self.seqno, &value)?; + + let indirection = BlobIndirection { + vhandle: ValueHandle { + blob_file_id, + offset, + on_disk_size, + }, + size: value_size, + }; + + let cloned_key = key.clone(); + let res = self.table.write_indirection(key, indirection); + if res.is_ok() { + self.last_key = Some(cloned_key); + } + res + } else { + let cloned_key = key.clone(); + let res = self.table.write(key, value); + if res.is_ok() { + self.last_key = Some(cloned_key); + } + res + } + } + + /// Writes a tombstone for a key. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> { + if let Some(prev) = &self.last_key { + assert!( + key > *prev, + "next key in ingestion must be greater than last key" + ); + } + + let cloned_key = key.clone(); + let res = self.table.write_tombstone(key); + if res.is_ok() { + self.last_key = Some(cloned_key); + } + res + } + + /// Finishes the ingestion. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn finish(self) -> crate::Result<()> { + use crate::AbstractTree; + + // Capture required handles before consuming fields during finalization + let index = self.index().clone(); + let tree = self.tree.clone(); + + // Finalize both value log and index writer so the index sees a + // consistent set of blob files. + let blob_files = self.blob.finish()?; + let results = self.table.writer.finish()?; + + let pin_filter = index.config.filter_block_pinning_policy.get(0); + let pin_index = index.config.index_block_pinning_policy.get(0); + + let created_tables = results + .into_iter() + .map(|(table_id, checksum)| -> crate::Result { + Table::recover( + index + .config + .path + .join(crate::file::TABLES_FOLDER) + .join(table_id.to_string()), + checksum, + index.id, + index.config.cache.clone(), + index.config.descriptor_table.clone(), + pin_filter, + pin_index, + #[cfg(feature = "metrics")] + index.metrics.clone(), + ) + }) + .collect::>>()?; + + // Blob ingestion only appends new tables and blob files; sealed + // memtables remain unchanged and GC watermark stays at its + // neutral value for this operation. + tree.register_tables(&created_tables, Some(&blob_files), None, &[], 0)?; + + Ok(()) + } + + #[inline] + fn index(&self) -> &crate::Tree { + &self.tree.index + } +} diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index 1d04aa12..bc036147 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -4,6 +4,7 @@ mod gc; pub mod handle; +pub mod ingest; #[doc(hidden)] pub use gc::{FragmentationEntry, FragmentationMap}; @@ -264,119 +265,6 @@ impl AbstractTree for BlobTree { self.index.drop_range(range) } - fn ingest( - &self, - iter: impl Iterator, - seqno_generator: &SequenceNumberCounter, - visible_seqno: &SequenceNumberCounter, - ) -> crate::Result<()> { - use crate::tree::ingest::Ingestion; - use std::time::Instant; - - let seqno = seqno_generator.next(); - - let blob_file_size = self - .index - .config - .kv_separation_opts - .as_ref() - .expect("kv separation options should exist") - .file_target_size; - - let mut table_writer = Ingestion::new(&self.index)?.with_seqno(seqno); - let mut blob_writer = BlobFileWriter::new( - self.index.0.blob_file_id_counter.clone(), - blob_file_size, - self.index.config.path.join(BLOBS_FOLDER), - )? - .use_compression( - self.index - .config - .kv_separation_opts - .as_ref() - .expect("blob options should exist") - .compression, - ); - - let start = Instant::now(); - let mut count = 0; - let mut last_key = None; - - let separation_threshold = self - .index - .config - .kv_separation_opts - .as_ref() - .expect("kv separation options should exist") - .separation_threshold; - - for (key, value) in iter { - if let Some(last_key) = &last_key { - assert!( - key > last_key, - "next key in bulk ingest was not greater than last key", - ); - } - last_key = Some(key.clone()); - - #[expect(clippy::cast_possible_truncation, reason = "values are 32-bit max")] - let value_size = value.len() as u32; - - if value_size >= separation_threshold { - let offset = blob_writer.offset(); - let blob_file_id = blob_writer.blob_file_id(); - let on_disk_size = blob_writer.write(&key, seqno, &value)?; - - let indirection = BlobIndirection { - vhandle: ValueHandle { - blob_file_id, - offset, - on_disk_size, - }, - size: value_size, - }; - - table_writer.write_indirection(key, indirection)?; - } else { - table_writer.write(key, value)?; - } - - count += 1; - } - - let blob_files = blob_writer.finish()?; - let results = table_writer.writer.finish()?; - - let created_tables = results - .into_iter() - .map(|(table_id, checksum)| -> crate::Result
{ - Table::recover( - self.index - .config - .path - .join(crate::file::TABLES_FOLDER) - .join(table_id.to_string()), - checksum, - self.index.id, - self.index.config.cache.clone(), - self.index.config.descriptor_table.clone(), - false, - false, - #[cfg(feature = "metrics")] - self.index.metrics.clone(), - ) - }) - .collect::>>()?; - - self.register_tables(&created_tables, Some(&blob_files), None, &[], 0)?; - - visible_seqno.fetch_max(seqno + 1); - - log::info!("Ingested {count} items in {:?}", start.elapsed()); - - Ok(()) - } - fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> { self.index.major_compact(target_size, seqno_threshold) } diff --git a/src/error.rs b/src/error.rs index 530da2a5..037928ed 100644 --- a/src/error.rs +++ b/src/error.rs @@ -20,6 +20,9 @@ pub enum Error { /// Some required files could not be recovered from disk Unrecoverable, + /// Ingestion could not start because required preconditions were not met + IngestionPreconditionFailed, + /// Checksum mismatch ChecksumMismatch { /// Checksum of loaded block diff --git a/src/lib.rs b/src/lib.rs index 8041a7fa..844cf69f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -169,14 +169,14 @@ pub use { merge::BoxedIterator, slice::Builder, table::{GlobalTableId, Table, TableId}, - tree::ingest::Ingestion, tree::inner::TreeId, tree::Guard as StandardGuard, value::InternalValue, }; pub use { - any_tree::AnyTree, + any_tree::{AnyIngestion, AnyTree}, + blob_tree::ingest::BlobIngestion, blob_tree::BlobTree, cache::Cache, compression::CompressionType, @@ -189,6 +189,7 @@ pub use { r#abstract::AbstractTree, seqno::SequenceNumberCounter, slice::Slice, + tree::ingest::Ingestion, tree::Tree, value::SeqNo, value_type::ValueType, diff --git a/src/tree/ingest.rs b/src/tree/ingest.rs index ee3d09ac..6f87c895 100644 --- a/src/tree/ingest.rs +++ b/src/tree/ingest.rs @@ -4,21 +4,25 @@ use super::Tree; use crate::{ - compaction::MoveDown, config::FilterPolicyEntry, table::multi_writer::MultiWriter, - AbstractTree, BlobIndirection, SeqNo, UserKey, UserValue, + config::FilterPolicyEntry, table::multi_writer::MultiWriter, AbstractTree, BlobIndirection, + SeqNo, UserKey, UserValue, }; -use std::{path::PathBuf, sync::Arc}; +use std::path::PathBuf; pub const INITIAL_CANONICAL_LEVEL: usize = 1; /// Bulk ingestion /// /// Items NEED to be added in ascending key order. +/// +/// Ingested data bypasses memtables and is written directly into new tables, +/// using the same table writer configuration that is used for flush and compaction. pub struct Ingestion<'a> { folder: PathBuf, tree: &'a Tree, pub(crate) writer: MultiWriter, seqno: SeqNo, + last_key: Option, } impl<'a> Ingestion<'a> { @@ -28,6 +32,12 @@ impl<'a> Ingestion<'a> { /// /// Will return `Err` if an IO error occurs. pub fn new(tree: &'a Tree) -> crate::Result { + // Use the shared flush helper so ingestion participates in the same + // path as normal writes: any dirty memtable content is moved into + // tables before building new tables from the ingestion stream. + // This keeps the lookup path ordered as active > sealed > tables. + tree.flush_active_memtable(SeqNo::MAX)?; + let folder = tree.config.path.join(crate::file::TABLES_FOLDER); log::debug!("Ingesting into tables in {}", folder.display()); @@ -100,6 +110,7 @@ impl<'a> Ingestion<'a> { tree, writer, seqno: 0, + last_key: None, }) } @@ -122,6 +133,14 @@ impl<'a> Ingestion<'a> { ) -> crate::Result<()> { use crate::coding::Encode; + if let Some(prev) = &self.last_key { + assert!( + key > *prev, + "next key in ingestion must be greater than last key" + ); + } + + let cloned_key = key.clone(); self.writer.write(crate::InternalValue::from_components( key, indirection.encode_into_vec(), @@ -131,6 +150,8 @@ impl<'a> Ingestion<'a> { self.writer.register_blob(indirection); + // Remember the last user key to validate the next call's ordering + self.last_key = Some(cloned_key); Ok(()) } @@ -140,12 +161,24 @@ impl<'a> Ingestion<'a> { /// /// Will return `Err` if an IO error occurs. pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> { - self.writer.write(crate::InternalValue::from_components( + if let Some(prev) = &self.last_key { + assert!( + key > *prev, + "next key in ingestion must be greater than last key" + ); + } + + let cloned_key = key.clone(); + let res = self.writer.write(crate::InternalValue::from_components( key, value, self.seqno, crate::ValueType::Value, - )) + )); + if res.is_ok() { + self.last_key = Some(cloned_key); + } + res } /// Writes a key-value pair. @@ -153,14 +186,25 @@ impl<'a> Ingestion<'a> { /// # Errors /// /// Will return `Err` if an IO error occurs. - #[doc(hidden)] pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> { - self.writer.write(crate::InternalValue::from_components( + if let Some(prev) = &self.last_key { + assert!( + key > *prev, + "next key in ingestion must be greater than last key" + ); + } + + let cloned_key = key.clone(); + let res = self.writer.write(crate::InternalValue::from_components( key, crate::UserValue::empty(), self.seqno, crate::ValueType::Tombstone, - )) + )); + if res.is_ok() { + self.last_key = Some(cloned_key); + } + res } /// Finishes the ingestion. @@ -175,6 +219,20 @@ impl<'a> Ingestion<'a> { log::info!("Finished ingestion writer"); + let pin_filter = self + .tree + .config + .filter_block_pinning_policy + .get(INITIAL_CANONICAL_LEVEL); + + let pin_index = self + .tree + .config + .index_block_pinning_policy + .get(INITIAL_CANONICAL_LEVEL); + + // Turn the writer output into fully recovered tables that can be + // registered as a fresh L0 run. let created_tables = results .into_iter() .map(|(table_id, checksum)| -> crate::Result
{ @@ -190,14 +248,17 @@ impl<'a> Ingestion<'a> { self.tree.id, self.tree.config.cache.clone(), self.tree.config.descriptor_table.clone(), - false, - false, + pin_filter, + pin_index, #[cfg(feature = "metrics")] self.tree.metrics.clone(), ) }) .collect::>>()?; + // Ingestion produces new tables only and does not touch sealed + // memtables directly, so the deletion set is empty and the + // watermark is left at its neutral value. self.tree .register_tables(&created_tables, None, None, &[], 0)?; diff --git a/src/tree/inner.rs b/src/tree/inner.rs index 02dc5c26..4cc05830 100644 --- a/src/tree/inner.rs +++ b/src/tree/inner.rs @@ -63,8 +63,8 @@ pub struct TreeInner { /// can be concurrent next to each other. pub(crate) major_compaction_lock: RwLock<()>, + /// Serializes flush operations. pub(crate) flush_lock: Mutex<()>, - #[doc(hidden)] #[cfg(feature = "metrics")] pub metrics: Arc, diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 49691945..25d920d2 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -7,7 +7,7 @@ pub mod inner; pub mod sealed; use crate::{ - compaction::{drop_range::OwnedBounds, CompactionStrategy}, + compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy}, config::Config, format_version::FormatVersion, iter_guard::{IterGuard, IterGuardImpl}, @@ -103,8 +103,14 @@ impl AbstractTree for Tree { #[expect(clippy::significant_drop_tightening)] fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result> { + // Returns the newest visible version (`entry.seqno < seqno`) by consulting sources + // in recency order and returning on first hit: active memtable, sealed memtables, + // then tables (each scanned newest-first). + let version_history_lock = self.version_history.read().expect("lock is poisoned"); let super_version = version_history_lock.get_version_for_snapshot(seqno); + // Avoid holding the read lock across potential I/O in table lookups + drop(version_history_lock); if let Some(entry) = super_version.active_memtable.get(key, seqno) { return Ok(ignore_tombstone_value(entry)); @@ -118,7 +124,13 @@ impl AbstractTree for Tree { } // Now look in tables... this may involve disk I/O - self.get_internal_entry_from_tables(&super_version.version, key, seqno) + if let Some(entry) = + self.get_internal_entry_from_tables(&super_version.version, key, seqno)? + { + return Ok(ignore_tombstone_value(entry)); + } + + Ok(None) } fn current_version(&self) -> Version { @@ -193,48 +205,6 @@ impl AbstractTree for Tree { .sum() } - fn ingest( - &self, - iter: impl Iterator, - seqno_generator: &SequenceNumberCounter, - visible_seqno: &SequenceNumberCounter, - ) -> crate::Result<()> { - use crate::tree::ingest::Ingestion; - use std::time::Instant; - - let seqno = seqno_generator.next(); - - // TODO: allow ingestion always, by flushing memtable - - let mut writer = Ingestion::new(self)?.with_seqno(seqno); - - let start = Instant::now(); - let mut count = 0; - let mut last_key = None; - - for (key, value) in iter { - if let Some(last_key) = &last_key { - assert!( - key > last_key, - "next key in bulk ingest was not greater than last key, last: {last_key:?}, next: {key:?}", - ); - } - last_key = Some(key.clone()); - - writer.write(key, value)?; - - count += 1; - } - - writer.finish()?; - - visible_seqno.fetch_max(seqno + 1); - - log::info!("Ingested {count} items in {:?}", start.elapsed()); - - Ok(()) - } - fn drop_range, R: RangeBounds>(&self, range: R) -> crate::Result<()> { let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range); @@ -545,7 +515,6 @@ impl AbstractTree for Tree { self.current_version().level(idx).map(|x| x.table_count()) } - #[expect(clippy::significant_drop_tightening)] fn approximate_len(&self) -> usize { let super_version = self .version_history @@ -578,7 +547,6 @@ impl AbstractTree for Tree { .sum() } - #[expect(clippy::significant_drop_tightening)] fn get_highest_memtable_seqno(&self) -> Option { let version = self .version_history @@ -709,6 +677,38 @@ impl Tree { Ok(tree) } + pub(crate) fn consume_writer( + &self, + writer: crate::table::Writer, + ) -> crate::Result> { + let table_file_path = writer.path.clone(); + + let Some((_, checksum)) = writer.finish()? else { + return Ok(None); + }; + + log::debug!("Finalized table write at {}", table_file_path.display()); + + let pin_filter = self.config.filter_block_pinning_policy.get(0); + let pin_index = self.config.index_block_pinning_policy.get(0); + + let created_table = Table::recover( + table_file_path, + checksum, + self.id, + self.config.cache.clone(), + self.config.descriptor_table.clone(), + pin_filter, + pin_index, + #[cfg(feature = "metrics")] + self.metrics.clone(), + )?; + + log::debug!("Flushed table to {:?}", created_table.path); + + Ok(Some(created_table)) + } + /// Returns `true` if there are some tables that are being compacted. #[doc(hidden)] #[must_use] @@ -731,10 +731,10 @@ impl Tree { return Some(entry); } } - None } + // Scan levels top-down and runs newest-first; return the first table hit fn get_internal_entry_from_tables( &self, version: &Version, @@ -751,7 +751,7 @@ impl Tree { if run.len() >= 4 { if let Some(table) = run.get_for_key(key) { if let Some(item) = table.get(key, seqno, key_hash)? { - return Ok(ignore_tombstone_value(item)); + return Ok(Some(item)); } } } else { @@ -762,7 +762,7 @@ impl Tree { } if let Some(item) = table.get(key, seqno, key_hash)? { - return Ok(ignore_tombstone_value(item)); + return Ok(Some(item)); } } } @@ -931,9 +931,7 @@ impl Tree { config, major_compaction_lock: RwLock::default(), flush_lock: Mutex::default(), - compaction_state: Arc::new(Mutex::new( - crate::compaction::state::CompactionState::default(), - )), + compaction_state: Arc::new(Mutex::new(CompactionState::default())), #[cfg(feature = "metrics")] metrics, diff --git a/tests/ingestion_api.rs b/tests/ingestion_api.rs new file mode 100644 index 00000000..c790d01f --- /dev/null +++ b/tests/ingestion_api.rs @@ -0,0 +1,298 @@ +use lsm_tree::{AbstractTree, Config, KvSeparationOptions, SeqNo}; + +#[test] +fn tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(folder, Default::default()).open()?; + + for i in 0..10u32 { + let key = format!("k{:03}", i); + tree.insert(key.as_bytes(), b"v", 0); + } + let mut ingest = tree.ingestion()?.with_seqno(10); + for i in 0..10u32 { + let key = format!("k{:03}", i); + ingest.write_tombstone(key.as_bytes().into())?; + } + ingest.finish()?; + + for i in 0..10u32 { + let key = format!("k{:03}", i); + assert!(tree.get(key.as_bytes(), SeqNo::MAX)?.is_none()); + } + assert_eq!(tree.tombstone_count(), 10); + + Ok(()) +} + +#[test] +fn sealed_memtable_value_overrides_table_value() -> lsm_tree::Result<()> { + use lsm_tree::AbstractTree; + let folder = tempfile::tempdir()?; + let tree = lsm_tree::Config::new(folder, Default::default()).open()?; + + // Older table value via ingestion (seqno 1) + { + let mut ingest = tree.ingestion()?.with_seqno(1); + ingest.write(b"k".as_slice().into(), b"old".as_slice().into())?; + ingest.finish()?; + } + + // Newer value in memtable (seqno 2), then seal it + tree.insert(b"k", b"new", 2); + let _ = tree.rotate_memtable(); // move active -> sealed + + // Read should return the sealed memtable value + assert_eq!( + tree.get(b"k", lsm_tree::SeqNo::MAX)?, + Some(b"new".as_slice().into()) + ); + Ok(()) +} + +#[test] +fn sealed_memtable_tombstone_overrides_table_value() -> lsm_tree::Result<()> { + use lsm_tree::AbstractTree; + let folder = tempfile::tempdir()?; + let tree = lsm_tree::Config::new(folder, Default::default()).open()?; + + // Older table value via ingestion (seqno 1) + { + let mut ingest = tree.ingestion()?.with_seqno(1); + ingest.write(b"k".as_slice().into(), b"old".as_slice().into())?; + ingest.finish()?; + } + + // Newer tombstone in memtable (seqno 2), then seal it + tree.remove(b"k", 2); + let _ = tree.rotate_memtable(); + + // Read should see the delete from sealed memtable + assert!(tree.get(b"k", lsm_tree::SeqNo::MAX)?.is_none()); + Ok(()) +} + +#[test] +fn tables_newest_first_returns_highest_seqno() -> lsm_tree::Result<()> { + use lsm_tree::AbstractTree; + let folder = tempfile::tempdir()?; + let tree = lsm_tree::Config::new(folder, Default::default()).open()?; + + // Two separate ingestions create two tables containing the same key at different seqnos + { + let mut ingest = tree.ingestion()?.with_seqno(1); + ingest.write(b"k".as_slice().into(), b"v1".as_slice().into())?; + ingest.finish()?; + } + { + let mut ingest = tree.ingestion()?.with_seqno(2); + ingest.write(b"k".as_slice().into(), b"v2".as_slice().into())?; + ingest.finish()?; + } + + // With memtables empty, read should return the value from the newest table run (seqno 2) + assert_eq!( + tree.get(b"k", lsm_tree::SeqNo::MAX)?, + Some(b"v2".as_slice().into()) + ); + Ok(()) +} + +#[test] +#[should_panic(expected = "next key in ingestion must be greater than last key")] +fn ingestion_enforces_order_standard_panics() { + let folder = tempfile::tempdir().unwrap(); + let tree = lsm_tree::Config::new(folder, Default::default()) + .open() + .unwrap(); + + let mut ingest = tree.ingestion().unwrap().with_seqno(1); + // First write higher key, then lower to trigger ordering assertion + ingest + .write(b"k2".as_slice().into(), b"v".as_slice().into()) + .unwrap(); + // Panics here + let _ = ingest.write(b"k1".as_slice().into(), b"v".as_slice().into()); +} + +#[test] +fn blob_ingestion_out_of_order_panics_without_blob_write() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = lsm_tree::Config::new(folder, Default::default()) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(8))) + .open()?; + + let before = tree.blob_file_count(); + + // Use a small value for the first write to avoid blob I/O + let result = std::panic::catch_unwind(|| { + let mut ingest = tree.ingestion().unwrap().with_seqno(1); + ingest + .write(b"k2".as_slice().into(), b"x".as_slice().into()) + .unwrap(); + // Second write would require blob I/O, but ordering check should fire before any blob write + let _ = ingest.write(b"k1".as_slice().into(), vec![1u8; 16].into()); + }); + assert!(result.is_err()); + + let after = tree.blob_file_count(); + assert_eq!(before, after); + Ok(()) +} + +#[test] +fn memtable_put_overrides_table_tombstone() -> lsm_tree::Result<()> { + use lsm_tree::AbstractTree; + let folder = tempfile::tempdir()?; + let tree = lsm_tree::Config::new(folder, Default::default()).open()?; + + // Older put written via ingestion to tables (seqno 1) + { + let mut ingest = tree.ingestion()?.with_seqno(1); + ingest.write(b"k".as_slice().into(), b"v1".as_slice().into())?; + ingest.finish()?; + } + + // Newer tombstone written via ingestion to tables (seqno 2) + { + let mut ingest = tree.ingestion()?.with_seqno(2); + ingest.write_tombstone(b"k".as_slice().into())?; + ingest.finish()?; + } + + // Newest put in active memtable (seqno 3) should override table tombstone + tree.insert(b"k", b"v3", 3); + assert_eq!( + tree.get(b"k", lsm_tree::SeqNo::MAX)?, + Some(b"v3".as_slice().into()) + ); + Ok(()) +} + +#[test] +fn blob_tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(folder, Default::default()) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) + .open()?; + + for i in 0..8u32 { + let key = format!("b{:03}", i); + tree.insert(key.as_bytes(), b"x", 0); + } + + let mut ingest = tree.ingestion()?.with_seqno(10); + for i in 0..8u32 { + let key = format!("b{:03}", i); + ingest.write_tombstone(key.as_bytes().into())?; + } + ingest.finish()?; + + for i in 0..8u32 { + let key = format!("b{:03}", i); + assert!(tree.get(key.as_bytes(), SeqNo::MAX)?.is_none()); + } + assert_eq!(tree.tombstone_count(), 8); + + Ok(()) +} + +#[test] +fn tree_ingestion_finish_no_writes_noop() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(folder, Default::default()).open()?; + + let before_tables = tree.table_count(); + tree.ingestion()?.finish()?; + let after_tables = tree.table_count(); + + assert_eq!(before_tables, after_tables); + assert!(tree.is_empty(SeqNo::MAX, None)?); + + Ok(()) +} + +#[test] +fn blob_ingestion_only_tombstones_does_not_create_blob_files() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(folder, Default::default()) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) + .open()?; + + for i in 0..5u32 { + let key = format!("d{:03}", i); + tree.insert(key.as_bytes(), b"value", 0); + } + + let before_blobs = tree.blob_file_count(); + + let mut ingest = tree.ingestion()?.with_seqno(10); + for i in 0..5u32 { + let key = format!("d{:03}", i); + ingest.write_tombstone(key.as_bytes().into())?; + } + ingest.finish()?; + + let after_blobs = tree.blob_file_count(); + assert_eq!(before_blobs, after_blobs); + + for i in 0..5u32 { + let key = format!("d{:03}", i); + assert!(tree.get(key.as_bytes(), SeqNo::MAX)?.is_none()); + } + + Ok(()) +} + +#[test] +fn blob_ingestion_finish_no_writes_noop() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(folder, Default::default()) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) + .open()?; + + let before_tables = tree.table_count(); + let before_blobs = tree.blob_file_count(); + + tree.ingestion()?.finish()?; + + let after_tables = tree.table_count(); + let after_blobs = tree.blob_file_count(); + + assert_eq!(before_tables, after_tables); + assert_eq!(before_blobs, after_blobs); + assert!(tree.is_empty(SeqNo::MAX, None)?); + + Ok(()) +} + +#[test] +fn blob_ingestion_separates_large_values_and_reads_ok() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(folder, Default::default()) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(8))) + .open()?; + + let mut ingest = tree.ingestion()?.with_seqno(1); + ingest.write("k_big1".as_bytes().into(), vec![1u8; 16].into())?; + ingest.write("k_big2".as_bytes().into(), vec![2u8; 32].into())?; + ingest.write("k_small".as_bytes().into(), b"abc".as_slice().into())?; + ingest.finish()?; + + assert!(tree.blob_file_count() >= 1); + + assert_eq!( + tree.get("k_small", SeqNo::MAX)?, + Some(b"abc".as_slice().into()) + ); + assert_eq!( + tree.get("k_big1", SeqNo::MAX)?.as_deref().map(|s| s.len()), + Some(16) + ); + assert_eq!( + tree.get("k_big2", SeqNo::MAX)?.as_deref().map(|s| s.len()), + Some(32) + ); + + Ok(()) +} diff --git a/tests/ingestion_invariants.rs b/tests/ingestion_invariants.rs new file mode 100644 index 00000000..8f6c689a --- /dev/null +++ b/tests/ingestion_invariants.rs @@ -0,0 +1,153 @@ +use std::sync::mpsc; +use std::thread; +use std::time::Duration; + +use lsm_tree::{AbstractTree, Config, KvSeparationOptions, SeqNo}; + +#[test] +fn ingestion_autoflushes_active_memtable() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(&folder, Default::default()).open()?; + + // Write to active memtable + for i in 0..10u32 { + let k = format!("a{:03}", i); + tree.insert(k.as_bytes(), b"v", 1); + } + + let tables_before = tree.table_count(); + let sealed_before = tree.sealed_memtable_count(); + assert_eq!(sealed_before, 0); + + // Start ingestion (should auto-flush active) + tree.ingestion()?.with_seqno(10).finish()?; + + // After ingestion, data is in tables; no sealed memtables + assert_eq!(tree.sealed_memtable_count(), 0); + assert!(tree.table_count() >= tables_before + 1); + + // Reads must succeed from tables + for i in 0..10u32 { + let k = format!("a{:03}", i); + assert_eq!( + tree.get(k.as_bytes(), SeqNo::MAX)?, + Some(b"v".as_slice().into()) + ); + } + + Ok(()) +} + +#[test] +fn ingestion_flushes_sealed_memtables() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(&folder, Default::default()).open()?; + + // Put items into active and seal them + for i in 0..8u32 { + let k = format!("s{:03}", i); + tree.insert(k.as_bytes(), b"x", 1); + } + assert!(tree.rotate_memtable().is_some()); + assert!(tree.sealed_memtable_count() > 0); + + let tables_before = tree.table_count(); + + // Ingestion should flush sealed memtables and register resulting tables + tree.ingestion()?.with_seqno(20).finish()?; + + assert_eq!(tree.sealed_memtable_count(), 0); + assert!(tree.table_count() >= tables_before + 1); + + for i in 0..8u32 { + let k = format!("s{:03}", i); + assert_eq!( + tree.get(k.as_bytes(), SeqNo::MAX)?, + Some(b"x".as_slice().into()) + ); + } + + Ok(()) +} + +#[test] +fn ingestion_blocks_memtable_writes_until_finish() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(&folder, Default::default()).open()?; + + // Acquire ingestion and keep it active while another thread performs writes + let ingest = tree.ingestion()?.with_seqno(5); + + let (started_tx, started_rx) = mpsc::channel(); + let (done_tx, done_rx) = mpsc::channel(); + let tree2 = tree.clone(); + + let handle = thread::spawn(move || { + started_tx.send(()).ok(); + tree2.insert(b"k_block", b"v", 6); + done_tx.send(()).ok(); + }); + + // Wait for the writer thread to start the attempt + started_rx.recv().unwrap(); + + // Give it a moment; the insert should complete and not be blocked by ingestion + thread::sleep(Duration::from_millis(100)); + assert!(done_rx.try_recv().is_ok(), "insert should not be blocked"); + + handle.join().unwrap(); + ingest.finish()?; + + // Verify the write landed and is visible + assert_eq!( + tree.get(b"k_block", SeqNo::MAX)?, + Some(b"v".as_slice().into()) + ); + + Ok(()) +} + +#[test] +fn blob_ingestion_honors_invariants_and_blocks_writes() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(&folder, Default::default()) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) + .open()?; + + // Write small values into memtable and then start blob ingestion + for i in 0..4u32 { + let k = format!("b{:03}", i); + tree.insert(k.as_bytes(), b"y", 1); + } + + let (started_tx, started_rx) = mpsc::channel(); + let (done_tx, done_rx) = mpsc::channel(); + let tree2 = tree.clone(); + + let ingest = tree.ingestion()?.with_seqno(30); + + let handle = thread::spawn(move || { + started_tx.send(()).ok(); + tree2.insert(b"b999", b"z", 31); + done_tx.send(()).ok(); + }); + + started_rx.recv().unwrap(); + thread::sleep(Duration::from_millis(100)); + assert!(done_rx.try_recv().is_ok()); + + handle.join().unwrap(); + ingest.finish()?; + + // Data visible after ingestion, including concurrent write + for i in 0..4u32 { + let k = format!("b{:03}", i); + assert_eq!( + tree.get(k.as_bytes(), SeqNo::MAX)?, + Some(b"y".as_slice().into()) + ); + } + assert_eq!(tree.get(b"b999", SeqNo::MAX)?, Some(b"z".as_slice().into())); + + Ok(()) +} diff --git a/tests/tree_bulk_ingest.rs b/tests/tree_bulk_ingest.rs index 0712ed73..3f988b34 100644 --- a/tests/tree_bulk_ingest.rs +++ b/tests/tree_bulk_ingest.rs @@ -12,15 +12,15 @@ fn tree_bulk_ingest() -> lsm_tree::Result<()> { let tree = Config::new(folder, seqno.clone()).open()?; - tree.ingest( - (0..ITEM_COUNT as u64).map(|x| { - let k = x.to_be_bytes(); - let v = nanoid::nanoid!(); - (k.into(), v.into()) - }), - &seqno, - &visible_seqno, - )?; + let seq = seqno.next(); + let mut ingestion = tree.ingestion()?.with_seqno(seq); + for x in 0..ITEM_COUNT as u64 { + let k = x.to_be_bytes(); + let v = nanoid::nanoid!(); + ingestion.write(k.into(), v.into())?; + } + ingestion.finish()?; + visible_seqno.fetch_max(seq + 1); assert_eq!(tree.len(SeqNo::MAX, None)?, ITEM_COUNT); assert_eq!( @@ -47,15 +47,15 @@ fn tree_copy() -> lsm_tree::Result<()> { let src = Config::new(folder, seqno.clone()).open()?; - src.ingest( - (0..ITEM_COUNT as u64).map(|x| { - let k = x.to_be_bytes(); - let v = nanoid::nanoid!(); - (k.into(), v.into()) - }), - &seqno, - &visible_seqno, - )?; + let seq = seqno.next(); + let mut ingestion = src.ingestion()?.with_seqno(seq); + for x in 0..ITEM_COUNT as u64 { + let k = x.to_be_bytes(); + let v = nanoid::nanoid!(); + ingestion.write(k.into(), v.into())?; + } + ingestion.finish()?; + visible_seqno.fetch_max(seq + 1); assert_eq!(src.len(SeqNo::MAX, None)?, ITEM_COUNT); assert_eq!( @@ -73,13 +73,14 @@ fn tree_copy() -> lsm_tree::Result<()> { let folder = tempfile::tempdir()?; let dest = Config::new(folder, seqno.clone()).open()?; - dest.ingest( - src.iter(SeqNo::MAX, None) - .map(|x| x.into_inner()) - .map(|x| x.unwrap()), - &seqno, - &visible_seqno, - )?; + let seq = seqno.next(); + let mut ingestion = dest.ingestion()?.with_seqno(seq); + for item in src.iter(SeqNo::MAX, None) { + let (k, v) = item.into_inner().unwrap(); + ingestion.write(k, v)?; + } + ingestion.finish()?; + visible_seqno.fetch_max(seq + 1); assert_eq!(dest.len(SeqNo::MAX, None)?, ITEM_COUNT); assert_eq!( @@ -108,15 +109,15 @@ fn blob_tree_bulk_ingest() -> lsm_tree::Result<()> { .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) .open()?; - tree.ingest( - (0..ITEM_COUNT as u64).map(|x| { - let k = x.to_be_bytes(); - let v = nanoid::nanoid!(); - (k.into(), v.into()) - }), - &seqno, - &visible_seqno, - )?; + let seq = seqno.next(); + let mut ingestion = tree.ingestion()?.with_seqno(seq); + for x in 0..ITEM_COUNT as u64 { + let k = x.to_be_bytes(); + let v = nanoid::nanoid!(); + ingestion.write(k.into(), v.into())?; + } + ingestion.finish()?; + visible_seqno.fetch_max(seq + 1); assert_eq!(tree.len(SeqNo::MAX, None)?, ITEM_COUNT); assert_eq!( @@ -146,15 +147,15 @@ fn blob_tree_copy() -> lsm_tree::Result<()> { .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) .open()?; - src.ingest( - (0..ITEM_COUNT as u64).map(|x| { - let k = x.to_be_bytes(); - let v = nanoid::nanoid!(); - (k.into(), v.into()) - }), - &seqno, - &visible_seqno, - )?; + let seq = seqno.next(); + let mut ingestion = src.ingestion()?.with_seqno(seq); + for x in 0..ITEM_COUNT as u64 { + let k = x.to_be_bytes(); + let v = nanoid::nanoid!(); + ingestion.write(k.into(), v.into())?; + } + ingestion.finish()?; + visible_seqno.fetch_max(seq + 1); assert_eq!(src.len(SeqNo::MAX, None)?, ITEM_COUNT); assert_eq!( @@ -175,13 +176,14 @@ fn blob_tree_copy() -> lsm_tree::Result<()> { .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) .open()?; - dest.ingest( - src.iter(SeqNo::MAX, None) - .map(|x| x.into_inner()) - .map(|x| x.unwrap()), - &seqno, - &visible_seqno, - )?; + let seq = seqno.next(); + let mut ingestion = dest.ingestion()?.with_seqno(seq); + for item in src.iter(SeqNo::MAX, None) { + let (k, v) = item.into_inner().unwrap(); + ingestion.write(k, v)?; + } + ingestion.finish()?; + visible_seqno.fetch_max(seq + 1); assert_eq!(dest.len(SeqNo::MAX, None)?, ITEM_COUNT); assert_eq!( From d9d041fb78b56453d15529491a0e2998cddbc688 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sat, 15 Nov 2025 13:23:17 +0100 Subject: [PATCH 02/17] clippy --- src/blob_tree/ingest.rs | 2 +- src/error.rs | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/blob_tree/ingest.rs b/src/blob_tree/ingest.rs index c0b457a3..9e654850 100644 --- a/src/blob_tree/ingest.rs +++ b/src/blob_tree/ingest.rs @@ -7,7 +7,7 @@ use crate::{ SeqNo, UserKey, UserValue, }; -/// Bulk ingestion for BlobTree +/// Bulk ingestion for [`BlobTree`] /// /// Items NEED to be added in ascending key order. /// diff --git a/src/error.rs b/src/error.rs index 037928ed..530da2a5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -20,9 +20,6 @@ pub enum Error { /// Some required files could not be recovered from disk Unrecoverable, - /// Ingestion could not start because required preconditions were not met - IngestionPreconditionFailed, - /// Checksum mismatch ChecksumMismatch { /// Checksum of loaded block From 3489b8fbcc2d8673d4b508f8566b006b273d69b1 Mon Sep 17 00:00:00 2001 From: zaidoon Date: Sat, 15 Nov 2025 09:06:03 -0500 Subject: [PATCH 03/17] don't pin ingestion output tables --- src/blob_tree/ingest.rs | 10 +++++----- src/tree/ingest.rs | 19 +++++-------------- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/src/blob_tree/ingest.rs b/src/blob_tree/ingest.rs index 9e654850..8372d3f1 100644 --- a/src/blob_tree/ingest.rs +++ b/src/blob_tree/ingest.rs @@ -151,12 +151,12 @@ impl<'a> BlobIngestion<'a> { let blob_files = self.blob.finish()?; let results = self.table.writer.finish()?; - let pin_filter = index.config.filter_block_pinning_policy.get(0); - let pin_index = index.config.index_block_pinning_policy.get(0); - let created_tables = results .into_iter() .map(|(table_id, checksum)| -> crate::Result
{ + // Do not pin ingestion output tables here. Large ingests are + // typically placed in level 1 and would otherwise keep all + // filter and index blocks pinned, increasing memory pressure. Table::recover( index .config @@ -167,8 +167,8 @@ impl<'a> BlobIngestion<'a> { index.id, index.config.cache.clone(), index.config.descriptor_table.clone(), - pin_filter, - pin_index, + false, + false, #[cfg(feature = "metrics")] index.metrics.clone(), ) diff --git a/src/tree/ingest.rs b/src/tree/ingest.rs index 6f87c895..4430b926 100644 --- a/src/tree/ingest.rs +++ b/src/tree/ingest.rs @@ -219,18 +219,6 @@ impl<'a> Ingestion<'a> { log::info!("Finished ingestion writer"); - let pin_filter = self - .tree - .config - .filter_block_pinning_policy - .get(INITIAL_CANONICAL_LEVEL); - - let pin_index = self - .tree - .config - .index_block_pinning_policy - .get(INITIAL_CANONICAL_LEVEL); - // Turn the writer output into fully recovered tables that can be // registered as a fresh L0 run. let created_tables = results @@ -242,14 +230,17 @@ impl<'a> Ingestion<'a> { // .with_metrics(metrics) // .run(path, tree_id, cache, descriptor_table); + // Do not pin ingestion output tables here. Large ingests are + // typically placed in level 1 and would otherwise keep all + // filter and index blocks pinned, increasing memory pressure. Table::recover( self.folder.join(table_id.to_string()), checksum, self.tree.id, self.tree.config.cache.clone(), self.tree.config.descriptor_table.clone(), - pin_filter, - pin_index, + false, + false, #[cfg(feature = "metrics")] self.tree.metrics.clone(), ) From 25c9cdfb3574bbba72d1074279a2eeb0ef6c9d61 Mon Sep 17 00:00:00 2001 From: zaidoon Date: Sat, 15 Nov 2025 14:40:40 -0500 Subject: [PATCH 04/17] split ingestion initialization from seqno assignment in bulk ingest tests --- tests/tree_bulk_ingest.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/tests/tree_bulk_ingest.rs b/tests/tree_bulk_ingest.rs index 3f988b34..268cbe8c 100644 --- a/tests/tree_bulk_ingest.rs +++ b/tests/tree_bulk_ingest.rs @@ -12,8 +12,9 @@ fn tree_bulk_ingest() -> lsm_tree::Result<()> { let tree = Config::new(folder, seqno.clone()).open()?; + let mut ingestion = tree.ingestion()?; let seq = seqno.next(); - let mut ingestion = tree.ingestion()?.with_seqno(seq); + ingestion = ingestion.with_seqno(seq); for x in 0..ITEM_COUNT as u64 { let k = x.to_be_bytes(); let v = nanoid::nanoid!(); @@ -47,8 +48,9 @@ fn tree_copy() -> lsm_tree::Result<()> { let src = Config::new(folder, seqno.clone()).open()?; + let mut ingestion = src.ingestion()?; let seq = seqno.next(); - let mut ingestion = src.ingestion()?.with_seqno(seq); + ingestion = ingestion.with_seqno(seq); for x in 0..ITEM_COUNT as u64 { let k = x.to_be_bytes(); let v = nanoid::nanoid!(); @@ -73,8 +75,9 @@ fn tree_copy() -> lsm_tree::Result<()> { let folder = tempfile::tempdir()?; let dest = Config::new(folder, seqno.clone()).open()?; + let mut ingestion = dest.ingestion()?; let seq = seqno.next(); - let mut ingestion = dest.ingestion()?.with_seqno(seq); + ingestion = ingestion.with_seqno(seq); for item in src.iter(SeqNo::MAX, None) { let (k, v) = item.into_inner().unwrap(); ingestion.write(k, v)?; @@ -109,8 +112,9 @@ fn blob_tree_bulk_ingest() -> lsm_tree::Result<()> { .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) .open()?; + let mut ingestion = tree.ingestion()?; let seq = seqno.next(); - let mut ingestion = tree.ingestion()?.with_seqno(seq); + ingestion = ingestion.with_seqno(seq); for x in 0..ITEM_COUNT as u64 { let k = x.to_be_bytes(); let v = nanoid::nanoid!(); @@ -147,8 +151,9 @@ fn blob_tree_copy() -> lsm_tree::Result<()> { .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) .open()?; + let mut ingestion = src.ingestion()?; let seq = seqno.next(); - let mut ingestion = src.ingestion()?.with_seqno(seq); + ingestion = ingestion.with_seqno(seq); for x in 0..ITEM_COUNT as u64 { let k = x.to_be_bytes(); let v = nanoid::nanoid!(); @@ -176,8 +181,9 @@ fn blob_tree_copy() -> lsm_tree::Result<()> { .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) .open()?; + let mut ingestion = dest.ingestion()?; let seq = seqno.next(); - let mut ingestion = dest.ingestion()?.with_seqno(seq); + ingestion = ingestion.with_seqno(seq); for item in src.iter(SeqNo::MAX, None) { let (k, v) = item.into_inner().unwrap(); ingestion.write(k, v)?; From 90b02a6450651785e1badb77ff40d642a9a7b1b7 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Wed, 19 Nov 2025 15:55:50 +0100 Subject: [PATCH 05/17] test: dirty snapshot after ingestion --- tests/ingest_dirty_snapshot.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 tests/ingest_dirty_snapshot.rs diff --git a/tests/ingest_dirty_snapshot.rs b/tests/ingest_dirty_snapshot.rs new file mode 100644 index 00000000..59f6ddb1 --- /dev/null +++ b/tests/ingest_dirty_snapshot.rs @@ -0,0 +1,23 @@ +use lsm_tree::{AbstractTree, Config, SequenceNumberCounter}; + +#[test] +fn ingestion_dirty_snapshot() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let seqno = SequenceNumberCounter::default(); + let tree = Config::new(&folder, seqno.clone()).open()?; + + tree.insert("a", "a", seqno.next()); + tree.insert("a", "b", seqno.next()); + + let snapshot_seqno = 1; + assert_eq!(b"a", &*tree.get("a", snapshot_seqno)?.unwrap()); + + let mut ingest = tree.ingestion()?.with_seqno(seqno.next()); + ingest.write("b".into(), "b".into())?; + ingest.finish()?; + + assert_eq!(b"a", &*tree.get("a", snapshot_seqno)?.unwrap()); + + Ok(()) +} From b73aa4e03a55a5ff1f660d8d82271a2c43fd82b2 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Wed, 19 Nov 2025 15:58:34 +0100 Subject: [PATCH 06/17] refactor --- src/blob_tree/ingest.rs | 2 +- src/blob_tree/mod.rs | 5 ++--- src/compaction/worker.rs | 2 +- src/version/mod.rs | 2 +- src/vlog/blob_file/multi_writer.rs | 3 +-- tests/ingestion_invariants.rs | 7 +++---- 6 files changed, 9 insertions(+), 12 deletions(-) diff --git a/src/blob_tree/ingest.rs b/src/blob_tree/ingest.rs index 8372d3f1..3b731a22 100644 --- a/src/blob_tree/ingest.rs +++ b/src/blob_tree/ingest.rs @@ -41,9 +41,9 @@ impl<'a> BlobIngestion<'a> { let table = TableIngestion::new(&tree.index)?; let blob = BlobFileWriter::new( tree.index.0.blob_file_id_counter.clone(), - blob_file_size, tree.index.config.path.join(BLOBS_FOLDER), )? + .use_target_size(blob_file_size) .use_compression(kv.compression); let separation_threshold = kv.separation_threshold; diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index 1240f926..ee0fdaba 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -19,8 +19,7 @@ use crate::{ value::InternalValue, version::Version, vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle}, - Cache, Config, DescriptorTable, Memtable, SeqNo, SequenceNumberCounter, TableId, TreeId, - UserKey, UserValue, + Cache, Config, DescriptorTable, Memtable, SeqNo, TableId, TreeId, UserKey, UserValue, }; use handle::BlobIndirection; use std::{ @@ -396,9 +395,9 @@ impl AbstractTree for BlobTree { let mut blob_writer = BlobFileWriter::new( self.index.0.blob_file_id_counter.clone(), - kv_opts.file_target_size, self.index.config.path.join(BLOBS_FOLDER), )? + .use_target_size(kv_opts.file_target_size) .use_compression( self.index .config diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index fc15b9bd..02d49e70 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -424,9 +424,9 @@ fn merge_tables( let writer = BlobFileWriter::new( opts.blob_file_id_generator.clone(), - blob_opts.file_target_size, opts.config.path.join(BLOBS_FOLDER), )? + .use_target_size(blob_opts.file_target_size) .use_passthrough_compression(blob_opts.compression); let inner = StandardCompaction::new(table_writer, tables); diff --git a/src/version/mod.rs b/src/version/mod.rs index 2d82e92f..a46d3758 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -18,11 +18,11 @@ use crate::blob_tree::{FragmentationEntry, FragmentationMap}; use crate::coding::Encode; use crate::compaction::state::hidden_set::HiddenSet; use crate::version::recovery::Recovery; +use crate::TreeType; use crate::{ vlog::{BlobFile, BlobFileId}, HashSet, KeyRange, Table, TableId, }; -use crate::{Tree, TreeType}; use optimize::optimize_runs; use run::Ranged; use std::fs::File; diff --git a/src/vlog/blob_file/multi_writer.rs b/src/vlog/blob_file/multi_writer.rs index 5dfe2270..03f924f3 100644 --- a/src/vlog/blob_file/multi_writer.rs +++ b/src/vlog/blob_file/multi_writer.rs @@ -39,7 +39,6 @@ impl MultiWriter { #[doc(hidden)] pub fn new>( id_generator: SequenceNumberCounter, - target_size: u64, folder: P, ) -> crate::Result { let folder = folder.as_ref(); @@ -50,7 +49,7 @@ impl MultiWriter { Ok(Self { id_generator, folder: folder.into(), - target_size, + target_size: 64 * 1_024 * 1_024, active_writer: Writer::new(blob_file_path, blob_file_id)?, diff --git a/tests/ingestion_invariants.rs b/tests/ingestion_invariants.rs index 8f6c689a..7061fa31 100644 --- a/tests/ingestion_invariants.rs +++ b/tests/ingestion_invariants.rs @@ -1,9 +1,8 @@ +use lsm_tree::{AbstractTree, Config, KvSeparationOptions, SeqNo}; use std::sync::mpsc; use std::thread; use std::time::Duration; -use lsm_tree::{AbstractTree, Config, KvSeparationOptions, SeqNo}; - #[test] fn ingestion_autoflushes_active_memtable() -> lsm_tree::Result<()> { let folder = tempfile::tempdir()?; @@ -24,7 +23,7 @@ fn ingestion_autoflushes_active_memtable() -> lsm_tree::Result<()> { // After ingestion, data is in tables; no sealed memtables assert_eq!(tree.sealed_memtable_count(), 0); - assert!(tree.table_count() >= tables_before + 1); + assert!(tree.table_count() > tables_before); // Reads must succeed from tables for i in 0..10u32 { @@ -57,7 +56,7 @@ fn ingestion_flushes_sealed_memtables() -> lsm_tree::Result<()> { tree.ingestion()?.with_seqno(20).finish()?; assert_eq!(tree.sealed_memtable_count(), 0); - assert!(tree.table_count() >= tables_before + 1); + assert!(tree.table_count() > tables_before); for i in 0..8u32 { let k = format!("s{:03}", i); From 1877b57c360bf0fee72c5c843281139cb4b743c6 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Wed, 19 Nov 2025 15:58:34 +0100 Subject: [PATCH 07/17] refactor --- src/blob_tree/ingest.rs | 2 +- src/blob_tree/mod.rs | 5 ++--- src/compaction/worker.rs | 2 +- src/version/mod.rs | 2 +- src/vlog/blob_file/multi_writer.rs | 3 +-- src/vlog/blob_file/reader.rs | 8 ++++---- tests/ingestion_invariants.rs | 7 +++---- 7 files changed, 13 insertions(+), 16 deletions(-) diff --git a/src/blob_tree/ingest.rs b/src/blob_tree/ingest.rs index 8372d3f1..3b731a22 100644 --- a/src/blob_tree/ingest.rs +++ b/src/blob_tree/ingest.rs @@ -41,9 +41,9 @@ impl<'a> BlobIngestion<'a> { let table = TableIngestion::new(&tree.index)?; let blob = BlobFileWriter::new( tree.index.0.blob_file_id_counter.clone(), - blob_file_size, tree.index.config.path.join(BLOBS_FOLDER), )? + .use_target_size(blob_file_size) .use_compression(kv.compression); let separation_threshold = kv.separation_threshold; diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index 1240f926..ee0fdaba 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -19,8 +19,7 @@ use crate::{ value::InternalValue, version::Version, vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle}, - Cache, Config, DescriptorTable, Memtable, SeqNo, SequenceNumberCounter, TableId, TreeId, - UserKey, UserValue, + Cache, Config, DescriptorTable, Memtable, SeqNo, TableId, TreeId, UserKey, UserValue, }; use handle::BlobIndirection; use std::{ @@ -396,9 +395,9 @@ impl AbstractTree for BlobTree { let mut blob_writer = BlobFileWriter::new( self.index.0.blob_file_id_counter.clone(), - kv_opts.file_target_size, self.index.config.path.join(BLOBS_FOLDER), )? + .use_target_size(kv_opts.file_target_size) .use_compression( self.index .config diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index fc15b9bd..02d49e70 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -424,9 +424,9 @@ fn merge_tables( let writer = BlobFileWriter::new( opts.blob_file_id_generator.clone(), - blob_opts.file_target_size, opts.config.path.join(BLOBS_FOLDER), )? + .use_target_size(blob_opts.file_target_size) .use_passthrough_compression(blob_opts.compression); let inner = StandardCompaction::new(table_writer, tables); diff --git a/src/version/mod.rs b/src/version/mod.rs index 2d82e92f..a46d3758 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -18,11 +18,11 @@ use crate::blob_tree::{FragmentationEntry, FragmentationMap}; use crate::coding::Encode; use crate::compaction::state::hidden_set::HiddenSet; use crate::version::recovery::Recovery; +use crate::TreeType; use crate::{ vlog::{BlobFile, BlobFileId}, HashSet, KeyRange, Table, TableId, }; -use crate::{Tree, TreeType}; use optimize::optimize_runs; use run::Ranged; use std::fs::File; diff --git a/src/vlog/blob_file/multi_writer.rs b/src/vlog/blob_file/multi_writer.rs index 5dfe2270..03f924f3 100644 --- a/src/vlog/blob_file/multi_writer.rs +++ b/src/vlog/blob_file/multi_writer.rs @@ -39,7 +39,6 @@ impl MultiWriter { #[doc(hidden)] pub fn new>( id_generator: SequenceNumberCounter, - target_size: u64, folder: P, ) -> crate::Result { let folder = folder.as_ref(); @@ -50,7 +49,7 @@ impl MultiWriter { Ok(Self { id_generator, folder: folder.into(), - target_size, + target_size: 64 * 1_024 * 1_024, active_writer: Writer::new(blob_file_path, blob_file_id)?, diff --git a/src/vlog/blob_file/reader.rs b/src/vlog/blob_file/reader.rs index 21582508..4ae02fae 100644 --- a/src/vlog/blob_file/reader.rs +++ b/src/vlog/blob_file/reader.rs @@ -112,8 +112,8 @@ mod tests { let id_generator = SequenceNumberCounter::default(); let folder = tempfile::tempdir()?; - let mut writer = - crate::vlog::BlobFileWriter::new(id_generator, u64::MAX, folder.path()).unwrap(); + let mut writer = crate::vlog::BlobFileWriter::new(id_generator, folder.path())? + .use_target_size(u64::MAX); let offset = writer.offset(); let on_disk_size = writer.write(b"a", 0, b"abcdef")?; @@ -140,8 +140,8 @@ mod tests { let id_generator = SequenceNumberCounter::default(); let folder = tempfile::tempdir()?; - let mut writer = crate::vlog::BlobFileWriter::new(id_generator, u64::MAX, folder.path()) - .unwrap() + let mut writer = crate::vlog::BlobFileWriter::new(id_generator, folder.path())? + .use_target_size(u64::MAX) .use_compression(CompressionType::Lz4); let offset = writer.offset(); diff --git a/tests/ingestion_invariants.rs b/tests/ingestion_invariants.rs index 8f6c689a..7061fa31 100644 --- a/tests/ingestion_invariants.rs +++ b/tests/ingestion_invariants.rs @@ -1,9 +1,8 @@ +use lsm_tree::{AbstractTree, Config, KvSeparationOptions, SeqNo}; use std::sync::mpsc; use std::thread; use std::time::Duration; -use lsm_tree::{AbstractTree, Config, KvSeparationOptions, SeqNo}; - #[test] fn ingestion_autoflushes_active_memtable() -> lsm_tree::Result<()> { let folder = tempfile::tempdir()?; @@ -24,7 +23,7 @@ fn ingestion_autoflushes_active_memtable() -> lsm_tree::Result<()> { // After ingestion, data is in tables; no sealed memtables assert_eq!(tree.sealed_memtable_count(), 0); - assert!(tree.table_count() >= tables_before + 1); + assert!(tree.table_count() > tables_before); // Reads must succeed from tables for i in 0..10u32 { @@ -57,7 +56,7 @@ fn ingestion_flushes_sealed_memtables() -> lsm_tree::Result<()> { tree.ingestion()?.with_seqno(20).finish()?; assert_eq!(tree.sealed_memtable_count(), 0); - assert!(tree.table_count() >= tables_before + 1); + assert!(tree.table_count() > tables_before); for i in 0..8u32 { let k = format!("s{:03}", i); From 7849efa550109a7a789c7ed0c7cf98fa2225b714 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Wed, 19 Nov 2025 16:10:26 +0100 Subject: [PATCH 08/17] more ergonomic ingestion API, add more tests --- src/abstract.rs | 6 ++-- src/any_tree.rs | 54 +++++++++++++++++++---------- tests/blob_ingest.rs | 56 ++++++++++++++++++++++++++++++ tests/blob_ingest_relink.rs | 62 ++++++++++++++++++++++++++++++++++ tests/ingest_dirty_snapshot.rs | 2 +- tests/ingestion_api.rs | 43 ++++++++++++----------- tests/tree_bulk_ingest.rs | 8 ++--- 7 files changed, 185 insertions(+), 46 deletions(-) create mode 100644 tests/blob_ingest.rs create mode 100644 tests/blob_ingest_relink.rs diff --git a/src/abstract.rs b/src/abstract.rs index 5b038f37..301de38b 100644 --- a/src/abstract.rs +++ b/src/abstract.rs @@ -3,10 +3,8 @@ // (found in the LICENSE-* files in the repository) use crate::{ - blob_tree::FragmentationMap, compaction::CompactionStrategy, config::TreeType, - iter_guard::IterGuardImpl, table::Table, tree::inner::MemtableId, version::Version, - vlog::BlobFile, AnyTree, BlobTree, Config, Guard, InternalValue, KvPair, Memtable, SeqNo, - TableId, Tree, TreeId, UserKey, UserValue, + iter_guard::IterGuardImpl, table::Table, version::Version, vlog::BlobFile, AnyTree, BlobTree, + Config, Guard, InternalValue, KvPair, Memtable, SeqNo, TableId, Tree, UserKey, UserValue, }; use std::{ ops::RangeBounds, diff --git a/src/any_tree.rs b/src/any_tree.rs index e39a35ea..f780feb6 100644 --- a/src/any_tree.rs +++ b/src/any_tree.rs @@ -31,47 +31,67 @@ pub enum AnyIngestion<'a> { Blob(BlobIngestion<'a>), } -impl<'a> AnyIngestion<'a> { +impl AnyIngestion<'_> { #[must_use] /// Sets the sequence number used for subsequent writes pub fn with_seqno(self, seqno: SeqNo) -> Self { match self { - AnyIngestion::Standard(i) => AnyIngestion::Standard(i.with_seqno(seqno)), - AnyIngestion::Blob(b) => AnyIngestion::Blob(b.with_seqno(seqno)), + Self::Standard(i) => Self::Standard(i.with_seqno(seqno)), + Self::Blob(b) => Self::Blob(b.with_seqno(seqno)), } } - /// Writes a key-value pair - pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> { + /// Writes a key-value pair. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn write, V: Into>( + &mut self, + key: K, + value: V, + ) -> crate::Result<()> { match self { - AnyIngestion::Standard(i) => i.write(key, value), - AnyIngestion::Blob(b) => b.write(key, value), + Self::Standard(i) => i.write(key.into(), value.into()), + Self::Blob(b) => b.write(key.into(), value.into()), } } - /// Writes a tombstone for a key - pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> { + /// Writes a tombstone for a key. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn write_tombstone>(&mut self, key: K) -> crate::Result<()> { match self { - AnyIngestion::Standard(i) => i.write_tombstone(key), - AnyIngestion::Blob(b) => b.write_tombstone(key), + Self::Standard(i) => i.write_tombstone(key.into()), + Self::Blob(b) => b.write_tombstone(key.into()), } } - /// Finalizes ingestion and registers created tables (and blob files if present) + /// Finalizes ingestion and registers created tables (and blob files if present). + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. pub fn finish(self) -> crate::Result<()> { match self { - AnyIngestion::Standard(i) => i.finish(), - AnyIngestion::Blob(b) => b.finish(), + Self::Standard(i) => i.finish(), + Self::Blob(b) => b.finish(), } } } impl AnyTree { - /// Starts an ingestion for any tree type (standard or blob) + /// Starts an ingestion for any tree type (standard or blob). + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. pub fn ingestion(&self) -> crate::Result> { match self { - AnyTree::Standard(t) => Ok(AnyIngestion::Standard(Ingestion::new(t)?)), - AnyTree::Blob(b) => Ok(AnyIngestion::Blob(BlobIngestion::new(b)?)), + Self::Standard(t) => Ok(AnyIngestion::Standard(Ingestion::new(t)?)), + Self::Blob(b) => Ok(AnyIngestion::Blob(BlobIngestion::new(b)?)), } } } diff --git a/tests/blob_ingest.rs b/tests/blob_ingest.rs new file mode 100644 index 00000000..0aa7811d --- /dev/null +++ b/tests/blob_ingest.rs @@ -0,0 +1,56 @@ +use lsm_tree::{ + blob_tree::FragmentationEntry, AbstractTree, KvSeparationOptions, SeqNo, SequenceNumberCounter, +}; +use test_log::test; + +#[test] +fn blob_ingest_gc_stats() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let path = folder.path(); + + let big_value = b"neptune!".repeat(128_000); + let new_big_value = b"winter!".repeat(128_000); + + { + let tree = lsm_tree::Config::new(path, SequenceNumberCounter::default()) + .with_kv_separation(Some( + KvSeparationOptions::default().compression(lsm_tree::CompressionType::None), + )) + .open()?; + + let mut ingestion = tree.ingestion()?; + ingestion.write("big", &big_value)?; + ingestion.write("smol", "small value")?; + ingestion.finish()?; + + let value = tree.get("big", SeqNo::MAX)?.expect("should exist"); + assert_eq!(&*value, big_value); + assert_eq!(1, tree.table_count()); + assert_eq!(1, tree.blob_file_count()); + + let mut ingestion = tree.ingestion()?; + ingestion.write("big", &new_big_value)?; + ingestion.finish()?; + + // Blob file has no fragmentation before compaction (in stats) + // so it is not rewritten + tree.major_compact(64_000_000, 1_000)?; + assert_eq!(1, tree.table_count()); + assert_eq!(2, tree.blob_file_count()); + + let gc_stats = tree.current_version().gc_stats().clone(); + + // "big":0 is expired + assert_eq!( + &{ + let mut map = lsm_tree::HashMap::default(); + let size = big_value.len() as u64; + map.insert(0, FragmentationEntry::new(1, size, size)); + map + }, + &*gc_stats, + ); + } + + Ok(()) +} diff --git a/tests/blob_ingest_relink.rs b/tests/blob_ingest_relink.rs new file mode 100644 index 00000000..cc5d317c --- /dev/null +++ b/tests/blob_ingest_relink.rs @@ -0,0 +1,62 @@ +use lsm_tree::{AbstractTree, KvSeparationOptions, SeqNo, SequenceNumberCounter}; +use test_log::test; + +#[test] +fn blob_tree_ingest_relink() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let path = folder.path(); + + let big_value = b"neptune!".repeat(128_000); + + { + let tree = lsm_tree::Config::new(path, SequenceNumberCounter::default()) + .with_kv_separation(Some( + KvSeparationOptions::default().compression(lsm_tree::CompressionType::None), + )) + .open()?; + + let mut ingestion = tree.ingestion()?; + ingestion.write("big", &big_value)?; + ingestion.write("smol", "small value")?; + ingestion.finish()?; + + let value = tree.get("big", SeqNo::MAX)?.expect("should exist"); + assert_eq!(&*value, big_value); + assert_eq!(1, tree.table_count()); + assert_eq!(1, tree.blob_file_count()); + + assert_eq!( + Some(vec![lsm_tree::table::writer::LinkedFile { + blob_file_id: 0, + bytes: big_value.len() as u64, + on_disk_bytes: big_value.len() as u64, + len: 1, + }]), + tree.current_version() + .iter_tables() + .next() + .unwrap() + .list_blob_file_references()?, + ); + + tree.major_compact(64_000_000, 1_000)?; + assert_eq!(1, tree.table_count()); + assert_eq!(1, tree.blob_file_count()); + + assert_eq!( + Some(vec![lsm_tree::table::writer::LinkedFile { + blob_file_id: 0, + bytes: big_value.len() as u64, + on_disk_bytes: big_value.len() as u64, + len: 1, + }]), + tree.current_version() + .iter_tables() + .next() + .unwrap() + .list_blob_file_references()?, + ); + } + + Ok(()) +} diff --git a/tests/ingest_dirty_snapshot.rs b/tests/ingest_dirty_snapshot.rs index 59f6ddb1..4ddfb92f 100644 --- a/tests/ingest_dirty_snapshot.rs +++ b/tests/ingest_dirty_snapshot.rs @@ -14,7 +14,7 @@ fn ingestion_dirty_snapshot() -> lsm_tree::Result<()> { assert_eq!(b"a", &*tree.get("a", snapshot_seqno)?.unwrap()); let mut ingest = tree.ingestion()?.with_seqno(seqno.next()); - ingest.write("b".into(), "b".into())?; + ingest.write("b", "b")?; ingest.finish()?; assert_eq!(b"a", &*tree.get("a", snapshot_seqno)?.unwrap()); diff --git a/tests/ingestion_api.rs b/tests/ingestion_api.rs index c790d01f..97f5164f 100644 --- a/tests/ingestion_api.rs +++ b/tests/ingestion_api.rs @@ -9,10 +9,11 @@ fn tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()> { let key = format!("k{:03}", i); tree.insert(key.as_bytes(), b"v", 0); } + let mut ingest = tree.ingestion()?.with_seqno(10); for i in 0..10u32 { let key = format!("k{:03}", i); - ingest.write_tombstone(key.as_bytes().into())?; + ingest.write_tombstone(key)?; } ingest.finish()?; @@ -34,7 +35,7 @@ fn sealed_memtable_value_overrides_table_value() -> lsm_tree::Result<()> { // Older table value via ingestion (seqno 1) { let mut ingest = tree.ingestion()?.with_seqno(1); - ingest.write(b"k".as_slice().into(), b"old".as_slice().into())?; + ingest.write(b"k", b"old")?; ingest.finish()?; } @@ -47,6 +48,7 @@ fn sealed_memtable_value_overrides_table_value() -> lsm_tree::Result<()> { tree.get(b"k", lsm_tree::SeqNo::MAX)?, Some(b"new".as_slice().into()) ); + Ok(()) } @@ -59,7 +61,7 @@ fn sealed_memtable_tombstone_overrides_table_value() -> lsm_tree::Result<()> { // Older table value via ingestion (seqno 1) { let mut ingest = tree.ingestion()?.with_seqno(1); - ingest.write(b"k".as_slice().into(), b"old".as_slice().into())?; + ingest.write(b"k", b"old")?; ingest.finish()?; } @@ -69,6 +71,7 @@ fn sealed_memtable_tombstone_overrides_table_value() -> lsm_tree::Result<()> { // Read should see the delete from sealed memtable assert!(tree.get(b"k", lsm_tree::SeqNo::MAX)?.is_none()); + Ok(()) } @@ -81,12 +84,12 @@ fn tables_newest_first_returns_highest_seqno() -> lsm_tree::Result<()> { // Two separate ingestions create two tables containing the same key at different seqnos { let mut ingest = tree.ingestion()?.with_seqno(1); - ingest.write(b"k".as_slice().into(), b"v1".as_slice().into())?; + ingest.write(b"k", b"v1")?; ingest.finish()?; } { let mut ingest = tree.ingestion()?.with_seqno(2); - ingest.write(b"k".as_slice().into(), b"v2".as_slice().into())?; + ingest.write(b"k", b"v2")?; ingest.finish()?; } @@ -107,12 +110,12 @@ fn ingestion_enforces_order_standard_panics() { .unwrap(); let mut ingest = tree.ingestion().unwrap().with_seqno(1); + // First write higher key, then lower to trigger ordering assertion - ingest - .write(b"k2".as_slice().into(), b"v".as_slice().into()) - .unwrap(); + ingest.write(b"k2", b"v").unwrap(); + // Panics here - let _ = ingest.write(b"k1".as_slice().into(), b"v".as_slice().into()); + let _ = ingest.write(b"k1", b"v"); } #[test] @@ -127,16 +130,16 @@ fn blob_ingestion_out_of_order_panics_without_blob_write() -> lsm_tree::Result<( // Use a small value for the first write to avoid blob I/O let result = std::panic::catch_unwind(|| { let mut ingest = tree.ingestion().unwrap().with_seqno(1); - ingest - .write(b"k2".as_slice().into(), b"x".as_slice().into()) - .unwrap(); + ingest.write(b"k2", b"x").unwrap(); + // Second write would require blob I/O, but ordering check should fire before any blob write - let _ = ingest.write(b"k1".as_slice().into(), vec![1u8; 16].into()); + let _ = ingest.write(b"k1", [1u8; 16]); }); assert!(result.is_err()); let after = tree.blob_file_count(); assert_eq!(before, after); + Ok(()) } @@ -149,14 +152,14 @@ fn memtable_put_overrides_table_tombstone() -> lsm_tree::Result<()> { // Older put written via ingestion to tables (seqno 1) { let mut ingest = tree.ingestion()?.with_seqno(1); - ingest.write(b"k".as_slice().into(), b"v1".as_slice().into())?; + ingest.write(b"k", b"v1")?; ingest.finish()?; } // Newer tombstone written via ingestion to tables (seqno 2) { let mut ingest = tree.ingestion()?.with_seqno(2); - ingest.write_tombstone(b"k".as_slice().into())?; + ingest.write_tombstone(b"k")?; ingest.finish()?; } @@ -184,7 +187,7 @@ fn blob_tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()> let mut ingest = tree.ingestion()?.with_seqno(10); for i in 0..8u32 { let key = format!("b{:03}", i); - ingest.write_tombstone(key.as_bytes().into())?; + ingest.write_tombstone(key)?; } ingest.finish()?; @@ -229,7 +232,7 @@ fn blob_ingestion_only_tombstones_does_not_create_blob_files() -> lsm_tree::Resu let mut ingest = tree.ingestion()?.with_seqno(10); for i in 0..5u32 { let key = format!("d{:03}", i); - ingest.write_tombstone(key.as_bytes().into())?; + ingest.write_tombstone(key)?; } ingest.finish()?; @@ -274,9 +277,9 @@ fn blob_ingestion_separates_large_values_and_reads_ok() -> lsm_tree::Result<()> .open()?; let mut ingest = tree.ingestion()?.with_seqno(1); - ingest.write("k_big1".as_bytes().into(), vec![1u8; 16].into())?; - ingest.write("k_big2".as_bytes().into(), vec![2u8; 32].into())?; - ingest.write("k_small".as_bytes().into(), b"abc".as_slice().into())?; + ingest.write("k_big1", [1u8; 16])?; + ingest.write("k_big2", [2u8; 32])?; + ingest.write("k_small", "abc")?; ingest.finish()?; assert!(tree.blob_file_count() >= 1); diff --git a/tests/tree_bulk_ingest.rs b/tests/tree_bulk_ingest.rs index 268cbe8c..c989bfc9 100644 --- a/tests/tree_bulk_ingest.rs +++ b/tests/tree_bulk_ingest.rs @@ -18,7 +18,7 @@ fn tree_bulk_ingest() -> lsm_tree::Result<()> { for x in 0..ITEM_COUNT as u64 { let k = x.to_be_bytes(); let v = nanoid::nanoid!(); - ingestion.write(k.into(), v.into())?; + ingestion.write(k, v)?; } ingestion.finish()?; visible_seqno.fetch_max(seq + 1); @@ -54,7 +54,7 @@ fn tree_copy() -> lsm_tree::Result<()> { for x in 0..ITEM_COUNT as u64 { let k = x.to_be_bytes(); let v = nanoid::nanoid!(); - ingestion.write(k.into(), v.into())?; + ingestion.write(k, v)?; } ingestion.finish()?; visible_seqno.fetch_max(seq + 1); @@ -118,7 +118,7 @@ fn blob_tree_bulk_ingest() -> lsm_tree::Result<()> { for x in 0..ITEM_COUNT as u64 { let k = x.to_be_bytes(); let v = nanoid::nanoid!(); - ingestion.write(k.into(), v.into())?; + ingestion.write(k, v)?; } ingestion.finish()?; visible_seqno.fetch_max(seq + 1); @@ -157,7 +157,7 @@ fn blob_tree_copy() -> lsm_tree::Result<()> { for x in 0..ITEM_COUNT as u64 { let k = x.to_be_bytes(); let v = nanoid::nanoid!(); - ingestion.write(k.into(), v.into())?; + ingestion.write(k, v)?; } ingestion.finish()?; visible_seqno.fetch_max(seq + 1); From 0725204ed1dfb67567d0ff290e6194c71573abd8 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Wed, 19 Nov 2025 16:11:57 +0100 Subject: [PATCH 09/17] refactor --- src/tree/ingest.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/tree/ingest.rs b/src/tree/ingest.rs index 4430b926..98171a29 100644 --- a/src/tree/ingest.rs +++ b/src/tree/ingest.rs @@ -152,6 +152,7 @@ impl<'a> Ingestion<'a> { // Remember the last user key to validate the next call's ordering self.last_key = Some(cloned_key); + Ok(()) } @@ -169,16 +170,18 @@ impl<'a> Ingestion<'a> { } let cloned_key = key.clone(); - let res = self.writer.write(crate::InternalValue::from_components( + + self.writer.write(crate::InternalValue::from_components( key, value, self.seqno, crate::ValueType::Value, - )); - if res.is_ok() { - self.last_key = Some(cloned_key); - } - res + ))?; + + // Remember the last user key to validate the next call's ordering + self.last_key = Some(cloned_key); + + Ok(()) } /// Writes a key-value pair. From f06a70ce716e9e416bdef9cd95ba3eba550da0f8 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Wed, 19 Nov 2025 16:28:37 +0100 Subject: [PATCH 10/17] apply pinning on recovery --- src/tree/mod.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 343ae331..92a185ef 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -900,8 +900,7 @@ impl Tree { let version = Self::recover_levels( &config.path, tree_id, - &config.cache, - &config.descriptor_table, + &config, #[cfg(feature = "metrics")] &metrics, )?; @@ -1000,8 +999,7 @@ impl Tree { fn recover_levels>( tree_path: P, tree_id: TreeId, - cache: &Arc, - descriptor_table: &Arc, + config: &Config, #[cfg(feature = "metrics")] metrics: &Arc, ) -> crate::Result { use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId}; @@ -1088,14 +1086,17 @@ impl Tree { })?; if let Some(&(level_idx, checksum)) = table_map.get(&table_id) { + let pin_filter = config.filter_block_pinning_policy.get(level_idx.into()); + let pin_index = config.index_block_pinning_policy.get(level_idx.into()); + let table = Table::recover( table_file_path, checksum, tree_id, - cache.clone(), - descriptor_table.clone(), - level_idx <= 1, // TODO: look at configuration - level_idx <= 2, // TODO: look at configuration + config.cache.clone(), + config.descriptor_table.clone(), + pin_filter, + pin_index, #[cfg(feature = "metrics")] metrics.clone(), )?; From 976cf87be208ee5ae568929c005e8e42bf92e871 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Thu, 20 Nov 2025 17:56:29 +0100 Subject: [PATCH 11/17] change ingestion flush watermark to 0 --- src/tree/ingest.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tree/ingest.rs b/src/tree/ingest.rs index 98171a29..ec62596d 100644 --- a/src/tree/ingest.rs +++ b/src/tree/ingest.rs @@ -36,7 +36,7 @@ impl<'a> Ingestion<'a> { // path as normal writes: any dirty memtable content is moved into // tables before building new tables from the ingestion stream. // This keeps the lookup path ordered as active > sealed > tables. - tree.flush_active_memtable(SeqNo::MAX)?; + tree.flush_active_memtable(0)?; let folder = tree.config.path.join(crate::file::TABLES_FOLDER); log::debug!("Ingesting into tables in {}", folder.display()); From 7fe024fc639f3090b3b2307b77321a87142920d6 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Thu, 20 Nov 2025 21:26:22 +0100 Subject: [PATCH 12/17] lint --- src/any_tree.rs | 2 +- src/blob_tree/ingest.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/any_tree.rs b/src/any_tree.rs index f780feb6..6cbab047 100644 --- a/src/any_tree.rs +++ b/src/any_tree.rs @@ -23,7 +23,7 @@ pub enum AnyTree { // Keep zero allocations and direct dispatch; boxing introduces heap indirection and `dyn` adds virtual dispatch. // Ingestion calls use `&mut self` in tight loops; the active variant is stable and branch prediction makes the match cheap. // Allowing this lint preserves hot-path performance at the cost of a larger enum size. -#[allow(clippy::large_enum_variant)] +#[expect(clippy::large_enum_variant)] pub enum AnyIngestion<'a> { /// Ingestion for a standard LSM-tree Standard(Ingestion<'a>), diff --git a/src/blob_tree/ingest.rs b/src/blob_tree/ingest.rs index 3b731a22..8d56e009 100644 --- a/src/blob_tree/ingest.rs +++ b/src/blob_tree/ingest.rs @@ -80,7 +80,7 @@ impl<'a> BlobIngestion<'a> { ); } - #[allow(clippy::cast_possible_truncation)] + #[expect(clippy::cast_possible_truncation)] let value_size = value.len() as u32; if value_size >= self.separation_threshold { From 19e08761bb4fd1e991bcbe629cca8581c8f1e14e Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sat, 22 Nov 2025 16:32:41 +0100 Subject: [PATCH 13/17] feat: table global seqno --- src/blob_tree/ingest.rs | 1 + src/blob_tree/mod.rs | 1 + src/compaction/flavour.rs | 1 + src/range.rs | 20 +++++++++------- src/table/inner.rs | 4 +++- src/table/iter.rs | 50 +++++++++++++++++++++++++++++++++------ src/table/meta.rs | 2 +- src/table/mod.rs | 13 +++++++++- src/table/scanner.rs | 10 ++++++-- src/table/tests.rs | 8 +++++++ src/tree/ingest.rs | 1 + src/tree/mod.rs | 10 +++++--- src/version/mod.rs | 3 ++- src/version/recovery.rs | 8 ++++--- 14 files changed, 104 insertions(+), 28 deletions(-) diff --git a/src/blob_tree/ingest.rs b/src/blob_tree/ingest.rs index 8d56e009..c7fd0be7 100644 --- a/src/blob_tree/ingest.rs +++ b/src/blob_tree/ingest.rs @@ -164,6 +164,7 @@ impl<'a> BlobIngestion<'a> { .join(crate::file::TABLES_FOLDER) .join(table_id.to_string()), checksum, + todo!(), index.id, index.config.cache.clone(), index.config.descriptor_table.clone(), diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index c49febee..b58c3ee7 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -467,6 +467,7 @@ impl AbstractTree for BlobTree { Table::recover( table_folder.join(table_id.to_string()), checksum, + 0, self.index.id, self.index.config.cache.clone(), self.index.config.descriptor_table.clone(), diff --git a/src/compaction/flavour.rs b/src/compaction/flavour.rs index fd135fb3..16be2cfc 100644 --- a/src/compaction/flavour.rs +++ b/src/compaction/flavour.rs @@ -334,6 +334,7 @@ impl StandardCompaction { Table::recover( table_base_folder.join(table_id.to_string()), checksum, + 0, opts.tree_id, opts.config.cache.clone(), opts.config.descriptor_table.clone(), diff --git a/src/range.rs b/src/range.rs index c89f0634..ea9a9402 100644 --- a/src/range.rs +++ b/src/range.rs @@ -163,15 +163,17 @@ impl TreeIter { range.start_bound().map(|x| &*x.user_key), range.end_bound().map(|x| &*x.user_key), )) { - let reader = table.range(( - range.start_bound().map(|x| &x.user_key).cloned(), - range.end_bound().map(|x| &x.user_key).cloned(), - )); - - iters.push(Box::new(reader.filter(move |item| match item { - Ok(item) => seqno_filter(item.key.seqno, seqno), - Err(_) => true, - }))); + let reader = table + .range(( + range.start_bound().map(|x| &x.user_key).cloned(), + range.end_bound().map(|x| &x.user_key).cloned(), + )) + .filter(move |item| match item { + Ok(item) => seqno_filter(item.key.seqno, seqno), + Err(_) => true, + }); + + iters.push(Box::new(reader)); } } _ => { diff --git a/src/table/inner.rs b/src/table/inner.rs index b77c1206..0d7b0acd 100644 --- a/src/table/inner.rs +++ b/src/table/inner.rs @@ -11,7 +11,7 @@ use crate::{ descriptor_table::DescriptorTable, table::{filter::block::FilterBlock, IndexBlock}, tree::inner::TreeId, - Checksum, GlobalTableId, + Checksum, GlobalTableId, SeqNo, }; use std::{ path::PathBuf, @@ -53,6 +53,8 @@ pub struct Inner { pub(super) checksum: Checksum, + pub(super) global_seqno: SeqNo, + #[cfg(feature = "metrics")] pub(crate) metrics: Arc, diff --git a/src/table/iter.rs b/src/table/iter.rs index 4ac587ed..932b7fab 100644 --- a/src/table/iter.rs +++ b/src/table/iter.rs @@ -94,6 +94,8 @@ pub struct Iter { table_id: GlobalTableId, path: Arc, + global_seqno: SeqNo, + #[expect(clippy::struct_field_names)] index_iter: BlockIndexIterImpl, @@ -118,6 +120,7 @@ pub struct Iter { impl Iter { pub fn new( table_id: GlobalTableId, + global_seqno: SeqNo, path: Arc, index_iter: BlockIndexIterImpl, descriptor_table: Arc, @@ -129,6 +132,8 @@ impl Iter { table_id, path, + global_seqno, + index_iter, descriptor_table, cache, @@ -165,7 +170,14 @@ impl Iterator for Iter { // Always try to keep iterating inside the already-materialized low data block first; this // lets callers consume multiple entries without touching the index or cache again. if let Some(block) = &mut self.lo_data_block { - if let Some(item) = block.next().map(Ok) { + if let Some(item) = block + .next() + .map(|mut v| { + v.key.seqno += self.global_seqno; + v + }) + .map(Ok) + { return Some(item); } } @@ -212,7 +224,14 @@ impl Iterator for Iter { // No more block handles coming from the index. Flush any pending items buffered on // the high side (used by reverse iteration) before signalling completion. if let Some(block) = &mut self.hi_data_block { - if let Some(item) = block.next().map(Ok) { + if let Some(item) = block + .next() + .map(|mut v| { + v.key.seqno += self.global_seqno; + v + }) + .map(Ok) + { return Some(item); } } @@ -226,7 +245,6 @@ impl Iterator for Iter { // Load the next data block referenced by the index handle. We try the shared block // cache first to avoid hitting the filesystem, and fall back to `load_block` on miss. - #[expect(clippy::single_match_else)] let block = match self.cache.get_block(self.table_id, handle.offset()) { Some(block) => block, None => { @@ -262,7 +280,9 @@ impl Iterator for Iter { self.lo_offset = handle.offset(); self.lo_data_block = Some(reader); - if let Some(item) = item { + if let Some(mut item) = item { + item.key.seqno += self.global_seqno; + // Serving the first item immediately avoids stashing it in a temporary buffer and // keeps block iteration semantics identical to the simple case at the top. return Some(Ok(item)); @@ -276,7 +296,14 @@ impl DoubleEndedIterator for Iter { // Mirror the forward iterator: prefer consuming buffered items from the high data block to // avoid touching the index once a block has been materialized. if let Some(block) = &mut self.hi_data_block { - if let Some(item) = block.next_back().map(Ok) { + if let Some(item) = block + .next_back() + .map(|mut v| { + v.key.seqno += self.global_seqno; + v + }) + .map(Ok) + { return Some(item); } } @@ -318,7 +345,14 @@ impl DoubleEndedIterator for Iter { // Once we exhaust the index in reverse order, flush any items that were buffered on // the low side (set when iterating forward first) before signalling completion. if let Some(block) = &mut self.lo_data_block { - if let Some(item) = block.next_back().map(Ok) { + if let Some(item) = block + .next_back() + .map(|mut v| { + v.key.seqno += self.global_seqno; + v + }) + .map(Ok) + { return Some(item); } } @@ -367,7 +401,9 @@ impl DoubleEndedIterator for Iter { self.hi_offset = handle.offset(); self.hi_data_block = Some(reader); - if let Some(item) = item { + if let Some(mut item) = item { + item.key.seqno += self.global_seqno; + // Emit the first materialized entry immediately to match the forward path and avoid // storing it in a temporary buffer. return Some(Ok(item)); diff --git a/src/table/meta.rs b/src/table/meta.rs index abfd43fa..18004af0 100644 --- a/src/table/meta.rs +++ b/src/table/meta.rs @@ -38,7 +38,7 @@ pub struct ParsedMeta { pub data_block_count: u64, pub index_block_count: u64, pub key_range: KeyRange, - pub seqnos: (SeqNo, SeqNo), + pub(super) seqnos: (SeqNo, SeqNo), pub file_size: u64, pub item_count: u64, pub tombstone_count: u64, diff --git a/src/table/mod.rs b/src/table/mod.rs index 6800f1dc..dc9a87bd 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -96,6 +96,11 @@ impl std::fmt::Debug for Table { } impl Table { + #[must_use] + pub fn global_seqno(&self) -> SeqNo { + self.0.global_seqno + } + pub fn referenced_blob_bytes(&self) -> crate::Result { if let Some(v) = self.0.cached_blob_bytes.get() { return Ok(*v); @@ -239,7 +244,7 @@ impl Table { #[cfg(feature = "metrics")] use std::sync::atomic::Ordering::Relaxed; - if self.metadata.seqnos.0 >= seqno { + if (self.metadata.seqnos.0 + self.global_seqno()) >= seqno { return Ok(None); } @@ -302,6 +307,8 @@ impl Table { return Ok(None); }; + let seqno = seqno.saturating_sub(self.global_seqno()); + for block_handle in iter { let block_handle = block_handle?; @@ -344,6 +351,7 @@ impl Table { &self.path, block_count, self.metadata.data_block_compression, + self.global_seqno(), ) } @@ -373,6 +381,7 @@ impl Table { let mut iter = Iter::new( self.global_id(), + self.global_seqno(), self.path.clone(), index_iter, self.descriptor_table.clone(), @@ -421,6 +430,7 @@ impl Table { pub fn recover( file_path: PathBuf, checksum: Checksum, + global_seqno: SeqNo, tree_id: TreeId, cache: Arc, descriptor_table: Arc, @@ -545,6 +555,7 @@ impl Table { is_deleted: AtomicBool::default(), checksum, + global_seqno, #[cfg(feature = "metrics")] metrics, diff --git a/src/table/scanner.rs b/src/table/scanner.rs index 25f01318..dc46858b 100644 --- a/src/table/scanner.rs +++ b/src/table/scanner.rs @@ -5,7 +5,7 @@ use super::{Block, DataBlock}; use crate::{ table::{block::BlockType, iter::OwnedDataBlockIter}, - CompressionType, InternalValue, + CompressionType, InternalValue, SeqNo, }; use std::{fs::File, io::BufReader, path::Path}; @@ -17,6 +17,8 @@ pub struct Scanner { compression: CompressionType, block_count: usize, read_count: usize, + + global_seqno: SeqNo, } impl Scanner { @@ -24,6 +26,7 @@ impl Scanner { path: &Path, block_count: usize, compression: CompressionType, + global_seqno: SeqNo, ) -> crate::Result { // TODO: a larger buffer size may be better for HDD, maybe make this configurable let mut reader = BufReader::with_capacity(8 * 4_096, File::open(path)?); @@ -38,6 +41,8 @@ impl Scanner { compression, block_count, read_count: 1, + + global_seqno, }) } @@ -68,7 +73,8 @@ impl Iterator for Scanner { fn next(&mut self) -> Option { loop { - if let Some(item) = self.iter.next() { + if let Some(mut item) = self.iter.next() { + item.key.seqno += self.global_seqno; return Some(Ok(item)); } diff --git a/src/table/tests.rs b/src/table/tests.rs index 59b76f4d..31f43272 100644 --- a/src/table/tests.rs +++ b/src/table/tests.rs @@ -49,6 +49,7 @@ fn test_with_table( file.clone(), checksum, 0, + 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), false, @@ -74,6 +75,7 @@ fn test_with_table( file.clone(), checksum, 0, + 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), true, @@ -99,6 +101,7 @@ fn test_with_table( file.clone(), checksum, 0, + 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), false, @@ -124,6 +127,7 @@ fn test_with_table( file.clone(), checksum, 0, + 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), true, @@ -169,6 +173,7 @@ fn test_with_table( file.clone(), checksum, 0, + 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), false, @@ -193,6 +198,7 @@ fn test_with_table( file.clone(), checksum, 0, + 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), true, @@ -217,6 +223,7 @@ fn test_with_table( file.clone(), checksum, 0, + 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), false, @@ -242,6 +249,7 @@ fn test_with_table( file, checksum, 0, + 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), true, diff --git a/src/tree/ingest.rs b/src/tree/ingest.rs index ec62596d..ffda0948 100644 --- a/src/tree/ingest.rs +++ b/src/tree/ingest.rs @@ -239,6 +239,7 @@ impl<'a> Ingestion<'a> { Table::recover( self.folder.join(table_id.to_string()), checksum, + todo!(), self.tree.id, self.tree.config.cache.clone(), self.tree.config.descriptor_table.clone(), diff --git a/src/tree/mod.rs b/src/tree/mod.rs index fe67cfcf..19d50a7c 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -341,6 +341,7 @@ impl AbstractTree for Tree { Table::recover( folder.join(table_id.to_string()), checksum, + 0, self.id, self.config.cache.clone(), self.config.descriptor_table.clone(), @@ -772,6 +773,7 @@ impl Tree { let created_table = Table::recover( table_file_path, checksum, + 0, self.id, self.config.cache.clone(), self.config.descriptor_table.clone(), @@ -1006,12 +1008,12 @@ impl Tree { let recovery = recover(tree_path)?; let table_map = { - let mut result: crate::HashMap = + let mut result: crate::HashMap = crate::HashMap::default(); for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() { for run in table_ids { - for &(table_id, checksum) in run { + for &(table_id, checksum, seqno) in run { #[expect( clippy::expect_used, reason = "there are always less than 256 levels" @@ -1023,6 +1025,7 @@ impl Tree { .try_into() .expect("there are less than 256 levels"), checksum, + seqno, ), ); } @@ -1082,13 +1085,14 @@ impl Tree { crate::Error::Unrecoverable })?; - if let Some(&(level_idx, checksum)) = table_map.get(&table_id) { + if let Some(&(level_idx, checksum, global_seqno)) = table_map.get(&table_id) { let pin_filter = config.filter_block_pinning_policy.get(level_idx.into()); let pin_index = config.index_block_pinning_policy.get(level_idx.into()); let table = Table::recover( table_file_path, checksum, + global_seqno, tree_id, config.cache.clone(), config.descriptor_table.clone(), diff --git a/src/version/mod.rs b/src/version/mod.rs index a46d3758..c51f8e0f 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -249,7 +249,7 @@ impl Version { .map(|run| { let run_tables = run .iter() - .map(|&(table_id, _)| { + .map(|&(table_id, _, _)| { tables .iter() .find(|x| x.id() == table_id) @@ -674,6 +674,7 @@ impl Version { writer.write_u64::(table.id())?; writer.write_u8(0)?; // Checksum type, 0 = XXH3 writer.write_u128::(table.checksum().into_u128())?; + writer.write_u64::(table.global_seqno())?; } } } diff --git a/src/version/recovery.rs b/src/version/recovery.rs index b7a6d7a7..6c051009 100644 --- a/src/version/recovery.rs +++ b/src/version/recovery.rs @@ -4,7 +4,7 @@ use crate::{ coding::Decode, file::CURRENT_VERSION_FILE, version::VersionId, vlog::BlobFileId, Checksum, - TableId, TreeType, + SeqNo, TableId, TreeType, }; use byteorder::{LittleEndian, ReadBytesExt}; use std::path::Path; @@ -20,7 +20,7 @@ pub fn get_current_version(folder: &std::path::Path) -> crate::Result pub struct Recovery { pub tree_type: TreeType, pub curr_version_id: VersionId, - pub table_ids: Vec>>, + pub table_ids: Vec>>, pub blob_file_ids: Vec<(BlobFileId, Checksum)>, pub gc_stats: crate::blob_tree::FragmentationMap, } @@ -70,7 +70,9 @@ pub fn recover(folder: &Path) -> crate::Result { let checksum = reader.read_u128::()?; let checksum = Checksum::from_raw(checksum); - run.push((id, checksum)); + let global_seqno = reader.read_u64::()?; + + run.push((id, checksum, global_seqno)); } level.push(run); From bc0f85f761602f8019f7e53e04ad89dbefdf695a Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sat, 22 Nov 2025 16:36:30 +0100 Subject: [PATCH 14/17] refactor --- src/tree/mod.rs | 8 ++++---- src/version/mod.rs | 4 ++-- src/version/recovery.rs | 14 ++++++++++++-- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 19d50a7c..aec50a36 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -1013,19 +1013,19 @@ impl Tree { for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() { for run in table_ids { - for &(table_id, checksum, seqno) in run { + for table in run { #[expect( clippy::expect_used, reason = "there are always less than 256 levels" )] result.insert( - table_id, + table.id, ( level_idx .try_into() .expect("there are less than 256 levels"), - checksum, - seqno, + table.checksum, + table.global_seqno, ), ); } diff --git a/src/version/mod.rs b/src/version/mod.rs index c51f8e0f..f04917f2 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -249,10 +249,10 @@ impl Version { .map(|run| { let run_tables = run .iter() - .map(|&(table_id, _, _)| { + .map(|table| { tables .iter() - .find(|x| x.id() == table_id) + .find(|x| x.id() == table.id) .cloned() .ok_or(crate::Error::Unrecoverable) }) diff --git a/src/version/recovery.rs b/src/version/recovery.rs index 6c051009..d8d7bf69 100644 --- a/src/version/recovery.rs +++ b/src/version/recovery.rs @@ -17,10 +17,16 @@ pub fn get_current_version(folder: &std::path::Path) -> crate::Result .map_err(Into::into) } +pub struct RecoveredTable { + pub id: TableId, + pub checksum: Checksum, + pub global_seqno: SeqNo, +} + pub struct Recovery { pub tree_type: TreeType, pub curr_version_id: VersionId, - pub table_ids: Vec>>, + pub table_ids: Vec>>, pub blob_file_ids: Vec<(BlobFileId, Checksum)>, pub gc_stats: crate::blob_tree::FragmentationMap, } @@ -72,7 +78,11 @@ pub fn recover(folder: &Path) -> crate::Result { let global_seqno = reader.read_u64::()?; - run.push((id, checksum, global_seqno)); + run.push(RecoveredTable { + id, + checksum, + global_seqno, + }); } level.push(run); From 0be5b17d5064dbf0cc8f8d5931a5a0280ee0a950 Mon Sep 17 00:00:00 2001 From: zaidoon Date: Sat, 22 Nov 2025 13:53:47 -0500 Subject: [PATCH 15/17] implement atomic flush and global_seqno coordination --- src/blob_tree/ingest.rs | 99 +++++++++++++++++++++++++++++++++++------ src/tree/ingest.rs | 96 +++++++++++++++++++++++++++++---------- 2 files changed, 158 insertions(+), 37 deletions(-) diff --git a/src/blob_tree/ingest.rs b/src/blob_tree/ingest.rs index c7fd0be7..0f3a9b79 100644 --- a/src/blob_tree/ingest.rs +++ b/src/blob_tree/ingest.rs @@ -140,23 +140,61 @@ impl<'a> BlobIngestion<'a> { /// /// Will return `Err` if an IO error occurs. pub fn finish(self) -> crate::Result<()> { - use crate::AbstractTree; + use crate::{version::persist_version, AbstractTree}; - // Capture required handles before consuming fields during finalization let index = self.index().clone(); - let tree = self.tree.clone(); - // Finalize both value log and index writer so the index sees a - // consistent set of blob files. + // CRITICAL SECTION: Atomic flush + seqno allocation + registration + // + // For BlobTree, we must coordinate THREE components atomically: + // 1. Index tree memtable flush + // 2. Value log blob files + // 3. Index tree tables (with blob indirections) + // + // The sequence ensures all components see the same global_seqno: + // 1. Acquire flush lock on index tree + // 2. Flush index tree active memtable + // 3. Finalize blob writer (creates blob files) + // 4. Finalize table writer (creates index tables) + // 5. Allocate next global seqno + // 6. Recover tables with that seqno + // 7. Register version with same seqno + blob files + // + // This prevents race conditions where blob files and their index + // entries could have mismatched sequence numbers. + let flush_lock = index.get_flush_lock(); + + // Flush any pending index memtable writes to ensure ingestion sees + // a consistent snapshot of the index. + // We call rotate + flush directly because we already hold the lock. + index.rotate_memtable(); + index.flush(&flush_lock, 0)?; + + // Finalize the blob writer first, ensuring all large values are + // written to blob files before we finalize the index tables that + // reference them. let blob_files = self.blob.finish()?; + + // Finalize the table writer, creating index tables with blob + // indirections pointing to the blob files we just created. let results = self.table.writer.finish()?; + // Allocate the next global sequence number. This seqno will be shared + // by all ingested tables, blob files, and the version that registers + // them, ensuring consistent MVCC snapshots across the value log. + let global_seqno = index.config.seqno.next(); + + // Recover all created index tables, assigning them the global_seqno + // we just allocated. These tables contain indirections to the blob + // files created above, so they must share the same sequence number + // for MVCC correctness. + // + // We intentionally do NOT pin filter/index blocks here. Large ingests + // are typically placed in level 1, and pinning would increase memory + // pressure unnecessarily. let created_tables = results .into_iter() .map(|(table_id, checksum)| -> crate::Result
{ - // Do not pin ingestion output tables here. Large ingests are - // typically placed in level 1 and would otherwise keep all - // filter and index blocks pinned, increasing memory pressure. Table::recover( index .config @@ -164,7 +202,7 @@ impl<'a> BlobIngestion<'a> { .join(crate::file::TABLES_FOLDER) .join(table_id.to_string()), checksum, - todo!(), + global_seqno, index.id, index.config.cache.clone(), index.config.descriptor_table.clone(), @@ -176,10 +214,45 @@ impl<'a> BlobIngestion<'a> { }) .collect::>>()?; - // Blob ingestion only appends new tables and blob files; sealed - // memtables remain unchanged and GC watermark stays at its - // neutral value for this operation. - tree.register_tables(&created_tables, Some(&blob_files), None, &[], 0)?; + // Acquire locks for version registration on the index tree. We must + // hold both the compaction state lock and version history lock to + // safely modify the tree's version. + let mut _compaction_state = index.compaction_state.lock().expect("lock is poisoned"); + let mut version_lock = index.version_history.write().expect("lock is poisoned"); + + // Create the next version by adding both: + // - Index tables as a new L0 run + // - Blob files to the value log + // + // We manually set the seqno to the global_seqno we allocated earlier, + // ensuring the version, tables, and blob files all share the same + // sequence number. This is critical for GC correctness - we must not + // delete blob files that are still referenced by visible snapshots. + // + // Why not use register_tables()? + // register_tables() calls upgrade_version(), which would allocate a + // DIFFERENT seqno via seqno.next(). We need to use the SAME seqno we + // already allocated and assigned to the tables. + let mut next_version = { + let current = version_lock.latest_version(); + let mut copy = current.clone(); + copy.version = copy + .version + .with_new_l0_run(&created_tables, Some(&blob_files), None); + copy + }; + + next_version.seqno = global_seqno; + + // Persist the new version to disk and append it to the version history. + persist_version(&index.config.path, &next_version.version)?; + version_lock.append_version(next_version); + + // Perform maintenance on the version history (e.g., clean up old versions). + // We use gc_watermark=0 since ingestion doesn't affect sealed memtables. + if let Err(e) = version_lock.maintenance(&index.config.path, 0) { + log::warn!("Version GC failed: {e:?}"); + } Ok(()) } diff --git a/src/tree/ingest.rs b/src/tree/ingest.rs index ffda0948..3193af73 100644 --- a/src/tree/ingest.rs +++ b/src/tree/ingest.rs @@ -32,12 +32,6 @@ impl<'a> Ingestion<'a> { /// /// Will return `Err` if an IO error occurs. pub fn new(tree: &'a Tree) -> crate::Result { - // Use the shared flush helper so ingestion participates in the same - // path as normal writes: any dirty memtable content is moved into - // tables before building new tables from the ingestion stream. - // This keeps the lookup path ordered as active > sealed > tables. - tree.flush_active_memtable(0)?; - let folder = tree.config.path.join(crate::file::TABLES_FOLDER); log::debug!("Ingesting into tables in {}", folder.display()); @@ -216,30 +210,57 @@ impl<'a> Ingestion<'a> { /// /// Will return `Err` if an IO error occurs. pub fn finish(self) -> crate::Result<()> { - use crate::Table; + use crate::{version::persist_version, AbstractTree, Table}; + + // CRITICAL SECTION: Atomic flush + seqno allocation + registration + // + // We must ensure no concurrent writes interfere between flushing the + // active memtable and registering the ingested tables. The sequence is: + // 1. Acquire flush lock (prevents concurrent flushes) + // 2. Flush active memtable (ensures no pending writes) + // 3. Finish ingestion writer (creates table files) + // 4. Allocate next global seqno (atomic timestamp) + // 5. Recover tables with that seqno + // 6. Register version with same seqno + // + // Why not flush in new()? + // If we flushed in new(), there would be a race condition: + // new() -> flush -> [TIME PASSES + OTHER WRITES] -> finish() -> seqno + // The seqno would be disconnected from the flush, violating MVCC. + // + // By holding the flush lock throughout, we guarantee atomicity. + let flush_lock = self.tree.get_flush_lock(); + + // Flush any pending memtable writes to ensure ingestion sees a + // consistent snapshot and lookup order remains correct. + // We call rotate + flush directly because we already hold the lock. + self.tree.rotate_memtable(); + self.tree.flush(&flush_lock, 0)?; + // Finalize the ingestion writer, writing all buffered data to disk. let results = self.writer.finish()?; log::info!("Finished ingestion writer"); - // Turn the writer output into fully recovered tables that can be - // registered as a fresh L0 run. + // Allocate the next global sequence number. This seqno will be shared + // by all ingested tables and the version that registers them, ensuring + // consistent MVCC snapshots. + let global_seqno = self.tree.config.seqno.next(); + + // Recover all created tables, assigning them the global_seqno we just + // allocated. This ensures all ingested tables share the same sequence + // number, which is critical for MVCC correctness. + // + // We intentionally do NOT pin filter/index blocks here. Large ingests + // are typically placed in level 1, and pinning would increase memory + // pressure unnecessarily. let created_tables = results .into_iter() .map(|(table_id, checksum)| -> crate::Result
{ - // TODO: table recoverer struct w/ builder pattern - // Table::recover() - // .pin_filters(true) - // .with_metrics(metrics) - // .run(path, tree_id, cache, descriptor_table); - - // Do not pin ingestion output tables here. Large ingests are - // typically placed in level 1 and would otherwise keep all - // filter and index blocks pinned, increasing memory pressure. Table::recover( self.folder.join(table_id.to_string()), checksum, - todo!(), + global_seqno, self.tree.id, self.tree.config.cache.clone(), self.tree.config.descriptor_table.clone(), @@ -251,11 +272,38 @@ impl<'a> Ingestion<'a> { }) .collect::>>()?; - // Ingestion produces new tables only and does not touch sealed - // memtables directly, so the deletion set is empty and the - // watermark is left at its neutral value. - self.tree - .register_tables(&created_tables, None, None, &[], 0)?; + // Acquire locks for version registration. We must hold both the + // compaction state lock and version history lock to safely modify + // the tree's version. + let mut _compaction_state = self.tree.compaction_state.lock().expect("lock is poisoned"); + let mut version_lock = self.tree.version_history.write().expect("lock is poisoned"); + + // Create the next version by adding our ingested tables as a new L0 run. + // We manually set the seqno to the global_seqno we allocated earlier, + // ensuring the version and tables share the same sequence number. + // + // Why not use register_tables()? + // register_tables() calls upgrade_version(), which would allocate a + // DIFFERENT seqno via seqno.next(). We need to use the SAME seqno we + // already allocated and assigned to the tables. + let mut next_version = { + let current = version_lock.latest_version(); + let mut copy = current.clone(); + copy.version = copy.version.with_new_l0_run(&created_tables, None, None); + copy + }; + + next_version.seqno = global_seqno; + + // Persist the new version to disk and append it to the version history. + persist_version(&self.tree.config.path, &next_version.version)?; + version_lock.append_version(next_version); + + // Perform maintenance on the version history (e.g., clean up old versions). + // We use gc_watermark=0 since ingestion doesn't affect sealed memtables. + if let Err(e) = version_lock.maintenance(&self.tree.config.path, 0) { + log::warn!("Version GC failed: {e:?}"); + } Ok(()) } From f69061a9687e5372ed806910a2b8d013ad44764e Mon Sep 17 00:00:00 2001 From: zaidoon Date: Sat, 22 Nov 2025 15:29:19 -0500 Subject: [PATCH 16/17] refactor ingestion to use upgrade_version_with_seqno for explicit seqno control --- src/blob_tree/ingest.rs | 47 +++++++++++++++--------------------- src/tree/ingest.rs | 36 ++++++++++++--------------- src/version/super_version.rs | 23 ++++++++++++------ 3 files changed, 52 insertions(+), 54 deletions(-) diff --git a/src/blob_tree/ingest.rs b/src/blob_tree/ingest.rs index 0f3a9b79..64550a75 100644 --- a/src/blob_tree/ingest.rs +++ b/src/blob_tree/ingest.rs @@ -140,7 +140,7 @@ impl<'a> BlobIngestion<'a> { /// /// Will return `Err` if an IO error occurs. pub fn finish(self) -> crate::Result<()> { - use crate::{version::persist_version, AbstractTree}; + use crate::AbstractTree; let index = self.index().clone(); @@ -220,33 +220,26 @@ impl<'a> BlobIngestion<'a> { let mut _compaction_state = index.compaction_state.lock().expect("lock is poisoned"); let mut version_lock = index.version_history.write().expect("lock is poisoned"); - // Create the next version by adding both: - // - Index tables as a new L0 run - // - Blob files to the value log + // Upgrade the version with our ingested tables and blob files, using + // the global_seqno we allocated earlier. This ensures the version, + // tables, and blob files all share the same sequence number, which is + // critical for GC correctness - we must not delete blob files that are + // still referenced by visible snapshots. // - // We manually set the seqno to the global_seqno we allocated earlier, - // ensuring the version, tables, and blob files all share the same - // sequence number. This is critical for GC correctness - we must not - // delete blob files that are still referenced by visible snapshots. - // - // Why not use register_tables()? - // register_tables() calls upgrade_version(), which would allocate a - // DIFFERENT seqno via seqno.next(). We need to use the SAME seqno we - // already allocated and assigned to the tables. - let mut next_version = { - let current = version_lock.latest_version(); - let mut copy = current.clone(); - copy.version = copy - .version - .with_new_l0_run(&created_tables, Some(&blob_files), None); - copy - }; - - next_version.seqno = global_seqno; - - // Persist the new version to disk and append it to the version history. - persist_version(&index.config.path, &next_version.version)?; - version_lock.append_version(next_version); + // We use upgrade_version_with_seqno (instead of upgrade_version) because + // we need precise control over the seqno: it must match the seqno we + // already assigned to the recovered tables. + version_lock.upgrade_version_with_seqno( + &index.config.path, + |current| { + let mut copy = current.clone(); + copy.version = + copy.version + .with_new_l0_run(&created_tables, Some(&blob_files), None); + Ok(copy) + }, + global_seqno, + )?; // Perform maintenance on the version history (e.g., clean up old versions). // We use gc_watermark=0 since ingestion doesn't affect sealed memtables. diff --git a/src/tree/ingest.rs b/src/tree/ingest.rs index 3193af73..968bd461 100644 --- a/src/tree/ingest.rs +++ b/src/tree/ingest.rs @@ -210,7 +210,7 @@ impl<'a> Ingestion<'a> { /// /// Will return `Err` if an IO error occurs. pub fn finish(self) -> crate::Result<()> { - use crate::{version::persist_version, AbstractTree, Table}; + use crate::{AbstractTree, Table}; // CRITICAL SECTION: Atomic flush + seqno allocation + registration // @@ -278,26 +278,22 @@ impl<'a> Ingestion<'a> { let mut _compaction_state = self.tree.compaction_state.lock().expect("lock is poisoned"); let mut version_lock = self.tree.version_history.write().expect("lock is poisoned"); - // Create the next version by adding our ingested tables as a new L0 run. - // We manually set the seqno to the global_seqno we allocated earlier, - // ensuring the version and tables share the same sequence number. + // Upgrade the version with our ingested tables, using the global_seqno + // we allocated earlier. This ensures the version and all tables share + // the same sequence number. // - // Why not use register_tables()? - // register_tables() calls upgrade_version(), which would allocate a - // DIFFERENT seqno via seqno.next(). We need to use the SAME seqno we - // already allocated and assigned to the tables. - let mut next_version = { - let current = version_lock.latest_version(); - let mut copy = current.clone(); - copy.version = copy.version.with_new_l0_run(&created_tables, None, None); - copy - }; - - next_version.seqno = global_seqno; - - // Persist the new version to disk and append it to the version history. - persist_version(&self.tree.config.path, &next_version.version)?; - version_lock.append_version(next_version); + // We use upgrade_version_with_seqno (instead of upgrade_version) because + // we need precise control over the seqno: it must match the seqno we + // already assigned to the recovered tables. + version_lock.upgrade_version_with_seqno( + &self.tree.config.path, + |current| { + let mut copy = current.clone(); + copy.version = copy.version.with_new_l0_run(&created_tables, None, None); + Ok(copy) + }, + global_seqno, + )?; // Perform maintenance on the version history (e.g., clean up old versions). // We use gc_watermark=0 since ingestion doesn't affect sealed memtables. diff --git a/src/version/super_version.rs b/src/version/super_version.rs index 5d64bf8b..d6ffa0f6 100644 --- a/src/version/super_version.rs +++ b/src/version/super_version.rs @@ -111,14 +111,23 @@ impl SuperVersions { f: F, seqno: &SequenceNumberCounter, ) -> crate::Result<()> { - // NOTE: Copy-on-write... - // - // Create a copy of the levels we can operate on - // without mutating the current level manifest - // If persisting to disk fails, this way the level manifest - // is unchanged + self.upgrade_version_with_seqno(tree_path, f, seqno.next()) + } + + /// Like `upgrade_version`, but takes an already-allocated sequence number. + /// + /// This is useful when the seqno must be coordinated with other operations + /// (e.g., bulk ingestion where tables are recovered with the same seqno). + pub(crate) fn upgrade_version_with_seqno< + F: FnOnce(&SuperVersion) -> crate::Result, + >( + &mut self, + tree_path: &Path, + f: F, + seqno: SeqNo, + ) -> crate::Result<()> { let mut next_version = f(&self.latest_version())?; - next_version.seqno = seqno.next(); + next_version.seqno = seqno; log::trace!("Next version seqno={}", next_version.seqno); persist_version(tree_path, &next_version.version)?; From 51e0fdc5a72818865e19c6d981baeff8e16ce912 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sun, 23 Nov 2025 18:22:09 +0100 Subject: [PATCH 17/17] refactor --- src/any_tree.rs | 77 ---------------------------------- src/blob_tree/ingest.rs | 29 ++++++------- src/ingestion.rs | 76 +++++++++++++++++++++++++++++++++ src/lib.rs | 6 +-- src/tree/ingest.rs | 24 ++++------- tests/ingest_dirty_snapshot.rs | 2 +- tests/ingestion_api.rs | 24 +++++------ tests/ingestion_invariants.rs | 8 ++-- tests/tree_bulk_ingest.rs | 24 +++-------- 9 files changed, 124 insertions(+), 146 deletions(-) create mode 100644 src/ingestion.rs diff --git a/src/any_tree.rs b/src/any_tree.rs index 6cbab047..46084bc6 100644 --- a/src/any_tree.rs +++ b/src/any_tree.rs @@ -18,80 +18,3 @@ pub enum AnyTree { /// Key-value separated LSM-tree, see [`BlobTree`] Blob(BlobTree), } - -/// Unified ingestion builder over `AnyTree` -// Keep zero allocations and direct dispatch; boxing introduces heap indirection and `dyn` adds virtual dispatch. -// Ingestion calls use `&mut self` in tight loops; the active variant is stable and branch prediction makes the match cheap. -// Allowing this lint preserves hot-path performance at the cost of a larger enum size. -#[expect(clippy::large_enum_variant)] -pub enum AnyIngestion<'a> { - /// Ingestion for a standard LSM-tree - Standard(Ingestion<'a>), - /// Ingestion for a [`BlobTree`] with KV separation - Blob(BlobIngestion<'a>), -} - -impl AnyIngestion<'_> { - #[must_use] - /// Sets the sequence number used for subsequent writes - pub fn with_seqno(self, seqno: SeqNo) -> Self { - match self { - Self::Standard(i) => Self::Standard(i.with_seqno(seqno)), - Self::Blob(b) => Self::Blob(b.with_seqno(seqno)), - } - } - - /// Writes a key-value pair. - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - pub fn write, V: Into>( - &mut self, - key: K, - value: V, - ) -> crate::Result<()> { - match self { - Self::Standard(i) => i.write(key.into(), value.into()), - Self::Blob(b) => b.write(key.into(), value.into()), - } - } - - /// Writes a tombstone for a key. - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - pub fn write_tombstone>(&mut self, key: K) -> crate::Result<()> { - match self { - Self::Standard(i) => i.write_tombstone(key.into()), - Self::Blob(b) => b.write_tombstone(key.into()), - } - } - - /// Finalizes ingestion and registers created tables (and blob files if present). - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - pub fn finish(self) -> crate::Result<()> { - match self { - Self::Standard(i) => i.finish(), - Self::Blob(b) => b.finish(), - } - } -} - -impl AnyTree { - /// Starts an ingestion for any tree type (standard or blob). - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - pub fn ingestion(&self) -> crate::Result> { - match self { - Self::Standard(t) => Ok(AnyIngestion::Standard(Ingestion::new(t)?)), - Self::Blob(b) => Ok(AnyIngestion::Blob(BlobIngestion::new(b)?)), - } - } -} diff --git a/src/blob_tree/ingest.rs b/src/blob_tree/ingest.rs index 64550a75..30189daa 100644 --- a/src/blob_tree/ingest.rs +++ b/src/blob_tree/ingest.rs @@ -1,3 +1,7 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + use crate::{ blob_tree::handle::BlobIndirection, file::BLOBS_FOLDER, @@ -58,14 +62,6 @@ impl<'a> BlobIngestion<'a> { }) } - /// Sets the ingestion seqno. - #[must_use] - pub fn with_seqno(mut self, seqno: SeqNo) -> Self { - self.seqno = seqno; - self.table = self.table.with_seqno(seqno); - self - } - /// Writes a key-value pair. /// /// # Errors @@ -139,7 +135,8 @@ impl<'a> BlobIngestion<'a> { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn finish(self) -> crate::Result<()> { + #[allow(clippy::significant_drop_tightening)] + pub fn finish(self) -> crate::Result { use crate::AbstractTree; let index = self.index().clone(); @@ -179,6 +176,12 @@ impl<'a> BlobIngestion<'a> { // indirections pointing to the blob files we just created. let results = self.table.writer.finish()?; + // Acquire locks for version registration on the index tree. We must + // hold both the compaction state lock and version history lock to + // safely modify the tree's version. + let mut _compaction_state = index.compaction_state.lock().expect("lock is poisoned"); + let mut version_lock = index.version_history.write().expect("lock is poisoned"); + // Allocate the next global sequence number. This seqno will be shared // by all ingested tables, blob files, and the version that registers // them, ensuring consistent MVCC snapshots across the value log. @@ -214,12 +217,6 @@ impl<'a> BlobIngestion<'a> { }) .collect::>>()?; - // Acquire locks for version registration on the index tree. We must - // hold both the compaction state lock and version history lock to - // safely modify the tree's version. - let mut _compaction_state = index.compaction_state.lock().expect("lock is poisoned"); - let mut version_lock = index.version_history.write().expect("lock is poisoned"); - // Upgrade the version with our ingested tables and blob files, using // the global_seqno we allocated earlier. This ensures the version, // tables, and blob files all share the same sequence number, which is @@ -247,7 +244,7 @@ impl<'a> BlobIngestion<'a> { log::warn!("Version GC failed: {e:?}"); } - Ok(()) + Ok(global_seqno) } #[inline] diff --git a/src/ingestion.rs b/src/ingestion.rs new file mode 100644 index 00000000..3740a446 --- /dev/null +++ b/src/ingestion.rs @@ -0,0 +1,76 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +use crate::{ + blob_tree::ingest::BlobIngestion, tree::ingest::Ingestion, AnyTree, SeqNo, UserKey, UserValue, +}; + +/// Unified ingestion builder over `AnyTree` +// Keep zero allocations and direct dispatch; boxing introduces heap indirection and `dyn` adds virtual dispatch. +// Ingestion calls use `&mut self` in tight loops; the active variant is stable and branch prediction makes the match cheap. +// Allowing this lint preserves hot-path performance at the cost of a larger enum size. +#[expect(clippy::large_enum_variant)] +pub enum AnyIngestion<'a> { + /// Ingestion for a standard LSM-tree + Standard(Ingestion<'a>), + + /// Ingestion for a [`BlobTree`] with KV separation + Blob(BlobIngestion<'a>), +} + +impl AnyIngestion<'_> { + /// Writes a key-value pair. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn write, V: Into>( + &mut self, + key: K, + value: V, + ) -> crate::Result<()> { + match self { + Self::Standard(i) => i.write(key.into(), value.into()), + Self::Blob(b) => b.write(key.into(), value.into()), + } + } + + /// Writes a tombstone for a key. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn write_tombstone>(&mut self, key: K) -> crate::Result<()> { + match self { + Self::Standard(i) => i.write_tombstone(key.into()), + Self::Blob(b) => b.write_tombstone(key.into()), + } + } + + /// Finalizes ingestion and registers created tables (and blob files if present). + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn finish(self) -> crate::Result { + match self { + Self::Standard(i) => i.finish(), + Self::Blob(b) => b.finish(), + } + } +} + +impl AnyTree { + /// Starts an ingestion for any tree type (standard or blob). + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn ingestion(&self) -> crate::Result> { + match self { + Self::Standard(t) => Ok(AnyIngestion::Standard(Ingestion::new(t)?)), + Self::Blob(b) => Ok(AnyIngestion::Blob(BlobIngestion::new(b)?)), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 629c2b2b..89c37944 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -100,6 +100,7 @@ mod error; pub mod file; mod hash; +mod ingestion; mod iter_guard; mod key; mod key_range; @@ -170,8 +171,7 @@ pub use { }; pub use { - any_tree::{AnyIngestion, AnyTree}, - blob_tree::ingest::BlobIngestion, + any_tree::AnyTree, blob_tree::BlobTree, cache::Cache, compression::CompressionType, @@ -179,12 +179,12 @@ pub use { descriptor_table::DescriptorTable, error::{Error, Result}, format_version::FormatVersion, + ingestion::AnyIngestion, iter_guard::IterGuard as Guard, memtable::Memtable, r#abstract::AbstractTree, seqno::SequenceNumberCounter, slice::Slice, - tree::ingest::Ingestion, tree::Tree, value::SeqNo, value_type::ValueType, diff --git a/src/tree/ingest.rs b/src/tree/ingest.rs index 968bd461..48f427a0 100644 --- a/src/tree/ingest.rs +++ b/src/tree/ingest.rs @@ -108,13 +108,6 @@ impl<'a> Ingestion<'a> { }) } - /// Sets the ingestion seqno. - #[must_use] - pub fn with_seqno(mut self, seqno: SeqNo) -> Self { - self.seqno = seqno; - self - } - /// Writes a key-value pair. /// /// # Errors @@ -209,7 +202,8 @@ impl<'a> Ingestion<'a> { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn finish(self) -> crate::Result<()> { + #[allow(clippy::significant_drop_tightening)] + pub fn finish(self) -> crate::Result { use crate::{AbstractTree, Table}; // CRITICAL SECTION: Atomic flush + seqno allocation + registration @@ -242,6 +236,12 @@ impl<'a> Ingestion<'a> { log::info!("Finished ingestion writer"); + // Acquire locks for version registration. We must hold both the + // compaction state lock and version history lock to safely modify + // the tree's version. + let mut _compaction_state = self.tree.compaction_state.lock().expect("lock is poisoned"); + let mut version_lock = self.tree.version_history.write().expect("lock is poisoned"); + // Allocate the next global sequence number. This seqno will be shared // by all ingested tables and the version that registers them, ensuring // consistent MVCC snapshots. @@ -272,12 +272,6 @@ impl<'a> Ingestion<'a> { }) .collect::>>()?; - // Acquire locks for version registration. We must hold both the - // compaction state lock and version history lock to safely modify - // the tree's version. - let mut _compaction_state = self.tree.compaction_state.lock().expect("lock is poisoned"); - let mut version_lock = self.tree.version_history.write().expect("lock is poisoned"); - // Upgrade the version with our ingested tables, using the global_seqno // we allocated earlier. This ensures the version and all tables share // the same sequence number. @@ -301,6 +295,6 @@ impl<'a> Ingestion<'a> { log::warn!("Version GC failed: {e:?}"); } - Ok(()) + Ok(global_seqno) } } diff --git a/tests/ingest_dirty_snapshot.rs b/tests/ingest_dirty_snapshot.rs index 4ddfb92f..043b3650 100644 --- a/tests/ingest_dirty_snapshot.rs +++ b/tests/ingest_dirty_snapshot.rs @@ -13,7 +13,7 @@ fn ingestion_dirty_snapshot() -> lsm_tree::Result<()> { let snapshot_seqno = 1; assert_eq!(b"a", &*tree.get("a", snapshot_seqno)?.unwrap()); - let mut ingest = tree.ingestion()?.with_seqno(seqno.next()); + let mut ingest = tree.ingestion()?; ingest.write("b", "b")?; ingest.finish()?; diff --git a/tests/ingestion_api.rs b/tests/ingestion_api.rs index 97f5164f..75551bea 100644 --- a/tests/ingestion_api.rs +++ b/tests/ingestion_api.rs @@ -10,7 +10,7 @@ fn tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()> { tree.insert(key.as_bytes(), b"v", 0); } - let mut ingest = tree.ingestion()?.with_seqno(10); + let mut ingest = tree.ingestion()?; for i in 0..10u32 { let key = format!("k{:03}", i); ingest.write_tombstone(key)?; @@ -34,7 +34,7 @@ fn sealed_memtable_value_overrides_table_value() -> lsm_tree::Result<()> { // Older table value via ingestion (seqno 1) { - let mut ingest = tree.ingestion()?.with_seqno(1); + let mut ingest = tree.ingestion()?; ingest.write(b"k", b"old")?; ingest.finish()?; } @@ -60,7 +60,7 @@ fn sealed_memtable_tombstone_overrides_table_value() -> lsm_tree::Result<()> { // Older table value via ingestion (seqno 1) { - let mut ingest = tree.ingestion()?.with_seqno(1); + let mut ingest = tree.ingestion()?; ingest.write(b"k", b"old")?; ingest.finish()?; } @@ -83,12 +83,12 @@ fn tables_newest_first_returns_highest_seqno() -> lsm_tree::Result<()> { // Two separate ingestions create two tables containing the same key at different seqnos { - let mut ingest = tree.ingestion()?.with_seqno(1); + let mut ingest = tree.ingestion()?; ingest.write(b"k", b"v1")?; ingest.finish()?; } { - let mut ingest = tree.ingestion()?.with_seqno(2); + let mut ingest = tree.ingestion()?; ingest.write(b"k", b"v2")?; ingest.finish()?; } @@ -109,7 +109,7 @@ fn ingestion_enforces_order_standard_panics() { .open() .unwrap(); - let mut ingest = tree.ingestion().unwrap().with_seqno(1); + let mut ingest = tree.ingestion().unwrap(); // First write higher key, then lower to trigger ordering assertion ingest.write(b"k2", b"v").unwrap(); @@ -129,7 +129,7 @@ fn blob_ingestion_out_of_order_panics_without_blob_write() -> lsm_tree::Result<( // Use a small value for the first write to avoid blob I/O let result = std::panic::catch_unwind(|| { - let mut ingest = tree.ingestion().unwrap().with_seqno(1); + let mut ingest = tree.ingestion().unwrap(); ingest.write(b"k2", b"x").unwrap(); // Second write would require blob I/O, but ordering check should fire before any blob write @@ -151,14 +151,14 @@ fn memtable_put_overrides_table_tombstone() -> lsm_tree::Result<()> { // Older put written via ingestion to tables (seqno 1) { - let mut ingest = tree.ingestion()?.with_seqno(1); + let mut ingest = tree.ingestion()?; ingest.write(b"k", b"v1")?; ingest.finish()?; } // Newer tombstone written via ingestion to tables (seqno 2) { - let mut ingest = tree.ingestion()?.with_seqno(2); + let mut ingest = tree.ingestion()?; ingest.write_tombstone(b"k")?; ingest.finish()?; } @@ -184,7 +184,7 @@ fn blob_tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()> tree.insert(key.as_bytes(), b"x", 0); } - let mut ingest = tree.ingestion()?.with_seqno(10); + let mut ingest = tree.ingestion()?; for i in 0..8u32 { let key = format!("b{:03}", i); ingest.write_tombstone(key)?; @@ -229,7 +229,7 @@ fn blob_ingestion_only_tombstones_does_not_create_blob_files() -> lsm_tree::Resu let before_blobs = tree.blob_file_count(); - let mut ingest = tree.ingestion()?.with_seqno(10); + let mut ingest = tree.ingestion()?; for i in 0..5u32 { let key = format!("d{:03}", i); ingest.write_tombstone(key)?; @@ -276,7 +276,7 @@ fn blob_ingestion_separates_large_values_and_reads_ok() -> lsm_tree::Result<()> .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(8))) .open()?; - let mut ingest = tree.ingestion()?.with_seqno(1); + let mut ingest = tree.ingestion()?; ingest.write("k_big1", [1u8; 16])?; ingest.write("k_big2", [2u8; 32])?; ingest.write("k_small", "abc")?; diff --git a/tests/ingestion_invariants.rs b/tests/ingestion_invariants.rs index 7061fa31..ce9dec65 100644 --- a/tests/ingestion_invariants.rs +++ b/tests/ingestion_invariants.rs @@ -19,7 +19,7 @@ fn ingestion_autoflushes_active_memtable() -> lsm_tree::Result<()> { assert_eq!(sealed_before, 0); // Start ingestion (should auto-flush active) - tree.ingestion()?.with_seqno(10).finish()?; + tree.ingestion()?.finish()?; // After ingestion, data is in tables; no sealed memtables assert_eq!(tree.sealed_memtable_count(), 0); @@ -53,7 +53,7 @@ fn ingestion_flushes_sealed_memtables() -> lsm_tree::Result<()> { let tables_before = tree.table_count(); // Ingestion should flush sealed memtables and register resulting tables - tree.ingestion()?.with_seqno(20).finish()?; + tree.ingestion()?.finish()?; assert_eq!(tree.sealed_memtable_count(), 0); assert!(tree.table_count() > tables_before); @@ -75,7 +75,7 @@ fn ingestion_blocks_memtable_writes_until_finish() -> lsm_tree::Result<()> { let tree = Config::new(&folder, Default::default()).open()?; // Acquire ingestion and keep it active while another thread performs writes - let ingest = tree.ingestion()?.with_seqno(5); + let ingest = tree.ingestion()?; let (started_tx, started_rx) = mpsc::channel(); let (done_tx, done_rx) = mpsc::channel(); @@ -123,7 +123,7 @@ fn blob_ingestion_honors_invariants_and_blocks_writes() -> lsm_tree::Result<()> let (done_tx, done_rx) = mpsc::channel(); let tree2 = tree.clone(); - let ingest = tree.ingestion()?.with_seqno(30); + let ingest = tree.ingestion()?; let handle = thread::spawn(move || { started_tx.send(()).ok(); diff --git a/tests/tree_bulk_ingest.rs b/tests/tree_bulk_ingest.rs index c989bfc9..c5c2e575 100644 --- a/tests/tree_bulk_ingest.rs +++ b/tests/tree_bulk_ingest.rs @@ -13,14 +13,12 @@ fn tree_bulk_ingest() -> lsm_tree::Result<()> { let tree = Config::new(folder, seqno.clone()).open()?; let mut ingestion = tree.ingestion()?; - let seq = seqno.next(); - ingestion = ingestion.with_seqno(seq); for x in 0..ITEM_COUNT as u64 { let k = x.to_be_bytes(); let v = nanoid::nanoid!(); ingestion.write(k, v)?; } - ingestion.finish()?; + let seq = ingestion.finish()?; visible_seqno.fetch_max(seq + 1); assert_eq!(tree.len(SeqNo::MAX, None)?, ITEM_COUNT); @@ -49,14 +47,12 @@ fn tree_copy() -> lsm_tree::Result<()> { let src = Config::new(folder, seqno.clone()).open()?; let mut ingestion = src.ingestion()?; - let seq = seqno.next(); - ingestion = ingestion.with_seqno(seq); for x in 0..ITEM_COUNT as u64 { let k = x.to_be_bytes(); let v = nanoid::nanoid!(); ingestion.write(k, v)?; } - ingestion.finish()?; + let seq = ingestion.finish()?; visible_seqno.fetch_max(seq + 1); assert_eq!(src.len(SeqNo::MAX, None)?, ITEM_COUNT); @@ -76,13 +72,11 @@ fn tree_copy() -> lsm_tree::Result<()> { let dest = Config::new(folder, seqno.clone()).open()?; let mut ingestion = dest.ingestion()?; - let seq = seqno.next(); - ingestion = ingestion.with_seqno(seq); for item in src.iter(SeqNo::MAX, None) { let (k, v) = item.into_inner().unwrap(); ingestion.write(k, v)?; } - ingestion.finish()?; + let seq = ingestion.finish()?; visible_seqno.fetch_max(seq + 1); assert_eq!(dest.len(SeqNo::MAX, None)?, ITEM_COUNT); @@ -113,14 +107,12 @@ fn blob_tree_bulk_ingest() -> lsm_tree::Result<()> { .open()?; let mut ingestion = tree.ingestion()?; - let seq = seqno.next(); - ingestion = ingestion.with_seqno(seq); for x in 0..ITEM_COUNT as u64 { let k = x.to_be_bytes(); let v = nanoid::nanoid!(); ingestion.write(k, v)?; } - ingestion.finish()?; + let seq = ingestion.finish()?; visible_seqno.fetch_max(seq + 1); assert_eq!(tree.len(SeqNo::MAX, None)?, ITEM_COUNT); @@ -152,14 +144,12 @@ fn blob_tree_copy() -> lsm_tree::Result<()> { .open()?; let mut ingestion = src.ingestion()?; - let seq = seqno.next(); - ingestion = ingestion.with_seqno(seq); for x in 0..ITEM_COUNT as u64 { let k = x.to_be_bytes(); let v = nanoid::nanoid!(); ingestion.write(k, v)?; } - ingestion.finish()?; + let seq = ingestion.finish()?; visible_seqno.fetch_max(seq + 1); assert_eq!(src.len(SeqNo::MAX, None)?, ITEM_COUNT); @@ -182,13 +172,11 @@ fn blob_tree_copy() -> lsm_tree::Result<()> { .open()?; let mut ingestion = dest.ingestion()?; - let seq = seqno.next(); - ingestion = ingestion.with_seqno(seq); for item in src.iter(SeqNo::MAX, None) { let (k, v) = item.into_inner().unwrap(); ingestion.write(k, v)?; } - ingestion.finish()?; + let seq = ingestion.finish()?; visible_seqno.fetch_max(seq + 1); assert_eq!(dest.len(SeqNo::MAX, None)?, ITEM_COUNT);