From 39a7a85e1bb637851fbd4b9baee86a8a826ddebd Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 7 May 2026 13:19:53 +0000 Subject: [PATCH 1/2] Cherry-pick of https://github.com/ClickHouse/ClickHouse/pull/104251 with unresolved conflict markers (resolution in next commit) --- Original cherry-pick message follows: Merge pull request #104251 from alexey-milovidov/parquet-single-file-parallelism Parallelize reads from a single Parquet file in StorageFile # Conflicts: # src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp # src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h --- src/Formats/FormatFactory.cpp | 8 + src/Formats/FormatFactory.h | 3 + src/Processors/Formats/IInputFormat.h | 5 + .../Formats/Impl/Parquet/ReadManager.cpp | 22 +- .../Impl/ParquetV3BlockInputFormat.cpp | 242 +++++++++++++++++- .../Formats/Impl/ParquetV3BlockInputFormat.h | 27 ++ src/Storages/StorageFile.cpp | 80 +++++- src/Storages/StorageFile.h | 19 ++ .../02725_parquet_preserve_order.reference | 3 +- 9 files changed, 399 insertions(+), 10 deletions(-) diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 807bf03ce4f7..42e9ac18ecfa 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -831,6 +831,14 @@ BucketSplitter FormatFactory::getSplitter(const String & format) return creator.bucket_splitter_creator(); } +bool FormatFactory::checkFormatHasSplitter(const String & format) const +{ + auto it = dict.find(boost::to_lower_copy(format)); + if (it == dict.end()) + return false; + return static_cast(it->second.bucket_splitter_creator); +} + void FormatFactory::registerRandomAccessInputFormat(const String & name, RandomAccessInputCreator input_creator) { chassert(input_creator); diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index 76a04bb98798..2aac56dd2d87 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -373,6 +373,9 @@ class FormatFactory final : private boost::noncopyable, public IHints<2> void registerFileBucketInfo(const String & format, FileBucketInfoCreator bucket_info); void registerSplitter(const String & format, BucketSplitterCreator splitter); BucketSplitter getSplitter(const String & format); + /// Returns true if `format` is registered and has a bucket splitter + /// (e.g. Parquet). Used to decide whether to attempt single-file parallel splitting. + bool checkFormatHasSplitter(const String & format) const; private: FormatsDictionary dict; diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h index ab693ba7a212..7030ff3849eb 100644 --- a/src/Processors/Formats/IInputFormat.h +++ b/src/Processors/Formats/IInputFormat.h @@ -66,6 +66,11 @@ struct IBucketSplitter /// Returns information about the resulting buckets (see the structure above for details). virtual std::vector splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) = 0; + /// Splits a file into approximately `target_count` buckets, each covering a roughly + /// equal slice of the file. Useful for parallelising one large file across N readers. + /// The result has at most `target_count` buckets and never drops any data. + virtual std::vector splitToBucketsByCount(size_t target_count, ReadBuffer & buf, const FormatSettings & format_settings_) = 0; + virtual ~IBucketSplitter() = default; }; using BucketSplitter = std::shared_ptr; diff --git a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp index 72aeabd331ab..fa1459732657 100644 --- a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp +++ b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp @@ -64,8 +64,26 @@ void ReadManager::init(FormatParserSharedResourcesPtr parser_shared_resources_, reader.prefilterAndInitRowGroups(row_groups_to_read); reader.preparePrewhere(); - ProfileEvents::increment(ProfileEvents::ParquetReadRowGroups, reader.row_groups.size()); - ProfileEvents::increment(ProfileEvents::ParquetPrunedRowGroups, reader.file_metadata.row_groups.size() - reader.row_groups.size()); + /// Profile events must reflect only the row groups that belong to this bucket, otherwise + /// every bucket of a single-file split would report the file's totals and the events would + /// be multiplied by the number of buckets. + size_t read_count; + size_t total_in_partition; + if (row_groups_to_read.has_value()) + { + read_count = 0; + for (const auto & rg : reader.row_groups) + if (rg.need_to_process) + ++read_count; + total_in_partition = row_groups_to_read->size(); + } + else + { + read_count = reader.row_groups.size(); + total_in_partition = reader.file_metadata.row_groups.size(); + } + ProfileEvents::increment(ProfileEvents::ParquetReadRowGroups, read_count); + ProfileEvents::increment(ProfileEvents::ParquetPrunedRowGroups, total_in_partition - read_count); size_t num_row_groups = reader.row_groups.size(); for (size_t i = size_t(ReadStage::NotStarted) + 1; i < size_t(ReadStage::Deliver); ++i) diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp index c9498e6dd8c0..c58c2bfd01ad 100644 --- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp @@ -126,8 +126,23 @@ Chunk ParquetV3BlockInputFormat::read() temp_prefetcher.init(in, read_options, parser_shared_resources); parquet::format::FileMetaData file_metadata = getFileMetadata(temp_prefetcher); + size_t num_rows = 0; + if (buckets_to_read) + { + /// Only count rows in the assigned row groups. Otherwise multiple sources + /// reading buckets of the same file would each report the file's total. + for (size_t rg : buckets_to_read->row_group_ids) + { + if (rg < file_metadata.row_groups.size()) + num_rows += size_t(file_metadata.row_groups[rg].num_rows); + } + } + else + { + num_rows = size_t(file_metadata.num_rows); + } - auto chunk = getChunkForCount(size_t(file_metadata.num_rows)); + auto chunk = getChunkForCount(num_rows); chunk.getChunkInfos().add(std::make_shared(0)); reported_count = true; @@ -199,6 +214,231 @@ std::optional NativeParquetSchemaReader::readNumberOrRows() return size_t(file_metadata.num_rows); } +<<<<<<< HEAD +======= +void ParquetFileBucketInfo::serialize(WriteBuffer & buffer) +{ + writeVarUInt(row_group_ids.size(), buffer); + for (auto chunk : row_group_ids) + writeVarUInt(chunk, buffer); +} + +void ParquetFileBucketInfo::deserialize(ReadBuffer & buffer) +{ + size_t size_chunks; + readVarUInt(size_chunks, buffer); + row_group_ids = std::vector{}; + row_group_ids.resize(size_chunks); + size_t bucket; + for (size_t i = 0; i < size_chunks; ++i) + { + readVarUInt(bucket, buffer); + row_group_ids[i] = bucket; + } +} + +String ParquetFileBucketInfo::getIdentifier() const +{ + String result; + for (auto chunk : row_group_ids) + result += "_" + std::to_string(chunk); + return result; +} + +ParquetFileBucketInfo::ParquetFileBucketInfo(const std::vector & row_group_ids_) + : row_group_ids(row_group_ids_) +{ +} + +std::shared_ptr ParquetFileBucketInfo::filterByMatchingRowGroups(const std::vector & matching_row_groups) const +{ + if (matching_row_groups.empty()) + return nullptr; + if (row_group_ids.empty()) + return std::make_shared(matching_row_groups); + std::unordered_set matching_set(matching_row_groups.begin(), matching_row_groups.end()); + std::vector filtered; + for (size_t rg : row_group_ids) + if (matching_set.contains(rg)) + filtered.push_back(rg); + if (filtered.empty()) + return nullptr; + return std::make_shared(std::move(filtered)); +} + +void registerParquetFileBucketInfo(std::unordered_map & instances) +{ + instances.emplace("Parquet", std::make_shared()); +} + +std::vector ParquetBucketSplitter::splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) +{ + std::atomic is_stopped = false; + auto arrow_file = asArrowFile(buf, format_settings_, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true, nullptr); + auto metadata = parquet::ReadMetaData(arrow_file); + std::vector bucket_sizes; + for (int i = 0; i < metadata->num_row_groups(); ++i) + bucket_sizes.push_back(metadata->RowGroup(i)->total_byte_size()); + + std::vector> buckets; + size_t current_weight = 0; + for (size_t i = 0; i < bucket_sizes.size(); ++i) + { + if (current_weight + bucket_sizes[i] <= bucket_size) + { + if (buckets.empty()) + buckets.emplace_back(); + buckets.back().push_back(i); + current_weight += bucket_sizes[i]; + } + else + { + current_weight = 0; + buckets.push_back({}); + buckets.back().push_back(i); + current_weight += bucket_sizes[i]; + } + } + + std::vector result; + for (const auto & bucket : buckets) + { + result.push_back(std::make_shared(bucket)); + } + return result; +} + +std::vector ParquetBucketSplitter::splitToBucketsByCount(size_t target_count, ReadBuffer & buf, const FormatSettings & format_settings_) +{ + std::atomic is_stopped = false; + auto arrow_file = asArrowFile(buf, format_settings_, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true, nullptr); + auto metadata = parquet::ReadMetaData(arrow_file); + const size_t num_row_groups = metadata->num_row_groups(); + + if (target_count == 0 || num_row_groups == 0) + return {}; + + /// Distribute row groups across at most target_count contiguous chunks. Each + /// chunk becomes a single ParquetFileBucketInfo containing several row groups, + /// so the caller gets one source per chunk and no row group is dropped. + const size_t num_chunks = std::min(target_count, num_row_groups); + std::vector result; + result.reserve(num_chunks); + for (size_t g = 0; g < num_chunks; ++g) + { + size_t lo = g * num_row_groups / num_chunks; + size_t hi = (g + 1) * num_row_groups / num_chunks; + std::vector ids; + ids.reserve(hi - lo); + for (size_t k = lo; k < hi; ++k) + ids.push_back(k); + result.push_back(std::make_shared(ids)); + } + return result; +} + +void registerInputFormatParquet(FormatFactory & factory) +{ + factory.registerFileBucketInfo( + "Parquet", + [] + { + return std::make_shared(); + } + ); + factory.registerRandomAccessInputFormatWithMetadata( + "Parquet", + [](ReadBuffer & buf, + const Block & sample, + const FormatSettings & settings, + const ReadSettings & read_settings, + bool is_remote_fs, + FormatParserSharedResourcesPtr parser_shared_resources, + FormatFilterInfoPtr format_filter_info, + const std::optional & object_with_metadata, + const ContextPtr & context) -> InputFormatPtr + { + size_t min_bytes_for_seek + = is_remote_fs ? read_settings.remote_read_min_bytes_for_seek : settings.parquet.local_read_min_bytes_for_seek; + ParquetMetadataCachePtr metadata_cache = context->getParquetMetadataCache(); + return std::make_shared( + buf, + std::make_shared(sample), + settings, + std::move(parser_shared_resources), + std::move(format_filter_info), + min_bytes_for_seek, + metadata_cache, + object_with_metadata + ); + }); + factory.registerRandomAccessInputFormat( + "Parquet", + [](ReadBuffer & buf, + const Block & sample, + const FormatSettings & settings, + const ReadSettings & read_settings, + bool is_remote_fs, + FormatParserSharedResourcesPtr parser_shared_resources, + FormatFilterInfoPtr format_filter_info) -> InputFormatPtr + { + size_t min_bytes_for_seek + = is_remote_fs ? read_settings.remote_read_min_bytes_for_seek : settings.parquet.local_read_min_bytes_for_seek; + return std::make_shared( + buf, + std::make_shared(sample), + settings, + std::move(parser_shared_resources), + std::move(format_filter_info), + min_bytes_for_seek, + nullptr, + std::nullopt + ); + }); + factory.markFormatSupportsSubsetOfColumns("Parquet"); + factory.registerPrewhereSupportChecker("Parquet", [](const FormatSettings &) + { + return true; + }); +} + +void registerParquetSchemaReader(FormatFactory & factory) +{ + factory.registerSplitter("Parquet", [] + { + return std::make_shared(); + }); + factory.registerSchemaReader( + "Parquet", [](ReadBuffer & buf, const FormatSettings & settings) -> SchemaReaderPtr + { + return std::make_shared(buf, settings); + } + ); + + factory.registerAdditionalInfoForSchemaCacheGetter( + "Parquet", + [](const FormatSettings & settings) + { + return fmt::format( + "schema_inference_make_columns_nullable={};enable_json_parsing={}", + settings.schema_inference_make_columns_nullable, + settings.parquet.enable_json_parsing); + }); +} + +} + +#else + +namespace DB +{ +class FormatFactory; +void registerInputFormatParquet(FormatFactory &) +{ +} + +void registerParquetSchemaReader(FormatFactory &) {} +>>>>>>> 47b6163d9f9 (Merge pull request #104251 from alexey-milovidov/parquet-single-file-parallelism) } #endif diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h index dffd7c28605e..420480a8a07d 100644 --- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h @@ -12,6 +12,33 @@ namespace DB { +<<<<<<< HEAD +======= +struct ParquetFileBucketInfo : public FileBucketInfo +{ + std::vector row_group_ids; + + ParquetFileBucketInfo() = default; + explicit ParquetFileBucketInfo(const std::vector & row_group_ids_); + void serialize(WriteBuffer & buffer) override; + void deserialize(ReadBuffer & buffer) override; + String getIdentifier() const override; + String getFormatName() const override + { + return "Parquet"; + } + std::shared_ptr filterByMatchingRowGroups(const std::vector & matching_row_groups) const override; +}; +using ParquetFileBucketInfoPtr = std::shared_ptr; + +struct ParquetBucketSplitter : public IBucketSplitter +{ + ParquetBucketSplitter() = default; + std::vector splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) override; + std::vector splitToBucketsByCount(size_t target_count, ReadBuffer & buf, const FormatSettings & format_settings_) override; +}; + +>>>>>>> 47b6163d9f9 (Merge pull request #104251 from alexey-milovidov/parquet-single-file-parallelism) class ParquetV3BlockInputFormat : public IInputFormat { public: diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index f5e9b285e113..ffd61755081e 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -1546,6 +1546,15 @@ Chunk StorageFileSource::generate() progress_callback(FileProgress(0, tryGetFileSizeFromReadBuffer(*read_buf).value_or(0))); } } + else if (fixed_file_path.has_value()) + { + /// This source was assigned to one specific (file, bucket) pair. + /// Consume it exactly once. + if (fixed_file_consumed) + return {}; + fixed_file_consumed = true; + current_path = *fixed_file_path; + } else { current_path = files_iterator->next(); @@ -1572,7 +1581,11 @@ Chunk StorageFileSource::generate() if (getContext()->getSettingsRef()[Setting::engine_file_skip_empty_files] && file_stat.st_size == 0) continue; - if (need_only_count && tryGetCountFromCache(file_stat)) + /// The count cache stores the file's total row count. When this source + /// only reads a subset of the file (file_bucket_info is set), the cache + /// is inapplicable — using it would have every source report the full + /// total and produce a count that's multiplied by the number of buckets. + if (need_only_count && !file_bucket_info && tryGetCountFromCache(file_stat)) continue; read_buf = createReadBuffer(current_path, file_stat, storage->use_table_fd, storage->table_fd, storage->compression_method, getContext()); @@ -1601,6 +1614,12 @@ Chunk StorageFileSource::generate() input_format->setSerializationHints(serialization_hints); + /// If this source was assigned to read only a subset of the file's buckets + /// (used to read one large file with multiple parallel sources), pass the + /// bucket assignment to the format before it starts reading. + if (file_bucket_info) + input_format->setBucketsToRead(file_bucket_info); + if (need_only_count) input_format->needOnlyCount(); @@ -1666,7 +1685,8 @@ Chunk StorageFileSource::generate() finished_generate = true; if (input_format && storage->format_name != "Distributed" && getContext()->getSettingsRef()[Setting::use_cache_for_count_from_files] - && (!format_filter_info || !format_filter_info->hasFilter())) + && (!format_filter_info || !format_filter_info->hasFilter()) + && !file_bucket_info) addNumRowsToCache(current_path, total_rows_in_file); total_rows_in_file = 0; @@ -1869,11 +1889,55 @@ void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const Bui if (max_num_streams > files_to_read) num_streams = files_to_read; + auto ctx = getContext(); + + /// If we are reading exactly one local file in a splittable format (e.g. Parquet), + /// we can split it into multiple buckets (row group ranges) and create one source + /// per bucket. This recovers the parallelism we'd otherwise have only when reading + /// many files at once. Without this, a single big Parquet file feeds the whole + /// downstream pipeline through a single source/Resize(1->N) — leaving most of the + /// CPU idle on machines with many cores. + /// + /// We use the file list from `files_iterator` rather than `storage->paths`: the + /// iterator has already pruned files by `_path`/`_file` virtual-column predicates + /// (`createPathAndFileFilterDAG`), so the optimization respects that pruning. If + /// the predicate excludes the only path the file is not read at all. It also + /// means a query against many paths whose predicate prunes down to a single file + /// still benefits from the split. + std::vector per_source_buckets; + String single_file_path; + if (max_num_streams > 1 + && !storage->archive_info + && !storage->use_table_fd + && !storage->has_peekable_read_buffer_from_fd.load() + && !storage->distributed_processing + && storage->compression_method == "auto" + && FormatFactory::instance().checkFormatHasSplitter(storage->format_name) + && FormatFactory::instance().checkParallelizeOutputAfterReading(storage->format_name, ctx) + && files_iterator->getFiles().size() == 1) + { + auto splitter = FormatFactory::instance().getSplitter(storage->format_name); + single_file_path = files_iterator->getFiles().front(); + struct stat file_stat = getFileStat(single_file_path, false, -1, storage->getName()); + if (file_stat.st_size > 0) + { + auto buf = createReadBuffer( + single_file_path, file_stat, false, -1, storage->compression_method, ctx); + auto buckets = splitter->splitToBucketsByCount( + max_num_streams, *buf, + storage->format_settings.value_or(getFormatSettings(ctx))); + + if (buckets.size() >= 2) + { + per_source_buckets = std::move(buckets); + num_streams = per_source_buckets.size(); + } + } + } + Pipes pipes; pipes.reserve(num_streams); - auto ctx = getContext(); - /// Set total number of bytes to process. For progress bar. auto progress_callback = ctx->getFileProgressCallback(); @@ -1904,13 +1968,19 @@ void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const Bui parser_shared_resources, format_filter_info); + if (i < per_source_buckets.size()) + { + source->fixed_file_path = single_file_path; + source->file_bucket_info = per_source_buckets[i]; + } + pipes.emplace_back(std::move(source)); } auto pipe = Pipe::unitePipes(std::move(pipes)); size_t output_ports = pipe.numOutputPorts(); const bool parallelize_output = ctx->getSettingsRef()[Setting::parallelize_output_from_storages]; - if (parallelize_output && storage->parallelizeOutputAfterReading(ctx) && output_ports > 0 && output_ports < max_num_streams) + if (parallelize_output && storage->parallelizeOutputAfterReading(ctx) && output_ports > 0 && output_ports != max_num_streams) pipe.resize(max_num_streams); if (pipe.empty()) diff --git a/src/Storages/StorageFile.h b/src/Storages/StorageFile.h index 82c3a2a7cd49..3e1815025806 100644 --- a/src/Storages/StorageFile.h +++ b/src/Storages/StorageFile.h @@ -25,6 +25,9 @@ using OutputFormatPtr = std::shared_ptr; class IInputFormat; using InputFormatPtr = std::shared_ptr; +struct FileBucketInfo; +using FileBucketInfoPtr = std::shared_ptr; + class PullingPipelineExecutor; class StorageFile final : public IStorage @@ -248,6 +251,11 @@ class StorageFileSource : public ISource, WithContext } const String & getFileNameInArchive(); + + /// Returns the (possibly virtual-column-filtered) list of files this iterator + /// will produce. Only meaningful when not reading from an archive and not + /// using distributed_processing. + const std::vector & getFiles() const { return files; } private: std::vector files; @@ -312,6 +320,17 @@ class StorageFileSource : public ISource, WithContext std::shared_ptr archive_reader; std::unique_ptr file_enumerator; + /// Optional subset-of-file assignment. When set, the input format only reads + /// these buckets (e.g. for Parquet — only the listed row groups). This is how + /// a single big file is processed in parallel by multiple sources. + FileBucketInfoPtr file_bucket_info; + + /// When this source has been assigned a specific (file, bucket) pair, it + /// reads only that one file (once) and ignores the shared FilesIterator. + /// Set together with `file_bucket_info`. + std::optional fixed_file_path; + bool fixed_file_consumed = false; + ColumnsDescription columns_description; NamesAndTypesList requested_columns; NamesAndTypesList requested_virtual_columns; diff --git a/tests/queries/0_stateless/02725_parquet_preserve_order.reference b/tests/queries/0_stateless/02725_parquet_preserve_order.reference index 3f410c13ec44..ce91c5aed9ea 100644 --- a/tests/queries/0_stateless/02725_parquet_preserve_order.reference +++ b/tests/queries/0_stateless/02725_parquet_preserve_order.reference @@ -8,5 +8,4 @@ ExpressionTransform (Expression) ExpressionTransform × 2 (ReadFromFile) - Resize 1 → 2 - File 0 → 1 + File × 2 0 → 1 From b08d6ae98f1d0abe24b75d21baa57a9e758cde41 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Fri, 15 May 2026 21:55:05 +0200 Subject: [PATCH 2/2] Resolve conflicts in cherry-pick of #104251 --- .../Formats/Impl/ParquetBlockInputFormat.cpp | 29 +++ .../Formats/Impl/ParquetBlockInputFormat.h | 1 + .../Impl/ParquetV3BlockInputFormat.cpp | 225 ------------------ .../Formats/Impl/ParquetV3BlockInputFormat.h | 27 --- 4 files changed, 30 insertions(+), 252 deletions(-) diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 2a88c7f000c9..7163a9eb727b 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -1402,6 +1402,35 @@ std::vector ParquetBucketSplitter::splitToBuckets(size_t buck return result; } +std::vector ParquetBucketSplitter::splitToBucketsByCount(size_t target_count, ReadBuffer & buf, const FormatSettings & format_settings_) +{ + std::atomic is_stopped = false; + auto arrow_file = asArrowFile(buf, format_settings_, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true, nullptr); + auto metadata = parquet::ReadMetaData(arrow_file); + const size_t num_row_groups = metadata->num_row_groups(); + + if (target_count == 0 || num_row_groups == 0) + return {}; + + /// Distribute row groups across at most target_count contiguous chunks. Each + /// chunk becomes a single ParquetFileBucketInfo containing several row groups, + /// so the caller gets one source per chunk and no row group is dropped. + const size_t num_chunks = std::min(target_count, num_row_groups); + std::vector result; + result.reserve(num_chunks); + for (size_t g = 0; g < num_chunks; ++g) + { + size_t lo = g * num_row_groups / num_chunks; + size_t hi = (g + 1) * num_row_groups / num_chunks; + std::vector ids; + ids.reserve(hi - lo); + for (size_t k = lo; k < hi; ++k) + ids.push_back(k); + result.push_back(std::make_shared(ids)); + } + return result; +} + void registerInputFormatParquet(FormatFactory & factory) { factory.registerFileBucketInfo( diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index 1fea5f0609c1..7dc0e68ecb08 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -71,6 +71,7 @@ struct ParquetBucketSplitter : public IBucketSplitter { ParquetBucketSplitter() = default; std::vector splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) override; + std::vector splitToBucketsByCount(size_t target_count, ReadBuffer & buf, const FormatSettings & format_settings_) override; }; class ParquetBlockInputFormat : public IInputFormat diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp index c58c2bfd01ad..01a73159d04a 100644 --- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp @@ -214,231 +214,6 @@ std::optional NativeParquetSchemaReader::readNumberOrRows() return size_t(file_metadata.num_rows); } -<<<<<<< HEAD -======= -void ParquetFileBucketInfo::serialize(WriteBuffer & buffer) -{ - writeVarUInt(row_group_ids.size(), buffer); - for (auto chunk : row_group_ids) - writeVarUInt(chunk, buffer); -} - -void ParquetFileBucketInfo::deserialize(ReadBuffer & buffer) -{ - size_t size_chunks; - readVarUInt(size_chunks, buffer); - row_group_ids = std::vector{}; - row_group_ids.resize(size_chunks); - size_t bucket; - for (size_t i = 0; i < size_chunks; ++i) - { - readVarUInt(bucket, buffer); - row_group_ids[i] = bucket; - } -} - -String ParquetFileBucketInfo::getIdentifier() const -{ - String result; - for (auto chunk : row_group_ids) - result += "_" + std::to_string(chunk); - return result; -} - -ParquetFileBucketInfo::ParquetFileBucketInfo(const std::vector & row_group_ids_) - : row_group_ids(row_group_ids_) -{ -} - -std::shared_ptr ParquetFileBucketInfo::filterByMatchingRowGroups(const std::vector & matching_row_groups) const -{ - if (matching_row_groups.empty()) - return nullptr; - if (row_group_ids.empty()) - return std::make_shared(matching_row_groups); - std::unordered_set matching_set(matching_row_groups.begin(), matching_row_groups.end()); - std::vector filtered; - for (size_t rg : row_group_ids) - if (matching_set.contains(rg)) - filtered.push_back(rg); - if (filtered.empty()) - return nullptr; - return std::make_shared(std::move(filtered)); -} - -void registerParquetFileBucketInfo(std::unordered_map & instances) -{ - instances.emplace("Parquet", std::make_shared()); -} - -std::vector ParquetBucketSplitter::splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) -{ - std::atomic is_stopped = false; - auto arrow_file = asArrowFile(buf, format_settings_, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true, nullptr); - auto metadata = parquet::ReadMetaData(arrow_file); - std::vector bucket_sizes; - for (int i = 0; i < metadata->num_row_groups(); ++i) - bucket_sizes.push_back(metadata->RowGroup(i)->total_byte_size()); - - std::vector> buckets; - size_t current_weight = 0; - for (size_t i = 0; i < bucket_sizes.size(); ++i) - { - if (current_weight + bucket_sizes[i] <= bucket_size) - { - if (buckets.empty()) - buckets.emplace_back(); - buckets.back().push_back(i); - current_weight += bucket_sizes[i]; - } - else - { - current_weight = 0; - buckets.push_back({}); - buckets.back().push_back(i); - current_weight += bucket_sizes[i]; - } - } - - std::vector result; - for (const auto & bucket : buckets) - { - result.push_back(std::make_shared(bucket)); - } - return result; -} - -std::vector ParquetBucketSplitter::splitToBucketsByCount(size_t target_count, ReadBuffer & buf, const FormatSettings & format_settings_) -{ - std::atomic is_stopped = false; - auto arrow_file = asArrowFile(buf, format_settings_, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true, nullptr); - auto metadata = parquet::ReadMetaData(arrow_file); - const size_t num_row_groups = metadata->num_row_groups(); - - if (target_count == 0 || num_row_groups == 0) - return {}; - - /// Distribute row groups across at most target_count contiguous chunks. Each - /// chunk becomes a single ParquetFileBucketInfo containing several row groups, - /// so the caller gets one source per chunk and no row group is dropped. - const size_t num_chunks = std::min(target_count, num_row_groups); - std::vector result; - result.reserve(num_chunks); - for (size_t g = 0; g < num_chunks; ++g) - { - size_t lo = g * num_row_groups / num_chunks; - size_t hi = (g + 1) * num_row_groups / num_chunks; - std::vector ids; - ids.reserve(hi - lo); - for (size_t k = lo; k < hi; ++k) - ids.push_back(k); - result.push_back(std::make_shared(ids)); - } - return result; -} - -void registerInputFormatParquet(FormatFactory & factory) -{ - factory.registerFileBucketInfo( - "Parquet", - [] - { - return std::make_shared(); - } - ); - factory.registerRandomAccessInputFormatWithMetadata( - "Parquet", - [](ReadBuffer & buf, - const Block & sample, - const FormatSettings & settings, - const ReadSettings & read_settings, - bool is_remote_fs, - FormatParserSharedResourcesPtr parser_shared_resources, - FormatFilterInfoPtr format_filter_info, - const std::optional & object_with_metadata, - const ContextPtr & context) -> InputFormatPtr - { - size_t min_bytes_for_seek - = is_remote_fs ? read_settings.remote_read_min_bytes_for_seek : settings.parquet.local_read_min_bytes_for_seek; - ParquetMetadataCachePtr metadata_cache = context->getParquetMetadataCache(); - return std::make_shared( - buf, - std::make_shared(sample), - settings, - std::move(parser_shared_resources), - std::move(format_filter_info), - min_bytes_for_seek, - metadata_cache, - object_with_metadata - ); - }); - factory.registerRandomAccessInputFormat( - "Parquet", - [](ReadBuffer & buf, - const Block & sample, - const FormatSettings & settings, - const ReadSettings & read_settings, - bool is_remote_fs, - FormatParserSharedResourcesPtr parser_shared_resources, - FormatFilterInfoPtr format_filter_info) -> InputFormatPtr - { - size_t min_bytes_for_seek - = is_remote_fs ? read_settings.remote_read_min_bytes_for_seek : settings.parquet.local_read_min_bytes_for_seek; - return std::make_shared( - buf, - std::make_shared(sample), - settings, - std::move(parser_shared_resources), - std::move(format_filter_info), - min_bytes_for_seek, - nullptr, - std::nullopt - ); - }); - factory.markFormatSupportsSubsetOfColumns("Parquet"); - factory.registerPrewhereSupportChecker("Parquet", [](const FormatSettings &) - { - return true; - }); -} - -void registerParquetSchemaReader(FormatFactory & factory) -{ - factory.registerSplitter("Parquet", [] - { - return std::make_shared(); - }); - factory.registerSchemaReader( - "Parquet", [](ReadBuffer & buf, const FormatSettings & settings) -> SchemaReaderPtr - { - return std::make_shared(buf, settings); - } - ); - - factory.registerAdditionalInfoForSchemaCacheGetter( - "Parquet", - [](const FormatSettings & settings) - { - return fmt::format( - "schema_inference_make_columns_nullable={};enable_json_parsing={}", - settings.schema_inference_make_columns_nullable, - settings.parquet.enable_json_parsing); - }); -} - -} - -#else - -namespace DB -{ -class FormatFactory; -void registerInputFormatParquet(FormatFactory &) -{ -} - -void registerParquetSchemaReader(FormatFactory &) {} ->>>>>>> 47b6163d9f9 (Merge pull request #104251 from alexey-milovidov/parquet-single-file-parallelism) } #endif diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h index 420480a8a07d..dffd7c28605e 100644 --- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h @@ -12,33 +12,6 @@ namespace DB { -<<<<<<< HEAD -======= -struct ParquetFileBucketInfo : public FileBucketInfo -{ - std::vector row_group_ids; - - ParquetFileBucketInfo() = default; - explicit ParquetFileBucketInfo(const std::vector & row_group_ids_); - void serialize(WriteBuffer & buffer) override; - void deserialize(ReadBuffer & buffer) override; - String getIdentifier() const override; - String getFormatName() const override - { - return "Parquet"; - } - std::shared_ptr filterByMatchingRowGroups(const std::vector & matching_row_groups) const override; -}; -using ParquetFileBucketInfoPtr = std::shared_ptr; - -struct ParquetBucketSplitter : public IBucketSplitter -{ - ParquetBucketSplitter() = default; - std::vector splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) override; - std::vector splitToBucketsByCount(size_t target_count, ReadBuffer & buf, const FormatSettings & format_settings_) override; -}; - ->>>>>>> 47b6163d9f9 (Merge pull request #104251 from alexey-milovidov/parquet-single-file-parallelism) class ParquetV3BlockInputFormat : public IInputFormat { public: