diff --git a/.gitignore b/.gitignore index b46cac7c2..c02f79962 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ .idea/** .cache .clangd +.claude/* build test_package/build diff --git a/conanfile.py b/conanfile.py index db335c8f2..bb98baeb3 100644 --- a/conanfile.py +++ b/conanfile.py @@ -10,7 +10,7 @@ class HomeObjectConan(ConanFile): name = "homeobject" - version = "3.0.20" + version = "4.1.0" homepage = "https://github.com/eBay/HomeObject" description = "Blob Store built on HomeStore" diff --git a/src/include/homeobject/blob_manager.hpp b/src/include/homeobject/blob_manager.hpp index 95b038eb0..75ee77199 100644 --- a/src/include/homeobject/blob_manager.hpp +++ b/src/include/homeobject/blob_manager.hpp @@ -43,7 +43,7 @@ class BlobManager : public Manager< BlobError > { public: virtual AsyncResult< blob_id_t > put(shard_id_t shard, Blob&&, trace_id_t tid = 0) = 0; virtual AsyncResult< Blob > get(shard_id_t shard, blob_id_t const& blob, uint64_t off = 0, uint64_t len = 0, - trace_id_t tid = 0) const = 0; + bool allow_skip_verify = false, trace_id_t tid = 0) const = 0; virtual NullAsyncResult del(shard_id_t shard, blob_id_t const& blob, trace_id_t tid = 0) = 0; }; diff --git a/src/include/homeobject/shard_manager.hpp b/src/include/homeobject/shard_manager.hpp index d5bd44a78..5e7a6dd60 100644 --- a/src/include/homeobject/shard_manager.hpp +++ b/src/include/homeobject/shard_manager.hpp @@ -25,6 +25,7 @@ struct ShardError { }; struct ShardInfo { + static constexpr uint64_t meta_length = 1024 + 1; enum class State : uint8_t { OPEN = 0, SEALED = 1, @@ -40,6 +41,7 @@ struct ShardInfo { uint64_t available_capacity_bytes; uint64_t total_capacity_bytes; std::optional< peer_id_t > current_leader{std::nullopt}; + uint8_t meta[meta_length]{}; auto operator<=>(ShardInfo const& rhs) const { return id <=> rhs.id; } auto operator==(ShardInfo const& rhs) const { return id == rhs.id; } @@ -55,7 +57,7 @@ class ShardManager : public Manager< ShardError > { virtual AsyncResult< ShardInfo > get_shard(shard_id_t id, trace_id_t tid = 0) const = 0; virtual AsyncResult< InfoList > list_shards(pg_id_t id, trace_id_t tid = 0) const = 0; - virtual AsyncResult< ShardInfo > create_shard(pg_id_t pg_owner, uint64_t size_bytes, trace_id_t tid = 0) = 0; + virtual AsyncResult< ShardInfo > create_shard(pg_id_t pg_owner, uint64_t size_bytes, std::string meta, trace_id_t tid = 0) = 0; virtual AsyncResult< ShardInfo > seal_shard(shard_id_t id, trace_id_t tid = 0) = 0; }; diff --git a/src/lib/blob_manager.cpp b/src/lib/blob_manager.cpp index 739ac5648..d7b54a882 100644 --- a/src/lib/blob_manager.cpp +++ b/src/lib/blob_manager.cpp @@ -5,11 +5,11 @@ namespace homeobject { std::shared_ptr< BlobManager > HomeObjectImpl::blob_manager() { return shared_from_this(); } BlobManager::AsyncResult< Blob > HomeObjectImpl::get(shard_id_t shard, blob_id_t const& blob_id, uint64_t off, - uint64_t len, trace_id_t tid) const { + uint64_t len, bool allow_skip_verify, trace_id_t tid) const { return _get_shard(shard, tid) - .thenValue([this, blob_id, off, len, tid](auto const e) -> BlobManager::AsyncResult< Blob > { + .thenValue([this, blob_id, off, len, allow_skip_verify, tid](auto const e) -> BlobManager::AsyncResult< Blob > { if (!e) return folly::makeUnexpected(BlobError(BlobErrorCode::UNKNOWN_SHARD)); - return _get_blob(e.value(), blob_id, off, len, tid); + return _get_blob(e.value(), blob_id, off, len, allow_skip_verify, tid); }); } diff --git a/src/lib/homeobject_impl.hpp b/src/lib/homeobject_impl.hpp index c443d2523..4eb2af48f 100644 
--- a/src/lib/homeobject_impl.hpp +++ b/src/lib/homeobject_impl.hpp @@ -88,12 +88,12 @@ class HomeObjectImpl : public HomeObject, public std::enable_shared_from_this< HomeObjectImpl > { /// Implementation defines these - virtual ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes, trace_id_t tid) = 0; + virtual ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes, std::string meta, trace_id_t tid) = 0; virtual ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&, trace_id_t tid) = 0; virtual BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&, trace_id_t tid) = 0; virtual BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off, uint64_t len, - trace_id_t tid) const = 0; + bool allow_skip_verify, trace_id_t tid) const = 0; virtual BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t, trace_id_t tid) = 0; /// @@ -189,7 +189,7 @@ class HomeObjectImpl : public HomeObject, /// ShardManager ShardManager::AsyncResult< ShardInfo > get_shard(shard_id_t id, trace_id_t tid) const final; - ShardManager::AsyncResult< ShardInfo > create_shard(pg_id_t pg_owner, uint64_t size_bytes, trace_id_t tid) final; + ShardManager::AsyncResult< ShardInfo > create_shard(pg_id_t pg_owner, uint64_t size_bytes, std::string meta, trace_id_t tid) final; ShardManager::AsyncResult< InfoList > list_shards(pg_id_t pg, trace_id_t tid) const final; ShardManager::AsyncResult< ShardInfo > seal_shard(shard_id_t id, trace_id_t tid) final; uint64_t get_current_timestamp(); @@ -197,7 +197,7 @@ class HomeObjectImpl : public HomeObject, /// BlobManager BlobManager::AsyncResult< blob_id_t > put(shard_id_t shard, Blob&&, trace_id_t tid) final; BlobManager::AsyncResult< Blob > get(shard_id_t shard, blob_id_t const& blob, uint64_t off, uint64_t len, - trace_id_t tid) const final; + bool allow_skip_verify, trace_id_t tid) const final; BlobManager::NullAsyncResult del(shard_id_t shard, blob_id_t const& blob, trace_id_t tid) final; }; diff --git a/src/lib/homestore_backend/gc_manager.cpp b/src/lib/homestore_backend/gc_manager.cpp index 51beadd33..0f2e97a9d 100644 --- a/src/lib/homestore_backend/gc_manager.cpp +++ b/src/lib/homestore_backend/gc_manager.cpp @@ -122,7 +122,10 @@ void GCManager::start() { gc_actor->start(); LOGINFOMOD(gcmgr, "start gc actor for pdev={}", pdev_id); } + start_gc_scan_timer(); +} +void GCManager::start_gc_scan_timer() { const auto gc_scan_interval_sec = HS_BACKEND_DYNAMIC_CONFIG(gc_scan_interval_sec); // the initial idea here is that we want gc timer to run in a reactor that not shared with other fibers that @@ -146,9 +149,7 @@ void GCManager::start() { LOGINFOMOD(gcmgr, "gc scheduler timer has started, interval is set to {} seconds", gc_scan_interval_sec); } -bool GCManager::is_started() { return m_gc_timer_hdl != iomgr::null_timer_handle; } - -void GCManager::stop() { +void GCManager::stop_gc_scan_timer() { if (m_gc_timer_hdl == iomgr::null_timer_handle) { LOGWARNMOD(gcmgr, "gc scheduler timer is not running, no need to stop it"); return; @@ -162,6 +163,10 @@ void GCManager::stop() { m_gc_timer_hdl = iomgr::null_timer_handle; }); m_gc_timer_fiber = nullptr; +} + +void GCManager::stop() { + stop_gc_scan_timer(); for (const auto& [pdev_id, gc_actor] : m_pdev_gc_actors) { gc_actor->stop(); @@ -170,9 +175,20 @@ void GCManager::stop() { } folly::SemiFuture< bool > GCManager::submit_gc_task(task_priority priority, chunk_id_t chunk_id) { - if (!is_started()) return folly::makeFuture< 
bool >(false); + auto ex_vchunk = m_chunk_selector->get_extend_vchunk(chunk_id); + if (ex_vchunk == nullptr) { + LOGERRORMOD(gcmgr, "chunk {} not found when submit gc task!", chunk_id); + return folly::makeFuture< bool >(false); + } + + // if the chunk has no garbage to be reclaimed, we don`t need to gc it , return true directly + const auto defrag_blk_num = ex_vchunk->get_defrag_nblks(); + if (!defrag_blk_num && task_priority::normal == priority) { + LOGERRORMOD(gcmgr, "chunk {} has no garbage to be reclaimed, skip gc for this chunk!", chunk_id); + return folly::makeFuture< bool >(true); + } - auto pdev_id = m_chunk_selector->get_extend_vchunk(chunk_id)->get_pdev_id(); + auto pdev_id = ex_vchunk->get_pdev_id(); auto it = m_pdev_gc_actors.find(pdev_id); if (it == m_pdev_gc_actors.end()) { LOGINFOMOD(gcmgr, "pdev gc actor not found for pdev_id={}, chunk={}", pdev_id, chunk_id); @@ -218,7 +234,7 @@ bool GCManager::is_eligible_for_gc(chunk_id_t chunk_id) { const auto total_blk_num = chunk->get_total_blks(); const auto gc_garbage_rate_threshold = HS_BACKEND_DYNAMIC_CONFIG(gc_garbage_rate_threshold); - bool should_gc = 100 * defrag_blk_num >= total_blk_num * gc_garbage_rate_threshold; + bool should_gc = 100 * defrag_blk_num > total_blk_num * gc_garbage_rate_threshold; LOGDEBUGMOD(gcmgr, "gc scan chunk_id={}, use_blks={}, available_blks={}, total_blks={}, defrag_blks={}, should_gc={}", @@ -334,6 +350,11 @@ void GCManager::pdev_gc_actor::add_reserved_chunk( } folly::SemiFuture< bool > GCManager::pdev_gc_actor::add_gc_task(uint8_t priority, chunk_id_t move_from_chunk) { + if (m_is_stopped.load()) { + LOGWARNMOD(gcmgr, "pdev gc actor for pdev_id={} is not started yet or already stopped, cannot add gc task!", + m_pdev_id); + return folly::makeSemiFuture< bool >(false); + } auto EXvchunk = m_chunk_selector->get_extend_vchunk(move_from_chunk); // it does not belong to any pg, so we don't need to gc it. if (!EXvchunk->m_pg_id.has_value()) { diff --git a/src/lib/homestore_backend/gc_manager.hpp b/src/lib/homestore_backend/gc_manager.hpp index 9e2ccfe78..fb3771332 100644 --- a/src/lib/homestore_backend/gc_manager.hpp +++ b/src/lib/homestore_backend/gc_manager.hpp @@ -310,7 +310,11 @@ class GCManager { void start(); void stop(); - bool is_started(); + + // the following two functions should not be called concurrently. 
if we need to call them concurrently, we need to + // add a lock to protect them + void start_gc_scan_timer(); + void stop_gc_scan_timer(); void scan_chunks_for_gc(); void drain_pg_pending_gc_task(const pg_id_t pg_id); diff --git a/src/lib/homestore_backend/hs_blob_manager.cpp b/src/lib/homestore_backend/hs_blob_manager.cpp index 70e29c8aa..c89c32ec5 100644 --- a/src/lib/homestore_backend/hs_blob_manager.cpp +++ b/src/lib/homestore_backend/hs_blob_manager.cpp @@ -77,10 +77,6 @@ struct put_blob_req_ctx : public repl_result_ctx< BlobManager::Result< HSHomeObj blob_header_idx_ = data_bufs_.size() - 1; } - void copy_user_key(std::string const& user_key) { - std::memcpy((blob_header_buf().bytes() + sizeof(HSHomeObject::BlobHeader)), user_key.data(), user_key.size()); - } - HSHomeObject::BlobHeader* blob_header() { return r_cast< HSHomeObject::BlobHeader* >(blob_header_buf().bytes()); } sisl::io_blob_safe& blob_header_buf() { return data_bufs_[blob_header_idx_]; } }; @@ -92,6 +88,13 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s return folly::makeUnexpected(BlobErrorCode::SHUTTING_DOWN); } incr_pending_request_num(); + // check user key size + if (blob.user_key.size() > BlobHeader::max_user_key_length) { + BLOGE(tid, shard.id, 0, "input user key length > max_user_key_length {}", blob.user_key.size(), + BlobHeader::max_user_key_length); + decr_pending_request_num(); + return folly::makeUnexpected(BlobError(BlobErrorCode::INVALID_ARG)); + } auto& pg_id = shard.placement_group; shared< homestore::ReplDev > repl_dev; blob_id_t new_blob_id; @@ -128,7 +131,7 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s } // Create a put_blob request which allocates for header, key and blob_header, user_key. Data sgs are added later - auto req = put_blob_req_ctx::make(sizeof(BlobHeader) + blob.user_key.size()); + auto req = put_blob_req_ctx::make(sisl::round_up(sizeof(BlobHeader), repl_dev->get_blk_size())); req->header()->msg_type = ReplicationMessageType::PUT_BLOB_MSG; req->header()->payload_size = 0; req->header()->payload_crc = 0; @@ -157,11 +160,15 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s req->blob_header()->object_offset = blob.object_off; // Append the user key information if present. - if (!blob.user_key.empty()) { req->copy_user_key(blob.user_key); } + if (!blob.user_key.empty()) { + std::memcpy(req->blob_header()->user_key, blob.user_key.data(), blob.user_key.size()); + } + // TODO: support data block checksums for partial read integrity // Set offset of actual data after the blob header and user key (rounded off) req->blob_header()->data_offset = req->blob_header_buf().size(); - + RELEASE_ASSERT(req->blob_header()->data_offset == _data_block_size, + "blob header size should equal _data_block_size"); // In case blob body is not aligned, create a new aligned buffer and copy the blob body. if (((r_cast< uintptr_t >(blob.body.cbytes()) % io_align) != 0) || ((blob_size % io_align) != 0)) { // If address or size is not aligned, create a separate aligned buffer and do expensive memcpy. @@ -169,11 +176,9 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s std::memcpy(new_body.bytes(), blob.body.cbytes(), blob_size); blob.body = std::move(new_body); } - // Compute the checksum of blob and metadata.
compute_blob_payload_hash(req->blob_header()->hash_algorithm, blob.body.cbytes(), blob_size, - (uint8_t*)blob.user_key.data(), blob.user_key.size(), req->blob_header()->hash, - BlobHeader::blob_max_hash_len); + req->blob_header()->hash, BlobHeader::blob_max_hash_len); req->blob_header()->seal(); // Add blob body to the request @@ -279,7 +284,8 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis } BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, blob_id_t blob_id, uint64_t req_offset, - uint64_t req_len, trace_id_t tid) const { + uint64_t req_len, bool allow_skip_verify, + trace_id_t tid) const { if (is_shutting_down()) { LOGI("service is being shutdown"); return folly::makeUnexpected(BlobErrorCode::SHUTTING_DOWN); @@ -322,15 +328,29 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, return folly::makeUnexpected(r.error()); } - return _get_blob_data(repl_dev, shard.id, blob_id, req_offset, req_len, r.value() /* blkid*/, tid); + return _get_blob_data(repl_dev, shard.id, blob_id, req_offset, req_len, r.value() /* blkid*/, tid, + allow_skip_verify) + .deferValue([this](auto&& result) { + decr_pending_request_num(); + return std::forward< decltype(result) >(result); + }); } BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< homestore::ReplDev >& repl_dev, shard_id_t shard_id, blob_id_t blob_id, uint64_t req_offset, uint64_t req_len, - const homestore::MultiBlkId& blkid, - trace_id_t tid) const { - auto const total_size = blkid.blk_count() * repl_dev->get_blk_size(); + const homestore::MultiBlkId& blkid, trace_id_t tid, + bool allow_skip_verify) const { + auto const blk_size = repl_dev->get_blk_size(); + auto const total_size = blkid.blk_count() * blk_size; + + // Use partial read path only when we can skip at least 1 data block (in addition to header) + // to make the optimization worthwhile. This requires req_len > 0 (known exact length). + if (allow_skip_verify && req_len > 0 && + (req_offset >= blk_size || req_offset + req_len + blk_size <= total_size - _data_block_size)) { + return _get_blob_data_partial(repl_dev, shard_id, blob_id, req_offset, req_len, blkid, tid); + } + sisl::io_blob_safe read_buf{total_size, io_align}; sisl::sg_list sgs; @@ -343,44 +363,19 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< home read_buf = std::move(read_buf)](auto&& result) mutable -> BlobManager::AsyncResult< Blob > { if (result) { BLOGE(tid, shard_id, blob_id, "Failed to get blob: err={}", blob_id, shard_id, result.value()); - decr_pending_request_num(); - return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED)); - } - - BlobHeader const* header = r_cast< BlobHeader const* >(read_buf.cbytes()); - if (!header->valid()) { - BLOGE(tid, shard_id, blob_id, "Invalid header found: [header={}]", header->to_string()); - decr_pending_request_num(); return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED)); } - if (header->shard_id != shard_id) { - BLOGE(tid, shard_id, blob_id, "Invalid shard_id in header: [header={}]", header->to_string()); - decr_pending_request_num(); - return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED)); - } - - // Metadata start offset is just after blob header - std::string user_key = header->user_key_size - ? 
std::string((const char*)(read_buf.bytes() + sizeof(BlobHeader)), (size_t)header->user_key_size) - : std::string{}; - - uint8_t const* blob_bytes = read_buf.bytes() + header->data_offset; - uint8_t computed_hash[BlobHeader::blob_max_hash_len]{}; - compute_blob_payload_hash(header->hash_algorithm, blob_bytes, header->blob_size, - uintptr_cast(user_key.data()), header->user_key_size, computed_hash, - BlobHeader::blob_max_hash_len); - if (std::memcmp(computed_hash, header->hash, BlobHeader::blob_max_hash_len) != 0) { - BLOGE(tid, shard_id, blob_id, "Hash mismatch header, [header={}] [computed={:np}]", header->to_string(), - spdlog::to_hex(computed_hash, computed_hash + BlobHeader::blob_max_hash_len)); - decr_pending_request_num(); - return folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH)); + auto verify_result = do_verify_blob(read_buf.cbytes(), shard_id, 0 /* no blob_id check */); + if (!verify_result.hasValue()) { + return folly::makeUnexpected(verify_result.error()); } + std::string user_key = std::move(verify_result.value()); + BlobHeader const* header = r_cast< BlobHeader const* >(read_buf.cbytes()); if (req_offset + req_len > header->blob_size) { BLOGE(tid, shard_id, blob_id, "Invalid offset length requested in get blob offset={} len={} size={}", req_offset, req_len, header->blob_size); - decr_pending_request_num(); return folly::makeUnexpected(BlobError(BlobErrorCode::INVALID_ARG)); } @@ -388,14 +383,73 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< home // whole blob size else copy only the request length. auto res_len = req_len == 0 ? header->blob_size - req_offset : req_len; auto body = sisl::io_blob_safe(res_len); + uint8_t const* blob_bytes = read_buf.bytes() + header->data_offset; std::memcpy(body.bytes(), blob_bytes + req_offset, res_len); BLOGD(tid, shard_id, blob_id, "Blob get success: blkid={}", blkid.to_string()); - decr_pending_request_num(); return Blob(std::move(body), std::move(user_key), header->object_offset, repl_dev->get_leader_id()); }); } +BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data_partial(const shared< homestore::ReplDev >& repl_dev, + shard_id_t shard_id, blob_id_t blob_id, + uint64_t req_offset, uint64_t req_len, + const homestore::MultiBlkId& blkid, + trace_id_t tid) const { + auto const blk_size = repl_dev->get_blk_size(); + + // In v4, BlobHeader is fixed at _data_block_size (4KB), and data starts immediately after + // We can skip reading the header entirely for partial reads that don't overlap with it + auto const fixed_data_offset = _data_block_size; + + // Calculate byte range within the storage (including header) + auto const read_start_byte = fixed_data_offset + req_offset; + auto const read_end_byte = read_start_byte + req_len; + + // Calculate which blocks we need to read + uint32_t start_blk = read_start_byte / blk_size; + uint32_t num_blks = (read_end_byte + blk_size - 1) / blk_size - start_blk; + uint32_t read_size = num_blks * blk_size; + + // Create MultiBlkId for the range we need + homestore::MultiBlkId read_blkid; + read_blkid.add(blkid.blk_num() + start_blk, num_blks, blkid.chunk_num()); + + sisl::io_blob_safe read_buf{read_size, io_align}; + sisl::sg_list sgs; + sgs.size = read_size; + sgs.iovs.emplace_back(iovec{.iov_base = read_buf.bytes(), .iov_len = read_buf.size()}); + + BLOGD(tid, shard_id, blob_id, + "Reading partial data: offset={}, len={}, full_blkid={}, read_blkid={}, start_blk={}, num_blks={}", + req_offset, req_len, blkid.to_string(), read_blkid.to_string(), 
start_blk, num_blks); + + return repl_dev->async_read(read_blkid, sgs, read_size) + .thenValue([tid, blob_id, shard_id, req_offset, req_len, blkid, repl_dev, start_blk, blk_size, + read_buf = std::move(read_buf)](auto&& result) mutable -> BlobManager::AsyncResult< Blob > { + if (result) { + BLOGE(tid, shard_id, blob_id, "Failed to read partial data: err={}", result.value()); + return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED)); + } + + // Calculate offset within read buffer + auto const data_start_in_storage = _data_block_size; + auto const req_start_in_storage = data_start_in_storage + req_offset; + auto const read_start_in_storage = start_blk * blk_size; + auto const offset_in_buf = req_start_in_storage - read_start_in_storage; + + uint8_t const* blob_bytes = read_buf.bytes() + offset_in_buf; + + // Copy the requested blob bytes + auto body = sisl::io_blob_safe(req_len); + std::memcpy(body.bytes(), blob_bytes, req_len); + + BLOGD(tid, shard_id, blob_id, "Blob partial get success: blkid={}", blkid.to_string()); + // user_key and object_offset are not available in partial read mode + return Blob(std::move(body), std::string{}, 0 /* object_offset */, repl_dev->get_leader_id()); + }); +} + homestore::ReplResult< homestore::blk_alloc_hints > HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< homestore::repl_req_ctx >& hs_ctx) { repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr}; @@ -542,7 +596,7 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis RELEASE_ASSERT(hs_pg, "PG not found, pg={}", pg_id); auto index_table = hs_pg->index_table_; auto repl_dev = hs_pg->repl_dev_; - RELEASE_ASSERT(index_table != nullptr, "Index table not intialized"); + RELEASE_ASSERT(index_table != nullptr, "Index table not initialized"); RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null"); const auto shard_id = msg_header->shard_id; @@ -591,8 +645,7 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis } void HSHomeObject::compute_blob_payload_hash(BlobHeader::HashAlgorithm algorithm, const uint8_t* blob_bytes, - size_t blob_size, const uint8_t* user_key_bytes, size_t user_key_size, - uint8_t* hash_bytes, size_t hash_len) const { + size_t blob_size, uint8_t* hash_bytes, size_t hash_len) const { std::memset(hash_bytes, 0, hash_len); switch (algorithm) { case HSHomeObject::BlobHeader::HashAlgorithm::NONE: { @@ -600,7 +653,6 @@ void HSHomeObject::compute_blob_payload_hash(BlobHeader::HashAlgorithm algorithm } case HSHomeObject::BlobHeader::HashAlgorithm::CRC32: { auto hash32 = crc32_ieee(init_crc32, blob_bytes, blob_size); - if (user_key_size != 0) { hash32 = crc32_ieee(hash32, user_key_bytes, user_key_size); } RELEASE_ASSERT(sizeof(uint32_t) <= hash_len, "Hash length invalid"); std::memcpy(hash_bytes, r_cast< uint8_t* >(&hash32), sizeof(uint32_t)); break; @@ -640,51 +692,54 @@ void HSHomeObject::on_blob_message_rollback(int64_t lsn, sisl::blob const& heade } } -bool HSHomeObject::verify_blob(const void* blob, const shard_id_t shard_id, const blob_id_t blob_id, - bool allow_delete_marker) const { - if (allow_delete_marker && !std::memcmp(blob, delete_marker_blob_data.data(), delete_marker_blob_data.size())) { - LOGW("find delete_marker for shard_id={}, blob_id={}, skipping verification!", shard_id, blob_id); - return true; - } - +BlobManager::Result< std::string > HSHomeObject::do_verify_blob(const void* blob, shard_id_t expected_shard_id, + blob_id_t expected_blob_id) const { uint8_t
const* blob_data = static_cast< uint8_t const* >(blob); - HSHomeObject::BlobHeader const* header = r_cast< HSHomeObject::BlobHeader const* >(blob_data); + BlobHeader const* header = r_cast< BlobHeader const* >(blob_data); + // Check if header is valid if (!header->valid()) { - LOGE("read blob header is not valid for shard_id={}, blob_id={}, " - "Invalid header found: [header={}]", - shard_id, blob_id, header->to_string()); - return false; + LOGE("Invalid header found: [header={}]", header->to_string()); + return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED)); } - if (header->shard_id != shard_id) { - LOGE("expecting shard_id={}, Invalid shard_id={} in header: [header={}]", shard_id, header->shard_id, - header->to_string()); - return false; + // Check if shard_id matches + if (header->shard_id != expected_shard_id) { + LOGE("Invalid shard_id in header: [header={}]", header->to_string()); + return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED)); } - if (header->blob_id != blob_id) { - LOGE("expecting blob_id={}, Invalid blob_id={} in header: [header={}]", blob_id, header->blob_id, - header->to_string()); - return false; + // Check if blob_id matches (only if expected_blob_id != 0) + if (expected_blob_id != 0 && header->blob_id != expected_blob_id) { + LOGE("Invalid blob_id in header: [header={}]", header->to_string()); + return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED)); } - std::string user_key = header->user_key_size - ? std::string((const char*)(blob_data + sizeof(HSHomeObject::BlobHeader)), (size_t)header->user_key_size) - : std::string{}; - + // Verify hash uint8_t const* blob_bytes = blob_data + header->data_offset; - uint8_t computed_hash[HSHomeObject::BlobHeader::blob_max_hash_len]{}; - compute_blob_payload_hash(header->hash_algorithm, blob_bytes, header->blob_size, uintptr_cast(user_key.data()), - header->user_key_size, computed_hash, HSHomeObject::BlobHeader::blob_max_hash_len); + uint8_t computed_hash[BlobHeader::blob_max_hash_len]{}; + compute_blob_payload_hash(header->hash_algorithm, blob_bytes, header->blob_size, computed_hash, + BlobHeader::blob_max_hash_len); - if (std::memcmp(computed_hash, header->hash, HSHomeObject::BlobHeader::blob_max_hash_len) != 0) { + if (std::memcmp(computed_hash, header->hash, BlobHeader::blob_max_hash_len) != 0) { LOGE("Hash mismatch header, [header={}] [computed={:np}]", header->to_string(), - spdlog::to_hex(computed_hash, computed_hash + HSHomeObject::BlobHeader::blob_max_hash_len)); - return false; + spdlog::to_hex(computed_hash, computed_hash + BlobHeader::blob_max_hash_len)); + return folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH)); } - return true; + return header->get_user_key().value(); // Must have a value since the header was verified above } +bool HSHomeObject::verify_blob(const void* blob, const shard_id_t shard_id, const blob_id_t blob_id, + bool allow_delete_marker) const { + // Handle deleteMarker case + if (0 == std::memcmp(blob, delete_marker_blob_data.data(), delete_marker_blob_data.size())) { + LOGW("Found delete_marker for shard_id={}, blob_id={}, skipping verification!", shard_id, blob_id); + return allow_delete_marker; + } + + // Use the new do_verify_blob method + auto result = do_verify_blob(blob, shard_id, blob_id); + return result.hasValue(); +} } // namespace homeobject diff --git a/src/lib/homestore_backend/hs_homeobject.cpp b/src/lib/homestore_backend/hs_homeobject.cpp index c584a50cd..ef84a4c27 100644 --- a/src/lib/homestore_backend/hs_homeobject.cpp +++
b/src/lib/homestore_backend/hs_homeobject.cpp @@ -282,6 +282,9 @@ void HSHomeObject::on_replica_restart() { [this](bool success) { on_shard_meta_blk_recover_completed(success); }, true); homestore::meta_service().read_sub_sb(_shard_meta_name); + // Write migrated shard metadata to disk (must be called AFTER read_sub_sb returns to avoid deadlock) + write_migrated_shard_metablks(); + // recover snapshot context homestore::meta_service().register_handler( _snp_ctx_meta_name, @@ -567,4 +570,15 @@ void HSHomeObject::yield_pg_leadership_to_follower(int32_t pg_id) { } } +void HSHomeObject::trigger_snapshot_creation(int32_t pg_id, int64_t compact_lsn, bool wait_for_commit) { + LOGI("Triggering snapshot creation for pg_id={}, compact_lsn={}, wait_for_commit={}", pg_id, compact_lsn, + wait_for_commit); + auto hs_pg = get_hs_pg(pg_id); + if (!hs_pg) { + LOGE("Failed to trigger snapshot: PG {} not found", pg_id); + return; + } + hs_pg->trigger_snapshot_creation(compact_lsn, wait_for_commit); +} + } // namespace homeobject diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index f2f14d93e..15137f820 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -54,12 +54,13 @@ class HSHomeObject : public HomeObjectImpl { uint32_t _hs_reserved_blks = 0; /// Overridable Helpers - ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes, trace_id_t tid) override; + ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes, std::string meta, + trace_id_t tid) override; ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&, trace_id_t tid) override; BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&, trace_id_t tid) override; BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off, uint64_t len, - trace_id_t tid) const override; + bool allow_skip_verify, trace_id_t tid) const override; BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t, trace_id_t tid) override; PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers, @@ -101,7 +102,25 @@ class HSHomeObject : public HomeObjectImpl { // mapping from chunk to shard list. 
std::unordered_map< homestore::chunk_num_t, std::set< shard_id_t > > chunk_to_shards_map_; + // Shard migration info: tracks shards that need migration from v1 to v2 format + std::vector< shard_id_t > shards_to_migrate_; + public: + // Old version shard_info_superblk (v0.01) - for backward compatibility testing and migration + // v1 ShardInfo did not have the meta field + struct v1_ShardInfo { + shard_id_t id; + pg_id_t placement_group; + ShardInfo::State state; + uint64_t lsn; + uint64_t created_time; + uint64_t last_modified_time; + uint64_t available_capacity_bytes; + uint64_t total_capacity_bytes; + std::optional< peer_id_t > current_leader{std::nullopt}; + // Note: meta field was added in v2 + }; + #pragma pack(1) struct pg_members { peer_id_t id; @@ -175,7 +194,7 @@ class HSHomeObject : public HomeObjectImpl { }; struct DataHeader { - static constexpr uint8_t data_header_version = 0x01; + static constexpr uint8_t data_header_version = 0x02; static constexpr uint64_t data_header_magic = 0x21fdffdba8d68fc6; // echo "BlobHeader" | md5sum enum class data_type_t : uint32_t { SHARD_INFO = 1, BLOB_INFO = 2 }; @@ -188,9 +207,22 @@ class HSHomeObject : public HomeObjectImpl { }; struct shard_info_superblk : DataHeader { + // This version is a common version of DataHeader, each derived struct can have its own version. + static constexpr uint8_t shard_sb_version = 0x02; + + uint8_t sb_version{shard_sb_version}; ShardInfo info; - homestore::chunk_num_t p_chunk_id; - homestore::chunk_num_t v_chunk_id; + homestore::chunk_num_t p_chunk_id{0}; + homestore::chunk_num_t v_chunk_id{0}; + + // backward compatibility + bool valid() const { return DataHeader::valid() && sb_version <= shard_sb_version; } + }; + + struct v1_shard_info_superblk : DataHeader { + v1_ShardInfo info; + homestore::chunk_num_t p_chunk_id{0}; + homestore::chunk_num_t v_chunk_id{0}; }; struct snapshot_ctx_superblk { @@ -373,6 +405,8 @@ class HSHomeObject : public HomeObjectImpl { void yield_leadership_to_follower() const; + void trigger_snapshot_creation(int64_t compact_lsn, bool wait_for_commit) const; + /** * Returns all shards */ @@ -400,6 +434,8 @@ class HSHomeObject : public HomeObjectImpl { // Padding of zeroes is added to make sure the whole payload be aligned to device block size. struct BlobHeader : DataHeader { static constexpr uint64_t blob_max_hash_len = 32; + static constexpr uint64_t max_user_key_length = 1024; + static constexpr uint8_t blob_header_version = 0x02; enum class HashAlgorithm : uint8_t { NONE = 0, @@ -408,6 +444,7 @@ class HSHomeObject : public HomeObjectImpl { SHA1 = 3, }; + uint8_t blob_hdr_version{blob_header_version}; HashAlgorithm hash_algorithm; mutable uint8_t header_hash[blob_max_hash_len]{}; uint8_t hash[blob_max_hash_len]{}; @@ -415,17 +452,28 @@ class HSHomeObject : public HomeObjectImpl { blob_id_t blob_id; uint32_t blob_size; uint64_t object_offset; // Offset of this blob in the object. Provided by GW. - uint32_t data_offset; // Offset of actual data blob stored after the metadata. + uint32_t data_offset; // Offset of actual data blob stored after the metadata uint32_t user_key_size; // Actual size of the user key. + uint8_t user_key[max_user_key_length + 1]{}; + uint8_t padding[2956]{}; // data_block_size is 4K, so total size of BlobHeader is 4096 bytes + + std::optional< std::string > get_user_key() const { + if (user_key_size > max_user_key_length) { return std::nullopt; } + std::string ret = user_key_size ? 
std::string((const char*)user_key, user_key_size) : std::string{}; + return ret; + } std::string to_string() const { - return fmt::format("magic={:#x} version={} shard={:#x} blob_size={} user_size={} algo={} hash={:np}\n", - magic, version, shard_id, blob_size, user_key_size, (uint8_t)hash_algorithm, - spdlog::to_hex(hash, hash + blob_max_hash_len)); + return fmt::format( + "magic={:#x} version={} shard={:#x} blob_size={} user_size={} algo={} hash={:np}, user_key={}\n", magic, + version, shard_id, blob_size, user_key_size, (uint8_t)hash_algorithm, + spdlog::to_hex(hash, hash + blob_max_hash_len), get_user_key().value_or("")); } bool valid() const { - if (!DataHeader::valid()) { return false; } + if (!DataHeader::valid() || blob_hdr_version > blob_header_version || user_key_size > max_user_key_length) { + return false; + } uint8_t hash_arr[blob_max_hash_len]; std::memcpy(hash_arr, header_hash, blob_max_hash_len); @@ -472,7 +520,8 @@ class HSHomeObject : public HomeObjectImpl { } }; #pragma pack() - + // size of BlobHeader should be smaller than _data_block_size + static_assert(sizeof(BlobHeader) == _data_block_size); struct BlobInfo { shard_id_t shard_id; blob_id_t blob_id; @@ -693,7 +742,13 @@ class HSHomeObject : public HomeObjectImpl { // blob related BlobManager::AsyncResult< Blob > _get_blob_data(const shared< homestore::ReplDev >& repl_dev, shard_id_t shard_id, blob_id_t blob_id, uint64_t req_offset, uint64_t req_len, - const homestore::MultiBlkId& blkid, trace_id_t tid) const; + const homestore::MultiBlkId& blkid, trace_id_t tid, + bool allow_skip_verify = false) const; + + BlobManager::AsyncResult< Blob > _get_blob_data_partial(const shared< homestore::ReplDev >& repl_dev, + shard_id_t shard_id, blob_id_t blob_id, uint64_t req_offset, + uint64_t req_len, const homestore::MultiBlkId& blkid, + trace_id_t tid) const; // create pg related static PGManager::NullAsyncResult do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info, @@ -719,6 +774,7 @@ class HSHomeObject : public HomeObjectImpl { void on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie); void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf); void on_shard_meta_blk_recover_completed(bool success); + void write_migrated_shard_metablks(); void on_snp_ctx_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf); void on_snp_ctx_meta_blk_recover_completed(bool success); void on_snp_rcvr_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf); @@ -947,6 +1003,22 @@ class HSHomeObject : public HomeObjectImpl { */ void yield_pg_leadership_to_follower(int32_t pg_id = 1); + /** + * @brief Manually trigger a snapshot creation. + * @param compact_lsn Expected compact up to LSN. Default is -1, meaning it depends directly on the current HS + * status. + * @param wait_for_commit Wait committed lsn reaches compact_lsn. Default is true, false means the snapshot will be + * triggered based on its latest committed lsn and the log compaction depends on min(snapshot_lsn, compact_lsn) + * + * Recommendation: + * - Please keep wait_for_commit=true to make sure the compact_lsn <= committed_lsn when snapshot is created. If you + * just want to trigger a snapshot manually or want to update the compact_lsn (already less than committed_lsn) for + * log compaction, you can set wait_for_commit=false. + * - wait_for_commit=true might cause the caller blocked for a while until the committed_lsn reaches compact_lsn. + * Please keep in mind and take action if this function costs too long. 
+ */ + void trigger_snapshot_creation(int32_t pg_id, int64_t compact_lsn, bool wait_for_commit); + // Blob manager related. void on_blob_message_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, cintrusive< homestore::repl_req_ctx >& hs_ctx); @@ -958,8 +1030,7 @@ class HSHomeObject : public HomeObjectImpl { homestore::ReplResult< homestore::blk_alloc_hints > blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< homestore::repl_req_ctx >& ctx); void compute_blob_payload_hash(BlobHeader::HashAlgorithm algorithm, const uint8_t* blob_bytes, size_t blob_size, - const uint8_t* user_key_bytes, size_t user_key_size, uint8_t* hash_bytes, - size_t hash_len) const; + uint8_t* hash_bytes, size_t hash_len) const; std::shared_ptr< homestore::IndexTableBase > recover_index_table(homestore::superblk< homestore::index_table_sb >&& sb); @@ -985,6 +1056,8 @@ class HSHomeObject : public HomeObjectImpl { BlobManager::Result< std::vector< BlobInfo > > get_shard_blobs(shard_id_t shard_id); private: + BlobManager::Result< std::string > do_verify_blob(const void* blob, shard_id_t expected_shard_id, + blob_id_t expected_blob_id = 0) const; std::shared_ptr< BlobIndexTable > create_pg_index_table(); std::shared_ptr< GCBlobIndexTable > create_gc_index_table(); diff --git a/src/lib/homestore_backend/hs_http_manager.cpp b/src/lib/homestore_backend/hs_http_manager.cpp index d02c7da3a..5dd9a865a 100644 --- a/src/lib/homestore_backend/hs_http_manager.cpp +++ b/src/lib/homestore_backend/hs_http_manager.cpp @@ -51,6 +51,8 @@ HttpManager::HttpManager(HSHomeObject& ho) : ho_(ho) { {Pistache::Http::Method::Post, "/api/v1/reconcile_membership", Pistache::Rest::Routes::bind(&HttpManager::reconcile_membership, this)}, {Pistache::Http::Method::Delete, "/api/v1/pg", Pistache::Rest::Routes::bind(&HttpManager::exit_pg, this)}, + {Pistache::Http::Method::Post, "/api/v1/trigger_snapshot_creation", + Pistache::Rest::Routes::bind(&HttpManager::trigger_snapshot_creation, this)}, #ifdef _PRERELEASE {Pistache::Http::Method::Post, "/api/v1/crashSystem", Pistache::Rest::Routes::bind(&HttpManager::crash_system, this)}, @@ -62,7 +64,17 @@ HttpManager::HttpManager(HSHomeObject& ho) : ho_(ho) { {Pistache::Http::Method::Get, "/api/v1/chunk/dump", Pistache::Rest::Routes::bind(&HttpManager::dump_chunk, this)}, {Pistache::Http::Method::Get, "/api/v1/shard/dump", - Pistache::Rest::Routes::bind(&HttpManager::dump_shard, this)}}; + Pistache::Rest::Routes::bind(&HttpManager::dump_shard, this)}, + + // we support triggering gc for: + // 1 all the chunks in all the pg: no input param + // 2 all the chunks in a specific pg: input param is pg_id + // 3 a specific chunk: input param is pchunk_id + + {Pistache::Http::Method::Post, "/api/v1/trigger_gc", + Pistache::Rest::Routes::bind(&HttpManager::trigger_gc, this)}, + {Pistache::Http::Method::Get, "/api/v1/gc_job_status", + Pistache::Rest::Routes::bind(&HttpManager::get_gc_job_status, this)}}; auto http_server = ioenvironment.get_http_server(); if (!http_server) { @@ -105,6 +117,36 @@ void HttpManager::yield_leadership_to_follower(const Pistache::Rest::Request& re response.send(Pistache::Http::Code::Ok, "Yield leadership request submitted"); } +void HttpManager::trigger_snapshot_creation(const Pistache::Rest::Request& request, + Pistache::Http::ResponseWriter response) { + // Extract and validate pg_id parameter (required) + const auto pg_id_param = request.query().get("pg_id"); + if (!pg_id_param) { + response.send(Pistache::Http::Code::Bad_Request, "pg_id is required"); + 
return; + } + const int32_t pg_id = std::stoi(pg_id_param.value()); + + // Extract compact_lsn parameter (optional, default: -1 means use current HS status) + const auto compact_lsn_param = request.query().get("compact_lsn"); + const int64_t compact_lsn = std::stoll(compact_lsn_param.value_or("-1")); + + // Extract wait_for_commit parameter (optional, default: true) + const auto wait_for_commit_param = request.query().get("wait_for_commit"); + std::string wait_for_commit_mode = wait_for_commit_param.value_or("true"); + if (wait_for_commit_mode != "true" && wait_for_commit_mode != "false") { + response.send(Pistache::Http::Code::Bad_Request, "wait_for_commit must be 'true' or 'false'"); + return; + } + bool wait_for_commit = (wait_for_commit_mode == "true"); + + LOGINFO("Received snapshot creation request for pg_id={}, compact_lsn={}, wait_for_commit={}", pg_id, compact_lsn, + wait_for_commit); + + ho_.trigger_snapshot_creation(pg_id, compact_lsn, wait_for_commit); + response.send(Pistache::Http::Code::Ok, "Snapshot creation request submitted"); +} + void HttpManager::get_pg(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { auto pg_str = request.query().get("pg_id"); if (!pg_str) { @@ -444,6 +486,290 @@ void HttpManager::exit_pg(const Pistache::Rest::Request& request, Pistache::Http response.send(Pistache::Http::Code::Ok, "Exit pg request submitted"); } +void HttpManager::trigger_gc(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { + auto gc_mgr = ho_.gc_manager(); + if (!gc_mgr) { + response.send(Pistache::Http::Code::Internal_Server_Error, "GC manager not available"); + return; + } + + auto chunk_selector = ho_.chunk_selector(); + if (!chunk_selector) { + response.send(Pistache::Http::Code::Internal_Server_Error, "Chunk selector not available"); + return; + } + + const auto chunk_id_param = request.query().get("chunk_id"); + const auto pg_id_param = request.query().get("pg_id"); + + if (chunk_id_param && !chunk_id_param.value().empty()) { + // trigger gc for a specific chunk, the chunk_id is pchunk_id, not vchunk_id + uint32_t chunk_id = std::stoul(chunk_id_param.value()); + LOGINFO("Received trigger_gc request for chunk_id {}", chunk_id); + + auto chunk = chunk_selector->get_extend_vchunk(chunk_id); + if (!chunk) { + nlohmann::json error; + error["chunk_id"] = chunk_id; + error["error"] = "Chunk not found"; + response.send(Pistache::Http::Code::Not_Found, error.dump()); + return; + } + + if (!chunk->m_pg_id.has_value()) { + nlohmann::json error; + error["chunk_id"] = chunk_id; + error["error"] = "Chunk belongs to no pg"; + response.send(Pistache::Http::Code::Not_Found, error.dump()); + return; + } + + const auto pg_id = chunk->m_pg_id.value(); + nlohmann::json result; + const auto job_id = generate_job_id(); + + result["chunk_id"] = chunk_id; + result["pg_id"] = pg_id; + result["job_id"] = job_id; + + if (chunk->m_state == ChunkState::GC) { + result["message"] = "chunk is already under GC now, this task will not be executed!"; + response.send(Pistache::Http::Code::Ok, result.dump()); + return; + } + result["message"] = "GC triggered for chunk, pls query job status using gc_job_status API"; + + // return response before starting the GC so that we don't block the client. 
+ response.send(Pistache::Http::Code::Accepted, result.dump()); + + auto job_info = std::make_shared< GCJobInfo >(job_id, pg_id, chunk_id); + { + std::lock_guard lock(gc_job_mutex_); + gc_jobs_map_.set(job_id, job_info); + } + + // submit gc task for this chunk + + // Clear in-memory requests only for emergent priority chunks (chunks with open shards) + auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id)); + RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id); + auto repl_dev = hs_pg->repl_dev_; + repl_dev->quiesce_reqs(); + repl_dev->clear_chunk_req(chunk_id); + const auto priority = chunk->m_state == ChunkState::INUSE ? task_priority::emergent : task_priority::normal; + + gc_mgr->submit_gc_task(priority, chunk_id) + .via(&folly::InlineExecutor::instance()) + .thenValue([this, job_info, repl_dev](bool res) { + job_info->status = res ? GCJobStatus::COMPLETED : GCJobStatus::FAILED; + // Resume accepting new requests for this pg + repl_dev->resume_accepting_reqs(); + }); + } else if (pg_id_param && !pg_id_param.value().empty()) { + // trigger gc for all chunks in a specific pg + const auto pg_id = std::stoul(pg_id_param.value()); + LOGINFO("Received trigger_gc request for pg_id {}", pg_id); + auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id)); + if (!hs_pg) { + nlohmann::json error; + error["pg_id"] = pg_id; + error["error"] = "PG not found"; + response.send(Pistache::Http::Code::Not_Found, error.dump()); + return; + } + + nlohmann::json result; + const auto job_id = generate_job_id(); + result["pg_id"] = pg_id; + result["job_id"] = job_id; + result["message"] = "GC triggered for a single pg, please query job status using gc_job_status API"; + // return response before starting the GC so that we don't block the client. + response.send(Pistache::Http::Code::Accepted, result.dump()); + + auto job_info = std::make_shared< GCJobInfo >(job_id, pg_id); + { + std::lock_guard lock(gc_job_mutex_); + gc_jobs_map_.set(job_id, job_info); + } + + LOGINFO("GC job {} stopping GC scan timer", job_id); + gc_mgr->stop_gc_scan_timer(); + + // we block here until all gc tasks for the pg are done + trigger_gc_for_pg(pg_id, job_id); + + LOGINFO("GC job {} restarting GC scan timer", job_id); + gc_mgr->start_gc_scan_timer(); + } else { + LOGINFO("Received trigger_gc request for all chunks"); + nlohmann::json result; + const auto job_id = generate_job_id(); + result["job_id"] = job_id; + result["message"] = "GC triggered for all chunks, please query job status using gc_job_status API"; + // return response before starting the GC so that we don't block the client.
+ response.send(Pistache::Http::Code::Accepted, result.dump()); + + auto job_info = std::make_shared< GCJobInfo >(job_id); + { + std::lock_guard lock(gc_job_mutex_); + gc_jobs_map_.set(job_id, job_info); + } + + std::vector< pg_id_t > pg_ids; + ho_.get_pg_ids(pg_ids); + LOGINFO("GC job {} will process {} PGs", job_id, pg_ids.size()); + LOGINFO("GC job {} stopping GC scan timer", job_id); + gc_mgr->stop_gc_scan_timer(); + + // we block here until all gc tasks for the pg are done + for (const auto& pg_id : pg_ids) { + trigger_gc_for_pg(pg_id, job_id); + } + + LOGINFO("GC job {} restarting GC scan timer", job_id); + gc_mgr->start_gc_scan_timer(); + } +} + +std::string HttpManager::generate_job_id() { + auto counter = job_counter_.fetch_add(1, std::memory_order_relaxed); + return fmt::format("trigger-gc-task-{}", counter); +} + +void HttpManager::get_job_status(const std::string& job_id, nlohmann::json& result) { + result["job_id"] = job_id; + std::shared_ptr< GCJobInfo > job_info; + { + std::shared_lock lock(gc_job_mutex_); + job_info = gc_jobs_map_.get(job_id); + } + + if (!job_info) { + result["error"] = "job_id not found, or job has been evicted"; + return; + } + + switch (job_info->status) { + case GCJobStatus::RUNNING: + result["status"] = "running"; + break; + case GCJobStatus::COMPLETED: + result["status"] = "completed"; + break; + case GCJobStatus::FAILED: + result["status"] = "failed"; + break; + } + + if (job_info->chunk_id.has_value()) { result["chunk_id"] = job_info->chunk_id.value(); } + if (job_info->pg_id.has_value()) { result["pg_id"] = job_info->pg_id.value(); } + + if (job_info->total_chunks > 0) { + nlohmann::json stats; + stats["total_chunks"] = job_info->total_chunks; + stats["success_count"] = job_info->success_count; + stats["failed_count"] = job_info->failed_count; + result["statistics"] = stats; + } +} + +void HttpManager::get_gc_job_status(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { + auto job_id_param = request.query().get("job_id"); + if (job_id_param && !job_id_param.value().empty()) { + const auto job_id = job_id_param.value(); + LOGINFO("query job {} status!", job_id); + nlohmann::json result; + get_job_status(job_id, result); + response.send(Pistache::Http::Code::Ok, result.dump()); + return; + } + + LOGINFO("query all job status!"); + nlohmann::json result; + std::vector< std::string > job_ids; + { + std::shared_lock lock(gc_job_mutex_); + for (const auto& [k, v] : gc_jobs_map_) { + job_ids.push_back(k); + } + } + + for (const auto& job_id : job_ids) { + nlohmann::json job_json; + get_job_status(job_id, job_json); + result["jobs"].push_back(job_json); + } + + response.send(Pistache::Http::Code::Ok, result.dump()); +} + +void HttpManager::trigger_gc_for_pg(uint16_t pg_id, const std::string& job_id) { + auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id)); + RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id); + + LOGINFO("GC job {} draining pending GC tasks for PG {}", job_id, pg_id); + auto gc_mgr = ho_.gc_manager(); + gc_mgr->drain_pg_pending_gc_task(pg_id); + auto pg_sb = hs_pg->pg_sb_.get(); + std::vector< homestore::chunk_num_t > pg_chunks(pg_sb->get_chunk_ids(), pg_sb->get_chunk_ids() + pg_sb->num_chunks); + + LOGINFO("GC job {} processing PG {} with {} chunks", job_id, pg_id, pg_chunks.size()); + hs_pg->repl_dev_->quiesce_reqs(); + std::vector< folly::SemiFuture< bool > > gc_task_futures; + + std::shared_ptr< GCJobInfo > job_info; + { + std::shared_lock lock(gc_job_mutex_); + 
job_info = gc_jobs_map_.get(job_id); + } + + auto chunk_selector = ho_.chunk_selector(); + + for (const auto& chunk_id : pg_chunks) { + job_info->total_chunks++; + // Determine priority based on chunk state (INUSE means has open shard) + auto chunk = chunk_selector->get_extend_vchunk(chunk_id); + RELEASE_ASSERT(chunk, "Chunk {} not found during GC job {}", chunk_id, job_id); + auto priority = chunk->m_state == ChunkState::INUSE ? task_priority::emergent : task_priority::normal; + + // Clear in-memory requests only for emergent priority chunks (chunks with open shards) + if (priority == task_priority::emergent) { hs_pg->repl_dev_->clear_chunk_req(chunk_id); } + + // Submit GC task for this chunk + auto future = gc_mgr->submit_gc_task(priority, chunk_id); + gc_task_futures.push_back(std::move(future)); + LOGDEBUG("GC job {} for chunk {} in PG {} with priority={}", job_id, chunk_id, pg_id, + (priority == task_priority::emergent) ? "emergent" : "normal"); + } + + folly::collectAllUnsafe(gc_task_futures) + .thenValue([job_info](auto&& results) { + for (auto const& ok : results) { + RELEASE_ASSERT(ok.hasValue(), "we never throw any exception when copying data"); + if (ok.value()) { + job_info->success_count++; + } else { + job_info->failed_count++; + } + } + }) + .thenValue([this, pg_id, job_info, gc_mgr](auto&& rets) { + LOGINFO("All GC tasks have been processed"); + const auto& job_id = job_info->job_id; + + auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id)); + RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id); + // Resume accepting new requests for this pg + hs_pg->repl_dev_->resume_accepting_reqs(); + LOGINFO("GC job {} resumed accepting requests for PG {}", job_id, pg_id); + + job_info->status = job_info->failed_count ? 
GCJobStatus::FAILED : GCJobStatus::COMPLETED; + LOGINFO("GC job {} completed: total={}, success={}, failed={}", job_id, job_info->total_chunks, + job_info->success_count, job_info->failed_count); + }) + .get(); +} + #ifdef _PRERELEASE void HttpManager::crash_system(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { std::string crash_type; diff --git a/src/lib/homestore_backend/hs_http_manager.hpp b/src/lib/homestore_backend/hs_http_manager.hpp index 9a2d53591..9a6ee0b97 100644 --- a/src/lib/homestore_backend/hs_http_manager.hpp +++ b/src/lib/homestore_backend/hs_http_manager.hpp @@ -16,6 +16,11 @@ #include #include +#include +#include +#include +#include + namespace homeobject { class HSHomeObject; @@ -28,6 +33,7 @@ class HttpManager { void get_malloc_stats(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); void reconcile_leader(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); void yield_leadership_to_follower(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void trigger_snapshot_creation(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); void get_pg(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); void get_pg_chunks(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); void dump_chunk(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); @@ -40,12 +46,43 @@ class HttpManager { void reconcile_membership(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); void get_pg_quorum(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); void exit_pg(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void trigger_gc(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void get_gc_job_status(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void trigger_gc_for_pg(uint16_t pg_id, const std::string& job_id); + void get_job_status(const std::string& job_id, nlohmann::json& result); #ifdef _PRERELEASE void crash_system(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); #endif +private: + enum class GCJobStatus { RUNNING, COMPLETED, FAILED }; + + struct GCJobInfo { + std::string job_id; + GCJobStatus status; + std::optional< uint16_t > pg_id; + std::optional< uint32_t > chunk_id; + + // Statistics for batch GC jobs (all chunks) + uint32_t total_chunks{0}; + uint32_t success_count{0}; + uint32_t failed_count{0}; + + GCJobInfo(const std::string& id, std::optional< uint16_t > pgid = std::nullopt, + std::optional< uint32_t > cid = std::nullopt) : + job_id(id), status(GCJobStatus::RUNNING), pg_id(pgid), chunk_id(cid) {} + }; + + std::string generate_job_id(); + private: HSHomeObject& ho_; + std::atomic< uint64_t > job_counter_{0}; + std::shared_mutex gc_job_mutex_; + + // we don't have an external DB to store the job status, so we only keep the status of the latest 100 jobs for + // query. Alternatively, we could evict a job after a timeout once it has completed.
+ folly::EvictingCacheMap< std::string, std::shared_ptr< GCJobInfo > > gc_jobs_map_{100}; }; } // namespace homeobject \ No newline at end of file diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 548af166b..9dd9242cb 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -1084,6 +1084,10 @@ void HSHomeObject::HS_PG::yield_leadership_to_follower() const { repl_dev_->yield_leadership(false /*immediate_yield*/, candidate_leader_id); } +void HSHomeObject::HS_PG::trigger_snapshot_creation(int64_t compact_lsn, bool wait_for_commit) const { + repl_dev_->trigger_snapshot_creation(compact_lsn, wait_for_commit); +} + std::vector< Shard > HSHomeObject::HS_PG::get_chunk_shards(chunk_num_t v_chunk_id) const { std::vector< Shard > ret; for (auto const& s : shards_) { diff --git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp index 79873cb30..f67d0c055 100644 --- a/src/lib/homestore_backend/hs_shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -93,6 +93,7 @@ std::string HSHomeObject::serialize_shard_info(const ShardInfo& info) { j["shard_info"]["modified_time"] = info.last_modified_time; j["shard_info"]["total_capacity"] = info.total_capacity_bytes; j["shard_info"]["available_capacity"] = info.available_capacity_bytes; + j["shard_info"]["meta"] = std::string(reinterpret_cast< const char* >(info.meta)); return j.dump(); } @@ -107,10 +108,13 @@ ShardInfo HSHomeObject::deserialize_shard_info(const char* json_str, size_t str_ shard_info.last_modified_time = shard_json["shard_info"]["modified_time"].get< uint64_t >(); shard_info.available_capacity_bytes = shard_json["shard_info"]["available_capacity"].get< uint64_t >(); shard_info.total_capacity_bytes = shard_json["shard_info"]["total_capacity"].get< uint64_t >(); + auto meta_str = shard_json["shard_info"]["meta"].get< std::string >(); + std::memcpy(shard_info.meta, meta_str.data(), meta_str.length()); + shard_info.meta[meta_str.length()] = '\0'; return shard_info; } -ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_owner, uint64_t size_bytes, +ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_owner, uint64_t size_bytes, std::string meta, trace_id_t tid) { if (is_shutting_down()) { @@ -118,7 +122,11 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow return folly::makeUnexpected(ShardError(ShardErrorCode::SHUTTING_DOWN)); } incr_pending_request_num(); - + if (!meta.empty() && meta.length() > ShardInfo::meta_length - 1) { + LOGW("meta length {} exceeds max meta length {}, trace_id={}", meta.length(), ShardInfo::meta_length - 1, tid); + decr_pending_request_num(); + return folly::makeUnexpected(ShardError(ShardErrorCode::INVALID_ARG)); + } auto hs_pg = get_hs_pg(pg_owner); if (!hs_pg) { LOGW("failed to create shard with non-exist pg={}", pg_owner); @@ -175,6 +183,10 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow .last_modified_time = create_time, .available_capacity_bytes = size_bytes, .total_capacity_bytes = size_bytes}; + if (!meta.empty()) { + std::memcpy(sb->info.meta, meta.data(), meta.length()); + sb->info.meta[meta.length()] = '\0'; + } sb->p_chunk_id = 0; sb->v_chunk_id = v_chunk_id; @@ -582,8 +594,58 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom } void 
HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf) { + // First peek at the version + auto* header = reinterpret_cast< const DataHeader* >(buf.bytes()); + homestore::superblk< shard_info_superblk > sb(_shard_meta_name); - sb.load(buf, mblk); + + // Detect and migrate old version data (DataHeader version 0x01) + if (header->version == 0x01) { + // Read data from buffer with old v1 layout + auto* old_sb = reinterpret_cast< const v1_shard_info_superblk* >(buf.bytes()); + + const auto v1_info = old_sb->info; + const auto saved_p_chunk_id = old_sb->p_chunk_id; + const auto saved_v_chunk_id = old_sb->v_chunk_id; + + LOGI("Detected v1 shard superblk (shard={}, pg={}, p_chunk={}, v_chunk={}), migrating to v2", v1_info.id, + v1_info.placement_group, saved_p_chunk_id, saved_v_chunk_id); + + // Create a new v2 superblk with the given mblk + sb.load(buf, mblk); + sb.resize(sizeof(shard_info_superblk)); + sb->magic = DataHeader::data_header_magic; + sb->version = DataHeader::data_header_version; // 0x02 + sb->type = DataHeader::data_type_t::SHARD_INFO; + sb->sb_version = shard_info_superblk::shard_sb_version; // 0x02 + + // Migrate v1_ShardInfo to v2 ShardInfo (v2 added meta field) + sb->info.id = v1_info.id; + sb->info.placement_group = v1_info.placement_group; + sb->info.state = v1_info.state; + sb->info.lsn = v1_info.lsn; + sb->info.created_time = v1_info.created_time; + sb->info.last_modified_time = v1_info.last_modified_time; + sb->info.available_capacity_bytes = v1_info.available_capacity_bytes; + sb->info.total_capacity_bytes = v1_info.total_capacity_bytes; + sb->info.current_leader = v1_info.current_leader; + // meta field is new in v2, initialize to empty + std::memset(sb->info.meta, 0, ShardInfo::meta_length); + + sb->p_chunk_id = saved_p_chunk_id; + sb->v_chunk_id = saved_v_chunk_id; + + // Save shard_id and old mblk pointer for migration (cannot write or remove during callback due to + // metasvc lock held - would cause deadlock) + shards_to_migrate_.push_back(v1_info.id); + + LOGI("Queued shard_id={} for migration write and old metablk removal after recovery", v1_info.id); + } else if (header->version == DataHeader::data_header_version) { + sb.load(buf, mblk); + } else { + RELEASE_ASSERT(false, "Unknown shard superblock version {}", header->version); + } + add_new_shard_to_map(std::make_unique< HS_Shard >(std::move(sb))); } @@ -607,6 +669,36 @@ void HSHomeObject::on_shard_meta_blk_recover_completed(bool success) { } } +void HSHomeObject::write_migrated_shard_metablks() { + // Write all migrated shards to disk + // This is called AFTER read_sub_sb() returns to avoid deadlock with metasvc lock + if (!shards_to_migrate_.empty()) { + LOGI("Writing {} migrated shard v2 superblocks", shards_to_migrate_.size()); + + std::scoped_lock lock_guard(_shard_lock); + for (auto& shard_id : shards_to_migrate_) { + auto shard_iter = _shard_map.find(shard_id); + if (shard_iter == _shard_map.end()) { + LOGW("Shard {} not found in shard map during migration write, skipping", shard_id); + continue; + } + + auto* hs_shard = d_cast< HS_Shard* >(shard_iter->second->get()); + + try { + hs_shard->sb_.write(); + LOGI("Successfully wrote migrated v2 shard superblk for shard_id={}", shard_id); + } catch (const std::exception& e) { + LOGE("Failed to migrate shard_id={}: {}", shard_id, e.what()); + // Continue with other shards even if one fails + } + } + + shards_to_migrate_.clear(); + LOGI("Completed migrating all shard superblocks from v1 to v2"); + } +} + void 
HSHomeObject::add_new_shard_to_map(std::unique_ptr< HS_Shard > shard) { // TODO: We are taking a global lock for all pgs to create shard. Is it really needed?? // We need to have fine grained per PG lock and take only that. @@ -812,6 +904,7 @@ HSHomeObject::HS_Shard::HS_Shard(ShardInfo shard_info, homestore::chunk_num_t p_ homestore::chunk_num_t v_chunk_id) : Shard(std::move(shard_info)), sb_(_shard_meta_name) { sb_.create(sizeof(shard_info_superblk)); + sb_->type = DataHeader::data_type_t::SHARD_INFO; sb_->info = info; sb_->p_chunk_id = p_chunk_id; sb_->v_chunk_id = v_chunk_id; diff --git a/src/lib/homestore_backend/pg_blob_iterator.cpp b/src/lib/homestore_backend/pg_blob_iterator.cpp index 419b039bc..9181f8ca8 100644 --- a/src/lib/homestore_backend/pg_blob_iterator.cpp +++ b/src/lib/homestore_backend/pg_blob_iterator.cpp @@ -213,9 +213,10 @@ bool HSHomeObject::PGBlobIterator::create_shard_snapshot_data(sisl::io_blob_safe } auto shard = shard_list_[cur_shard_idx_]; - auto shard_entry = CreateResyncShardMetaData( + std::vector< uint8_t > meta_bytes(shard.info.meta, shard.info.meta + ShardInfo::meta_length); + auto shard_entry = CreateResyncShardMetaDataDirect( builder_, shard.info.id, pg_id, static_cast< uint8_t >(shard.info.state), shard.info.lsn, - shard.info.created_time, shard.info.last_modified_time, shard.info.total_capacity_bytes, shard.v_chunk_num); + shard.info.created_time, shard.info.last_modified_time, shard.info.total_capacity_bytes, shard.v_chunk_num, &meta_bytes); builder_.FinishSizePrefixed(shard_entry); @@ -248,37 +249,10 @@ BlobManager::AsyncResult< blob_read_result > HSHomeObject::PGBlobIterator::load_ return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED)); } - BlobHeader const* header = r_cast< BlobHeader const* >(read_buf.cbytes()); - if (!header->valid()) { + if (!home_obj_.verify_blob(read_buf.cbytes(), shard_id, 0 /* no blob_id check */)) { // The metrics for corrupted blob is handled on the follower side. - LOGE("Invalid header found, shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}, [header={}]", shard_id, - (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), blob_id, - header->to_string()); - return blob_read_result(blob_id, std::move(read_buf), ResyncBlobState::CORRUPTED); - } - - if (header->shard_id != shard_id) { - // The metrics for corrupted blob is handled on the follower side. - LOGE("Invalid shard_id in header, shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}, [header={}]", - shard_id, (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), blob_id, - header->to_string()); - return blob_read_result(blob_id, std::move(read_buf), ResyncBlobState::CORRUPTED); - } - - std::string user_key = header->user_key_size - ? 
std::string((const char*)(read_buf.bytes() + sizeof(BlobHeader)), (size_t)header->user_key_size) - : std::string{}; - - uint8_t const* blob_bytes = read_buf.bytes() + header->data_offset; - uint8_t computed_hash[BlobHeader::blob_max_hash_len]{}; - home_obj_.compute_blob_payload_hash(header->hash_algorithm, blob_bytes, header->blob_size, - uintptr_cast(user_key.data()), header->user_key_size, computed_hash, - BlobHeader::blob_max_hash_len); - if (std::memcmp(computed_hash, header->hash, BlobHeader::blob_max_hash_len) != 0) { - LOGE("corrupted blob found, shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}, hash mismatch header " - "[{}] [computed={:np}]", - shard_id, (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), blob_id, - header->to_string(), spdlog::to_hex(computed_hash, computed_hash + BlobHeader::blob_max_hash_len)); + LOGE("Blob verification failed, shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}", shard_id, + (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), blob_id); return blob_read_result(blob_id, std::move(read_buf), ResyncBlobState::CORRUPTED); } diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index a508bad91..71d536cc0 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -976,27 +976,20 @@ void ReplicationStateMachine::handle_no_space_left(homestore::repl_lsn_t lsn, ho // 3 handling this error. for homeobject, we will submit an emergent gc task and wait for the completion. auto gc_mgr = home_object_->gc_manager(); - if (gc_mgr->is_started()) { - // FIXME:: there is a very corner case that when reaching this line, gc_mgr is stopped. fix this later. - gc_mgr->submit_gc_task(task_priority::emergent, chunk_id) - .via(&folly::InlineExecutor::instance()) - .thenValue([this, lsn, chunk_id](auto&& res) { - if (!res) { - LOGERROR("failed to submit emergent gc task for chunk_id={} , lsn={}, will retry again if new " - "no_space_left happens", - chunk_id, lsn); - } else { - LOGD("successfully handle no_space_left error for chunk_id={} , lsn={}", chunk_id, lsn); - } + gc_mgr->submit_gc_task(task_priority::emergent, chunk_id) + .via(&folly::InlineExecutor::instance()) + .thenValue([this, lsn, chunk_id](auto&& res) { + if (!res) { + LOGERROR("failed to submit emergent gc task for chunk_id={} , lsn={}, will retry again if new " + "no_space_left happens", + chunk_id, lsn); + } else { + LOGD("successfully handle no_space_left error for chunk_id={} , lsn={}", chunk_id, lsn); + } - // start accepting new requests again. - repl_dev()->resume_accepting_reqs(); - }); - } else { - // start accepting new requests again. - LOGD("gc manager is not started, skip handle no_space_left for chunk={}, lsn={}", chunk_id, lsn); - repl_dev()->resume_accepting_reqs(); - } + // start accepting new requests again. 
+ repl_dev()->resume_accepting_reqs(); + }); } void ReplicationStateMachine::on_log_replay_done(const homestore::group_id_t& group_id) { diff --git a/src/lib/homestore_backend/resync_shard_data.fbs b/src/lib/homestore_backend/resync_shard_data.fbs index 5f4123f32..6fa761e32 100644 --- a/src/lib/homestore_backend/resync_shard_data.fbs +++ b/src/lib/homestore_backend/resync_shard_data.fbs @@ -11,6 +11,7 @@ table ResyncShardMetaData { last_modified_time : ulong; // shard last modify time total_capacity_bytes : ulong; // total capacity of the shard vchunk_id : uint16; // vchunk id + meta: [ubyte]; // serialized shard meta data } //ShardMetaData schema is the first batch(batch=0) in the shard transmission diff --git a/src/lib/homestore_backend/snapshot_receive_handler.cpp b/src/lib/homestore_backend/snapshot_receive_handler.cpp index 351b72d47..787581c5f 100644 --- a/src/lib/homestore_backend/snapshot_receive_handler.cpp +++ b/src/lib/homestore_backend/snapshot_receive_handler.cpp @@ -84,6 +84,10 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar shard_sb->info.last_modified_time = shard_meta.last_modified_time(); shard_sb->info.available_capacity_bytes = shard_meta.total_capacity_bytes(); shard_sb->info.total_capacity_bytes = shard_meta.total_capacity_bytes(); + // Copy metadata into the fixed-size array (null-terminated C-style string) + if (shard_meta.meta() != nullptr && shard_meta.meta()->size() > 0) { + std::memcpy(shard_sb->info.meta, shard_meta.meta()->Data(), ShardInfo::meta_length); + } shard_sb->v_chunk_id = shard_meta.vchunk_id(); homestore::blk_alloc_hints hints; @@ -218,24 +222,9 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob // Check integrity of normal blobs if (blob->state() != static_cast< uint8_t >(ResyncBlobState::CORRUPTED)) { - auto header = r_cast< BlobHeader const* >(blob_data); - if (!header->valid()) { - std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock); - ctx_->progress.error_count++; - LOGE("Invalid header found for blob_id={}: [header={}]", blob->blob_id(), header->to_string()); - return INVALID_BLOB_HEADER; - } - std::string user_key = header->user_key_size - ? 
std::string(r_cast< const char* >(blob_data + sizeof(BlobHeader)), header->user_key_size) - : std::string{}; - - uint8_t computed_hash[BlobHeader::blob_max_hash_len]{}; - home_obj_.compute_blob_payload_hash(header->hash_algorithm, blob_data + header->data_offset, - header->blob_size, uintptr_cast(user_key.data()), header->user_key_size, - computed_hash, BlobHeader::blob_max_hash_len); - if (std::memcmp(computed_hash, header->hash, BlobHeader::blob_max_hash_len) != 0) { - LOGE("Hash mismatch for blob_id={}: header [{}] [computed={:np}]", blob->blob_id(), header->to_string(), - spdlog::to_hex(computed_hash, computed_hash + BlobHeader::blob_max_hash_len)); + // Verify full blob (includes validation, shard_id check, and hash verification) + if (!home_obj_.verify_blob(blob_data, ctx_->shard_cursor, 0 /* no blob_id check */)) { + LOGE("Blob verification failed for blob_id={}", blob->blob_id()); std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock); ctx_->progress.error_count++; return BLOB_DATA_CORRUPTED; diff --git a/src/lib/homestore_backend/tests/homeobj_fixture.hpp b/src/lib/homestore_backend/tests/homeobj_fixture.hpp index b1404793e..499968ab3 100644 --- a/src/lib/homestore_backend/tests/homeobj_fixture.hpp +++ b/src/lib/homestore_backend/tests/homeobj_fixture.hpp @@ -38,16 +38,18 @@ class HomeObjectFixture : public ::testing::Test { public: std::shared_ptr< homeobject::HSHomeObject > _obj_inst; - // Create blob size in range (1, 16kb) and user key in range (1, 1kb) - HomeObjectFixture() : rand_blob_size{1u, 16 * 1024}, rand_user_key_size{1u, 1024} {} + HomeObjectFixture() : + rand_blob_size{1u, 16 * 1024}, rand_user_key_size{1u, HSHomeObject::BlobHeader::max_user_key_length} {} void SetUp() override { IM_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.io_env.http_port = 5000 + g_helper->replica_num(); LOGD("setup http port to {}", s.io_env.http_port); }); + HSHomeObject::_hs_chunk_size = SISL_OPTIONS["chunk_size"].as< uint64_t >() * Mi; _obj_inst = std::dynamic_pointer_cast< HSHomeObject >(g_helper->build_new_homeobject()); + // Used to export metrics, it should be called after init_homeobject if (SISL_OPTIONS["enable_http"].as< bool >()) { g_helper->app->start_http_server(); } if (!g_helper->is_current_testcase_restarted()) { @@ -146,13 +148,13 @@ class HomeObjectFixture : public ::testing::Test { } } - ShardInfo create_shard(pg_id_t pg_id, uint64_t size_bytes) { + ShardInfo create_shard(pg_id_t pg_id, uint64_t size_bytes, std::string meta) { g_helper->sync(); if (!am_i_in_pg(pg_id)) return {}; // schedule create_shard only on leader auto tid = generateRandomTraceId(); run_on_pg_leader(pg_id, [&]() { - auto s = _obj_inst->shard_manager()->create_shard(pg_id, size_bytes, tid).get(); + auto s = _obj_inst->shard_manager()->create_shard(pg_id, size_bytes, meta, tid).get(); RELEASE_ASSERT(!!s, "failed to create shard"); auto ret = s.value(); g_helper->set_uint64_id(ret.id); @@ -403,6 +405,24 @@ class HomeObjectFixture : public ::testing::Test { return valid_blob_indexes.size(); } + void verify_shard_meta(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec) { + for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) { + if (!am_i_in_pg(pg_id)) continue; + for (const auto& shard_id : shard_vec) { + auto iter = _obj_inst->_shard_map.find(shard_id); + ASSERT_TRUE(iter != _obj_inst->_shard_map.end()) + << "shard not found in local shard_map, shard_id " << shard_id << " replica number " + << g_helper->replica_num(); + auto meta_str = "shard meta:" + 
std::to_string(shard_id); + uint8_t expected_meta[ShardInfo::meta_length]{}; + memcpy(expected_meta, meta_str.c_str(), meta_str.length()); + expected_meta[meta_str.length()] = '\0'; + auto shard = iter->second->get(); + ASSERT_TRUE(std::memcmp(shard->info.meta, expected_meta, ShardInfo::meta_length) == 0); + } + } + } + // TODO:make this run in parallel void verify_get_blob(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec, uint64_t const num_blobs_per_shard, bool const use_random_offset = false, @@ -421,32 +441,46 @@ class HomeObjectFixture : public ::testing::Test { current_blob_id, tid); auto blob = build_blob(current_blob_id); len = blob.body.size(); + + bool allow_skip_verify = true; if (use_random_offset) { std::uniform_int_distribution< uint32_t > rand_off_gen{0u, len - 1u}; std::uniform_int_distribution< uint32_t > rand_len_gen{1u, len}; off = rand_off_gen(rnd_engine); len = rand_len_gen(rnd_engine); - if ((off + len) >= blob.body.size()) { len = blob.body.size() - off; } + if (off + len >= blob.body.size()) { len = blob.body.size() - off; } + + // randomly set allow_skip_verify to false to do full verification + std::uniform_int_distribution< uint32_t > bool_dist(0, 1); + allow_skip_verify = (bool)bool_dist(rnd_engine); } - auto g = _obj_inst->blob_manager()->get(shard_id, current_blob_id, off, len, tid).get(); + auto g = _obj_inst->blob_manager() + ->get(shard_id, current_blob_id, off, len, allow_skip_verify, tid) + .get(); while (wait_when_not_exist && g.hasError() && g.error().code == BlobErrorCode::UNKNOWN_BLOB) { LOGDEBUG("blob not exist at the moment, waiting for sync, shard {} blob {}", shard_id, current_blob_id); wait_for_blob(shard_id, current_blob_id); - g = _obj_inst->blob_manager()->get(shard_id, current_blob_id, off, len, tid).get(); + g = _obj_inst->blob_manager() + ->get(shard_id, current_blob_id, off, len, allow_skip_verify, tid) + .get(); } ASSERT_TRUE(!!g) << "get blob fail, shard_id " << shard_id << " blob_id " << current_blob_id << " replica number " << g_helper->replica_num(); auto result = std::move(g.value()); - LOGINFO("get blob pg={} shard {} blob {} off {} len {} data {}", pg_id, shard_id, current_blob_id, - off, len, hex_bytes(result.body.cbytes(), std::min(len, 10u))); + LOGINFO("get blob pg={} shard {} blob {} off {} len {} data {} allow_skip_verify {}", pg_id, + shard_id, current_blob_id, off, len, hex_bytes(result.body.cbytes(), std::min(len, 10u)), + allow_skip_verify); EXPECT_EQ(result.body.size(), len); EXPECT_EQ(std::memcmp(result.body.bytes(), blob.body.cbytes() + off, result.body.size()), 0); - EXPECT_EQ(result.user_key.size(), blob.user_key.size()); - EXPECT_EQ(blob.user_key, result.user_key); - EXPECT_EQ(blob.object_off, result.object_off); + // Only verify user_key and object_off when allow_skip_verify is false + if (!allow_skip_verify) { + EXPECT_EQ(result.user_key.size(), blob.user_key.size()); + EXPECT_EQ(blob.user_key, result.user_key); + EXPECT_EQ(blob.object_off, result.object_off); + } current_blob_id++; } } @@ -579,6 +613,7 @@ class HomeObjectFixture : public ::testing::Test { EXPECT_EQ(lhs.available_capacity_bytes, rhs.available_capacity_bytes); EXPECT_EQ(lhs.total_capacity_bytes, rhs.total_capacity_bytes); EXPECT_EQ(lhs.current_leader, rhs.current_leader); + EXPECT_TRUE(std::memcmp(lhs.meta, rhs.meta, ShardInfo::meta_length) == 0); } bool verify_start_replace_member_result(pg_id_t pg_id, std::string& task_id, peer_id_t out_member_id, @@ -828,8 +863,7 @@ class HomeObjectFixture : public ::testing::Test { auto 
blob = build_blob(blob_id); auto blob_size = blob.body.size(); - uint64_t actual_written_size{ - uint32_cast(sisl::round_up(sizeof(HSHomeObject::BlobHeader) + blob.user_key.size(), io_align))}; + uint64_t actual_written_size{uint32_cast(sisl::round_up(sizeof(HSHomeObject::BlobHeader), io_align))}; if (((r_cast< uintptr_t >(blob.body.cbytes()) % io_align) != 0) || ((blob_size % io_align) != 0)) { blob_size = sisl::round_up(blob_size, io_align); diff --git a/src/lib/homestore_backend/tests/homeobj_misc_tests.cpp b/src/lib/homestore_backend/tests/homeobj_misc_tests.cpp index 730be09e2..8238e4e69 100644 --- a/src/lib/homestore_backend/tests/homeobj_misc_tests.cpp +++ b/src/lib/homestore_backend/tests/homeobj_misc_tests.cpp @@ -7,7 +7,7 @@ TEST_F(HomeObjectFixture, HSHomeObjectCPTestBasic) { // Step-1: create a PG and a shard std::vector< std::pair< pg_id_t, shard_id_t > > pg_shard_id_vec; create_pg(1 /* pg_id */); - auto shard_info = create_shard(1 /* pg_id */, 64 * Mi); + auto shard_info = create_shard(1 /* pg_id */, 64 * Mi, "shard meta"); pg_shard_id_vec.emplace_back(1 /* pg_id */, shard_info.id); LOGINFO("pg={} shard {}", 1, shard_info.id); { @@ -51,7 +51,7 @@ TEST_F(HomeObjectFixture, PGBlobIterator) { auto& shard_list = pg_shard_id_vec[pg_id]; create_pg(pg_id); for (uint64_t i = 0; i < num_shards_per_pg; i++) { - auto shard = create_shard(pg_id, 64 * Mi); + auto shard = create_shard(pg_id, 64 * Mi, "shard meta" + std::to_string(i)); if (i != empty_shard_seq - 1) { shard_list.emplace_back(shard.id); } LOGINFO("pg={} shard {}", pg_id, shard.id); } @@ -144,6 +144,7 @@ TEST_F(HomeObjectFixture, PGBlobIterator) { ASSERT_EQ(shard_msg->created_time(), shard->info.created_time); ASSERT_EQ(shard_msg->last_modified_time(), shard->info.last_modified_time); ASSERT_EQ(shard_msg->total_capacity_bytes(), shard->info.total_capacity_bytes); + EXPECT_TRUE(std::memcmp(shard_msg->meta()->data(), shard->info.meta, ShardInfo::meta_length) == 0); // Verify blob data uint64_t packed_blob_size{0}; @@ -264,6 +265,9 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) { shard.last_modified_time = shard.created_time; shard.total_capacity_bytes = 1024 * Mi; shard.lsn = snp_lsn; + auto meta_str = "shard meta:" + std::to_string(i); + std::memcpy(shard.meta, meta_str.c_str(),meta_str.length()); + shard.meta[meta_str.size()] = '\0'; auto v_chunk_id = _obj_inst->chunk_selector()->get_most_available_blk_chunk(shard.id, pg_id); @@ -306,8 +310,7 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) { // Construct raw blob buffer auto blob = build_blob(cur_blob_id); - const auto aligned_hdr_size = - sisl::round_up(sizeof(HSHomeObject::BlobHeader) + blob.user_key.size(), io_align); + const auto aligned_hdr_size = sisl::round_up(sizeof(HSHomeObject::BlobHeader), _obj_inst->_data_block_size); sisl::io_blob_safe blob_raw(aligned_hdr_size + blob.body.size(), io_align); HSHomeObject::BlobHeader hdr; hdr.type = HSHomeObject::DataHeader::data_type_t::BLOB_INFO; @@ -318,18 +321,13 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) { hdr.user_key_size = blob.user_key.size(); hdr.object_offset = blob.object_off; hdr.data_offset = aligned_hdr_size; - _obj_inst->compute_blob_payload_hash(hdr.hash_algorithm, blob.body.cbytes(), blob.body.size(), - reinterpret_cast< uint8_t* >(blob.user_key.data()), - blob.user_key.size(), hdr.hash, + if (!blob.user_key.empty()) { std::memcpy(hdr.user_key, blob.user_key.data(), blob.user_key.size()); } + _obj_inst->compute_blob_payload_hash(hdr.hash_algorithm, blob.body.cbytes(), blob.body.size(), 
hdr.hash, HSHomeObject::BlobHeader::blob_max_hash_len); hdr.seal(); std::memcpy(blob_raw.bytes(), &hdr, sizeof(HSHomeObject::BlobHeader)); - if (!blob.user_key.empty()) { - std::memcpy((blob_raw.bytes() + sizeof(HSHomeObject::BlobHeader)), blob.user_key.data(), - blob.user_key.size()); - } - std::memcpy(blob_raw.bytes() + aligned_hdr_size, blob.body.cbytes(), blob.body.size()); + std::memcpy(blob_raw.bytes() + hdr.data_offset, blob.body.cbytes(), blob.body.size()); // Simulate blob data corruption - tamper with random bytes if (is_corrupted_batch || blob_state == ResyncBlobState::CORRUPTED) { diff --git a/src/lib/homestore_backend/tests/hs_blob_tests.cpp b/src/lib/homestore_backend/tests/hs_blob_tests.cpp index c48512bf8..3429a8719 100644 --- a/src/lib/homestore_backend/tests/hs_blob_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_blob_tests.cpp @@ -11,7 +11,6 @@ TEST_F(HomeObjectFixture, BasicEquivalence) { EXPECT_EQ(_obj_inst.get(), dynamic_cast< homeobject::HomeObject* >(pg_mgr.get())); EXPECT_EQ(_obj_inst.get(), dynamic_cast< homeobject::HomeObject* >(blob_mgr.get())); } - TEST_F(HomeObjectFixture, BasicPutGetDelBlobWithRestart) { // test recovery with pristine state firstly restart(); @@ -29,7 +28,7 @@ TEST_F(HomeObjectFixture, BasicPutGetDelBlobWithRestart) { create_pg(i); pg_blob_id[i] = 0; for (uint64_t j = 0; j < num_shards_per_pg; j++) { - auto shard = create_shard(i, 64 * Mi); + auto shard = create_shard(i, 64 * Mi, "shard meta"); pg_shard_id_vec[i].emplace_back(shard.id); LOGINFO("pg={} shard {}", i, shard.id); } @@ -155,7 +154,7 @@ TEST_F(HomeObjectFixture, BasicPutGetDelBlobOnExistPGWithDiskLost) { create_pg(i); pg_blob_id[i] = 0; for (uint64_t j = 0; j < num_shards_per_pg; j++) { - auto shard = create_shard(i, 64 * Mi); + auto shard = create_shard(i, 64 * Mi, "shard meta"); pg_shard_id_vec[i].emplace_back(shard.id); LOGINFO("pg={} shard {}", i, shard.id); } @@ -226,7 +225,7 @@ TEST_F(HomeObjectFixture, BasicPutGetDelBlobWithDiskBack) { create_pg(i); pg_blob_id[i] = 0; for (uint64_t j = 0; j < num_shards_per_pg; j++) { - auto shard = create_shard(i, 64 * Mi); + auto shard = create_shard(i, 64 * Mi, "shard meta"); pg_shard_id_vec[i].emplace_back(shard.id); LOGINFO("pg={} shard {}", i, shard.id); } @@ -356,7 +355,7 @@ TEST_F(HomeObjectFixture, BasicPutGetDelOnAllPGWithDiskLost) { create_pg(i); pg_blob_id[i] = 0; for (uint64_t j = 0; j < num_shards_per_pg; j++) { - auto shard = create_shard(i, 64 * Mi); + auto shard = create_shard(i, 64 * Mi, "shard meta"); pg_shard_id_vec[i].emplace_back(shard.id); LOGINFO("pg={} shard {}", i, shard.id); } @@ -390,7 +389,7 @@ TEST_F(HomeObjectFixture, BasicPutGetDelOnAllPGWithDiskLost) { current_blob_id, tid); auto blob = build_blob(current_blob_id); len = blob.body.size(); - auto g = _obj_inst->blob_manager()->get(shard_id, current_blob_id, off, len, tid).get(); + auto g = _obj_inst->blob_manager()->get(shard_id, current_blob_id, off, len, false, tid).get(); ASSERT_TRUE(g.hasError()) << "degraded pg on error member should return get blob fail, shard_id " << shard_id << " blob_id " << current_blob_id << " replica number " << g_helper->replica_num(); @@ -436,7 +435,7 @@ TEST_F(HomeObjectFixture, DeleteNonExistBlob) { // create a pg with one shard const auto pg_id = 1; create_pg(pg_id); - const auto shard_id = create_shard(1, 64 * Mi).id; + const auto shard_id = create_shard(1, 64 * Mi, "shard meta").id; verify_obj_count(1, 1, 0, false /* deleted */); // do not put any blob to exercise deleting a non-exist blob, everything should goes well @@ 
-465,7 +464,7 @@ TEST_F(HomeObjectFixture, BasicPutGetBlobWithPushDataDisabled) { create_pg(i); pg_blob_id[i] = 0; for (uint64_t j = 0; j < num_shards_per_pg; j++) { - auto shard = create_shard(i, 64 * Mi); + auto shard = create_shard(i, 64 * Mi, "shard meta"); pg_shard_id_vec[i].emplace_back(shard.id); LOGINFO("pg={} shard {}", i, shard.id); } diff --git a/src/lib/homestore_backend/tests/hs_gc_tests.cpp b/src/lib/homestore_backend/tests/hs_gc_tests.cpp index 7c4f61e2d..736c5d697 100644 --- a/src/lib/homestore_backend/tests/hs_gc_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_gc_tests.cpp @@ -30,7 +30,11 @@ TEST_F(HomeObjectFixture, BasicGC) { // create a shard for each chunk for (const auto& [pg_id, chunk_num] : pg_chunk_nums) { for (uint64_t j = 0; j < chunk_num; j++) { - auto shard = create_shard(pg_id, 64 * Mi); + auto shard_seq = i * chunk_num + j + 1; + auto derived_shard_id = make_new_shard_id(pg_id, shard_seq); // shard id start from 1 + auto shard = create_shard(pg_id, 64 * Mi, "shard meta:" + std::to_string(derived_shard_id)); + LOGINFO("create shard pg={} shard {} in chunk {}", pg_id, shard.id, j); + ASSERT_EQ(derived_shard_id, shard.id); pg_open_shard_id_vec[pg_id].emplace_back(shard.id); pg_shard_id_vec[pg_id].emplace_back(shard.id); } @@ -154,7 +158,7 @@ TEST_F(HomeObjectFixture, BasicGC) { } } verify_shard_blobs(remaining_shard_blobs); - + verify_shard_meta(pg_shard_id_vec); // check vchunk to pchunk for every pg for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) { // after half blobs have been deleted, the tombstone indexes(half of the total blobs) have been removed by gc @@ -332,7 +336,7 @@ TEST_F(HomeObjectFixture, HandlingNoSpaceLeft) { // create a shard for each chunk for (const auto& [pg_id, chunk_num] : pg_chunk_nums) { for (uint64_t j = 0; j < chunk_num; j++) { - auto shard = create_shard(pg_id, 64 * Mi); + auto shard = create_shard(pg_id, 64 * Mi, "shard meta"); pg_open_shard_id_vec[pg_id].emplace_back(shard.id); } } @@ -441,7 +445,7 @@ void HomeObjectFixture::EmergentGC(bool with_crash_recovery) { // create a shard for each chunk for (const auto& [pg_id, chunk_num] : pg_chunk_nums) { for (uint64_t j = 0; j < chunk_num; j++) { - auto shard = create_shard(pg_id, 64 * Mi); + auto shard = create_shard(pg_id, 64 * Mi, "shard meta"); pg_open_shard_id_vec[pg_id].emplace_back(shard.id); pg_shard_id_vec[pg_id].emplace_back(shard.id); } diff --git a/src/lib/homestore_backend/tests/hs_pg_tests.cpp b/src/lib/homestore_backend/tests/hs_pg_tests.cpp index 6ad17654c..a1a9d6f04 100644 --- a/src/lib/homestore_backend/tests/hs_pg_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_pg_tests.cpp @@ -6,7 +6,7 @@ TEST_F(HomeObjectFixture, PGStatsTest) { // Create a pg, shard, put blob should succeed. 
pg_id_t pg_id{1}; create_pg(pg_id); - auto shard_info = create_shard(pg_id, 64 * Mi); + auto shard_info = create_shard(pg_id, 64 * Mi, "shard meta"); auto shard_id = shard_info.id; auto s = _obj_inst->shard_manager()->get_shard(shard_id).get(); ASSERT_TRUE(!!s); @@ -26,7 +26,7 @@ TEST_F(HomeObjectFixture, PGStatsTest) { LOGINFO("Sealed shard {}", shard_id); // create a 2nd shard - auto shard_info2 = create_shard(pg_id, 64 * Mi); + auto shard_info2 = create_shard(pg_id, 64 * Mi, "shard meta"); auto shard_id2 = shard_info2.id; auto s2 = _obj_inst->shard_manager()->get_shard(shard_id2).get(); ASSERT_TRUE(!!s2); diff --git a/src/lib/homestore_backend/tests/hs_shard_tests.cpp b/src/lib/homestore_backend/tests/hs_shard_tests.cpp index c2e057488..8b68aee8d 100644 --- a/src/lib/homestore_backend/tests/hs_shard_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_shard_tests.cpp @@ -3,8 +3,8 @@ TEST_F(HomeObjectFixture, CreateMultiShards) { pg_id_t pg_id{1}; create_pg(pg_id); - auto _shard_1 = create_shard(pg_id, 64 * Mi); - auto _shard_2 = create_shard(pg_id, 64 * Mi); + auto _shard_1 = create_shard(pg_id, 64 * Mi, "shard meta"); + auto _shard_2 = create_shard(pg_id, 64 * Mi, "shard meta"); auto chunk_num_1 = _obj_inst->get_shard_p_chunk_id(_shard_1.id); ASSERT_TRUE(chunk_num_1.has_value()); @@ -33,14 +33,14 @@ TEST_F(HomeObjectFixture, CreateMultiShardsOnMultiPG) { } for (const auto pg : pgs) { - auto shard_info = create_shard(pg, Mi); + auto shard_info = create_shard(pg, Mi, "shard meta"); auto p_chunk_ID1 = _obj_inst->get_shard_p_chunk_id(shard_info.id); auto v_chunk_ID1 = _obj_inst->get_shard_v_chunk_id(shard_info.id); ASSERT_TRUE(p_chunk_ID1.has_value()); ASSERT_TRUE(v_chunk_ID1.has_value()); // create another shard again. - shard_info = create_shard(pg, Mi); + shard_info = create_shard(pg, Mi, "shard meta"); auto p_chunk_ID2 = _obj_inst->get_shard_p_chunk_id(shard_info.id); auto v_chunk_ID2 = _obj_inst->get_shard_v_chunk_id(shard_info.id); ASSERT_TRUE(p_chunk_ID2.has_value()); @@ -64,7 +64,7 @@ TEST_F(HomeObjectFixture, CreateMultiShardsOnMultiPG) { TEST_F(HomeObjectFixture, SealShard) { pg_id_t pg_id{1}; create_pg(pg_id); - auto shard_info = create_shard(pg_id, 64 * Mi); + auto shard_info = create_shard(pg_id, 64 * Mi, "shard meta"); ASSERT_EQ(ShardInfo::State::OPEN, shard_info.state); // seal the shard @@ -77,7 +77,7 @@ TEST_F(HomeObjectFixture, SealShard) { // create shard until no space left, we have 5 chunks in one pg. 
for (auto i = 0; i < 5; i++) { - shard_info = create_shard(pg_id, 64 * Mi); + shard_info = create_shard(pg_id, 64 * Mi, "shard meta"); ASSERT_EQ(ShardInfo::State::OPEN, shard_info.state); } @@ -86,7 +86,7 @@ TEST_F(HomeObjectFixture, SealShard) { // expect to create shard failed run_on_pg_leader(pg_id, [&]() { - auto s = _obj_inst->shard_manager()->create_shard(pg_id, 64 * Mi).get(); + auto s = _obj_inst->shard_manager()->create_shard(pg_id, 64 * Mi, "shard meta").get(); ASSERT_TRUE(s.hasError()); ASSERT_EQ(ShardErrorCode::NO_SPACE_LEFT, s.error().getCode()); }); @@ -110,7 +110,7 @@ TEST_F(HomeObjectFixture, SealShard) { shard_info = seal_shard(shard_info.id); ASSERT_EQ(ShardInfo::State::SEALED, shard_info.state); - shard_info = create_shard(pg_id, 64 * Mi); + shard_info = create_shard(pg_id, 64 * Mi, "shard meta"); ASSERT_EQ(ShardInfo::State::OPEN, shard_info.state); shard_info = seal_shard(shard_info.id); @@ -122,7 +122,7 @@ TEST_F(HomeObjectFixture, ShardManagerRecovery) { create_pg(pg_id); // create one shard; - auto shard_info = create_shard(pg_id, Mi); + auto shard_info = create_shard(pg_id, Mi, "shard meta");; auto shard_id = shard_info.id; EXPECT_EQ(ShardInfo::State::OPEN, shard_info.state); EXPECT_EQ(Mi, shard_info.total_capacity_bytes); @@ -162,7 +162,7 @@ TEST_F(HomeObjectFixture, ShardManagerRecovery) { EXPECT_EQ(1, pg_result->shard_sequence_num_); // re-create new shards on this pg works too even homeobject is restarted twice. - auto new_shard_info = create_shard(pg_id, Mi); + auto new_shard_info = create_shard(pg_id, Mi, "shard meta");; EXPECT_NE(shard_id, new_shard_info.id); EXPECT_EQ(ShardInfo::State::OPEN, new_shard_info.state); @@ -177,7 +177,7 @@ TEST_F(HomeObjectFixture, SealedShardRecovery) { create_pg(pg_id); // create one shard and seal it. 
- auto shard_info = create_shard(pg_id, Mi); + auto shard_info = create_shard(pg_id, Mi, "shard meta");; auto shard_id = shard_info.id; shard_info = seal_shard(shard_id); EXPECT_EQ(ShardInfo::State::SEALED, shard_info.state); @@ -210,7 +210,7 @@ TEST_F(HomeObjectFixture, SealShardWithRestart) { pg_id_t pg_id{1}; create_pg(pg_id); - auto shard_info = create_shard(pg_id, 64 * Mi); + auto shard_info = create_shard(pg_id, 64 * Mi, "shard meta"); auto shard_id = shard_info.id; auto s = _obj_inst->shard_manager()->get_shard(shard_id).get(); ASSERT_TRUE(!!s); @@ -263,7 +263,7 @@ TEST_F(HomeObjectFixture, CreateShardOnDiskLostMemeber) { create_pg(degrade_pg_id); std::map< pg_id_t, shard_id_t > pg_shard_id_map; for (int i = 1; i <= 2; i++) { - auto shard_info = create_shard(i, 64 * Mi); + auto shard_info = create_shard(i, 64 * Mi, "shard meta"); ASSERT_EQ(ShardInfo::State::OPEN, shard_info.state); pg_shard_id_map[i] = shard_info.id; LOGINFO("pg={} shard {}", i, shard_info.id); @@ -284,7 +284,7 @@ TEST_F(HomeObjectFixture, CreateShardOnDiskLostMemeber) { << " replica number " << g_helper->replica_num(); tid = generateRandomTraceId(); - s = _obj_inst->shard_manager()->create_shard(degrade_pg_id, 64 * Mi, tid).get(); + s = _obj_inst->shard_manager()->create_shard(degrade_pg_id, 64 * Mi, "shard meta", tid).get(); ASSERT_TRUE(s.hasError()) << "degraded pg on error member should return create shard fail, pg_id " << degrade_pg_id << " replica number " << g_helper->replica_num(); } else { @@ -293,3 +293,191 @@ TEST_F(HomeObjectFixture, CreateShardOnDiskLostMemeber) { } g_helper->sync(); } + +TEST_F(HomeObjectFixture, ShardVersionMigrationRecovery) { + // Test migration with mixed versions (v1 and v2) to simulate partial migration or interrupted upgrade + // Also tests the actual bug scenario where v1 shards had incorrect type field + pg_id_t pg_id{1}; + create_pg(pg_id); + + // Create multiple shards with current version + LOGINFO("Creating 5 test shards..."); + std::vector< ShardInfo > shard_infos; + std::vector< shard_id_t > shard_ids; + for (int i = 0; i < 5; ++i) { + auto shard_info = create_shard(pg_id, Mi, fmt::format("test_migration_{}", i)); + shard_infos.push_back(shard_info); + shard_ids.push_back(shard_info.id); + EXPECT_EQ(ShardInfo::State::OPEN, shard_info.state); + } + + // Get the PG and verify all shards are at current version + auto pg_result = _obj_inst->get_hs_pg(pg_id); + EXPECT_TRUE(pg_result != nullptr); + EXPECT_EQ(5, pg_result->shards_.size()); + + // Save original data for all shards - use map keyed by shard_id since recovery order is not guaranteed + struct ShardOriginalData { + ShardInfo info; + homestore::chunk_num_t p_chunk_id; + homestore::chunk_num_t v_chunk_id; + bool should_be_v1; // Track which shards should be created as v1 + }; + std::unordered_map< shard_id_t, ShardOriginalData > original_data_map; + + // Mark which shards should be v1 vs v2 (using alternating pattern for variety) + // shard_ids[0], [2], [4] -> v1 (needs migration) + // shard_ids[1], [3] -> v2 (already migrated) + std::set v1_shard_ids = {shard_ids[0], shard_ids[2], shard_ids[4]}; + + for (auto& shard : pg_result->shards_) { + auto hs_shard = d_cast< HSHomeObject::HS_Shard* >(shard.get()); + EXPECT_EQ(0x02, hs_shard->sb_->version); + EXPECT_EQ(0x02, hs_shard->sb_->sb_version); + + auto shard_id = hs_shard->sb_->info.id; + original_data_map[shard_id] = {hs_shard->sb_->info, hs_shard->sb_->p_chunk_id, hs_shard->sb_->v_chunk_id, + v1_shard_ids.contains(shard_id)}; + + // Destroy all current superblks + 
hs_shard->sb_.destroy(); + } + + // Simulate partial migration: create shards with mixed versions + LOGINFO("Creating mixed version shards (3 v1 shards, 2 v2 shards)..."); + + for (const auto& [shard_id, orig_data] : original_data_map) { + if (orig_data.should_be_v1) { + // Create v1 shard (needs migration) + homestore::superblk< HSHomeObject::v1_shard_info_superblk > old_sb("ShardManager"); + old_sb.create(sizeof(HSHomeObject::v1_shard_info_superblk)); + old_sb->magic = HSHomeObject::DataHeader::data_header_magic; + old_sb->version = 0x01; // Old version + // Simulate the actual bug: first v1 shard has incorrect type field set to BLOB_INFO + if (shard_id == shard_ids[0]) { + old_sb->type = HSHomeObject::DataHeader::data_type_t::BLOB_INFO; // Bug scenario + LOGINFO("Created v1 shard {} with BLOB_INFO type (bug scenario)", shard_id); + } else { + LOGINFO("Created v1 shard {}", shard_id); + } + // Convert v2 ShardInfo to v1_ShardInfo (v1 doesn't have meta field) + old_sb->info.id = orig_data.info.id; + old_sb->info.placement_group = orig_data.info.placement_group; + old_sb->info.state = orig_data.info.state; + old_sb->info.lsn = orig_data.info.lsn; + old_sb->info.created_time = orig_data.info.created_time; + old_sb->info.last_modified_time = orig_data.info.last_modified_time; + old_sb->info.available_capacity_bytes = orig_data.info.available_capacity_bytes; + old_sb->info.total_capacity_bytes = orig_data.info.total_capacity_bytes; + old_sb->info.current_leader = orig_data.info.current_leader; + // Note: v1 doesn't have meta field, so we don't copy it + old_sb->p_chunk_id = orig_data.p_chunk_id; + old_sb->v_chunk_id = orig_data.v_chunk_id; + old_sb.write(); + } else { + // Create v2 shard (already migrated) + homestore::superblk< HSHomeObject::shard_info_superblk > new_sb("ShardManager"); + new_sb.create(sizeof(HSHomeObject::shard_info_superblk)); + new_sb->magic = HSHomeObject::DataHeader::data_header_magic; + new_sb->version = 0x02; // New version + new_sb->type = HSHomeObject::DataHeader::data_type_t::SHARD_INFO; + new_sb->sb_version = 0x02; + new_sb->info = orig_data.info; + new_sb->p_chunk_id = orig_data.p_chunk_id; + new_sb->v_chunk_id = orig_data.v_chunk_id; + new_sb.write(); + LOGINFO("Created v2 shard {}", shard_id); + } + } + + auto old_size = sizeof(HSHomeObject::v1_shard_info_superblk); + auto new_size = sizeof(HSHomeObject::shard_info_superblk); + LOGINFO("Setup complete - old size={}, new size={}", old_size, new_size); + + // Restart homeobject - this should migrate only v1 shards + LOGINFO("Restarting homeobject to trigger migration..."); + restart(); + + // Verify all shards are now at v2 + pg_result = _obj_inst->get_hs_pg(pg_id); + EXPECT_TRUE(pg_result != nullptr); + EXPECT_EQ(5, pg_result->shards_.size()); + + LOGINFO("Verifying all shards after migration..."); + for (auto& shard : pg_result->shards_) { + auto hs_shard = d_cast< HSHomeObject::HS_Shard* >(shard.get()); + auto shard_id = hs_shard->sb_->info.id; + + // Look up the original data for this shard + auto it = original_data_map.find(shard_id); + ASSERT_NE(it, original_data_map.end()) << "Shard " << shard_id << " not found in original data"; + const auto& orig_data = it->second; + + // All shards should now be at v2 + EXPECT_EQ(0x02, hs_shard->sb_->version) << "Shard " << shard_id << " version should be 0x02"; + EXPECT_EQ(0x02, hs_shard->sb_->sb_version) << "Shard " << shard_id << " sb_version should be 0x02"; + + // Verify all shard data was preserved + EXPECT_EQ(HSHomeObject::DataHeader::data_header_magic, 
hs_shard->sb_->magic); + EXPECT_EQ(HSHomeObject::DataHeader::data_type_t::SHARD_INFO, hs_shard->sb_->type); + EXPECT_EQ(orig_data.info.id, hs_shard->sb_->info.id); + EXPECT_EQ(orig_data.info.placement_group, hs_shard->sb_->info.placement_group); + EXPECT_EQ(orig_data.info.state, hs_shard->sb_->info.state); + EXPECT_EQ(orig_data.p_chunk_id, hs_shard->sb_->p_chunk_id); + EXPECT_EQ(orig_data.v_chunk_id, hs_shard->sb_->v_chunk_id); + + LOGINFO("Shard {} verified successfully (was {})", shard_id, orig_data.should_be_v1 ? "v1" : "v2"); + } + + // Verify all shards are still functional + LOGINFO("Verifying shard functionality..."); + for (size_t i = 0; i < shard_ids.size(); ++i) { + auto s = _obj_inst->shard_manager()->get_shard(shard_ids[i]).get(); + ASSERT_TRUE(!!s) << "Shard " << i << " should be accessible"; + EXPECT_EQ(shard_ids[i], s.value().id); + EXPECT_EQ(ShardInfo::State::OPEN, s.value().state); + } + + // Seal some v1 shards to verify they still work after migration + LOGINFO("Sealing two v1 shards to verify post-migration functionality..."); + auto sealed_shard_0 = seal_shard(shard_ids[0]); + EXPECT_EQ(ShardInfo::State::SEALED, sealed_shard_0.state); + + auto sealed_shard_2 = seal_shard(shard_ids[2]); + EXPECT_EQ(ShardInfo::State::SEALED, sealed_shard_2.state); + + // Track which shards were sealed for later verification + std::set sealed_shard_ids = {shard_ids[0], shard_ids[2]}; + + LOGINFO("Mixed version migration test completed - 3 shards migrated from v1, 2 shards already at v2"); + + // Restart again to verify the migration was persisted to disk + LOGINFO("Second restart to verify persistence..."); + restart(); + + // Verify all migrated versions persist after restart + pg_result = _obj_inst->get_hs_pg(pg_id); + EXPECT_TRUE(pg_result != nullptr); + EXPECT_EQ(5, pg_result->shards_.size()); + + LOGINFO("Verifying all shards after second restart..."); + for (auto& shard : pg_result->shards_) { + auto hs_shard = d_cast< HSHomeObject::HS_Shard* >(shard.get()); + auto shard_id = hs_shard->sb_->info.id; + + EXPECT_EQ(0x02, hs_shard->sb_->version) << "Shard " << shard_id << " should still be v2 after restart"; + EXPECT_EQ(0x02, hs_shard->sb_->sb_version) << "Shard " << shard_id << " sb_version should still be 0x02"; + + // Verify sealed shards remain sealed + if (sealed_shard_ids.contains(shard_id)) { + EXPECT_EQ(ShardInfo::State::SEALED, hs_shard->sb_->info.state) + << "Shard " << shard_id << " should remain sealed"; + } else { + EXPECT_EQ(ShardInfo::State::OPEN, hs_shard->sb_->info.state) + << "Shard " << shard_id << " should remain open"; + } + } + + LOGINFO("Verified migration persisted to disk - all {} shards remain at v2 after second restart", + pg_result->shards_.size()); +} diff --git a/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp b/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp index 38d283058..c89002d89 100644 --- a/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp +++ b/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp @@ -149,7 +149,7 @@ void HomeObjectFixture::RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t std::string task_id = "task_id"; if (!is_restart) { for (uint64_t j = 0; j < num_shards_per_pg; j++) - create_shard(pg_id, 64 * Mi); + create_shard(pg_id, 64 * Mi, "shard meta:" + std::to_string(pg_shard_id_vec[pg_id][j])); // put and verify blobs in the pg, excluding the spare replicas put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id, true, true); @@ -225,6 +225,7 @@ void 
HomeObjectFixture::RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t } run_if_in_pg(pg_id, [&]() { wait_for_blob(last_shard, last_blob); + verify_shard_meta(pg_shard_id_vec); // 1st round blobs verify_get_blob(pg_shard_id_vec, num_blobs_per_shard, false, true); // 2nd round blobs @@ -300,7 +301,7 @@ TEST_F(HomeObjectFixture, RestartFollowerDuringBaselineResyncUsingGracefulShutdo #endif for (uint64_t j = 0; j < num_shards_per_pg; j++) - create_shard(pg_id, 64 * Mi); + create_shard(pg_id, 64 * Mi, "shard meta:" + std::to_string(pg_shard_id_vec[pg_id][j])); // put and verify blobs in the pg, excluding the spare replicas put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id, true, true); @@ -336,6 +337,7 @@ TEST_F(HomeObjectFixture, RestartFollowerDuringBaselineResyncUsingGracefulShutdo LOGINFO("restart, wait for data replication") if (in_member_id == g_helper->my_replica_id()) { wait_for_blob(last_shard, last_blob); + verify_shard_meta(pg_shard_id_vec); verify_get_blob(pg_shard_id_vec, num_blobs_per_shard); verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard, false); } @@ -388,10 +390,6 @@ void HomeObjectFixture::ReplaceMember(bool withGC) { auto num_shards_per_pg = SISL_OPTIONS["num_shards"].as< uint64_t >(); auto num_blobs_per_shard = SISL_OPTIONS["num_blobs"].as< uint64_t >() / num_shards_per_pg; - // last shard is empty shard - for (uint64_t j = 0; j < num_shards_per_pg + 1; j++) - create_shard(pg_id, 64 * Mi); - // we can not share all the shard_id and blob_id among all the replicas including the spare ones, so we need to // derive them by calculating. // since shard_id = pg_id + shard_sequence_num, so we can derive shard_ids for all the shards in this pg, and these @@ -403,6 +401,12 @@ void HomeObjectFixture::ReplaceMember(bool withGC) { pg_shard_id_vec[pg_id].emplace_back(derived_shard_id); } + // create shards [1, num_shards_per_pg + 1]. 
The last shard is an empty shard + for (uint64_t j = 0; j < num_shards_per_pg + 1; j++) { + auto derived_shard_id = make_new_shard_id(pg_id, j + 1); + create_shard(pg_id, 64 * Mi, "shard meta:" + std::to_string(derived_shard_id)); + } + // TODO:: if we add delete blobs case in baseline resync, we need also derive the last blob_id in this pg for spare // replicas pg_blob_id[pg_id] = 0; @@ -523,6 +527,7 @@ void HomeObjectFixture::ReplaceMember(bool withGC) { // Step 6: restart, verify the blobs again on all members, including the new spare replica, and out_member restart(); run_if_in_pg(pg_id, [&]() { + verify_shard_meta(pg_shard_id_vec); verify_get_blob(pg_shard_id_vec, num_blobs_per_shard); verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard, false); LOGINFO("After restart, check pg related data in pg members successfully"); @@ -631,7 +636,7 @@ void HomeObjectFixture::RestartLeaderDuringBaselineResyncUsingSigKill(uint64_t f // ========Stage 1: Create a pg without spare replicas and put blobs======== for (uint64_t j = 0; j < num_shards_per_pg; j++) - create_shard(pg_id, 64 * Mi); + create_shard(pg_id, 64 * Mi, "shard meta:" + std::to_string(pg_shard_id_vec[pg_id][j])); // put and verify blobs in the pg, excluding the spare replicas put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id, true, true); @@ -720,6 +725,7 @@ void HomeObjectFixture::RestartLeaderDuringBaselineResyncUsingSigKill(uint64_t f verify_get_blob(pg_shard_id_vec, num_blobs_per_shard, false, true); // // 2nd round blobs pg_blob_id[pg_id] = num_blobs_per_shard * num_shards_per_pg; + verify_shard_meta(pg_shard_id_vec); verify_get_blob(pg_shard_id_vec, 1, false, true, pg_blob_id); verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard + 1, false); }); @@ -799,7 +805,7 @@ TEST_F(HomeObjectFixture, RollbackReplaceMember) { #endif for (uint64_t j = 0; j < num_shards_per_pg; j++) - create_shard(pg_id, 64 * Mi); + create_shard(pg_id, 64 * Mi, "shard meta:" + std::to_string(pg_shard_id_vec[pg_id][j])); // put and verify blobs in the pg, excluding the spare replicas put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id, true, true); diff --git a/src/lib/memory_backend/mem_blob_manager.cpp b/src/lib/memory_backend/mem_blob_manager.cpp index 045a1ea7d..e41d12809 100644 --- a/src/lib/memory_backend/mem_blob_manager.cpp +++ b/src/lib/memory_backend/mem_blob_manager.cpp @@ -39,9 +39,10 @@ BlobManager::AsyncResult< blob_id_t > MemoryHomeObject::_put_blob(ShardInfo cons // Lookup BlobExt and duplicate underyling Blob for user; only *safe* because we defer GC. 
BlobManager::AsyncResult< Blob > MemoryHomeObject::_get_blob(ShardInfo const& _shard, blob_id_t _blob, uint64_t off, - uint64_t len, trace_id_t tid) const { + uint64_t len, bool allow_skip_verify, trace_id_t tid) const { (void)off; (void)len; + (void)allow_skip_verify; (void)tid; WITH_SHARD WITH_ROUTE(_blob) diff --git a/src/lib/memory_backend/mem_homeobject.hpp b/src/lib/memory_backend/mem_homeobject.hpp index e6704953c..c3fb5025d 100644 --- a/src/lib/memory_backend/mem_homeobject.hpp +++ b/src/lib/memory_backend/mem_homeobject.hpp @@ -37,13 +37,13 @@ class MemoryHomeObject : public HomeObjectImpl { /// Helpers // ShardManager - ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes, trace_id_t tid) override; + ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes, std::string meta, trace_id_t tid) override; ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&, trace_id_t tid) override; // BlobManager BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&, trace_id_t tid) override; BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off, uint64_t len, - trace_id_t tid) const override; + bool allow_skip_verify, trace_id_t tid) const override; BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t, trace_id_t tid) override; /// diff --git a/src/lib/memory_backend/mem_shard_manager.cpp b/src/lib/memory_backend/mem_shard_manager.cpp index 80c791725..8566408bd 100644 --- a/src/lib/memory_backend/mem_shard_manager.cpp +++ b/src/lib/memory_backend/mem_shard_manager.cpp @@ -6,7 +6,7 @@ namespace homeobject { uint64_t ShardManager::max_shard_size() { return Gi; } -ShardManager::AsyncResult< ShardInfo > MemoryHomeObject::_create_shard(pg_id_t pg_owner, uint64_t size_bytes, +ShardManager::AsyncResult< ShardInfo > MemoryHomeObject::_create_shard(pg_id_t pg_owner, uint64_t size_bytes, std::string meta, trace_id_t tid) { (void)tid; auto const now = get_current_timestamp(); diff --git a/src/lib/shard_manager.cpp b/src/lib/shard_manager.cpp index e46c99f1e..a6626cfb0 100644 --- a/src/lib/shard_manager.cpp +++ b/src/lib/shard_manager.cpp @@ -4,14 +4,12 @@ namespace homeobject { std::shared_ptr< ShardManager > HomeObjectImpl::shard_manager() { return shared_from_this(); } -ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::create_shard(pg_id_t pg_owner, uint64_t size_bytes, +ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::create_shard(pg_id_t pg_owner, uint64_t size_bytes, std::string meta, trace_id_t tid) { - if (0 == size_bytes || max_shard_size() < size_bytes) - return folly::makeUnexpected(ShardError(ShardErrorCode::INVALID_ARG)); - return _defer().thenValue( - [this, pg_owner, size_bytes, tid](auto) mutable -> ShardManager::AsyncResult< ShardInfo > { - return _create_shard(pg_owner, size_bytes, tid); - }); + if (0 == size_bytes || max_shard_size() < size_bytes) return folly::makeUnexpected(ShardError(ShardErrorCode::INVALID_ARG)); + return _defer().thenValue([this, pg_owner, size_bytes, meta, tid](auto) mutable -> ShardManager::AsyncResult< ShardInfo > { + return _create_shard(pg_owner, size_bytes, meta, tid); + }); } ShardManager::AsyncResult< InfoList > HomeObjectImpl::list_shards(pg_id_t pgid, trace_id_t tid) const { diff --git a/src/lib/tests/ShardManagerTest.cpp b/src/lib/tests/ShardManagerTest.cpp index 565c92e1c..1c1dc56a1 100644 --- a/src/lib/tests/ShardManagerTest.cpp +++ b/src/lib/tests/ShardManagerTest.cpp @@ -9,7 +9,7 @@ using homeobject::ShardInfo; 
TEST_F(TestFixture, CreateShardTooBig) { EXPECT_EQ(ShardErrorCode::INVALID_ARG, homeobj_->shard_manager() - ->create_shard(_pg_id, homeobject::ShardManager::max_shard_size() + 1) + ->create_shard(_pg_id, homeobject::ShardManager::max_shard_size() + 1, "shard meta") .get() .error() .getCode()); @@ -17,12 +17,12 @@ TEST_F(TestFixture, CreateShardTooBig) { TEST_F(TestFixture, CreateShardTooSmall) { EXPECT_EQ(ShardErrorCode::INVALID_ARG, - homeobj_->shard_manager()->create_shard(_pg_id, 0ul).get().error().getCode()); + homeobj_->shard_manager()->create_shard(_pg_id, 0ul, "shard meta").get().error().getCode()); } TEST_F(TestFixture, CreateShardNoPg) { EXPECT_EQ(ShardErrorCode::UNKNOWN_PG, - homeobj_->shard_manager()->create_shard(_pg_id + 1, Mi).get().error().getCode()); + homeobj_->shard_manager()->create_shard(_pg_id + 1, Mi, "shard meta").get().error().getCode()); } TEST_F(TestFixture, GetUnknownShard) { diff --git a/src/lib/tests/fixture_app.cpp b/src/lib/tests/fixture_app.cpp index 18f836f34..c71007247 100644 --- a/src/lib/tests/fixture_app.cpp +++ b/src/lib/tests/fixture_app.cpp @@ -50,11 +50,11 @@ void TestFixture::SetUp() { EXPECT_TRUE(homeobj_->pg_manager()->create_pg(std::move(info), tid).get()); LOGDEBUG("Setup Shards, trace_id={}", tid); - auto s_e = homeobj_->shard_manager()->create_shard(_pg_id, Mi, tid).get(); + auto s_e = homeobj_->shard_manager()->create_shard(_pg_id, Mi, "shard meta", tid).get(); ASSERT_TRUE(!!s_e); s_e.then([this](auto&& i) { _shard_1 = std::move(i); }); - s_e = homeobj_->shard_manager()->create_shard(_pg_id, Mi, tid).get(); + s_e = homeobj_->shard_manager()->create_shard(_pg_id, Mi, "shard meta", tid).get(); ASSERT_TRUE(!!s_e); s_e.then([this](auto&& i) { _shard_2 = std::move(i); }); diff --git a/v4upgrade.md b/v4upgrade.md new file mode 100644 index 000000000..a5739ad46 --- /dev/null +++ b/v4upgrade.md @@ -0,0 +1,41 @@ +# v4 Upgrade Notes + +## Version Changes +- DataHeader version: 0x02 +- BlobHeader version: 0x02 +- Shard superblock version: 0x02 + +Currently only backward compatibility is supported (newer code can read older on-disk formats; the reverse is not supported). + +## Incompatibilities and Breaking Changes + +### 1. Fixed-Size BlobHeader (4KB) +- Add a blob_header_version=0x02 +- Move the user key into the BlobHeader. Note: the gateway currently does not pass a user key, but in the next version it will use the object identifier as the user key. +- Add padding to align the BlobHeader to 4KB + +### 2. Shard Metadata Persistence +- Add a shard sb_version=0x02 +- Add a metadata field to the ShardInfo structure for future use + +### 3. Partial Read Optimization +**Impact:** New capability; backward compatible, but changes read behavior +- Added `allow_skip_verify` parameter to enable optimized partial reads +- When enabled for a partial read, the header read is skipped and only the requested data range is read +- Improves HDD performance by reducing I/O operations +- Only works with v4's fixed-size header (predictable data offset) + +## Upgrade Plan +1. **Stop PG traffic** to all HomeObject instances +2. **Rewrite all existing data** and upgrade to v4 in place using the offline refactor tool: + - Blob data + - Read each blob with v3 format + - Rewrite with v4 fixed-size BlobHeader format + - Embed user_key into BlobHeader (validate max 1024 bytes) + - Update data_offset to fixed value (4096) + - Recalculate hash without separate user_key parameter + - Update shard metadata + - Rewrite shard superblock with version 0x02 + - Rewrite shard header and footer in chunks +3. **If chunk space is insufficient**, migrate the data from the upper layer (rclone nuobject data) +4.
**Restore service and traffic** \ No newline at end of file
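
## API usage example (illustrative)

To make the v4 caller-facing changes above concrete, here is a minimal caller-side sketch. It is not part of this change set: the include paths, the already-initialized `HomeObject` handle `ho`, and a `blob_id` obtained from an earlier `put()` are assumptions made for illustration; the calls mirror the usage in the test fixtures of this diff.

```cpp
// Illustrative sketch only; mirrors create_shard()/get() usage from the test fixtures.
#include <memory>
#include <string>

#include <homeobject/homeobject.hpp>      // assumed public include paths
#include <homeobject/shard_manager.hpp>
#include <homeobject/blob_manager.hpp>

void v4_api_example(std::shared_ptr< homeobject::HomeObject > ho, homeobject::pg_id_t pg_id,
                    homeobject::blob_id_t blob_id) {
    // v4: create_shard() now carries an application-provided metadata string, persisted in
    // ShardInfo::meta (at most ShardInfo::meta_length - 1 bytes, otherwise INVALID_ARG).
    auto s = ho->shard_manager()->create_shard(pg_id, 64ull * 1024 * 1024, "shard meta: example").get();
    if (!s) { return; }
    auto shard_id = s.value().id;

    // v4: get() takes an allow_skip_verify flag. With the fixed 4KB BlobHeader the data offset
    // is predictable, so a partial read with the flag set skips the header read and hash check
    // and returns only the requested range.
    bool const allow_skip_verify = true;
    auto b = ho->blob_manager()->get(shard_id, blob_id, 0 /* off */, 4096 /* len */, allow_skip_verify).get();
    if (!b) { return; }

    // When verification is skipped, rely only on the returned body; user_key/object_off are not
    // re-checked (see verify_get_blob in homeobj_fixture.hpp).
    auto const& body = b.value().body;
    (void)body;
}
```

With `allow_skip_verify` left at its default (`false`), the read goes through the full verification path, as exercised by `verify_get_blob` in the test fixture.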