diff --git a/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp b/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp index bd757eaa755..1ff2f99de53 100644 --- a/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp +++ b/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp @@ -55,12 +55,11 @@ std::unique_ptr ChecksumReadBufferBuilder::build( std::unique_ptr ChecksumReadBufferBuilder::build( String && data, const String & file_name, - size_t estimated_size, ChecksumAlgo checksum_algorithm, size_t checksum_frame_size) { + auto allocation_size = std::min(data.size(), checksum_frame_size); auto file = std::make_shared(file_name, std::forward(data)); - auto allocation_size = std::min(estimated_size, checksum_frame_size); switch (checksum_algorithm) { case ChecksumAlgo::None: diff --git a/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h b/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h index 31566818f20..a330b046d6d 100644 --- a/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h +++ b/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h @@ -40,7 +40,6 @@ class ChecksumReadBufferBuilder static std::unique_ptr build( String && data, const String & file_name, - size_t estimated_size, ChecksumAlgo checksum_algorithm, size_t checksum_frame_size); }; diff --git a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp index 81f4b24a70f..e819f2d61a0 100644 --- a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp +++ b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp @@ -40,16 +40,11 @@ std::unique_ptr CompressedReadBufferFromFile std::unique_ptr CompressedReadBufferFromFileBuilder::build( String && data, const String & file_name, - size_t estimated_size, ChecksumAlgo checksum_algorithm, size_t checksum_frame_size) { - auto file_in = ChecksumReadBufferBuilder::build( - std::move(data), - file_name, - estimated_size, - checksum_algorithm, - checksum_frame_size); + auto file_in + = ChecksumReadBufferBuilder::build(std::move(data), file_name, checksum_algorithm, checksum_frame_size); return std::make_unique>(std::move(file_in)); } diff --git a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.h b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.h index edbce869abd..85cdc164539 100644 --- a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.h +++ b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.h @@ -34,7 +34,6 @@ class CompressedReadBufferFromFileBuilder static std::unique_ptr build( String && data, const String & file_name, - size_t estimated_size, ChecksumAlgo checksum_algorithm, size_t checksum_frame_size); diff --git a/dbms/src/Server/DTTool/DTToolBench.cpp b/dbms/src/Server/DTTool/DTToolBench.cpp index d6284baf6b9..5c088469bdc 100644 --- a/dbms/src/Server/DTTool/DTToolBench.cpp +++ b/dbms/src/Server/DTTool/DTToolBench.cpp @@ -418,6 +418,7 @@ int benchEntry(const std::vector & opts) opt.emplace(std::map{}, frame, algorithm); if (version == 2) { + // frame checksum DB::STORAGE_FORMAT_CURRENT = DB::STORAGE_FORMAT_V3; } else if (version == 3) @@ -443,13 +444,13 @@ int benchEntry(const std::vector & opts) = genBlocks(random_seed, num_rows, num_cols, field, sparse_ratio, logger); } - size_t write_cost_ms = 0; + TableID table_id = 1; auto settings = DB::Settings(); auto db_context = env.getContext(); auto path_pool = std::make_shared(db_context->getPathPool().withTable("test", "t1", false)); auto storage_pool - = std::make_shared(*db_context, NullspaceID, /*ns_id*/ 1, *path_pool, "test.t1"); + = std::make_shared(*db_context, NullspaceID, table_id, *path_pool, "test.t1"); auto dm_settings = DB::DM::DeltaMergeStore::Settings{}; auto dm_context = DB::DM::DMContext::createUnique( *db_context, @@ -457,7 +458,7 @@ int benchEntry(const std::vector & opts) storage_pool, /*min_version_*/ 0, NullspaceID, - /*physical_table_id*/ 1, + table_id, /*pk_col_id*/ 0, false, 1, @@ -469,6 +470,7 @@ int benchEntry(const std::vector & opts) // Write if (write_repeat > 0) { + size_t write_cost_ms = 0; LOG_INFO(logger, "start writing"); for (size_t i = 0; i < write_repeat; ++i) { @@ -489,15 +491,17 @@ int benchEntry(const std::vector & opts) write_cost_ms += duration; LOG_INFO(logger, "attempt {} finished in {} ms", i, duration); } + size_t effective_size_on_disk = dmfile->getBytesOnDisk(); LOG_INFO( logger, "average write time: {} ms", (static_cast(write_cost_ms) / static_cast(repeat))); LOG_INFO( logger, - "throughput (MB/s): {:.3f}", - (static_cast(effective_size) * 1'000 * static_cast(repeat) - / static_cast(write_cost_ms) / 1024 / 1024)); + "write throughput by uncompressed size: {:.3f}MiB/s;" + " write throughput by compressed size: {:.3f}MiB/s", + (effective_size * 1'000.0 * repeat / write_cost_ms / 1024 / 1024), + (effective_size_on_disk * 1'000.0 * repeat / write_cost_ms / 1024 / 1024)); } // Read @@ -548,12 +552,10 @@ int benchEntry(const std::vector & opts) LOG_INFO(logger, "average read time: {} ms", (static_cast(read_cost_ms) / static_cast(repeat))); LOG_INFO( logger, - "throughput by deserialized bytes (MB/s): {:.3f}" - " throughput by compressed bytes (MB/s): {:.3f}", - (static_cast(effective_size_read) * 1'000 * static_cast(repeat) - / static_cast(read_cost_ms) / 1024 / 1024), - (static_cast(effective_size_on_disk) * 1'000 * static_cast(repeat) - / static_cast(read_cost_ms) / 1024 / 1024)); + "read throughput by uncompressed bytes: {:.3f}MiB/s;" + " read throughput by compressed bytes: {:.3f}MiB/s", + (effective_size_read * 1'000.0 * repeat / read_cost_ms / 1024 / 1024), + (effective_size_on_disk * 1'000.0 * repeat / read_cost_ms / 1024 / 1024)); } catch (const boost::wrapexcept & e) { diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp b/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp index a4c4dc75b3c..b1fb72e8761 100644 --- a/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp @@ -106,34 +106,33 @@ class MarkLoader col_mark_fname); const auto & merged_file_info = info_iter->second; - auto file_path = dmfile_meta->mergedPath(merged_file_info.number); - auto encrypt_path = dmfile_meta->encryptionMergedPath(merged_file_info.number); - auto offset = merged_file_info.offset; - auto data_size = merged_file_info.size; + const auto file_path = dmfile_meta->mergedPath(merged_file_info.number); + const auto offset = merged_file_info.offset; + const auto data_size = merged_file_info.size; if (data_size == 0) return res; // First, read from merged file to get the raw data(contains the header) + // Note that we use min(`data_size`, checksum_frame_size) as the size of buffer size in order + // to minimize read amplification in the merged file. auto buffer = ReadBufferFromRandomAccessFileBuilder::build( reader.file_provider, file_path, - encrypt_path, - reader.dmfile->getConfiguration()->getChecksumFrameLength(), + dmfile_meta->encryptionMergedPath(merged_file_info.number), + std::min(data_size, reader.dmfile->getConfiguration()->getChecksumFrameLength()), read_limiter); buffer.seek(offset); // Read the raw data into memory. It is OK because the mark merged into // merged_file is small enough. - String raw_data; - raw_data.resize(data_size); + String raw_data(data_size, '\0'); buffer.read(reinterpret_cast(raw_data.data()), data_size); - // Then read from the buffer based on the raw data + // Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size) auto buf = ChecksumReadBufferBuilder::build( std::move(raw_data), file_path, // just for debug, the buffer is part of the merged file - reader.dmfile->getConfiguration()->getChecksumFrameLength(), reader.dmfile->getConfiguration()->getChecksumAlgorithm(), reader.dmfile->getConfiguration()->getChecksumFrameLength()); buf->readBig(reinterpret_cast(res->data()), bytes_size); @@ -234,9 +233,10 @@ std::unique_ptr ColumnReadStream::buildColDataRe { const auto * dmfile_meta = typeid_cast(reader.dmfile->meta.get()); assert(dmfile_meta != nullptr); - const auto & info = dmfile_meta->merged_sub_file_infos.find(colDataFileName(file_name_base)); - if (info == dmfile_meta->merged_sub_file_infos.end()) + const auto & info_iter = dmfile_meta->merged_sub_file_infos.find(colDataFileName(file_name_base)); + if (info_iter == dmfile_meta->merged_sub_file_infos.end()) { + // Not merged into merged file, read from the original data file. return CompressedReadBufferFromFileBuilder::build( reader.file_provider, reader.dmfile->colDataPath(file_name_base), @@ -247,32 +247,31 @@ std::unique_ptr ColumnReadStream::buildColDataRe reader.dmfile->getConfiguration()->getChecksumFrameLength()); } - assert(info != dmfile_meta->merged_sub_file_infos.end()); - auto file_path = dmfile_meta->mergedPath(info->second.number); - auto encrypt_path = dmfile_meta->encryptionMergedPath(info->second.number); - auto offset = info->second.offset; - auto size = info->second.size; + assert(info_iter != dmfile_meta->merged_sub_file_infos.end()); + auto file_path = dmfile_meta->mergedPath(info_iter->second.number); + const auto offset = info_iter->second.offset; + const auto data_size = info_iter->second.size; // First, read from merged file to get the raw data(contains the header) + // Note that we use min(`data_size`, checksum_frame_size) as the size of buffer size in order + // to minimize read amplification in the merged file. auto buffer = ReadBufferFromRandomAccessFileBuilder::build( reader.file_provider, file_path, - encrypt_path, - reader.dmfile->getConfiguration()->getChecksumFrameLength(), + dmfile_meta->encryptionMergedPath(info_iter->second.number), + std::min(data_size, reader.dmfile->getConfiguration()->getChecksumFrameLength()), read_limiter); buffer.seek(offset); // Read the raw data into memory. It is OK because the mark merged into // merged_file is small enough. - String raw_data; - raw_data.resize(size); - buffer.read(reinterpret_cast(raw_data.data()), size); + String raw_data(data_size, '\0'); + buffer.read(reinterpret_cast(raw_data.data()), data_size); - // Then read from the buffer based on the raw data + // Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size) return CompressedReadBufferFromFileBuilder::build( std::move(raw_data), file_path, - reader.dmfile->getConfiguration()->getChecksumFrameLength(), reader.dmfile->getConfiguration()->getChecksumAlgorithm(), reader.dmfile->getConfiguration()->getChecksumFrameLength()); } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index ab8d74f1f62..14e3d688da3 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -358,6 +358,7 @@ class DMFile : private boost::noncopyable friend class DMFileLocalIndexWriter; friend class DMFileReader; friend class MarkLoader; + friend class MinMaxIndexLoader; friend class ColumnReadStream; friend class DMFilePackFilter; friend class DMFileBlockInputStreamBuilder; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp index 2c022743fb4..4f421a3dbe8 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -198,100 +199,153 @@ void DMFilePackFilter::loadIndex( indexes.emplace(col_id, RSIndex(type, minmax_index)); } -std::pair DMFilePackFilter::loadIndex( - const DMFile & dmfile, - const FileProviderPtr & file_provider, - const MinMaxIndexCachePtr & index_cache, - bool set_cache_if_miss, - ColId col_id, - const ReadLimiterPtr & read_limiter, - const ScanContextPtr & scan_context) +class MinMaxIndexLoader { - const auto & type = dmfile.getColumnStat(col_id).type; - const auto file_name_base = DMFile::getFileNameBase(col_id); - - auto load = [&]() { +public: + // Make the instance of `MinMaxIndexLoader` as a callable object that is used in + // `index_cache->getOrSet(...)`. + MinMaxIndexPtr operator()() + { + const auto & type = dmfile.getColumnStat(col_id).type; auto index_file_size = dmfile.colIndexSize(col_id); if (index_file_size == 0) return std::make_shared(*type); + auto index_guard = S3::S3RandomAccessFile::setReadFileInfo({ .size = dmfile.getReadFileSize(col_id, colIndexFileName(file_name_base)), .scan_context = scan_context, }); - if (!dmfile.getConfiguration()) // v1 + + if (likely(dmfile.useMetaV2())) { - auto index_buf = ReadBufferFromRandomAccessFileBuilder::build( - file_provider, - dmfile.colIndexPath(file_name_base), - dmfile.encryptionIndexPath(file_name_base), - std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), index_file_size), - read_limiter); - return MinMaxIndex::read(*type, index_buf, index_file_size); + // the min-max index is merged into metav2 + return loadMinMaxIndexFromMetav2(type, index_file_size); } - else if (dmfile.useMetaV2()) // v3 + else if (unlikely(!dmfile.getConfiguration())) { - const auto * dmfile_meta = typeid_cast(dmfile.meta.get()); - assert(dmfile_meta != nullptr); - auto info = dmfile_meta->merged_sub_file_infos.find(colIndexFileName(file_name_base)); - if (info == dmfile_meta->merged_sub_file_infos.end()) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Unknown index file {}", - dmfile.colIndexPath(file_name_base)); - } - - auto file_path = dmfile.meta->mergedPath(info->second.number); - auto encrypt_path = dmfile_meta->encryptionMergedPath(info->second.number); - auto offset = info->second.offset; - auto data_size = info->second.size; - - auto buffer = ReadBufferFromRandomAccessFileBuilder::build( - file_provider, - file_path, - encrypt_path, - dmfile.getConfiguration()->getChecksumFrameLength(), - read_limiter); - buffer.seek(offset); + // without checksum, simply load the raw bytes from file + return loadRawMinMaxIndex(type, index_file_size); + } + else + { + // checksum is enabled but not merged into meta v2 + return loadMinMaxIndexWithChecksum(type, index_file_size); + } + } - String raw_data; - raw_data.resize(data_size); +public: + MinMaxIndexLoader( + const DMFile & dmfile_, + const FileProviderPtr & file_provider_, + ColId col_id_, + const ReadLimiterPtr & read_limiter_, + const ScanContextPtr & scan_context_) + : dmfile(dmfile_) + , file_name_base(DMFile::getFileNameBase(col_id_)) + , col_id(col_id_) + , file_provider(file_provider_) + , read_limiter(read_limiter_) + , scan_context(scan_context_) + {} + + const DMFile & dmfile; + const FileNameBase file_name_base; + ColId col_id; + FileProviderPtr file_provider; + ReadLimiterPtr read_limiter; + ScanContextPtr scan_context; + +private: + MinMaxIndexPtr loadRawMinMaxIndex(const DataTypePtr & type, size_t index_file_size) const + { + auto index_buf = ReadBufferFromRandomAccessFileBuilder::build( + file_provider, + dmfile.colIndexPath(file_name_base), + dmfile.encryptionIndexPath(file_name_base), + std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), index_file_size), + read_limiter); + return MinMaxIndex::read(*type, index_buf, index_file_size); + } - buffer.read(reinterpret_cast(raw_data.data()), data_size); + MinMaxIndexPtr loadMinMaxIndexWithChecksum(const DataTypePtr & type, size_t index_file_size) const + { + auto index_buf = ChecksumReadBufferBuilder::build( + file_provider, + dmfile.colIndexPath(file_name_base), + dmfile.encryptionIndexPath(file_name_base), + index_file_size, + read_limiter, + dmfile.getConfiguration()->getChecksumAlgorithm(), + dmfile.getConfiguration()->getChecksumFrameLength()); + auto header_size = dmfile.getConfiguration()->getChecksumHeaderLength(); + auto frame_total_size = dmfile.getConfiguration()->getChecksumFrameLength() + header_size; + auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0); + return MinMaxIndex::read(*type, *index_buf, index_file_size - header_size * frame_count); + } - auto buf = ChecksumReadBufferBuilder::build( - std::move(raw_data), - file_path, // just for debug, the buffer is part of the merged file - dmfile.getConfiguration()->getChecksumFrameLength(), - dmfile.getConfiguration()->getChecksumAlgorithm(), - dmfile.getConfiguration()->getChecksumFrameLength()); + MinMaxIndexPtr loadMinMaxIndexFromMetav2(const DataTypePtr & type, size_t index_file_size) const + { + const auto * dmfile_meta = typeid_cast(dmfile.meta.get()); + assert(dmfile_meta != nullptr); + const auto col_index_fname = colIndexFileName(file_name_base); + auto info_iter = dmfile_meta->merged_sub_file_infos.find(col_index_fname); + RUNTIME_CHECK_MSG( + info_iter != dmfile_meta->merged_sub_file_infos.end(), + "Unknown index file, dmfile_path={} index_fname={}", + dmfile.parentPath(), + col_index_fname); + + const auto & merged_file_info = info_iter->second; + const auto file_path = dmfile.meta->mergedPath(merged_file_info.number); + const auto offset = merged_file_info.offset; + const auto data_size = merged_file_info.size; + + // First, read from merged file to get the raw data(contains the header) + // Note that we use min(`data_size`, checksum_frame_size) as the size of buffer size in order + // to minimize read amplification in the merged file. + auto buffer = ReadBufferFromRandomAccessFileBuilder::build( + file_provider, + file_path, + dmfile_meta->encryptionMergedPath(merged_file_info.number), + std::min(data_size, dmfile.getConfiguration()->getChecksumFrameLength()), + read_limiter); + buffer.seek(offset); + + String raw_data(data_size, '\0'); + buffer.read(reinterpret_cast(raw_data.data()), data_size); + + // Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size) + auto buf = ChecksumReadBufferBuilder::build( + std::move(raw_data), + file_path, + dmfile.getConfiguration()->getChecksumAlgorithm(), + dmfile.getConfiguration()->getChecksumFrameLength()); + + auto header_size = dmfile.getConfiguration()->getChecksumHeaderLength(); + auto frame_total_size = dmfile.getConfiguration()->getChecksumFrameLength() + header_size; + auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0); + + return MinMaxIndex::read(*type, *buf, index_file_size - header_size * frame_count); + } +}; - auto header_size = dmfile.getConfiguration()->getChecksumHeaderLength(); - auto frame_total_size = dmfile.getConfiguration()->getChecksumFrameLength() + header_size; - auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0); +std::pair DMFilePackFilter::loadIndex( + const DMFile & dmfile, + const FileProviderPtr & file_provider, + const MinMaxIndexCachePtr & index_cache, + bool set_cache_if_miss, + ColId col_id, + const ReadLimiterPtr & read_limiter, + const ScanContextPtr & scan_context) +{ + const auto & type = dmfile.getColumnStat(col_id).type; + const auto file_name_base = DMFile::getFileNameBase(col_id); - return MinMaxIndex::read(*type, *buf, index_file_size - header_size * frame_count); - } - else - { // v2 - auto index_buf = ChecksumReadBufferBuilder::build( - file_provider, - dmfile.colIndexPath(file_name_base), - dmfile.encryptionIndexPath(file_name_base), - index_file_size, - read_limiter, - dmfile.getConfiguration()->getChecksumAlgorithm(), - dmfile.getConfiguration()->getChecksumFrameLength()); - auto header_size = dmfile.getConfiguration()->getChecksumHeaderLength(); - auto frame_total_size = dmfile.getConfiguration()->getChecksumFrameLength() + header_size; - auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0); - return MinMaxIndex::read(*type, *index_buf, index_file_size - header_size * frame_count); - } - }; MinMaxIndexPtr minmax_index; if (index_cache && set_cache_if_miss) { - minmax_index = index_cache->getOrSet(dmfile.colIndexCacheKey(file_name_base), load); + auto loader = MinMaxIndexLoader(dmfile, file_provider, col_id, read_limiter, scan_context); + minmax_index = index_cache->getOrSet(dmfile.colIndexCacheKey(file_name_base), loader); } else { @@ -299,7 +353,7 @@ std::pair DMFilePackFilter::loadIndex( if (index_cache) minmax_index = index_cache->get(dmfile.colIndexCacheKey(file_name_base)); if (minmax_index == nullptr) - minmax_index = load(); + minmax_index = MinMaxIndexLoader(dmfile, file_provider, col_id, read_limiter, scan_context)(); } return {type, minmax_index}; } @@ -502,7 +556,7 @@ std::pair, DMFilePackFilterResults> DMFileP { // `not_clean > 0` means there are more than one version for some rowkeys in this pack // `pack.max_version > start_ts` means some rows will be filtered by MVCC reading - // We need to read this pack to do delte merge, RowKey or MVCC filter. + // We need to read this pack to do delta merge, RowKey or MVCC filter. continue; }