Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,26 @@ && child(0).child(0) instanceof LogicalPartitionTopN)) {
&& chosenRowNumberPartitionLimit == Long.MAX_VALUE)) {
return null;
} else {
// 4. check all windowExpression's order key is empty or is the same as chosenWindowFunc's order key
// 4. check all windowExpression's partition key and order key are compatible with chosenWindowFunc.
// The generated partitionTopN is pushed below the whole window node, so it prunes the input rows
// (keeping per-partition top-k of the chosen function) of every co-located window function.
// For another window function W to stay correct, every row that could affect W's value for a
// surviving row must also survive the pruning. This holds iff the chosen partition key is a
// SUBSET of W's partition key (chosen is coarser): then any row in W's partition with a smaller
// order value is also in the same chosen-partition with an order value <= the surviving row's,
// so its chosen-rank is within top-k and it is kept. The order key must also be compatible
// (empty or identical). Otherwise we must disable the optimization, e.g.
// 'row_number() over (partition by a order by c)' as rn together with
// 'row_number() over (partition by b order by c)' as rk, filter on rn
// (independent partitions), or a chosen partition (a, b) finer than a co-located 'partition by a'
// would prune rows the latter still needs and produce a wrong result.
for (NamedExpression windowExpr : windowExpressions) {
if (windowExpr != null && windowExpr instanceof Alias
&& windowExpr.child(0) instanceof WindowExpression) {
WindowExpression windowFunc = (WindowExpression) windowExpr.child(0);
if (!windowFunc.getPartitionKeys().containsAll(chosenWindowFunc.getPartitionKeys())) {
return null;
}
if (windowFunc.getOrderKeys().isEmpty()
|| windowFunc.getOrderKeys().equals(chosenWindowFunc.getOrderKeys())) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,111 @@ public void testMultipleWindowsWithDifferentOrders() {
logicalOlapScan()
))));
}

@Test
public void testMultipleWindowsWithDifferentPartitions() {
ConnectContext context = MemoTestUtils.createConnectContext();
context.getSessionVariable().setEnablePartitionTopN(true);
NamedExpression gender = scan.getOutput().get(1).toSlot();
NamedExpression name = scan.getOutput().get(2).toSlot();
NamedExpression age = scan.getOutput().get(3).toSlot();

List<OrderExpression> orderKeyList = ImmutableList.of(new OrderExpression(
new OrderKey(age, true, true)));
WindowFrame windowFrame = new WindowFrame(WindowFrame.FrameUnitsType.ROWS,
WindowFrame.FrameBoundary.newPrecedingBoundary(),
WindowFrame.FrameBoundary.newCurrentRowBoundary());

// window1: partition by gender, order by age
WindowExpression window1 = new WindowExpression(new RowNumber(),
ImmutableList.of(gender), orderKeyList, windowFrame);
Alias windowAlias1 = new Alias(window1, window1.toSql());

// window2: same order key (age) but different partition key (name)
WindowExpression window2 = new WindowExpression(new RowNumber(),
ImmutableList.of(name), orderKeyList, windowFrame);
Alias windowAlias2 = new Alias(window2, window2.toSql());

List<NamedExpression> expressions = Lists.newArrayList(windowAlias1, windowAlias2);
LogicalWindow<LogicalOlapScan> window = new LogicalWindow<>(expressions, scan);
// filter on window1 (row_number partitioned by gender)
Expression filterPredicate = new LessThanEqual(window.getOutput().get(4).toSlot(), Literal.of(1));

LogicalPlan plan = new LogicalPlanBuilder(window)
.filter(filterPredicate)
.project(ImmutableList.of(0))
.build();

// The optimization must be disabled: pushing a partitionTopN partitioned by gender below the whole
// window would prune the input rows of window2 (partitioned by name) and corrupt its result.
PlanChecker.from(context, plan)
.applyTopDown(new CreatePartitionTopNFromWindow())
.matches(
logicalProject(
logicalFilter(
logicalWindow(
logicalOlapScan()
))));
}

@Test
public void testMultipleWindowsSubsetPartitionGeneratesTopn() {
ConnectContext context = MemoTestUtils.createConnectContext();
context.getSessionVariable().setEnablePartitionTopN(true);
NamedExpression gender = scan.getOutput().get(1).toSlot();
NamedExpression name = scan.getOutput().get(2).toSlot();
NamedExpression age = scan.getOutput().get(3).toSlot();

List<Expression> partitionKeyList = ImmutableList.of(gender);
List<OrderExpression> orderKeyList = ImmutableList.of(new OrderExpression(
new OrderKey(age, true, true)));
WindowFrame windowFrame = new WindowFrame(WindowFrame.FrameUnitsType.ROWS,
WindowFrame.FrameBoundary.newPrecedingBoundary(),
WindowFrame.FrameBoundary.newCurrentRowBoundary());

// window1 (chosen via filter): partition by gender, the coarser key
WindowExpression window1 = new WindowExpression(new RowNumber(),
partitionKeyList, orderKeyList, windowFrame);
Alias windowAlias1 = new Alias(window1, window1.toSql());

// window2: partition by (gender, name) -- a SUPERSET of window1's partition key
WindowExpression window2 = new WindowExpression(new RowNumber(),
ImmutableList.of(gender, name), orderKeyList, windowFrame);
Alias windowAlias2 = new Alias(window2, window2.toSql());

List<NamedExpression> expressions = Lists.newArrayList(windowAlias1, windowAlias2);
LogicalWindow<LogicalOlapScan> window = new LogicalWindow<>(expressions, scan);
// filter on window1 (the coarser, chosen window)
Expression filterPredicate = new LessThanEqual(window.getOutput().get(4).toSlot(), Literal.of(100));

LogicalPlan plan = new LogicalPlanBuilder(window)
.filter(filterPredicate)
.project(ImmutableList.of(0))
.build();

// The chosen partition key (gender) is a subset of the other window's
// (gender, name), so pruning per gender cannot corrupt window2 -> the
// partition topn is still generated.
PlanChecker.from(context, plan)
.applyTopDown(new CreatePartitionTopNFromWindow())
.matches(
logicalProject(
logicalFilter(
logicalWindow(
logicalPartitionTopN(
logicalOlapScan()
).when(logicalPartitionTopN -> {
WindowFuncType funName = logicalPartitionTopN.getFunction();
List<Expression> partitionKeys = logicalPartitionTopN.getPartitionKeys();
boolean hasGlobalLimit = logicalPartitionTopN.hasGlobalLimit();
long partitionLimit = logicalPartitionTopN.getPartitionLimit();
return funName == WindowFuncType.ROW_NUMBER
&& partitionKeys.equals(partitionKeyList)
&& !hasGlobalLimit && partitionLimit == 100;
})
)
).when(filter -> filter.getConjuncts().equals(ImmutableSet.of(filterPredicate)))
)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !multi_window --
1 A X 1 1 1
4 B X 4 1 3
7 C X 7 1 4

-- !subset_safe_2window --
1 A X 1 1 1
2 A X 2 2 2
4 B X 4 1 1
5 B Y 5 2 1
7 C X 7 1 1
8 C Z 8 2 1

-- !subset_safe_3window --
1 A X 1 1 1 1
4 B X 4 1 1 1
7 C X 7 1 1 1

Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("check_partitionkey") {
sql "drop table if exists multi_window_cases"
sql """
create table multi_window_cases (
id int,
g1 varchar(8),
g2 varchar(8),
ord_key int,
amt int
) properties ('replication_num'='1');

insert into multi_window_cases values
(1,'A','X',1,10),
(2,'A','X',2,20),
(3,'A','Y',3,30),
(4,'B','X',4,40),
(5,'B','Y',5,50),
(6,'B','Y',6,60),
(7,'C','X',7,70),
(8,'C','Z',8,80);
"""

// Two row_number() with the SAME order key (ord_key) but DIFFERENT partition keys
// (g1 vs g2). The filter is on rn1 (partition by g1). The partition topn generated
// from rn1 would be pushed below the whole window node and prune the input rows of
// rn2 (partition by g2), corrupting rn2. So partition topn must NOT be applied here.
explain {
sql """
select id, g1, g2, ord_key, rn1, rn2
from (
select
id, g1, g2, ord_key,
row_number() over (partition by g1 order by ord_key) as rn1,
row_number() over (partition by g2 order by ord_key) as rn2
from multi_window_cases
) q
where rn1 <= 1;
"""
notContains("VPartitionTopN")
}

qt_multi_window """
select id, g1, g2, ord_key, rn1, rn2
from (
select
id, g1, g2, ord_key,
row_number() over (partition by g1 order by ord_key) as rn1,
row_number() over (partition by g2 order by ord_key) as rn2
from multi_window_cases
) q
where rn1 <= 1
order by id;
"""

// ---------------------------------------------------------------------
// The optimization is STILL valid when the chosen window's partition key
// is a SUBSET of every other co-located window's partition key (i.e. the
// chosen one is coarser). Pruning per chosen-partition top-k then only
// removes rows the finer windows do not need, so partition topn MUST still
// fire and the results must stay correct.

// 2 windows: the chosen rank(partition by g1) is coarser than the
// row_number(partition by g1, g2). The filter is on the coarser window, so
// the partition topn (partition by g1) is safe to push down -> it fires.
explain {
sql """
select id, g1, g2, ord_key, rk, rn
from (
select
id, g1, g2, ord_key,
rank() over (partition by g1 order by ord_key) as rk,
row_number() over (partition by g1, g2 order by ord_key) as rn
from multi_window_cases
) q
where rk <= 2;
"""
contains("VPartitionTopN")
}

qt_subset_safe_2window """
select id, g1, g2, ord_key, rk, rn
from (
select
id, g1, g2, ord_key,
rank() over (partition by g1 order by ord_key) as rk,
row_number() over (partition by g1, g2 order by ord_key) as rn
from multi_window_cases
) q
where rk <= 2
order by id;
"""

// 3 windows: the chosen rank(partition by g1) is a subset of both the
// (g1, g2) windows, so pruning by g1 cannot corrupt them -> it still fires.
explain {
sql """
select id, g1, g2, ord_key, rk, rn, rk2
from (
select
id, g1, g2, ord_key,
rank() over (partition by g1 order by ord_key) as rk,
row_number() over (partition by g1, g2 order by ord_key) as rn,
rank() over (partition by g1, g2 order by ord_key) as rk2
from multi_window_cases
) q
where rk <= 1;
"""
contains("VPartitionTopN")
}

qt_subset_safe_3window """
select id, g1, g2, ord_key, rk, rn, rk2
from (
select
id, g1, g2, ord_key,
rank() over (partition by g1 order by ord_key) as rk,
row_number() over (partition by g1, g2 order by ord_key) as rn,
rank() over (partition by g1, g2 order by ord_key) as rk2
from multi_window_cases
) q
where rk <= 1
order by id;
"""

// 3 windows but one is partitioned by g2, which is NOT a superset of the
// chosen g1. Pruning by g1 would corrupt the g2 window, so the optimization
// must be disabled.
explain {
sql """
select id, g1, g2, ord_key, rk, rn, rk2
from (
select
id, g1, g2, ord_key,
rank() over (partition by g1 order by ord_key) as rk,
row_number() over (partition by g1, g2 order by ord_key) as rn,
rank() over (partition by g2 order by ord_key) as rk2
from multi_window_cases
) q
where rk <= 1;
"""
notContains("VPartitionTopN")
}
}
Loading
Loading