Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/138023.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 138023
summary: Push down count
area: ES|QL
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,11 @@ private Page buildConstantBlocksResult(List<Object> tags, PerTagsState state) {
Block[] blocks = new Block[2 + tagTypes.size()];
int b = 0;
try {
blocks[b++] = blockFactory.newConstantLongBlockWith(state.totalHits, 1);
blocks[b++] = blockFactory.newConstantBooleanBlockWith(true, 1);
for (Object e : tags) {
for (Object e : tags) { // by
blocks[b++] = BlockUtils.constantBlock(blockFactory, e, 1);
}
blocks[b++] = blockFactory.newConstantLongBlockWith(state.totalHits, 1); // count
blocks[b] = blockFactory.newConstantBooleanBlockWith(true, 1); // seen
Page page = new Page(1, blocks);
blocks = null;
return page;
Expand Down Expand Up @@ -207,12 +207,12 @@ private Page buildNonConstantBlocksResult() {
}
}

blocks[0] = countBuilder.build().asBlock();
blocks[1] = blockFactory.newConstantBooleanBlockWith(true, tagsToState.size());
for (b = 0; b < builders.length; b++) {
blocks[2 + b] = builders[b].builder().build();
builders[b++] = null;
for (b = 0; b < builders.length; b++) { // by
blocks[b] = builders[b].builder().build();
builders[b] = null;
}
blocks[b++] = countBuilder.build().asBlock(); // count
blocks[b] = blockFactory.newConstantBooleanBlockWith(true, tagsToState.size()); // seen
Page page = new Page(tagsToState.size(), blocks);
blocks = null;
return page;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,9 @@ public void testPushRoundToCountToQuery() throws IOException {
try (CannedSourceOperator sourceOperator = new CannedSourceOperator(dataDriverPages.iterator())) {
HashAggregationOperator.HashAggregationOperatorFactory aggFactory =
new HashAggregationOperator.HashAggregationOperatorFactory(
List.of(new BlockHash.GroupSpec(2, ElementType.LONG)),
AggregatorMode.FINAL,
List.of(CountAggregatorFunction.supplier().groupingAggregatorFactory(AggregatorMode.FINAL, List.of(0, 1))),
List.of(new BlockHash.GroupSpec(0, ElementType.LONG)),
AggregatorMode.INTERMEDIATE,
List.of(CountAggregatorFunction.supplier().groupingAggregatorFactory(AggregatorMode.INTERMEDIATE, List.of(1, 2))),
Integer.MAX_VALUE,
null
);
Expand All @@ -426,7 +426,7 @@ public void testPushRoundToCountToQuery() throws IOException {

assertThat(reduceDriverPages, hasSize(1));
Page result = reduceDriverPages.getFirst();
assertThat(result.getBlockCount(), equalTo(2));
assertThat(result.getBlockCount(), equalTo(3));
LongBlock groupsBlock = result.getBlock(0);
LongVector groups = groupsBlock.asVector();
LongBlock countsBlock = result.getBlock(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ private static long getCount(Page p) {
}

private static void checkSeen(Page p, Matcher<Integer> positionCount) {
BooleanBlock b = p.getBlock(1);
BooleanBlock b = p.getBlock(p.getBlockCount() - 1);
BooleanVector v = b.asVector();
assertThat(v.getPositionCount(), positionCount);
assertThat(v.isConstant(), equalTo(true));
Expand All @@ -337,9 +337,9 @@ private static Map<Integer, Long> getCountsByTag(List<Page> results) {
for (Page page : results) {
assertThat(page.getBlockCount(), equalTo(3));
checkSeen(page, greaterThanOrEqualTo(0));
LongBlock countsBlock = page.getBlock(0);
LongBlock countsBlock = page.getBlock(page.getBlockCount() - 2);
LongVector counts = countsBlock.asVector();
IntBlock groupsBlock = page.getBlock(2);
IntBlock groupsBlock = page.getBlock(0);
IntVector groups = groupsBlock.asVector();
for (int p = 0; p < page.getPositionCount(); p++) {
long count = counts.getLong(p);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private synchronized void reset() {
protected static boolean testClustersOk = true;

@Before
public void setup() {
public void setup() throws IOException {
assumeTrue("test clusters were broken", testClustersOk);
INGEST.protectedBlock(() -> {
// Inference endpoints must be created before ingesting any datasets that rely on them (mapping of inference_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,21 @@ c:l | l:i
15 |1
;

countStarGroupedTrunc
from employees | stats c = count(*) by d=date_trunc(1 year, hire_date) | sort d | limit 10;
c:l | d:datetime
11 | 1985-01-01T00:00:00.000Z
11 | 1986-01-01T00:00:00.000Z
15 | 1987-01-01T00:00:00.000Z
9 | 1988-01-01T00:00:00.000Z
13 | 1989-01-01T00:00:00.000Z
12 | 1990-01-01T00:00:00.000Z
6 | 1991-01-01T00:00:00.000Z
8 | 1992-01-01T00:00:00.000Z
3 | 1993-01-01T00:00:00.000Z
4 | 1994-01-01T00:00:00.000Z
;

countAllAndOtherStatGrouped
from employees | stats c = count(*), min = min(emp_no) by languages | sort languages;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ExtractDimensionFieldsAfterAggregation;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushCountQueryAndTagsToSource;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushLimitToSource;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushSampleToSource;
Expand Down Expand Up @@ -79,7 +80,12 @@ protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
// execute the SubstituteRoundToWithQueryAndTags rule once after all the other pushdown rules are applied, as this rule generate
// multiple QueryBuilders according the number of RoundTo points, it should be applied after all the other eligible pushdowns are
// done, and it should be executed only once.
var substitutionRules = new Batch<>("Substitute RoundTo with QueryAndTags", Limiter.ONCE, new ReplaceRoundToWithQueryAndTags());
var substitutionRules = new Batch<>(
"Substitute RoundTo with QueryAndTags",
Limiter.ONCE,
new ReplaceRoundToWithQueryAndTags(),
new PushCountQueryAndTagsToSource()
);

// add the field extraction in just one pass
// add it at the end after all the other rules have ran
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;

import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan;
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;

/**
* Pushes count aggregations on top of query and tags to source.
* Will transform:
* <pre>
* Aggregate (count(*) by x)
* └── Eval (x = round_to)
* └── Query [query + tags]
* </pre>
* into:
* <pre>
* Filter (count > 0)
* └── StatsQuery [count with query + tags]
* </pre>
* Where the filter is needed since the original Aggregate would not produce buckets with count = 0.
*/
public class PushCountQueryAndTagsToSource extends PhysicalOptimizerRules.OptimizerRule<AggregateExec> {
@Override
protected PhysicalPlan rule(AggregateExec aggregateExec) {
if (
// Ensures we are only grouping by one field (2 aggregates: count + group by field).
aggregateExec.aggregates().size() == 2
&& aggregateExec.aggregates().getFirst() instanceof Alias alias
&& alias.child() instanceof Count count
&& count.hasFilter() == false // TODO We don't support filters at the moment (but we definitely should!).
&& count.field() instanceof Literal // Ensures count(*) or equivalent.
&& aggregateExec.child() instanceof EvalExec evalExec
&& evalExec.child() instanceof EsQueryExec queryExec
&& queryExec.queryBuilderAndTags().size() > 1 // Ensures there are query and tags to push down.
&& queryExec.queryBuilderAndTags().stream().allMatch(PushCountQueryAndTagsToSource::isSingleFilterQuery)) {
EsStatsQueryExec statsQueryExec = new EsStatsQueryExec(
queryExec.source(),
queryExec.indexPattern(),
null, // query
queryExec.limit(),
aggregateExec.output(),
new EsStatsQueryExec.ByStat(queryExec.queryBuilderAndTags())
);
// Wrap with FilterExec to remove empty buckets (keep buckets where count > 0). This was automatically handled by the
// AggregateExec, but since we removed it, we need to do it manually.
Attribute countAttr = statsQueryExec.output().get(1);
return new FilterExec(Source.EMPTY, statsQueryExec, new GreaterThan(Source.EMPTY, countAttr, ZERO));
}
return aggregateExec;
}

private static boolean isSingleFilterQuery(EsQueryExec.QueryBuilderAndTags queryBuilderAndTags) {
return switch (queryBuilderAndTags.query()) {
case SingleValueQuery.Builder unused -> true;
case BoolQueryBuilder bq -> bq.filter().size() + bq.must().size() + bq.should().size() + bq.mustNot().size() <= 1;
default -> false;
};
}

private static final Literal ZERO = new Literal(Source.EMPTY, 0L, DataType.LONG);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,20 @@ protected PhysicalPlan rule(AggregateExec aggregateExec, LocalPhysicalOptimizerC

// for the moment support pushing count just for one field
List<EsStatsQueryExec.Stat> stats = tuple.v2();
if (stats.size() != 1) {
if (stats.size() != 1 || stats.size() != aggregateExec.aggregates().size()) {
return aggregateExec;
}

// TODO: handle case where some aggs cannot be pushed down by breaking the aggs into two sources (regular + stats) + union
// use the stats since the attributes are larger in size (due to seen)
if (tuple.v2().size() == aggregateExec.aggregates().size()) {
plan = new EsStatsQueryExec(
aggregateExec.source(),
queryExec.indexPattern(),
queryExec.query(),
queryExec.limit(),
tuple.v1(),
tuple.v2()
);
}
plan = new EsStatsQueryExec(
aggregateExec.source(),
queryExec.indexPattern(),
queryExec.query(),
queryExec.limit(),
tuple.v1(),
stats.get(0)
);
}
return plan;
}
Expand Down Expand Up @@ -117,7 +115,7 @@ private Tuple<List<Attribute>, List<EsStatsQueryExec.Stat>> pushableStats(
var countFilter = TRANSLATOR_HANDLER.asQuery(LucenePushdownPredicates.DEFAULT, count.filter());
query = Queries.combine(Queries.Clause.MUST, asList(countFilter.toQueryBuilder(), query));
}
return new EsStatsQueryExec.Stat(fieldName, COUNT, query);
return new EsStatsQueryExec.BasicStat(fieldName, COUNT, query);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,7 @@ public DataType resulType() {
}
}

public record QueryBuilderAndTags(QueryBuilder query, List<Object> tags) {
@Override
public String toString() {
return "QueryBuilderAndTags{" + "queryBuilder=[" + query + "], tags=" + tags.toString() + "}";
}
};
public record QueryBuilderAndTags(QueryBuilder query, List<Object> tags) {};

public EsQueryExec(
Source source,
Expand Down Expand Up @@ -343,6 +338,7 @@ public boolean canSubstituteRoundToWithQueryBuilderAndTags() {
*/
private QueryBuilder queryWithoutTag() {
QueryBuilder queryWithoutTag;

if (queryBuilderAndTags == null || queryBuilderAndTags.isEmpty()) {
return null;
} else if (queryBuilderAndTags.size() == 1) {
Expand Down
Loading