Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
36b2651
Expose Ingestion API with Inversion of Control
zaidoon1 Nov 15, 2025
d9d041f
clippy
marvin-j97 Nov 15, 2025
3489b8f
don't pin ingestion output tables
zaidoon1 Nov 15, 2025
25c9cdf
split ingestion initialization from seqno assignment in bulk ingest t…
zaidoon1 Nov 15, 2025
087e5de
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 15, 2025
359a585
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 17, 2025
3314099
Merge remote-tracking branch 'zaidoon1/zaidoon/ingestion-api-inversio…
marvin-j97 Nov 19, 2025
96a7f58
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 19, 2025
90b02a6
test: dirty snapshot after ingestion
marvin-j97 Nov 19, 2025
b73aa4e
refactor
marvin-j97 Nov 19, 2025
1877b57
refactor
marvin-j97 Nov 19, 2025
f3adbb7
Merge remote-tracking branch 'zaidoon1/zaidoon/ingestion-api-inversio…
marvin-j97 Nov 19, 2025
7849efa
more ergonomic ingestion API, add more tests
marvin-j97 Nov 19, 2025
0725204
refactor
marvin-j97 Nov 19, 2025
f06a70c
apply pinning on recovery
marvin-j97 Nov 19, 2025
8971f4c
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 19, 2025
0aa3ff4
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 19, 2025
fe64606
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 19, 2025
976cf87
change ingestion flush watermark to 0
marvin-j97 Nov 20, 2025
7fe024f
lint
marvin-j97 Nov 20, 2025
19e0876
feat: table global seqno
marvin-j97 Nov 22, 2025
bc0f85f
refactor
marvin-j97 Nov 22, 2025
0be5b17
implement atomic flush and global_seqno coordination
zaidoon1 Nov 22, 2025
f69061a
refactor ingestion to use upgrade_version_with_seqno for explicit seq…
zaidoon1 Nov 22, 2025
51e0fdc
refactor
marvin-j97 Nov 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 86 additions & 13 deletions src/blob_tree/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,31 +140,69 @@ impl<'a> BlobIngestion<'a> {
///
/// Will return `Err` if an IO error occurs.
pub fn finish(self) -> crate::Result<()> {
use crate::AbstractTree;
use crate::{version::persist_version, AbstractTree};

// Capture required handles before consuming fields during finalization
let index = self.index().clone();
let tree = self.tree.clone();

// Finalize both value log and index writer so the index sees a
// consistent set of blob files.
// CRITICAL SECTION: Atomic flush + seqno allocation + registration
//
// For BlobTree, we must coordinate THREE components atomically:
// 1. Index tree memtable flush
// 2. Value log blob files
// 3. Index tree tables (with blob indirections)
//
// The sequence ensures all components see the same global_seqno:
// 1. Acquire flush lock on index tree
// 2. Flush index tree active memtable
// 3. Finalize blob writer (creates blob files)
// 4. Finalize table writer (creates index tables)
// 5. Allocate next global seqno
// 6. Recover tables with that seqno
// 7. Register version with same seqno + blob files
//
// This prevents race conditions where blob files and their index
// entries could have mismatched sequence numbers.
let flush_lock = index.get_flush_lock();

// Flush any pending index memtable writes to ensure ingestion sees
// a consistent snapshot of the index.
// We call rotate + flush directly because we already hold the lock.
index.rotate_memtable();
index.flush(&flush_lock, 0)?;

// Finalize the blob writer first, ensuring all large values are
// written to blob files before we finalize the index tables that
// reference them.
let blob_files = self.blob.finish()?;

// Finalize the table writer, creating index tables with blob
// indirections pointing to the blob files we just created.
let results = self.table.writer.finish()?;

// Allocate the next global sequence number. This seqno will be shared
// by all ingested tables, blob files, and the version that registers
// them, ensuring consistent MVCC snapshots across the value log.
let global_seqno = index.config.seqno.next();

// Recover all created index tables, assigning them the global_seqno
// we just allocated. These tables contain indirections to the blob
// files created above, so they must share the same sequence number
// for MVCC correctness.
//
// We intentionally do NOT pin filter/index blocks here. Large ingests
// are typically placed in level 1, and pinning would increase memory
// pressure unnecessarily.
let created_tables = results
.into_iter()
.map(|(table_id, checksum)| -> crate::Result<Table> {
// Do not pin ingestion output tables here. Large ingests are
// typically placed in level 1 and would otherwise keep all
// filter and index blocks pinned, increasing memory pressure.
Table::recover(
index
.config
.path
.join(crate::file::TABLES_FOLDER)
.join(table_id.to_string()),
checksum,
todo!(),
global_seqno,
index.id,
index.config.cache.clone(),
index.config.descriptor_table.clone(),
Expand All @@ -176,10 +214,45 @@ impl<'a> BlobIngestion<'a> {
})
.collect::<crate::Result<Vec<_>>>()?;

// Blob ingestion only appends new tables and blob files; sealed
// memtables remain unchanged and GC watermark stays at its
// neutral value for this operation.
tree.register_tables(&created_tables, Some(&blob_files), None, &[], 0)?;
// Acquire locks for version registration on the index tree. We must
// hold both the compaction state lock and version history lock to
// safely modify the tree's version.
let mut _compaction_state = index.compaction_state.lock().expect("lock is poisoned");
let mut version_lock = index.version_history.write().expect("lock is poisoned");

// Create the next version by adding both:
// - Index tables as a new L0 run
// - Blob files to the value log
//
// We manually set the seqno to the global_seqno we allocated earlier,
// ensuring the version, tables, and blob files all share the same
// sequence number. This is critical for GC correctness - we must not
// delete blob files that are still referenced by visible snapshots.
//
// Why not use register_tables()?
// register_tables() calls upgrade_version(), which would allocate a
// DIFFERENT seqno via seqno.next(). We need to use the SAME seqno we
// already allocated and assigned to the tables.
let mut next_version = {
let current = version_lock.latest_version();
let mut copy = current.clone();
copy.version = copy
.version
.with_new_l0_run(&created_tables, Some(&blob_files), None);
copy
};

next_version.seqno = global_seqno;

// Persist the new version to disk and append it to the version history.
persist_version(&index.config.path, &next_version.version)?;
version_lock.append_version(next_version);

// Perform maintenance on the version history (e.g., clean up old versions).
// We use gc_watermark=0 since ingestion doesn't affect sealed memtables.
if let Err(e) = version_lock.maintenance(&index.config.path, 0) {
log::warn!("Version GC failed: {e:?}");
}

Ok(())
}
Expand Down
96 changes: 72 additions & 24 deletions src/tree/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ impl<'a> Ingestion<'a> {
///
/// Will return `Err` if an IO error occurs.
pub fn new(tree: &'a Tree) -> crate::Result<Self> {
// Use the shared flush helper so ingestion participates in the same
// path as normal writes: any dirty memtable content is moved into
// tables before building new tables from the ingestion stream.
// This keeps the lookup path ordered as active > sealed > tables.
tree.flush_active_memtable(0)?;

let folder = tree.config.path.join(crate::file::TABLES_FOLDER);
log::debug!("Ingesting into tables in {}", folder.display());

Expand Down Expand Up @@ -216,30 +210,57 @@ impl<'a> Ingestion<'a> {
///
/// Will return `Err` if an IO error occurs.
pub fn finish(self) -> crate::Result<()> {
use crate::Table;
use crate::{version::persist_version, AbstractTree, Table};

// CRITICAL SECTION: Atomic flush + seqno allocation + registration
//
// We must ensure no concurrent writes interfere between flushing the
// active memtable and registering the ingested tables. The sequence is:
// 1. Acquire flush lock (prevents concurrent flushes)
// 2. Flush active memtable (ensures no pending writes)
// 3. Finish ingestion writer (creates table files)
// 4. Allocate next global seqno (atomic timestamp)
// 5. Recover tables with that seqno
// 6. Register version with same seqno
//
// Why not flush in new()?
// If we flushed in new(), there would be a race condition:
// new() -> flush -> [TIME PASSES + OTHER WRITES] -> finish() -> seqno
// The seqno would be disconnected from the flush, violating MVCC.
//
// By holding the flush lock throughout, we guarantee atomicity.
let flush_lock = self.tree.get_flush_lock();

// Flush any pending memtable writes to ensure ingestion sees a
// consistent snapshot and lookup order remains correct.
// We call rotate + flush directly because we already hold the lock.
self.tree.rotate_memtable();
self.tree.flush(&flush_lock, 0)?;

// Finalize the ingestion writer, writing all buffered data to disk.
let results = self.writer.finish()?;

log::info!("Finished ingestion writer");

// Turn the writer output into fully recovered tables that can be
// registered as a fresh L0 run.
// Allocate the next global sequence number. This seqno will be shared
// by all ingested tables and the version that registers them, ensuring
// consistent MVCC snapshots.
let global_seqno = self.tree.config.seqno.next();

// Recover all created tables, assigning them the global_seqno we just
// allocated. This ensures all ingested tables share the same sequence
// number, which is critical for MVCC correctness.
//
// We intentionally do NOT pin filter/index blocks here. Large ingests
// are typically placed in level 1, and pinning would increase memory
// pressure unnecessarily.
let created_tables = results
.into_iter()
.map(|(table_id, checksum)| -> crate::Result<Table> {
// TODO: table recoverer struct w/ builder pattern
// Table::recover()
// .pin_filters(true)
// .with_metrics(metrics)
// .run(path, tree_id, cache, descriptor_table);

// Do not pin ingestion output tables here. Large ingests are
// typically placed in level 1 and would otherwise keep all
// filter and index blocks pinned, increasing memory pressure.
Table::recover(
self.folder.join(table_id.to_string()),
checksum,
todo!(),
global_seqno,
self.tree.id,
self.tree.config.cache.clone(),
self.tree.config.descriptor_table.clone(),
Expand All @@ -251,11 +272,38 @@ impl<'a> Ingestion<'a> {
})
.collect::<crate::Result<Vec<_>>>()?;

// Ingestion produces new tables only and does not touch sealed
// memtables directly, so the deletion set is empty and the
// watermark is left at its neutral value.
self.tree
.register_tables(&created_tables, None, None, &[], 0)?;
// Acquire locks for version registration. We must hold both the
// compaction state lock and version history lock to safely modify
// the tree's version.
let mut _compaction_state = self.tree.compaction_state.lock().expect("lock is poisoned");
let mut version_lock = self.tree.version_history.write().expect("lock is poisoned");

// Create the next version by adding our ingested tables as a new L0 run.
// We manually set the seqno to the global_seqno we allocated earlier,
// ensuring the version and tables share the same sequence number.
//
// Why not use register_tables()?
// register_tables() calls upgrade_version(), which would allocate a
// DIFFERENT seqno via seqno.next(). We need to use the SAME seqno we
// already allocated and assigned to the tables.
let mut next_version = {
let current = version_lock.latest_version();
let mut copy = current.clone();
copy.version = copy.version.with_new_l0_run(&created_tables, None, None);
copy
};

next_version.seqno = global_seqno;

// Persist the new version to disk and append it to the version history.
persist_version(&self.tree.config.path, &next_version.version)?;
version_lock.append_version(next_version);

// Perform maintenance on the version history (e.g., clean up old versions).
// We use gc_watermark=0 since ingestion doesn't affect sealed memtables.
if let Err(e) = version_lock.maintenance(&self.tree.config.path, 0) {
log::warn!("Version GC failed: {e:?}");
}

Ok(())
}
Expand Down