diff --git a/docs/changelog/138023.yaml b/docs/changelog/138023.yaml new file mode 100644 index 0000000000000..b4cdaa9d7acb4 --- /dev/null +++ b/docs/changelog/138023.yaml @@ -0,0 +1,5 @@ +pr: 138023 +summary: Push down COUNT(*) BY DATE_TRUNC +area: ES|QL +type: feature +issues: [] diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java index cab81212743bd..39f2b39c63ef2 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java @@ -175,11 +175,11 @@ private Page buildConstantBlocksResult(List tags, PerTagsState state) { Block[] blocks = new Block[2 + tagTypes.size()]; int b = 0; try { - blocks[b++] = blockFactory.newConstantLongBlockWith(state.totalHits, 1); - blocks[b++] = blockFactory.newConstantBooleanBlockWith(true, 1); - for (Object e : tags) { + for (Object e : tags) { // by blocks[b++] = BlockUtils.constantBlock(blockFactory, e, 1); } + blocks[b++] = blockFactory.newConstantLongBlockWith(state.totalHits, 1); // count + blocks[b] = blockFactory.newConstantBooleanBlockWith(true, 1); // seen Page page = new Page(1, blocks); blocks = null; return page; @@ -207,12 +207,12 @@ private Page buildNonConstantBlocksResult() { } } - blocks[0] = countBuilder.build().asBlock(); - blocks[1] = blockFactory.newConstantBooleanBlockWith(true, tagsToState.size()); - for (b = 0; b < builders.length; b++) { - blocks[2 + b] = builders[b].builder().build(); - builders[b++] = null; + for (b = 0; b < builders.length; b++) { // by + blocks[b] = builders[b].builder().build(); + builders[b] = null; } + blocks[b++] = countBuilder.build().asBlock(); // count + blocks[b] = blockFactory.newConstantBooleanBlockWith(true, tagsToState.size()); // seen Page page = new Page(tagsToState.size(), blocks); blocks = null; return page; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index fd738a4b39c04..b4c047f70d911 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -404,9 +404,9 @@ public void testPushRoundToCountToQuery() throws IOException { try (CannedSourceOperator sourceOperator = new CannedSourceOperator(dataDriverPages.iterator())) { HashAggregationOperator.HashAggregationOperatorFactory aggFactory = new HashAggregationOperator.HashAggregationOperatorFactory( - List.of(new BlockHash.GroupSpec(2, ElementType.LONG)), - AggregatorMode.FINAL, - List.of(CountAggregatorFunction.supplier().groupingAggregatorFactory(AggregatorMode.FINAL, List.of(0, 1))), + List.of(new BlockHash.GroupSpec(0, ElementType.LONG)), + AggregatorMode.INTERMEDIATE, + List.of(CountAggregatorFunction.supplier().groupingAggregatorFactory(AggregatorMode.INTERMEDIATE, List.of(1, 2))), Integer.MAX_VALUE, null ); @@ -426,7 +426,7 @@ public void testPushRoundToCountToQuery() throws IOException { assertThat(reduceDriverPages, hasSize(1)); Page result = reduceDriverPages.getFirst(); - assertThat(result.getBlockCount(), equalTo(2)); + assertThat(result.getBlockCount(), equalTo(3)); LongBlock groupsBlock = result.getBlock(0); LongVector groups = groupsBlock.asVector(); LongBlock countsBlock = result.getBlock(1); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java index e34fc9d845aa0..415ef4b4ecc60 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java @@ -325,7 +325,7 @@ private static long getCount(Page p) { } private static void checkSeen(Page p, Matcher positionCount) { - BooleanBlock b = p.getBlock(1); + BooleanBlock b = p.getBlock(p.getBlockCount() - 1); BooleanVector v = b.asVector(); assertThat(v.getPositionCount(), positionCount); assertThat(v.isConstant(), equalTo(true)); @@ -337,9 +337,9 @@ private static Map getCountsByTag(List results) { for (Page page : results) { assertThat(page.getBlockCount(), equalTo(3)); checkSeen(page, greaterThanOrEqualTo(0)); - LongBlock countsBlock = page.getBlock(0); + LongBlock countsBlock = page.getBlock(page.getBlockCount() - 2); LongVector counts = countsBlock.asVector(); - IntBlock groupsBlock = page.getBlock(2); + IntBlock groupsBlock = page.getBlock(0); IntVector groups = groupsBlock.asVector(); for (int p = 0; p < page.getPositionCount(); p++) { long count = counts.getLong(p); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec index 131facf5074e3..f8a9453b0a439 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec @@ -179,6 +179,32 @@ fork3 | null | 100 | 10100 | 10001 fork4 | null | 100 | 10001 | null ; +forkWithStatsCountStarDateTrunc +required_capability: fork_v9 + +FROM employees +| FORK (WHERE emp_no == 10048 OR emp_no == 10081) + (WHERE emp_no == 10081 OR emp_no == 10087) + (STATS x = COUNT(*), y = MAX(emp_no) by hd = DATE_TRUNC(1 year, hire_date)) + (STATS x = COUNT(*), y = MIN(emp_no) by hd = DATE_TRUNC(2 year, hire_date)) +| KEEP _fork, emp_no, hd, x, y +| SORT _fork, emp_no, hd +| LIMIT 10 +; + +_fork:keyword | emp_no:integer | hd:datetime | x:long | y:integer +fork1 | 10048 | null | null | null +fork1 | 10081 | null | null | null +fork2 | 10081 | null | null | null +fork2 | 10087 | null | null | null +fork3 | null | 1985-01-01T00:00:00.000Z | 11 | 10098 +fork3 | null | 1986-01-01T00:00:00.000Z | 11 | 10095 +fork3 | null | 1987-01-01T00:00:00.000Z | 15 | 10100 +fork3 | null | 1988-01-01T00:00:00.000Z | 9 | 10099 +fork3 | null | 1989-01-01T00:00:00.000Z | 13 | 10092 +fork3 | null | 1990-01-01T00:00:00.000Z | 12 | 10097 +; + forkWithDissect required_capability: fork_v9 diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec index d385e2cb52388..f48a24c835be7 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec @@ -3789,6 +3789,29 @@ null |1995-08-22T00:00:00.000Z|10 |1995-01-01T00:00:00.000Z null |1995-08-22T00:00:00.000Z|10 |1995-01-01T00:00:00.000Z ; +DateTruncInlinestatsCountStar +required_capability: inline_stats + +FROM employees +| KEEP emp_no, hire_date +| INLINE STATS c = count(*) by yr=date_trunc(1 year, hire_date) +| SORT hire_date DESC +| LIMIT 10 +; + +emp_no:integer | hire_date:datetime |c:long | yr:datetime +10019 | 1999-04-30T00:00:00.000Z | 1 | 1999-01-01T00:00:00.000Z +10024 | 1997-05-19T00:00:00.000Z | 1 | 1997-01-01T00:00:00.000Z +10093 | 1996-11-05T00:00:00.000Z | 1 | 1996-01-01T00:00:00.000Z +10084 | 1995-12-15T00:00:00.000Z | 5 | 1995-01-01T00:00:00.000Z +10022 | 1995-08-22T00:00:00.000Z | 5 | 1995-01-01T00:00:00.000Z +10026 | 1995-03-20T00:00:00.000Z | 5 | 1995-01-01T00:00:00.000Z +10054 | 1995-03-13T00:00:00.000Z | 5 | 1995-01-01T00:00:00.000Z +10016 | 1995-01-27T00:00:00.000Z | 5 | 1995-01-01T00:00:00.000Z +10008 | 1994-09-15T00:00:00.000Z | 4 | 1994-01-01T00:00:00.000Z +10044 | 1994-05-21T00:00:00.000Z | 4 | 1994-01-01T00:00:00.000Z +; + ImplicitCastingMultiTypedBucketDateNanosByYear required_capability: inline_stats diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec index 2cb15bc42d9d1..7ed5b62840b2a 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec @@ -835,6 +835,21 @@ c:l | l:i 15 |1 ; +countStarGroupedTrunc +from employees | stats c = count(*) by d=date_trunc(1 year, hire_date) | sort d | limit 10; +c:l | d:datetime +11 | 1985-01-01T00:00:00.000Z +11 | 1986-01-01T00:00:00.000Z +15 | 1987-01-01T00:00:00.000Z +9 | 1988-01-01T00:00:00.000Z +13 | 1989-01-01T00:00:00.000Z +12 | 1990-01-01T00:00:00.000Z +6 | 1991-01-01T00:00:00.000Z +8 | 1992-01-01T00:00:00.000Z +3 | 1993-01-01T00:00:00.000Z +4 | 1994-01-01T00:00:00.000Z +; + countAllAndOtherStatGrouped from employees | stats c = count(*), min = min(emp_no) by languages | sort languages; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/subquery.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/subquery.csv-spec index 930ca874488b6..b28ab36d48ca6 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/subquery.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/subquery.csv-spec @@ -340,6 +340,28 @@ employees | 10093 | 3 | null | null null | null | null | 4 | 172.21.3.15 ; +subqueryWithCountStarAndDateTrunc +required_capability: fork_v9 +required_capability: subquery_in_from_command + +FROM employees, (FROM sample_data + | STATS cnt = count(*) by ts=date_trunc(1 hour, @timestamp) + | SORT cnt DESC + ) + metadata _index +| WHERE ( emp_no >= 10091 AND emp_no < 10094) OR emp_no IS NULL +| SORT emp_no, ts +| KEEP _index, emp_no, languages, cnt, ts +; + +_index:keyword | emp_no:integer | languages:integer | cnt:long | ts:datetime +employees | 10091 | 3 | null | null +employees | 10092 | 1 | null | null +employees | 10093 | 3 | null | null +null | null | null | 2 | 2023-10-23T12:00:00.000Z +null | null | null | 5 | 2023-10-23T13:00:00.000Z +; + subqueryInFromWithGrokInSubquery required_capability: fork_v9 required_capability: subquery_in_from_command diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java index 8ce4d2db07931..99f77f73268e5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ExtractDimensionFieldsAfterAggregation; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction; +import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushCountQueryAndTagsToSource; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushLimitToSource; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushSampleToSource; @@ -79,7 +80,12 @@ protected static List> rules(boolean optimizeForEsSource) { // execute the SubstituteRoundToWithQueryAndTags rule once after all the other pushdown rules are applied, as this rule generate // multiple QueryBuilders according the number of RoundTo points, it should be applied after all the other eligible pushdowns are // done, and it should be executed only once. - var substitutionRules = new Batch<>("Substitute RoundTo with QueryAndTags", Limiter.ONCE, new ReplaceRoundToWithQueryAndTags()); + var substitutionRules = new Batch<>( + "Substitute RoundTo with QueryAndTags", + Limiter.ONCE, + new ReplaceRoundToWithQueryAndTags(), + new PushCountQueryAndTagsToSource() + ); // add the field extraction in just one pass // add it at the end after all the other rules have ran diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushCountQueryAndTagsToSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushCountQueryAndTagsToSource.java new file mode 100644 index 0000000000000..b9a24b00b82ac --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushCountQueryAndTagsToSource.java @@ -0,0 +1,81 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.physical.local; + +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.xpack.esql.core.expression.Alias; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.aggregate.Count; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan; +import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules; +import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; +import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.EvalExec; +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery; + +/** + * Pushes count aggregations on top of query and tags to source. + * Will transform: + *
+ *  Aggregate (count(*) by x)
+ *  └── Eval (x = round_to)
+ *      └── Query [query + tags]
+ *  
+ * into: + *
+ *  Filter (count > 0)
+ *  └── StatsQuery [count with query + tags]
+ *  
+ * Where the filter is needed since the original Aggregate would not produce buckets with count = 0. + */ +public class PushCountQueryAndTagsToSource extends PhysicalOptimizerRules.OptimizerRule { + @Override + protected PhysicalPlan rule(AggregateExec aggregateExec) { + if ( + // Ensures we are only grouping by one field (2 aggregates: count + group by field). + aggregateExec.aggregates().size() == 2 + && aggregateExec.aggregates().getFirst() instanceof Alias alias + && alias.child() instanceof Count count + && count.hasFilter() == false // TODO We don't support filters at the moment (but we definitely should!). + && count.field() instanceof Literal // Ensures count(*) or equivalent. + && aggregateExec.child() instanceof EvalExec evalExec + && evalExec.child() instanceof EsQueryExec queryExec + && queryExec.queryBuilderAndTags().size() > 1 // Ensures there are query and tags to push down. + && queryExec.queryBuilderAndTags().stream().allMatch(PushCountQueryAndTagsToSource::isSingleFilterQuery)) { + EsStatsQueryExec statsQueryExec = new EsStatsQueryExec( + queryExec.source(), + queryExec.indexPattern(), + null, // query + queryExec.limit(), + aggregateExec.output(), + new EsStatsQueryExec.ByStat(queryExec.queryBuilderAndTags()) + ); + // Wrap with FilterExec to remove empty buckets (keep buckets where count > 0). This was automatically handled by the + // AggregateExec, but since we removed it, we need to do it manually. + Attribute countAttr = statsQueryExec.output().get(1); + return new FilterExec(Source.EMPTY, statsQueryExec, new GreaterThan(Source.EMPTY, countAttr, ZERO)); + } + return aggregateExec; + } + + private static boolean isSingleFilterQuery(EsQueryExec.QueryBuilderAndTags queryBuilderAndTags) { + return switch (queryBuilderAndTags.query()) { + case SingleValueQuery.Builder unused -> true; + case BoolQueryBuilder bq -> bq.filter().size() + bq.must().size() + bq.should().size() + bq.mustNot().size() <= 1; + default -> false; + }; + } + + private static final Literal ZERO = new Literal(Source.EMPTY, 0L, DataType.LONG); +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushStatsToSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushStatsToSource.java index c16be81a405b2..a6a64c57b0834 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushStatsToSource.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushStatsToSource.java @@ -51,22 +51,20 @@ protected PhysicalPlan rule(AggregateExec aggregateExec, LocalPhysicalOptimizerC // for the moment support pushing count just for one field List stats = tuple.v2(); - if (stats.size() != 1) { + if (stats.size() != 1 || stats.size() != aggregateExec.aggregates().size()) { return aggregateExec; } // TODO: handle case where some aggs cannot be pushed down by breaking the aggs into two sources (regular + stats) + union // use the stats since the attributes are larger in size (due to seen) - if (tuple.v2().size() == aggregateExec.aggregates().size()) { - plan = new EsStatsQueryExec( - aggregateExec.source(), - queryExec.indexPattern(), - queryExec.query(), - queryExec.limit(), - tuple.v1(), - tuple.v2() - ); - } + plan = new EsStatsQueryExec( + aggregateExec.source(), + queryExec.indexPattern(), + queryExec.query(), + queryExec.limit(), + tuple.v1(), + stats.get(0) + ); } return plan; } @@ -117,7 +115,7 @@ private Tuple, List> pushableStats( var countFilter = TRANSLATOR_HANDLER.asQuery(LucenePushdownPredicates.DEFAULT, count.filter()); query = Queries.combine(Queries.Clause.MUST, asList(countFilter.toQueryBuilder(), query)); } - return new EsStatsQueryExec.Stat(fieldName, COUNT, query); + return new EsStatsQueryExec.BasicStat(fieldName, COUNT, query); } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java index dede455238b29..7ef0db9f46143 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java @@ -129,12 +129,7 @@ public DataType resulType() { } } - public record QueryBuilderAndTags(QueryBuilder query, List tags) { - @Override - public String toString() { - return "QueryBuilderAndTags{" + "queryBuilder=[" + query + "], tags=" + tags.toString() + "}"; - } - }; + public record QueryBuilderAndTags(QueryBuilder query, List tags) {}; public EsQueryExec( Source source, @@ -304,6 +299,7 @@ public boolean canSubstituteRoundToWithQueryBuilderAndTags() { */ private QueryBuilder queryWithoutTag() { QueryBuilder queryWithoutTag; + if (queryBuilderAndTags == null || queryBuilderAndTags.isEmpty()) { return null; } else if (queryBuilderAndTags.size() == 1) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsStatsQueryExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsStatsQueryExec.java index 397c3ade17a64..e977d42de5360 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsStatsQueryExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsStatsQueryExec.java @@ -9,6 +9,8 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; @@ -31,37 +33,70 @@ public class EsStatsQueryExec extends LeafExec implements EstimatesRowSize { public enum StatsType { COUNT, - MIN, - MAX, - EXISTS } - public record Stat(String name, StatsType type, QueryBuilder query) { + public sealed interface Stat { + List tagTypes(); + } + + public record BasicStat(String name, StatsType type, QueryBuilder query) implements Stat { public QueryBuilder filter(QueryBuilder sourceQuery) { return query == null ? sourceQuery : Queries.combine(Queries.Clause.FILTER, asList(sourceQuery, query)).boost(0.0f); } + + @Override + public List tagTypes() { + return List.of(); + } + } + + public record ByStat(List queryBuilderAndTags) implements Stat { + public ByStat { + if (queryBuilderAndTags.isEmpty()) { + throw new IllegalStateException("ByStat must have at least one queryBuilderAndTags"); + } + } + + @Override + public List tagTypes() { + return List.of(switch (queryBuilderAndTags.getFirst().tags().getFirst()) { + case Integer i -> ElementType.INT; + case Long l -> ElementType.LONG; + default -> throw new IllegalStateException( + "Unsupported tag type in ByStat: " + queryBuilderAndTags.getFirst().tags().getFirst() + ); + }); + } + + @Override + public String toString() { + final StringBuffer sb = new StringBuffer("ByStat{"); + sb.append("queryBuilderAndTags=").append(queryBuilderAndTags); + sb.append('}'); + return sb.toString(); + } } private final String indexPattern; private final QueryBuilder query; private final Expression limit; private final List attrs; - private final List stats; + private final Stat stat; public EsStatsQueryExec( Source source, String indexPattern, - QueryBuilder query, + @Nullable QueryBuilder query, Expression limit, List attributes, - List stats + Stat stat ) { super(source); this.indexPattern = indexPattern; this.query = query; this.limit = limit; this.attrs = attributes; - this.stats = stats; + this.stat = stat; } @Override @@ -76,15 +111,15 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, EsStatsQueryExec::new, indexPattern, query, limit, attrs, stats); + return NodeInfo.create(this, EsStatsQueryExec::new, indexPattern, query, limit, attrs, stat); } - public QueryBuilder query() { + public @Nullable QueryBuilder query() { return query; } - public List stats() { - return stats; + public Stat stat() { + return stat; } @Override @@ -107,7 +142,7 @@ public PhysicalPlan estimateRowSize(State state) { @Override public int hashCode() { - return Objects.hash(indexPattern, query, limit, attrs, stats); + return Objects.hash(indexPattern, query, limit, attrs, stat); } @Override @@ -125,7 +160,7 @@ public boolean equals(Object obj) { && Objects.equals(attrs, other.attrs) && Objects.equals(query, other.query) && Objects.equals(limit, other.limit) - && Objects.equals(stats, other.stats); + && Objects.equals(stat, other.stat); } @Override @@ -134,7 +169,7 @@ public String nodeString() { + "[" + indexPattern + "], stats" - + stats + + stat + "], query[" + (query != null ? Strings.toString(query, false, true) : "") + "]" diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 43130692dfda1..aca25c6fc1676 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -35,6 +35,7 @@ import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; @@ -63,6 +64,7 @@ import org.elasticsearch.search.fetch.StoredFieldsSpec; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.lookup.SearchLookup; import org.elasticsearch.search.lookup.SourceFilter; import org.elasticsearch.search.sort.SortAndFormats; @@ -105,7 +107,7 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi /** * Context of each shard we're operating against. Note these objects are shared across multiple operators as - * {@link org.elasticsearch.core.RefCounted}. + * {@link RefCounted}. */ public abstract static class ShardContext implements org.elasticsearch.compute.lucene.ShardContext, Releasable { private final AbstractRefCounted refCounted = new AbstractRefCounted() { @@ -394,13 +396,18 @@ static Set nullsFilteredFieldsAfterSourceQuery(QueryBuilder sourceQuery) /** * Build a {@link SourceOperator.SourceOperatorFactory} that counts documents in the search index. */ - public LuceneCountOperator.Factory countSource(LocalExecutionPlannerContext context, QueryBuilder queryBuilder, Expression limit) { + public LuceneCountOperator.Factory countSource( + LocalExecutionPlannerContext context, + Function> queryFunction, + List tagTypes, + Expression limit + ) { return new LuceneCountOperator.Factory( shardContexts, - querySupplier(queryBuilder), + queryFunction, context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()), context.queryPragmas().taskConcurrency(), - List.of(), + tagTypes, limit == null ? NO_LIMIT : (Integer) limit.fold(context.foldCtx()) ); } @@ -427,7 +434,7 @@ public static class DefaultShardContext extends ShardContext { private final int index; /** - * In production, this will be a {@link org.elasticsearch.search.internal.SearchContext}, but we don't want to drag that huge + * In production, this will be a {@link SearchContext}, but we don't want to drag that huge * dependency here. */ private final Releasable releasable; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 2af166b2c1a11..39dd5ea59b094 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -380,15 +380,15 @@ private PhysicalOperation planEsStats(EsStatsQueryExec statsQuery, LocalExecutio if (physicalOperationProviders instanceof EsPhysicalOperationProviders == false) { throw new EsqlIllegalArgumentException("EsStatsQuery should only occur against a Lucene backend"); } - if (statsQuery.stats().size() > 1) { - throw new EsqlIllegalArgumentException("EsStatsQuery currently supports only one field statistic"); - } - // for now only one stat is supported - EsStatsQueryExec.Stat stat = statsQuery.stats().get(0); + EsStatsQueryExec.Stat stat = statsQuery.stat(); EsPhysicalOperationProviders esProvider = (EsPhysicalOperationProviders) physicalOperationProviders; - final LuceneOperator.Factory luceneFactory = esProvider.countSource(context, stat.filter(statsQuery.query()), statsQuery.limit()); + var queryFunction = switch (stat) { + case EsStatsQueryExec.BasicStat basic -> esProvider.querySupplier(basic.filter(statsQuery.query())); + case EsStatsQueryExec.ByStat byStat -> esProvider.querySupplier(byStat.queryBuilderAndTags()); + }; + final LuceneOperator.Factory luceneFactory = esProvider.countSource(context, queryFunction, stat.tagTypes(), statsQuery.limit()); Layout.Builder layout = new Layout.Builder(); layout.append(statsQuery.outputSet()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index 053cf8a4fc595..483d30671b337 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -13,12 +13,15 @@ import org.elasticsearch.compute.aggregation.AggregatorMode; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.query.CoordinatorRewriteContext; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.SearchExecutionContext; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.capabilities.TranslationAware; import org.elasticsearch.xpack.esql.core.expression.AttributeSet; @@ -69,11 +72,13 @@ import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES; import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.EXTRACT_SPATIAL_BOUNDS; import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.NONE; +import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.xpack.esql.capabilities.TranslationAware.translatable; import static org.elasticsearch.xpack.esql.core.util.Queries.Clause.FILTER; import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER; public class PlannerUtils { + private static final Logger LOGGER = LogManager.getLogger(PlannerUtils.class); /** * When the plan contains children like {@code MergeExec} resulted from the planning of commands such as FORK, @@ -196,6 +201,18 @@ public static PhysicalPlan localPlan( return localPlan(plan, logicalOptimizer, physicalOptimizer); } + public static PhysicalPlan integrateEsFilterIntoFragment(PhysicalPlan plan, @Nullable QueryBuilder esFilter) { + return esFilter == null ? plan : plan.transformUp(FragmentExec.class, f -> { + var fragmentFilter = f.esFilter(); + // TODO: have an ESFilter and push down to EsQueryExec / EsSource + // This is an ugly hack to push the filter parameter to Lucene + // TODO: filter integration testing + var filter = fragmentFilter != null ? boolQuery().filter(fragmentFilter).must(esFilter) : esFilter; + LOGGER.debug("Fold filter {} to EsQueryExec", filter); + return f.withFilter(filter); + }); + } + public static PhysicalPlan localPlan( PhysicalPlan plan, LocalLogicalPlanOptimizer logicalOptimizer, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 1d6aa2b53cc9e..38df20aa07277 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -78,7 +78,6 @@ import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize; -import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.PlannerUtils; @@ -100,7 +99,6 @@ import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toSet; -import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; import static org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin.firstSubPlan; import static org.elasticsearch.xpack.esql.session.SessionUtils.checkPagesBelowSize; @@ -929,19 +927,7 @@ private PhysicalPlan logicalPlanToPhysicalPlan( PhysicalPlanOptimizer physicalPlanOptimizer ) { PhysicalPlan physicalPlan = optimizedPhysicalPlan(optimizedPlan, physicalPlanOptimizer); - physicalPlan = physicalPlan.transformUp(FragmentExec.class, f -> { - QueryBuilder filter = request.filter(); - if (filter != null) { - var fragmentFilter = f.esFilter(); - // TODO: have an ESFilter and push down to EsQueryExec / EsSource - // This is an ugly hack to push the filter parameter to Lucene - // TODO: filter integration testing - filter = fragmentFilter != null ? boolQuery().filter(fragmentFilter).must(filter) : filter; - LOGGER.debug("Fold filter {} to EsQueryExec", filter); - f = f.withFilter(filter); - } - return f; - }); + physicalPlan = PlannerUtils.integrateEsFilterIntoFragment(physicalPlan, request.filter()); return EstimatesRowSize.estimateRowSize(0, physicalPlan); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java index 29a2101a05eb8..9e7b372edbf8a 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java @@ -187,7 +187,7 @@ public void testCountAllWithEval() { from test | eval s = salary | rename s as sr | eval hidden_s = sr | rename emp_no as e | where e < 10050 | stats c = count(*) """); - var stat = queryStatsFor(plan); + var stat = (EsStatsQueryExec.BasicStat) queryStatsFor(plan); assertThat(stat.type(), is(StatsType.COUNT)); assertThat(stat.query(), is(nullValue())); } @@ -203,7 +203,7 @@ public void testCountAllWithEval() { */ public void testCountAllWithFilter() { var plan = plannerOptimizer.plan("from test | where emp_no > 10040 | stats c = count(*)"); - var stat = queryStatsFor(plan); + var stat = (EsStatsQueryExec.BasicStat) queryStatsFor(plan); assertThat(stat.type(), is(StatsType.COUNT)); assertThat(stat.query(), is(nullValue())); } @@ -223,7 +223,7 @@ public void testCountAllWithFilter() { */ public void testCountFieldWithFilter() { var plan = plannerOptimizer.plan("from test | where emp_no > 10040 | stats c = count(emp_no)", IS_SV_STATS); - var stat = queryStatsFor(plan); + var stat = (EsStatsQueryExec.BasicStat) queryStatsFor(plan); assertThat(stat.type(), is(StatsType.COUNT)); assertThat(stat.query(), is(existsQuery("emp_no"))); } @@ -252,7 +252,7 @@ public void testCountFieldWithEval() { assertThat(esStatsQuery.limit(), is(nullValue())); assertThat(Expressions.names(esStatsQuery.output()), contains("$$c$count", "$$c$seen")); - var stat = as(esStatsQuery.stats().get(0), Stat.class); + var stat = as(esStatsQuery.stat(), EsStatsQueryExec.BasicStat.class); assertThat(stat.query(), is(existsQuery("salary"))); } @@ -273,7 +273,7 @@ public void testCountOneFieldWithFilter() { var esStatsQuery = as(exchange.child(), EsStatsQueryExec.class); assertThat(esStatsQuery.limit(), is(nullValue())); assertThat(Expressions.names(esStatsQuery.output()), contains("$$c$count", "$$c$seen")); - var stat = as(esStatsQuery.stats().get(0), Stat.class); + var stat = as(esStatsQuery.stat(), EsStatsQueryExec.BasicStat.class); Source source = new Source(2, 8, "salary > 1000"); var exists = existsQuery("salary"); assertThat(stat.query(), is(exists)); @@ -328,14 +328,14 @@ public void testCountPushdownForSvAndMvFields() throws IOException { }); String expectedStats = """ - [Stat[name=salary, type=COUNT, query={ + BasicStat[name=salary, type=COUNT, query={ "exists" : { "field" : "salary", "boost" : 1.0 } - }]]"""; + }]"""; assertNotNull(leaf.get()); - assertThat(leaf.get().stats().toString(), equalTo(expectedStats)); + assertThat(leaf.get().stat().toString(), equalTo(expectedStats)); } } @@ -653,7 +653,7 @@ public void testIsNotNull_TextField_Pushdown_WithCount() { var esStatsQuery = as(exg.child(), EsStatsQueryExec.class); assertThat(esStatsQuery.limit(), is(nullValue())); assertThat(Expressions.names(esStatsQuery.output()), contains("$$c$count", "$$c$seen")); - var stat = as(esStatsQuery.stats().get(0), Stat.class); + var stat = as(esStatsQuery.stat(), EsStatsQueryExec.BasicStat.class); assertThat(stat.query(), is(existsQuery("job"))); } @@ -2475,14 +2475,7 @@ private Stat queryStatsFor(PhysicalPlan plan) { var agg = as(limit.child(), AggregateExec.class); var exg = as(agg.child(), ExchangeExec.class); var statSource = as(exg.child(), EsStatsQueryExec.class); - var stats = statSource.stats(); - assertThat(stats, hasSize(1)); - var stat = stats.get(0); - return stat; - } - - private static KqlQueryBuilder kqlQueryBuilder(String query) { - return new KqlQueryBuilder(query); + return statSource.stat(); } /** diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java index 713ae3eb2f063..9b4d9c5455bca 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.esql.optimizer; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.analysis.Analyzer; import org.elasticsearch.xpack.esql.core.expression.FoldContext; @@ -59,8 +61,12 @@ public PhysicalPlan plan(String query, SearchStats stats) { } public PhysicalPlan plan(String query, SearchStats stats, Analyzer analyzer) { - var physical = optimizedPlan(physicalPlan(query, analyzer), stats); - return physical; + return plan(query, stats, analyzer, null); + } + + public PhysicalPlan plan(String query, SearchStats stats, Analyzer analyzer, @Nullable QueryBuilder esFilter) { + PhysicalPlan plan = PlannerUtils.integrateEsFilterIntoFragment(physicalPlan(query, analyzer), esFilter); + return optimizedPlan(plan, stats); } public PhysicalPlan plan(String query, SearchStats stats, EsqlFlags esqlFlags) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTagsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SubstituteRoundToTests.java similarity index 65% rename from x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTagsTests.java rename to x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SubstituteRoundToTests.java index 2158b39527c4a..a3be8fe82e078 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTagsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SubstituteRoundToTests.java @@ -9,6 +9,8 @@ import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; @@ -19,6 +21,7 @@ import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; import org.elasticsearch.xpack.esql.core.expression.function.Function; @@ -27,14 +30,17 @@ import org.elasticsearch.xpack.esql.expression.function.grouping.Bucket; import org.elasticsearch.xpack.esql.expression.function.scalar.date.DateTrunc; import org.elasticsearch.xpack.esql.expression.function.scalar.math.RoundTo; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan; import org.elasticsearch.xpack.esql.optimizer.AbstractLocalPhysicalPlanOptimizerTests; import org.elasticsearch.xpack.esql.optimizer.TestPlannerOptimizer; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.LimitExec; import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; @@ -68,13 +74,15 @@ import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.DEFAULT_DATE_TIME_FORMATTER; import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateNanosToLong; import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToLong; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; -//@TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug") -public class ReplaceRoundToWithQueryAndTagsTests extends AbstractLocalPhysicalPlanOptimizerTests { - - public ReplaceRoundToWithQueryAndTagsTests(String name, Configuration config) { +public class SubstituteRoundToTests extends AbstractLocalPhysicalPlanOptimizerTests { + public SubstituteRoundToTests(String name, Configuration config) { super(name, config); } @@ -132,31 +140,111 @@ public void testDateTruncBucketTransformToQueryAndTags() { from test | stats count(*) by x = {} """, dateHistogram); - PhysicalPlan plan = plannerOptimizer.plan(query, searchStats, makeAnalyzer("mapping-all-types.json")); - LimitExec limit = as(plan, LimitExec.class); - AggregateExec agg = as(limit.child(), AggregateExec.class); - assertThat(agg.getMode(), is(FINAL)); - List groupings = agg.groupings(); - NamedExpression grouping = as(groupings.get(0), NamedExpression.class); - assertEquals("x", grouping.name()); - assertEquals(DataType.DATETIME, grouping.dataType()); - assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); - ExchangeExec exchange = as(agg.child(), ExchangeExec.class); - assertThat(exchange.inBetweenAggs(), is(true)); - agg = as(exchange.child(), AggregateExec.class); - EvalExec eval = as(agg.child(), EvalExec.class); - List aliases = eval.fields(); + ExchangeExec exchange = validatePlanBeforeExchange(query, DataType.DATETIME); + + var queryBuilderAndTags = getBuilderAndTagsFromStats(exchange, DataType.DATETIME); + + List expectedQueryBuilderAndTags = expectedQueryBuilderAndTags( + query, + "date", + List.of(), + new Source(2, 24, dateHistogram), + null + ); + verifyQueryAndTags(expectedQueryBuilderAndTags, queryBuilderAndTags); + } + } + + // Pushing count to source isn't supported when there is a filter on the count at the moment. + public void testDateTruncBucketTransformToQueryAndTagsWithWhereInsideAggregation() { + for (String dateHistogram : dateHistograms) { + String query = LoggerMessageFormat.format(null, """ + from test + | stats count(*) where long > 10 by x = {} + """, dateHistogram); + + ExchangeExec exchange = validatePlanBeforeExchange(query, DataType.DATETIME, List.of("count(*) where long > 10")); + + AggregateExec agg = as(exchange.child(), AggregateExec.class); + FieldExtractExec fieldExtractExec = as(agg.child(), FieldExtractExec.class); + EvalExec evalExec = as(fieldExtractExec.child(), EvalExec.class); + List aliases = evalExec.fields(); + assertEquals(1, aliases.size()); + FieldAttribute roundToTag = as(aliases.get(0).child(), FieldAttribute.class); + assertEquals("$$date$round_to$datetime", roundToTag.name()); + EsQueryExec esQueryExec = as(evalExec.child(), EsQueryExec.class); + + List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); + List expectedQueryBuilderAndTags = expectedQueryBuilderAndTags( + query, + "date", + List.of(), + new Source(2, 40, dateHistogram), + null + ); + verifyQueryAndTags(expectedQueryBuilderAndTags, queryBuilderAndTags); + assertThrows(UnsupportedOperationException.class, esQueryExec::query); + } + } + + // We do not support count pushdown when there is an ES filter at the moment, even if it's on the same field. + public void testDateTruncBucketTransformToQueryAndTagsWithEsFilter() { + for (String dateHistogram : dateHistograms) { + String query = LoggerMessageFormat.format(null, """ + from test + | stats count(*) by x = {} + """, dateHistogram); + + RangeQueryBuilder esFilter = rangeQuery("date").from("2023-10-21T00:00:00.000Z").to("2023-10-22T00:00:00.000Z"); + ExchangeExec exchange = validatePlanBeforeExchange(query, DataType.DATETIME, List.of("count(*)"), esFilter); + + AggregateExec agg = as(exchange.child(), AggregateExec.class); + EvalExec evalExec = as(agg.child(), EvalExec.class); + List aliases = evalExec.fields(); assertEquals(1, aliases.size()); FieldAttribute roundToTag = as(aliases.get(0).child(), FieldAttribute.class); assertEquals("$$date$round_to$datetime", roundToTag.name()); - EsQueryExec esQueryExec = as(eval.child(), EsQueryExec.class); + EsQueryExec esQueryExec = as(evalExec.child(), EsQueryExec.class); + List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); List expectedQueryBuilderAndTags = expectedQueryBuilderAndTags( query, "date", List.of(), new Source(2, 24, dateHistogram), + esFilter + ); + verifyQueryAndTags(expectedQueryBuilderAndTags, queryBuilderAndTags); + assertThrows(UnsupportedOperationException.class, esQueryExec::query); + } + } + + // Pushing count to source isn't supported when there are multiple aggregates. + public void testDateTruncBucketTransformToQueryAndTagsWithMultipleAggregates() { + for (String dateHistogram : dateHistograms) { + String query = LoggerMessageFormat.format(null, """ + from test + | stats sum(long), count(*) by x = {} + """, dateHistogram); + + ExchangeExec exchange = validatePlanBeforeExchange(query, DataType.DATETIME, List.of("sum(long)", "count(*)")); + + AggregateExec agg = as(exchange.child(), AggregateExec.class); + FieldExtractExec fieldExtractExec = as(agg.child(), FieldExtractExec.class); + EvalExec evalExec = as(fieldExtractExec.child(), EvalExec.class); + List aliases = evalExec.fields(); + assertEquals(1, aliases.size()); + FieldAttribute roundToTag = as(aliases.get(0).child(), FieldAttribute.class); + assertEquals("$$date$round_to$datetime", roundToTag.name()); + EsQueryExec esQueryExec = as(evalExec.child(), EsQueryExec.class); + + List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); + List expectedQueryBuilderAndTags = expectedQueryBuilderAndTags( + query, + "date", + List.of(), + new Source(2, 35, dateHistogram), null ); verifyQueryAndTags(expectedQueryBuilderAndTags, queryBuilderAndTags); @@ -167,7 +255,7 @@ public void testDateTruncBucketTransformToQueryAndTags() { // DateTrunc is transformed to RoundTo first but cannot be transformed to QueryAndTags, when the TopN is pushed down to EsQueryExec public void testDateTruncNotTransformToQueryAndTags() { for (String dateHistogram : dateHistograms) { - if (dateHistogram.contains("bucket")) { // bucket cannot be used out side of stats + if (dateHistogram.contains("bucket")) { // bucket cannot be used outside of stats continue; } String query = LoggerMessageFormat.format(null, """ @@ -188,13 +276,13 @@ public void testDateTruncNotTransformToQueryAndTags() { EvalExec evalExec = as(fieldExtractExec.child(), EvalExec.class); List aliases = evalExec.fields(); assertEquals(1, aliases.size()); - RoundTo roundTo = as(aliases.get(0).child(), RoundTo.class); + RoundTo roundTo = as(aliases.getFirst().child(), RoundTo.class); assertEquals(4, roundTo.points().size()); fieldExtractExec = as(evalExec.child(), FieldExtractExec.class); EsQueryExec esQueryExec = as(fieldExtractExec.child(), EsQueryExec.class); List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); assertEquals(1, queryBuilderAndTags.size()); - EsQueryExec.QueryBuilderAndTags queryBuilder = queryBuilderAndTags.get(0); + EsQueryExec.QueryBuilderAndTags queryBuilder = queryBuilderAndTags.getFirst(); assertNull(queryBuilder.query()); assertTrue(queryBuilder.tags().isEmpty()); assertNull(esQueryExec.query()); @@ -215,25 +303,12 @@ public void testRoundToTransformToQueryAndTags() { from test | stats count(*) by x = {} """, expression); - PhysicalPlan plan = plannerOptimizer.plan(query, searchStats, makeAnalyzer("mapping-all-types.json")); + DataType bucketType = DataType.fromTypeName(roundTo.getKey()).widenSmallNumeric(); + + ExchangeExec exchange = validatePlanBeforeExchange(query, bucketType); + + var queryBuilderAndTags = getBuilderAndTagsFromStats(exchange, bucketType); - LimitExec limit = as(plan, LimitExec.class); - AggregateExec agg = as(limit.child(), AggregateExec.class); - assertThat(agg.getMode(), is(FINAL)); - List groupings = agg.groupings(); - NamedExpression grouping = as(groupings.get(0), NamedExpression.class); - assertEquals("x", grouping.name()); - assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); - ExchangeExec exchange = as(agg.child(), ExchangeExec.class); - assertThat(exchange.inBetweenAggs(), is(true)); - agg = as(exchange.child(), AggregateExec.class); - EvalExec eval = as(agg.child(), EvalExec.class); - List aliases = eval.fields(); - assertEquals(1, aliases.size()); - FieldAttribute roundToTag = as(aliases.get(0).child(), FieldAttribute.class); - assertTrue(roundToTag.name().startsWith("$$" + fieldName + "$round_to$")); - EsQueryExec esQueryExec = as(eval.child(), EsQueryExec.class); - List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); List expectedQueryBuilderAndTags = expectedQueryBuilderAndTags( query, fieldName, @@ -242,7 +317,6 @@ public void testRoundToTransformToQueryAndTags() { null ); verifyQueryAndTags(expectedQueryBuilderAndTags, queryBuilderAndTags); - assertThrows(UnsupportedOperationException.class, esQueryExec::query); } } @@ -266,25 +340,16 @@ public void testDateTruncBucketTransformToQueryAndTagsWithOtherPushdownFunctions new Source(2, 8, predicate.contains("and") ? predicate.substring(0, 20) : predicate) ); - PhysicalPlan plan = plannerOptimizer.plan(query, searchStats, makeAnalyzer("mapping-all-types.json")); - - LimitExec limit = as(plan, LimitExec.class); - AggregateExec agg = as(limit.child(), AggregateExec.class); - assertThat(agg.getMode(), is(FINAL)); - List groupings = agg.groupings(); - NamedExpression grouping = as(groupings.get(0), NamedExpression.class); - assertEquals("x", grouping.name()); - assertEquals(DataType.DATETIME, grouping.dataType()); - assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); - ExchangeExec exchange = as(agg.child(), ExchangeExec.class); - assertThat(exchange.inBetweenAggs(), is(true)); - agg = as(exchange.child(), AggregateExec.class); + ExchangeExec exchange = validatePlanBeforeExchange(query, DataType.DATETIME); + + AggregateExec agg = as(exchange.child(), AggregateExec.class); EvalExec eval = as(agg.child(), EvalExec.class); List aliases = eval.fields(); assertEquals(1, aliases.size()); FieldAttribute roundToTag = as(aliases.get(0).child(), FieldAttribute.class); assertEquals("$$date$round_to$datetime", roundToTag.name()); EsQueryExec esQueryExec = as(eval.child(), EsQueryExec.class); + List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); List expectedQueryBuilderAndTags = expectedQueryBuilderAndTags( query, @@ -325,39 +390,29 @@ public void testDateTruncBucketNotTransformToQueryAndTagsWithLookupJoin() { | lookup join languages_lookup on language_code | stats count(*) by x = {} """, dateHistogram); - PhysicalPlan plan = plannerOptimizer.plan(query, searchStats, makeAnalyzer("mapping-all-types.json")); + ExchangeExec exchange = validatePlanBeforeExchange(query, DataType.DATETIME); - LimitExec limit = as(plan, LimitExec.class); - AggregateExec agg = as(limit.child(), AggregateExec.class); - assertThat(agg.getMode(), is(FINAL)); - List groupings = agg.groupings(); - NamedExpression grouping = as(groupings.get(0), NamedExpression.class); - assertEquals("x", grouping.name()); - assertEquals(DataType.DATETIME, grouping.dataType()); - assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); - ExchangeExec exchange = as(agg.child(), ExchangeExec.class); - assertThat(exchange.inBetweenAggs(), is(true)); - agg = as(exchange.child(), AggregateExec.class); + AggregateExec agg = as(exchange.child(), AggregateExec.class); EvalExec eval = as(agg.child(), EvalExec.class); List aliases = eval.fields(); assertEquals(1, aliases.size()); - RoundTo roundTo = as(aliases.get(0).child(), RoundTo.class); + RoundTo roundTo = as(aliases.getFirst().child(), RoundTo.class); assertEquals(4, roundTo.points().size()); FieldExtractExec fieldExtractExec = as(eval.child(), FieldExtractExec.class); List attributes = fieldExtractExec.attributesToExtract(); assertEquals(1, attributes.size()); - assertEquals("date", attributes.get(0).name()); + assertEquals("date", attributes.getFirst().name()); LookupJoinExec lookupJoinExec = as(fieldExtractExec.child(), LookupJoinExec.class); // this is why the rule doesn't apply // lhs of lookup join fieldExtractExec = as(lookupJoinExec.left(), FieldExtractExec.class); attributes = fieldExtractExec.attributesToExtract(); assertEquals(1, attributes.size()); - assertEquals("integer", attributes.get(0).name()); + assertEquals("integer", attributes.getFirst().name()); EsQueryExec esQueryExec = as(fieldExtractExec.child(), EsQueryExec.class); assertEquals("test", esQueryExec.indexPattern()); List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); assertEquals(1, queryBuilderAndTags.size()); - EsQueryExec.QueryBuilderAndTags queryBuilder = queryBuilderAndTags.get(0); + EsQueryExec.QueryBuilderAndTags queryBuilder = queryBuilderAndTags.getFirst(); assertNull(queryBuilder.query()); assertTrue(queryBuilder.tags().isEmpty()); assertNull(esQueryExec.query()); @@ -383,14 +438,14 @@ public void testDateTruncBucketNotTransformToQueryAndTagsWithFork() { AggregateExec agg = as(limit.child(), AggregateExec.class); assertThat(agg.getMode(), is(SINGLE)); List groupings = agg.groupings(); - NamedExpression grouping = as(groupings.get(0), NamedExpression.class); + NamedExpression grouping = as(groupings.getFirst(), NamedExpression.class); assertEquals("x", grouping.name()); assertEquals(DataType.DATETIME, grouping.dataType()); assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); EvalExec eval = as(agg.child(), EvalExec.class); List aliases = eval.fields(); assertEquals(1, aliases.size()); - var function = as(aliases.get(0).child(), Function.class); + var function = as(aliases.getFirst().child(), Function.class); ReferenceAttribute fa = null; // if merge returns FieldAttribute instead of ReferenceAttribute, the rule might apply if (function instanceof DateTrunc dateTrunc) { fa = as(dateTrunc.field(), ReferenceAttribute.class); @@ -402,7 +457,7 @@ public void testDateTruncBucketNotTransformToQueryAndTagsWithFork() { assertNotNull(fa); assertEquals("date", fa.name()); assertEquals(DataType.DATETIME, fa.dataType()); - MergeExec mergeExec = as(eval.child(), MergeExec.class); + as(eval.child(), MergeExec.class); } } @@ -424,37 +479,23 @@ public void testRoundToTransformToQueryAndTagsWithDefaultUpperLimit() { | stats count(*) by x = round_to(integer, {}) """, points.toString()); - PhysicalPlan plan = plannerOptimizer.plan(query, searchStats, makeAnalyzer("mapping-all-types.json")); + ExchangeExec exchange = validatePlanBeforeExchange(query, DataType.INTEGER); - LimitExec limit = as(plan, LimitExec.class); - AggregateExec agg = as(limit.child(), AggregateExec.class); - assertThat(agg.getMode(), is(FINAL)); - List groupings = agg.groupings(); - NamedExpression grouping = as(groupings.get(0), NamedExpression.class); - assertEquals("x", grouping.name()); - assertEquals(DataType.INTEGER, grouping.dataType()); - assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); - ExchangeExec exchange = as(agg.child(), ExchangeExec.class); - assertThat(exchange.inBetweenAggs(), is(true)); - agg = as(exchange.child(), AggregateExec.class); - EvalExec evalExec = as(agg.child(), EvalExec.class); - List aliases = evalExec.fields(); - assertEquals(1, aliases.size()); if (numOfPoints == 127) { - FieldAttribute roundToTag = as(aliases.get(0).child(), FieldAttribute.class); - assertTrue(roundToTag.name().startsWith("$$integer$round_to$")); - EsQueryExec esQueryExec = as(evalExec.child(), EsQueryExec.class); - List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); - assertEquals(128, queryBuilderAndTags.size()); // 127 + nullBucket - assertThrows(UnsupportedOperationException.class, esQueryExec::query); + var queryBuilderAndTags = getBuilderAndTagsFromStats(exchange, DataType.INTEGER); + assertThat(queryBuilderAndTags, hasSize(128)); // 127 + nullBucket } else { // numOfPoints == 128, query rewrite does not happen - RoundTo roundTo = as(aliases.get(0).child(), RoundTo.class); + AggregateExec agg = as(exchange.child(), AggregateExec.class); + EvalExec evalExec = as(agg.child(), EvalExec.class); + List aliases = evalExec.fields(); + assertEquals(1, aliases.size()); + RoundTo roundTo = as(aliases.getFirst().child(), RoundTo.class); assertEquals(128, roundTo.points().size()); FieldExtractExec fieldExtractExec = as(evalExec.child(), FieldExtractExec.class); EsQueryExec esQueryExec = as(fieldExtractExec.child(), EsQueryExec.class); List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); assertEquals(1, queryBuilderAndTags.size()); - EsQueryExec.QueryBuilderAndTags queryBuilder = queryBuilderAndTags.get(0); + EsQueryExec.QueryBuilderAndTags queryBuilder = queryBuilderAndTags.getFirst(); assertNull(queryBuilder.query()); assertTrue(queryBuilder.tags().isEmpty()); assertNull(esQueryExec.query()); @@ -462,6 +503,22 @@ public void testRoundToTransformToQueryAndTagsWithDefaultUpperLimit() { } } + private static List getBuilderAndTagsFromStats(ExchangeExec exchange, DataType aggregateType) { + FilterExec filter = as(exchange.child(), FilterExec.class); + GreaterThan condition = as(filter.condition(), GreaterThan.class); + Literal literal = as(condition.right(), Literal.class); + + assertThat(literal.value(), is(0L)); + EsStatsQueryExec statsQueryExec = as(filter.child(), EsStatsQueryExec.class); + assertThat( + statsQueryExec.output().stream().map(Attribute::dataType).toList(), + equalTo(List.of(aggregateType, DataType.LONG, DataType.BOOLEAN)) + ); + var left = as(condition.left(), ReferenceAttribute.class); + assertThat(left.id(), is(statsQueryExec.output().get(1).id())); + return as(statsQueryExec.stat(), EsStatsQueryExec.ByStat.class).queryBuilderAndTags(); + } + /** * Query level threshold(if greater than -1) set in QueryPragmas overrides the cluster level threshold set in EsqlFlags. */ @@ -494,43 +551,33 @@ public void testRoundToTransformToQueryAndTagsWithCustomizedUpperLimit() { EsqlFlags esqlFlags = new EsqlFlags(clusterLevelThreshold); assertEquals(clusterLevelThreshold, esqlFlags.roundToPushdownThreshold()); assertTrue(esqlFlags.stringLikeOnIndex()); - PhysicalPlan plan = plannerOptimizerWithPragmas.plan(query, searchStats, esqlFlags); - boolean pushdown = false; + boolean pushdown; if (queryLevelThreshold > -1) { pushdown = queryLevelThreshold >= 127; } else { pushdown = clusterLevelThreshold >= 127; } - LimitExec limit = as(plan, LimitExec.class); - AggregateExec agg = as(limit.child(), AggregateExec.class); - assertThat(agg.getMode(), is(FINAL)); - List groupings = agg.groupings(); - NamedExpression grouping = as(groupings.get(0), NamedExpression.class); - assertEquals("x", grouping.name()); - assertEquals(DataType.INTEGER, grouping.dataType()); - assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); - ExchangeExec exchange = as(agg.child(), ExchangeExec.class); - assertThat(exchange.inBetweenAggs(), is(true)); - agg = as(exchange.child(), AggregateExec.class); - EvalExec evalExec = as(agg.child(), EvalExec.class); - List aliases = evalExec.fields(); - assertEquals(1, aliases.size()); + ExchangeExec exchange = validatePlanBeforeExchange( + plannerOptimizerWithPragmas.plan(query, searchStats, esqlFlags), + DataType.INTEGER, + List.of("count(*)") + ); if (pushdown) { - FieldAttribute roundToTag = as(aliases.get(0).child(), FieldAttribute.class); - assertTrue(roundToTag.name().startsWith("$$integer$round_to$")); - EsQueryExec esQueryExec = as(evalExec.child(), EsQueryExec.class); - List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); - assertEquals(128, queryBuilderAndTags.size()); // 127 + nullBucket - assertThrows(UnsupportedOperationException.class, esQueryExec::query); + var queryBuilderAndTags = getQueryBuilderAndTags(exchange); + assertThat(queryBuilderAndTags, hasSize(128)); // 127 + nullBucket } else { // query rewrite does not happen - RoundTo roundTo = as(aliases.get(0).child(), RoundTo.class); + AggregateExec agg = as(exchange.child(), AggregateExec.class); + EvalExec evalExec = as(agg.child(), EvalExec.class); + List aliases = evalExec.fields(); + assertEquals(1, aliases.size()); + RoundTo roundTo = as(aliases.getFirst().child(), RoundTo.class); assertEquals(127, roundTo.points().size()); FieldExtractExec fieldExtractExec = as(evalExec.child(), FieldExtractExec.class); EsQueryExec esQueryExec = as(fieldExtractExec.child(), EsQueryExec.class); List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); assertEquals(1, queryBuilderAndTags.size()); - EsQueryExec.QueryBuilderAndTags queryBuilder = queryBuilderAndTags.get(0); + EsQueryExec.QueryBuilderAndTags queryBuilder = queryBuilderAndTags.getFirst(); assertNull(queryBuilder.query()); assertTrue(queryBuilder.tags().isEmpty()); assertNull(esQueryExec.query()); @@ -539,15 +586,61 @@ public void testRoundToTransformToQueryAndTagsWithCustomizedUpperLimit() { } } - static String pointArray(int numPoints) { + private static List getQueryBuilderAndTags(ExchangeExec exchange) { + return getBuilderAndTagsFromStats(exchange, DataType.INTEGER); + } + + private ExchangeExec validatePlanBeforeExchange(String query, DataType aggregateType) { + return validatePlanBeforeExchange(query, aggregateType, List.of("count(*)")); + } + + private ExchangeExec validatePlanBeforeExchange(String query, DataType aggregateType, List aggregation) { + return validatePlanBeforeExchange(query, aggregateType, aggregation, null); + } + + private ExchangeExec validatePlanBeforeExchange( + String query, + DataType aggregateType, + List aggregation, + @Nullable QueryBuilder esFilter + ) { + return validatePlanBeforeExchange( + plannerOptimizer.plan(query, searchStats, makeAnalyzer("mapping-all-types.json"), esFilter), + aggregateType, + aggregation + ); + } + + private static ExchangeExec validatePlanBeforeExchange(PhysicalPlan plan, DataType aggregateType, List aggregation) { + LimitExec limit = as(plan, LimitExec.class); + + AggregateExec agg = as(limit.child(), AggregateExec.class); + assertThat(agg.getMode(), is(FINAL)); + List groupings = agg.groupings(); + NamedExpression grouping = as(groupings.getFirst(), NamedExpression.class); + assertEquals("x", grouping.name()); + assertEquals(aggregateType, grouping.dataType()); + assertEquals(CollectionUtils.appendToCopy(aggregation, "x"), Expressions.names(agg.aggregates())); + + ExchangeExec exchange = as(agg.child(), ExchangeExec.class); + assertThat(exchange.inBetweenAggs(), is(true)); + return exchange; + } + + private static String pointArray(int numPoints) { return IntStream.range(0, numPoints).mapToObj(Integer::toString).collect(Collectors.joining(",")); } - static int queryAndTags(PhysicalPlan plan) { + private static int plainQueryAndTags(PhysicalPlan plan) { EsQueryExec esQuery = (EsQueryExec) plan.collectFirstChildren(EsQueryExec.class::isInstance).getFirst(); return esQuery.queryBuilderAndTags().size(); } + private static int statsQueryAndTags(PhysicalPlan plan) { + EsStatsQueryExec esQuery = (EsStatsQueryExec) plan.collectFirstChildren(EsStatsQueryExec.class::isInstance).getFirst(); + return ((EsStatsQueryExec.ByStat) esQuery.stat()).queryBuilderAndTags().size(); + } + public void testAdjustThresholdForQueries() { { int points = between(2, 127); @@ -556,7 +649,7 @@ public void testAdjustThresholdForQueries() { | stats count(*) by x = round_to(integer, %s) """, pointArray(points)); PhysicalPlan plan = plannerOptimizer.plan(q, searchStats, makeAnalyzer("mapping-all-types.json")); - int queryAndTags = queryAndTags(plan); + int queryAndTags = statsQueryAndTags(plan); assertThat(queryAndTags, equalTo(points + 1)); // include null bucket } { @@ -567,7 +660,7 @@ public void testAdjustThresholdForQueries() { | stats count(*) by x = round_to(integer, %s) """, pointArray(points)); var plan = plannerOptimizer.plan(q, searchStats, makeAnalyzer("mapping-all-types.json")); - int queryAndTags = queryAndTags(plan); + int queryAndTags = plainQueryAndTags(plan); assertThat(queryAndTags, equalTo(points + 1)); // include null bucket } { @@ -578,7 +671,7 @@ public void testAdjustThresholdForQueries() { | stats count(*) by x = round_to(integer, %s) """, pointArray(points)); var plan = plannerOptimizer.plan(q, searchStats, makeAnalyzer("mapping-all-types.json")); - int queryAndTags = queryAndTags(plan); + int queryAndTags = plainQueryAndTags(plan); assertThat(queryAndTags, equalTo(1)); // no rewrite } { @@ -590,7 +683,7 @@ public void testAdjustThresholdForQueries() { | stats count(*) by x = round_to(integer, %s) """, pointArray(points)); var plan = plannerOptimizer.plan(q, searchStats, makeAnalyzer("mapping-all-types.json")); - int queryAndTags = queryAndTags(plan); + int queryAndTags = plainQueryAndTags(plan); assertThat("points=" + points, queryAndTags, equalTo(points + 1)); // include null bucket } { @@ -602,7 +695,7 @@ public void testAdjustThresholdForQueries() { | stats count(*) by x = round_to(integer, %s) """, pointArray(points)); PhysicalPlan plan = plannerOptimizer.plan(q, searchStats, makeAnalyzer("mapping-all-types.json")); - int queryAndTags = queryAndTags(plan); + int queryAndTags = plainQueryAndTags(plan); assertThat("points=" + points, queryAndTags, equalTo(1)); // no rewrite } } @@ -613,7 +706,7 @@ private static void verifyQueryAndTags(List exp EsQueryExec.QueryBuilderAndTags expectedItem = expected.get(i); EsQueryExec.QueryBuilderAndTags actualItem = actual.get(i); assertEquals(expectedItem.query().toString(), actualItem.query().toString()); - assertEquals(expectedItem.tags().get(0), actualItem.tags().get(0)); + assertEquals(expectedItem.tags().getFirst(), actualItem.tags().getFirst()); } } @@ -709,6 +802,106 @@ private static List> numericBuckets(List roundingPoints) { return List.of(firstBucket, List.of(p2, p3, p2), List.of(p3, p4, p3), lastBucket); } + public void testForkWithStatsCountStarDateTrunc() { + String query = """ + from test + | fork (stats x = count(*), y = max(long) by hd = date_trunc(1 day, date)) + (stats x = count(*), y = min(long) by hd = date_trunc(2 day, date)) + """; + PhysicalPlan plan = plannerOptimizer.plan(query, searchStats, makeAnalyzer("mapping-all-types.json")); + + LimitExec limit = as(plan, LimitExec.class); + MergeExec merge = as(limit.child(), MergeExec.class); + + List mergeChildren = merge.children(); + assertThat(mergeChildren, hasSize(2)); + + PhysicalPlan firstBranch = mergeChildren.get(0); + ProjectExec firstProject = as(firstBranch, ProjectExec.class); + EvalExec firstEval = as(firstProject.child(), EvalExec.class); + LimitExec firstLimit = as(firstEval.child(), LimitExec.class); + AggregateExec firstFinalAgg = as(firstLimit.child(), AggregateExec.class); + assertThat(firstFinalAgg.getMode(), is(FINAL)); + var firstGroupings = firstFinalAgg.groupings(); + assertThat(firstGroupings, hasSize(1)); + NamedExpression firstGrouping = as(firstGroupings.getFirst(), NamedExpression.class); + assertThat(firstGrouping.name(), is("hd")); + assertThat(firstGrouping.dataType(), is(DataType.DATETIME)); + assertThat(Expressions.names(firstFinalAgg.aggregates()), is(List.of("x", "y", "hd"))); + + ExchangeExec firstExchange = as(firstFinalAgg.child(), ExchangeExec.class); + assertThat(firstExchange.inBetweenAggs(), is(true)); + AggregateExec firstInitialAgg = as(firstExchange.child(), AggregateExec.class); + FieldExtractExec firstFieldExtract = as(firstInitialAgg.child(), FieldExtractExec.class); + EvalExec firstDateTruncEval = as(firstFieldExtract.child(), EvalExec.class); + as(firstDateTruncEval.child(), EsQueryExec.class); + + PhysicalPlan secondBranch = mergeChildren.get(1); + ProjectExec secondProject = as(secondBranch, ProjectExec.class); + EvalExec secondEval = as(secondProject.child(), EvalExec.class); + LimitExec secondLimit = as(secondEval.child(), LimitExec.class); + AggregateExec secondFinalAgg = as(secondLimit.child(), AggregateExec.class); + assertThat(secondFinalAgg.getMode(), is(FINAL)); + var secondGroupings = secondFinalAgg.groupings(); + assertThat(secondGroupings, hasSize(1)); + NamedExpression secondGrouping = as(secondGroupings.getFirst(), NamedExpression.class); + assertThat(secondGrouping.name(), is("hd")); + assertThat(secondGrouping.dataType(), is(DataType.DATETIME)); + assertThat(Expressions.names(secondFinalAgg.aggregates()), is(List.of("x", "y", "hd"))); + + ExchangeExec secondExchange = as(secondFinalAgg.child(), ExchangeExec.class); + assertThat(secondExchange.inBetweenAggs(), is(true)); + AggregateExec secondInitialAgg = as(secondExchange.child(), AggregateExec.class); + FieldExtractExec secondFieldExtract = as(secondInitialAgg.child(), FieldExtractExec.class); + EvalExec secondDateTruncEval = as(secondFieldExtract.child(), EvalExec.class); + FieldExtractExec secondDateFieldExtract = as(secondDateTruncEval.child(), FieldExtractExec.class); + as(secondDateFieldExtract.child(), EsQueryExec.class); + } + + public void testSubqueryWithCountStarAndDateTrunc() { + String query = """ + from test, (from test | stats cnt = count(*) by x = date_trunc(1 day, date)) + | keep x, cnt, date + """; + PhysicalPlan plan = plannerOptimizer.plan(query, searchStats, makeAnalyzer("mapping-all-types.json")); + + ProjectExec project = as(plan, ProjectExec.class); + LimitExec limit = as(project.child(), LimitExec.class); + MergeExec merge = as(limit.child(), MergeExec.class); + + List mergeChildren = merge.children(); + assertThat(mergeChildren, hasSize(2)); + + PhysicalPlan leftBranch = mergeChildren.get(0); + ProjectExec leftProject = as(leftBranch, ProjectExec.class); + EvalExec leftEval = as(leftProject.child(), EvalExec.class); + LimitExec leftLimit = as(leftEval.child(), LimitExec.class); + ExchangeExec leftExchange = as(leftLimit.child(), ExchangeExec.class); + ProjectExec leftInnerProject = as(leftExchange.child(), ProjectExec.class); + FieldExtractExec leftFieldExtract = as(leftInnerProject.child(), FieldExtractExec.class); + as(leftFieldExtract.child(), EsQueryExec.class); + + PhysicalPlan rightBranch = mergeChildren.get(1); + + ProjectExec subqueryProject = as(rightBranch, ProjectExec.class); + EvalExec subqueryEval = as(subqueryProject.child(), EvalExec.class); + LimitExec subqueryLimit = as(subqueryEval.child(), LimitExec.class); + AggregateExec finalAgg = as(subqueryLimit.child(), AggregateExec.class); + assertThat(finalAgg.getMode(), is(FINAL)); + var groupings = finalAgg.groupings(); + assertThat(groupings, hasSize(1)); + + ExchangeExec partialExchange = as(finalAgg.child(), ExchangeExec.class); + assertThat(partialExchange.inBetweenAggs(), is(true)); + + FilterExec filter = as(partialExchange.child(), FilterExec.class); + EsStatsQueryExec statsQueryExec = as(filter.child(), EsStatsQueryExec.class); + + assertThat(statsQueryExec.stat(), is(instanceOf(EsStatsQueryExec.ByStat.class))); + EsStatsQueryExec.ByStat byStat = (EsStatsQueryExec.ByStat) statsQueryExec.stat(); + assertThat(byStat.queryBuilderAndTags(), is(not(empty()))); + } + private static SearchStats searchStats() { // create a SearchStats with min and max in milliseconds Map minValue = Map.of("date", 1697804103360L); // 2023-10-20T12:15:03.360Z diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java index c0bbd51a00908..b48cf959fb016 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java @@ -55,6 +55,7 @@ import org.elasticsearch.xpack.esql.plan.logical.join.JoinType; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.Stat; import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.StatsType; import org.elasticsearch.xpack.esql.plan.physical.MergeExec; @@ -446,7 +447,7 @@ public void accept(Page page) { return randomResolvedExpression(argClass); } else if (argClass == Stat.class) { // record field - return new Stat(randomRealisticUnicodeOfLength(10), randomFrom(StatsType.values()), null); + return new EsStatsQueryExec.BasicStat(randomRealisticUnicodeOfLength(10), randomFrom(StatsType.values()), null); } else if (argClass == Integer.class) { return randomInt(); } else if (argClass == JoinType.class) {