Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/turtle_kv/core/algo/compute_running_total.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ namespace turtle_kv {

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
template <bool kDecayValue = false>
template <bool kDecayValue>
inline batt::RunningTotal compute_running_total(
batt::WorkerPool& worker_pool,
const MergeCompactor::ResultSet</*decay_to_items=*/false>& result_set,
const MergeCompactor::ResultSet<kDecayValue>& result_set,
DecayToItem<kDecayValue> decay_to_item [[maybe_unused]] = {})
{
auto merged_edits = result_set.get();
Expand Down
2 changes: 2 additions & 0 deletions src/turtle_kv/core/merge_compactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,8 @@ template <bool kDecayToItems>
chunk_from_second.offset += first_size;
});

ans.chunks_.back().offset = first_size + second.chunks_.back().offset;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a regression test for this fix.


first.clear();
second.clear();

Expand Down
51 changes: 51 additions & 0 deletions src/turtle_kv/core/merge_compactor.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include <turtle_kv/core/testing/generate.hpp>
#include <turtle_kv/import/env.hpp>

#include <batteries/stream_util.hpp>
Expand All @@ -23,6 +24,8 @@ using namespace batt::int_types;
using batt::as_seq;
using batt::WorkerPool;

using llfs::StableStringStore;

using turtle_kv::CInterval;
using turtle_kv::EditSlice;
using turtle_kv::EditView;
Expand All @@ -39,6 +42,8 @@ using turtle_kv::Status;
using turtle_kv::StatusOr;
using turtle_kv::ValueView;

using turtle_kv::testing::RandomStringGenerator;

namespace seq = turtle_kv::seq;

constexpr usize kNumKeys = 16;
Expand Down Expand Up @@ -482,4 +487,50 @@ TEST(MergeCompactor, ResultSetDropKeyRange)
}
}

// Verifies that concatenating two ResultSets, built from the lower and upper halves of a sorted,
// duplicate-free list of edits, yields every edit exactly once and in the original sorted order.
//
TEST(MergeCompactor, ResultSetConcat)
{
  const usize n = 200;
  std::vector<EditView> all_edits;
  std::unordered_set<KeyView> keys_set;
  llfs::StableStringStore store;

  all_edits.reserve(n);

  // All edits share the same value; only the keys differ.
  //
  const std::string value_str(100, 'a');
  const ValueView value = ValueView::from_str(value_str);

  // Generate `n` distinct random keys (fixed seed, so the test is deterministic).
  //
  std::default_random_engine rng{/*seed=*/30};
  RandomStringGenerator generate_key;
  while (all_edits.size() < n) {
    const KeyView key = generate_key(rng, store);

    // emplace() reports whether the key was newly inserted; skip keys already seen.
    //
    if (!keys_set.emplace(key).second) {
      continue;
    }
    all_edits.emplace_back(key, value);
  }
  std::sort(all_edits.begin(), all_edits.end(), KeyOrder{});

  // Split the sorted edits into two halves and build one ResultSet from each.
  //
  std::vector<EditView> first{all_edits.begin(), all_edits.begin() + (n / 2)};
  std::vector<EditView> second{all_edits.begin() + (n / 2), all_edits.end()};

  MergeCompactor::ResultSet<false> first_result_set;
  first_result_set.append(std::move(first));
  MergeCompactor::ResultSet<false> second_result_set;
  second_result_set.append(std::move(second));

  EXPECT_EQ(first_result_set.size(), n / 2);
  EXPECT_EQ(second_result_set.size(), n / 2);

  MergeCompactor::ResultSet<false> concatenated_result_set =
      MergeCompactor::ResultSet<false>::concat(std::move(first_result_set),
                                               std::move(second_result_set));

  // Fatal on mismatch: the element-wise comparison below is meaningless if the sizes differ.
  //
  ASSERT_EQ(concatenated_result_set.size(), n);

  usize i = 0;
  for (const EditView& edit : concatenated_result_set.get()) {
    // Never index past the end of the expected edits, even if the result is too long.
    //
    ASSERT_LT(i, all_edits.size());
    EXPECT_EQ(edit, all_edits[i]);
    ++i;
  }

  // The iteration must have visited every expected edit (catches a result that is too short).
  //
  EXPECT_EQ(i, n);
}

} // namespace
28 changes: 24 additions & 4 deletions src/turtle_kv/core/testing/generate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <random>
#include <string>
#include <string_view>
#include <unordered_set>
#include <vector>

namespace turtle_kv {
Expand Down Expand Up @@ -184,21 +185,40 @@ class RandomResultSetGenerator : public MinMaxSize<usize{1} << 24>
}

template <bool kDecayToItems, typename Rng>
MergeCompactor::ResultSet</*kDecayToItems=*/kDecayToItems>
operator()(DecayToItem<kDecayToItems>, Rng& rng, llfs::StableStringStore& store)
MergeCompactor::ResultSet</*kDecayToItems=*/kDecayToItems> operator()(
DecayToItem<kDecayToItems>,
Rng& rng,
llfs::StableStringStore& store,
const std::vector<KeyView>& to_delete)
{
using ResultSet = MergeCompactor::ResultSet</*kDecayToItems=*/kDecayToItems>;
using Item = typename ResultSet::value_type;

const usize n = this->Super::pick_size(rng);
std::vector<EditView> items;

for (const KeyView& delete_key : to_delete) {
items.emplace_back(delete_key, ValueView::deleted());
}
if (items.size() > n) {
ResultSet result;
result.append(std::move(items));
return result;
}

std::unordered_set<KeyView> deleted_items_set{to_delete.begin(), to_delete.end()};
while (items.size() < n) {
for (usize i = items.size(); i < n; ++i) {
for (usize i = items.size(); i < n;) {
char ch = '_' + (i & 31);
items.emplace_back(this->key_generator_(rng, store),
KeyView key = this->key_generator_(rng, store);
if (deleted_items_set.count(key)) {
continue;
}
items.emplace_back(key,
ValueView::from_str(store.store(std::string(this->value_size_, ch))));
++i;
}

std::sort(items.begin(), items.end(), KeyOrder{});
items.erase(std::unique(items.begin(),
items.end(),
Expand Down
25 changes: 24 additions & 1 deletion src/turtle_kv/core/testing/generate.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@

namespace {

using batt::int_types::usize;

using turtle_kv::DecayToItem;
using turtle_kv::ItemView;
using turtle_kv::KeyOrder;
using turtle_kv::KeyView;
using turtle_kv::StatusOr;
using turtle_kv::ValueView;
using turtle_kv::testing::RandomResultSetGenerator;

template <bool kDecayToItems>
Expand All @@ -24,10 +29,28 @@ TEST(GenerateTest, Test)

g.set_size(200);

ResultSet<true> result_set = g(DecayToItem<true>{}, rng, store);
std::vector<KeyView> to_delete;
ResultSet<true> result_set = g(DecayToItem<true>{}, rng, store, to_delete);

EXPECT_TRUE(std::is_sorted(result_set.get().begin(), result_set.get().end(), KeyOrder{}));
EXPECT_EQ(result_set.get().size(), 200u);

auto result_set_slice = result_set.get();
usize i = 0;
for (const ItemView& edit : result_set_slice) {
if (i % 2) {
to_delete.emplace_back(edit.key);
}
++i;
}

ResultSet<false> result_set_with_deletes = g(DecayToItem<false>{}, rng, store, to_delete);
for (const KeyView& deleted_key : to_delete) {
StatusOr<ValueView> deleted_value = result_set_with_deletes.find_key(deleted_key);
EXPECT_TRUE(deleted_value.ok());
EXPECT_EQ(*deleted_value, ValueView::deleted());
}
EXPECT_EQ(to_delete.size(), result_set_with_deletes.size() / 2);
}

} // namespace
29 changes: 18 additions & 11 deletions src/turtle_kv/kv_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -650,11 +650,19 @@ StatusOr<ValueView> KVStore::get(const KeyView& key) noexcept /*override*/
this->metrics_.mem_table_get_latency,
observed_state->mem_table_->get(key));

// Helper for returning a fully resolved value: bumps `get_count_metric` by one, then reports
// kNotFound if the value is a delete marker, otherwise returns the value itself.
//
// `mem_table_value` is dereferenced without a check, so callers are expected to pass a
// populated Optional.
//
const auto return_memtable_value =
    [](Optional<ValueView> mem_table_value,
       FastCountMetric<u64>& get_count_metric) -> StatusOr<ValueView> {
  get_count_metric.add(1);

  if (!mem_table_value->is_delete()) {
    return *mem_table_value;
  }

  // A delete marker means the key has been removed.
  //
  return {batt::StatusCode::kNotFound};
};

if (value) {
if (!value->needs_combine()) {
this->metrics_.mem_table_get_count.add(1);
// VLOG(1) << "found key " << batt::c_str_literal(key) << " in active MemTable";
return *value;
return return_memtable_value(value, this->metrics_.mem_table_get_count);
}
}

Expand All @@ -676,13 +684,15 @@ StatusOr<ValueView> KVStore::get(const KeyView& key) noexcept /*override*/
if (value) {
*value = combine(*value, *delta_value);
if (!value->needs_combine()) {
this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)].add(1);
return *value;
return return_memtable_value(
value,
this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)]);
}
} else {
if (!delta_value->needs_combine()) {
this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)].add(1);
return *delta_value;
return return_memtable_value(
delta_value,
this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)]);
}
value = delta_value;
}
Expand Down Expand Up @@ -757,7 +767,6 @@ StatusOr<usize> KVStore::scan_keys(const KeyView& min_key,
this->metrics_.scan_count.add(1);

KVStoreScanner scanner{*this, min_key};
scanner.set_keys_only(true);
BATT_REQUIRE_OK(scanner.start());

return scanner.read_keys(items_out);
Expand All @@ -766,9 +775,7 @@ StatusOr<usize> KVStore::scan_keys(const KeyView& min_key,
//
// Removes `key` from the store by writing a deleted-value marker for it via put().
//
// Returns the Status produced by put().
//
Status KVStore::remove(const KeyView& key) noexcept /*override*/
{
  return this->put(key, ValueView::deleted());
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand Down
65 changes: 33 additions & 32 deletions src/turtle_kv/kv_store_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,15 +435,37 @@ Status KVStoreScanner::set_next_item()
ScanLevel* scan_level = this->heap_.first();

if (!this->next_item_) {
this->next_item_.emplace(scan_level->item(this->keys_only_));
this->next_item_.emplace(scan_level->item());

} else if (this->next_item_->key == scan_level->key) {
if (!this->keys_only_ && this->next_item_->needs_combine()) {
// If this->next_item_->key == scan_level->key, we need to search for a terminal value for
// the item, so combine it if necessary.
//
if (this->next_item_->needs_combine()) {
this->next_item_->value = combine(this->next_item_->value, scan_level->value());
}

} else {
break;
// If the item stored in this->next_item_ does not have the same key as the first key in
// the current scan_level, we have reached a terminal value for this->next_item_. Now,
// we have to decide whether we want to keep this->next_item_ and break from the loop
// (returning the item to the function's caller) OR discard it, because the terminal value
// represents a deleted item.
//
if (this->next_item_->value == ValueView::deleted()) {
// The terminal value represents a deleted item, so discard it by setting this->next_item_
// to None. Then, continue on to the next iteration of the loop, skipping the logic to
// advance the current scan_level. We do this because we now need to set the first key
// in the current scan_level to this->next_item_ to examine it next.
//
this->next_item_ = None;
if (this->needs_resume_) {
BATT_REQUIRE_OK(this->resume());
}
continue;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `if (needs_resume_) resume()` check should go here. We can keep the flag; just call resume once when moving from one key to a different key.

} else {
break;
}
}

if (scan_level->advance()) {
Expand Down Expand Up @@ -567,66 +589,45 @@ Status KVStoreScanner::set_next_item()

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
// Returns the current item (key and value) of this scan level as an EditView.
//
// Dispatches on the active alternative of `state_impl`; each alternative reads the key/value from
// its own underlying source. Panics if the level is in the NoneType state. The value is always
// included.
//
EditView KVStoreScanner::ScanLevel::item() const
{
  return batt::case_of(
      this->state_impl,
      [](NoneType) -> EditView {
        BATT_PANIC() << "illegal state";
        BATT_UNREACHABLE();
      },
      [this](const MemTableScanState<ARTBase::Synchronized::kTrue>& state) -> EditView {
        // Look up the entry for the current key in the MemTable's hash index; it must exist.
        //
        MemTableEntry entry;
        const bool found = state.mem_table_->hash_index().find_key(this->key, entry);
        BATT_CHECK(found);

        return EditView{entry.key_, entry.value_};
      },
      [this](const MemTableScanState<ARTBase::Synchronized::kFalse>& state) -> EditView {
        // Same lookup as above, using the unsynchronized variant of the hash index query.
        //
        const MemTableEntry* entry =
            state.mem_table_->hash_index().unsynchronized_find_key(this->key);
        BATT_CHECK_NOT_NULLPTR(entry);

        return EditView{entry->key_, entry->value_};
      },
      [](const MemTableValueScanState<ARTBase::Synchronized::kTrue>& state) -> EditView {
        const MemTableValueEntry& entry = state.art_scanner_->get_value();
        return EditView{entry.key_view(), entry.value_view()};
      },
      [](const MemTableValueScanState<ARTBase::Synchronized::kFalse>& state) -> EditView {
        const MemTableValueEntry& entry = state.art_scanner_->get_value();
        return EditView{entry.key_view(), entry.value_view()};
      },
      [](const Slice<const EditView>& state) -> EditView {
        return state.front();
      },
      [this](const TreeLevelScanState& state) -> EditView {
        return EditView{this->key, get_value(state.kv_slice.front())};
      },
      [this](const TreeLevelScanShardedState& state) -> EditView {
        return EditView{this->key, state.kv_slice.front_value()};
      },
      [this](const ShardedLeafScanState& state) -> EditView {
        // Panics if the leaf scanner fails to produce the front value.
        //
        return EditView{this->key, BATT_OK_RESULT_OR_PANIC(state.leaf_scanner_->front_value())};
      });
}
Expand Down
Loading
Loading