Push LIMIT / OFFSET into the last RowFilter predicate and skip unused row groups#9766
haohuaijin wants to merge 12 commits into apache:main
For this benchmark on the main branch, I copied the benchmark file to main. The benchmark script is below.

run benchmarks arrow_reader_row_filter_limit

run benchmark arrow_reader_row_filter

🤖 Arrow criterion benchmark running (GKE) | Comparing fix-pushdown-limit-early-stop (011e068) to 51b02f1 (merge-base)

Benchmark for this request failed. Last 20 lines of output: (collapsed)

run benchmark arrow_reader_clickbench

🤖 Arrow criterion benchmark running (GKE) | Comparing fix-pushdown-limit-early-stop (011e068) to 51b02f1 (merge-base)
🤖 Arrow criterion benchmark completed (GKE) (resource usage and result details collapsed)
Hi @Dandandan, thank you for triggering the benchmark. I submitted another PR (#9767) to add the new benchmark for limit, to make it easier to compare (after it merges to main).
# Which issue does this PR close?

- Part of #9766
I just merged and updated this PR and will retrigger.
run benchmark arrow_reader_clickbench

run benchmark arrow_reader_row_filter

🤖 Arrow criterion benchmark running (GKE) | Comparing fix-pushdown-limit-early-stop (15e7f5f) to 9d3a4d9 (merge-base)
🤖 Arrow criterion benchmark completed (GKE) (resource usage and result details collapsed)
Hi @Dandandan @alamb, the benchmark results look good. Do you have time to take a look at this PR?
run benchmark arrow_reader_row_filter

🤖 Arrow criterion benchmark running (GKE) | Comparing fix-pushdown-limit-early-stop (dbcf49d) to b93240a (merge-base)
alamb
left a comment
Thank you @haohuaijin -- I think this looks really nice. I had one API design suggestion and a minor nitpick but otherwise 💯
Thank you
```rust
/// should have been passed to subsequent predicates.
///
/// If `limit` is `None` this behaves exactly like [`Self::with_predicate`].
pub fn with_predicate_limited(
```
Thanks @haohuaijin -- this actually makes a lot of sense to me 👍
I do think the API is somewhat unfortunate now (the fact we have to add a new function shows it isn't ideal.)
Would you be willing to make the API a little more future proof with something like the following?
```rust
struct PredicateOptions {
    array_reader: Box<dyn ArrayReader>,
    predicate: &mut dyn ArrowPredicate,
    limit: Option<usize>,
    total_rows: usize,
}

pub fn with_predicate_options(mut self, options: PredicateOptions) -> Result<Self> {
    ...
}
```

```rust
let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
let mut filters = vec![];
let mut processed_rows: usize = 0;
let mut cumulative_matches: usize = 0;
```
I would have found a name like `matched_rows` easier to understand, given the other variable `processed_rows`
```rust
match limit {
    Some(limit) if cumulative_matches + filter.true_count() >= limit => {
        let needed = limit - cumulative_matches;
        let truncated = truncate_filter_after_n_trues(&filter, needed);
```
you could avoid this clone by passing in `filter` rather than `&filter`
```rust
///
/// `filter` must not contain nulls (callers apply [`prep_null_mask_filter`]
/// first). If `filter` has at most `n` `true` values, a clone is returned.
fn truncate_filter_after_n_trues(filter: &BooleanArray, n: usize) -> BooleanArray {
```
It might be nice (as a follow-on PR) to make this a method on BooleanArray -- something like `BooleanArray::take_n_true` or something 🤔
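As a rough sketch of what such a `take_n_true` could mean, here is the semantics on a plain `Vec<bool>` (a simplified stand-in for arrow's `BooleanArray`; the name and signature are hypothetical, not the actual implementation). Taking the input by value also gives a clone-free fast path when nothing needs truncating, in the spirit of the earlier review comment:

```rust
// Hypothetical sketch (simplified types): keep the first n `true` values
// of the filter and pad everything after them as `false`.
fn take_n_true(filter: Vec<bool>, n: usize) -> Vec<bool> {
    // Fast path: at most n trues already, return the input without cloning.
    if filter.iter().filter(|&&v| v).count() <= n {
        return filter;
    }
    let mut seen = 0;
    filter
        .into_iter()
        .map(|v| if v && seen < n { seen += 1; true } else { false })
        .collect()
}
```

The length of the output equals the length of the input, which matches the "tail is padded as not selected" behavior this PR relies on so the `RowSelection` still spans the full row group.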
🤖 Arrow criterion benchmark completed (GKE) (resource usage and result details collapsed)

Those are some sweet results @haohuaijin 😎
alamb
left a comment
Thanks @haohuaijin - looks great to me
```rust
///
/// Only valid for the *last* predicate in a filter chain: intermediate
/// predicates' match counts do not map 1:1 to output rows.
pub fn with_limit(mut self, limit: usize, total_rows: usize) -> Self {
```
this is a nice API that makes it clear that `limit` and `total_rows` are both required
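A toy builder illustrating that design point (all names here are hypothetical and self-contained; only the shape of `with_limit` mirrors the quoted signature): because the two values are required positional arguments of one method, a caller cannot set a limit without also supplying the row count.

```rust
#[derive(Debug, PartialEq)]
struct ReadPlan {
    limit: Option<usize>,
    total_rows: usize,
}

struct ReadPlanBuilder {
    limit: Option<usize>,
    total_rows: usize,
}

impl ReadPlanBuilder {
    fn new() -> Self {
        Self { limit: None, total_rows: 0 }
    }

    // Both arguments are mandatory: there is no way to set a limit
    // without also stating how many rows the row group contains.
    fn with_limit(mut self, limit: usize, total_rows: usize) -> Self {
        self.limit = Some(limit);
        self.total_rows = total_rows;
        self
    }

    fn build(self) -> ReadPlan {
        ReadPlan { limit: self.limit, total_rows: self.total_rows }
    }
}
```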

# Which issue does this PR close?

- `RowFilter` when `with_limit(N)` is set #9765

# Rationale for this change

See `RowFilter` when `with_limit(N)` is set #9765.

# What changes are included in this PR?
Push `LIMIT + OFFSET` into the last `RowFilter` predicate and skip row groups once the limit is exhausted.

**Within a row group.** New `ReadPlanBuilder::with_predicate_limited` takes an optional cap; once cumulative matches reach it, the batch filter is truncated and the reader loop breaks. The tail is padded as "not selected" so `RowSelection` still spans the full row group. `with_predicate` becomes a thin wrapper passing `None`.

**Across row groups.** `RowGroupReaderBuilder::transition` short-circuits `Start → Finished` when `limit == Some(0)`, skipping filter-plan setup and predicate-column fetches.

**Wiring.** `FilterInfo::is_last` marks the final predicate; the call site passes `Some(limit + offset)` only for it (offset is included because `with_offset` runs after the predicate).

# Are these changes tested?
- `truncate_filter_after_n_trues` and tail-padding in `with_predicate_limited`.
- The `offset + limit` window, multi-predicate chains short-circuiting only on the last predicate, and preservation with a prior `RowSelection`; predicate-invocation counters verify fewer rows are evaluated.
- New `benchmark_filters_with_limit` group in `arrow_reader_row_filter.rs` with `LIMIT 10`.

# Are there any user-facing changes?
- New `ReadPlanBuilder::with_predicate_limited`; `with_predicate` unchanged. No breaking changes.
- `RowFilter` + `with_limit`/`with_offset` reads now decode fewer predicate pages. Output is identical.
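The within-row-group mechanism described above (accumulate matches per batch filter, truncate the batch that crosses the cap, stop evaluating further batches) can be modeled with `Vec<bool>` standing in for arrow's `BooleanArray`; names and types here are simplified assumptions, not the actual implementation. The cap passed in would be `limit + offset`, since `with_offset` drops rows after the predicate runs:

```rust
// Minimal model: batch_filters is one boolean filter per record batch;
// cap is the number of matching rows the predicate still needs to surface.
fn apply_limited(batch_filters: Vec<Vec<bool>>, cap: usize) -> Vec<Vec<bool>> {
    let mut cumulative_matches = 0;
    let mut out = Vec::new();
    for filter in batch_filters {
        let true_count = filter.iter().filter(|&&v| v).count();
        if cumulative_matches + true_count >= cap {
            // This batch crosses the cap: keep only the trues still
            // needed and pad the tail of the batch as "not selected".
            let needed = cap - cumulative_matches;
            let mut seen = 0;
            out.push(
                filter
                    .iter()
                    .map(|&v| if v && seen < needed { seen += 1; true } else { false })
                    .collect(),
            );
            break; // cap exhausted: skip all remaining batches
        }
        cumulative_matches += true_count;
        out.push(filter);
    }
    out
}
```

With a cap of 3, three batches of matches collapse to two emitted filters totalling exactly three selected rows, and the third batch is never inspected, which is where the decode savings come from.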