Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import org.elasticsearch.xpack.esql.expression.predicate.logical.And;
import org.elasticsearch.xpack.esql.expression.predicate.nulls.IsNotNull;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThanOrEqual;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.In;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThanOrEqual;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.NotEquals;
import org.elasticsearch.xpack.esql.expression.promql.function.PromqlFunctionRegistry;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
Expand Down Expand Up @@ -97,16 +99,34 @@ protected LogicalPlan rule(PromqlCommand promqlCommand) {
LogicalPlan promqlPlan = promqlCommand.promqlPlan();

// first replace the Placeholder relation with the child plan
promqlPlan = promqlPlan.transformUp(PlaceholderRelation.class, pr -> promqlCommand.child());
promqlPlan = promqlPlan.transformUp(PlaceholderRelation.class, pr -> withTimestampFilter(promqlCommand, promqlCommand.child()));

// Translate based on plan type
return translate(promqlCommand, promqlPlan);
}

private LogicalPlan translate(PromqlCommand promqlCommand, LogicalPlan promqlPlan) {
// convert the plan bottom-up
MapResult result = map(promqlCommand, promqlPlan);
return result.plan();
promqlPlan = map(promqlCommand, promqlPlan).plan();

return promqlPlan;
}

private static LogicalPlan withTimestampFilter(PromqlCommand promqlCommand, LogicalPlan plan) {
if (promqlCommand.start().value() != null && promqlCommand.end().value() != null) {
Source promqlSource = promqlCommand.source();
Expression timestamp = promqlCommand.timestamp();
plan = new Filter(
promqlSource,
plan,
new And(
promqlSource,
new GreaterThanOrEqual(promqlSource, timestamp, promqlCommand.start()),
new LessThanOrEqual(promqlSource, timestamp, promqlCommand.end())
)
);
}
return plan;
}

private record MapResult(LogicalPlan plan, Map<String, Expression> extras) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;

import static java.util.Collections.emptyMap;
Expand All @@ -52,13 +51,10 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assume.assumeTrue;

// @TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug tests")
public class PromqlLogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests {

private static final String PARAM_FORMATTING = "%1$s";

private static Analyzer tsAnalyzer;

@BeforeClass
Expand Down Expand Up @@ -92,8 +88,6 @@ public void testExplainPromql() {
| LIMIT 1000
)
""");

logger.trace(plan);
}

public void testExplainPromqlSimple() {
Expand All @@ -105,8 +99,6 @@ public void testExplainPromqlSimple() {
| WHERE TRANGE($now-1h, $now)
| STATS AVG(AVG_OVER_TIME(network.bytes_in)) BY TBUCKET(1h)
""");

logger.trace(plan);
}

/**
Expand Down Expand Up @@ -144,8 +136,6 @@ public void testAvgAvgOverTimeOutput() {
| LIMIT 1000
""");

logger.trace(plan);

var project = as(plan, Project.class);
assertThat(project.projections(), hasSize(3));

Expand Down Expand Up @@ -239,8 +229,6 @@ public void testTSAvgAvgOverTimeOutput() {
| STATS AVG(AVG_OVER_TIME(network.bytes_in)) BY pod, TBUCKET(1h)
| LIMIT 1000
""");

logger.trace(plan);
}

/**
Expand Down Expand Up @@ -270,9 +258,6 @@ public void testTSAvgWithoutByDimension() {
| STATS avg(avg_over_time(network.bytes_in)) BY TBUCKET(1h)
| LIMIT 1000
""");

logger.trace(plan);

}

/**
Expand Down Expand Up @@ -305,8 +290,6 @@ public void testPromqlAvgWithoutByDimension() {
)
| LIMIT 1000
""");

logger.trace(plan);
}

/**
Expand Down Expand Up @@ -336,8 +319,6 @@ public void testRangeSelector() {
TS k8s
| promql step 1h ( max by (pod) (avg_over_time(network.bytes_in[1h])) )
""");

logger.trace(plan);
}

@AwaitsFix(bugUrl = "Invalid call to dataType on an unresolved object ?RATE_$1")
Expand All @@ -353,7 +334,6 @@ avg by (pod) (rate(network.bytes_in[1h]))
""";

var plan = planPromql(testQuery);
logger.trace(plan);
}

/**
Expand Down Expand Up @@ -385,8 +365,6 @@ public void testStartEndStep() {
""";

var plan = planPromql(testQuery);
List<LogicalPlan> collect = plan.collect(Bucket.class::isInstance);
logger.trace(plan);
}

/**
Expand Down Expand Up @@ -427,7 +405,6 @@ max by (pod) (avg_over_time(network.bytes_in{pod=~"host-0|host-1|host-2"}[5m]))
assertThat(filters, hasSize(1));
var filter = (Filter) filters.getFirst();
assertThat(filter.condition().anyMatch(In.class::isInstance), equalTo(true));
logger.trace(plan);
}

public void testLabelSelectorPrefix() {
Expand All @@ -448,7 +425,6 @@ avg by (pod) (avg_over_time(network.bytes_in{pod=~"host-.*"}[5m]))
var filter = (Filter) filters.getFirst();
assertThat(filter.condition().anyMatch(StartsWith.class::isInstance), equalTo(true));
assertThat(filter.condition().anyMatch(NotEquals.class::isInstance), equalTo(false));
logger.trace(plan);
}

public void testLabelSelectorProperPrefix() {
Expand Down Expand Up @@ -518,7 +494,6 @@ sum by (host.name, mountpoint) (last_over_time(system.filesystem.usage{state=~"u
""";

var plan = planPromql(testQuery);
logger.trace(plan);
}

@AwaitsFix(bugUrl = "only aggregations across timeseries are supported at this time (found [foo or bar])")
Expand All @@ -539,7 +514,6 @@ public void testGrammar() {
""";

var plan = planPromql(testQuery);
logger.trace(plan);
}

// public void testPromqlArithmetricOperators() {
Expand Down Expand Up @@ -567,9 +541,9 @@ protected LogicalPlan planPromql(String query) {
query = query.replace("$now-1h", '"' + Instant.now().minus(1, ChronoUnit.HOURS).toString() + '"');
query = query.replace("$now", '"' + Instant.now().toString() + '"');
var analyzed = tsAnalyzer.analyze(parser.createStatement(query));
logger.trace(analyzed);
logger.trace("analyzed plan:\n{}", analyzed);
var optimized = logicalOptimizer.optimize(analyzed);
logger.trace(optimized);
logger.trace("optimized plan:\n{}", optimized);
return optimized;
}
}