feat(parquet): wire scan_filtered through ArrayReader stack; add with_miniblock_predicate#9770
Draft
sahuagin wants to merge 2 commits into apache:main from
Conversation
…red()
Two additions to DeltaBitPackDecoder:
1. skip() optimization: bw=0 miniblocks use an O(1) multiply instead of decoding 32/64 values per miniblock. Terminal skips (discarding all remaining page values) avoid heap allocation and last_value tracking.
2. Decoder::scan_filtered(): a new provided method on the Decoder trait (default: decode everything, a safe fallback for all encodings). DeltaBitPackDecoder overrides it to compute a conservative value range [lo, hi] per miniblock and skip non-matching miniblocks without decoding individual values.
Benchmarks vs upstream HEAD (arrow_reader bench):
- bw=0 single-value skip: -21.6%
- bw=0 increasing-value skip: -24.3%
- mixed stepped skip: -3.9%
Wall-time scan_filtered on a 1M-row DELTA file (monotone column): full decode 1.96 ms -> scan_filtered 470 µs (4.2x speedup).
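The two decoder tricks can be sketched as follows. This is an illustrative model, not the actual parquet-rs internals: `skip_bw0` and `miniblock_range` are hypothetical names, and the sketch assumes only what DELTA_BINARY_PACKED block headers already provide (a running first value, the block's `min_delta`, and the per-miniblock bit width).

```rust
/// bw=0 skip: every delta in the miniblock equals min_delta, so skipping
/// n values is a single multiply instead of decoding each value.
fn skip_bw0(last: i64, min_delta: i64, n: i64) -> i64 {
    last.wrapping_add(min_delta.wrapping_mul(n))
}

/// Conservative value range [lo, hi] for a miniblock of n values following
/// `first`. Each value adds a delta in [min_delta, min_delta + max_packed],
/// where max_packed is the largest bit-packed residual for this bit width.
fn miniblock_range(first: i64, min_delta: i64, bit_width: u8, n: i64) -> (i64, i64) {
    let max_packed = if bit_width >= 63 {
        i64::MAX
    } else {
        (1i64 << bit_width) - 1
    };
    let d_lo = min_delta;
    let d_hi = min_delta.saturating_add(max_packed);
    // After k of n steps the value lies in [first + k*d_lo, first + k*d_hi];
    // take the loosest bound over k = 1..=n.
    let lo = first.saturating_add(d_lo.min(d_lo.saturating_mul(n)));
    let hi = first.saturating_add(d_hi.max(d_hi.saturating_mul(n)));
    (lo, hi)
}

fn main() {
    // bw=0 monotone run: skip 32 values with one multiply.
    assert_eq!(skip_bw0(100, 1, 32), 132);
    // bw=0 miniblock: all 32 deltas are exactly min_delta = 1.
    assert_eq!(miniblock_range(100, 1, 0, 32), (101, 132));
    // A predicate such as |_lo, hi| hi >= 1_000 can now reject this
    // miniblock without decoding any of its values.
    let (_lo, hi) = miniblock_range(100, 1, 0, 32);
    assert!(hi < 1_000);
}
```

Because the range is conservative (it may be wider than the true min/max, never narrower), skipping a miniblock whose range fails the predicate can never drop a matching value.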
…_miniblock_predicate
Wires DeltaBitPackDecoder::scan_filtered() up the full column reader stack
and exposes it as a public API on both ParquetRecordBatchReaderBuilder
(sync) and ParquetRecordBatchStreamBuilder (async).
Wiring chain (bottom to top):
ColumnValueDecoderImpl::scan_filtered_values()
GenericColumnReader::scan_filtered_records() (mandatory columns only)
GenericRecordReader::scan_filtered_records() (optional/repeated fallback)
ArrayReader::scan_records() trait method + page-switching helper
PrimitiveArrayReader::scan_records() override
StructArrayReader::scan_records() (single-child delegate)
ParquetRecordBatchReader::next_inner (All-selection branch)
ArrowReaderBuilder::with_miniblock_predicate()
Public API additions:
- MiniblockPredicate type alias
(Arc<dyn Fn(i64, i64) -> bool + Send + Sync>)
- ArrowReaderBuilder::with_miniblock_predicate(pred) fluent setter
Example:
let pred: MiniblockPredicate = Arc::new(|_lo, hi| hi >= 1_000_000);
let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
.with_projection(mask)
.with_miniblock_predicate(pred)
.build()?;
Limitations:
- Multi-column projections fall back to full decode (column value ranges
are independent; a shared predicate would produce mismatched lengths)
- Optional/repeated columns fall back (def/rep levels must stay
synchronized with the values buffer)
- No file format changes; miniblock ranges computed on-the-fly from
DELTA_BINARY_PACKED block headers already present in the page data
Which issue does this PR close?
Depends on #9769 (the decoder PR). Please review after #9769 merges; this PR's diff includes those decoder changes while that PR is pending.
Rationale for this change
Exposes the scan_filtered miniblock-level predicate pushdown added in #9769 through the full column reader stack and as a public API. For mandatory DELTA_BINARY_PACKED
INT32/INT64 columns, entire miniblocks (32/64 values) can be skipped without decoding when a caller-supplied range predicate rules them out. This is especially effective
for monotone columns (timestamps, sequence numbers, auto-increment IDs) where bw=0 blocks allow O(1) skipping.
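To make the pruning effect concrete, here is a small simulation of miniblock skipping on a monotone column. The `prune` helper and the block ranges are hypothetical; in the real reader the `(lo, hi)` pairs come from the conservative per-miniblock ranges computed during the scan.

```rust
/// Return the indices of miniblocks whose conservative [lo, hi] range
/// passes the predicate; all other miniblocks are skipped undecoded.
fn prune(blocks: &[(i64, i64)], pred: impl Fn(i64, i64) -> bool) -> Vec<usize> {
    blocks
        .iter()
        .enumerate()
        .filter(|(_, &(lo, hi))| pred(lo, hi))
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    // Four miniblocks of a monotone sequence-number column.
    let blocks = [(0, 31), (32, 63), (64, 95), (96, 127)];
    // Keep only miniblocks that might contain values >= 70.
    let kept = prune(&blocks, |_lo, hi| hi >= 70);
    assert_eq!(kept, vec![2, 3]);
}
```

On a sorted column the surviving miniblocks form a suffix (or prefix, for a `<=` predicate), which is why monotone timestamp and ID columns benefit the most.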
What changes are included in this PR?
Wiring chain (bottom to top): see the list above.
New public API:
pub type MiniblockPredicate = Arc<dyn Fn(i64, i64) -> bool + Send + Sync>;
// on ArrowReaderBuilder:
pub fn with_miniblock_predicate(self, predicate: MiniblockPredicate) -> Self;
Example:
let pred: MiniblockPredicate = Arc::new(|_lo, hi| hi >= 1_000_000);
let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
.with_projection(mask)
.with_miniblock_predicate(pred)
.build()?;
Limitations (documented in the MiniblockPredicate rustdoc): see the list above.
Are these changes tested?
Yes: tests cover miniblock-level skipping with no false negatives, and verify that fewer rows are returned than a full read.
Are there any user-facing changes?
Yes: two additive public API items, the MiniblockPredicate type alias and ArrowReaderBuilder::with_miniblock_predicate.
No breaking changes. The new scan_records method on the ArrayReader trait has a provided default (read_records) so no existing implementations are affected.
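A minimal sketch of why the provided default is non-breaking. The trait shape below is illustrative (the real ArrayReader signature differs); the point is that scan_records delegates to read_records by default, so an implementation written before this PR compiles and behaves unchanged.

```rust
use std::sync::Arc;

pub type MiniblockPredicate = Arc<dyn Fn(i64, i64) -> bool + Send + Sync>;

trait ArrayReader {
    fn read_records(&mut self, batch_size: usize) -> usize;

    /// Provided default: readers that cannot push the predicate down
    /// ignore it and fall back to a full decode, which is always correct.
    fn scan_records(&mut self, batch_size: usize, _pred: &MiniblockPredicate) -> usize {
        self.read_records(batch_size)
    }
}

/// An implementation that predates scan_records: no override needed.
struct LegacyReader {
    produced: usize,
}

impl ArrayReader for LegacyReader {
    fn read_records(&mut self, batch_size: usize) -> usize {
        self.produced += batch_size;
        batch_size
    }
}

fn main() {
    let pred: MiniblockPredicate = Arc::new(|_lo, hi| hi >= 1_000_000);
    let mut r = LegacyReader { produced: 0 };
    // The default method routes through the legacy read path.
    assert_eq!(r.scan_records(1024, &pred), 1024);
    assert_eq!(r.produced, 1024);
}
```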