From 47a45721bc8fff2dabaeef2a508d8e48dca74d27 Mon Sep 17 00:00:00 2001 From: Vidya Silai Date: Tue, 11 Nov 2025 10:57:47 -0500 Subject: [PATCH 01/10] Initial commit --- conanfile.py | 2 +- .../core/algo/compute_running_total.hpp | 4 +- src/turtle_kv/core/testing/generate.hpp | 14 +- src/turtle_kv/kv_store.cpp | 4 +- src/turtle_kv/kv_store_scanner.cpp | 9 +- src/turtle_kv/tree/algo/segmented_levels.hpp | 12 + src/turtle_kv/tree/algo/segments.hpp | 16 + src/turtle_kv/tree/batch_update.cpp | 124 +++- src/turtle_kv/tree/batch_update.hpp | 28 +- src/turtle_kv/tree/in_memory_leaf.cpp | 25 + src/turtle_kv/tree/in_memory_leaf.hpp | 6 +- src/turtle_kv/tree/in_memory_node.cpp | 600 +++++++++++++++++- src/turtle_kv/tree/in_memory_node.hpp | 36 ++ src/turtle_kv/tree/in_memory_node.test.cpp | 30 +- src/turtle_kv/tree/packed_leaf_page.hpp | 33 +- src/turtle_kv/tree/subtree.cpp | 163 ++++- src/turtle_kv/tree/subtree.hpp | 25 + .../tree/testing/random_leaf_generator.hpp | 2 +- 18 files changed, 1073 insertions(+), 60 deletions(-) diff --git a/conanfile.py b/conanfile.py index 9492d30..0d0b7ef 100644 --- a/conanfile.py +++ b/conanfile.py @@ -60,7 +60,7 @@ def requirements(self): self.requires("gperftools/2.16", **VISIBLE) self.requires("llfs/0.42.0", **VISIBLE) self.requires("pcg-cpp/cci.20220409", **VISIBLE) - self.requires("vqf/0.2.5", **VISIBLE) + self.requires("vqf/0.2.5-devel", **VISIBLE) self.requires("zlib/1.3.1", **OVERRIDE) if platform.system() == "Linux": diff --git a/src/turtle_kv/core/algo/compute_running_total.hpp b/src/turtle_kv/core/algo/compute_running_total.hpp index 33a0568..9ff8f73 100644 --- a/src/turtle_kv/core/algo/compute_running_total.hpp +++ b/src/turtle_kv/core/algo/compute_running_total.hpp @@ -14,10 +14,10 @@ namespace turtle_kv { //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -template +template inline batt::RunningTotal compute_running_total( batt::WorkerPool& worker_pool, - const MergeCompactor::ResultSet& result_set, + const MergeCompactor::ResultSet& result_set, DecayToItem decay_to_item [[maybe_unused]] = {}) { auto merged_edits = result_set.get(); diff --git a/src/turtle_kv/core/testing/generate.hpp b/src/turtle_kv/core/testing/generate.hpp index bafa95a..06b4c37 100644 --- a/src/turtle_kv/core/testing/generate.hpp +++ b/src/turtle_kv/core/testing/generate.hpp @@ -184,8 +184,11 @@ class RandomResultSetGenerator : public MinMaxSize } template - MergeCompactor::ResultSet - operator()(DecayToItem, Rng& rng, llfs::StableStringStore& store) + MergeCompactor::ResultSet operator()( + DecayToItem, + Rng& rng, + llfs::StableStringStore& store, + Optional> to_delete = None) { using ResultSet = MergeCompactor::ResultSet; using Item = typename ResultSet::value_type; @@ -199,6 +202,13 @@ class RandomResultSetGenerator : public MinMaxSize items.emplace_back(this->key_generator_(rng, store), ValueView::from_str(store.store(std::string(this->value_size_, ch)))); } + + if (to_delete) { + for (const KeyView& delete_key : *to_delete) { + items.emplace_back(delete_key, ValueView::deleted()); + } + } + std::sort(items.begin(), items.end(), KeyOrder{}); items.erase(std::unique(items.begin(), items.end(), diff --git a/src/turtle_kv/kv_store.cpp b/src/turtle_kv/kv_store.cpp index 727ac30..93b1e5f 100644 --- a/src/turtle_kv/kv_store.cpp +++ b/src/turtle_kv/kv_store.cpp @@ -766,9 +766,7 @@ StatusOr KVStore::scan_keys(const KeyView& min_key, // Status KVStore::remove(const KeyView& key) noexcept /*override*/ { - (void)key; - - return batt::StatusCode::kUnimplemented; + return this->put(key, ValueView::deleted()); } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - diff --git a/src/turtle_kv/kv_store_scanner.cpp b/src/turtle_kv/kv_store_scanner.cpp index 61f6d02..36b2d14 100644 --- a/src/turtle_kv/kv_store_scanner.cpp +++ b/src/turtle_kv/kv_store_scanner.cpp @@ -443,7 +443,14 @@ Status KVStoreScanner::set_next_item() } } else { - break; + // TODO [vsilai 11-10-2025]: need to fix key only scans to look at values. + // + if (!this->keys_only_ && this->next_item_->value == ValueView::deleted()) { + this->next_item_ = None; + continue; + } else { + break; + } } if (scan_level->advance()) { diff --git a/src/turtle_kv/tree/algo/segmented_levels.hpp b/src/turtle_kv/tree/algo/segmented_levels.hpp index c9864e1..822233e 100644 --- a/src/turtle_kv/tree/algo/segmented_levels.hpp +++ b/src/turtle_kv/tree/algo/segmented_levels.hpp @@ -305,6 +305,18 @@ struct SegmentedLevelAlgorithms { return OkStatus(); } + /** \brief Merges the two given pivots, effectively erasing `right_pivot`. + */ + void merge_pivots(i32 left_pivot, i32 right_pivot) + { + const usize segment_count = this->level_.segment_count(); + + for (usize segment_i = 0; segment_i < segment_count; ++segment_i) { + SegmentT& segment = this->level_.get_segment(segment_i); + in_segment(segment).merge_pivots(left_pivot, right_pivot, this->level_); + } + } + /** \brief Invokes `fn` for each SegmentT& selected by `pivot_selector`. * * `pivot_selector` can be: diff --git a/src/turtle_kv/tree/algo/segments.hpp b/src/turtle_kv/tree/algo/segments.hpp index 54f8016..0798dfb 100644 --- a/src/turtle_kv/tree/algo/segments.hpp +++ b/src/turtle_kv/tree/algo/segments.hpp @@ -116,6 +116,22 @@ struct SegmentAlgorithms { return true; } + /** \brief Merges the two given pivots, effectively erasing `right_pivot`. + */ + template + [[nodiscard]] void merge_pivots(i32 left_pivot, i32 right_pivot, const LevelT& level) + { + BATT_CHECK(!this->segment_.is_pivot_active(left_pivot)); + + u32 new_flushed_upper_bound = this->segment_.get_flushed_item_upper_bound(level, right_pivot); + bool new_is_active = this->segment_.is_pivot_active(right_pivot); + + this->segment_.set_pivot_active(left_pivot, new_is_active); + this->segment_.set_flushed_item_upper_bound(left_pivot, new_flushed_upper_bound); + + this->segment_.remove_pivot(right_pivot); + } + /** \brief Invokes the speficied `fn` for each active pivot in the specified range, passing a * reference to the segment and the pivot index (i32). */ diff --git a/src/turtle_kv/tree/batch_update.cpp b/src/turtle_kv/tree/batch_update.cpp index 94babdd..8e5675d 100644 --- a/src/turtle_kv/tree/batch_update.cpp +++ b/src/turtle_kv/tree/batch_update.cpp @@ -9,7 +9,129 @@ using TrimResult = BatchUpdate::TrimResult; // void BatchUpdate::update_edit_size_totals() { - this->edit_size_totals.emplace(this->context.compute_running_total(this->result_set)); + this->edit_size_totals.emplace( + this->context.compute_running_total(this->result_set)); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void BatchUpdate::update_edit_size_totals_decayed( + const MergeCompactor::ResultSet& decayed_result_set) +{ + this->edit_size_totals.emplace( + this->context.compute_running_total(decayed_result_set)); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void BatchUpdate::decay_batch_to_items( + MergeCompactor::ResultSet& output_result_set) +{ + const batt::TaskCount max_tasks{this->context.worker_pool.size() + 1}; + std::vector decayed_items; + + if (max_tasks == 1) { + for (const EditView& edit : this->result_set.get()) { + Optional maybe_item = to_item_view(edit); + if (maybe_item) { + decayed_items.emplace_back(EditView::from_item_view(*maybe_item)); + } + } + } else { + const ParallelAlgoDefaults& algo_defaults = parallel_algo_defaults(); + + auto actual_edits = result_set.get(); + const auto src_begin = actual_edits.begin(); + const auto src_end = actual_edits.end(); + + const batt::WorkSlicePlan plan{batt::WorkSliceParams{ + algo_defaults.copy_decayed_items.min_task_size, + max_tasks, + }, + src_begin, + src_end}; + + BATT_CHECK_GT(plan.n_tasks, 0); + + batt::SmallVec output_size_per_shard(plan.n_tasks); + BATT_CHECK_EQ(output_size_per_shard.size(), plan.n_tasks); + + // First count the number of non-decayed items in the output for each shard. + { + batt::ScopedWorkContext work_context{this->context.worker_pool}; + + BATT_CHECK_OK(batt::slice_work( + work_context, + plan, + /*gen_work_fn=*/ + [&](usize task_index, isize task_offset, isize task_size) { + return [src_begin, task_index, task_offset, task_size, &output_size_per_shard] { + BATT_CHECK_LT(task_index, output_size_per_shard.size()); + + auto task_src_begin = std::next(src_begin, task_offset); + const auto task_src_end = std::next(task_src_begin, task_size); + + usize output_size = 0; + + for (; task_src_begin != task_src_end; ++task_src_begin) { + if (decays_to_item(*task_src_begin)) { + output_size += 1; + } + } + output_size_per_shard[task_index] = output_size; + }; + })) + << "worker_pool must not be closed!"; + } + + // Change to a rolling sum and do the actual copy. + // + usize output_total_size = 0; + batt::SmallVec output_shard_offset; + for (usize output_shard_size : output_size_per_shard) { + output_shard_offset.emplace_back(output_total_size); + output_total_size += output_shard_size; + } + + decayed_items.resize(output_total_size); + { + this->context.worker_pool.reset(); + + batt::ScopedWorkContext work_context{this->context.worker_pool}; + + BATT_CHECK_OK( + batt::slice_work(work_context, + plan, + /*gen_work_fn=*/ + [&](usize task_index, isize task_offset, isize task_size) { + return [src_begin, + &output_shard_offset, + &output_size_per_shard, + task_index, + task_offset, + task_size, + &decayed_items] { + auto task_src_begin = std::next(src_begin, task_offset); + const auto task_src_end = std::next(task_src_begin, task_size); + + BATT_CHECK_LT(task_index, output_shard_offset.size()); + auto task_dst_begin = + std::next(decayed_items.data(), output_shard_offset[task_index]); + + for (; task_src_begin != task_src_end; ++task_src_begin) { + Optional maybe_item = to_item_view(*task_src_begin); + if (maybe_item) { + *task_dst_begin = EditView::from_item_view(*maybe_item); + ++task_dst_begin; + } + } + }; + })) + << "worker_pool must not be closed!"; + } + } + + output_result_set.append(std::move(decayed_items)); } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - diff --git a/src/turtle_kv/tree/batch_update.hpp b/src/turtle_kv/tree/batch_update.hpp index 161c370..7eeab1d 100644 --- a/src/turtle_kv/tree/batch_update.hpp +++ b/src/turtle_kv/tree/batch_update.hpp @@ -29,17 +29,18 @@ struct BatchUpdateContext { /** \brief Uses the worker_pool to perform a parallel merge-compaction of the lines * produced by the passed `generator_fn`, up to and including (but stopping at) `max_key`. */ - template - StatusOr> merge_compact_edits( + template + StatusOr> merge_compact_edits( const KeyView& max_key, GeneratorFn&& generator_fn); /** \brief Computes and returns the running total (prefix sum) of the edit sizes in result_set. */ + template batt::RunningTotal compute_running_total( - const MergeCompactor::ResultSet& result_set) const + const MergeCompactor::ResultSet& result_set) const { - return ::turtle_kv::compute_running_total(this->worker_pool, result_set); + return ::turtle_kv::compute_running_total(this->worker_pool, result_set); } }; @@ -63,6 +64,16 @@ struct BatchUpdate { */ void update_edit_size_totals(); + /** \brief Resets `this->edit_size_totals` to reflect the decayed version of `this->result_set`. + */ + void update_edit_size_totals_decayed( + const MergeCompactor::ResultSet& decayed_result_set); + + /** \brief Fills the output buffer `ResultSet` passed into the function with only the + * edits from this batch that decay to base-level items (e.g., no tombstones). + */ + void decay_batch_to_items(MergeCompactor::ResultSet& output_result_set); + /** \brief Returns the inclusive (closed) interval of keys in this batch. */ CInterval get_key_crange() const @@ -90,9 +101,10 @@ std::ostream& operator<<(std::ostream& out, const BatchUpdate::TrimResult& t); //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -template -inline StatusOr> -BatchUpdateContext::merge_compact_edits(const KeyView& max_key, GeneratorFn&& generator_fn) +template +inline StatusOr> BatchUpdateContext::merge_compact_edits( + const KeyView& max_key, + GeneratorFn&& generator_fn) { MergeCompactor compactor{this->worker_pool}; @@ -100,7 +112,7 @@ BatchUpdateContext::merge_compact_edits(const KeyView& max_key, GeneratorFn&& ge BATT_REQUIRE_OK(BATT_FORWARD(generator_fn)(compactor)); compactor.finish_push_levels(); - MergeCompactor::EditBuffer edit_buffer; + MergeCompactor::OutputBuffer edit_buffer; this->worker_pool.reset(); return compactor.read(edit_buffer, max_key); diff --git a/src/turtle_kv/tree/in_memory_leaf.cpp b/src/turtle_kv/tree/in_memory_leaf.cpp index 9c02cf4..1708bf1 100644 --- a/src/turtle_kv/tree/in_memory_leaf.cpp +++ b/src/turtle_kv/tree/in_memory_leaf.cpp @@ -149,6 +149,31 @@ auto InMemoryLeaf::make_split_plan() const -> StatusOr return plan; } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +StatusOr> InMemoryLeaf::try_merge(BatchUpdateContext& context, + InMemoryLeaf& sibling) +{ + auto merged_leaf = + std::make_unique(batt::make_copy(this->pinned_leaf_page_), this->tree_options); + + // Concatenate the two leaves' result sets in the correct order. + // + bool right_sibling = this->get_max_key() < sibling.get_min_key(); + if (right_sibling) { + merged_leaf->result_set = + MergeCompactor::ResultSet::concat(std::move(this->result_set), + std::move(sibling.result_set)); + } else { + merged_leaf->result_set = MergeCompactor::ResultSet::concat(std::move(sibling.result_set), + std::move(this->result_set)); + } + + merged_leaf->set_edit_size_totals(context.compute_running_total(merged_leaf->result_set)); + + return {std::move(merged_leaf)}; +} + //=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - diff --git a/src/turtle_kv/tree/in_memory_leaf.hpp b/src/turtle_kv/tree/in_memory_leaf.hpp index 9d71d93..879fee4 100644 --- a/src/turtle_kv/tree/in_memory_leaf.hpp +++ b/src/turtle_kv/tree/in_memory_leaf.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -32,7 +33,7 @@ struct InMemoryLeaf { llfs::PinnedPage pinned_leaf_page_; TreeOptions tree_options; - MergeCompactor::ResultSet result_set; + MergeCompactor::ResultSet result_set; std::shared_ptr shared_edit_size_totals_; Optional edit_size_totals; mutable std::atomic future_id_{~u64{0}}; @@ -91,6 +92,9 @@ struct InMemoryLeaf { StatusOr make_split_plan() const; + StatusOr> try_merge(BatchUpdateContext& context, + InMemoryLeaf& sibling); + Status start_serialize(TreeSerializeContext& context); StatusOr finish_serialize(TreeSerializeContext& context); diff --git a/src/turtle_kv/tree/in_memory_node.cpp b/src/turtle_kv/tree/in_memory_node.cpp index 72fc976..032c566 100644 --- a/src/turtle_kv/tree/in_memory_node.cpp +++ b/src/turtle_kv/tree/in_memory_node.cpp @@ -288,7 +288,7 @@ Status InMemoryNode::update_buffer_insert(BatchUpdate& update) BATT_ASSIGN_OK_RESULT( // new_merged_level.result_set, - update.context.merge_compact_edits( // + update.context.merge_compact_edits( // global_max_key(), [&](MergeCompactor& compactor) -> Status { compactor.push_level(update.result_set.live_edit_slices()); @@ -402,7 +402,7 @@ Status InMemoryNode::compact_update_buffer_levels(BatchUpdateContext& update_con Status segment_load_status; BATT_ASSIGN_OK_RESULT(new_merged_level.result_set, - update_context.merge_compact_edits( + update_context.merge_compact_edits( global_max_key(), [&](MergeCompactor& compactor) -> Status { this->push_levels_to_merge(compactor, @@ -447,10 +447,10 @@ StatusOr InMemoryNode::collect_pivot_batch(BatchUpdateContext& upda // Merge/compact all pending edits for the specified pivot. // - BATT_ASSIGN_OK_RESULT( // - pivot_batch.result_set, // - update_context.merge_compact_edits( // - /*max_key=*/pivot_key_range.upper_bound, // + BATT_ASSIGN_OK_RESULT( // + pivot_batch.result_set, // + update_context.merge_compact_edits( // + /*max_key=*/pivot_key_range.upper_bound, // [&](MergeCompactor& compactor) -> Status { this->push_levels_to_merge(compactor, update_context.page_loader, @@ -589,8 +589,7 @@ Status InMemoryNode::make_child_viable(BatchUpdateContext& update_context, i32 p //----- --- -- - - - - [&](const NeedsMerge&) -> Status { - BATT_PANIC() << "TODO [tastolfi 2025-03-16] implement me!"; - return batt::StatusCode::kUnimplemented; + return this->merge_child(update_context, pivot_i); }); return status; @@ -691,6 +690,552 @@ Status InMemoryNode::split_child(BatchUpdateContext& update_context, i32 pivot_i return OkStatus(); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +Status InMemoryNode::merge_child(BatchUpdateContext& update_context, i32 pivot_i) +{ + Subtree& child = this->children[pivot_i]; + + // Decide which sibling to merge with. Edge cases: child that needs merge is the leftmost or + // rightmost child in the node. + // + i32 sibling_i = pivot_i; + i32 right_sibling = pivot_i + 1; + i32 left_sibling = pivot_i - 1; + + bool need_compaction = false; + u64 active_segmented_levels = this->update_buffer.compute_active_segmented_levels(); + if (pivot_i == 0) { + sibling_i = right_sibling; + if (get_bit(active_segmented_levels, pivot_i)) { + need_compaction = true; + } + } else if ((usize)pivot_i == this->children.size() - 1) { + sibling_i = left_sibling; + if (get_bit(active_segmented_levels, left_sibling)) { + need_compaction = true; + } + } else { + if (!get_bit(active_segmented_levels, pivot_i)) { + sibling_i = right_sibling; + } else { + if (!get_bit(active_segmented_levels, left_sibling)) { + sibling_i = left_sibling; + } else { + sibling_i = right_sibling; + need_compaction = true; + } + } + } + + BATT_CHECK_NE(pivot_i, sibling_i); + BATT_REQUIRE_OK(this->children[sibling_i].to_in_memory_subtree(update_context.page_loader, + this->tree_options, + this->height - 1)); + + // Call child.try_merge(). + // + Subtree& sibling = this->children[sibling_i]; + StatusOr> status_or_merged = child.try_merge(update_context, sibling); + if (!status_or_merged.ok()) { + LOG(ERROR) << BATT_INSPECT(child.get_viability()); + } + BATT_REQUIRE_OK(status_or_merged); + + if (!*status_or_merged) { + if (!batt::is_case(child.get_viability())) { + BATT_ASSIGN_OK_RESULT(KeyView new_pivot_key, child.try_borrow(update_context, sibling)); + + this->pivot_keys_[std::max(pivot_i, sibling_i)] = new_pivot_key; + + BATT_REQUIRE_OK(this->compact_update_buffer_levels(update_context)); + } + BATT_CHECK(batt::is_case(child.get_viability())); + return OkStatus(); + } + Subtree& merged_subtree = **status_or_merged; + + // Erase rightmost of {child subtree, sibling} in this->child_pages, overwrite leftmost + // with new PinnedPage{}. + // + i32 pivot_to_erase = std::max(pivot_i, sibling_i); + i32 pivot_to_overwrite = std::min(pivot_i, sibling_i); + + this->child_pages[pivot_to_overwrite] = llfs::PinnedPage{}; + this->child_pages.erase(this->child_pages.begin() + pivot_to_erase); + + // Update the update_buffer levels. + // + if (need_compaction) { + BATT_REQUIRE_OK(this->compact_update_buffer_levels(update_context)); + } else { + for (Level& level : this->update_buffer.levels) { + if (batt::is_case(level)) { + SegmentedLevel& segmented_level = std::get(level); + in_segmented_level(*this, segmented_level, update_context.page_loader) + .merge_pivots(pivot_to_overwrite, pivot_to_erase); + } + } + } + + // Update this->children, following same update method as with this->child_pages. + // + this->children[pivot_to_overwrite] = std::move(merged_subtree); + this->children.erase(this->children.begin() + pivot_to_erase); + + // Update pending_bytes. The leftmost of {subtree, sibling} should be incremented by the removed + // subtree's pending bytes values. Erase the pending bytes of the removed subtree. + // + this->pending_bytes[pivot_to_overwrite] += this->pending_bytes[pivot_to_erase]; + this->pending_bytes.erase(this->pending_bytes.begin() + pivot_to_erase); + + bool is_pending_bytes_exact = get_bit(this->pending_bytes_is_exact, pivot_to_overwrite) & + get_bit(this->pending_bytes_is_exact, pivot_to_erase); + this->pending_bytes_is_exact = + set_bit(this->pending_bytes_is_exact, pivot_to_overwrite, is_pending_bytes_exact); + this->pending_bytes_is_exact = remove_bit(this->pending_bytes_is_exact, pivot_to_erase); + + // Remove the pivot key of the removed child subtree from this->pivot_keys_. + // + this->pivot_keys_.erase(this->pivot_keys_.begin() + pivot_to_erase); + + if ((usize)pivot_to_erase == this->children.size()) { + BATT_ASSIGN_OK_RESULT( + this->max_key_, + this->children.back().get_max_key(update_context.page_loader, this->child_pages.back())); + } + + // Finally, split the newly merged child if needed. + // + SubtreeViability merged_viability = merged_subtree.get_viability(); + if (batt::is_case(merged_viability)) { + BATT_REQUIRE_OK(this->split_child(update_context, pivot_to_overwrite)); + } else { + BATT_CHECK(batt::is_case(merged_viability)); + } + + return OkStatus(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +StatusOr> InMemoryNode::flush_and_shrink(BatchUpdateContext& context) +{ + // If more than one pivot exists, nothings needs to be done. + // + usize pivot_count = this->pivot_count(); + if (pivot_count > 1) { + return None; + } + + i32 single_pivot_i = 0; + BATT_CHECK_EQ(this->pending_bytes.size(), 1); + usize pending_bytes_count = this->pending_bytes[single_pivot_i]; + + // Flush until we have nothing left in the update buffer or until we gain more pivots. + // + while (pivot_count == 1 && pending_bytes_count > 0) { + BATT_REQUIRE_OK(this->flush_to_pivot(context, single_pivot_i)); + pivot_count = this->pivot_count(); + pending_bytes_count = this->pending_bytes[single_pivot_i]; + } + + // If still only one pivot remains, return the child. + // + if (pivot_count == 1) { + return std::move(this->children[single_pivot_i]); + } else { + return None; + } +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +StatusOr> InMemoryNode::try_merge(BatchUpdateContext& context, + InMemoryNode& sibling) +{ + // If merging both full nodes will cause the merged node's pivot count to exceed the max + // possible pivot count, return null so that we can try a borrow. + // + if (this->pivot_count() + sibling.pivot_count() > this->max_pivot_count()) { + return nullptr; + } + + auto new_node = std::make_unique(batt::make_copy(this->pinned_node_page_), + this->tree_options, + this->is_size_tiered()); + + const auto concat_metadata = [&](InMemoryNode& left, InMemoryNode& right) { + new_node->max_key_ = right.max_key_; + + new_node->latest_flush_pivot_i_ = None; + + new_node->pending_bytes.insert(new_node->pending_bytes.end(), + left.pending_bytes.begin(), + left.pending_bytes.end()); + new_node->pending_bytes.insert(new_node->pending_bytes.end(), + right.pending_bytes.begin(), + right.pending_bytes.end()); + + new_node->pending_bytes_is_exact = left.pending_bytes_is_exact & right.pending_bytes_is_exact; + + new_node->child_pages.insert(new_node->child_pages.end(), + std::make_move_iterator(left.child_pages.begin()), + std::make_move_iterator(left.child_pages.end())); + new_node->child_pages.insert(new_node->child_pages.end(), + std::make_move_iterator(right.child_pages.begin()), + std::make_move_iterator(right.child_pages.end())); + + new_node->children.insert(new_node->children.end(), + std::make_move_iterator(left.children.begin()), + std::make_move_iterator(left.children.end())); + new_node->children.insert(new_node->children.end(), + std::make_move_iterator(right.children.begin()), + std::make_move_iterator(right.children.end())); + + new_node->pivot_keys_.insert(new_node->pivot_keys_.end(), + left.pivot_keys_.begin(), + left.pivot_keys_.end() - 1); + new_node->pivot_keys_.insert(new_node->pivot_keys_.end(), + right.pivot_keys_.begin(), + right.pivot_keys_.end()); + }; + + const auto merge_update_buffers = [&](InMemoryNode& left, InMemoryNode& right) -> Status { + usize i = 0; + for (; i < left.update_buffer.levels.size(); ++i) { + Level& left_level = left.update_buffer.levels[i]; + BATT_REQUIRE_OK(batt::case_of( // + left_level, // + [&](EmptyLevel&) -> Status { + if (i < right.update_buffer.levels.size()) { + Level& right_level = right.update_buffer.levels[i]; + if (!batt::is_case(right_level)) { + new_node->update_buffer.levels.emplace_back(std::move(right_level)); + } + } + + return OkStatus(); + }, + [&](MergedLevel& left_merged_level) -> Status { + if (i < right.update_buffer.levels.size()) { + BATT_REQUIRE_OK(batt::case_of( + right.update_buffer.levels[i], + [&](EmptyLevel&) -> Status { + new_node->update_buffer.levels.emplace_back(std::move(left_merged_level)); + return OkStatus(); + }, + [&](MergedLevel& right_merged_level) -> Status { + new_node->update_buffer.levels.emplace_back( + left_merged_level.concat(right_merged_level)); + return OkStatus(); + }, + [&](SegmentedLevel& right_segmented_level) -> Status { + // When merging a MergedLevel and a SegmentedLevel, create a new MergedLevel. + // + MergedLevel new_merged_level; + HasPageRefs has_page_refs{false}; + Status segment_load_status; + + Slice levels_to_merge = + as_slice(right.update_buffer.levels.data() + i, usize{1}); + + BATT_ASSIGN_OK_RESULT( // + new_merged_level.result_set, + context.merge_compact_edits( // + global_max_key(), + [&](MergeCompactor& compactor) -> Status { + compactor.push_level(left_merged_level.result_set.live_edit_slices()); + right.push_levels_to_merge(compactor, + context.page_loader, + segment_load_status, + has_page_refs, + levels_to_merge, + /*min_pivot_i=*/0, + /*only_pivot=*/false); + return OkStatus(); + })); + + BATT_REQUIRE_OK(segment_load_status); + + new_node->update_buffer.levels.emplace_back(std::move(new_merged_level)); + + return OkStatus(); + })); + } else { + new_node->update_buffer.levels.emplace_back(std::move(left_merged_level)); + } + + return OkStatus(); + }, + [&](SegmentedLevel& left_segmented_level) -> Status { + if (i < right.update_buffer.levels.size()) { + BATT_REQUIRE_OK(batt::case_of( + right.update_buffer.levels[i], + [&](EmptyLevel&) -> Status { + new_node->update_buffer.levels.emplace_back(std::move(left_segmented_level)); + return OkStatus(); + }, + [&](MergedLevel& right_merged_level) -> Status { + MergedLevel new_merged_level; + HasPageRefs has_page_refs{false}; + Status segment_load_status; + + Slice levels_to_merge = + as_slice(left.update_buffer.levels.data() + i, usize{1}); + + BATT_ASSIGN_OK_RESULT( // + new_merged_level.result_set, + context.merge_compact_edits( // + global_max_key(), + [&](MergeCompactor& compactor) -> Status { + left.push_levels_to_merge(compactor, + context.page_loader, + segment_load_status, + has_page_refs, + levels_to_merge, + /*min_pivot_i=*/0, + /*only_pivot=*/false); + compactor.push_level( + right_merged_level.result_set.live_edit_slices()); + return OkStatus(); + })); + + BATT_REQUIRE_OK(segment_load_status); + + new_node->update_buffer.levels.emplace_back(std::move(new_merged_level)); + + return OkStatus(); + }, + [&](SegmentedLevel& right_segmented_level) -> Status { + // First shift the right level's bitsets to the left by the number of pivots + // in the left node. + // + usize left_node_pivot_count = left.pivot_count(); + for (usize segment_i = 0; segment_i < right_segmented_level.segment_count(); + ++segment_i) { + Segment& segment = right_segmented_level.get_segment(segment_i); + segment.flushed_pivots <<= left_node_pivot_count; + segment.active_pivots <<= left_node_pivot_count; + } + + new_node->update_buffer.levels.emplace_back(std::move(left_segmented_level)); + SegmentedLevel& new_segmented_level = + std::get(new_node->update_buffer.levels.back()); + new_segmented_level.segments.insert( + new_segmented_level.segments.end(), + std::make_move_iterator(right_segmented_level.segments.begin()), + std::make_move_iterator(right_segmented_level.segments.end())); + + return OkStatus(); + })); + } else { + new_node->update_buffer.levels.emplace_back(std::move(left_segmented_level)); + } + + return OkStatus(); + })); + } + + // Carry over any remaining levels from the right node's update buffer. + // + for (; i < right.update_buffer.levels.size(); ++i) { + Level& right_level = right.update_buffer.levels[i]; + if (!batt::is_case(right_level)) { + new_node->update_buffer.levels.emplace_back(std::move(right_level)); + } + } + + return OkStatus(); + }; + + if (this->get_max_key() < sibling.get_min_key()) { + concat_metadata(*this, sibling); + BATT_REQUIRE_OK(merge_update_buffers(*this, sibling)); + } else { + concat_metadata(sibling, *this); + BATT_REQUIRE_OK(merge_update_buffers(sibling, *this)); + } + + return {std::move(new_node)}; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +StatusOr InMemoryNode::try_borrow(BatchUpdateContext& context, InMemoryNode& sibling) +{ + BATT_CHECK(batt::is_case(sibling.get_viability())); + + bool right_sibling = this->get_max_key() < sibling.get_min_key(); + + BATT_CHECK_LT(this->pivot_count(), 4); + u32 num_pivots_to_borrow = 4 - this->pivot_count(); + + i32 borrowed_min_pivot_i = -1; + KeyView borrowed_max_pivot_key; + if (right_sibling) { + borrowed_min_pivot_i = 0; + borrowed_max_pivot_key = sibling.get_pivot_key(num_pivots_to_borrow); + } else { + borrowed_min_pivot_i = sibling.pivot_count() - num_pivots_to_borrow; + borrowed_max_pivot_key = sibling.get_pivot_key(sibling.pivot_count()); + } + Interval borrowed_pivot_range{sibling.get_pivot_key(borrowed_min_pivot_i), + borrowed_max_pivot_key}; + + BatchUpdate borrowed_pivot_batch{ + .context = context, + .result_set = {}, + .edit_size_totals = None, + }; + + Status segment_load_status; + HasPageRefs has_page_refs{false}; + + BATT_ASSIGN_OK_RESULT( // + borrowed_pivot_batch.result_set, // + context.merge_compact_edits( // + /*max_key=*/borrowed_max_pivot_key, // + [&](MergeCompactor& compactor) -> Status { + sibling.push_levels_to_merge(compactor, + context.page_loader, + segment_load_status, + has_page_refs, + as_slice(sibling.update_buffer.levels), + /*min_pivot_i=*/borrowed_min_pivot_i, + /*only_pivot=*/false); + return OkStatus(); + })); + + BATT_REQUIRE_OK(segment_load_status); + + borrowed_pivot_batch.result_set.drop_key_range_half_open(Interval{ + borrowed_max_pivot_key, + sibling.key_upper_bound(), + }); + + borrowed_pivot_batch.edit_size_totals = None; + + if (right_sibling) { + this->pending_bytes.insert(this->pending_bytes.end(), + sibling.pending_bytes.begin(), + sibling.pending_bytes.begin() + num_pivots_to_borrow); + sibling.pending_bytes.erase(sibling.pending_bytes.begin(), + sibling.pending_bytes.begin() + num_pivots_to_borrow); + + u64 borrowed_pending_bytes_exact = + sibling.pending_bytes_is_exact & ((u64{1} << num_pivots_to_borrow) - 1); + u64 mask = ((u64{1} << num_pivots_to_borrow) - 1) << (this->pivot_count() - 1); + this->pending_bytes_is_exact = (this->pending_bytes_is_exact & ~mask) | + (borrowed_pending_bytes_exact << (this->pivot_count() - 1)); + + this->pivot_keys_.pop_back(); + this->pivot_keys_.insert(this->pivot_keys_.end(), + sibling.pivot_keys_.begin(), + sibling.pivot_keys_.begin() + num_pivots_to_borrow + 1); + sibling.pivot_keys_.erase(sibling.pivot_keys_.begin(), + sibling.pivot_keys_.begin() + num_pivots_to_borrow); + + this->child_pages.insert( + this->child_pages.end(), + std::make_move_iterator(sibling.child_pages.begin()), + std::make_move_iterator(sibling.child_pages.begin() + num_pivots_to_borrow)); + sibling.child_pages.erase(sibling.child_pages.begin(), + sibling.child_pages.begin() + num_pivots_to_borrow); + + this->children.insert(this->children.end(), + std::make_move_iterator(sibling.children.begin()), + std::make_move_iterator(sibling.children.begin() + num_pivots_to_borrow)); + sibling.children.erase(sibling.children.begin(), + sibling.children.begin() + num_pivots_to_borrow); + + BATT_ASSIGN_OK_RESULT( + this->max_key_, + this->children.back().get_max_key(context.page_loader, this->child_pages.back())); + } else { + this->pending_bytes.insert(this->pending_bytes.begin(), + sibling.pending_bytes.end() - num_pivots_to_borrow, + sibling.pending_bytes.end()); + sibling.pending_bytes.erase(sibling.pending_bytes.end() - num_pivots_to_borrow, + sibling.pending_bytes.end()); + + u64 borrowed_pending_bytes_exact = + sibling.pending_bytes_is_exact >> (64 - num_pivots_to_borrow); + this->pending_bytes_is_exact <<= num_pivots_to_borrow; + this->pending_bytes_is_exact |= borrowed_pending_bytes_exact; + + sibling.pivot_keys_.pop_back(); + this->pivot_keys_.insert(this->pivot_keys_.begin(), + sibling.pivot_keys_.end() - num_pivots_to_borrow, + sibling.pivot_keys_.end()); + sibling.pivot_keys_.erase(sibling.pivot_keys_.end() - num_pivots_to_borrow + 1, + sibling.pivot_keys_.end()); + + this->child_pages.insert( + this->child_pages.begin(), + std::make_move_iterator(sibling.child_pages.end() - num_pivots_to_borrow), + std::make_move_iterator(sibling.child_pages.end())); + sibling.child_pages.erase(sibling.child_pages.end() - num_pivots_to_borrow, + sibling.child_pages.end()); + + this->children.insert(this->children.begin(), + std::make_move_iterator(sibling.children.end() - num_pivots_to_borrow), + std::make_move_iterator(sibling.children.end())); + sibling.children.erase(sibling.children.end() - num_pivots_to_borrow, sibling.children.end()); + + BATT_ASSIGN_OK_RESULT( + sibling.max_key_, + sibling.children.back().get_max_key(context.page_loader, sibling.child_pages.back())); + } + + BATT_REQUIRE_OK(this->update_buffer_insert(borrowed_pivot_batch)); + + for (Level& level : sibling.update_buffer.levels) { + batt::case_of( // + level, // + [](EmptyLevel&) { + // nothing to do + }, + [&](MergedLevel& merged_level) { + merged_level.result_set.drop_key_range_half_open(borrowed_pivot_range); + }, + [&](SegmentedLevel& segmented_level) { + for (usize segment_i = 0; segment_i < segmented_level.segment_count(); ++segment_i) { + Segment& segment = segmented_level.get_segment(segment_i); + if (right_sibling) { + segment.flushed_pivots >>= num_pivots_to_borrow; + segment.active_pivots >>= num_pivots_to_borrow; + segment.flushed_item_upper_bound_.erase( + segment.flushed_item_upper_bound_.begin(), + segment.flushed_item_upper_bound_.begin() + num_pivots_to_borrow); + } else { + u64 mask = (u64{1} << (64 - num_pivots_to_borrow)) - 1; + segment.flushed_pivots &= mask; + segment.active_pivots &= mask; + segment.flushed_item_upper_bound_.erase( + segment.flushed_item_upper_bound_.end() - num_pivots_to_borrow, + segment.flushed_item_upper_bound_.end()); + } + } + }); + } + + KeyView left_child_max; + KeyView right_child_min; + if (right_sibling) { + left_child_max = this->get_max_key(); + right_child_min = sibling.get_min_key(); + } else { + left_child_max = sibling.get_max_key(); + right_child_min = this->get_min_key(); + } + + const KeyView prefix = llfs::find_common_prefix(0, left_child_max, right_child_min); + const KeyView new_sibling_pivot_key = right_child_min.substr(0, prefix.size() + 1); + + return new_sibling_pivot_key; +} + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // Status InMemoryNode::set_pivot_items_flushed(llfs::PageLoader& page_loader, @@ -1672,6 +2217,27 @@ void InMemoryNode::UpdateBuffer::Segment::insert_pivot(i32 pivot_i, bool is_acti this->flushed_pivots = insert_bit(this->flushed_pivots, pivot_i, false); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void InMemoryNode::UpdateBuffer::Segment::remove_pivot(i32 pivot_i) +{ + this->check_invariants(__FILE__, __LINE__); + auto on_scope_exit = batt::finally([&] { + this->check_invariants(__FILE__, __LINE__); + }); + + if (get_bit(this->flushed_pivots, pivot_i)) { + const i32 index = bit_rank(this->flushed_pivots, pivot_i); + BATT_ASSERT_GE(index, 0); + BATT_ASSERT_LT(index, this->flushed_item_upper_bound_.size()); + + this->flushed_item_upper_bound_.erase(this->flushed_item_upper_bound_.begin() + index); + } + + this->active_pivots = remove_bit(this->active_pivots, pivot_i); + this->flushed_pivots = remove_bit(this->flushed_pivots, pivot_i); +} + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // void InMemoryNode::UpdateBuffer::Segment::pop_front_pivots(i32 count) @@ -1725,6 +2291,24 @@ SmallFn InMemoryNode::UpdateBuffer::dump() const }; } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +u64 InMemoryNode::UpdateBuffer::compute_active_segmented_levels() const +{ + u64 active_pivots = 0; + for (const Level& level : this->levels) { + if (batt::is_case(level)) { + const SegmentedLevel& segmented_level = std::get(level); + for (usize segment_i = 0; segment_i < segmented_level.segment_count(); ++segment_i) { + const Segment& segment = segmented_level.get_segment(segment_i); + active_pivots |= segment.get_active_pivots(); + } + } + } + + return active_pivots; +} + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // SmallFn InMemoryNode::UpdateBuffer::EmptyLevel::dump() const diff --git a/src/turtle_kv/tree/in_memory_node.hpp b/src/turtle_kv/tree/in_memory_node.hpp index 33ed541..7934f60 100644 --- a/src/turtle_kv/tree/in_memory_node.hpp +++ b/src/turtle_kv/tree/in_memory_node.hpp @@ -144,6 +144,11 @@ struct InMemoryNode { */ void insert_pivot(i32 pivot_i, bool is_active); + /** \brief Removes a pivot bit in this->active_pivots and this->flushed_pivots at position + * `pivot_i`. + */ + void remove_pivot(i32 pivot_i); + /** \brief Removes the specified number (`count`) pivots from the front of this segment. This * is used while splitting a node's update buffer. */ @@ -318,6 +323,14 @@ struct InMemoryNode { return estimated; } + MergedLevel concat(MergedLevel& that) + { + return MergedLevel{ + .result_set = MergeCompactor::ResultSet::concat(std::move(this->result_set), + std::move(that.result_set)), + .segment_future_ids_ = {}}; + } + /** \brief Returns the number of segment leaf page build jobs added to the context. */ StatusOr start_serialize(const InMemoryNode& node, TreeSerializeContext& context); @@ -338,6 +351,8 @@ struct InMemoryNode { SmallFn dump() const; + u64 compute_active_segmented_levels() const; + usize count_non_empty_levels() const { usize count = 0; @@ -525,10 +540,31 @@ struct InMemoryNode { */ Status try_flush(BatchUpdateContext& context); + /** \brief Merge the node with one of its siblings and return the newly merged node. + */ + StatusOr> try_merge(BatchUpdateContext& context, + InMemoryNode& sibling); + + /** \brief Attempts to make the node (that needs a merge) viable by borrowing data + * from one of its siblings. If successful, returns the new pivot key to be set in the parent + * of these two nodes to separate them. + */ + StatusOr try_borrow(BatchUpdateContext& context, InMemoryNode& sibling); + /** \brief Splits the specified child, inserting a new pivot immediately after `pivot_i`. */ Status split_child(BatchUpdateContext& update_context, i32 pivot_i); + /** \brief Merges the specified child with a sibling. + */ + Status merge_child(BatchUpdateContext& update_context, i32 pivot_i); + + /** \brief If the node has a single pivot, attempts to flush updates out of the update buffer + * to grow the number of pivots. If all the updates are flushed and still only a single pivot + * remains, the single pivot (child) is returned. + */ + StatusOr> flush_and_shrink(BatchUpdateContext& context); + /** \brief Returns true iff there are no MergedLevels or unserialized Subtree children in this * node. */ diff --git a/src/turtle_kv/tree/in_memory_node.test.cpp b/src/turtle_kv/tree/in_memory_node.test.cpp index 490d30d..dc0f3ea 100644 --- a/src/turtle_kv/tree/in_memory_node.test.cpp +++ b/src/turtle_kv/tree/in_memory_node.test.cpp @@ -131,7 +131,7 @@ void verify_table_point_queries(Table& expected_table, Table& actual_table, Rng& } } -void verify_range_scan(LatencyMetric* scan_latency, +/*void verify_range_scan(LatencyMetric* scan_latency, Table& expected_table, const Slice>& actual_read_items, const KeyView& min_key, @@ -164,7 +164,7 @@ void verify_range_scan(LatencyMetric* scan_latency, ++expected_item_iter; ++actual_item_iter; } -} +} */ struct SubtreeBatchUpdateScenario { static std::atomic& size_tiered_count() @@ -279,6 +279,7 @@ TEST(InMemoryNodeTest, Subtree) if (n_threads != 0) { runner.n_threads(n_threads); } + runner.n_threads(usize{1}); runner.n_seeds(n_seeds); if (n_seeds < 128) { @@ -323,7 +324,7 @@ void SubtreeBatchUpdateScenario::run() } TreeOptions tree_options = TreeOptions::with_default_values() // - .set_leaf_size(512 * kKiB) + .set_leaf_size(32 * kKiB) .set_node_size(4 * kKiB) .set_key_size_hint(24) .set_value_size_hint(100) @@ -365,7 +366,9 @@ void SubtreeBatchUpdateScenario::run() usize total_items = 0; - for (usize i = 0; i < max_i; ++i) { + std::vector pending_deletes; + + for (usize i = 0; i < max_i; ++i) { BatchUpdate update{ .context = BatchUpdateContext{ @@ -373,7 +376,7 @@ void SubtreeBatchUpdateScenario::run() .page_loader = *page_loader, .cancel_token = batt::CancelToken{}, }, - .result_set = result_set_generator(DecayToItem{}, rng, strings), + .result_set = result_set_generator(DecayToItem{}, rng, strings, pending_deletes), .edit_size_totals = None, }; update.update_edit_size_totals(); @@ -386,6 +389,19 @@ void SubtreeBatchUpdateScenario::run() Status table_update_status = update_table(expected_table, update.result_set); ASSERT_TRUE(table_update_status.ok()) << BATT_INSPECT(table_update_status); + if (my_id == 0) { + if (!pending_deletes.empty()) { + pending_deletes.clear(); + } + + if (i > 0) { + BATT_CHECK(pending_deletes.empty()); + for (const EditView& edit : update.result_set.get()) { + pending_deletes.emplace_back(edit.key); + } + } + } + StatusOr tree_height = tree.get_height(*page_loader); ASSERT_TRUE(tree_height.ok()) << BATT_INSPECT(tree_height); @@ -439,7 +455,7 @@ void SubtreeBatchUpdateScenario::run() << BATT_INSPECT(this->seed) << BATT_INSPECT(i); { - auto root_ptr = std::make_shared(tree.clone_serialized_or_panic()); + /*auto root_ptr = std::make_shared(tree.clone_serialized_or_panic()); std::unique_ptr scanner_page_job = page_cache->new_job(); const usize scan_len = pick_scan_len(rng); @@ -475,7 +491,7 @@ void SubtreeBatchUpdateScenario::run() as_slice(scan_items_buffer.data(), n_read), min_key, scan_len)) - << BATT_INSPECT(i) << BATT_INSPECT_STR(min_key) << BATT_INSPECT(scan_len); + << BATT_INSPECT(i) << BATT_INSPECT_STR(min_key) << BATT_INSPECT(scan_len); */ } if (my_id == 0) { diff --git a/src/turtle_kv/tree/packed_leaf_page.hpp b/src/turtle_kv/tree/packed_leaf_page.hpp index 1a2ad3e..74e65d3 100644 --- a/src/turtle_kv/tree/packed_leaf_page.hpp +++ b/src/turtle_kv/tree/packed_leaf_page.hpp @@ -513,23 +513,22 @@ struct LeafItemsSummary { struct AddLeafItemsSummary { LeafItemsSummary operator()(const LeafItemsSummary& prior, const EditView& edit) const noexcept { - if (!decays_to_item(edit.value)) { - LOG(ERROR) << "TODO [tastolfi 2025-05-27] support deletes:" << BATT_INSPECT(edit); - - return LeafItemsSummary{ - .drop_count = prior.drop_count + 1, - .key_count = prior.key_count, - .key_data_size = prior.key_data_size, - .value_data_size = prior.value_data_size, - }; - } else { - return LeafItemsSummary{ - .drop_count = prior.drop_count, - .key_count = prior.key_count + 1, - .key_data_size = prior.key_data_size + (edit.key.size() + 4), - .value_data_size = prior.value_data_size + (1 + edit.value.size()), - }; + usize drop_count = prior.drop_count; + if (decays_to_item(edit)) { + drop_count++; } + return LeafItemsSummary{ + .drop_count = drop_count, + .key_count = prior.key_count + 1, + .key_data_size = prior.key_data_size + (edit.key.size() + 4), + .value_data_size = prior.value_data_size + (1 + edit.value.size()), + }; + } + + LeafItemsSummary operator()(const LeafItemsSummary& prior, + const ItemView& edit) const noexcept + { + return AddLeafItemsSummary{}(BATT_FORWARD(prior), EditView::from_item_view(edit)); } LeafItemsSummary operator()(const LeafItemsSummary& left, @@ -556,8 +555,6 @@ template LeafItemsSummary{}, AddLeafItemsSummary{}); - BATT_CHECK_EQ(summary.drop_count, 0); - PackedLeafLayoutPlanBuilder plan_builder; plan_builder.page_size = page_size; diff --git a/src/turtle_kv/tree/subtree.cpp b/src/turtle_kv/tree/subtree.cpp index a104b05..4c6b659 100644 --- a/src/turtle_kv/tree/subtree.cpp +++ b/src/turtle_kv/tree/subtree.cpp @@ -139,10 +139,10 @@ Status Subtree::apply_batch_update(const TreeOptions& tree_options, auto new_leaf = std::make_unique(llfs::PinnedPage{}, tree_options); - new_leaf->result_set = update.result_set; + update.decay_batch_to_items(new_leaf->result_set); if (!update.edit_size_totals) { - update.update_edit_size_totals(); + update.update_edit_size_totals_decayed(new_leaf->result_set); } new_leaf->set_edit_size_totals(std::move(*update.edit_size_totals)); @@ -177,7 +177,7 @@ Status Subtree::apply_batch_update(const TreeOptions& tree_options, BATT_ASSIGN_OK_RESULT( // new_leaf->result_set, - update.context.merge_compact_edits( // + update.context.merge_compact_edits( // global_max_key(), [&](MergeCompactor& compactor) -> Status { compactor.push_level(update.result_set.live_edit_slices()); @@ -216,7 +216,7 @@ Status Subtree::apply_batch_update(const TreeOptions& tree_options, BATT_ASSIGN_OK_RESULT( in_memory_leaf->result_set, - update.context.merge_compact_edits( + update.context.merge_compact_edits( global_max_key(), [&](MergeCompactor& compactor) -> Status { compactor.push_level(update.result_set.live_edit_slices()); @@ -265,9 +265,14 @@ Status Subtree::apply_batch_update(const TreeOptions& tree_options, return status; }, [&](const NeedsMerge& needs_merge) { - BATT_CHECK(!needs_merge.single_pivot) - << "TODO [tastolfi 2025-03-26] implement flush and shrink"; - return OkStatus(); + // Only perform a shrink if the root has a single pivot. + // + Status status = new_subtree->flush_and_shrink(update.context); + + if (!status.ok()) { + LOG(INFO) << "flush_and_shrink failed;" << BATT_INSPECT(needs_merge); + } + return status; })); } @@ -306,6 +311,38 @@ Status Subtree::split_and_grow(BatchUpdateContext& context, return OkStatus(); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +Status Subtree::flush_and_shrink(BatchUpdateContext& context) +{ + BATT_CHECK(!this->locked_.load()); + + return batt::case_of( + this->impl_, + + [&](const llfs::PageIdSlot& page_id_slot [[maybe_unused]]) -> Status { + return {batt::StatusCode::kUnimplemented}; + }, + + [&](const std::unique_ptr& leaf [[maybe_unused]]) -> Status { + return OkStatus(); + }, + + [&](std::unique_ptr& node) -> Status { + StatusOr> status_or_new_root = node->flush_and_shrink(context); + + BATT_REQUIRE_OK(status_or_new_root); + + if (!*status_or_new_root) { + return OkStatus(); + } + + this->impl_ = std::move((*status_or_new_root)->impl_); + + return OkStatus(); + }); +} + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // StatusOr Subtree::get_height(llfs::PageLoader& page_loader) const @@ -526,6 +563,74 @@ StatusOr> Subtree::try_split(BatchUpdateContext& context) }); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +StatusOr> Subtree::try_merge(BatchUpdateContext& context, Subtree& sibling) +{ + BATT_CHECK(!this->locked_.load()); + + return batt::case_of( + this->impl_, + + [&](const llfs::PageIdSlot& page_id_slot) -> StatusOr> { + BATT_PANIC() << "Cannot try merging a serialized subtree!"; + + return {batt::StatusCode::kUnimplemented}; + }, + + [&](const std::unique_ptr& leaf) -> StatusOr> { + BATT_CHECK(batt::is_case>(sibling.impl_)); + auto& sibling_leaf_ptr = std::get>(sibling.impl_); + BATT_CHECK(sibling_leaf_ptr); + + BATT_ASSIGN_OK_RESULT(std::unique_ptr merged_leaf, // + leaf->try_merge(context, *sibling_leaf_ptr)); + + return {Subtree{std::move(merged_leaf)}}; + }, + + [&](const std::unique_ptr& node) -> StatusOr> { + BATT_CHECK(batt::is_case>(sibling.impl_)); + auto& sibling_node_ptr = std::get>(sibling.impl_); + BATT_CHECK(sibling_node_ptr); + + BATT_ASSIGN_OK_RESULT(std::unique_ptr merged_node, // + node->try_merge(context, *sibling_node_ptr)); + + if (merged_node == nullptr) { + return Optional{None}; + } + + return {Subtree{std::move(merged_node)}}; + }); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +StatusOr Subtree::try_borrow(BatchUpdateContext& context, Subtree& sibling) +{ + BATT_CHECK(!this->locked_.load()); + + return batt::case_of( + this->impl_, + + [&](const llfs::PageIdSlot& page_id_slot [[maybe_unused]]) -> StatusOr { + return {batt::StatusCode::kUnimplemented}; + }, + + [&](const std::unique_ptr& leaf [[maybe_unused]]) -> StatusOr { + return {batt::StatusCode::kUnimplemented}; + }, + + [&](const std::unique_ptr& node) -> StatusOr { + BATT_CHECK(batt::is_case>(sibling.impl_)); + auto& sibling_node_ptr = std::get>(sibling.impl_); + BATT_CHECK(sibling_node_ptr); + + return node->try_borrow(context, *sibling_node_ptr); + }); +} + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // Status Subtree::try_flush(BatchUpdateContext& context) @@ -645,4 +750,48 @@ void Subtree::lock() this->locked_.store(true); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +Status Subtree::to_in_memory_subtree(llfs::PageLoader& page_loader, + const TreeOptions& tree_options, + i32 height) +{ + BATT_CHECK_GT(height, 0); + + if (this->is_serialized()) { + llfs::PageIdSlot& page_id_slot = std::get(this->impl_); + + BATT_CHECK(page_id_slot.is_valid()); + + llfs::PageLayoutId expected_layout = Subtree::expected_layout_for_height(height); + + StatusOr status_or_pinned_page = page_id_slot.load_through( + page_loader, + llfs::PageLoadOptions{ + expected_layout, + llfs::PinPageToJob::kDefault, + llfs::OkIfNotFound{false}, + llfs::LruPriority{(height > 2) ? kNodeLruPriority : kLeafLruPriority}, + }); + + BATT_REQUIRE_OK(status_or_pinned_page) << BATT_INSPECT(height); + + llfs::PinnedPage& pinned_page = *status_or_pinned_page; + + if (height == 1) { + auto new_leaf = std::make_unique(batt::make_copy(pinned_page), tree_options); + this->impl_ = std::move(new_leaf); + } else { + const PackedNodePage& packed_node = PackedNodePage::view_of(pinned_page); + + BATT_ASSIGN_OK_RESULT( + std::unique_ptr node, + InMemoryNode::unpack(batt::make_copy(pinned_page), tree_options, packed_node)); + + this->impl_ = std::move(node); + } + } + + return OkStatus(); +} } // namespace turtle_kv diff --git a/src/turtle_kv/tree/subtree.hpp b/src/turtle_kv/tree/subtree.hpp index 1dc48b6..a3e296b 100644 --- a/src/turtle_kv/tree/subtree.hpp +++ b/src/turtle_kv/tree/subtree.hpp @@ -138,6 +138,19 @@ class Subtree */ StatusOr> try_split(BatchUpdateContext& context); + /** \brief Attempts to merge the given Subtree with one of its siblings. If successful, the + * newly merged Subtree is returned. + * + * If no merge, returns None. + */ + StatusOr> try_merge(BatchUpdateContext& context, Subtree& sibling); + + /** \brief Attempts to make the Subtree viable by borrowing data from one of its siblings. + * Called when the Subtree needs a merge, but borrowing is the only option to make the tree + * viable. + */ + StatusOr try_borrow(BatchUpdateContext& context, Subtree& sibling); + /** \brief Attempt to make the root viable by flushing a batch. */ Status try_flush(BatchUpdateContext& context); @@ -172,12 +185,24 @@ class Subtree */ bool is_locked() const; + /** \brief Converts a serialized Subtree to its in-memory equivalent. + */ + Status to_in_memory_subtree(llfs::PageLoader& page_loader, + const TreeOptions& tree_options, + i32 height); + //+++++++++++-+-+--+----- --- -- - - - - private: Status split_and_grow(BatchUpdateContext& context, const TreeOptions& tree_options, const KeyView& key_upper_bound); + /** \brief Called when the root of the tree is a node with a single pivot. This function + * flushes the root's update buffer until its is either empty + * (causing the tree to shrink in height) or until it gains more pivots. + */ + Status flush_and_shrink(BatchUpdateContext& context); + //+++++++++++-+-+--+----- --- -- - - - - std::variant, std::unique_ptr> diff --git a/src/turtle_kv/tree/testing/random_leaf_generator.hpp b/src/turtle_kv/tree/testing/random_leaf_generator.hpp index 7f128be..152d707 100644 --- a/src/turtle_kv/tree/testing/random_leaf_generator.hpp +++ b/src/turtle_kv/tree/testing/random_leaf_generator.hpp @@ -72,7 +72,7 @@ class RandomLeafGenerator // Compute a running total of packed sizes, so we can split the result set in to leaf pages. // batt::RunningTotal running_total = - compute_running_total(worker_pool, result.result_set, DecayToItem{}); + compute_running_total(worker_pool, result.result_set, DecayToItem{}); SplitParts page_parts = split_parts( // running_total, // From 47dc60483cb5c92b90c9a4ad62e79c80f393ca77 Mon Sep 17 00:00:00 2001 From: Vidya Silai Date: Wed, 12 Nov 2025 11:23:54 -0500 Subject: [PATCH 02/10] Added some more comments --- src/turtle_kv/tree/in_memory_node.cpp | 45 +++++++++++++++++++++------ 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/src/turtle_kv/tree/in_memory_node.cpp b/src/turtle_kv/tree/in_memory_node.cpp index 032c566..0c97057 100644 --- a/src/turtle_kv/tree/in_memory_node.cpp +++ b/src/turtle_kv/tree/in_memory_node.cpp @@ -703,19 +703,18 @@ Status InMemoryNode::merge_child(BatchUpdateContext& update_context, i32 pivot_i i32 right_sibling = pivot_i + 1; i32 left_sibling = pivot_i - 1; - bool need_compaction = false; + bool need_update_buffer_compaction = false; u64 active_segmented_levels = this->update_buffer.compute_active_segmented_levels(); if (pivot_i == 0) { sibling_i = right_sibling; - if (get_bit(active_segmented_levels, pivot_i)) { - need_compaction = true; - } - } else if ((usize)pivot_i == this->children.size() - 1) { + } else if ((usize)pivot_i == this->pivot_count() - 1) { sibling_i = left_sibling; - if (get_bit(active_segmented_levels, left_sibling)) { - need_compaction = true; - } } else { + // If we don't have one of the edge cases, try and pick the sibling where the leftmost of + // {child, sibling} is inactive in all segmented levels. This way, the final merged pivot + // won't have on/off flushed ranges in segments. If this is not possible, pick the right + // sibling. + // if (!get_bit(active_segmented_levels, pivot_i)) { sibling_i = right_sibling; } else { @@ -723,12 +722,15 @@ Status InMemoryNode::merge_child(BatchUpdateContext& update_context, i32 pivot_i sibling_i = left_sibling; } else { sibling_i = right_sibling; - need_compaction = true; } } } BATT_CHECK_NE(pivot_i, sibling_i); + if (get_bit(active_segmented_levels, std::min(pivot_i, sibling_i))) { + need_update_buffer_compaction = true; + } + BATT_REQUIRE_OK(this->children[sibling_i].to_in_memory_subtree(update_context.page_loader, this->tree_options, this->height - 1)); @@ -744,6 +746,8 @@ Status InMemoryNode::merge_child(BatchUpdateContext& update_context, i32 pivot_i if (!*status_or_merged) { if (!batt::is_case(child.get_viability())) { + // If the full merge wasn't possible, try borrowing from the sibling. + // BATT_ASSIGN_OK_RESULT(KeyView new_pivot_key, child.try_borrow(update_context, sibling)); this->pivot_keys_[std::max(pivot_i, sibling_i)] = new_pivot_key; @@ -766,7 +770,7 @@ Status InMemoryNode::merge_child(BatchUpdateContext& update_context, i32 pivot_i // Update the update_buffer levels. // - if (need_compaction) { + if (need_update_buffer_compaction) { BATT_REQUIRE_OK(this->compact_update_buffer_levels(update_context)); } else { for (Level& level : this->update_buffer.levels) { @@ -1071,6 +1075,9 @@ StatusOr InMemoryNode::try_borrow(BatchUpdateContext& context, InMemory BATT_CHECK_LT(this->pivot_count(), 4); u32 num_pivots_to_borrow = 4 - this->pivot_count(); + // Calculate the pivot range to borrow from the sibling, and then extract updates from the + // sibling's update buffer that contain this range. + // i32 borrowed_min_pivot_i = -1; KeyView borrowed_max_pivot_key; if (right_sibling) { @@ -1116,6 +1123,8 @@ StatusOr InMemoryNode::try_borrow(BatchUpdateContext& context, InMemory borrowed_pivot_batch.edit_size_totals = None; + // Borrow node metadata from the sibling. + // if (right_sibling) { this->pending_bytes.insert(this->pending_bytes.end(), sibling.pending_bytes.begin(), @@ -1123,12 +1132,18 @@ StatusOr InMemoryNode::try_borrow(BatchUpdateContext& context, InMemory sibling.pending_bytes.erase(sibling.pending_bytes.begin(), sibling.pending_bytes.begin() + num_pivots_to_borrow); + // Update this->pending_bytes_is_exact by placing the borrowing pending bytes bits from the + // right sibling right after the pending bytes bits for this node. + // u64 borrowed_pending_bytes_exact = sibling.pending_bytes_is_exact & ((u64{1} << num_pivots_to_borrow) - 1); u64 mask = ((u64{1} << num_pivots_to_borrow) - 1) << (this->pivot_count() - 1); this->pending_bytes_is_exact = (this->pending_bytes_is_exact & ~mask) | (borrowed_pending_bytes_exact << (this->pivot_count() - 1)); + // Get rid of the key upper bound in this node and insert the borrowed pivot keys, including + // one past num_pivots_to_borrow, to set the new key upper bound. + // this->pivot_keys_.pop_back(); this->pivot_keys_.insert(this->pivot_keys_.end(), sibling.pivot_keys_.begin(), @@ -1159,6 +1174,9 @@ StatusOr InMemoryNode::try_borrow(BatchUpdateContext& context, InMemory sibling.pending_bytes.erase(sibling.pending_bytes.end() - num_pivots_to_borrow, sibling.pending_bytes.end()); + // Shift this->pending_bytes_is_exact up by num_pivots_to_borrow, and place the borrowed + // pending bytes bits at the lowest order bits. + // u64 borrowed_pending_bytes_exact = sibling.pending_bytes_is_exact >> (64 - num_pivots_to_borrow); this->pending_bytes_is_exact <<= num_pivots_to_borrow; @@ -1188,8 +1206,13 @@ StatusOr InMemoryNode::try_borrow(BatchUpdateContext& context, InMemory sibling.children.back().get_max_key(context.page_loader, sibling.child_pages.back())); } + // Now that metadata has been borrowed, inserted the borrowed updates into the update buffer. + // BATT_REQUIRE_OK(this->update_buffer_insert(borrowed_pivot_batch)); + // Adjust the update buffer levels metadata in the sibling now that the borrowed updates have + // been extracted. + // for (Level& level : sibling.update_buffer.levels) { batt::case_of( // level, // @@ -1220,6 +1243,8 @@ StatusOr InMemoryNode::try_borrow(BatchUpdateContext& context, InMemory }); } + // Calculate and return the new pivot key for the parent. + // KeyView left_child_max; KeyView right_child_min; if (right_sibling) { From 99d750feb66c7df73d53578c7aec614d789b293c Mon Sep 17 00:00:00 2001 From: Vidya Silai Date: Wed, 12 Nov 2025 11:28:19 -0500 Subject: [PATCH 03/10] Remove conan file changes --- conanfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conanfile.py b/conanfile.py index 0d0b7ef..9492d30 100644 --- a/conanfile.py +++ b/conanfile.py @@ -60,7 +60,7 @@ def requirements(self): self.requires("gperftools/2.16", **VISIBLE) self.requires("llfs/0.42.0", **VISIBLE) self.requires("pcg-cpp/cci.20220409", **VISIBLE) - self.requires("vqf/0.2.5-devel", **VISIBLE) + self.requires("vqf/0.2.5", **VISIBLE) self.requires("zlib/1.3.1", **OVERRIDE) if platform.system() == "Linux": From 969c140a2b555c86eba5cddf2a5300ff66737ea8 Mon Sep 17 00:00:00 2001 From: Vidya Silai Date: Sat, 15 Nov 2025 12:38:18 -0500 Subject: [PATCH 04/10] Some bug fixes; new test case --- src/turtle_kv/core/merge_compactor.cpp | 2 + src/turtle_kv/tree/in_memory_leaf.cpp | 26 ++-- src/turtle_kv/tree/in_memory_node.cpp | 5 +- src/turtle_kv/tree/in_memory_node.test.cpp | 159 ++++++++++++++++++++- src/turtle_kv/tree/subtree.cpp | 24 +++- src/turtle_kv/tree/subtree.hpp | 2 +- 6 files changed, 202 insertions(+), 16 deletions(-) diff --git a/src/turtle_kv/core/merge_compactor.cpp b/src/turtle_kv/core/merge_compactor.cpp index a50409e..963d2ce 100644 --- a/src/turtle_kv/core/merge_compactor.cpp +++ b/src/turtle_kv/core/merge_compactor.cpp @@ -495,6 +495,8 @@ template chunk_from_second.offset += first_size; }); + ans.chunks_.back().offset = first_size + second.chunks_.back().offset; + first.clear(); second.clear(); diff --git a/src/turtle_kv/tree/in_memory_leaf.cpp b/src/turtle_kv/tree/in_memory_leaf.cpp index 1708bf1..a5f5e55 100644 --- a/src/turtle_kv/tree/in_memory_leaf.cpp +++ b/src/turtle_kv/tree/in_memory_leaf.cpp @@ -159,17 +159,25 @@ StatusOr> InMemoryLeaf::try_merge(BatchUpdateConte // Concatenate the two leaves' result sets in the correct order. // - bool right_sibling = this->get_max_key() < sibling.get_min_key(); - if (right_sibling) { - merged_leaf->result_set = - MergeCompactor::ResultSet::concat(std::move(this->result_set), - std::move(sibling.result_set)); + if (this->result_set.empty()) { + merged_leaf->result_set = std::move(sibling.result_set); + merged_leaf->shared_edit_size_totals_ = std::move(sibling.shared_edit_size_totals_); + merged_leaf->edit_size_totals.emplace(merged_leaf->shared_edit_size_totals_->begin(), + merged_leaf->shared_edit_size_totals_->end()); } else { - merged_leaf->result_set = MergeCompactor::ResultSet::concat(std::move(sibling.result_set), - std::move(this->result_set)); - } + bool right_sibling = this->get_max_key() < sibling.get_min_key(); + if (right_sibling) { + merged_leaf->result_set = + MergeCompactor::ResultSet::concat(std::move(this->result_set), + std::move(sibling.result_set)); + } else { + merged_leaf->result_set = + MergeCompactor::ResultSet::concat(std::move(sibling.result_set), + std::move(this->result_set)); + } - merged_leaf->set_edit_size_totals(context.compute_running_total(merged_leaf->result_set)); + merged_leaf->set_edit_size_totals(context.compute_running_total(merged_leaf->result_set)); + } return {std::move(merged_leaf)}; } diff --git a/src/turtle_kv/tree/in_memory_node.cpp b/src/turtle_kv/tree/in_memory_node.cpp index 0c97057..37da683 100644 --- a/src/turtle_kv/tree/in_memory_node.cpp +++ b/src/turtle_kv/tree/in_memory_node.cpp @@ -731,13 +731,14 @@ Status InMemoryNode::merge_child(BatchUpdateContext& update_context, i32 pivot_i need_update_buffer_compaction = true; } - BATT_REQUIRE_OK(this->children[sibling_i].to_in_memory_subtree(update_context.page_loader, + BATT_REQUIRE_OK(this->children[sibling_i].to_in_memory_subtree(update_context, this->tree_options, this->height - 1)); // Call child.try_merge(). // Subtree& sibling = this->children[sibling_i]; + BATT_CHECK(batt::is_case(sibling.get_viability())); StatusOr> status_or_merged = child.try_merge(update_context, sibling); if (!status_or_merged.ok()) { LOG(ERROR) << BATT_INSPECT(child.get_viability()); @@ -811,7 +812,7 @@ Status InMemoryNode::merge_child(BatchUpdateContext& update_context, i32 pivot_i // Finally, split the newly merged child if needed. // - SubtreeViability merged_viability = merged_subtree.get_viability(); + SubtreeViability merged_viability = this->children[pivot_to_overwrite].get_viability(); if (batt::is_case(merged_viability)) { BATT_REQUIRE_OK(this->split_child(update_context, pivot_to_overwrite)); } else { diff --git a/src/turtle_kv/tree/in_memory_node.test.cpp b/src/turtle_kv/tree/in_memory_node.test.cpp index dc0f3ea..4e24c5b 100644 --- a/src/turtle_kv/tree/in_memory_node.test.cpp +++ b/src/turtle_kv/tree/in_memory_node.test.cpp @@ -56,6 +56,7 @@ using turtle_kv::KVStoreScanner; using turtle_kv::LatencyMetric; using turtle_kv::LatencyTimer; using turtle_kv::make_memory_page_cache; +using turtle_kv::NeedsMerge; using turtle_kv::NeedsSplit; using turtle_kv::None; using turtle_kv::OkStatus; @@ -74,7 +75,9 @@ using turtle_kv::TreeOptions; using turtle_kv::TreeSerializeContext; using turtle_kv::ValueView; using turtle_kv::testing::RandomResultSetGenerator; +using turtle_kv::testing::RandomStringGenerator; +using llfs::get_key; using llfs::StableStringStore; using batt::getenv_as; @@ -368,7 +371,7 @@ void SubtreeBatchUpdateScenario::run() std::vector pending_deletes; - for (usize i = 0; i < max_i; ++i) { + for (usize i = 0; i < max_i; ++i) { BatchUpdate update{ .context = BatchUpdateContext{ @@ -509,4 +512,158 @@ void SubtreeBatchUpdateScenario::run() } } +TEST(InMemoryNodeTest, SubtreeDeletions) +{ + const usize key_size = 24; + const usize value_size = 100; + + TreeOptions tree_options = TreeOptions::with_default_values() // + .set_leaf_size(32 * kKiB) + .set_node_size(4 * kKiB) + .set_key_size_hint(key_size) + .set_value_size_hint(value_size); + + usize items_per_leaf = tree_options.flush_size() / tree_options.expected_item_size(); + usize total_batches = 81; + + std::vector keys; + keys.reserve(total_batches * items_per_leaf); + + std::string value_str = std::string(value_size, 'a'); + ValueView value = ValueView::from_str(value_str); + + std::default_random_engine rng{/*seed=*/1}; + RandomStringGenerator generate_key; + for (usize i = 0; i < total_batches * items_per_leaf; ++i) { + keys.emplace_back(generate_key(rng)); + } + std::sort(keys.begin(), keys.end(), llfs::KeyOrder{}); + keys.erase(std::unique(keys.begin(), + keys.end(), + [](const auto& l, const auto& r) { + return get_key(l) == get_key(r); + }), + keys.end()); + BATT_CHECK_EQ(keys.size(), total_batches * items_per_leaf); + + std::shared_ptr page_cache = + make_memory_page_cache(batt::Runtime::instance().default_scheduler(), + tree_options, + /*byte_capacity=*/1500 * kMiB); + + Subtree tree = Subtree::make_empty(); + + ASSERT_TRUE(tree.is_serialized()); + + batt::WorkerPool& worker_pool = batt::WorkerPool::null_pool(); + + Optional page_loader{*page_cache}; + + const auto create_insertion_batch = [&](usize batch_number) -> std::vector { + std::vector current_batch; + current_batch.reserve(items_per_leaf); + for (usize j = 0; j < items_per_leaf; ++j) { + current_batch.emplace_back(keys[(batch_number * items_per_leaf) + j], value); + } + + return current_batch; + }; + + const auto create_deletion_batch = [&](usize batch_number) -> std::vector { + std::vector current_batch; + current_batch.reserve(items_per_leaf); + + usize per_batch = items_per_leaf / total_batches; + usize batch_remainder = items_per_leaf % total_batches; + usize total_amount_per_batch = per_batch + (batch_number < batch_remainder ? 1 : 0); + + for (usize i = 0; i < total_batches; ++i) { + usize base_i = i * items_per_leaf; + usize offset = batch_number * per_batch + std::min(batch_number, batch_remainder); + + for (usize j = 0; j < total_amount_per_batch; ++j) { + current_batch.emplace_back(keys[base_i + offset + j], ValueView::deleted()); + } + } + BATT_CHECK_LE(current_batch.size(), items_per_leaf) << BATT_INSPECT(batch_number); + + return current_batch; + }; + + const auto apply_tree_updates = [&](auto batch_creation_func) { + for (usize i = 0; i < total_batches; ++i) { + std::vector current_batch = batch_creation_func(i); + // LOG(INFO) << "current batch: " << i << ", size: " << current_batch.size(); + + ResultSet result; + result.append(std::move(current_batch)); + + BatchUpdate update{ + .context = + BatchUpdateContext{ + .worker_pool = worker_pool, + .page_loader = *page_loader, + .cancel_token = batt::CancelToken{}, + }, + .result_set = std::move(result), + .edit_size_totals = None, + }; + update.update_edit_size_totals(); + + StatusOr tree_height = tree.get_height(*page_loader); + ASSERT_TRUE(tree_height.ok()) << BATT_INSPECT(tree_height); + // LOG(INFO) << "tree height at batch_number " << i << ": " << *tree_height; + + Status status = // + tree.apply_batch_update(tree_options, + ParentNodeHeight{*tree_height + 1}, + update, + /*key_upper_bound=*/global_max_key(), + IsRoot{true}); + + ASSERT_TRUE(status.ok()) << BATT_INSPECT(status) << BATT_INSPECT(i); + ASSERT_FALSE(tree.is_serialized()); + ASSERT_FALSE(batt::is_case(tree.get_viability())); + } + }; + + apply_tree_updates(create_insertion_batch); + + std::unique_ptr page_job = page_cache->new_job(); + TreeSerializeContext context{tree_options, *page_job, worker_pool}; + + Status start_status = tree.start_serialize(context); + ASSERT_TRUE(start_status.ok()) << BATT_INSPECT(start_status); + + Status build_status = context.build_all_pages(); + ASSERT_TRUE(build_status.ok()) << BATT_INSPECT(build_status); + + StatusOr finish_status = tree.finish_serialize(context); + ASSERT_TRUE(finish_status.ok()) << BATT_INSPECT(finish_status); + + page_job->new_root(*finish_status); + Status commit_status = llfs::unsafe_commit_job(std::move(page_job)); + ASSERT_TRUE(commit_status.ok()) << BATT_INSPECT(commit_status); + + page_loader.emplace(*page_cache); + + apply_tree_updates(create_deletion_batch); + + StatusOr tree_height = tree.get_height(*page_loader); + ASSERT_TRUE(tree_height.ok()) << BATT_INSPECT(tree_height); + + /*BatchUpdateContext update_context{ + .worker_pool = worker_pool, + .page_loader = *page_loader, + .cancel_token = batt::CancelToken{}, + }; + while (*tree_height > 2) { + Status flush_status = tree.try_flush(update_context); + ASSERT_TRUE(flush_status.ok()); + + tree_height = tree.get_height(*page_loader); + ASSERT_TRUE(tree_height.ok()) << BATT_INSPECT(tree_height); + } */ +} + } // namespace diff --git a/src/turtle_kv/tree/subtree.cpp b/src/turtle_kv/tree/subtree.cpp index 4c6b659..8adfddc 100644 --- a/src/turtle_kv/tree/subtree.cpp +++ b/src/turtle_kv/tree/subtree.cpp @@ -256,6 +256,14 @@ Status Subtree::apply_batch_update(const TreeOptions& tree_options, return OkStatus(); }, [&](NeedsSplit needs_split) { + if (needs_split.too_many_segments && !needs_split.too_many_pivots && + !needs_split.keys_too_large) { + Status flush_status = new_subtree->try_flush(update.context); + if (flush_status.ok() && batt::is_case(new_subtree->get_viability())) { + return OkStatus(); + } + } + Status status = new_subtree->split_and_grow(update.context, tree_options, key_upper_bound); @@ -626,7 +634,7 @@ StatusOr Subtree::try_borrow(BatchUpdateContext& context, Subtree& sibl BATT_CHECK(batt::is_case>(sibling.impl_)); auto& sibling_node_ptr = std::get>(sibling.impl_); BATT_CHECK(sibling_node_ptr); - + return node->try_borrow(context, *sibling_node_ptr); }); } @@ -752,7 +760,7 @@ void Subtree::lock() //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -Status Subtree::to_in_memory_subtree(llfs::PageLoader& page_loader, +Status Subtree::to_in_memory_subtree(BatchUpdateContext& context, const TreeOptions& tree_options, i32 height) { @@ -766,7 +774,7 @@ Status Subtree::to_in_memory_subtree(llfs::PageLoader& page_loader, llfs::PageLayoutId expected_layout = Subtree::expected_layout_for_height(height); StatusOr status_or_pinned_page = page_id_slot.load_through( - page_loader, + context.page_loader, llfs::PageLoadOptions{ expected_layout, llfs::PinPageToJob::kDefault, @@ -780,6 +788,16 @@ Status Subtree::to_in_memory_subtree(llfs::PageLoader& page_loader, if (height == 1) { auto new_leaf = std::make_unique(batt::make_copy(pinned_page), tree_options); + const PackedLeafPage& packed_leaf = PackedLeafPage::view_of(pinned_page); + + std::vector items; + for (const PackedKeyValue& pkv : packed_leaf.items_slice()) { + items.emplace_back(to_edit_view(pkv)); + } + new_leaf->result_set.append(std::move(items)); + + new_leaf->set_edit_size_totals(context.compute_running_total(new_leaf->result_set)); + this->impl_ = std::move(new_leaf); } else { const PackedNodePage& packed_node = PackedNodePage::view_of(pinned_page); diff --git a/src/turtle_kv/tree/subtree.hpp b/src/turtle_kv/tree/subtree.hpp index a3e296b..3f883cb 100644 --- a/src/turtle_kv/tree/subtree.hpp +++ b/src/turtle_kv/tree/subtree.hpp @@ -187,7 +187,7 @@ class Subtree /** \brief Converts a serialized Subtree to its in-memory equivalent. */ - Status to_in_memory_subtree(llfs::PageLoader& page_loader, + Status to_in_memory_subtree(BatchUpdateContext& context, const TreeOptions& tree_options, i32 height); From ca5b89ba674311b476d6658a31232cdf52b47549 Mon Sep 17 00:00:00 2001 From: Vidya Silai Date: Mon, 17 Nov 2025 00:12:45 -0500 Subject: [PATCH 05/10] More bug fixes --- src/turtle_kv/kv_store_scanner.cpp | 3 +- src/turtle_kv/tree/in_memory_leaf.cpp | 35 +++-- src/turtle_kv/tree/in_memory_leaf.hpp | 2 +- src/turtle_kv/tree/in_memory_node.cpp | 128 +++++++++--------- src/turtle_kv/tree/in_memory_node.hpp | 14 +- src/turtle_kv/tree/in_memory_node.test.cpp | 145 +++++++++++++++------ src/turtle_kv/tree/subtree.cpp | 27 +++- src/turtle_kv/tree/subtree.hpp | 10 +- 8 files changed, 228 insertions(+), 136 deletions(-) diff --git a/src/turtle_kv/kv_store_scanner.cpp b/src/turtle_kv/kv_store_scanner.cpp index 36b2d14..63fdb5b 100644 --- a/src/turtle_kv/kv_store_scanner.cpp +++ b/src/turtle_kv/kv_store_scanner.cpp @@ -461,7 +461,8 @@ Status KVStoreScanner::set_next_item() LatencyTimer timer{batt::Every2ToTheConst<8>{}, KVStoreScanner::metrics().heap_remove_latency}; this->heap_.remove_first(); - this->needs_resume_ = true; + //this->needs_resume_ = true; + BATT_REQUIRE_OK(this->resume()); } } diff --git a/src/turtle_kv/tree/in_memory_leaf.cpp b/src/turtle_kv/tree/in_memory_leaf.cpp index a5f5e55..2a2193c 100644 --- a/src/turtle_kv/tree/in_memory_leaf.cpp +++ b/src/turtle_kv/tree/in_memory_leaf.cpp @@ -151,34 +151,31 @@ auto InMemoryLeaf::make_split_plan() const -> StatusOr //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -StatusOr> InMemoryLeaf::try_merge(BatchUpdateContext& context, - InMemoryLeaf& sibling) +StatusOr> InMemoryLeaf::try_merge( + BatchUpdateContext& context, + std::unique_ptr sibling) noexcept { + if (this->result_set.empty()) { + return {std::move(sibling)}; + } + auto merged_leaf = std::make_unique(batt::make_copy(this->pinned_leaf_page_), this->tree_options); // Concatenate the two leaves' result sets in the correct order. // - if (this->result_set.empty()) { - merged_leaf->result_set = std::move(sibling.result_set); - merged_leaf->shared_edit_size_totals_ = std::move(sibling.shared_edit_size_totals_); - merged_leaf->edit_size_totals.emplace(merged_leaf->shared_edit_size_totals_->begin(), - merged_leaf->shared_edit_size_totals_->end()); + bool right_sibling = this->get_max_key() < sibling->get_min_key(); + if (right_sibling) { + merged_leaf->result_set = + MergeCompactor::ResultSet::concat(std::move(this->result_set), + std::move(sibling->result_set)); } else { - bool right_sibling = this->get_max_key() < sibling.get_min_key(); - if (right_sibling) { - merged_leaf->result_set = - MergeCompactor::ResultSet::concat(std::move(this->result_set), - std::move(sibling.result_set)); - } else { - merged_leaf->result_set = - MergeCompactor::ResultSet::concat(std::move(sibling.result_set), - std::move(this->result_set)); - } - - merged_leaf->set_edit_size_totals(context.compute_running_total(merged_leaf->result_set)); + merged_leaf->result_set = MergeCompactor::ResultSet::concat(std::move(sibling->result_set), + std::move(this->result_set)); } + merged_leaf->set_edit_size_totals(context.compute_running_total(merged_leaf->result_set)); + return {std::move(merged_leaf)}; } diff --git a/src/turtle_kv/tree/in_memory_leaf.hpp b/src/turtle_kv/tree/in_memory_leaf.hpp index 879fee4..ca26bd7 100644 --- a/src/turtle_kv/tree/in_memory_leaf.hpp +++ b/src/turtle_kv/tree/in_memory_leaf.hpp @@ -93,7 +93,7 @@ struct InMemoryLeaf { StatusOr make_split_plan() const; StatusOr> try_merge(BatchUpdateContext& context, - InMemoryLeaf& sibling); + std::unique_ptr sibling) noexcept; Status start_serialize(TreeSerializeContext& context); diff --git a/src/turtle_kv/tree/in_memory_node.cpp b/src/turtle_kv/tree/in_memory_node.cpp index 37da683..b2722fd 100644 --- a/src/turtle_kv/tree/in_memory_node.cpp +++ b/src/turtle_kv/tree/in_memory_node.cpp @@ -692,8 +692,16 @@ Status InMemoryNode::split_child(BatchUpdateContext& update_context, i32 pivot_i //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -Status InMemoryNode::merge_child(BatchUpdateContext& update_context, i32 pivot_i) +Status InMemoryNode::merge_child(BatchUpdateContext& update_context, i32 pivot_i) noexcept { + // Special case: we have a tree composed of one node (root) and one leaf (its only child). + // In this case, don't progress with the rest of the function, as we are in the middle of a + // flush and shrink. + // + if (this->height == 2 && this->pivot_count() == 1) { + return OkStatus(); + } + Subtree& child = this->children[pivot_i]; // Decide which sibling to merge with. Edge cases: child that needs merge is the leftmost or @@ -763,8 +771,9 @@ Status InMemoryNode::merge_child(BatchUpdateContext& update_context, i32 pivot_i // Erase rightmost of {child subtree, sibling} in this->child_pages, overwrite leftmost // with new PinnedPage{}. // - i32 pivot_to_erase = std::max(pivot_i, sibling_i); - i32 pivot_to_overwrite = std::min(pivot_i, sibling_i); + const i32 pivot_to_erase = std::max(pivot_i, sibling_i); + const i32 pivot_to_overwrite = std::min(pivot_i, sibling_i); + const usize old_pivot_count = this->pivot_count(); this->child_pages[pivot_to_overwrite] = llfs::PinnedPage{}; this->child_pages.erase(this->child_pages.begin() + pivot_to_erase); @@ -804,7 +813,7 @@ Status InMemoryNode::merge_child(BatchUpdateContext& update_context, i32 pivot_i // this->pivot_keys_.erase(this->pivot_keys_.begin() + pivot_to_erase); - if ((usize)pivot_to_erase == this->children.size()) { + if ((usize)pivot_to_erase == old_pivot_count - 1) { BATT_ASSIGN_OK_RESULT( this->max_key_, this->children.back().get_max_key(update_context.page_loader, this->child_pages.back())); @@ -824,7 +833,7 @@ Status InMemoryNode::merge_child(BatchUpdateContext& update_context, i32 pivot_i //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -StatusOr> InMemoryNode::flush_and_shrink(BatchUpdateContext& context) +StatusOr> InMemoryNode::flush_and_shrink(BatchUpdateContext& context) noexcept { // If more than one pivot exists, nothings needs to be done. // @@ -833,7 +842,7 @@ StatusOr> InMemoryNode::flush_and_shrink(BatchUpdateContext& c return None; } - i32 single_pivot_i = 0; + const i32 single_pivot_i = 0; BATT_CHECK_EQ(this->pending_bytes.size(), 1); usize pending_bytes_count = this->pending_bytes[single_pivot_i]; @@ -857,7 +866,7 @@ StatusOr> InMemoryNode::flush_and_shrink(BatchUpdateContext& c //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // StatusOr> InMemoryNode::try_merge(BatchUpdateContext& context, - InMemoryNode& sibling) + InMemoryNode& sibling) noexcept { // If merging both full nodes will cause the merged node's pivot count to exceed the max // possible pivot count, return null so that we can try a borrow. @@ -873,6 +882,8 @@ StatusOr> InMemoryNode::try_merge(BatchUpdateConte const auto concat_metadata = [&](InMemoryNode& left, InMemoryNode& right) { new_node->max_key_ = right.max_key_; + new_node->height = left.height; + new_node->latest_flush_pivot_i_ = None; new_node->pending_bytes.insert(new_node->pending_bytes.end(), @@ -938,30 +949,12 @@ StatusOr> InMemoryNode::try_merge(BatchUpdateConte [&](SegmentedLevel& right_segmented_level) -> Status { // When merging a MergedLevel and a SegmentedLevel, create a new MergedLevel. // - MergedLevel new_merged_level; - HasPageRefs has_page_refs{false}; - Status segment_load_status; - - Slice levels_to_merge = - as_slice(right.update_buffer.levels.data() + i, usize{1}); - - BATT_ASSIGN_OK_RESULT( // - new_merged_level.result_set, - context.merge_compact_edits( // - global_max_key(), - [&](MergeCompactor& compactor) -> Status { - compactor.push_level(left_merged_level.result_set.live_edit_slices()); - right.push_levels_to_merge(compactor, - context.page_loader, - segment_load_status, - has_page_refs, - levels_to_merge, - /*min_pivot_i=*/0, - /*only_pivot=*/false); - return OkStatus(); - })); - - BATT_REQUIRE_OK(segment_load_status); + BATT_ASSIGN_OK_RESULT( + MergedLevel new_merged_level, + UpdateBuffer::merge_segmented_and_merged_level(context, + left_merged_level, + right_segmented_level, + right)); new_node->update_buffer.levels.emplace_back(std::move(new_merged_level)); @@ -982,31 +975,12 @@ StatusOr> InMemoryNode::try_merge(BatchUpdateConte return OkStatus(); }, [&](MergedLevel& right_merged_level) -> Status { - MergedLevel new_merged_level; - HasPageRefs has_page_refs{false}; - Status segment_load_status; - - Slice levels_to_merge = - as_slice(left.update_buffer.levels.data() + i, usize{1}); - - BATT_ASSIGN_OK_RESULT( // - new_merged_level.result_set, - context.merge_compact_edits( // - global_max_key(), - [&](MergeCompactor& compactor) -> Status { - left.push_levels_to_merge(compactor, - context.page_loader, - segment_load_status, - has_page_refs, - levels_to_merge, - /*min_pivot_i=*/0, - /*only_pivot=*/false); - compactor.push_level( - right_merged_level.result_set.live_edit_slices()); - return OkStatus(); - })); - - BATT_REQUIRE_OK(segment_load_status); + BATT_ASSIGN_OK_RESULT( + MergedLevel new_merged_level, + UpdateBuffer::merge_segmented_and_merged_level(context, + right_merged_level, + left_segmented_level, + left)); new_node->update_buffer.levels.emplace_back(std::move(new_merged_level)); @@ -1067,7 +1041,8 @@ StatusOr> InMemoryNode::try_merge(BatchUpdateConte //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -StatusOr InMemoryNode::try_borrow(BatchUpdateContext& context, InMemoryNode& sibling) +StatusOr InMemoryNode::try_borrow(BatchUpdateContext& context, + InMemoryNode& sibling) noexcept { BATT_CHECK(batt::is_case(sibling.get_viability())); @@ -1212,7 +1187,7 @@ StatusOr InMemoryNode::try_borrow(BatchUpdateContext& context, InMemory BATT_REQUIRE_OK(this->update_buffer_insert(borrowed_pivot_batch)); // Adjust the update buffer levels metadata in the sibling now that the borrowed updates have - // been extracted. + // been extracted. // for (Level& level : sibling.update_buffer.levels) { batt::case_of( // @@ -2159,6 +2134,43 @@ void InMemoryNode::UpdateBuffer::SegmentedLevel::check_items_sorted( } } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +/*static*/ StatusOr InMemoryNode::UpdateBuffer::merge_segmented_and_merged_level( + BatchUpdateContext& context, + MergedLevel& merged_level, + SegmentedLevel& segmented_level, + InMemoryNode& segmented_level_node) noexcept +{ + MergedLevel new_merged_level; + HasPageRefs has_page_refs{false}; + Status segment_load_status; + + BoxedSeq segmented_level_slices = + SegmentedLevelScanner{ + segmented_level_node, + segmented_level, + context.page_loader, + llfs::PinPageToJob::kDefault, + segment_load_status, + /*min_pivot_i=*/0} // + | seq::boxed(); + + BATT_ASSIGN_OK_RESULT( // + new_merged_level.result_set, + context.merge_compact_edits( // + global_max_key(), + [&](MergeCompactor& compactor) -> Status { + compactor.push_level(merged_level.result_set.live_edit_slices()); + compactor.push_level(std::move(segmented_level_slices)); + return OkStatus(); + })); + + BATT_REQUIRE_OK(segment_load_status); + + return new_merged_level; +} + //=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - diff --git a/src/turtle_kv/tree/in_memory_node.hpp b/src/turtle_kv/tree/in_memory_node.hpp index 7934f60..15e63f4 100644 --- a/src/turtle_kv/tree/in_memory_node.hpp +++ b/src/turtle_kv/tree/in_memory_node.hpp @@ -349,6 +349,12 @@ struct InMemoryNode { //+++++++++++-+-+--+----- --- -- - - - - + static StatusOr merge_segmented_and_merged_level( + BatchUpdateContext& context, // + MergedLevel& merged_level, + SegmentedLevel& segmented_level, + InMemoryNode& segmented_level_node) noexcept; + SmallFn dump() const; u64 compute_active_segmented_levels() const; @@ -543,13 +549,13 @@ struct InMemoryNode { /** \brief Merge the node with one of its siblings and return the newly merged node. */ StatusOr> try_merge(BatchUpdateContext& context, - InMemoryNode& sibling); + InMemoryNode& sibling) noexcept; /** \brief Attempts to make the node (that needs a merge) viable by borrowing data * from one of its siblings. If successful, returns the new pivot key to be set in the parent * of these two nodes to separate them. */ - StatusOr try_borrow(BatchUpdateContext& context, InMemoryNode& sibling); + StatusOr try_borrow(BatchUpdateContext& context, InMemoryNode& sibling) noexcept; /** \brief Splits the specified child, inserting a new pivot immediately after `pivot_i`. */ @@ -557,13 +563,13 @@ struct InMemoryNode { /** \brief Merges the specified child with a sibling. */ - Status merge_child(BatchUpdateContext& update_context, i32 pivot_i); + Status merge_child(BatchUpdateContext& update_context, i32 pivot_i) noexcept; /** \brief If the node has a single pivot, attempts to flush updates out of the update buffer * to grow the number of pivots. If all the updates are flushed and still only a single pivot * remains, the single pivot (child) is returned. */ - StatusOr> flush_and_shrink(BatchUpdateContext& context); + StatusOr> flush_and_shrink(BatchUpdateContext& context) noexcept; /** \brief Returns true iff there are no MergedLevels or unserialized Subtree children in this * node. diff --git a/src/turtle_kv/tree/in_memory_node.test.cpp b/src/turtle_kv/tree/in_memory_node.test.cpp index 4e24c5b..41b26f1 100644 --- a/src/turtle_kv/tree/in_memory_node.test.cpp +++ b/src/turtle_kv/tree/in_memory_node.test.cpp @@ -134,7 +134,7 @@ void verify_table_point_queries(Table& expected_table, Table& actual_table, Rng& } } -/*void verify_range_scan(LatencyMetric* scan_latency, +void verify_range_scan(LatencyMetric* scan_latency, Table& expected_table, const Slice>& actual_read_items, const KeyView& min_key, @@ -167,7 +167,7 @@ void verify_table_point_queries(Table& expected_table, Table& actual_table, Rng& ++expected_item_iter; ++actual_item_iter; } -} */ +} struct SubtreeBatchUpdateScenario { static std::atomic& size_tiered_count() @@ -397,7 +397,7 @@ void SubtreeBatchUpdateScenario::run() pending_deletes.clear(); } - if (i > 0) { + if (i % 5 == 0) { BATT_CHECK(pending_deletes.empty()); for (const EditView& edit : update.result_set.get()) { pending_deletes.emplace_back(edit.key); @@ -458,7 +458,7 @@ void SubtreeBatchUpdateScenario::run() << BATT_INSPECT(this->seed) << BATT_INSPECT(i); { - /*auto root_ptr = std::make_shared(tree.clone_serialized_or_panic()); + auto root_ptr = std::make_shared(tree.clone_serialized_or_panic()); std::unique_ptr scanner_page_job = page_cache->new_job(); const usize scan_len = pick_scan_len(rng); @@ -494,7 +494,7 @@ void SubtreeBatchUpdateScenario::run() as_slice(scan_items_buffer.data(), n_read), min_key, scan_len)) - << BATT_INSPECT(i) << BATT_INSPECT_STR(min_key) << BATT_INSPECT(scan_len); */ + << BATT_INSPECT(i) << BATT_INSPECT_STR(min_key) << BATT_INSPECT(scan_len); } if (my_id == 0) { @@ -516,6 +516,7 @@ TEST(InMemoryNodeTest, SubtreeDeletions) { const usize key_size = 24; const usize value_size = 100; + const usize chi = 4; TreeOptions tree_options = TreeOptions::with_default_values() // .set_leaf_size(32 * kKiB) @@ -532,7 +533,7 @@ TEST(InMemoryNodeTest, SubtreeDeletions) std::string value_str = std::string(value_size, 'a'); ValueView value = ValueView::from_str(value_str); - std::default_random_engine rng{/*seed=*/1}; + std::default_random_engine rng{/*seed=*/3}; RandomStringGenerator generate_key; for (usize i = 0; i < total_batches * items_per_leaf; ++i) { keys.emplace_back(generate_key(rng)); @@ -552,9 +553,11 @@ TEST(InMemoryNodeTest, SubtreeDeletions) /*byte_capacity=*/1500 * kMiB); Subtree tree = Subtree::make_empty(); - ASSERT_TRUE(tree.is_serialized()); + turtle_kv::OrderedMapTable> expected_table; + SubtreeTable actual_table{*page_cache, tree_options, tree}; + batt::WorkerPool& worker_pool = batt::WorkerPool::null_pool(); Optional page_loader{*page_cache}; @@ -590,10 +593,9 @@ TEST(InMemoryNodeTest, SubtreeDeletions) return current_batch; }; - const auto apply_tree_updates = [&](auto batch_creation_func) { + const auto apply_tree_updates = [&](auto batch_creation_func, bool perform_scan) { for (usize i = 0; i < total_batches; ++i) { std::vector current_batch = batch_creation_func(i); - // LOG(INFO) << "current batch: " << i << ", size: " << current_batch.size(); ResultSet result; result.append(std::move(current_batch)); @@ -610,60 +612,119 @@ TEST(InMemoryNodeTest, SubtreeDeletions) }; update.update_edit_size_totals(); - StatusOr tree_height = tree.get_height(*page_loader); - ASSERT_TRUE(tree_height.ok()) << BATT_INSPECT(tree_height); - // LOG(INFO) << "tree height at batch_number " << i << ": " << *tree_height; + Status table_update_status = update_table(expected_table, update.result_set); + ASSERT_TRUE(table_update_status.ok()) << BATT_INSPECT(table_update_status); + + StatusOr tree_height_before = tree.get_height(*page_loader); + ASSERT_TRUE(tree_height_before.ok()) << BATT_INSPECT(tree_height_before); Status status = // tree.apply_batch_update(tree_options, - ParentNodeHeight{*tree_height + 1}, + ParentNodeHeight{*tree_height_before + 1}, update, /*key_upper_bound=*/global_max_key(), IsRoot{true}); ASSERT_TRUE(status.ok()) << BATT_INSPECT(status) << BATT_INSPECT(i); - ASSERT_FALSE(tree.is_serialized()); - ASSERT_FALSE(batt::is_case(tree.get_viability())); - } - }; - apply_tree_updates(create_insertion_batch); + StatusOr tree_height_after = tree.get_height(*page_loader); + ASSERT_TRUE(tree_height_after.ok()) << BATT_INSPECT(tree_height_after); + + if (*tree_height_after == 0) { + ASSERT_LT(*tree_height_after, *tree_height_before); + ASSERT_TRUE(tree.is_serialized()); + break; + } else { + ASSERT_FALSE(tree.is_serialized()); + } + + ASSERT_FALSE(batt::is_case(tree.get_viability())); - std::unique_ptr page_job = page_cache->new_job(); - TreeSerializeContext context{tree_options, *page_job, worker_pool}; + ASSERT_NO_FATAL_FAILURE( + verify_table_point_queries(expected_table, actual_table, rng, batt::log2_ceil(i))) + << BATT_INSPECT(i); - Status start_status = tree.start_serialize(context); - ASSERT_TRUE(start_status.ok()) << BATT_INSPECT(start_status); + if (((i + 1) % chi) == 0) { + LOG(INFO) << "Taking checkpoint..."; - Status build_status = context.build_all_pages(); - ASSERT_TRUE(build_status.ok()) << BATT_INSPECT(build_status); + std::unique_ptr page_job = page_cache->new_job(); + TreeSerializeContext context{tree_options, *page_job, worker_pool}; - StatusOr finish_status = tree.finish_serialize(context); - ASSERT_TRUE(finish_status.ok()) << BATT_INSPECT(finish_status); + Status start_status = tree.start_serialize(context); + ASSERT_TRUE(start_status.ok()) << BATT_INSPECT(start_status); - page_job->new_root(*finish_status); - Status commit_status = llfs::unsafe_commit_job(std::move(page_job)); - ASSERT_TRUE(commit_status.ok()) << BATT_INSPECT(commit_status); + Status build_status = context.build_all_pages(); + ASSERT_TRUE(build_status.ok()) << BATT_INSPECT(build_status); - page_loader.emplace(*page_cache); + StatusOr finish_status = tree.finish_serialize(context); + ASSERT_TRUE(finish_status.ok()) << BATT_INSPECT(finish_status); - apply_tree_updates(create_deletion_batch); + page_job->new_root(*finish_status); + Status commit_status = llfs::unsafe_commit_job(std::move(page_job)); + ASSERT_TRUE(commit_status.ok()) << BATT_INSPECT(commit_status); - StatusOr tree_height = tree.get_height(*page_loader); - ASSERT_TRUE(tree_height.ok()) << BATT_INSPECT(tree_height); + ASSERT_NO_FATAL_FAILURE( + verify_table_point_queries(expected_table, actual_table, rng, batt::log2_ceil(i))) + << BATT_INSPECT(i); + + if (perform_scan) { + auto root_ptr = std::make_shared(tree.clone_serialized_or_panic()); + std::unique_ptr scanner_page_job = page_cache->new_job(); + + const usize scan_len = 20; + std::array, kMaxScanSize> scan_items_buffer; + KeyView min_key = update.result_set.get_min_key(); + + KVStoreScanner kv_scanner{*page_loader, + root_ptr->page_id_slot_or_panic(), + BATT_OK_RESULT_OR_PANIC(root_ptr->get_height(*page_loader)), + min_key, + tree_options.trie_index_sharded_view_size(), + None}; + + usize n_read = 0; + { + BATT_CHECK_OK(kv_scanner.start()); + for (auto& kv_pair : scan_items_buffer) { + Optional item = kv_scanner.next(); + if (!item) { + break; + } + kv_pair.first = item->key; + kv_pair.second = item->value; + ++n_read; + if (n_read == scan_len) { + break; + } + } + } + ASSERT_NO_FATAL_FAILURE(verify_range_scan(nullptr, + expected_table, + as_slice(scan_items_buffer.data(), n_read), + min_key, + scan_len)) + << BATT_INSPECT(i) << BATT_INSPECT_STR(min_key) << BATT_INSPECT(scan_len); + } - /*BatchUpdateContext update_context{ - .worker_pool = worker_pool, - .page_loader = *page_loader, - .cancel_token = batt::CancelToken{}, + page_loader.emplace(*page_cache); + } + } }; - while (*tree_height > 2) { - Status flush_status = tree.try_flush(update_context); - ASSERT_TRUE(flush_status.ok()); - tree_height = tree.get_height(*page_loader); + LOG(INFO) << "Inserting key/value pairs into tree.."; + apply_tree_updates(create_insertion_batch, false); + + LOG(INFO) << "Deleting key/value pairs from tree..."; + for (usize i = 0; i < total_batches; ++i) { + bool perform_scan = i == 0 ? true : false; + StatusOr tree_height = tree.get_height(*page_loader); ASSERT_TRUE(tree_height.ok()) << BATT_INSPECT(tree_height); - } */ + if (*tree_height > 0) { + apply_tree_updates(create_deletion_batch, perform_scan); + } else { + break; + } + } } } // namespace diff --git a/src/turtle_kv/tree/subtree.cpp b/src/turtle_kv/tree/subtree.cpp index 8adfddc..5e99a1d 100644 --- a/src/turtle_kv/tree/subtree.cpp +++ b/src/turtle_kv/tree/subtree.cpp @@ -321,7 +321,7 @@ Status Subtree::split_and_grow(BatchUpdateContext& context, //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -Status Subtree::flush_and_shrink(BatchUpdateContext& context) +Status Subtree::flush_and_shrink(BatchUpdateContext& context) noexcept { BATT_CHECK(!this->locked_.load()); @@ -345,6 +345,20 @@ Status Subtree::flush_and_shrink(BatchUpdateContext& context) return OkStatus(); } + if (batt::is_case>((*status_or_new_root)->impl_)) { + const auto& leaf_ptr = + std::get>((*status_or_new_root)->impl_); + BATT_CHECK(leaf_ptr); + + // If the new root that is returned is an empty leaf, set the root to be an empty + // subtree. + // + if (!leaf_ptr->get_items_size()) { + this->impl_ = llfs::PageIdSlot::from_page_id(llfs::PageId{}); + return OkStatus(); + } + } + this->impl_ = std::move((*status_or_new_root)->impl_); return OkStatus(); @@ -573,7 +587,8 @@ StatusOr> Subtree::try_split(BatchUpdateContext& context) //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -StatusOr> Subtree::try_merge(BatchUpdateContext& context, Subtree& sibling) +StatusOr> Subtree::try_merge(BatchUpdateContext& context, + Subtree& sibling) noexcept { BATT_CHECK(!this->locked_.load()); @@ -592,7 +607,7 @@ StatusOr> Subtree::try_merge(BatchUpdateContext& context, Subt BATT_CHECK(sibling_leaf_ptr); BATT_ASSIGN_OK_RESULT(std::unique_ptr merged_leaf, // - leaf->try_merge(context, *sibling_leaf_ptr)); + leaf->try_merge(context, std::move(sibling_leaf_ptr))); return {Subtree{std::move(merged_leaf)}}; }, @@ -615,7 +630,7 @@ StatusOr> Subtree::try_merge(BatchUpdateContext& context, Subt //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -StatusOr Subtree::try_borrow(BatchUpdateContext& context, Subtree& sibling) +StatusOr Subtree::try_borrow(BatchUpdateContext& context, Subtree& sibling) noexcept { BATT_CHECK(!this->locked_.load()); @@ -762,7 +777,7 @@ void Subtree::lock() // Status Subtree::to_in_memory_subtree(BatchUpdateContext& context, const TreeOptions& tree_options, - i32 height) + i32 height) noexcept { BATT_CHECK_GT(height, 0); @@ -771,7 +786,7 @@ Status Subtree::to_in_memory_subtree(BatchUpdateContext& context, BATT_CHECK(page_id_slot.is_valid()); - llfs::PageLayoutId expected_layout = Subtree::expected_layout_for_height(height); + const llfs::PageLayoutId expected_layout = Subtree::expected_layout_for_height(height); StatusOr status_or_pinned_page = page_id_slot.load_through( context.page_loader, diff --git a/src/turtle_kv/tree/subtree.hpp b/src/turtle_kv/tree/subtree.hpp index 3f883cb..31144ea 100644 --- a/src/turtle_kv/tree/subtree.hpp +++ b/src/turtle_kv/tree/subtree.hpp @@ -140,16 +140,16 @@ class Subtree /** \brief Attempts to merge the given Subtree with one of its siblings. If successful, the * newly merged Subtree is returned. - * + * * If no merge, returns None. */ - StatusOr> try_merge(BatchUpdateContext& context, Subtree& sibling); + StatusOr> try_merge(BatchUpdateContext& context, Subtree& sibling) noexcept; /** \brief Attempts to make the Subtree viable by borrowing data from one of its siblings. * Called when the Subtree needs a merge, but borrowing is the only option to make the tree * viable. */ - StatusOr try_borrow(BatchUpdateContext& context, Subtree& sibling); + StatusOr try_borrow(BatchUpdateContext& context, Subtree& sibling) noexcept; /** \brief Attempt to make the root viable by flushing a batch. */ @@ -189,7 +189,7 @@ class Subtree */ Status to_in_memory_subtree(BatchUpdateContext& context, const TreeOptions& tree_options, - i32 height); + i32 height) noexcept; //+++++++++++-+-+--+----- --- -- - - - - private: @@ -201,7 +201,7 @@ class Subtree * flushes the root's update buffer until its is either empty * (causing the tree to shrink in height) or until it gains more pivots. */ - Status flush_and_shrink(BatchUpdateContext& context); + Status flush_and_shrink(BatchUpdateContext& context) noexcept; //+++++++++++-+-+--+----- --- -- - - - - From 51a4e0264c57ff5c88554aa66b384a6a58c858a3 Mon Sep 17 00:00:00 2001 From: Vidya Silai Date: Mon, 17 Nov 2025 20:38:07 -0500 Subject: [PATCH 06/10] KVStoreScanner updates --- src/turtle_kv/kv_store.cpp | 1 - src/turtle_kv/kv_store_scanner.cpp | 45 ++++++---------------- src/turtle_kv/kv_store_scanner.hpp | 8 +--- src/turtle_kv/tree/in_memory_node.test.cpp | 1 - 4 files changed, 12 insertions(+), 43 deletions(-) diff --git a/src/turtle_kv/kv_store.cpp b/src/turtle_kv/kv_store.cpp index 93b1e5f..b36122b 100644 --- a/src/turtle_kv/kv_store.cpp +++ b/src/turtle_kv/kv_store.cpp @@ -757,7 +757,6 @@ StatusOr KVStore::scan_keys(const KeyView& min_key, this->metrics_.scan_count.add(1); KVStoreScanner scanner{*this, min_key}; - scanner.set_keys_only(true); BATT_REQUIRE_OK(scanner.start()); return scanner.read_keys(items_out); diff --git a/src/turtle_kv/kv_store_scanner.cpp b/src/turtle_kv/kv_store_scanner.cpp index 63fdb5b..b1e8423 100644 --- a/src/turtle_kv/kv_store_scanner.cpp +++ b/src/turtle_kv/kv_store_scanner.cpp @@ -435,17 +435,15 @@ Status KVStoreScanner::set_next_item() ScanLevel* scan_level = this->heap_.first(); if (!this->next_item_) { - this->next_item_.emplace(scan_level->item(this->keys_only_)); + this->next_item_.emplace(scan_level->item()); } else if (this->next_item_->key == scan_level->key) { - if (!this->keys_only_ && this->next_item_->needs_combine()) { + if (this->next_item_->needs_combine()) { this->next_item_->value = combine(this->next_item_->value, scan_level->value()); } } else { - // TODO [vsilai 11-10-2025]: need to fix key only scans to look at values. - // - if (!this->keys_only_ && this->next_item_->value == ValueView::deleted()) { + if (this->next_item_->value == ValueView::deleted()) { this->next_item_ = None; continue; } else { @@ -575,7 +573,7 @@ Status KVStoreScanner::set_next_item() //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -EditView KVStoreScanner::ScanLevel::item(bool key_only) const +EditView KVStoreScanner::ScanLevel::item() const { return batt::case_of( this->state_impl, @@ -583,58 +581,37 @@ EditView KVStoreScanner::ScanLevel::item(bool key_only) const BATT_PANIC() << "illegal state"; BATT_UNREACHABLE(); }, - [this, key_only](const MemTableScanState& state) -> EditView { + [this](const MemTableScanState& state) -> EditView { MemTableEntry entry; const bool found = state.mem_table_->hash_index().find_key(this->key, entry); BATT_CHECK(found); - if (key_only) { - return EditView{entry.key_, ValueView{}}; - } return EditView{entry.key_, entry.value_}; }, - [this, key_only](const MemTableScanState& state) -> EditView { + [this](const MemTableScanState& state) -> EditView { const MemTableEntry* entry = state.mem_table_->hash_index().unsynchronized_find_key(key); BATT_CHECK_NOT_NULLPTR(entry); - if (key_only) { - return EditView{entry->key_, ValueView{}}; - } return EditView{entry->key_, entry->value_}; }, - [key_only](const MemTableValueScanState& state) -> EditView { + [](const MemTableValueScanState& state) -> EditView { const MemTableValueEntry& entry = state.art_scanner_->get_value(); - if (key_only) { - return EditView{entry.key_view(), ValueView{}}; - } return EditView{entry.key_view(), entry.value_view()}; }, - [key_only](const MemTableValueScanState& state) -> EditView { + [](const MemTableValueScanState& state) -> EditView { const MemTableValueEntry& entry = state.art_scanner_->get_value(); - if (key_only) { - return EditView{entry.key_view(), ValueView{}}; - } return EditView{entry.key_view(), entry.value_view()}; }, [](const Slice& state) -> EditView { return state.front(); }, - [this, key_only](const TreeLevelScanState& state) -> EditView { - if (key_only) { - return EditView{this->key, ValueView{}}; - } + [this](const TreeLevelScanState& state) -> EditView { return EditView{this->key, get_value(state.kv_slice.front())}; }, - [this, key_only](const TreeLevelScanShardedState& state) -> EditView { - if (key_only) { - return EditView{this->key, ValueView{}}; - } + [this](const TreeLevelScanShardedState& state) -> EditView { return EditView{this->key, state.kv_slice.front_value()}; }, - [this, key_only](const ShardedLeafScanState& state) -> EditView { - if (key_only) { - return EditView{this->key, ValueView{}}; - } + [this](const ShardedLeafScanState& state) -> EditView { return EditView{this->key, BATT_OK_RESULT_OR_PANIC(state.leaf_scanner_->front_value())}; }); } diff --git a/src/turtle_kv/kv_store_scanner.hpp b/src/turtle_kv/kv_store_scanner.hpp index dc7d26c..83c9468 100644 --- a/src/turtle_kv/kv_store_scanner.hpp +++ b/src/turtle_kv/kv_store_scanner.hpp @@ -198,7 +198,7 @@ class KVStoreScanner /** \brief Returns the current item as an EditView. */ - EditView item(bool key_only) const; + EditView item() const; /** \brief Returns the value of the current item. */ @@ -306,11 +306,6 @@ class KVStoreScanner StatusOr read_keys(const Slice& buffer); - void set_keys_only(bool b) noexcept - { - this->keys_only_ = b; - } - //+++++++++++-+-+--+----- --- -- - - - -- private: Status validate_page_layout(i32 height, const llfs::PinnedPage& pinned_page); @@ -353,7 +348,6 @@ class KVStoreScanner boost::container::static_vector tree_scan_path_; boost::container::small_vector scan_levels_; StackMerger heap_; - bool keys_only_ = false; Optional sharded_leaf_scanner_; }; diff --git a/src/turtle_kv/tree/in_memory_node.test.cpp b/src/turtle_kv/tree/in_memory_node.test.cpp index 41b26f1..4b56c15 100644 --- a/src/turtle_kv/tree/in_memory_node.test.cpp +++ b/src/turtle_kv/tree/in_memory_node.test.cpp @@ -282,7 +282,6 @@ TEST(InMemoryNodeTest, Subtree) if (n_threads != 0) { runner.n_threads(n_threads); } - runner.n_threads(usize{1}); runner.n_seeds(n_seeds); if (n_seeds < 128) { From b126c03fb1a870fcf46278cce69b3fcdc320a774 Mon Sep 17 00:00:00 2001 From: Vidya Silai Date: Mon, 17 Nov 2025 20:42:44 -0500 Subject: [PATCH 07/10] Subtree test config change --- src/turtle_kv/tree/in_memory_node.test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/turtle_kv/tree/in_memory_node.test.cpp b/src/turtle_kv/tree/in_memory_node.test.cpp index 4b56c15..80cd6da 100644 --- a/src/turtle_kv/tree/in_memory_node.test.cpp +++ b/src/turtle_kv/tree/in_memory_node.test.cpp @@ -326,7 +326,7 @@ void SubtreeBatchUpdateScenario::run() } TreeOptions tree_options = TreeOptions::with_default_values() // - .set_leaf_size(32 * kKiB) + .set_leaf_size(512 * kKiB) .set_node_size(4 * kKiB) .set_key_size_hint(24) .set_value_size_hint(100) From afa9940e62563cc48ff584e346c840f762e038a5 Mon Sep 17 00:00:00 2001 From: Vidya Silai Date: Fri, 21 Nov 2025 09:48:05 -0500 Subject: [PATCH 08/10] First round of feedback --- src/turtle_kv/core/testing/generate.hpp | 12 ++-- src/turtle_kv/core/testing/generate.test.cpp | 25 +++++++- src/turtle_kv/kv_store_scanner.cpp | 16 ++++- src/turtle_kv/tree/in_memory_node.test.cpp | 2 +- src/turtle_kv/tree/packed_leaf_page.hpp | 61 ++++++++++++++++--- .../tree/testing/random_leaf_generator.hpp | 2 +- 6 files changed, 99 insertions(+), 19 deletions(-) diff --git a/src/turtle_kv/core/testing/generate.hpp b/src/turtle_kv/core/testing/generate.hpp index 06b4c37..4410276 100644 --- a/src/turtle_kv/core/testing/generate.hpp +++ b/src/turtle_kv/core/testing/generate.hpp @@ -188,13 +188,17 @@ class RandomResultSetGenerator : public MinMaxSize DecayToItem, Rng& rng, llfs::StableStringStore& store, - Optional> to_delete = None) + const std::vector& to_delete) { using ResultSet = MergeCompactor::ResultSet; using Item = typename ResultSet::value_type; const usize n = this->Super::pick_size(rng); std::vector items; + + for (const KeyView& delete_key : to_delete) { + items.emplace_back(delete_key, ValueView::deleted()); + } while (items.size() < n) { for (usize i = items.size(); i < n; ++i) { @@ -203,12 +207,6 @@ class RandomResultSetGenerator : public MinMaxSize ValueView::from_str(store.store(std::string(this->value_size_, ch)))); } - if (to_delete) { - for (const KeyView& delete_key : *to_delete) { - items.emplace_back(delete_key, ValueView::deleted()); - } - } - std::sort(items.begin(), items.end(), KeyOrder{}); items.erase(std::unique(items.begin(), items.end(), diff --git a/src/turtle_kv/core/testing/generate.test.cpp b/src/turtle_kv/core/testing/generate.test.cpp index b26c2a7..5076623 100644 --- a/src/turtle_kv/core/testing/generate.test.cpp +++ b/src/turtle_kv/core/testing/generate.test.cpp @@ -7,9 +7,14 @@ namespace { +using batt::int_types::usize; + using turtle_kv::DecayToItem; using turtle_kv::ItemView; using turtle_kv::KeyOrder; +using turtle_kv::KeyView; +using turtle_kv::StatusOr; +using turtle_kv::ValueView; using turtle_kv::testing::RandomResultSetGenerator; template @@ -24,10 +29,28 @@ TEST(GenerateTest, Test) g.set_size(200); - ResultSet result_set = g(DecayToItem{}, rng, store); + std::vector to_delete; + ResultSet result_set = g(DecayToItem{}, rng, store, to_delete); EXPECT_TRUE(std::is_sorted(result_set.get().begin(), result_set.get().end(), KeyOrder{})); EXPECT_EQ(result_set.get().size(), 200u); + + auto result_set_slice = result_set.get(); + usize i = 0; + for (const ItemView& edit : result_set_slice) { + if (i % 2) { + to_delete.emplace_back(edit.key); + } + ++i; + } + + ResultSet result_set_with_deletes = g(DecayToItem{}, rng, store, to_delete); + for (const KeyView& deleted_key : to_delete) { + StatusOr deleted_value = result_set_with_deletes.find_key(deleted_key); + EXPECT_TRUE(deleted_value.ok()); + EXPECT_EQ(*deleted_value, ValueView::deleted()); + } + EXPECT_EQ(to_delete.size(), result_set_with_deletes.size() / 2); } } // namespace diff --git a/src/turtle_kv/kv_store_scanner.cpp b/src/turtle_kv/kv_store_scanner.cpp index b1e8423..f3b5164 100644 --- a/src/turtle_kv/kv_store_scanner.cpp +++ b/src/turtle_kv/kv_store_scanner.cpp @@ -438,12 +438,26 @@ Status KVStoreScanner::set_next_item() this->next_item_.emplace(scan_level->item()); } else if (this->next_item_->key == scan_level->key) { + // If this->next_item_->key == scan_level->key, we need to search for a terminal value for + // the item, so combine it if necessary. + // if (this->next_item_->needs_combine()) { this->next_item_->value = combine(this->next_item_->value, scan_level->value()); } } else { + // If the item stored in this->next_item_ does not have the same key as the first key in + // the current scan_level, we have reached a terminal value for this->next_item_. Now, + // we have to decide whether we want to keep this->next_item_ and break from the loop + // (returning the item to the function's caller) OR discard it, because the terminal value + // represents a deleted item. + // if (this->next_item_->value == ValueView::deleted()) { + // The terminal value represents a deleted item, so discard it by setting this->next_item_ + // to None. Then, continue on to the next iteration of the loop, skipping the logic to + // advance the current scan_level. We do this because we now need to set the first key + // in the current scan_level to this->next_item_ to examine it next. + // this->next_item_ = None; continue; } else { @@ -459,7 +473,7 @@ Status KVStoreScanner::set_next_item() LatencyTimer timer{batt::Every2ToTheConst<8>{}, KVStoreScanner::metrics().heap_remove_latency}; this->heap_.remove_first(); - //this->needs_resume_ = true; + // this->needs_resume_ = true; BATT_REQUIRE_OK(this->resume()); } } diff --git a/src/turtle_kv/tree/in_memory_node.test.cpp b/src/turtle_kv/tree/in_memory_node.test.cpp index 80cd6da..d1b1021 100644 --- a/src/turtle_kv/tree/in_memory_node.test.cpp +++ b/src/turtle_kv/tree/in_memory_node.test.cpp @@ -710,7 +710,7 @@ TEST(InMemoryNodeTest, SubtreeDeletions) } }; - LOG(INFO) << "Inserting key/value pairs into tree.."; + LOG(INFO) << "Inserting key/value pairs into tree..."; apply_tree_updates(create_insertion_batch, false); LOG(INFO) << "Deleting key/value pairs from tree..."; diff --git a/src/turtle_kv/tree/packed_leaf_page.hpp b/src/turtle_kv/tree/packed_leaf_page.hpp index 74e65d3..cb1b9ea 100644 --- a/src/turtle_kv/tree/packed_leaf_page.hpp +++ b/src/turtle_kv/tree/packed_leaf_page.hpp @@ -320,6 +320,8 @@ struct PackedLeafLayoutPlan { usize page_size; usize key_count; usize trie_index_reserved_size; + usize avg_key_len; + usize drop_count; usize trie_index_begin; usize trie_index_end; @@ -361,6 +363,8 @@ struct PackedLeafLayoutPlan { } void check_valid(std::string_view label) const; + + usize compute_trie_step_size() const; }; //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - @@ -370,6 +374,8 @@ BATT_OBJECT_PRINT_IMPL((inline), (page_size, key_count, trie_index_reserved_size, + avg_key_len, + drop_count, trie_index_begin, trie_index_end, leaf_header_begin, @@ -392,6 +398,43 @@ inline void PackedLeafLayoutPlan::check_valid(std::string_view label) const BATT_CHECK(this->is_valid()) << *this << BATT_INSPECT_STR(label); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +inline usize PackedLeafLayoutPlan::compute_trie_step_size() const +{ + BATT_CHECK_GT(this->key_count, 0); + BATT_CHECK_GT(this->avg_key_len, 0); + + // If there are no deleted items in this leaf, return 16. + // + if (this->drop_count == 0) { + return 16; + } + + usize trie_buffer_size = this->trie_index_end - this->trie_index_begin; + BATT_CHECK_GT(trie_buffer_size, 0); + + // Determine the number of pivot keys to intialize the trie with by using the size of the trie + // buffer and the average key length across the items in the leaf. + // + usize pivot_count = trie_buffer_size / this->avg_key_len; + usize step_size = (this->key_count + pivot_count - 1) / pivot_count; + + BATT_CHECK_GT(step_size, 0); + + // If the calculated step size is already a power of 2, return it now. + // + if ((step_size & (step_size - 1)) == 0) { + return step_size; + } + + // Otherwise, calculate the nearest power of 2 less than `step_size`. + // + i32 shift = batt::log2_floor(step_size); + BATT_CHECK_GE(shift, 0); + return usize{1} << shift; +} + //=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- // class PackedLeafLayoutPlanBuilder @@ -423,6 +466,7 @@ class PackedLeafLayoutPlanBuilder plan.page_size = this->page_size; plan.key_count = BATT_CHECKED_CAST(u32, this->key_count); plan.trie_index_reserved_size = this->trie_index_reserved_size; + plan.avg_key_len = plan.key_count > 0 ? this->key_data_size / plan.key_count : 0; usize offset = 0; const auto append = [&offset](usize size) { @@ -514,19 +558,18 @@ struct AddLeafItemsSummary { LeafItemsSummary operator()(const LeafItemsSummary& prior, const EditView& edit) const noexcept { usize drop_count = prior.drop_count; - if (decays_to_item(edit)) { + if (!decays_to_item(edit)) { drop_count++; } return LeafItemsSummary{ - .drop_count = drop_count, - .key_count = prior.key_count + 1, - .key_data_size = prior.key_data_size + (edit.key.size() + 4), - .value_data_size = prior.value_data_size + (1 + edit.value.size()), + .drop_count = drop_count, + .key_count = prior.key_count + 1, + .key_data_size = prior.key_data_size + (edit.key.size() + 4), + .value_data_size = prior.value_data_size + (1 + edit.value.size()), }; } - LeafItemsSummary operator()(const LeafItemsSummary& prior, - const ItemView& edit) const noexcept + LeafItemsSummary operator()(const LeafItemsSummary& prior, const ItemView& edit) const noexcept { return AddLeafItemsSummary{}(BATT_FORWARD(prior), EditView::from_item_view(edit)); } @@ -565,6 +608,8 @@ template PackedLeafLayoutPlan plan = plan_builder.build(); + plan.drop_count = summary.drop_count; + return plan; } @@ -690,7 +735,7 @@ inline PackedLeafPage* build_leaf_page(MutableBuffer buffer, if (plan.trie_index_reserved_size > 0) { const MutableBuffer trie_buffer{(void*)advance_pointer(buffer.data(), plan.trie_index_begin), plan.trie_index_end - plan.trie_index_begin}; - usize step_size = 16; + usize step_size = plan.compute_trie_step_size(); bool retried = false; batt::SmallVec pivot_keys; for (;;) { diff --git a/src/turtle_kv/tree/testing/random_leaf_generator.hpp b/src/turtle_kv/tree/testing/random_leaf_generator.hpp index 152d707..eb1b74b 100644 --- a/src/turtle_kv/tree/testing/random_leaf_generator.hpp +++ b/src/turtle_kv/tree/testing/random_leaf_generator.hpp @@ -59,7 +59,7 @@ class RandomLeafGenerator // Generate a sorted run of random key/value pairs. // - result.result_set = this->items_generator_(decay_to_items, rng, store); + result.result_set = this->items_generator_(decay_to_items, rng, store, {}); batt::WorkerPool& worker_pool = batt::WorkerPool::null_pool(); From 8dbab04e3b18737c9829a5129ece3aeb9c842e3b Mon Sep 17 00:00:00 2001 From: Vidya Silai Date: Mon, 24 Nov 2025 17:44:46 -0500 Subject: [PATCH 09/10] Improvements to Subtree test --- src/turtle_kv/kv_store.cpp | 9 ++++++++ src/turtle_kv/tree/algo/nodes.hpp | 2 +- src/turtle_kv/tree/in_memory_node.test.cpp | 24 +++++++++++++++++++--- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/src/turtle_kv/kv_store.cpp b/src/turtle_kv/kv_store.cpp index b36122b..0713d51 100644 --- a/src/turtle_kv/kv_store.cpp +++ b/src/turtle_kv/kv_store.cpp @@ -653,6 +653,9 @@ StatusOr KVStore::get(const KeyView& key) noexcept /*override*/ if (value) { if (!value->needs_combine()) { this->metrics_.mem_table_get_count.add(1); + if (value->is_delete()) { + return {batt::StatusCode::kNotFound}; + } // VLOG(1) << "found key " << batt::c_str_literal(key) << " in active MemTable"; return *value; } @@ -677,11 +680,17 @@ StatusOr KVStore::get(const KeyView& key) noexcept /*override*/ *value = combine(*value, *delta_value); if (!value->needs_combine()) { this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)].add(1); + if (value->is_delete()) { + return {batt::StatusCode::kNotFound}; + } return *value; } } else { if (!delta_value->needs_combine()) { this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)].add(1); + if (delta_value->is_delete()) { + return {batt::StatusCode::kNotFound}; + } return *delta_value; } value = delta_value; diff --git a/src/turtle_kv/tree/algo/nodes.hpp b/src/turtle_kv/tree/algo/nodes.hpp index 8bb8fe5..50c066a 100644 --- a/src/turtle_kv/tree/algo/nodes.hpp +++ b/src/turtle_kv/tree/algo/nodes.hpp @@ -179,7 +179,7 @@ struct NodeAlgorithms { BATT_REQUIRE_OK(combine_in_place(&value, subtree_result)); - if (!value) { + if (!value || value->is_delete()) { return {batt::StatusCode::kNotFound}; } diff --git a/src/turtle_kv/tree/in_memory_node.test.cpp b/src/turtle_kv/tree/in_memory_node.test.cpp index d1b1021..211326d 100644 --- a/src/turtle_kv/tree/in_memory_node.test.cpp +++ b/src/turtle_kv/tree/in_memory_node.test.cpp @@ -134,6 +134,16 @@ void verify_table_point_queries(Table& expected_table, Table& actual_table, Rng& } } +void verify_deleted_point_queries(Table& expected_table, + Table& actual_table, + const std::vector& deleted_keys) +{ + for (const KeyView& key : deleted_keys) { + EXPECT_EQ(expected_table.get(key).status(), batt::StatusCode::kNotFound); + EXPECT_EQ(actual_table.get(key).status(), batt::StatusCode::kNotFound); + } +} + void verify_range_scan(LatencyMetric* scan_latency, Table& expected_table, const Slice>& actual_read_items, @@ -427,6 +437,10 @@ void SubtreeBatchUpdateScenario::run() verify_table_point_queries(expected_table, actual_table, rng, batt::log2_ceil(i))) << BATT_INSPECT(this->seed) << BATT_INSPECT(i); + ASSERT_NO_FATAL_FAILURE( + verify_deleted_point_queries(expected_table, actual_table, pending_deletes)) + << BATT_INSPECT(this->seed) << BATT_INSPECT(i); + if (((i + 1) % chi) == 0) { if (my_id == 0) { LOG(INFO) << "taking checkpoint..."; @@ -456,6 +470,10 @@ void SubtreeBatchUpdateScenario::run() verify_table_point_queries(expected_table, actual_table, rng, batt::log2_ceil(i))) << BATT_INSPECT(this->seed) << BATT_INSPECT(i); + ASSERT_NO_FATAL_FAILURE( + verify_deleted_point_queries(expected_table, actual_table, pending_deletes)) + << BATT_INSPECT(this->seed) << BATT_INSPECT(i); + { auto root_ptr = std::make_shared(tree.clone_serialized_or_panic()); std::unique_ptr scanner_page_job = page_cache->new_job(); @@ -663,8 +681,8 @@ TEST(InMemoryNodeTest, SubtreeDeletions) ASSERT_TRUE(commit_status.ok()) << BATT_INSPECT(commit_status); ASSERT_NO_FATAL_FAILURE( - verify_table_point_queries(expected_table, actual_table, rng, batt::log2_ceil(i))) - << BATT_INSPECT(i); + verify_table_point_queries(expected_table, actual_table, rng, batt::log2_ceil(i))) + << BATT_INSPECT(i); if (perform_scan) { auto root_ptr = std::make_shared(tree.clone_serialized_or_panic()); @@ -702,7 +720,7 @@ TEST(InMemoryNodeTest, SubtreeDeletions) as_slice(scan_items_buffer.data(), n_read), min_key, scan_len)) - << BATT_INSPECT(i) << BATT_INSPECT_STR(min_key) << BATT_INSPECT(scan_len); + << BATT_INSPECT(i) << BATT_INSPECT_STR(min_key) << BATT_INSPECT(scan_len); } page_loader.emplace(*page_cache); From 969efc0788cb175a4899f8c043eafc1140a7eb5f Mon Sep 17 00:00:00 2001 From: Vidya Silai Date: Fri, 28 Nov 2025 11:48:42 -0500 Subject: [PATCH 10/10] More feedback --- src/turtle_kv/core/merge_compactor.test.cpp | 51 ++++++++ src/turtle_kv/core/testing/generate.hpp | 18 ++- src/turtle_kv/kv_store.cpp | 33 ++--- src/turtle_kv/kv_store_scanner.cpp | 6 +- src/turtle_kv/tree/batch_update.cpp | 121 ------------------ src/turtle_kv/tree/batch_update.hpp | 130 ++++++++++++++++++-- src/turtle_kv/tree/in_memory_node.test.cpp | 20 +-- src/turtle_kv/tree/subtree.cpp | 9 +- 8 files changed, 220 insertions(+), 168 deletions(-) diff --git a/src/turtle_kv/core/merge_compactor.test.cpp b/src/turtle_kv/core/merge_compactor.test.cpp index 651371a..af42abc 100644 --- a/src/turtle_kv/core/merge_compactor.test.cpp +++ b/src/turtle_kv/core/merge_compactor.test.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -23,6 +24,8 @@ using namespace batt::int_types; using batt::as_seq; using batt::WorkerPool; +using llfs::StableStringStore; + using turtle_kv::CInterval; using turtle_kv::EditSlice; using turtle_kv::EditView; @@ -39,6 +42,8 @@ using turtle_kv::Status; using turtle_kv::StatusOr; using turtle_kv::ValueView; +using turtle_kv::testing::RandomStringGenerator; + namespace seq = turtle_kv::seq; constexpr usize kNumKeys = 16; @@ -482,4 +487,50 @@ TEST(MergeCompactor, ResultSetDropKeyRange) } } +TEST(MergeCompactor, ResultSetConcat) +{ + usize n = 200; + std::vector all_edits; + std::unordered_set keys_set; + llfs::StableStringStore store; + + std::string value_str = std::string(100, 'a'); + ValueView value = ValueView::from_str(value_str); + + std::default_random_engine rng{/*seed=*/30}; + RandomStringGenerator generate_key; + while (all_edits.size() < n) { + KeyView key = generate_key(rng, store); + if (keys_set.contains(key)) { + continue; + } + keys_set.emplace(key); + all_edits.emplace_back(key, value); + } + std::sort(all_edits.begin(), all_edits.end(), KeyOrder{}); + + std::vector first{all_edits.begin(), all_edits.begin() + (n / 2)}; + std::vector second{all_edits.begin() + (n / 2), all_edits.end()}; + + MergeCompactor::ResultSet first_result_set; + first_result_set.append(std::move(first)); + MergeCompactor::ResultSet second_result_set; + second_result_set.append(std::move(second)); + + EXPECT_EQ(first_result_set.size(), n / 2); + EXPECT_EQ(second_result_set.size(), n / 2); + + MergeCompactor::ResultSet concatenated_result_set = + MergeCompactor::ResultSet::concat(std::move(first_result_set), + std::move(second_result_set)); + + EXPECT_EQ(concatenated_result_set.size(), n); + + usize i = 0; + for (const EditView& edit : concatenated_result_set.get()) { + EXPECT_EQ(edit, all_edits[i]); + ++i; + } +} + } // namespace diff --git a/src/turtle_kv/core/testing/generate.hpp b/src/turtle_kv/core/testing/generate.hpp index 4410276..c48c112 100644 --- a/src/turtle_kv/core/testing/generate.hpp +++ b/src/turtle_kv/core/testing/generate.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include namespace turtle_kv { @@ -195,16 +196,27 @@ class RandomResultSetGenerator : public MinMaxSize const usize n = this->Super::pick_size(rng); std::vector items; - + for (const KeyView& delete_key : to_delete) { items.emplace_back(delete_key, ValueView::deleted()); } + if (items.size() > n) { + ResultSet result; + result.append(std::move(items)); + return result; + } + std::unordered_set deleted_items_set{to_delete.begin(), to_delete.end()}; while (items.size() < n) { - for (usize i = items.size(); i < n; ++i) { + for (usize i = items.size(); i < n;) { char ch = '_' + (i & 31); - items.emplace_back(this->key_generator_(rng, store), + KeyView key = this->key_generator_(rng, store); + if (deleted_items_set.count(key)) { + continue; + } + items.emplace_back(key, ValueView::from_str(store.store(std::string(this->value_size_, ch)))); + ++i; } std::sort(items.begin(), items.end(), KeyOrder{}); diff --git a/src/turtle_kv/kv_store.cpp b/src/turtle_kv/kv_store.cpp index 0713d51..b324d8d 100644 --- a/src/turtle_kv/kv_store.cpp +++ b/src/turtle_kv/kv_store.cpp @@ -650,14 +650,19 @@ StatusOr KVStore::get(const KeyView& key) noexcept /*override*/ this->metrics_.mem_table_get_latency, observed_state->mem_table_->get(key)); + const auto return_memtable_value = + [](Optional mem_table_value, + FastCountMetric& get_count_metric) -> StatusOr { + get_count_metric.add(1); + if (mem_table_value->is_delete()) { + return {batt::StatusCode::kNotFound}; + } + return *mem_table_value; + }; + if (value) { if (!value->needs_combine()) { - this->metrics_.mem_table_get_count.add(1); - if (value->is_delete()) { - return {batt::StatusCode::kNotFound}; - } - // VLOG(1) << "found key " << batt::c_str_literal(key) << " in active MemTable"; - return *value; + return return_memtable_value(value, this->metrics_.mem_table_get_count); } } @@ -679,19 +684,15 @@ StatusOr KVStore::get(const KeyView& key) noexcept /*override*/ if (value) { *value = combine(*value, *delta_value); if (!value->needs_combine()) { - this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)].add(1); - if (value->is_delete()) { - return {batt::StatusCode::kNotFound}; - } - return *value; + return return_memtable_value( + value, + this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)]); } } else { if (!delta_value->needs_combine()) { - this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)].add(1); - if (delta_value->is_delete()) { - return {batt::StatusCode::kNotFound}; - } - return *delta_value; + return return_memtable_value( + delta_value, + this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)]); } value = delta_value; } diff --git a/src/turtle_kv/kv_store_scanner.cpp b/src/turtle_kv/kv_store_scanner.cpp index f3b5164..336b152 100644 --- a/src/turtle_kv/kv_store_scanner.cpp +++ b/src/turtle_kv/kv_store_scanner.cpp @@ -459,6 +459,9 @@ Status KVStoreScanner::set_next_item() // in the current scan_level to this->next_item_ to examine it next. // this->next_item_ = None; + if (this->needs_resume_) { + BATT_REQUIRE_OK(this->resume()); + } continue; } else { break; @@ -473,8 +476,7 @@ Status KVStoreScanner::set_next_item() LatencyTimer timer{batt::Every2ToTheConst<8>{}, KVStoreScanner::metrics().heap_remove_latency}; this->heap_.remove_first(); - // this->needs_resume_ = true; - BATT_REQUIRE_OK(this->resume()); + this->needs_resume_ = true; } } diff --git a/src/turtle_kv/tree/batch_update.cpp b/src/turtle_kv/tree/batch_update.cpp index 8e5675d..262564d 100644 --- a/src/turtle_kv/tree/batch_update.cpp +++ b/src/turtle_kv/tree/batch_update.cpp @@ -13,127 +13,6 @@ void BatchUpdate::update_edit_size_totals() this->context.compute_running_total(this->result_set)); } -//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - -// -void BatchUpdate::update_edit_size_totals_decayed( - const MergeCompactor::ResultSet& decayed_result_set) -{ - this->edit_size_totals.emplace( - this->context.compute_running_total(decayed_result_set)); -} - -//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - -// -void BatchUpdate::decay_batch_to_items( - MergeCompactor::ResultSet& output_result_set) -{ - const batt::TaskCount max_tasks{this->context.worker_pool.size() + 1}; - std::vector decayed_items; - - if (max_tasks == 1) { - for (const EditView& edit : this->result_set.get()) { - Optional maybe_item = to_item_view(edit); - if (maybe_item) { - decayed_items.emplace_back(EditView::from_item_view(*maybe_item)); - } - } - } else { - const ParallelAlgoDefaults& algo_defaults = parallel_algo_defaults(); - - auto actual_edits = result_set.get(); - const auto src_begin = actual_edits.begin(); - const auto src_end = actual_edits.end(); - - const batt::WorkSlicePlan plan{batt::WorkSliceParams{ - algo_defaults.copy_decayed_items.min_task_size, - max_tasks, - }, - src_begin, - src_end}; - - BATT_CHECK_GT(plan.n_tasks, 0); - - batt::SmallVec output_size_per_shard(plan.n_tasks); - BATT_CHECK_EQ(output_size_per_shard.size(), plan.n_tasks); - - // First count the number of non-decayed items in the output for each shard. - { - batt::ScopedWorkContext work_context{this->context.worker_pool}; - - BATT_CHECK_OK(batt::slice_work( - work_context, - plan, - /*gen_work_fn=*/ - [&](usize task_index, isize task_offset, isize task_size) { - return [src_begin, task_index, task_offset, task_size, &output_size_per_shard] { - BATT_CHECK_LT(task_index, output_size_per_shard.size()); - - auto task_src_begin = std::next(src_begin, task_offset); - const auto task_src_end = std::next(task_src_begin, task_size); - - usize output_size = 0; - - for (; task_src_begin != task_src_end; ++task_src_begin) { - if (decays_to_item(*task_src_begin)) { - output_size += 1; - } - } - output_size_per_shard[task_index] = output_size; - }; - })) - << "worker_pool must not be closed!"; - } - - // Change to a rolling sum and do the actual copy. - // - usize output_total_size = 0; - batt::SmallVec output_shard_offset; - for (usize output_shard_size : output_size_per_shard) { - output_shard_offset.emplace_back(output_total_size); - output_total_size += output_shard_size; - } - - decayed_items.resize(output_total_size); - { - this->context.worker_pool.reset(); - - batt::ScopedWorkContext work_context{this->context.worker_pool}; - - BATT_CHECK_OK( - batt::slice_work(work_context, - plan, - /*gen_work_fn=*/ - [&](usize task_index, isize task_offset, isize task_size) { - return [src_begin, - &output_shard_offset, - &output_size_per_shard, - task_index, - task_offset, - task_size, - &decayed_items] { - auto task_src_begin = std::next(src_begin, task_offset); - const auto task_src_end = std::next(task_src_begin, task_size); - - BATT_CHECK_LT(task_index, output_shard_offset.size()); - auto task_dst_begin = - std::next(decayed_items.data(), output_shard_offset[task_index]); - - for (; task_src_begin != task_src_end; ++task_src_begin) { - Optional maybe_item = to_item_view(*task_src_begin); - if (maybe_item) { - *task_dst_begin = EditView::from_item_view(*maybe_item); - ++task_dst_begin; - } - } - }; - })) - << "worker_pool must not be closed!"; - } - } - - output_result_set.append(std::move(decayed_items)); -} - //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // usize BatchUpdate::get_byte_size() diff --git a/src/turtle_kv/tree/batch_update.hpp b/src/turtle_kv/tree/batch_update.hpp index 7eeab1d..2b5150b 100644 --- a/src/turtle_kv/tree/batch_update.hpp +++ b/src/turtle_kv/tree/batch_update.hpp @@ -42,6 +42,12 @@ struct BatchUpdateContext { { return ::turtle_kv::compute_running_total(this->worker_pool, result_set); } + + /** \brief Returns a `ResultSet` with only the edits from the batch passed into the function + * that decay to base-level items (e.g., no tombstones). + */ + MergeCompactor::ResultSet decay_batch_to_items( + MergeCompactor::ResultSet& batch); }; //=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- @@ -64,16 +70,6 @@ struct BatchUpdate { */ void update_edit_size_totals(); - /** \brief Resets `this->edit_size_totals` to reflect the decayed version of `this->result_set`. - */ - void update_edit_size_totals_decayed( - const MergeCompactor::ResultSet& decayed_result_set); - - /** \brief Fills the output buffer `ResultSet` passed into the function with only the - * edits from this batch that decay to base-level items (e.g., no tombstones). - */ - void decay_batch_to_items(MergeCompactor::ResultSet& output_result_set); - /** \brief Returns the inclusive (closed) interval of keys in this batch. */ CInterval get_key_crange() const @@ -118,4 +114,118 @@ inline StatusOr> BatchUpdateContext::me return compactor.read(edit_buffer, max_key); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +inline MergeCompactor::ResultSet BatchUpdateContext::decay_batch_to_items( + MergeCompactor::ResultSet& batch) +{ + const batt::TaskCount max_tasks{this->worker_pool.size() + 1}; + std::vector decayed_items; + + if (max_tasks == 1) { + for (const EditView& edit : batch.get()) { + Optional maybe_item = to_item_view(edit); + if (maybe_item) { + decayed_items.emplace_back(EditView::from_item_view(*maybe_item)); + } + } + } else { + const ParallelAlgoDefaults& algo_defaults = parallel_algo_defaults(); + + auto actual_edits = batch.get(); + const auto src_begin = actual_edits.begin(); + const auto src_end = actual_edits.end(); + + const batt::WorkSlicePlan plan{batt::WorkSliceParams{ + algo_defaults.copy_decayed_items.min_task_size, + max_tasks, + }, + src_begin, + src_end}; + + BATT_CHECK_GT(plan.n_tasks, 0); + + batt::SmallVec output_size_per_shard(plan.n_tasks); + BATT_CHECK_EQ(output_size_per_shard.size(), plan.n_tasks); + + // First count the number of non-decayed items in the output for each shard. + { + batt::ScopedWorkContext work_context{this->worker_pool}; + + BATT_CHECK_OK(batt::slice_work( + work_context, + plan, + /*gen_work_fn=*/ + [&](usize task_index, isize task_offset, isize task_size) { + return [src_begin, task_index, task_offset, task_size, &output_size_per_shard] { + BATT_CHECK_LT(task_index, output_size_per_shard.size()); + + auto task_src_begin = std::next(src_begin, task_offset); + const auto task_src_end = std::next(task_src_begin, task_size); + + usize output_size = 0; + + for (; task_src_begin != task_src_end; ++task_src_begin) { + if (decays_to_item(*task_src_begin)) { + output_size += 1; + } + } + output_size_per_shard[task_index] = output_size; + }; + })) + << "worker_pool must not be closed!"; + } + + // Change to a rolling sum and do the actual copy. + // + usize output_total_size = 0; + batt::SmallVec output_shard_offset; + for (usize output_shard_size : output_size_per_shard) { + output_shard_offset.emplace_back(output_total_size); + output_total_size += output_shard_size; + } + + decayed_items.resize(output_total_size); + { + this->worker_pool.reset(); + + batt::ScopedWorkContext work_context{this->worker_pool}; + + BATT_CHECK_OK( + batt::slice_work(work_context, + plan, + /*gen_work_fn=*/ + [&](usize task_index, isize task_offset, isize task_size) { + return [src_begin, + &output_shard_offset, + &output_size_per_shard, + task_index, + task_offset, + task_size, + &decayed_items] { + auto task_src_begin = std::next(src_begin, task_offset); + const auto task_src_end = std::next(task_src_begin, task_size); + + BATT_CHECK_LT(task_index, output_shard_offset.size()); + auto task_dst_begin = + std::next(decayed_items.data(), output_shard_offset[task_index]); + + for (; task_src_begin != task_src_end; ++task_src_begin) { + Optional maybe_item = to_item_view(*task_src_begin); + if (maybe_item) { + *task_dst_begin = EditView::from_item_view(*maybe_item); + ++task_dst_begin; + } + } + }; + })) + << "worker_pool must not be closed!"; + } + } + + MergeCompactor::ResultSet output_result_set; + output_result_set.append(std::move(decayed_items)); + return output_result_set; +} + } // namespace turtle_kv diff --git a/src/turtle_kv/tree/in_memory_node.test.cpp b/src/turtle_kv/tree/in_memory_node.test.cpp index 211326d..8e9f1db 100644 --- a/src/turtle_kv/tree/in_memory_node.test.cpp +++ b/src/turtle_kv/tree/in_memory_node.test.cpp @@ -544,7 +544,7 @@ TEST(InMemoryNodeTest, SubtreeDeletions) usize items_per_leaf = tree_options.flush_size() / tree_options.expected_item_size(); usize total_batches = 81; - std::vector keys; + std::vector keys; keys.reserve(total_batches * items_per_leaf); std::string value_str = std::string(value_size, 'a'); @@ -552,17 +552,17 @@ TEST(InMemoryNodeTest, SubtreeDeletions) std::default_random_engine rng{/*seed=*/3}; RandomStringGenerator generate_key; - for (usize i = 0; i < total_batches * items_per_leaf; ++i) { - keys.emplace_back(generate_key(rng)); + llfs::StableStringStore store; + std::unordered_set keys_set; + while (keys.size() < total_batches * items_per_leaf) { + KeyView key = generate_key(rng, store); + if (keys_set.contains(key)) { + continue; + } + keys_set.emplace(key); + keys.emplace_back(key); } std::sort(keys.begin(), keys.end(), llfs::KeyOrder{}); - keys.erase(std::unique(keys.begin(), - keys.end(), - [](const auto& l, const auto& r) { - return get_key(l) == get_key(r); - }), - keys.end()); - BATT_CHECK_EQ(keys.size(), total_batches * items_per_leaf); std::shared_ptr page_cache = make_memory_page_cache(batt::Runtime::instance().default_scheduler(), diff --git a/src/turtle_kv/tree/subtree.cpp b/src/turtle_kv/tree/subtree.cpp index 5e99a1d..fb85510 100644 --- a/src/turtle_kv/tree/subtree.cpp +++ b/src/turtle_kv/tree/subtree.cpp @@ -139,13 +139,10 @@ Status Subtree::apply_batch_update(const TreeOptions& tree_options, auto new_leaf = std::make_unique(llfs::PinnedPage{}, tree_options); - update.decay_batch_to_items(new_leaf->result_set); + new_leaf->result_set = std::move(update.context.decay_batch_to_items(update.result_set)); - if (!update.edit_size_totals) { - update.update_edit_size_totals_decayed(new_leaf->result_set); - } - - new_leaf->set_edit_size_totals(std::move(*update.edit_size_totals)); + new_leaf->set_edit_size_totals( + update.context.compute_running_total(new_leaf->result_set)); update.edit_size_totals = None; return Subtree{std::move(new_leaf)};