
feat: optimize parquet reads with bucket and page-level filtering#232

Open
liangjie3138 wants to merge 16 commits into alibaba:main from liangjie3138:dev_parquet

Conversation

@liangjie3138
Contributor

Purpose

Linked issue: closes #137

Implement multi-level filtering optimization for Parquet file reading. By leveraging ColumnIndex statistics and bucket predicate derivation, the reader can skip non-matching data at the bucket, row group, and page levels, reducing I/O and decoding overhead.

Main Features

  1. Page-level data filtering

    • ColumnIndexFilter: Filters data pages based on Parquet ColumnIndex min/max statistics. Supports predicates such as EQUAL, NOT_EQUAL, LESS_THAN, GREATER_THAN, IN, IS_NULL, and compound predicates with AND/OR.
    • PageFilteredRowGroupReader: Reads row groups after page-level filtering:
      • I/O layer: Skips non-matching pages through the data_page_filter callback.
      • Decoding layer: Skips rows through SkipRecords/ReadRecords.
  2. Page-level prefetching
    Computes the byte ranges of required pages based on RowRanges and OffsetIndex, and uses ArrowPreBuffer for asynchronous prefetching.

  3. BucketSelectConverter
    Derives target bucket IDs from query predicates, and is compatible with the Java Paimon hash algorithm.
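Conceptually, the page-level pruning in feature 1 boils down to comparing predicate literals against per-page min/max statistics. The sketch below is a minimal, self-contained model of that idea; the struct and function names are hypothetical and do not reflect the PR's actual ColumnIndexFilter API.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical simplified model: each data page carries min/max statistics
// from the Parquet ColumnIndex, so a predicate can rule a page out without
// reading or decoding it.
struct PageStats {
    int64_t min;
    int64_t max;
    bool has_null;  // whether the page contains any null values
};

// EQUAL(col, v): the page can only contain matching rows if v is in [min, max].
bool PageMayMatchEqual(const PageStats& page, int64_t v) {
    return v >= page.min && v <= page.max;
}

// GREATER_THAN(col, v): some row in the page can exceed v only if max does.
bool PageMayMatchGreaterThan(const PageStats& page, int64_t v) {
    return page.max > v;
}

// AND of two page-level answers: the page survives only if both sides may match.
bool PageMayMatchAnd(bool lhs, bool rhs) { return lhs && rhs; }
```

Pages for which these checks return false can be skipped entirely; a true result only means the page may contain matches and must still be decoded and filtered row by row.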

Tests

  • bucket_select_converter_test.cpp: Covers various predicate combinations, Timestamp type, and Cartesian product computation.
  • column_index_filter_test.cpp: Covers all predicate types (EQUAL, IN, LESS_THAN, GREATER_THAN, IS_NULL, etc.) and AND/OR compound predicates.
  • page_filtered_row_group_reader_test.cpp: Verifies filtering correctness, edge cases, and prefetching behavior.
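As a rough illustration of the bucket derivation these tests exercise: for an EQUAL predicate on the bucket key, the converter can hash the literal and map it to a single bucket ID. The real implementation must reproduce Java Paimon's hash exactly; the hash below is a hypothetical stand-in for illustration only.

```cpp
#include <cassert>
#include <cstdint>

// Placeholder hash; NOT the Java-Paimon-compatible one.
int32_t HashKey(int64_t key) {
    uint64_t h = static_cast<uint64_t>(key) * 0x9E3779B97F4A7C15ULL;
    return static_cast<int32_t>(h >> 33);
}

// Map a key to a bucket ID in [0, num_buckets), with a non-negative modulo
// mirroring the Math.abs(hash % numBuckets) style of mapping.
int32_t BucketOf(int64_t key, int32_t num_buckets) {
    int32_t b = HashKey(key) % num_buckets;
    return b < 0 ? b + num_buckets : b;
}
```

Given such a mapping, an EQUAL predicate on the full bucket key pins the scan to exactly one bucket, and an IN predicate yields the union of the buckets of its literals (the Cartesian-product case in the tests covers multi-column keys).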

API and Format

No public API changes. No impact on storage format or protocol.

Documentation

Not applicable.

Generative AI Tooling

Claude Code (Opus 4.6)

@CLAassistant

CLAassistant commented Apr 16, 2026

CLA assistant check
All committers have signed the CLA.


@liangjie3138 changed the title from "Dev parquet" to "feat: optimize parquet reads with bucket and page-level filtering" on Apr 16, 2026
@lxy-9602
Collaborator

lxy-9602 commented Apr 17, 2026

Thank you for your contribution! This is a highly complex and important feature, and your work on it is greatly appreciated.

Given the large scope of this PR, would it be possible to split it into smaller, focused changes? For example, separating the bucket predicate logic from the Parquet point lookup improvements could make each part easier to review and move forward incrementally.

Also, could you please fix the CI failures first so we can begin the review process?

We truly recognize the effort behind this change and look forward to helping get it merged smoothly.


Copilot AI left a comment


Pull request overview

Implements multi-level Parquet read optimizations (bucket selection + page/row-group filtering) by leveraging Parquet page indexes (ColumnIndex/OffsetIndex) and adding page-level prefetching to reduce I/O and decode work.

Changes:

  • Added page-index-based filtering infrastructure (ColumnIndexFilter, RowRanges) and a page-filtered row-group reader with page-range prefetch support.
  • Integrated page-level filtering/prefetch into ParquetFileBatchReader/FileReaderWrapper, and enabled writing page indexes via a new writer option.
  • Added bucket-id derivation from predicates (BucketSelectConverter) and expanded scan bucket filtering to support multiple buckets.

Reviewed changes

Copilot reviewed 28 out of 28 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
src/paimon/format/parquet/row_ranges.h Introduces RowRanges abstraction for page/row-range selection.
src/paimon/format/parquet/row_ranges.cpp Implements range union/intersection/overlap/add logic used by page filtering.
src/paimon/format/parquet/parquet_writer_builder.cpp Enables Parquet page index writing behind an option.
src/paimon/format/parquet/parquet_format_defs.h Adds new read/write options for page-index functionality.
src/paimon/format/parquet/parquet_file_batch_reader.h Adds page-index filtering API and logging member.
src/paimon/format/parquet/parquet_file_batch_reader.cpp Wires page-level filtering + eager prepare to start prebuffer earlier.
src/paimon/format/parquet/page_filtered_row_group_reader.h Declares page-filtered row group read + page-range computation.
src/paimon/format/parquet/page_filtered_row_group_reader.cpp Implements decode skipping + page-range prefetch logic for filtered reads.
src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp Adds end-to-end tests for page filtering and page-range computation.
src/paimon/format/parquet/file_reader_wrapper.h Extends wrapper to support page-filtered RG reads and page-range prebuffering.
src/paimon/format/parquet/file_reader_wrapper.cpp Implements page-filtered RG scheduling + unified PreBufferRanges prefetch.
src/paimon/format/parquet/column_index_filter.h Adds ColumnIndex-based predicate evaluation for page selection.
src/paimon/format/parquet/column_index_filter.cpp Implements ColumnIndex-based page matching and RowRanges generation.
src/paimon/format/parquet/column_index_filter_test.cpp Adds RowRanges unit tests + ColumnIndexFilter integration tests.
src/paimon/format/parquet/CMakeLists.txt Registers new parquet sources/tests; adds Arrow source include path.
src/paimon/core/operation/key_value_file_store_scan.cpp Derives bucket filter from predicates when not explicitly set.
src/paimon/core/operation/file_store_scan.h Changes bucket filter to optional<set<int32_t>>; adds helpers.
src/paimon/core/operation/file_store_scan.cpp Updates bucket filtering logic to handle multiple buckets.
src/paimon/core/operation/bucket_select_converter.h Declares predicate→bucket-id derivation helper.
src/paimon/core/operation/bucket_select_converter.cpp Implements bucket-id derivation compatible with Java hashing.
src/paimon/core/operation/bucket_select_converter_test.cpp Adds tests for bucket derivation across predicate shapes/types.
src/paimon/core/operation/merge_file_split_read.cpp Refactors loops to index-based iteration.
src/paimon/core/operation/abstract_split_read.cpp Refactors loop to index-based iteration.
src/paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.cpp Refactors loop to index-based iteration.
src/paimon/common/utils/arrow/arrow_input_stream_adapter.h Tracks outstanding async reads for safe destruction.
src/paimon/common/utils/arrow/arrow_input_stream_adapter.cpp Waits for pending futures; prunes finished futures.
src/paimon/CMakeLists.txt Registers new core operation source + test.
cmake_modules/arrow.diff Patches Arrow Parquet reader to add PreBufferRanges/WhenBufferedRanges and cached page-range reads.
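The range algebra that row_ranges.cpp is described as implementing (union/intersection/overlap over selected row intervals) can be sketched as follows. This is an illustrative model only; the type and function names are not the PR's actual API.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Simplified RowRanges model: a sorted list of disjoint [from, to] intervals
// of row indices, with inclusive bounds.
struct Range {
    int64_t from;
    int64_t to;
};

// Intersection of two sorted disjoint range lists, as used when combining
// the page selections of two AND-ed predicates.
std::vector<Range> Intersect(const std::vector<Range>& a, const std::vector<Range>& b) {
    std::vector<Range> out;
    size_t i = 0, j = 0;
    while (i < a.size() && j < b.size()) {
        int64_t lo = std::max(a[i].from, b[j].from);
        int64_t hi = std::min(a[i].to, b[j].to);
        if (lo <= hi) out.push_back({lo, hi});
        // Advance whichever range ends first.
        (a[i].to < b[j].to) ? ++i : ++j;
    }
    return out;
}
```

Union works symmetrically (merging overlapping intervals) and is what an OR predicate would use; the resulting row ranges then drive both the page-skip callback and the SkipRecords/ReadRecords decoding.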


@liangjie3138 liangjie3138 requested a review from lucasfang April 22, 2026 09:09
// causing use-after-free in arrow::PoolBuffer::~PoolBuffer().
std::mutex pending_futures_mutex_;
std::vector<arrow::Future<std::shared_ptr<arrow::Buffer>>> pending_futures_;
};
Collaborator


I'm curious — is this issue introduced by the page-level pushdown feature, or did it exist before?
If it's a pre-existing problem, should we consider submitting a separate fix PR to address it independently?

That way, we can clearly track and verify the bug fix without being coupled to the new feature changes.

const std::shared_ptr<Predicate>& predicate,
::parquet::RowGroupPageIndexReader* rg_page_index_reader,
const std::map<std::string, int32_t>& column_name_to_index, int64_t row_group_row_count) {
if (auto leaf_predicate = std::dynamic_pointer_cast<LeafPredicate>(predicate)) {
Collaborator


For consistency, could you please place output parameters like rg_page_index_reader at the end of the parameter list? This follows the common style in our codebase.

auto function_type = function.GetType();
const auto& literals = leaf_predicate->Literals();
switch (function_type) {
case Function::Type::IS_NULL:
Collaborator


Since function and function_type are needed outside the if block, consider declaring them in the outer scope for clarity.

if (it == column_name_to_index.end()) {
// Column not found in file (schema evolution): all values are treated as NULL.
// Return precise results based on predicate type, matching Java behavior.
const auto& function = leaf_predicate->GetFunction();
Collaborator


This case seems unlikely to occur, because in C++ Paimon, schema evolution (e.g., missing columns filled with nulls, unsupported predicates not pushed down) is already handled by FieldMappingReader.

Given that, we don't need to handle missing columns here; they are already filtered upstream, so reporting an error here is sufficient.

// NULL = non_null → no rows.
bool has_null_literal = !literals.empty() && literals[0].IsNull();
return has_null_literal ? RowRanges::CreateSingle(row_group_row_count)
: RowRanges::CreateEmpty();
Collaborator


Actually, equal predicates with null literals should not match any rows; see the Java implementation.
We return false on null rather than allowing null to participate in equality.
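The semantics the reviewer is asking for can be sketched as follows; the helper name and optional-based modeling are hypothetical, not the PR's code.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// SQL-style equality: a comparison involving NULL on either side is never
// true, so an EQUAL predicate with a NULL literal matches no rows.
bool EqualMatches(std::optional<int64_t> value, std::optional<int64_t> literal) {
    if (!value.has_value() || !literal.has_value()) return false;
    return *value == *literal;
}
```

Under these semantics, an EQUAL predicate with a NULL literal should translate to an empty RowRanges rather than selecting the whole row group.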

for (int i = 0; i < arrow_schema->num_fields(); ++i) {
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
auto empty_array, arrow::MakeEmptyArray(arrow_schema->field(i)->type(), pool));
empty_columns.push_back(std::move(empty_array));
Collaborator


Please avoid auto with PAIMON_ASSIGN_OR_RAISE — prefer explicit types for clarity.

// Build Table from ChunkedArrays, then combine chunks and extract a single RecordBatch
auto table = arrow::Table::Make(arrow_schema, columns, expected_rows);
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Table> combined_table,
table->CombineChunks(pool));
Collaborator


I wonder whether CombineChunks copies data during its operation. Does it perform a deep copy of the chunk data, or does it work with references/views to avoid unnecessary duplication?

int64_t remaining = current_filtered_batch_->num_rows() - filtered_batch_offset_;
int64_t slice_len = (batch_size_ > 0 && remaining > batch_size_) ? batch_size_ : remaining;
record_batch = current_filtered_batch_->Slice(filtered_batch_offset_, slice_len);

Collaborator


Note that the returned C ArrowArray must have offset = 0; otherwise, converting from the C ArrowArray back to arrow::RecordBatch will fail with an error (see
https://github.com/apache/arrow/blob/main/cpp/src/arrow/c/bridge.cc#L1563 for reference). Therefore, the line current_filtered_batch_->Slice(filtered_batch_offset_, slice_len); needs to be adjusted to ensure the resulting array starts at offset 0.


auto meta_data = file_reader_->parquet_reader()->metadata();
int64_t row_count = meta_data->RowGroup(row_group_index)->num_rows();

Collaborator


Not a big issue, but I noticed the declaration for meta_data and row_count is repeated a few times. Maybe we could extract them into a common section to reduce duplication.

current_filtered_batch_ = full_batch;
filtered_batch_offset_ = batch_size_;
record_batch = full_batch->Slice(0, batch_size_);
} else {
Collaborator


It would be great to have a test case where the page-level predicate returns more rows than the batch size. For example, we could set the page row count to 3 and try batch sizes like 1, 2, 3, 5, and 10, to check how the results are split across batches.
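The splitting behavior the reviewer wants covered can be sketched as follows, mirroring the remaining/slice_len logic in the quoted snippet. The function name and vector-of-sizes output are illustrative only.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Given total_rows surviving the page-level filter, return the sizes of the
// batches a reader would emit when each batch holds at most batch_size rows
// (a non-positive batch_size means "emit everything at once").
std::vector<int64_t> SplitIntoBatches(int64_t total_rows, int64_t batch_size) {
    std::vector<int64_t> sizes;
    int64_t offset = 0;
    while (offset < total_rows) {
        int64_t remaining = total_rows - offset;
        int64_t len = (batch_size > 0 && remaining > batch_size) ? batch_size : remaining;
        sizes.push_back(len);
        offset += len;
    }
    return sizes;
}
```

For the suggested test matrix with 3 filtered rows, this yields {1, 1, 1} for batch size 1, {2, 1} for batch size 2, and a single batch {3} for batch sizes 3, 5, and 10.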
