From 9b1de0b6d0ef8528825672662aa110e0b7b6258b Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 27 Aug 2025 21:28:35 +0800 Subject: [PATCH 1/7] Add log Signed-off-by: JaySon-Huang --- dbms/src/Server/DTTool/DTToolBench.cpp | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/dbms/src/Server/DTTool/DTToolBench.cpp b/dbms/src/Server/DTTool/DTToolBench.cpp index d6284baf6b9..5c088469bdc 100644 --- a/dbms/src/Server/DTTool/DTToolBench.cpp +++ b/dbms/src/Server/DTTool/DTToolBench.cpp @@ -418,6 +418,7 @@ int benchEntry(const std::vector & opts) opt.emplace(std::map{}, frame, algorithm); if (version == 2) { + // frame checksum DB::STORAGE_FORMAT_CURRENT = DB::STORAGE_FORMAT_V3; } else if (version == 3) @@ -443,13 +444,13 @@ int benchEntry(const std::vector & opts) = genBlocks(random_seed, num_rows, num_cols, field, sparse_ratio, logger); } - size_t write_cost_ms = 0; + TableID table_id = 1; auto settings = DB::Settings(); auto db_context = env.getContext(); auto path_pool = std::make_shared(db_context->getPathPool().withTable("test", "t1", false)); auto storage_pool - = std::make_shared(*db_context, NullspaceID, /*ns_id*/ 1, *path_pool, "test.t1"); + = std::make_shared(*db_context, NullspaceID, table_id, *path_pool, "test.t1"); auto dm_settings = DB::DM::DeltaMergeStore::Settings{}; auto dm_context = DB::DM::DMContext::createUnique( *db_context, @@ -457,7 +458,7 @@ int benchEntry(const std::vector & opts) storage_pool, /*min_version_*/ 0, NullspaceID, - /*physical_table_id*/ 1, + table_id, /*pk_col_id*/ 0, false, 1, @@ -469,6 +470,7 @@ int benchEntry(const std::vector & opts) // Write if (write_repeat > 0) { + size_t write_cost_ms = 0; LOG_INFO(logger, "start writing"); for (size_t i = 0; i < write_repeat; ++i) { @@ -489,15 +491,17 @@ int benchEntry(const std::vector & opts) write_cost_ms += duration; LOG_INFO(logger, "attempt {} finished in {} ms", i, duration); } + size_t effective_size_on_disk = dmfile->getBytesOnDisk(); 
LOG_INFO( logger, "average write time: {} ms", (static_cast(write_cost_ms) / static_cast(repeat))); LOG_INFO( logger, - "throughput (MB/s): {:.3f}", - (static_cast(effective_size) * 1'000 * static_cast(repeat) - / static_cast(write_cost_ms) / 1024 / 1024)); + "write throughput by uncompressed size: {:.3f}MiB/s;" + " write throughput by compressed size: {:.3f}MiB/s", + (effective_size * 1'000.0 * repeat / write_cost_ms / 1024 / 1024), + (effective_size_on_disk * 1'000.0 * repeat / write_cost_ms / 1024 / 1024)); } // Read @@ -548,12 +552,10 @@ int benchEntry(const std::vector & opts) LOG_INFO(logger, "average read time: {} ms", (static_cast(read_cost_ms) / static_cast(repeat))); LOG_INFO( logger, - "throughput by deserialized bytes (MB/s): {:.3f}" - " throughput by compressed bytes (MB/s): {:.3f}", - (static_cast(effective_size_read) * 1'000 * static_cast(repeat) - / static_cast(read_cost_ms) / 1024 / 1024), - (static_cast(effective_size_on_disk) * 1'000 * static_cast(repeat) - / static_cast(read_cost_ms) / 1024 / 1024)); + "read throughput by uncompressed bytes: {:.3f}MiB/s;" + " read throughput by compressed bytes: {:.3f}MiB/s", + (effective_size_read * 1'000.0 * repeat / read_cost_ms / 1024 / 1024), + (effective_size_on_disk * 1'000.0 * repeat / read_cost_ms / 1024 / 1024)); } catch (const boost::wrapexcept & e) { From 7cdcd27523ae9cbcfdcdb1e8020e3512c1aad297 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 22 Aug 2025 14:58:48 +0800 Subject: [PATCH 2/7] minimize read amplification in the merged file Signed-off-by: JaySon-Huang --- .../Storages/DeltaMerge/File/ColumnStream.cpp | 27 +++++++++++-------- .../DeltaMerge/File/DMFilePackFilter.cpp | 8 +++--- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp b/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp index a4c4dc75b3c..fe26ea09b82 100644 --- a/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp +++ 
b/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp @@ -115,11 +115,13 @@ class MarkLoader return res; // First, read from merged file to get the raw data(contains the header) + // Note that we directly use `data_size` as the size of buffer size in order + // to minimize read amplification in the merged file. auto buffer = ReadBufferFromRandomAccessFileBuilder::build( reader.file_provider, file_path, encrypt_path, - reader.dmfile->getConfiguration()->getChecksumFrameLength(), + data_size, read_limiter); buffer.seek(offset); @@ -234,9 +236,10 @@ std::unique_ptr ColumnReadStream::buildColDataRe { const auto * dmfile_meta = typeid_cast(reader.dmfile->meta.get()); assert(dmfile_meta != nullptr); - const auto & info = dmfile_meta->merged_sub_file_infos.find(colDataFileName(file_name_base)); - if (info == dmfile_meta->merged_sub_file_infos.end()) + const auto & info_iter = dmfile_meta->merged_sub_file_infos.find(colDataFileName(file_name_base)); + if (info_iter == dmfile_meta->merged_sub_file_infos.end()) { + // Not merged into merged file, read from the original data file. 
return CompressedReadBufferFromFileBuilder::build( reader.file_provider, reader.dmfile->colDataPath(file_name_base), @@ -247,26 +250,28 @@ std::unique_ptr ColumnReadStream::buildColDataRe reader.dmfile->getConfiguration()->getChecksumFrameLength()); } - assert(info != dmfile_meta->merged_sub_file_infos.end()); - auto file_path = dmfile_meta->mergedPath(info->second.number); - auto encrypt_path = dmfile_meta->encryptionMergedPath(info->second.number); - auto offset = info->second.offset; - auto size = info->second.size; + assert(info_iter != dmfile_meta->merged_sub_file_infos.end()); + auto file_path = dmfile_meta->mergedPath(info_iter->second.number); + auto encrypt_path = dmfile_meta->encryptionMergedPath(info_iter->second.number); + auto offset = info_iter->second.offset; + auto data_size = info_iter->second.size; // First, read from merged file to get the raw data(contains the header) + // Note that we directly use `data_size` as the size of buffer size in order + // to minimize read amplification in the merged file. auto buffer = ReadBufferFromRandomAccessFileBuilder::build( reader.file_provider, file_path, encrypt_path, - reader.dmfile->getConfiguration()->getChecksumFrameLength(), + data_size, read_limiter); buffer.seek(offset); // Read the raw data into memory. It is OK because the mark merged into // merged_file is small enough. 
String raw_data; - raw_data.resize(size); - buffer.read(reinterpret_cast(raw_data.data()), size); + raw_data.resize(data_size); + buffer.read(reinterpret_cast(raw_data.data()), data_size); // Then read from the buffer based on the raw data return CompressedReadBufferFromFileBuilder::build( diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp index 2c022743fb4..496a9d9fbf1 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp @@ -246,17 +246,19 @@ std::pair DMFilePackFilter::loadIndex( auto offset = info->second.offset; auto data_size = info->second.size; + // First, read from merged file to get the raw data(contains the header) + // Note that we directly use `data_size` as the size of buffer size in order + // to minimize read amplification in the merged file. auto buffer = ReadBufferFromRandomAccessFileBuilder::build( file_provider, file_path, encrypt_path, - dmfile.getConfiguration()->getChecksumFrameLength(), + data_size, read_limiter); buffer.seek(offset); String raw_data; raw_data.resize(data_size); - buffer.read(reinterpret_cast(raw_data.data()), data_size); auto buf = ChecksumReadBufferBuilder::build( @@ -502,7 +504,7 @@ std::pair, DMFilePackFilterResults> DMFileP { // `not_clean > 0` means there are more than one version for some rowkeys in this pack // `pack.max_version > start_ts` means some rows will be filtered by MVCC reading - // We need to read this pack to do delte merge, RowKey or MVCC filter. + // We need to read this pack to do delta merge, RowKey or MVCC filter. 
continue; } From 4b5f6681054c5b7e77743d09cf3855d04a1531ef Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 22 Jul 2025 17:56:32 +0800 Subject: [PATCH 3/7] Split class MinMaxIndexLoader Signed-off-by: JaySon-Huang --- dbms/src/Storages/DeltaMerge/File/DMFile.h | 1 + .../DeltaMerge/File/DMFilePackFilter.cpp | 214 +++++++++++------- 2 files changed, 135 insertions(+), 80 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index ab8d74f1f62..14e3d688da3 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -358,6 +358,7 @@ class DMFile : private boost::noncopyable friend class DMFileLocalIndexWriter; friend class DMFileReader; friend class MarkLoader; + friend class MinMaxIndexLoader; friend class ColumnReadStream; friend class DMFilePackFilter; friend class DMFileBlockInputStreamBuilder; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp index 496a9d9fbf1..67a1165e64a 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -198,102 +199,155 @@ void DMFilePackFilter::loadIndex( indexes.emplace(col_id, RSIndex(type, minmax_index)); } -std::pair DMFilePackFilter::loadIndex( - const DMFile & dmfile, - const FileProviderPtr & file_provider, - const MinMaxIndexCachePtr & index_cache, - bool set_cache_if_miss, - ColId col_id, - const ReadLimiterPtr & read_limiter, - const ScanContextPtr & scan_context) +class MinMaxIndexLoader { - const auto & type = dmfile.getColumnStat(col_id).type; - const auto file_name_base = DMFile::getFileNameBase(col_id); - - auto load = [&]() { +public: + // Make the instance of `MinMaxIndexLoader` as a callable object that is used in + // `index_cache->getOrSet(...)`. 
+ MinMaxIndexPtr operator()() + { + const auto & type = dmfile.getColumnStat(col_id).type; auto index_file_size = dmfile.colIndexSize(col_id); if (index_file_size == 0) return std::make_shared(*type); + auto index_guard = S3::S3RandomAccessFile::setReadFileInfo({ .size = dmfile.getReadFileSize(col_id, colIndexFileName(file_name_base)), .scan_context = scan_context, }); - if (!dmfile.getConfiguration()) // v1 + + if (likely(dmfile.useMetaV2())) { - auto index_buf = ReadBufferFromRandomAccessFileBuilder::build( - file_provider, - dmfile.colIndexPath(file_name_base), - dmfile.encryptionIndexPath(file_name_base), - std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), index_file_size), - read_limiter); - return MinMaxIndex::read(*type, index_buf, index_file_size); + // the min-max index is merged into metav2 + return loadMinMaxIndexFromMetav2(type, index_file_size); } - else if (dmfile.useMetaV2()) // v3 + else if (unlikely(!dmfile.getConfiguration())) { - const auto * dmfile_meta = typeid_cast(dmfile.meta.get()); - assert(dmfile_meta != nullptr); - auto info = dmfile_meta->merged_sub_file_infos.find(colIndexFileName(file_name_base)); - if (info == dmfile_meta->merged_sub_file_infos.end()) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Unknown index file {}", - dmfile.colIndexPath(file_name_base)); - } - - auto file_path = dmfile.meta->mergedPath(info->second.number); - auto encrypt_path = dmfile_meta->encryptionMergedPath(info->second.number); - auto offset = info->second.offset; - auto data_size = info->second.size; - - // First, read from merged file to get the raw data(contains the header) - // Note that we directly use `data_size` as the size of buffer size in order - // to minimize read amplification in the merged file. 
- auto buffer = ReadBufferFromRandomAccessFileBuilder::build( - file_provider, - file_path, - encrypt_path, - data_size, - read_limiter); - buffer.seek(offset); - - String raw_data; - raw_data.resize(data_size); - buffer.read(reinterpret_cast(raw_data.data()), data_size); - - auto buf = ChecksumReadBufferBuilder::build( - std::move(raw_data), - file_path, // just for debug, the buffer is part of the merged file - dmfile.getConfiguration()->getChecksumFrameLength(), - dmfile.getConfiguration()->getChecksumAlgorithm(), - dmfile.getConfiguration()->getChecksumFrameLength()); - - auto header_size = dmfile.getConfiguration()->getChecksumHeaderLength(); - auto frame_total_size = dmfile.getConfiguration()->getChecksumFrameLength() + header_size; - auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0); - - return MinMaxIndex::read(*type, *buf, index_file_size - header_size * frame_count); + // without checksum, simply load the raw bytes from file + return loadRawMinMaxIndex(type, index_file_size); } else - { // v2 - auto index_buf = ChecksumReadBufferBuilder::build( - file_provider, - dmfile.colIndexPath(file_name_base), - dmfile.encryptionIndexPath(file_name_base), - index_file_size, - read_limiter, - dmfile.getConfiguration()->getChecksumAlgorithm(), - dmfile.getConfiguration()->getChecksumFrameLength()); - auto header_size = dmfile.getConfiguration()->getChecksumHeaderLength(); - auto frame_total_size = dmfile.getConfiguration()->getChecksumFrameLength() + header_size; - auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0); - return MinMaxIndex::read(*type, *index_buf, index_file_size - header_size * frame_count); + { + // checksum is enabled but not merged into meta v2 + return loadMinMaxIndexWithChecksum(type, index_file_size); } - }; + } + +public: + MinMaxIndexLoader( + const DMFile & dmfile_, + const FileProviderPtr & file_provider_, + ColId col_id_, + const ReadLimiterPtr & 
read_limiter_, + const ScanContextPtr & scan_context_) + : dmfile(dmfile_) + , file_name_base(DMFile::getFileNameBase(col_id_)) + , col_id(col_id_) + , file_provider(file_provider_) + , read_limiter(read_limiter_) + , scan_context(scan_context_) + {} + + const DMFile & dmfile; + const FileNameBase file_name_base; + ColId col_id; + FileProviderPtr file_provider; + ReadLimiterPtr read_limiter; + ScanContextPtr scan_context; + +private: + MinMaxIndexPtr loadRawMinMaxIndex(const DataTypePtr & type, size_t index_file_size) const + { + auto index_buf = ReadBufferFromRandomAccessFileBuilder::build( + file_provider, + dmfile.colIndexPath(file_name_base), + dmfile.encryptionIndexPath(file_name_base), + std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), index_file_size), + read_limiter); + return MinMaxIndex::read(*type, index_buf, index_file_size); + } + + MinMaxIndexPtr loadMinMaxIndexWithChecksum(const DataTypePtr & type, size_t index_file_size) const + { + auto index_buf = ChecksumReadBufferBuilder::build( + file_provider, + dmfile.colIndexPath(file_name_base), + dmfile.encryptionIndexPath(file_name_base), + index_file_size, + read_limiter, + dmfile.getConfiguration()->getChecksumAlgorithm(), + dmfile.getConfiguration()->getChecksumFrameLength()); + auto header_size = dmfile.getConfiguration()->getChecksumHeaderLength(); + auto frame_total_size = dmfile.getConfiguration()->getChecksumFrameLength() + header_size; + auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0); + return MinMaxIndex::read(*type, *index_buf, index_file_size - header_size * frame_count); + } + + MinMaxIndexPtr loadMinMaxIndexFromMetav2(const DataTypePtr & type, size_t index_file_size) const + { + const auto * dmfile_meta = typeid_cast(dmfile.meta.get()); + assert(dmfile_meta != nullptr); + const auto col_index_fname = colIndexFileName(file_name_base); + auto info_iter = dmfile_meta->merged_sub_file_infos.find(col_index_fname); + RUNTIME_CHECK_MSG( + info_iter 
!= dmfile_meta->merged_sub_file_infos.end(), + "Unknown index file, dmfile_path={} index_fname={}", + dmfile.parentPath(), + col_index_fname); + + const auto & merged_file_info = info_iter->second; + auto file_path = dmfile.meta->mergedPath(merged_file_info.number); + auto encrypt_path = dmfile_meta->encryptionMergedPath(merged_file_info.number); + auto offset = merged_file_info.offset; + auto data_size = merged_file_info.size; + + // First, read from merged file to get the raw data(contains the header) + // Note that we directly use `data_size` as the size of buffer size in order + // to minimize read amplification in the merged file. + auto buffer = ReadBufferFromRandomAccessFileBuilder::build( + file_provider, + file_path, + encrypt_path, + data_size, + read_limiter); + buffer.seek(offset); + + String raw_data; + raw_data.resize(data_size); + buffer.read(reinterpret_cast(raw_data.data()), data_size); + + auto buf = ChecksumReadBufferBuilder::build( + std::move(raw_data), + file_path, + dmfile.getConfiguration()->getChecksumFrameLength(), + dmfile.getConfiguration()->getChecksumAlgorithm(), + dmfile.getConfiguration()->getChecksumFrameLength()); + + auto header_size = dmfile.getConfiguration()->getChecksumHeaderLength(); + auto frame_total_size = dmfile.getConfiguration()->getChecksumFrameLength() + header_size; + auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0); + + return MinMaxIndex::read(*type, *buf, index_file_size - header_size * frame_count); + } +}; + +std::pair DMFilePackFilter::loadIndex( + const DMFile & dmfile, + const FileProviderPtr & file_provider, + const MinMaxIndexCachePtr & index_cache, + bool set_cache_if_miss, + ColId col_id, + const ReadLimiterPtr & read_limiter, + const ScanContextPtr & scan_context) +{ + const auto & type = dmfile.getColumnStat(col_id).type; + const auto file_name_base = DMFile::getFileNameBase(col_id); + MinMaxIndexPtr minmax_index; if (index_cache && set_cache_if_miss) { - 
minmax_index = index_cache->getOrSet(dmfile.colIndexCacheKey(file_name_base), load); + auto loader = MinMaxIndexLoader(dmfile, file_provider, col_id, read_limiter, scan_context); + minmax_index = index_cache->getOrSet(dmfile.colIndexCacheKey(file_name_base), loader); } else { @@ -301,7 +355,7 @@ std::pair DMFilePackFilter::loadIndex( if (index_cache) minmax_index = index_cache->get(dmfile.colIndexCacheKey(file_name_base)); if (minmax_index == nullptr) - minmax_index = load(); + minmax_index = MinMaxIndexLoader(dmfile, file_provider, col_id, read_limiter, scan_context)(); } return {type, minmax_index}; } From 9b214d5f356eafcca336f147a618d4e89928d297 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 28 Aug 2025 13:00:48 +0800 Subject: [PATCH 4/7] Refine the allocated buffer size Signed-off-by: JaySon-Huang --- .../Storages/DeltaMerge/File/ColumnStream.cpp | 26 ++++++++----------- .../DeltaMerge/File/DMFilePackFilter.cpp | 14 +++++----- 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp b/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp index fe26ea09b82..21075d2829b 100644 --- a/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp @@ -106,10 +106,9 @@ class MarkLoader col_mark_fname); const auto & merged_file_info = info_iter->second; - auto file_path = dmfile_meta->mergedPath(merged_file_info.number); - auto encrypt_path = dmfile_meta->encryptionMergedPath(merged_file_info.number); - auto offset = merged_file_info.offset; - auto data_size = merged_file_info.size; + const auto file_path = dmfile_meta->mergedPath(merged_file_info.number); + const auto offset = merged_file_info.offset; + const auto data_size = merged_file_info.size; if (data_size == 0) return res; @@ -120,22 +119,21 @@ class MarkLoader auto buffer = ReadBufferFromRandomAccessFileBuilder::build( reader.file_provider, file_path, - encrypt_path, + 
dmfile_meta->encryptionMergedPath(merged_file_info.number), data_size, read_limiter); buffer.seek(offset); // Read the raw data into memory. It is OK because the mark merged into // merged_file is small enough. - String raw_data; - raw_data.resize(data_size); + String raw_data(data_size, '\0'); buffer.read(reinterpret_cast(raw_data.data()), data_size); // Then read from the buffer based on the raw data auto buf = ChecksumReadBufferBuilder::build( std::move(raw_data), file_path, // just for debug, the buffer is part of the merged file - reader.dmfile->getConfiguration()->getChecksumFrameLength(), + data_size, // reduce the allocated buffer size and overhead reader.dmfile->getConfiguration()->getChecksumAlgorithm(), reader.dmfile->getConfiguration()->getChecksumFrameLength()); buf->readBig(reinterpret_cast(res->data()), bytes_size); @@ -252,9 +250,8 @@ std::unique_ptr ColumnReadStream::buildColDataRe assert(info_iter != dmfile_meta->merged_sub_file_infos.end()); auto file_path = dmfile_meta->mergedPath(info_iter->second.number); - auto encrypt_path = dmfile_meta->encryptionMergedPath(info_iter->second.number); - auto offset = info_iter->second.offset; - auto data_size = info_iter->second.size; + const auto offset = info_iter->second.offset; + const auto data_size = info_iter->second.size; // First, read from merged file to get the raw data(contains the header) // Note that we directly use `data_size` as the size of buffer size in order @@ -262,22 +259,21 @@ std::unique_ptr ColumnReadStream::buildColDataRe auto buffer = ReadBufferFromRandomAccessFileBuilder::build( reader.file_provider, file_path, - encrypt_path, + dmfile_meta->encryptionMergedPath(info_iter->second.number), data_size, read_limiter); buffer.seek(offset); // Read the raw data into memory. It is OK because the mark merged into // merged_file is small enough. 
- String raw_data; - raw_data.resize(data_size); + String raw_data(data_size, '\0'); buffer.read(reinterpret_cast(raw_data.data()), data_size); // Then read from the buffer based on the raw data return CompressedReadBufferFromFileBuilder::build( std::move(raw_data), file_path, - reader.dmfile->getConfiguration()->getChecksumFrameLength(), + data_size, // reduce the allocated buffer size and overhead reader.dmfile->getConfiguration()->getChecksumAlgorithm(), reader.dmfile->getConfiguration()->getChecksumFrameLength()); } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp index 67a1165e64a..b100c7e5544 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp @@ -296,10 +296,9 @@ class MinMaxIndexLoader col_index_fname); const auto & merged_file_info = info_iter->second; - auto file_path = dmfile.meta->mergedPath(merged_file_info.number); - auto encrypt_path = dmfile_meta->encryptionMergedPath(merged_file_info.number); - auto offset = merged_file_info.offset; - auto data_size = merged_file_info.size; + const auto file_path = dmfile.meta->mergedPath(merged_file_info.number); + const auto offset = merged_file_info.offset; + const auto data_size = merged_file_info.size; // First, read from merged file to get the raw data(contains the header) // Note that we directly use `data_size` as the size of buffer size in order @@ -307,19 +306,18 @@ class MinMaxIndexLoader auto buffer = ReadBufferFromRandomAccessFileBuilder::build( file_provider, file_path, - encrypt_path, + dmfile_meta->encryptionMergedPath(merged_file_info.number), data_size, read_limiter); buffer.seek(offset); - String raw_data; - raw_data.resize(data_size); + String raw_data(data_size, '\0'); buffer.read(reinterpret_cast(raw_data.data()), data_size); auto buf = ChecksumReadBufferBuilder::build( std::move(raw_data), file_path, - 
dmfile.getConfiguration()->getChecksumFrameLength(), + data_size, // reduce the allocated buffer size and overhead dmfile.getConfiguration()->getChecksumAlgorithm(), dmfile.getConfiguration()->getChecksumFrameLength()); From 9eb995241070203da13bd5d9b4db79fa3b4e68f8 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 29 Aug 2025 10:10:38 +0800 Subject: [PATCH 5/7] Address comment of using min(data_size, checksum_frame_size) Signed-off-by: JaySon-Huang --- dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp | 12 ++++++------ .../Storages/DeltaMerge/File/DMFilePackFilter.cpp | 5 +++-- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp b/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp index 21075d2829b..d1671ad6367 100644 --- a/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp @@ -114,13 +114,13 @@ class MarkLoader return res; // First, read from merged file to get the raw data(contains the header) - // Note that we directly use `data_size` as the size of buffer size in order + // Note that we use min(`data_size`, checksum_frame_size) as the size of buffer size in order // to minimize read amplification in the merged file. auto buffer = ReadBufferFromRandomAccessFileBuilder::build( reader.file_provider, file_path, dmfile_meta->encryptionMergedPath(merged_file_info.number), - data_size, + std::min(data_size, reader.dmfile->getConfiguration()->getChecksumFrameLength()), read_limiter); buffer.seek(offset); @@ -129,7 +129,7 @@ class MarkLoader String raw_data(data_size, '\0'); buffer.read(reinterpret_cast(raw_data.data()), data_size); - // Then read from the buffer based on the raw data + // Then read from the buffer based on the raw data. 
The buffer size is min(estimated_size, checksum_frame_size) auto buf = ChecksumReadBufferBuilder::build( std::move(raw_data), file_path, // just for debug, the buffer is part of the merged file @@ -254,13 +254,13 @@ std::unique_ptr ColumnReadStream::buildColDataRe const auto data_size = info_iter->second.size; // First, read from merged file to get the raw data(contains the header) - // Note that we directly use `data_size` as the size of buffer size in order + // Note that we use min(`data_size`, checksum_frame_size) as the size of buffer size in order // to minimize read amplification in the merged file. auto buffer = ReadBufferFromRandomAccessFileBuilder::build( reader.file_provider, file_path, dmfile_meta->encryptionMergedPath(info_iter->second.number), - data_size, + std::min(data_size, reader.dmfile->getConfiguration()->getChecksumFrameLength()), read_limiter); buffer.seek(offset); @@ -269,7 +269,7 @@ std::unique_ptr ColumnReadStream::buildColDataRe String raw_data(data_size, '\0'); buffer.read(reinterpret_cast(raw_data.data()), data_size); - // Then read from the buffer based on the raw data + // Then read from the buffer based on the raw data. The buffer size is min(estimated_size, checksum_frame_size) return CompressedReadBufferFromFileBuilder::build( std::move(raw_data), file_path, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp index b100c7e5544..071b24b3b28 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp @@ -301,19 +301,20 @@ class MinMaxIndexLoader const auto data_size = merged_file_info.size; // First, read from merged file to get the raw data(contains the header) - // Note that we directly use `data_size` as the size of buffer size in order + // Note that we use min(`data_size`, checksum_frame_size) as the size of buffer size in order // to minimize read amplification in the merged file. 
auto buffer = ReadBufferFromRandomAccessFileBuilder::build( file_provider, file_path, dmfile_meta->encryptionMergedPath(merged_file_info.number), - data_size, + std::min(data_size, dmfile.getConfiguration()->getChecksumFrameLength()), read_limiter); buffer.seek(offset); String raw_data(data_size, '\0'); buffer.read(reinterpret_cast(raw_data.data()), data_size); + // Then read from the buffer based on the raw data. The buffer size is min(estimated_size, checksum_frame_size) auto buf = ChecksumReadBufferBuilder::build( std::move(raw_data), file_path, From 0fa5138e330a80edda55c8dada70781fd3167f89 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 29 Aug 2025 10:25:55 +0800 Subject: [PATCH 6/7] Remove useless estimated_size when creating from in-memory-buffer Signed-off-by: JaySon-Huang --- dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp | 3 +-- dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h | 1 - .../IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp | 2 -- .../IO/FileProvider/CompressedReadBufferFromFileBuilder.h | 1 - dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp | 6 ++---- dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp | 3 +-- 6 files changed, 4 insertions(+), 12 deletions(-) diff --git a/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp b/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp index bd757eaa755..1ff2f99de53 100644 --- a/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp +++ b/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp @@ -55,12 +55,11 @@ std::unique_ptr ChecksumReadBufferBuilder::build( std::unique_ptr ChecksumReadBufferBuilder::build( String && data, const String & file_name, - size_t estimated_size, ChecksumAlgo checksum_algorithm, size_t checksum_frame_size) { + auto allocation_size = std::min(data.size(), checksum_frame_size); auto file = std::make_shared(file_name, std::forward(data)); - auto allocation_size = std::min(estimated_size, checksum_frame_size); switch (checksum_algorithm) { 
case ChecksumAlgo::None: diff --git a/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h b/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h index 31566818f20..a330b046d6d 100644 --- a/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h +++ b/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h @@ -40,7 +40,6 @@ class ChecksumReadBufferBuilder static std::unique_ptr build( String && data, const String & file_name, - size_t estimated_size, ChecksumAlgo checksum_algorithm, size_t checksum_frame_size); }; diff --git a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp index 81f4b24a70f..8131e1e4044 100644 --- a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp +++ b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp @@ -40,14 +40,12 @@ std::unique_ptr CompressedReadBufferFromFile std::unique_ptr CompressedReadBufferFromFileBuilder::build( String && data, const String & file_name, - size_t estimated_size, ChecksumAlgo checksum_algorithm, size_t checksum_frame_size) { auto file_in = ChecksumReadBufferBuilder::build( std::move(data), file_name, - estimated_size, checksum_algorithm, checksum_frame_size); return std::make_unique>(std::move(file_in)); diff --git a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.h b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.h index edbce869abd..85cdc164539 100644 --- a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.h +++ b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.h @@ -34,7 +34,6 @@ class CompressedReadBufferFromFileBuilder static std::unique_ptr build( String && data, const String & file_name, - size_t estimated_size, ChecksumAlgo checksum_algorithm, size_t checksum_frame_size); diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp b/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp index d1671ad6367..b1fb72e8761 100644 --- 
a/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp @@ -129,11 +129,10 @@ class MarkLoader String raw_data(data_size, '\0'); buffer.read(reinterpret_cast(raw_data.data()), data_size); - // Then read from the buffer based on the raw data. The buffer size is min(estimated_size, checksum_frame_size) + // Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size) auto buf = ChecksumReadBufferBuilder::build( std::move(raw_data), file_path, // just for debug, the buffer is part of the merged file - data_size, // reduce the allocated buffer size and overhead reader.dmfile->getConfiguration()->getChecksumAlgorithm(), reader.dmfile->getConfiguration()->getChecksumFrameLength()); buf->readBig(reinterpret_cast(res->data()), bytes_size); @@ -269,11 +268,10 @@ std::unique_ptr ColumnReadStream::buildColDataRe String raw_data(data_size, '\0'); buffer.read(reinterpret_cast(raw_data.data()), data_size); - // Then read from the buffer based on the raw data. The buffer size is min(estimated_size, checksum_frame_size) + // Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size) return CompressedReadBufferFromFileBuilder::build( std::move(raw_data), file_path, - data_size, // reduce the allocated buffer size and overhead reader.dmfile->getConfiguration()->getChecksumAlgorithm(), reader.dmfile->getConfiguration()->getChecksumFrameLength()); } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp index 071b24b3b28..4f421a3dbe8 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp @@ -314,11 +314,10 @@ class MinMaxIndexLoader String raw_data(data_size, '\0'); buffer.read(reinterpret_cast(raw_data.data()), data_size); - // Then read from the buffer based on the raw data. 
The buffer size is min(estimated_size, checksum_frame_size) + // Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size) auto buf = ChecksumReadBufferBuilder::build( std::move(raw_data), file_path, - data_size, // reduce the allocated buffer size and overhead dmfile.getConfiguration()->getChecksumAlgorithm(), dmfile.getConfiguration()->getChecksumFrameLength()); From 440da39bb648f6af999d7caf584ff4752d1a6b70 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 29 Aug 2025 12:22:55 +0800 Subject: [PATCH 7/7] Format files Signed-off-by: JaySon-Huang --- .../FileProvider/CompressedReadBufferFromFileBuilder.cpp | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp index 8131e1e4044..e819f2d61a0 100644 --- a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp +++ b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp @@ -43,11 +43,8 @@ std::unique_ptr CompressedReadBufferFromFileBuilde ChecksumAlgo checksum_algorithm, size_t checksum_frame_size) { - auto file_in = ChecksumReadBufferBuilder::build( - std::move(data), - file_name, - checksum_algorithm, - checksum_frame_size); + auto file_in + = ChecksumReadBufferBuilder::build(std::move(data), file_name, checksum_algorithm, checksum_frame_size); return std::make_unique>(std::move(file_in)); }