diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 9443d93a3..73fd43d1e 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -178,6 +178,27 @@ struct PAIMON_EXPORT Options { /// on-disk file. The default value is 256 mb static const char WRITE_BUFFER_SIZE[]; + /// "write-buffer-spillable" - Whether the write buffer can be spillable. Default value is true. + static const char WRITE_BUFFER_SPILLABLE[]; + + /// "write-buffer-spill.max-disk-size" - The max disk to use for write buffer spill. This only + /// works when the write buffer spill is enabled. Default value is unlimited. + static const char WRITE_BUFFER_SPILL_MAX_DISK_SIZE[]; + + /// "local-sort.max-num-file-handles" - The maximal fan-in for external merge sort. It limits + /// the number of file handles. If it is too small, may cause intermediate merging. But if it is + /// too large, it will cause too many files opened at the same time, consume memory and lead to + /// random reading. Default value is 128. + static const char LOCAL_SORT_MAX_NUM_FILE_HANDLES[]; + + /// "spill-compression" - Compression for spill. Default value is zstd. + static const char SPILL_COMPRESSION[]; + + /// "spill-compression.zstd-level" - Default spill compression zstd level. For higher + /// compression rates, it can be configured to 9, but the read and write speed will + /// significantly decrease. Default value is 1. + static const char SPILL_COMPRESSION_ZSTD_LEVEL[]; + /// "snapshot.num-retained.min" - The minimum number of completed snapshots to retain. Should be /// greater than or equal to 1. Default value is 10 static const char SNAPSHOT_NUM_RETAINED_MIN[]; @@ -417,10 +438,6 @@ struct PAIMON_EXPORT Options { /// lz4 are supported. Default value is zstd. /// Noted that java paimon also supports lzo which paimon-cpp does not support for now. static const char LOOKUP_CACHE_SPILL_COMPRESSION[]; - /// "spill-compression.zstd-level" - Default spill compression zstd level. 
For higher - /// compression rates, it can be configured to 9, but the read and write speed will - /// significantly decrease. Default value is 1. - static const char SPILL_COMPRESSION_ZSTD_LEVEL[]; /// "cache-page-size" - Memory page size for caching. Default value is 64 kb. static const char CACHE_PAGE_SIZE[]; /// "file.format.per.level" - Define different file format for different level, you can add the diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 1a5737160..1301169b5 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -234,6 +234,7 @@ set(PAIMON_CORE_SRCS core/manifest/manifest_list.cpp core/manifest/partition_entry.cpp core/manifest/index_manifest_file_handler.cpp + core/memory/writer_memory_manager.cpp core/mergetree/compact/universal_compaction.cpp core/mergetree/compact/early_full_compaction.cpp core/mergetree/compact/aggregate/aggregate_merge_function.cpp @@ -251,6 +252,7 @@ set(PAIMON_CORE_SRCS core/mergetree/compact/changelog_merge_tree_rewriter.cpp core/mergetree/merge_tree_writer.cpp core/mergetree/in_memory_sort_buffer.cpp + core/mergetree/external_sort_buffer.cpp core/mergetree/write_buffer.cpp core/mergetree/levels.cpp core/mergetree/lookup_file.cpp @@ -594,6 +596,7 @@ if(PAIMON_BUILD_TESTS) core/manifest/file_entry_test.cpp core/manifest/index_manifest_entry_serializer_test.cpp core/manifest/index_manifest_file_handler_test.cpp + core/memory/writer_memory_manager_test.cpp core/mergetree/levels_test.cpp core/mergetree/lookup_file_test.cpp core/mergetree/lookup_levels_test.cpp @@ -633,6 +636,7 @@ if(PAIMON_BUILD_TESTS) core/mergetree/drop_delete_reader_test.cpp core/mergetree/merge_tree_writer_test.cpp core/mergetree/write_buffer_test.cpp + core/mergetree/sort_buffer_test.cpp core/mergetree/sorted_run_test.cpp core/mergetree/spill_channel_manager_test.cpp core/mergetree/spill_reader_writer_test.cpp diff --git a/src/paimon/common/defs.cpp b/src/paimon/common/defs.cpp index 3a1ee786d..11fba2a17 100644 
--- a/src/paimon/common/defs.cpp +++ b/src/paimon/common/defs.cpp @@ -50,6 +50,11 @@ const char Options::SCAN_MODE[] = "scan.mode"; const char Options::READ_BATCH_SIZE[] = "read.batch-size"; const char Options::WRITE_BATCH_SIZE[] = "write.batch-size"; const char Options::WRITE_BUFFER_SIZE[] = "write-buffer-size"; +const char Options::WRITE_BUFFER_SPILLABLE[] = "write-buffer-spillable"; +const char Options::WRITE_BUFFER_SPILL_MAX_DISK_SIZE[] = "write-buffer-spill.max-disk-size"; +const char Options::LOCAL_SORT_MAX_NUM_FILE_HANDLES[] = "local-sort.max-num-file-handles"; +const char Options::SPILL_COMPRESSION[] = "spill-compression"; +const char Options::SPILL_COMPRESSION_ZSTD_LEVEL[] = "spill-compression.zstd-level"; const char Options::SNAPSHOT_NUM_RETAINED_MIN[] = "snapshot.num-retained.min"; const char Options::SNAPSHOT_NUM_RETAINED_MAX[] = "snapshot.num-retained.max"; const char Options::SNAPSHOT_TIME_RETAINED[] = "snapshot.time-retained"; @@ -106,7 +111,6 @@ const char Options::LOOKUP_CACHE_BLOOM_FILTER_FPP[] = "lookup.cache.bloom.filter const char Options::LOOKUP_REMOTE_FILE_ENABLED[] = "lookup.remote-file.enabled"; const char Options::LOOKUP_REMOTE_LEVEL_THRESHOLD[] = "lookup.remote-file.level-threshold"; const char Options::LOOKUP_CACHE_SPILL_COMPRESSION[] = "lookup.cache-spill-compression"; -const char Options::SPILL_COMPRESSION_ZSTD_LEVEL[] = "spill-compression.zstd-level"; const char Options::CACHE_PAGE_SIZE[] = "cache-page-size"; const char Options::FILE_FORMAT_PER_LEVEL[] = "file.format.per.level"; const char Options::FILE_COMPRESSION_PER_LEVEL[] = "file.compression.per.level"; diff --git a/src/paimon/common/utils/string_utils_test.cpp b/src/paimon/common/utils/string_utils_test.cpp index 6fc777b46..b4190d309 100644 --- a/src/paimon/common/utils/string_utils_test.cpp +++ b/src/paimon/common/utils/string_utils_test.cpp @@ -327,6 +327,12 @@ TEST_F(StringUtilsTest, TestSplit) { StringUtils::Split("key1=value1//key3=value3", std::string("/"), 
std::string("=")); ASSERT_EQ(expect, result); } + { + std::vector> expect = {}; + std::vector> result = + StringUtils::Split("", std::string("/"), std::string("=")); + ASSERT_EQ(expect, result); + } } TEST_F(StringUtilsTest, TestStringToValueSimple) { diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index 11d6fdfd8..b00f9a99f 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -97,20 +97,26 @@ class ConfigParser { return Status::OK(); // Return success even if the configuration does not exist } - // Parse string configurations - Status ParseString(const std::string& key, std::string* value) const { + // Parse memory size configurations + template + Status ParseMemorySize(const std::string& key, T* value) const { + static_assert(std::is_same_v || std::is_same_v>, + "ParseMemorySize only supports int64_t and std::optional"); auto iter = config_map_.find(key); if (iter != config_map_.end()) { - *value = iter->second; + PAIMON_ASSIGN_OR_RAISE(*value, MemorySize::ParseBytes(iter->second)); } return Status::OK(); } - // Parse memory size configurations - Status ParseMemorySize(const std::string& key, int64_t* value) const { + // Parse time duration configurations + template + Status ParseTimeDuration(const std::string& key, T* value) const { + static_assert(std::is_same_v || std::is_same_v>, + "ParseTimeDuration only supports int64_t and std::optional"); auto iter = config_map_.find(key); if (iter != config_map_.end()) { - PAIMON_ASSIGN_OR_RAISE(*value, MemorySize::ParseBytes(iter->second)); + PAIMON_ASSIGN_OR_RAISE(*value, TimeDuration::Parse(iter->second)); } return Status::OK(); } @@ -288,28 +294,25 @@ class ConfigParser { std::map>* file_format_per_level_ptr) const { auto& file_format_per_level = *file_format_per_level_ptr; std::string file_format_per_level_str; - PAIMON_RETURN_NOT_OK( - ParseString(Options::FILE_FORMAT_PER_LEVEL, &file_format_per_level_str)); - if 
(!file_format_per_level_str.empty()) { - auto level2format = - StringUtils::Split(file_format_per_level_str, std::string(","), std::string(":")); - for (const auto& single_level : level2format) { - if (single_level.size() != 2) { - return Status::Invalid(fmt::format( - "fail to parse key {}, value {} (usage example: 0:avro,3:parquet)", - Options::FILE_FORMAT_PER_LEVEL, file_format_per_level_str)); - } - auto level = StringUtils::StringToValue(single_level[0]); - if (!level || level.value() < 0) { - return Status::Invalid( - fmt::format("fail to parse level {} from string to int in {}", - single_level[0], Options::FILE_FORMAT_PER_LEVEL)); - } - std::shared_ptr file_format; - PAIMON_RETURN_NOT_OK(ParseObject( - "_no_use", /*default_identifier=*/single_level[1], &file_format)); - file_format_per_level[level.value()] = file_format; + PAIMON_RETURN_NOT_OK(Parse(Options::FILE_FORMAT_PER_LEVEL, &file_format_per_level_str)); + auto level2format = + StringUtils::Split(file_format_per_level_str, std::string(","), std::string(":")); + for (const auto& single_level : level2format) { + if (single_level.size() != 2) { + return Status::Invalid( + fmt::format("fail to parse key {}, value {} (usage example: 0:avro,3:parquet)", + Options::FILE_FORMAT_PER_LEVEL, file_format_per_level_str)); } + auto level = StringUtils::StringToValue(single_level[0]); + if (!level || level.value() < 0) { + return Status::Invalid( + fmt::format("fail to parse level {} from string to int in {}", single_level[0], + Options::FILE_FORMAT_PER_LEVEL)); + } + std::shared_ptr file_format; + PAIMON_RETURN_NOT_OK(ParseObject( + "_no_use", /*default_identifier=*/single_level[1], &file_format)); + file_format_per_level[level.value()] = file_format; } return Status::OK(); } @@ -320,24 +323,22 @@ class ConfigParser { auto& file_compression_per_level = *file_compression_per_level_ptr; std::string file_compression_per_level_str; PAIMON_RETURN_NOT_OK( - ParseString(Options::FILE_COMPRESSION_PER_LEVEL, 
&file_compression_per_level_str)); - if (!file_compression_per_level_str.empty()) { - auto level2compression = StringUtils::Split(file_compression_per_level_str, - std::string(","), std::string(":")); - for (const auto& single_level : level2compression) { - if (single_level.size() != 2) { - return Status::Invalid(fmt::format( - "fail to parse key {}, value {} (usage example: 0:lz4,1:zstd)", - Options::FILE_COMPRESSION_PER_LEVEL, file_compression_per_level_str)); - } - auto level = StringUtils::StringToValue(single_level[0]); - if (!level || level.value() < 0) { - return Status::Invalid( - fmt::format("fail to parse level {} from string to int in {}", - single_level[0], Options::FILE_COMPRESSION_PER_LEVEL)); - } - file_compression_per_level[level.value()] = single_level[1]; + Parse(Options::FILE_COMPRESSION_PER_LEVEL, &file_compression_per_level_str)); + auto level2compression = + StringUtils::Split(file_compression_per_level_str, std::string(","), std::string(":")); + for (const auto& single_level : level2compression) { + if (single_level.size() != 2) { + return Status::Invalid(fmt::format( + "fail to parse key {}, value {} (usage example: 0:lz4,1:zstd)", + Options::FILE_COMPRESSION_PER_LEVEL, file_compression_per_level_str)); } + auto level = StringUtils::StringToValue(single_level[0]); + if (!level || level.value() < 0) { + return Status::Invalid( + fmt::format("fail to parse level {} from string to int in {}", single_level[0], + Options::FILE_COMPRESSION_PER_LEVEL)); + } + file_compression_per_level[level.value()] = single_level[1]; } return Status::OK(); } @@ -393,6 +394,7 @@ struct CoreOptions::Impl { int32_t manifest_merge_min_count = 30; int32_t read_batch_size = 1024; int32_t write_batch_size = 1024; + int32_t local_sort_max_num_file_handles = 128; int32_t commit_max_retries = 10; int32_t compaction_min_file_num = 5; int32_t compaction_max_size_amplification_percent = 200; @@ -411,8 +413,10 @@ struct CoreOptions::Impl { BucketFunctionType 
bucket_function_type = BucketFunctionType::DEFAULT; int32_t file_compression_zstd_level = 1; + int64_t write_buffer_spill_max_disk_size = std::numeric_limits::max(); bool ignore_delete = false; + bool write_buffer_spillable = true; bool write_only = false; bool deletion_vectors_enabled = false; bool deletion_vectors_bitmap64 = false; @@ -444,6 +448,7 @@ struct CoreOptions::Impl { bool lookup_remote_file_enabled = false; int32_t lookup_remote_level_threshold = INT32_MIN; CompressOptions lookup_compress_options{"zstd", 1}; + CompressOptions spill_compress_options{"zstd", 1}; int64_t cache_page_size = 64 * 1024; // 64KB std::map> file_format_per_level; std::map file_compression_per_level; @@ -460,23 +465,14 @@ struct CoreOptions::Impl { PAIMON_RETURN_NOT_OK(parser.Parse(Options::BUCKET, &bucket)); // Parse partition.default-name - default partition name for null/empty partition values PAIMON_RETURN_NOT_OK( - parser.ParseString(Options::PARTITION_DEFAULT_NAME, &partition_default_name)); + parser.Parse(Options::PARTITION_DEFAULT_NAME, &partition_default_name)); // Parse page-size - memory page size, default 64 kb PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::PAGE_SIZE, &page_size)); // Parse target-file-size - target size of a data file - if (parser.ContainsKey(Options::TARGET_FILE_SIZE)) { - int64_t parsed_target_file_size; - PAIMON_RETURN_NOT_OK( - parser.ParseMemorySize(Options::TARGET_FILE_SIZE, &parsed_target_file_size)); - target_file_size = parsed_target_file_size; - } + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::TARGET_FILE_SIZE, &target_file_size)); // Parse blob.target-file-size - target size of a blob file - if (parser.ContainsKey(Options::BLOB_TARGET_FILE_SIZE)) { - int64_t parsed_blob_target_file_size; - PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::BLOB_TARGET_FILE_SIZE, - &parsed_blob_target_file_size)); - blob_target_file_size = parsed_blob_target_file_size; - } + PAIMON_RETURN_NOT_OK( + 
parser.ParseMemorySize(Options::BLOB_TARGET_FILE_SIZE, &blob_target_file_size)); // Parse source.split.target-size - target size of a source split when scanning a bucket PAIMON_RETURN_NOT_OK( parser.ParseMemorySize(Options::SOURCE_SPLIT_TARGET_SIZE, &source_split_target_size)); @@ -490,6 +486,21 @@ struct CoreOptions::Impl { // Parse write-buffer-size - data to build up in memory before flushing to disk PAIMON_RETURN_NOT_OK( parser.ParseMemorySize(Options::WRITE_BUFFER_SIZE, &write_buffer_size)); + // Parse write-buffer-spillable - whether write buffer may spill to disk + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::WRITE_BUFFER_SPILLABLE, &write_buffer_spillable)); + // Parse write-buffer-spill.max-disk-size - max disk size for spill files + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::WRITE_BUFFER_SPILL_MAX_DISK_SIZE, + &write_buffer_spill_max_disk_size)); + // Parse local-sort.max-num-file-handles - spill file handle cap for local merge + PAIMON_RETURN_NOT_OK(parser.Parse(Options::LOCAL_SORT_MAX_NUM_FILE_HANDLES, + &local_sort_max_num_file_handles)); + // Parse spill-compression - compression codec for spill files + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::SPILL_COMPRESSION, &spill_compress_options.compress)); + // Parse spill-compression.zstd-level - zstd level for spill compression, default 1 + PAIMON_RETURN_NOT_OK(parser.Parse(Options::SPILL_COMPRESSION_ZSTD_LEVEL, + &(spill_compress_options.zstd_level))); // Parse file-system - file system type, default "local" PAIMON_RETURN_NOT_OK(parser.ParseFileSystem(fs_scheme_to_identifier_map, specified_file_system, &file_system)); @@ -502,16 +513,12 @@ struct CoreOptions::Impl { PAIMON_RETURN_NOT_OK(parser.Parse("test.enable-adaptive-prefetch-strategy", &enable_adaptive_prefetch_strategy)); // Parse data-file.external-paths - external paths for data files, comma separated - std::string parsed_external_paths; PAIMON_RETURN_NOT_OK( - parser.ParseString(Options::DATA_FILE_EXTERNAL_PATHS, 
&parsed_external_paths)); - if (!parsed_external_paths.empty()) { - data_file_external_paths = parsed_external_paths; - } + parser.Parse(Options::DATA_FILE_EXTERNAL_PATHS, &data_file_external_paths)); // Parse data-file.external-paths.strategy - strategy for selecting external path PAIMON_RETURN_NOT_OK(parser.ParseExternalPathStrategy(&external_path_strategy)); // Parse data-file.prefix - file name prefix of data files, default "data-" - PAIMON_RETURN_NOT_OK(parser.ParseString(Options::DATA_FILE_PREFIX, &data_file_prefix)); + PAIMON_RETURN_NOT_OK(parser.Parse(Options::DATA_FILE_PREFIX, &data_file_prefix)); // Parse row-tracking.enabled - whether to enable unique row id for append table PAIMON_RETURN_NOT_OK( parser.Parse(Options::ROW_TRACKING_ENABLED, &row_tracking_enabled)); @@ -529,7 +536,7 @@ struct CoreOptions::Impl { PAIMON_RETURN_NOT_OK(parser.ParseObject( Options::FILE_FORMAT, /*default_identifier=*/"parquet", &file_format)); // Parse file.compression - default file compression, default "zstd" - PAIMON_RETURN_NOT_OK(parser.ParseString(Options::FILE_COMPRESSION, &file_compression)); + PAIMON_RETURN_NOT_OK(parser.Parse(Options::FILE_COMPRESSION, &file_compression)); // Parse file.compression.zstd-level - zstd compression level, default 1 PAIMON_RETURN_NOT_OK( parser.Parse(Options::FILE_COMPRESSION_ZSTD_LEVEL, &file_compression_zstd_level)); @@ -546,8 +553,7 @@ struct CoreOptions::Impl { PAIMON_RETURN_NOT_OK(parser.ParseObject( Options::MANIFEST_FORMAT, /*default_identifier=*/"avro", &manifest_file_format)); // Parse manifest.compression - manifest file compression, default "zstd" - PAIMON_RETURN_NOT_OK( - parser.ParseString(Options::MANIFEST_COMPRESSION, &manifest_compression)); + PAIMON_RETURN_NOT_OK(parser.Parse(Options::MANIFEST_COMPRESSION, &manifest_compression)); // Parse manifest.target-file-size - suggested manifest file size, default 8MB PAIMON_RETURN_NOT_OK( parser.ParseMemorySize(Options::MANIFEST_TARGET_FILE_SIZE, &manifest_target_file_size)); @@ 
-568,17 +574,15 @@ struct CoreOptions::Impl { int32_t snapshot_num_retain_max = std::numeric_limits::max(); // Parse snapshot.expire.limit - maximum snapshots allowed to expire at a time, default 10 int32_t snapshot_expire_limit = 10; + // Parse snapshot.time-retained - maximum time of completed snapshots to retain + int64_t snapshot_time_retained = 1 * 3600 * 1000; // 1 hour PAIMON_RETURN_NOT_OK( parser.Parse(Options::SNAPSHOT_NUM_RETAINED_MIN, &snapshot_num_retain_min)); PAIMON_RETURN_NOT_OK( parser.Parse(Options::SNAPSHOT_NUM_RETAINED_MAX, &snapshot_num_retain_max)); PAIMON_RETURN_NOT_OK(parser.Parse(Options::SNAPSHOT_EXPIRE_LIMIT, &snapshot_expire_limit)); - // Parse snapshot.time-retained - maximum time of completed snapshots to retain - std::string snapshot_time_retained_str = "1 hour"; PAIMON_RETURN_NOT_OK( - parser.ParseString(Options::SNAPSHOT_TIME_RETAINED, &snapshot_time_retained_str)); - PAIMON_ASSIGN_OR_RAISE(int64_t snapshot_time_retained, - TimeDuration::Parse(snapshot_time_retained_str)); + parser.ParseTimeDuration(Options::SNAPSHOT_TIME_RETAINED, &snapshot_time_retained)); // Parse snapshot.clean-empty-directories - whether to clean empty dirs on expiration bool snapshot_clean_empty_directories = false; PAIMON_RETURN_NOT_OK(parser.Parse(Options::SNAPSHOT_CLEAN_EMPTY_DIRECTORIES, @@ -595,11 +599,7 @@ struct CoreOptions::Impl { PAIMON_RETURN_NOT_OK( parser.Parse(Options::COMMIT_FORCE_COMPACT, &commit_force_compact)); // Parse commit.timeout - timeout duration of retry when commit failed - std::string commit_timeout_str; - PAIMON_RETURN_NOT_OK(parser.ParseString(Options::COMMIT_TIMEOUT, &commit_timeout_str)); - if (!commit_timeout_str.empty()) { - PAIMON_ASSIGN_OR_RAISE(commit_timeout, TimeDuration::Parse(commit_timeout_str)); - } + PAIMON_RETURN_NOT_OK(parser.ParseTimeDuration(Options::COMMIT_TIMEOUT, &commit_timeout)); // Parse commit.max-retries - maximum retries when commit failed, default 10 
PAIMON_RETURN_NOT_OK(parser.Parse(Options::COMMIT_MAX_RETRIES, &commit_max_retries)); return Status::OK(); @@ -619,12 +619,7 @@ struct CoreOptions::Impl { // Parse ignore-delete - whether to ignore delete records, default false PAIMON_RETURN_NOT_OK(parser.Parse(Options::IGNORE_DELETE, &ignore_delete)); // Parse fields.default-aggregate-function - default agg function for partial-update - std::string parsed_default_func; - PAIMON_RETURN_NOT_OK( - parser.ParseString(Options::FIELDS_DEFAULT_AGG_FUNC, &parsed_default_func)); - if (!parsed_default_func.empty()) { - field_default_func = parsed_default_func; - } + PAIMON_RETURN_NOT_OK(parser.Parse(Options::FIELDS_DEFAULT_AGG_FUNC, &field_default_func)); // Parse changelog-producer - whether to double write to a changelog file, default "none" PAIMON_RETURN_NOT_OK(parser.ParseChangelogProducer(&changelog_producer)); // Parse partial-update.remove-record-on-delete - remove whole row on delete @@ -660,7 +655,7 @@ struct CoreOptions::Impl { PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_SNAPSHOT_ID, &scan_snapshot_id)); // Parse scan.timestamp-millis and scan.timestamp std::string scan_timestamp_str; - PAIMON_RETURN_NOT_OK(parser.ParseString(Options::SCAN_TIMESTAMP, &scan_timestamp_str)); + PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_TIMESTAMP, &scan_timestamp_str)); PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_TIMESTAMP_MILLIS, &scan_timestamp_millis)); if (scan_timestamp_millis != std::nullopt && !scan_timestamp_str.empty()) { return Status::Invalid( @@ -674,20 +669,11 @@ struct CoreOptions::Impl { // Parse scan.mode - scanning behavior of the source, default "default" PAIMON_RETURN_NOT_OK(parser.ParseStartupMode(&startup_mode)); // Parse scan.fallback-branch - fallback branch when partition not found - std::string parsed_fallback_branch; - PAIMON_RETURN_NOT_OK( - parser.ParseString(Options::SCAN_FALLBACK_BRANCH, &parsed_fallback_branch)); - if (!parsed_fallback_branch.empty()) { - scan_fallback_branch = 
parsed_fallback_branch; - } + PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_FALLBACK_BRANCH, &scan_fallback_branch)); // Parse branch - branch name, default "main" - PAIMON_RETURN_NOT_OK(parser.ParseString(Options::BRANCH, &branch)); + PAIMON_RETURN_NOT_OK(parser.Parse(Options::BRANCH, &branch)); // Parse scan.tag-name - optional tag name for "from-snapshot" scan mode - std::string parsed_tag_name; - PAIMON_RETURN_NOT_OK(parser.ParseString(Options::SCAN_TAG_NAME, &parsed_tag_name)); - if (!parsed_tag_name.empty()) { - scan_tag_name = parsed_tag_name; - } + PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_TAG_NAME, &scan_tag_name)); return Status::OK(); } @@ -703,12 +689,8 @@ struct CoreOptions::Impl { PAIMON_RETURN_NOT_OK( parser.Parse(Options::GLOBAL_INDEX_ENABLED, &global_index_enabled)); // Parse global-index.external-path - global index root directory - std::string parsed_global_index_external_path; - PAIMON_RETURN_NOT_OK(parser.ParseString(Options::GLOBAL_INDEX_EXTERNAL_PATH, - &parsed_global_index_external_path)); - if (!parsed_global_index_external_path.empty()) { - global_index_external_path = parsed_global_index_external_path; - } + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::GLOBAL_INDEX_EXTERNAL_PATH, &global_index_external_path)); return Status::OK(); } @@ -739,31 +721,15 @@ struct CoreOptions::Impl { PAIMON_RETURN_NOT_OK( parser.Parse(Options::COMPACTION_FORCE_UP_LEVEL_0, &compaction_force_up_level_0)); // Parse compaction.optimization-interval - how often to perform optimization compaction - std::string optimized_compaction_interval_str; - PAIMON_RETURN_NOT_OK(parser.ParseString(Options::COMPACTION_OPTIMIZATION_INTERVAL, - &optimized_compaction_interval_str)); - if (!optimized_compaction_interval_str.empty()) { - PAIMON_ASSIGN_OR_RAISE(optimized_compaction_interval, - TimeDuration::Parse(optimized_compaction_interval_str)); - } + PAIMON_RETURN_NOT_OK(parser.ParseTimeDuration(Options::COMPACTION_OPTIMIZATION_INTERVAL, + 
&optimized_compaction_interval)); // Parse compaction.total-size-threshold - force full compaction when total size is smaller - std::string compaction_total_size_threshold_str; - PAIMON_RETURN_NOT_OK(parser.ParseString(Options::COMPACTION_TOTAL_SIZE_THRESHOLD, - &compaction_total_size_threshold_str)); - if (!compaction_total_size_threshold_str.empty()) { - PAIMON_ASSIGN_OR_RAISE(compaction_total_size_threshold, - MemorySize::ParseBytes(compaction_total_size_threshold_str)); - } + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::COMPACTION_TOTAL_SIZE_THRESHOLD, + &compaction_total_size_threshold)); // Parse compaction.incremental-size-threshold - force full compaction when incremental size // is bigger - std::string compaction_incremental_size_threshold_str; - PAIMON_RETURN_NOT_OK(parser.ParseString(Options::COMPACTION_INCREMENTAL_SIZE_THRESHOLD, - &compaction_incremental_size_threshold_str)); - if (!compaction_incremental_size_threshold_str.empty()) { - PAIMON_ASSIGN_OR_RAISE( - compaction_incremental_size_threshold, - MemorySize::ParseBytes(compaction_incremental_size_threshold_str)); - } + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::COMPACTION_INCREMENTAL_SIZE_THRESHOLD, + &compaction_incremental_size_threshold)); // Parse compaction.offpeak.start.hour - start of off-peak hours (0-23), -1 to disable PAIMON_RETURN_NOT_OK( parser.Parse(Options::COMPACT_OFFPEAK_START_HOUR, &compact_off_peak_start_hour)); @@ -800,12 +766,8 @@ struct CoreOptions::Impl { PAIMON_RETURN_NOT_OK(parser.Parse(Options::LOOKUP_REMOTE_LEVEL_THRESHOLD, &lookup_remote_level_threshold)); // Parse lookup.cache-spill-compression - spill compression for lookup cache, default "zstd" - std::string lookup_compress_options_compression_str; - PAIMON_RETURN_NOT_OK(parser.ParseString(Options::LOOKUP_CACHE_SPILL_COMPRESSION, - &lookup_compress_options_compression_str)); - if (!lookup_compress_options_compression_str.empty()) { - lookup_compress_options.compress = 
lookup_compress_options_compression_str; - } + PAIMON_RETURN_NOT_OK(parser.Parse(Options::LOOKUP_CACHE_SPILL_COMPRESSION, + &lookup_compress_options.compress)); // Parse spill-compression.zstd-level - zstd level for spill compression, default 1 PAIMON_RETURN_NOT_OK(parser.Parse(Options::SPILL_COMPRESSION_ZSTD_LEVEL, &(lookup_compress_options.zstd_level))); @@ -824,13 +786,8 @@ struct CoreOptions::Impl { lookup_cache_high_prio_pool_ratio)); } // Parse lookup.cache-file-retention - cached files retention time, default "1 hour" - std::string lookup_cache_file_retention_str; - PAIMON_RETURN_NOT_OK(parser.ParseString(Options::LOOKUP_CACHE_FILE_RETENTION, - &lookup_cache_file_retention_str)); - if (!lookup_cache_file_retention_str.empty()) { - PAIMON_ASSIGN_OR_RAISE(lookup_cache_file_retention_ms, - TimeDuration::Parse(lookup_cache_file_retention_str)); - } + PAIMON_RETURN_NOT_OK(parser.ParseTimeDuration(Options::LOOKUP_CACHE_FILE_RETENTION, + &lookup_cache_file_retention_ms)); // Parse lookup.cache-max-disk-size - max disk size for lookup cache, default unlimited PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::LOOKUP_CACHE_MAX_DISK_SIZE, &lookup_cache_max_disk_size)); @@ -1000,6 +957,22 @@ int64_t CoreOptions::GetWriteBufferSize() const { return impl_->write_buffer_size; } +bool CoreOptions::GetWriteBufferSpillable() const { + return impl_->write_buffer_spillable; +} + +int64_t CoreOptions::GetWriteBufferSpillMaxDiskSize() const { + return impl_->write_buffer_spill_max_disk_size; +} + +int32_t CoreOptions::GetLocalSortMaxNumFileHandles() const { + return impl_->local_sort_max_num_file_handles; +} + +const CompressOptions& CoreOptions::GetSpillCompressOptions() const { + return impl_->spill_compress_options; +} + bool CoreOptions::CommitForceCompact() const { return impl_->commit_force_compact; } @@ -1119,14 +1092,11 @@ bool CoreOptions::EnableAdaptivePrefetchStrategy() const { Result> CoreOptions::GetFieldAggFunc( const std::string& field_name) const { ConfigParser 
parser(impl_->raw_options); - std::string field_agg_func = ""; + std::optional field_agg_func; std::string key = std::string(Options::FIELDS_PREFIX) + "." + field_name + "." + std::string(Options::AGG_FUNCTION); - PAIMON_RETURN_NOT_OK(parser.ParseString(key, &field_agg_func)); - if (!field_agg_func.empty()) { - return std::optional(field_agg_func); - } - return std::optional(); + PAIMON_RETURN_NOT_OK(parser.Parse(key, &field_agg_func)); + return field_agg_func; } Result CoreOptions::FieldAggIgnoreRetract(const std::string& field_name) const { @@ -1143,7 +1113,7 @@ Result CoreOptions::FieldListAggDelimiter(const std::string& field_ std::string delimiter = ","; std::string key = std::string(Options::FIELDS_PREFIX) + "." + field_name + "." + std::string(Options::LIST_AGG_DELIMITER); - PAIMON_RETURN_NOT_OK(parser.ParseString(key, &delimiter)); + PAIMON_RETURN_NOT_OK(parser.Parse(key, &delimiter)); return delimiter; } @@ -1257,7 +1227,8 @@ Result> CoreOptions::CreateExternalPaths() const { if (strategy == ExternalPathStrategy::SPECIFIC_FS) { return Status::NotImplemented("do not support specific-fs external path strategy for now"); } - if (data_file_external_paths == std::nullopt || strategy == ExternalPathStrategy::NONE) { + if (data_file_external_paths == std::nullopt || data_file_external_paths->empty() || + strategy == ExternalPathStrategy::NONE) { return external_paths; } for (const auto& p : StringUtils::Split(data_file_external_paths.value(), ",")) { @@ -1305,8 +1276,8 @@ std::optional CoreOptions::GetGlobalIndexExternalPath() const { Result> CoreOptions::CreateGlobalIndexExternalPath() const { std::optional global_index_external_path = GetGlobalIndexExternalPath(); - if (global_index_external_path == std::nullopt) { - return global_index_external_path; + if (global_index_external_path == std::nullopt || global_index_external_path->empty()) { + return std::optional(); } std::string tmp_path = global_index_external_path.value(); StringUtils::Trim(&tmp_path); diff 
--git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index d878a735b..e50176a4b 100644 --- a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -17,7 +17,6 @@ #pragma once #include -#include #include #include #include @@ -83,6 +82,10 @@ class PAIMON_EXPORT CoreOptions { int32_t GetReadBatchSize() const; int32_t GetWriteBatchSize() const; int64_t GetWriteBufferSize() const; + bool GetWriteBufferSpillable() const; + int64_t GetWriteBufferSpillMaxDiskSize() const; + int32_t GetLocalSortMaxNumFileHandles() const; + const CompressOptions& GetSpillCompressOptions() const; const ExpireConfig& GetExpireConfig() const; diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index 2e7a3672b..c0f63f721 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -17,16 +17,13 @@ #include "paimon/core/core_options.h" #include -#include #include "gtest/gtest.h" #include "paimon/bucket/bucket_function_type.h" #include "paimon/common/fs/resolving_file_system.h" #include "paimon/core/options/expire_config.h" #include "paimon/defs.h" -#include "paimon/format/file_format.h" #include "paimon/fs/local/local_file_system.h" -#include "paimon/status.h" #include "paimon/testing/mock/mock_file_system.h" #include "paimon/testing/utils/testharness.h" #include "paimon/testing/utils/timezone_guard.h" @@ -63,6 +60,11 @@ TEST(CoreOptionsTest, TestDefaultValue) { ASSERT_EQ(1024, core_options.GetReadBatchSize()); ASSERT_EQ(1024, core_options.GetWriteBatchSize()); ASSERT_EQ(256 * 1024 * 1024, core_options.GetWriteBufferSize()); + ASSERT_TRUE(core_options.GetWriteBufferSpillable()); + ASSERT_EQ(std::numeric_limits::max(), core_options.GetWriteBufferSpillMaxDiskSize()); + ASSERT_EQ(128, core_options.GetLocalSortMaxNumFileHandles()); + ASSERT_EQ("zstd", core_options.GetSpillCompressOptions().compress); + ASSERT_EQ(1, core_options.GetSpillCompressOptions().zstd_level); 
ASSERT_FALSE(core_options.CommitForceCompact()); ASSERT_EQ(std::numeric_limits::max(), core_options.GetCommitTimeout()); ASSERT_EQ(10, core_options.GetCommitMaxRetries()); @@ -112,7 +114,7 @@ TEST(CoreOptionsTest, TestDefaultValue) { ASSERT_FALSE(core_options.DataEvolutionEnabled()); ASSERT_TRUE(core_options.LegacyPartitionNameEnabled()); ASSERT_TRUE(core_options.GlobalIndexEnabled()); - ASSERT_FALSE(core_options.GetGlobalIndexExternalPath()); + ASSERT_EQ(std::nullopt, core_options.GetGlobalIndexExternalPath()); ASSERT_EQ(std::nullopt, core_options.GetScanTagName()); ASSERT_EQ(std::nullopt, core_options.GetOptimizedCompactionInterval()); ASSERT_EQ(std::nullopt, core_options.GetCompactionTotalSizeThreshold()); @@ -129,6 +131,7 @@ TEST(CoreOptionsTest, TestDefaultValue) { ASSERT_EQ(1, core_options.GetCompactionSizeRatio()); ASSERT_EQ(5, core_options.GetNumSortedRunsCompactionTrigger()); ASSERT_EQ(8, core_options.GetNumSortedRunsStopTrigger()); + ASSERT_EQ(6, core_options.GetNumLevels()); ASSERT_EQ(LookupCompactMode::RADICAL, core_options.GetLookupCompactMode()); ASSERT_EQ(10, core_options.GetLookupCompactMaxInterval()); ASSERT_EQ(256 * 1024 * 1024, core_options.GetLookupCacheMaxMemory()); @@ -158,10 +161,15 @@ TEST(CoreOptionsTest, TestFromMap) { {Options::READ_BATCH_SIZE, "2048"}, {Options::WRITE_BUFFER_SIZE, "16MB"}, {Options::WRITE_BATCH_SIZE, "1234"}, + {Options::WRITE_BUFFER_SPILLABLE, "false"}, + {Options::WRITE_BUFFER_SPILL_MAX_DISK_SIZE, "7GB"}, + {Options::LOCAL_SORT_MAX_NUM_FILE_HANDLES, "64"}, + {Options::SPILL_COMPRESSION, "lz4"}, {Options::COMMIT_FORCE_COMPACT, "true"}, {Options::COMMIT_TIMEOUT, "120s"}, {Options::COMMIT_MAX_RETRIES, "20"}, {Options::SCAN_SNAPSHOT_ID, "5"}, + {Options::SCAN_MODE, "from-snapshot-full"}, {Options::SNAPSHOT_NUM_RETAINED_MIN, "15"}, {Options::SNAPSHOT_NUM_RETAINED_MAX, "30"}, {Options::SNAPSHOT_EXPIRE_LIMIT, "20"}, @@ -211,6 +219,7 @@ TEST(CoreOptionsTest, TestFromMap) { {Options::COMPACTION_SIZE_RATIO, "9"}, 
{Options::NUM_SORTED_RUNS_COMPACTION_TRIGGER, "11"}, {Options::NUM_SORTED_RUNS_STOP_TRIGGER, "17"}, + {Options::NUM_LEVELS, "9"}, {Options::LOOKUP_COMPACT, "gentle"}, {Options::LOOKUP_COMPACT_MAX_INTERVAL, "7"}, {Options::COMPACTION_OPTIMIZATION_INTERVAL, "2s"}, @@ -260,6 +269,11 @@ TEST(CoreOptionsTest, TestFromMap) { ASSERT_EQ(2048, core_options.GetReadBatchSize()); ASSERT_EQ(1234, core_options.GetWriteBatchSize()); ASSERT_EQ(16 * 1024 * 1024, core_options.GetWriteBufferSize()); + ASSERT_FALSE(core_options.GetWriteBufferSpillable()); + ASSERT_EQ(7L * 1024 * 1024 * 1024, core_options.GetWriteBufferSpillMaxDiskSize()); + ASSERT_EQ(64, core_options.GetLocalSortMaxNumFileHandles()); + ASSERT_EQ("lz4", core_options.GetSpillCompressOptions().compress); + ASSERT_EQ(2, core_options.GetSpillCompressOptions().zstd_level); ASSERT_TRUE(core_options.CommitForceCompact()); ASSERT_EQ(120 * 1000, core_options.GetCommitTimeout()); ASSERT_EQ(20, core_options.GetCommitMaxRetries()); @@ -322,7 +336,7 @@ TEST(CoreOptionsTest, TestFromMap) { ASSERT_TRUE(core_options.GetGlobalIndexExternalPath()); ASSERT_EQ(core_options.GetGlobalIndexExternalPath().value(), "FILE:///tmp/global_index/"); ASSERT_EQ("test-tag", core_options.GetScanTagName().value()); - ASSERT_EQ(StartupMode::FromSnapshot(), core_options.GetStartupMode()); + ASSERT_EQ(StartupMode::FromSnapshotFull(), core_options.GetStartupMode()); ASSERT_EQ(375809637, core_options.GetCompactionFileSize(/*has_primary_key=*/true)); ASSERT_EQ(375809637, core_options.GetCompactionFileSize(/*has_primary_key=*/false)); ASSERT_TRUE(core_options.WriteOnly()); @@ -331,6 +345,7 @@ TEST(CoreOptionsTest, TestFromMap) { ASSERT_EQ(9, core_options.GetCompactionSizeRatio()); ASSERT_EQ(11, core_options.GetNumSortedRunsCompactionTrigger()); ASSERT_EQ(17, core_options.GetNumSortedRunsStopTrigger()); + ASSERT_EQ(9, core_options.GetNumLevels()); ASSERT_EQ(LookupCompactMode::GENTLE, core_options.GetLookupCompactMode()); ASSERT_EQ(11, 
core_options.GetLookupCompactMaxInterval()); ASSERT_TRUE(core_options.CompactionForceRewriteAllFiles()); diff --git a/src/paimon/core/disk/io_manager.h b/src/paimon/core/disk/io_manager.h index 5e14892ae..50d863fae 100644 --- a/src/paimon/core/disk/io_manager.h +++ b/src/paimon/core/disk/io_manager.h @@ -20,8 +20,6 @@ #include #include -#include "paimon/common/utils/path_util.h" -#include "paimon/common/utils/uuid.h" #include "paimon/core/disk/file_channel_manager.h" #include "paimon/result.h" diff --git a/src/paimon/core/memory/writer_memory_manager.cpp b/src/paimon/core/memory/writer_memory_manager.cpp new file mode 100644 index 000000000..7e27b9d05 --- /dev/null +++ b/src/paimon/core/memory/writer_memory_manager.cpp @@ -0,0 +1,109 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/memory/writer_memory_manager.h" + +#include + +#include "fmt/format.h" +#include "paimon/core/utils/batch_writer.h" + +namespace paimon { + +void WriterMemoryManager::RegisterWriter(BatchWriter* writer) { + UpdateWriterMemory(writer); +} + +void WriterMemoryManager::UnregisterWriter(BatchWriter* writer) { + auto iter = writer_memory_.find(writer); + if (iter == writer_memory_.end()) { + return; + } + + assert(total_memory_ >= iter->second); + total_memory_ -= iter->second; + writer_memory_.erase(iter); +} + +void WriterMemoryManager::RefreshWriterMemory(BatchWriter* writer) { + UpdateWriterMemory(writer); +} + +Status WriterMemoryManager::OnWriteCompleted(BatchWriter* writer) { + UpdateWriterMemory(writer); + if (total_memory_ < memory_limit_) { + return Status::OK(); + } + + return ShrinkToLimit(); +} + +void WriterMemoryManager::UpdateWriterMemory(BatchWriter* writer) { + uint64_t current_memory_usage = writer->GetMemoryUsage(); + auto [iter, inserted] = writer_memory_.emplace(writer, current_memory_usage); + if (inserted) { + total_memory_ += current_memory_usage; + } else { + uint64_t previous_memory = iter->second; + if (current_memory_usage >= previous_memory) { + total_memory_ += (current_memory_usage - previous_memory); + } else { + assert(total_memory_ >= previous_memory - current_memory_usage); + total_memory_ -= (previous_memory - current_memory_usage); + } + iter->second = current_memory_usage; + } +} + +WriterMemoryManager::Candidate WriterMemoryManager::PickLargest() const { + Candidate candidate; + for (const auto& [writer, memory] : writer_memory_) { + if (memory > candidate.memory) { + candidate = {writer, memory}; + } + } + return candidate; +} + +Status WriterMemoryManager::ShrinkToLimit() { + while (true) { + if (total_memory_ < memory_limit_) { + return Status::OK(); + } + + Candidate picked = PickLargest(); + if (picked.memory == 0) { + return Status::Invalid( + fmt::format("Unable to release memory to below the 
write-buffer-size limit ({} " + "bytes), this might be a bug.", + memory_limit_)); + } + BatchWriter* candidate = picked.writer; + uint64_t before_memory = picked.memory; + PAIMON_RETURN_NOT_OK(candidate->FlushMemory()); + + UpdateWriterMemory(candidate); + uint64_t after_memory = candidate->GetMemoryUsage(); + if (after_memory >= before_memory) { + return Status::Invalid(fmt::format( + "Before flushing memory, writer had {} bytes of memory allocated, After flushing " + "memory, writer still has {} bytes of memory allocated, this might be a bug.", + before_memory, after_memory)); + } + } +} + +} // namespace paimon diff --git a/src/paimon/core/memory/writer_memory_manager.h b/src/paimon/core/memory/writer_memory_manager.h new file mode 100644 index 000000000..3ac5488cf --- /dev/null +++ b/src/paimon/core/memory/writer_memory_manager.h @@ -0,0 +1,59 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/status.h" + +namespace paimon { + +class BatchWriter; + +/// Coordinates global write-buffer memory across managed writers. Used in `AbstractFileStoreWrite`. +/// @note This class is not thread-safe. 
+class WriterMemoryManager { + public: + explicit WriterMemoryManager(uint64_t memory_limit) : memory_limit_(memory_limit) {} + + /// Register a writer when creating a new `BatchWriter` in `AbstractFileStoreWrite::GetWriter()` + void RegisterWriter(BatchWriter* writer); + /// Unregister a writer when the `BatchWriter` has been erased in `AbstractFileStoreWrite` + void UnregisterWriter(BatchWriter* writer); + /// Refresh the memory usage of a writer after `BatchWriter::PrepareCommit()` + void RefreshWriterMemory(BatchWriter* writer); + /// Check if the total memory usage exceeds the limit after `BatchWriter::Write()`, and trigger + /// flush if needed. + Status OnWriteCompleted(BatchWriter* writer); + + private: + struct Candidate { + BatchWriter* writer = nullptr; + uint64_t memory = 0; + }; + + void UpdateWriterMemory(BatchWriter* writer); + Candidate PickLargest() const; + Status ShrinkToLimit(); + + const uint64_t memory_limit_; + uint64_t total_memory_ = 0; + std::unordered_map<BatchWriter*, uint64_t> writer_memory_; +}; + +} // namespace paimon diff --git a/src/paimon/core/memory/writer_memory_manager_test.cpp b/src/paimon/core/memory/writer_memory_manager_test.cpp new file mode 100644 index 000000000..b1370a90c --- /dev/null +++ b/src/paimon/core/memory/writer_memory_manager_test.cpp @@ -0,0 +1,245 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#include "paimon/core/memory/writer_memory_manager.h" + +#include +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/core/io/compact_increment.h" +#include "paimon/core/io/data_increment.h" +#include "paimon/core/utils/batch_writer.h" +#include "paimon/core/utils/commit_increment.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +namespace { + +class FakeBatchWriter : public BatchWriter { + public: + FakeBatchWriter(const std::string& name, std::vector* flush_history) + : name_(name), flush_history_(flush_history) {} + + void SetMemoryUsage(uint64_t memory_usage) { + memory_usage_ = memory_usage; + } + + void SetFlushReductions(std::vector flush_reductions) { + flush_reductions_ = std::move(flush_reductions); + } + + uint64_t GetMemoryUsage() const override { + return memory_usage_; + } + + Status FlushMemory() override { + uint64_t reduction = memory_usage_; + if (flush_calls_ < static_cast(flush_reductions_.size())) { + reduction = flush_reductions_[flush_calls_]; + } + reduction = std::min(reduction, memory_usage_); + memory_usage_ -= reduction; + + if (reduction > 0) { + if (flush_history_ != nullptr) { + flush_history_->push_back(name_); + } + ++flush_calls_; + } + return Status::OK(); + } + + Status Write(std::unique_ptr&& batch) override { + (void)batch; + return Status::OK(); + } + + Status Compact(bool full_compaction) override { + (void)full_compaction; + return Status::OK(); + } + + Result PrepareCommit(bool wait_compaction) override { + (void)wait_compaction; + return CommitIncrement(DataIncrement({}, {}, {}), CompactIncrement({}, {}, {}), nullptr); + } + + Result CompactNotCompleted() override { + return false; + } + + Status Sync() override { + return Status::OK(); + } + + Status Close() override { + return Status::OK(); + } + + std::shared_ptr GetMetrics() const override { + return nullptr; + } + + private: + std::string name_; + std::vector* flush_history_; + std::vector 
flush_reductions_; + uint64_t memory_usage_ = 0; + int32_t flush_calls_ = 0; +}; + +} // namespace + +TEST(WriterMemoryManagerTest, DoesNotFlushWhenMemoryIsBelowLimit) { + WriterMemoryManager manager(/*memory_limit=*/100); + std::vector flush_history; + FakeBatchWriter writer("writer", &flush_history); + manager.RegisterWriter(&writer); + + writer.SetMemoryUsage(40); + ASSERT_OK(manager.OnWriteCompleted(&writer)); + ASSERT_TRUE(flush_history.empty()); +} + +TEST(WriterMemoryManagerTest, UnregisterWriterRemovesMemoryFromLedger) { + WriterMemoryManager manager(/*memory_limit=*/100); + std::vector flush_history; + FakeBatchWriter writer_a("writer_a", &flush_history); + manager.RegisterWriter(&writer_a); + FakeBatchWriter writer_b("writer_b", &flush_history); + manager.RegisterWriter(&writer_b); + + writer_a.SetMemoryUsage(80); + manager.RefreshWriterMemory(&writer_a); + manager.UnregisterWriter(&writer_a); + + writer_b.SetMemoryUsage(30); + ASSERT_OK(manager.OnWriteCompleted(&writer_b)); + ASSERT_TRUE(flush_history.empty()); +} + +TEST(WriterMemoryManagerTest, RefreshWriterMemoryUpdatesLedgerWithoutFlushing) { + WriterMemoryManager manager(/*memory_limit=*/80); + std::vector flush_history; + FakeBatchWriter writer_a("writer_a", &flush_history); + manager.RegisterWriter(&writer_a); + FakeBatchWriter writer_b("writer_b", &flush_history); + manager.RegisterWriter(&writer_b); + + writer_a.SetMemoryUsage(60); + manager.RefreshWriterMemory(&writer_a); + writer_b.SetMemoryUsage(30); + manager.RefreshWriterMemory(&writer_b); + + writer_a.SetMemoryUsage(10); + ASSERT_OK(manager.OnWriteCompleted(&writer_a)); + ASSERT_TRUE(flush_history.empty()); +} + +TEST(WriterMemoryManagerTest, FlushWriterMemoryWithMultipleWriters) { + WriterMemoryManager manager(/*memory_limit=*/100); + std::vector flush_history; + FakeBatchWriter writer_a("writer_a", &flush_history); + manager.RegisterWriter(&writer_a); + FakeBatchWriter writer_b("writer_b", &flush_history); + 
manager.RegisterWriter(&writer_b); + FakeBatchWriter writer_c("writer_c", &flush_history); + manager.RegisterWriter(&writer_c); + + writer_a.SetMemoryUsage(60); + ASSERT_OK(manager.OnWriteCompleted(&writer_a)); + writer_b.SetMemoryUsage(30); + ASSERT_OK(manager.OnWriteCompleted(&writer_b)); + writer_c.SetMemoryUsage(50); + ASSERT_OK(manager.OnWriteCompleted(&writer_c)); + writer_a.SetMemoryUsage(30); + ASSERT_OK(manager.OnWriteCompleted(&writer_a)); + writer_c.SetMemoryUsage(45); + ASSERT_OK(manager.OnWriteCompleted(&writer_c)); + + ASSERT_EQ(flush_history, std::vector({"writer_a", "writer_c", "writer_c"})); + ASSERT_EQ(writer_a.GetMemoryUsage(), 30); + ASSERT_EQ(writer_b.GetMemoryUsage(), 30); + ASSERT_EQ(writer_c.GetMemoryUsage(), 0); +} + +TEST(WriterMemoryManagerTest, ReclaimsCallerWhenCallerIsLargestWriter) { + WriterMemoryManager manager(/*memory_limit=*/100); + std::vector flush_history; + FakeBatchWriter writer_a("writer_a", &flush_history); + manager.RegisterWriter(&writer_a); + FakeBatchWriter writer_b("writer_b", &flush_history); + manager.RegisterWriter(&writer_b); + + writer_a.SetMemoryUsage(20); + manager.RefreshWriterMemory(&writer_a); + + writer_b.SetMemoryUsage(90); + ASSERT_OK(manager.OnWriteCompleted(&writer_b)); + + ASSERT_EQ(flush_history, std::vector({"writer_b"})); + ASSERT_EQ(writer_a.GetMemoryUsage(), 20); + ASSERT_EQ(writer_b.GetMemoryUsage(), 0); +} + +TEST(WriterMemoryManagerTest, ContinuesReclaimingUntilBelowGlobalLimit) { + WriterMemoryManager manager(/*memory_limit=*/61); + std::vector flush_history; + FakeBatchWriter writer_a("writer_a", &flush_history); + manager.RegisterWriter(&writer_a); + FakeBatchWriter writer_b("writer_b", &flush_history); + manager.RegisterWriter(&writer_b); + + writer_a.SetMemoryUsage(90); + writer_a.SetFlushReductions({20, 20, 20}); + manager.RefreshWriterMemory(&writer_a); + + writer_b.SetMemoryUsage(60); + writer_b.SetFlushReductions({30}); + ASSERT_OK(manager.OnWriteCompleted(&writer_b)); + + 
ASSERT_EQ(flush_history, + std::vector({"writer_a", "writer_a", "writer_b", "writer_a"})); + ASSERT_EQ(writer_a.GetMemoryUsage(), 30); + ASSERT_EQ(writer_b.GetMemoryUsage(), 30); +} + +TEST(WriterMemoryManagerTest, ReturnsConfigurationErrorWhenNoWriterCanReleaseEnoughMemory) { + WriterMemoryManager manager(/*memory_limit=*/100); + std::vector flush_history; + FakeBatchWriter writer_a("writer_a", &flush_history); + manager.RegisterWriter(&writer_a); + FakeBatchWriter writer_b("writer_b", &flush_history); + manager.RegisterWriter(&writer_b); + + writer_b.SetMemoryUsage(20); + manager.RefreshWriterMemory(&writer_b); + + writer_a.SetMemoryUsage(120); + writer_a.SetFlushReductions({10, 0}); + ASSERT_NOK_WITH_MSG( + manager.OnWriteCompleted(&writer_a), + "Before flushing memory, writer had 110 bytes of memory allocated, After flushing memory, " + "writer still has 110 bytes of memory allocated, this might be a bug."); + ASSERT_EQ(flush_history, std::vector({"writer_a"})); +} + +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/compact/sort_merge_reader_test.cpp b/src/paimon/core/mergetree/compact/sort_merge_reader_test.cpp index fde56299e..84ef87c24 100644 --- a/src/paimon/core/mergetree/compact/sort_merge_reader_test.cpp +++ b/src/paimon/core/mergetree/compact/sort_merge_reader_test.cpp @@ -74,10 +74,12 @@ class SortMergeReaderTest : public testing::Test { const std::vector& expected) const { CheckSortMergeResult(src_array_vec, user_key_comparator, user_defined_seq_comparator, key_schema, - value_schema, expected); + value_schema, expected, + /*need_merge=*/true); CheckSortMergeResult(src_array_vec, user_key_comparator, user_defined_seq_comparator, key_schema, - value_schema, expected); + value_schema, expected, + /*need_merge=*/true); } private: @@ -87,10 +89,18 @@ class SortMergeReaderTest : public testing::Test { const std::shared_ptr& user_key_comparator, const std::shared_ptr& user_defined_seq_comparator, const std::shared_ptr& key_schema, - const 
std::shared_ptr& value_schema, int32_t batch_size) const { + const std::shared_ptr& value_schema, int32_t batch_size, + bool need_merge) const { auto mfunc = std::make_unique(/*ignore_delete=*/false); auto merge_function_wrapper = std::make_shared(std::move(mfunc)); + if (!need_merge) { + if constexpr (std::is_same_v) { + merge_function_wrapper = nullptr; + } else { + ADD_FAILURE() << "Only SortMergeReaderWithMinHeap supports no merge"; + } + } std::vector> concat_readers; for (const auto& src_array : src_array_vec) { auto file_batch_reader = std::make_unique( @@ -114,11 +124,11 @@ class SortMergeReaderTest : public testing::Test { const std::shared_ptr& user_defined_seq_comparator, const std::shared_ptr& key_schema, const std::shared_ptr& value_schema, - const std::vector& expected) const { + const std::vector& expected, bool need_merge) const { for (auto batch_size : {1, 2, 3, 4, 100}) { auto sort_merge_reader = CreateSortMergeReader( src_array_vec, user_key_comparator, user_defined_seq_comparator, key_schema, - value_schema, batch_size); + value_schema, batch_size, need_merge); ASSERT_OK_AND_ASSIGN( std::vector results, (ReadResultCollector::CollectKeyValueResult< @@ -215,7 +225,7 @@ TEST_F(SortMergeReaderTest, TestSimpleWithTwoSameKeys) { {1, 1, 1}, {{2}, {3}, {5}}, {{2, 30}, {3, 30}, {5, 50}}, pool_); CheckSortMergeResult( {src_array1, src_array2, src_array3, src_array4}, user_key_comparator, nullptr, key_schema, - value_schema, expected); + value_schema, expected, /*need_merge=*/true); } TEST_F(SortMergeReaderTest, TestSimpleWithThreeSameKeys) { @@ -261,7 +271,7 @@ TEST_F(SortMergeReaderTest, TestSimpleWithThreeSameKeys) { KeyValueChecker::GenerateKeyValues({2, 1}, {{2}, {5}}, {{2, 30}, {5, 50}}, pool_); CheckSortMergeResult( {src_array1, src_array2, src_array3, src_array4}, user_key_comparator, nullptr, key_schema, - value_schema, expected); + value_schema, expected, /*need_merge=*/true); } TEST_F(SortMergeReaderTest, TestSimpleWithThreeSameKeys2) { @@ 
-306,7 +316,7 @@ TEST_F(SortMergeReaderTest, TestSimpleWithThreeSameKeys2) { KeyValueChecker::GenerateKeyValues({2, 1}, {{2}, {5}}, {{2, 30}, {5, 50}}, pool_); CheckSortMergeResult( {src_array1, src_array2, src_array3, src_array4}, user_key_comparator, nullptr, key_schema, - value_schema, expected); + value_schema, expected, /*need_merge=*/true); } TEST_F(SortMergeReaderTest, TestSortMergeIn2Ways) { @@ -712,4 +722,131 @@ TEST_F(SortMergeReaderTest, TestSortMergeWithAggMergeFunction) { value_schema, {user_defined_sequence_field}, {"k0"}, core_options, expected); } +TEST_F(SortMergeReaderTest, TestRawSortNoMergeKeepsDuplicateKeys) { + arrow::FieldVector fields = {arrow::field("_SEQUENCE_NUMBER", arrow::int64()), + arrow::field("_VALUE_KIND", arrow::int8()), + arrow::field("k0", arrow::int32()), + arrow::field("v0", arrow::int32())}; + + auto data_fields = CreateDataField(fields); + std::shared_ptr key_schema = arrow::schema(arrow::FieldVector({fields[2]})); + std::shared_ptr value_schema = + arrow::schema(arrow::FieldVector({fields[2], fields[3]})); + std::shared_ptr src_type = arrow::struct_(fields); + + auto src_array1 = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(src_type, R"([ + [0, 0, 2, 10] + ])") + .ValueOrDie()); + + auto src_array2 = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(src_type, R"([ + [2, 0, 2, 30] + ])") + .ValueOrDie()); + + auto src_array3 = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(src_type, R"([ + [1, 0, 5, 50] + ])") + .ValueOrDie()); + + auto src_array4 = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(src_type, R"([ + [1, 0, 2, 30] + ])") + .ValueOrDie()); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr user_key_comparator, + FieldsComparator::Create({data_fields[2]}, std::vector({0}), + /*is_ascending_order=*/true)); + std::vector expected = KeyValueChecker::GenerateKeyValues( + {0, 1, 2, 1}, {{2}, {2}, {2}, {5}}, {{2, 10}, {2, 30}, {2, 30}, 
{5, 50}}, pool_); + CheckSortMergeResult( + {src_array1, src_array2, src_array3, src_array4}, user_key_comparator, + /*user_defined_seq_comparator=*/nullptr, key_schema, value_schema, expected, + /*need_merge=*/false); +} + +TEST_F(SortMergeReaderTest, TestRawSortNoMergeWithMinHeap) { + // key: k0, user defined sequence field: ts, value: v0 + // Format: [_SEQUENCE_NUMBER, _VALUE_KIND, k0, ts, v0] + // Reader1 (SEQUENCE_NUMBER 0..5): + // [key=1,ts=1,v=1], [key=1,ts=2,v=2], [key=1,ts=3,v=3] + // [key=1,ts=4,v=4], [key=2,ts=4,v=40], [key=2,ts=5,v=50] + // Reader2 (SEQUENCE_NUMBER 6..11): + // [key=1,ts=5,v=5], [key=1,ts=6,v=6], [key=2,ts=1,v=10] + // [key=2,ts=2,v=20], [key=2,ts=3,v=30], [key=2,ts=6,v=60] + // + // After sort: + // With user_defined_seq_comparator on ts field, sort by key asc, then ts asc within same key + // key=1: ts=1(seq0,v=1), ts=2(seq1,v=2), ts=3(seq2,v=3), ts=4(seq3,v=4), + // ts=5(seq6,v=5), ts=6(seq7,v=6) + // key=2: ts=1(seq8,v=10), ts=2(seq9,v=20), ts=3(seq10,v=30), ts=4(seq4,v=40), + // ts=5(seq5,v=50), ts=6(seq11,v=60) + + arrow::FieldVector fields = { + arrow::field("_SEQUENCE_NUMBER", arrow::int64()), + arrow::field("_VALUE_KIND", arrow::int8()), arrow::field("k0", arrow::int32()), + arrow::field("ts", arrow::int32()), arrow::field("v0", arrow::int32())}; + + auto data_fields = CreateDataField(fields); + std::shared_ptr key_schema = arrow::schema(arrow::FieldVector({fields[2]})); + std::shared_ptr value_schema = + arrow::schema(arrow::FieldVector({fields[2], fields[3], fields[4]})); + std::shared_ptr src_type = arrow::struct_(fields); + + auto src_array1 = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(src_type, R"([ + [0, 0, 1, 1, 1], + [1, 0, 1, 2, 2], + [2, 0, 1, 3, 3], + [3, 0, 1, 4, 4], + [4, 0, 2, 4, 40], + [5, 0, 2, 5, 50] + ])") + .ValueOrDie()); + + auto src_array2 = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(src_type, R"([ + [6, 0, 1, 5, 5], + [7, 0, 1, 6, 6], + [8, 0, 2, 1, 
10], + [9, 0, 2, 2, 20], + [10, 0, 2, 3, 30], + [11, 0, 2, 6, 60] + ])") + .ValueOrDie()); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr user_key_comparator, + FieldsComparator::Create({data_fields[2]}, std::vector({0}), + /*is_ascending_order=*/true)); + // user_defined_seq_comparator based on ts field (index 1 in value schema {k0, ts, v0}) + ASSERT_OK_AND_ASSIGN(std::shared_ptr user_defined_seq_comparator, + FieldsComparator::Create({data_fields[2], data_fields[3], data_fields[4]}, + std::vector({1}), + /*is_ascending_order=*/true)); + + std::vector expected = KeyValueChecker::GenerateKeyValues( + {0, 1, 2, 3, 6, 7, 8, 9, 10, 4, 5, 11}, + {{1}, {1}, {1}, {1}, {1}, {1}, {2}, {2}, {2}, {2}, {2}, {2}}, + {{1, 1, 1}, + {1, 2, 2}, + {1, 3, 3}, + {1, 4, 4}, + {1, 5, 5}, + {1, 6, 6}, + {2, 1, 10}, + {2, 2, 20}, + {2, 3, 30}, + {2, 4, 40}, + {2, 5, 50}, + {2, 6, 60}}, + pool_); + CheckSortMergeResult({src_array1, src_array2}, user_key_comparator, + user_defined_seq_comparator, key_schema, + value_schema, expected, /*need_merge=*/false); +} + } // namespace paimon::test diff --git a/src/paimon/core/mergetree/compact/sort_merge_reader_with_loser_tree.h b/src/paimon/core/mergetree/compact/sort_merge_reader_with_loser_tree.h index 721f7076b..d302ffec4 100644 --- a/src/paimon/core/mergetree/compact/sort_merge_reader_with_loser_tree.h +++ b/src/paimon/core/mergetree/compact/sort_merge_reader_with_loser_tree.h @@ -18,7 +18,6 @@ #include #include -#include #include #include @@ -36,8 +35,8 @@ class FieldsComparator; class KeyValueRecordReader; class Metrics; -/// `SortMergeReader` implemented with min-heap. Merge the KeyValue parsed by -/// KeyValueDataFileRecordReader and return the iterator of KeyValue +/// `SortMergeReader` implemented with loser tree. 
Merge the KeyValue parsed by +/// `KeyValueRecordReader` and return the iterator of KeyValue class SortMergeReaderWithLoserTree : public SortMergeReader { public: SortMergeReaderWithLoserTree( diff --git a/src/paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.cpp b/src/paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.cpp index 78bb0734d..cd3a38175 100644 --- a/src/paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.cpp +++ b/src/paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.cpp @@ -27,7 +27,8 @@ SortMergeReaderWithMinHeap::SortMergeReaderWithMinHeap( const std::shared_ptr& user_key_comparator, const std::shared_ptr& user_defined_seq_comparator, const std::shared_ptr>& merge_function_wrapper) - : readers_holder_(std::move(readers)), + : need_merge_(merge_function_wrapper != nullptr), + readers_holder_(std::move(readers)), user_key_comparator_(user_key_comparator), merge_function_wrapper_(merge_function_wrapper), min_heap_(HeapSorter(user_key_comparator, user_defined_seq_comparator)) { @@ -64,6 +65,10 @@ Result> SortMergeReaderWithMinHeap::N Result SortMergeReaderWithMinHeap::Iterator::HasNext() { while (true) { PAIMON_ASSIGN_OR_RAISE(bool has_more, NextImpl()); + if (!reader_->need_merge_) { + // no merge: just return every kv in sorted order, possibly with duplicate keys + return has_more; + } if (!has_more) { return false; } @@ -97,6 +102,16 @@ Result SortMergeReaderWithMinHeap::Iterator::NextImpl() { if (reader_->min_heap_.empty()) { return Status::Invalid("Min heap is empty. 
This is a bug."); } + + if (!reader_->need_merge_) { + // no merge: only poll the top element, set it as result directly + auto& element = const_cast(reader_->min_heap_.top()); + result_ = std::move(element.kv); + reader_->polled_.push_back(std::move(element)); + reader_->min_heap_.pop(); + return true; + } + reader_->merge_function_wrapper_->Reset(); std::shared_ptr key = reader_->min_heap_.top().kv.key; bool is_first = true; diff --git a/src/paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.h b/src/paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.h index ca47cf389..7f31e7223 100644 --- a/src/paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.h +++ b/src/paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.h @@ -38,8 +38,8 @@ class Metrics; template class MergeFunctionWrapper; -/// `SortMergeReader` implemented with min-heap. Merge the KeyValue parsed by -/// KeyValueDataFileRecordReader and return the iterator of KeyValue +/// `SortMergeReader` implemented with min-heap. Merge the KeyValue or only sort the KeyValue parsed +/// by `KeyValueRecordReader` and return the iterator of KeyValue class SortMergeReaderWithMinHeap : public SortMergeReader { public: SortMergeReaderWithMinHeap( @@ -142,6 +142,7 @@ class SortMergeReaderWithMinHeap : public SortMergeReader { }; private: + const bool need_merge_; // must hold all readers, as data array is allocated by the pool of data file reader std::vector> readers_holder_; std::vector next_batch_readers_; diff --git a/src/paimon/core/mergetree/external_sort_buffer.cpp b/src/paimon/core/mergetree/external_sort_buffer.cpp new file mode 100644 index 000000000..1dc5f9738 --- /dev/null +++ b/src/paimon/core/mergetree/external_sort_buffer.cpp @@ -0,0 +1,242 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/external_sort_buffer.h" + +#include +#include + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "arrow/compute/api.h" +#include "paimon/common/table/special_fields.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/fields_comparator.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/disk/io_manager.h" +#include "paimon/core/io/async_key_value_producer_and_consumer.h" +#include "paimon/core/io/key_value_in_memory_record_reader.h" +#include "paimon/core/io/key_value_meta_projection_consumer.h" +#include "paimon/core/io/key_value_record_reader.h" +#include "paimon/core/io/row_to_arrow_array_converter.h" +#include "paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.h" +#include "paimon/core/mergetree/spill_channel_manager.h" +#include "paimon/core/mergetree/spill_reader.h" +#include "paimon/core/mergetree/spill_writer.h" + +namespace paimon { + +Result> ExternalSortBuffer::Create( + std::unique_ptr&& in_memory_buffer, + const std::shared_ptr& value_schema, + const std::vector& trimmed_primary_keys, + const std::shared_ptr& key_comparator, + const std::shared_ptr& user_defined_seq_comparator, + const CoreOptions& options, const std::shared_ptr& io_manager, + const std::shared_ptr& pool) { + arrow::FieldVector key_fields; + key_fields.reserve(trimmed_primary_keys.size()); + for (const auto& primary_key : trimmed_primary_keys) { + auto key_field = value_schema->GetFieldByName(primary_key); + assert(key_field != nullptr); + 
key_fields.push_back(key_field); + } + auto key_schema = arrow::schema(key_fields); + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr spill_channel_enumerator, + io_manager->CreateChannelEnumerator()); + return std::unique_ptr(new ExternalSortBuffer( + std::move(in_memory_buffer), key_schema, value_schema, key_comparator, + user_defined_seq_comparator, options, spill_channel_enumerator, pool)); +} + +ExternalSortBuffer::ExternalSortBuffer( + std::unique_ptr&& in_memory_buffer, + const std::shared_ptr& key_schema, + const std::shared_ptr& value_schema, + const std::shared_ptr& key_comparator, + const std::shared_ptr& user_defined_seq_comparator, + const CoreOptions& options, + const std::shared_ptr& spill_channel_enumerator, + const std::shared_ptr& pool) + : in_memory_buffer_(std::move(in_memory_buffer)), + pool_(pool), + key_schema_(key_schema), + value_schema_(value_schema), + key_comparator_(key_comparator), + user_defined_seq_comparator_(user_defined_seq_comparator), + write_schema_(SpecialFields::CompleteSequenceAndValueKindField(value_schema)), + options_(options), + spill_channel_manager_(std::make_shared( + options_.GetFileSystem(), options_.GetLocalSortMaxNumFileHandles())), + spill_channel_enumerator_(spill_channel_enumerator) {} + +ExternalSortBuffer::~ExternalSortBuffer() { + DoClear(); +} + +bool ExternalSortBuffer::HasSpilledData() const { + return !spill_channel_manager_->GetChannels().empty(); +} + +void ExternalSortBuffer::DoClear() { + in_memory_buffer_->Clear(); + CleanupSpillFiles(); +} + +void ExternalSortBuffer::Clear() { + DoClear(); +} + +uint64_t ExternalSortBuffer::GetMemorySize() const { + return in_memory_buffer_->GetMemorySize(); +} + +Result ExternalSortBuffer::FlushMemory() { + if (!in_memory_buffer_->HasData()) { + return true; + } + + int64_t max_spill_disk_size = options_.GetWriteBufferSpillMaxDiskSize(); + + PAIMON_ASSIGN_OR_RAISE(std::vector> memory_buffer_readers, + in_memory_buffer_->CreateReaders()); + 
PAIMON_RETURN_NOT_OK(SpillMemoryBuffer(std::move(memory_buffer_readers))); + in_memory_buffer_->Clear(); + return total_spill_disk_bytes_ < max_spill_disk_size; +} + +Result ExternalSortBuffer::Write(std::unique_ptr&& batch) { + PAIMON_ASSIGN_OR_RAISE(bool has_remaining_memory, in_memory_buffer_->Write(std::move(batch))); + if (has_remaining_memory) { + return true; + } + return FlushMemory(); +} + +Result>> ExternalSortBuffer::CreateReaders() { + PAIMON_ASSIGN_OR_RAISE(std::vector> readers, + CollectSpillReaders()); + PAIMON_ASSIGN_OR_RAISE(std::vector> memory_readers, + in_memory_buffer_->CreateReaders()); + + readers.insert(readers.end(), std::make_move_iterator(memory_readers.begin()), + std::make_move_iterator(memory_readers.end())); + return readers; +} + +bool ExternalSortBuffer::HasData() const { + return in_memory_buffer_->HasData() || HasSpilledData(); +} + +void ExternalSortBuffer::CleanupSpillFiles() { + spill_channel_manager_->Reset(); + total_spill_disk_bytes_ = 0; +} + +Result>> ExternalSortBuffer::CollectSpillReaders() + const { + std::vector> readers; + const auto& channel_ids = spill_channel_manager_->GetChannels(); + readers.reserve(channel_ids.size()); + for (const auto& channel_id : channel_ids) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr spill_reader, + SpillReader::Create(options_.GetFileSystem(), key_schema_, + value_schema_, pool_, channel_id)); + readers.push_back(std::move(spill_reader)); + } + return readers; +} + +Result ExternalSortBuffer::SpillToDisk( + std::vector>&& readers, int32_t write_batch_size) { + const auto& spill_compress_options = options_.GetSpillCompressOptions(); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr spill_writer, + SpillWriter::Create(options_.GetFileSystem(), write_schema_, spill_channel_enumerator_, + spill_channel_manager_, spill_compress_options.compress, + spill_compress_options.zstd_level)); + auto cleanup_guard = ScopeGuard([&]() { + [[maybe_unused]] auto status = + 
spill_channel_manager_->DeleteChannel(spill_writer->GetChannelId()); + }); + + auto sorted_reader = std::make_unique( + std::move(readers), key_comparator_, user_defined_seq_comparator_, + /*merge_function_wrapper=*/nullptr); + auto create_consumer = [target_schema = write_schema_, pool = pool_]() + -> Result>> { + return KeyValueMetaProjectionConsumer::Create(target_schema, pool); + }; + auto async_key_value_producer_consumer = + std::make_unique>( + std::move(sorted_reader), create_consumer, write_batch_size, + /*projection_thread_num=*/1, pool_); + auto close_guard = ScopeGuard([&]() { async_key_value_producer_consumer->Close(); }); + + while (true) { + PAIMON_ASSIGN_OR_RAISE(KeyValueBatch key_value_batch, + async_key_value_producer_consumer->NextBatch()); + if (key_value_batch.batch == nullptr) { + break; + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + std::shared_ptr record_batch, + arrow::ImportRecordBatch(key_value_batch.batch.get(), write_schema_)); + PAIMON_RETURN_NOT_OK(spill_writer->WriteBatch(record_batch)); + } + + PAIMON_RETURN_NOT_OK(spill_writer->Close()); + PAIMON_ASSIGN_OR_RAISE(int64_t spilled_file_size, spill_writer->GetFileSize()); + cleanup_guard.Release(); + return spilled_file_size; +} + +Status ExternalSortBuffer::SpillMemoryBuffer( + std::vector>&& readers) { + PAIMON_ASSIGN_OR_RAISE(int64_t spill_file_size, + SpillToDisk(std::move(readers), options_.GetWriteBatchSize())); + total_spill_disk_bytes_ += spill_file_size; + + if (options_.GetLocalSortMaxNumFileHandles() > 0 && + static_cast(spill_channel_manager_->GetChannels().size()) >= + options_.GetLocalSortMaxNumFileHandles()) { + PAIMON_RETURN_NOT_OK(MergeSpilledFiles()); + } + return Status::OK(); +} + +Status ExternalSortBuffer::MergeSpilledFiles() { + if (spill_channel_manager_->GetChannels().size() < 2) { + return Status::OK(); + } + auto spill_channel_ids_before_merge = spill_channel_manager_->GetChannels(); + auto cleanup_guard = ScopeGuard([&]() { + for (const auto& spill_channel_id : 
spill_channel_ids_before_merge) { + [[maybe_unused]] auto status = spill_channel_manager_->DeleteChannel(spill_channel_id); + } + }); + + PAIMON_ASSIGN_OR_RAISE(std::vector> readers, + CollectSpillReaders()); + PAIMON_ASSIGN_OR_RAISE(int64_t merged_file_size, + SpillToDisk(std::move(readers), options_.GetWriteBatchSize())); + total_spill_disk_bytes_ = merged_file_size; + + return Status::OK(); +} + +} // namespace paimon diff --git a/src/paimon/core/mergetree/external_sort_buffer.h b/src/paimon/core/mergetree/external_sort_buffer.h new file mode 100644 index 000000000..57b14d04d --- /dev/null +++ b/src/paimon/core/mergetree/external_sort_buffer.h @@ -0,0 +1,99 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "arrow/type_fwd.h" +#include "paimon/core/core_options.h" +#include "paimon/core/disk/file_io_channel.h" +#include "paimon/core/mergetree/in_memory_sort_buffer.h" +#include "paimon/core/mergetree/sort_buffer.h" +#include "paimon/record_batch.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { +class FieldsComparator; +class IOManager; +class KeyValueRecordReader; +class MemoryPool; +class SpillChannelManager; + +/// Spillable SortBuffer. 
Buffers RecordBatches in an underlying in-memory sort buffer; +/// when the in-memory budget is reached, sorted data is spilled to a new on-disk file. +class ExternalSortBuffer : public SortBuffer { + public: + static Result> Create( + std::unique_ptr&& in_memory_buffer, + const std::shared_ptr& value_schema, + const std::vector& trimmed_primary_keys, + const std::shared_ptr& key_comparator, + const std::shared_ptr& user_defined_seq_comparator, + const CoreOptions& options, const std::shared_ptr& io_manager, + const std::shared_ptr& pool); + ~ExternalSortBuffer() override; + + void Clear() override; + uint64_t GetMemorySize() const override; + Result FlushMemory() override; + Result Write(std::unique_ptr&& batch) override; + Result>> CreateReaders() override; + bool HasData() const override; + + private: + void DoClear(); + bool HasSpilledData() const; + Result>> CollectSpillReaders() const; + Result SpillToDisk(std::vector>&& readers, + int32_t write_batch_size); + Status MergeSpilledFiles(); + Status SpillMemoryBuffer(std::vector>&& readers); + void CleanupSpillFiles(); + + ExternalSortBuffer(std::unique_ptr&& in_memory_buffer, + const std::shared_ptr& key_schema, + const std::shared_ptr& value_schema, + const std::shared_ptr& key_comparator, + const std::shared_ptr& user_defined_seq_comparator, + const CoreOptions& options, + const std::shared_ptr& spill_channel_enumerator, + const std::shared_ptr& pool); + + std::unique_ptr in_memory_buffer_; + + const std::shared_ptr pool_; + const std::shared_ptr key_schema_; + const std::shared_ptr value_schema_; + const std::shared_ptr key_comparator_; + const std::shared_ptr user_defined_seq_comparator_; + const std::shared_ptr write_schema_; + const CoreOptions options_; + const std::shared_ptr spill_channel_manager_; + + std::shared_ptr spill_channel_enumerator_; + int64_t total_spill_disk_bytes_ = 0; +}; + +} // namespace paimon diff --git a/src/paimon/core/mergetree/sort_buffer_test.cpp 
b/src/paimon/core/mergetree/sort_buffer_test.cpp new file mode 100644 index 000000000..bdc9600a1 --- /dev/null +++ b/src/paimon/core/mergetree/sort_buffer_test.cpp @@ -0,0 +1,368 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/ipc/json_simple.h" +#include "gtest/gtest.h" +#include "paimon/common/types/data_field.h" +#include "paimon/common/types/row_kind.h" +#include "paimon/common/utils/fields_comparator.h" +#include "paimon/core/disk/io_manager.h" +#include "paimon/core/io/key_value_record_reader.h" +#include "paimon/core/mergetree/external_sort_buffer.h" +#include "paimon/core/mergetree/in_memory_sort_buffer.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/defs.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/binary_row_generator.h" +#include "paimon/testing/utils/data_generator.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +namespace { + +struct ReaderResult { + std::string key; + int32_t sequence_field; + int32_t value_field; + int64_t sequence_number; + const RowKind* row_kind; +}; + +} // namespace + +class SortBufferTest : public ::testing::Test { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + test_dir_ = UniqueTestDirectory::Create(); + io_manager_ = std::make_shared(test_dir_->Str(), 
test_dir_->GetFileSystem()); + + std::vector value_fields = {DataField(0, arrow::field("key", arrow::utf8())), + DataField(1, arrow::field("seq", arrow::int32())), + DataField(2, arrow::field("val", arrow::int32()))}; + value_type_ = DataField::ConvertDataFieldsToArrowStructType(value_fields); + value_schema_ = DataField::ConvertDataFieldsToArrowSchema(value_fields); + primary_keys_ = {"key"}; + sequence_fields_ = {"seq"}; + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, value_schema_, /*partition_keys=*/{}, + primary_keys_, /*options=*/{{Options::BUCKET, "1"}})); + data_generator_ = std::make_shared(table_schema, pool_); + + ASSERT_OK_AND_ASSIGN(key_comparator_, FieldsComparator::Create( + value_fields, {0}, /*is_ascending_order=*/true)); + ASSERT_OK_AND_ASSIGN( + sequence_comparator_, + FieldsComparator::Create(value_fields, {1}, /*is_ascending_order=*/true)); + } + + protected: + void CheckResult(std::vector>&& readers, + const std::vector>& expected, bool need_sort) const { + ASSERT_EQ(readers.size(), expected.size()); + std::vector> actual; + for (size_t i = 0; i < expected.size(); ++i) { + ASSERT_OK_AND_ASSIGN(auto rows, CollectRows(readers[i].get())); + actual.push_back(std::move(rows)); + } + if (need_sort) { + std::sort(actual.begin(), actual.end(), + [](const auto& a, const auto& b) { return a.size() < b.size(); }); + } + for (size_t i = 0; i < expected.size(); ++i) { + AssertRows(actual[i], expected[i]); + } + CloseReaders(readers); + } + + BinaryRow MakeRow(const RowKind* kind, const std::string& key, int32_t seq, int32_t val) { + return BinaryRowGenerator::GenerateRow(kind, {key, seq, val}, pool_.get()); + } + std::unique_ptr MakeBatch(const std::vector& input_rows) { + EXPECT_OK_AND_ASSIGN(auto batches, + data_generator_->SplitArrayByPartitionAndBucket(input_rows)); + EXPECT_EQ(1, batches.size()); + return std::move(batches[0]); + } + + Result> CollectRows(KeyValueRecordReader* reader) const { + 
std::vector rows; + while (true) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr iterator, + reader->NextBatch()); + if (iterator == nullptr) { + break; + } + while (iterator->HasNext()) { + PAIMON_ASSIGN_OR_RAISE(KeyValue key_value, iterator->Next()); + rows.push_back(ReaderResult{std::string(key_value.key->GetStringView(0)), + key_value.value->GetInt(1), key_value.value->GetInt(2), + key_value.sequence_number, key_value.value_kind}); + } + } + return rows; + } + + Result> CreateExternalSortBuffer( + int64_t last_sequence_number, uint64_t write_buffer_size) const { + PAIMON_ASSIGN_OR_RAISE( + CoreOptions options, + CoreOptions::FromMap({{Options::SPILL_COMPRESSION, "uncompressed"}})); + auto in_memory_buffer = std::make_unique( + last_sequence_number, value_type_, primary_keys_, sequence_fields_, + /*sequence_fields_ascending=*/true, key_comparator_, write_buffer_size, pool_); + return ExternalSortBuffer::Create(std::move(in_memory_buffer), value_schema_, primary_keys_, + key_comparator_, sequence_comparator_, options, + io_manager_, pool_); + } + + void AssertRows(const std::vector& actual, + const std::vector& expected) const { + ASSERT_EQ(actual.size(), expected.size()); + for (size_t index = 0; index < expected.size(); ++index) { + ASSERT_EQ(actual[index].key, expected[index].key); + ASSERT_EQ(actual[index].sequence_field, expected[index].sequence_field); + ASSERT_EQ(actual[index].value_field, expected[index].value_field); + ASSERT_EQ(actual[index].sequence_number, expected[index].sequence_number); + ASSERT_EQ(actual[index].row_kind, expected[index].row_kind); + } + } + + void CloseReaders(const std::vector>& readers) const { + for (const auto& reader : readers) { + reader->Close(); + } + } + + std::shared_ptr pool_; + std::shared_ptr data_generator_; + std::unique_ptr test_dir_; + std::shared_ptr io_manager_; + std::shared_ptr value_type_; + std::shared_ptr value_schema_; + std::vector primary_keys_; + std::vector sequence_fields_; + std::shared_ptr 
key_comparator_; + std::shared_ptr sequence_comparator_; +}; + +TEST_F(SortBufferTest, TestInMemorySortBufferEstimateMemoryUse) { + { + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + std::shared_ptr array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + ["Lucy", 20, 1, 14.1], + ["Paul", 20, 1, null], + ["Alice", 10, 0, 13.1] + ])") + .ValueOrDie(); + ASSERT_OK_AND_ASSIGN(int64_t memory_use, InMemorySortBuffer::EstimateMemoryUse(array)); + int64_t expected_memory_use = + 1 + (13 + 3 * 4 + 1) + (3 * 4 + 1) + (3 * 4 + 1) + (3 * 8 + 1); + ASSERT_EQ(memory_use, expected_memory_use); + } + { + arrow::FieldVector fields = {arrow::field("v0", arrow::boolean()), + arrow::field("v1", arrow::int8()), + arrow::field("v2", arrow::int16()), + arrow::field("v3", arrow::int32()), + arrow::field("v4", arrow::int64()), + arrow::field("v5", arrow::float32()), + arrow::field("v6", arrow::float64()), + arrow::field("v7", arrow::date32()), + arrow::field("v8", arrow::timestamp(arrow::TimeUnit::NANO)), + arrow::field("v9", arrow::decimal128(30, 20)), + arrow::field("v10", arrow::utf8()), + arrow::field("v11", arrow::binary())}; + + auto array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + [true, 10, 200, 65536, 123456789, 0.0, 0.0, 2000, -86399999999500, "2134.48690000000000000009", "difference", "Alice"], + [false, -128, -32768, -2147483648, -9223372036854775808, -3.4028235E38, -1.7976931348623157E308, -719528, -9223372036854775808, "-999999999999999999.99999999999999999999", "Alice", "Two"], + [true, 127, 32767, 2147483647, 9223372036854775807, 3.4028235E38, 1.7976931348623157E308, 2932896, 9223372036854775807, "999999999999999999.99999999999999999999", "Alice", "made"], + [true, 0, 0, 0, 0, 1.4E-45, 4.9E-324, 0, 0, "0.00000000000000000000", "Alice", "wood"] +])") + 
.ValueOrDie()); + ASSERT_OK_AND_ASSIGN(int64_t memory_use, InMemorySortBuffer::EstimateMemoryUse(array)); + int64_t expected_memory_use = 1 + (4 + 1) + (4 + 1) + (2 * 4 + 1) + (4 * 4 + 1) + + (8 * 4 + 1) + (4 * 4 + 1) + (8 * 4 + 1) + (4 * 4 + 1) + + (8 * 4 + 1) + (4 * 16 + 1) + (25 + 4 * 4 + 1) + + (16 + 4 * 4 + 1); + ASSERT_EQ(memory_use, expected_memory_use); + } + { + arrow::FieldVector fields = { + arrow::field("f0", arrow::list(arrow::int32())), + arrow::field("f1", arrow::map(arrow::utf8(), arrow::int64())), + arrow::field("f2", arrow::struct_({arrow::field("sub1", arrow::int64()), + arrow::field("sub2", arrow::float64()), + arrow::field("sub3", arrow::boolean())})), + }; + auto array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + [[1, 2, 3], [["apple", 3], ["banana", 4]], [10, 10.1, false]], + [[4, 5], [["cat", 5], ["dog", 6], ["mouse", 7]], [20, 20.1, true]], + [[6], [["elephant", 7], ["fox", 8]], [null, 30.1, true]] + ])") + .ValueOrDie()); + ASSERT_OK_AND_ASSIGN(int64_t memory_use, InMemorySortBuffer::EstimateMemoryUse(array)); + int64_t list_mem = 1 + (4 * 6 + 1); + int64_t map_mem = 1 + (33 + 4 * 7 + 1) + (8 * 7 + 1); + int64_t struct_mem = 1 + (8 * 3 + 1) + (8 * 3 + 1) + (1 * 3 + 1); + int64_t expected_memory_use = 1 + list_mem + map_mem + struct_mem; + ASSERT_EQ(memory_use, expected_memory_use); + } +} + +TEST_F(SortBufferTest, TestInMemorySortBufferSimple) { + InMemorySortBuffer buffer(/*last_sequence_number=*/9, value_type_, primary_keys_, + sequence_fields_, /*sequence_fields_ascending=*/true, key_comparator_, + /*write_buffer_size=*/1024 * 1024, pool_); + + std::vector input_rows; + input_rows.push_back(MakeRow(RowKind::Insert(), "b", 2, 200)); + input_rows.push_back(MakeRow(RowKind::Delete(), "a", 3, 300)); + input_rows.push_back(MakeRow(RowKind::UpdateAfter(), "a", 1, 100)); + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, buffer.Write(MakeBatch(input_rows))); + 
ASSERT_TRUE(has_remaining_quota); + + input_rows.clear(); + input_rows.push_back(MakeRow(RowKind::UpdateBefore(), "c", 1, 400)); + input_rows.push_back(MakeRow(RowKind::Insert(), "b", 1, 150)); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, buffer.Write(MakeBatch(input_rows))); + ASSERT_TRUE(has_remaining_quota); + + ASSERT_TRUE(buffer.HasData()); + ASSERT_GT(buffer.GetMemorySize(), 0); + + ASSERT_OK_AND_ASSIGN(auto readers, buffer.CreateReaders()); + ASSERT_TRUE(buffer.HasData()); + ASSERT_GT(buffer.GetMemorySize(), 0); + + CheckResult( + std::move(readers), + {{{"a", 1, 100, 12, RowKind::UpdateAfter()}, + {"a", 3, 300, 11, RowKind::Delete()}, + {"b", 2, 200, 10, RowKind::Insert()}}, + {{"b", 1, 150, 14, RowKind::Insert()}, {"c", 1, 400, 13, RowKind::UpdateBefore()}}}, + /*need_sort=*/false); + + buffer.Clear(); + ASSERT_FALSE(buffer.HasData()); + ASSERT_EQ(buffer.GetMemorySize(), 0); +} + +TEST_F(SortBufferTest, TestExternalSortBufferWithInMemoryDataAndNoSpill) { + ASSERT_OK_AND_ASSIGN(auto buffer, CreateExternalSortBuffer(/*last_sequence_number=*/4, + /*write_buffer_size=*/1024 * 1024)); + std::vector input_rows; + input_rows.push_back(MakeRow(RowKind::Delete(), "b", 2, 200)); + input_rows.push_back(MakeRow(RowKind::UpdateAfter(), "a", 3, 300)); + input_rows.push_back(MakeRow(RowKind::Insert(), "a", 1, 100)); + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, buffer->Write(MakeBatch(input_rows))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_TRUE(buffer->HasData()); + ASSERT_GT(buffer->GetMemorySize(), 0); + + ASSERT_OK_AND_ASSIGN(auto readers, buffer->CreateReaders()); + ASSERT_TRUE(buffer->HasData()); + ASSERT_GT(buffer->GetMemorySize(), 0); + + CheckResult(std::move(readers), + {{{"a", 1, 100, 7, RowKind::Insert()}, + {"a", 3, 300, 6, RowKind::UpdateAfter()}, + {"b", 2, 200, 5, RowKind::Delete()}}}, + /*need_sort=*/false); + + buffer->Clear(); + ASSERT_FALSE(buffer->HasData()); + ASSERT_EQ(buffer->GetMemorySize(), 0); +} + +TEST_F(SortBufferTest, 
TestExternalSortBufferWithSpilledDataAndInMemoryData) { + // the write buffer size limit 35 bytes is larger than 2 rows but smaller than 3 rows. + ASSERT_OK_AND_ASSIGN(auto buffer, CreateExternalSortBuffer(/*last_sequence_number=*/19, + /*write_buffer_size=*/35)); + + // in memory data + std::vector input_rows; + input_rows.push_back(MakeRow(RowKind::Insert(), "b", 1, 200)); + input_rows.push_back(MakeRow(RowKind::Delete(), "b", 2, 200)); + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, buffer->Write(MakeBatch(input_rows))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_TRUE(buffer->HasData()); + ASSERT_GT(buffer->GetMemorySize(), 0); + + // spill file 1 (with above in memory data) + input_rows.clear(); + input_rows.push_back(MakeRow(RowKind::UpdateAfter(), "a", 3, 300)); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, buffer->Write(MakeBatch(input_rows))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_TRUE(buffer->HasData()); + ASSERT_EQ(buffer->GetMemorySize(), 0); + + // spill file 2 + input_rows.clear(); + input_rows.push_back(MakeRow(RowKind::Insert(), "c", 5, 500)); + input_rows.push_back(MakeRow(RowKind::Insert(), "c", 4, 400)); + input_rows.push_back(MakeRow(RowKind::UpdateBefore(), "a", 1, 100)); + input_rows.push_back(MakeRow(RowKind::Insert(), "b", 1, 150)); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, buffer->Write(MakeBatch(input_rows))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_TRUE(buffer->HasData()); + ASSERT_EQ(buffer->GetMemorySize(), 0); + + // in memory data + input_rows.clear(); + input_rows.push_back(MakeRow(RowKind::Insert(), "c", 4, 400)); + input_rows.push_back(MakeRow(RowKind::UpdateBefore(), "a", 1, 100)); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, buffer->Write(MakeBatch(input_rows))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_TRUE(buffer->HasData()); + ASSERT_GT(buffer->GetMemorySize(), 0); + + ASSERT_OK_AND_ASSIGN(auto readers, buffer->CreateReaders()); + ASSERT_TRUE(buffer->HasData()); + ASSERT_GT(buffer->GetMemorySize(), 0); + + 
CheckResult(std::move(readers), + {{{"a", 1, 100, 28, RowKind::UpdateBefore()}, {"c", 4, 400, 27, RowKind::Insert()}}, + {{"a", 3, 300, 22, RowKind::UpdateAfter()}, + {"b", 1, 200, 20, RowKind::Insert()}, + {"b", 2, 200, 21, RowKind::Delete()}}, + {{"a", 1, 100, 25, RowKind::UpdateBefore()}, + {"b", 1, 150, 26, RowKind::Insert()}, + {"c", 4, 400, 24, RowKind::Insert()}, + {"c", 5, 500, 23, RowKind::Insert()}}}, + /*need_sort=*/true); + + buffer->Clear(); + ASSERT_FALSE(buffer->HasData()); + ASSERT_EQ(buffer->GetMemorySize(), 0); +} + +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/write_buffer_test.cpp b/src/paimon/core/mergetree/write_buffer_test.cpp index 82162da51..6258bbaca 100644 --- a/src/paimon/core/mergetree/write_buffer_test.cpp +++ b/src/paimon/core/mergetree/write_buffer_test.cpp @@ -170,71 +170,4 @@ TEST_F(WriteBufferTest, TestFlushPreservesRowKinds) { ASSERT_EQ(reader_result.sequence_numbers, (std::vector{0, 1, 2, 3})); } -TEST_F(WriteBufferTest, TestEstimateMemoryUse) { - { - std::shared_ptr array = - arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ - ["Lucy", 20, 1, 14.1], - ["Paul", 20, 1, null], - ["Alice", 10, 0, 13.1] - ])") - .ValueOrDie(); - ASSERT_OK_AND_ASSIGN(int64_t memory_use, InMemorySortBuffer::EstimateMemoryUse(array)); - int64_t expected_memory_use = - 1 + (13 + 3 * 4 + 1) + (3 * 4 + 1) + (3 * 4 + 1) + (3 * 8 + 1); - ASSERT_EQ(memory_use, expected_memory_use); - } - { - arrow::FieldVector fields = {arrow::field("v0", arrow::boolean()), - arrow::field("v1", arrow::int8()), - arrow::field("v2", arrow::int16()), - arrow::field("v3", arrow::int32()), - arrow::field("v4", arrow::int64()), - arrow::field("v5", arrow::float32()), - arrow::field("v6", arrow::float64()), - arrow::field("v7", arrow::date32()), - arrow::field("v8", arrow::timestamp(arrow::TimeUnit::NANO)), - arrow::field("v9", arrow::decimal128(30, 20)), - arrow::field("v10", arrow::utf8()), - arrow::field("v11", arrow::binary())}; - - auto array = 
std::dynamic_pointer_cast( - arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ - [true, 10, 200, 65536, 123456789, 0.0, 0.0, 2000, -86399999999500, "2134.48690000000000000009", "difference", "Alice"], - [false, -128, -32768, -2147483648, -9223372036854775808, -3.4028235E38, -1.7976931348623157E308, -719528, -9223372036854775808, "-999999999999999999.99999999999999999999", "Alice", "Two"], - [true, 127, 32767, 2147483647, 9223372036854775807, 3.4028235E38, 1.7976931348623157E308, 2932896, 9223372036854775807, "999999999999999999.99999999999999999999", "Alice", "made"], - [true, 0, 0, 0, 0, 1.4E-45, 4.9E-324, 0, 0, "0.00000000000000000000", "Alice", "wood"] -])") - .ValueOrDie()); - ASSERT_OK_AND_ASSIGN(int64_t memory_use, InMemorySortBuffer::EstimateMemoryUse(array)); - int64_t expected_memory_use = 1 + (4 + 1) + (4 + 1) + (2 * 4 + 1) + (4 * 4 + 1) + - (8 * 4 + 1) + (4 * 4 + 1) + (8 * 4 + 1) + (4 * 4 + 1) + - (8 * 4 + 1) + (4 * 16 + 1) + (25 + 4 * 4 + 1) + - (16 + 4 * 4 + 1); - ASSERT_EQ(memory_use, expected_memory_use); - } - { - arrow::FieldVector fields = { - arrow::field("f0", arrow::list(arrow::int32())), - arrow::field("f1", arrow::map(arrow::utf8(), arrow::int64())), - arrow::field("f2", arrow::struct_({arrow::field("sub1", arrow::int64()), - arrow::field("sub2", arrow::float64()), - arrow::field("sub3", arrow::boolean())})), - }; - auto array = std::dynamic_pointer_cast( - arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ - [[1, 2, 3], [["apple", 3], ["banana", 4]], [10, 10.1, false]], - [[4, 5], [["cat", 5], ["dog", 6], ["mouse", 7]], [20, 20.1, true]], - [[6], [["elephant", 7], ["fox", 8]], [null, 30.1, true]] - ])") - .ValueOrDie()); - ASSERT_OK_AND_ASSIGN(int64_t memory_use, InMemorySortBuffer::EstimateMemoryUse(array)); - int64_t list_mem = 1 + (4 * 6 + 1); - int64_t map_mem = 1 + (33 + 4 * 7 + 1) + (8 * 7 + 1); - int64_t struct_mem = 1 + (8 * 3 + 1) + (8 * 3 + 1) + (1 * 3 + 1); - int64_t expected_memory_use = 1 
+ list_mem + map_mem + struct_mem; - ASSERT_EQ(memory_use, expected_memory_use); - } -} - } // namespace paimon::test diff --git a/src/paimon/testing/utils/data_generator_test.cpp b/src/paimon/testing/utils/data_generator_test.cpp index 6cbc1d3a8..e8e26cb96 100644 --- a/src/paimon/testing/utils/data_generator_test.cpp +++ b/src/paimon/testing/utils/data_generator_test.cpp @@ -51,15 +51,6 @@ TEST_F(DataGeneratorTest, TestSimple) { arrow::field("f1", arrow::utf8()), arrow::field("f2", arrow::utf8())}; auto schema = arrow::schema(fields); - std::vector primary_keys = {"f0"}; - std::vector partition_keys = {"f1"}; - std::map options; - options[Options::BUCKET_KEY] = "f0"; - options[Options::BUCKET] = "2"; - ASSERT_OK_AND_ASSIGN( - std::shared_ptr table_schema, - TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); - DataGenerator gen(table_schema, GetDefaultPool()); std::vector rows; rows.push_back(Make(RowKind::Insert(), "Alex", "20250326", "18")); @@ -69,8 +60,34 @@ TEST_F(DataGeneratorTest, TestSimple) { rows.push_back(Make(RowKind::Insert(), "Evan", "20250326", "22")); rows.push_back(Make(RowKind::Delete(), "Alex", "20250326", "18")); rows.push_back(Make(RowKind::Delete(), "Bob", "20250326", "19")); - ASSERT_OK_AND_ASSIGN(auto batches, gen.SplitArrayByPartitionAndBucket(rows)); - ASSERT_EQ(3, batches.size()); + + { + std::vector primary_keys = {"f0"}; + std::vector partition_keys = {"f1"}; + std::map options; + options[Options::BUCKET_KEY] = "f0"; + options[Options::BUCKET] = "2"; + ASSERT_OK_AND_ASSIGN( + std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); + DataGenerator gen(table_schema, GetDefaultPool()); + + ASSERT_OK_AND_ASSIGN(auto batches, gen.SplitArrayByPartitionAndBucket(rows)); + ASSERT_EQ(3, batches.size()); + } + + { + std::vector primary_keys; + std::vector partition_keys; + std::map options; + ASSERT_OK_AND_ASSIGN( + std::shared_ptr table_schema, + 
TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); + DataGenerator gen(table_schema, GetDefaultPool()); + + ASSERT_OK_AND_ASSIGN(auto batches, gen.SplitArrayByPartitionAndBucket(rows)); + ASSERT_EQ(1, batches.size()); + } } } // namespace paimon::test