Skip to content

Commit e5116e8

Browse files
IGNITE-12692 SQL Calcite: Distributed table modify
1 parent a31421f commit e5116e8

File tree

11 files changed

+649
-373
lines changed

11 files changed

+649
-373
lines changed

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@
6464
import org.apache.ignite.internal.processors.query.calcite.rule.SortAggregateConverterRule;
6565
import org.apache.ignite.internal.processors.query.calcite.rule.SortConverterRule;
6666
import org.apache.ignite.internal.processors.query.calcite.rule.TableFunctionScanConverterRule;
67-
import org.apache.ignite.internal.processors.query.calcite.rule.TableModifyConverterRule;
67+
import org.apache.ignite.internal.processors.query.calcite.rule.TableModifyDistributedConverterRule;
68+
import org.apache.ignite.internal.processors.query.calcite.rule.TableModifySingleNodeConverterRule;
6869
import org.apache.ignite.internal.processors.query.calcite.rule.UncollectConverterRule;
6970
import org.apache.ignite.internal.processors.query.calcite.rule.UnionConverterRule;
7071
import org.apache.ignite.internal.processors.query.calcite.rule.ValuesConverterRule;
@@ -292,7 +293,8 @@ public enum PlannerPhase {
292293
SetOpConverterRule.MAP_REDUCE_INTERSECT,
293294
ProjectConverterRule.INSTANCE,
294295
FilterConverterRule.INSTANCE,
295-
TableModifyConverterRule.INSTANCE,
296+
TableModifySingleNodeConverterRule.INSTANCE,
297+
TableModifyDistributedConverterRule.INSTANCE,
296298
UnionConverterRule.INSTANCE,
297299
SortConverterRule.INSTANCE,
298300
TableFunctionScanConverterRule.INSTANCE

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.ignite.internal.processors.query.calcite.rel;
1919

2020
import java.util.List;
21+
import com.google.common.collect.ImmutableList;
2122
import org.apache.calcite.plan.RelOptCluster;
2223
import org.apache.calcite.plan.RelOptTable;
2324
import org.apache.calcite.plan.RelTraitSet;
@@ -26,10 +27,16 @@
2627
import org.apache.calcite.rel.core.TableModify;
2728
import org.apache.calcite.rel.metadata.RelMetadataQuery;
2829
import org.apache.calcite.rex.RexNode;
30+
import org.apache.calcite.util.Pair;
31+
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
32+
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
2933
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
3034

3135
/** */
3236
public class IgniteTableModify extends TableModify implements IgniteRel {
37+
/** If table modify can affect data source. */
38+
private final boolean affectsSrc;
39+
3340
/**
3441
* Creates a {@code TableModify}.
3542
*
@@ -44,8 +51,9 @@ public class IgniteTableModify extends TableModify implements IgniteRel {
4451
* @param input Sub-query or filter condition.
4552
* @param operation Modify operation (INSERT, UPDATE, DELETE, MERGE).
4653
* @param updateColumnList List of column identifiers to be updated (e.g. ident1, ident2); null if not UPDATE.
47-
* @param sourceExpressionList List of value expressions to be set (e.g. exp1, exp2); null if not UPDATE.
54+
* @param srcExpressionList List of value expressions to be set (e.g. exp1, exp2); null if not UPDATE.
4855
* @param flattened Whether set flattens the input row type.
56+
* @param affectsSrc If table modify can affect data source.
4957
*/
5058
public IgniteTableModify(
5159
RelOptCluster cluster,
@@ -54,12 +62,15 @@ public IgniteTableModify(
5462
RelNode input,
5563
Operation operation,
5664
List<String> updateColumnList,
57-
List<RexNode> sourceExpressionList,
58-
boolean flattened
65+
List<RexNode> srcExpressionList,
66+
boolean flattened,
67+
boolean affectsSrc
5968
) {
6069
super(cluster, traitSet, table, Commons.context(cluster).catalogReader(),
6170
input, operation, updateColumnList,
62-
sourceExpressionList, flattened);
71+
srcExpressionList, flattened);
72+
73+
this.affectsSrc = affectsSrc;
6374
}
6475

6576
/**
@@ -76,7 +87,8 @@ public IgniteTableModify(RelInput input) {
7687
input.getEnum("operation", Operation.class),
7788
input.getStringList("updateColumnList"),
7889
input.getExpressionList("sourceExpressionList"),
79-
input.getBoolean("flattened", true)
90+
input.getBoolean("flattened", true),
91+
false // Field only for planning.
8092
);
8193
}
8294

@@ -90,7 +102,8 @@ public IgniteTableModify(RelInput input) {
90102
getOperation(),
91103
getUpdateColumnList(),
92104
getSourceExpressionList(),
93-
isFlattened());
105+
isFlattened(),
106+
affectsSrc);
94107
}
95108

96109
/** {@inheritDoc} */
@@ -101,11 +114,34 @@ public IgniteTableModify(RelInput input) {
101114
/** {@inheritDoc} */
102115
@Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
103116
return new IgniteTableModify(cluster, getTraitSet(), getTable(), sole(inputs),
104-
getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
117+
getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened(), affectsSrc);
105118
}
106119

107120
/** {@inheritDoc} */
108121
@Override public double estimateRowCount(RelMetadataQuery mq) {
109122
return 1.0D;
110123
}
124+
125+
/** {@inheritDoc} */
126+
@Override public Pair<RelTraitSet, List<RelTraitSet>> deriveTraits(RelTraitSet childTraits, int childId) {
127+
// Don't derive traits for single-node table modify.
128+
if (TraitUtils.distribution(traitSet) == IgniteDistributions.single())
129+
return null;
130+
131+
assert childId == 0;
132+
133+
if (childTraits.getConvention() != IgniteConvention.INSTANCE)
134+
return null;
135+
136+
// If modify can affect data source (for example, INSERT contains self table as source) only
137+
// modified table affinity distibution is possible, otherwise inconsistency is possible on remote nodes.
138+
if (affectsSrc)
139+
return null;
140+
141+
// Any distributed (random/hash) trait is accepted if data source is not affected by modify.
142+
if (!TraitUtils.distribution(childTraits).satisfies(IgniteDistributions.random()))
143+
return null;
144+
145+
return Pair.of(traitSet, ImmutableList.of(childTraits));
146+
}
111147
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractIgniteConverterRule.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.calcite.rel.convert.ConverterRule;
2525
import org.apache.calcite.rel.metadata.RelMetadataQuery;
2626
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
27+
import org.jetbrains.annotations.Nullable;
2728

2829
/** */
2930
public abstract class AbstractIgniteConverterRule<T extends RelNode> extends ConverterRule {
@@ -39,7 +40,7 @@ protected AbstractIgniteConverterRule(Class<T> clazz, String descriptionPrefix)
3940
}
4041

4142
/** {@inheritDoc} */
42-
@Override public final RelNode convert(RelNode rel) {
43+
@Override public final @Nullable RelNode convert(RelNode rel) {
4344
return convert(rel.getCluster().getPlanner(), rel.getCluster().getMetadataQuery(), (T)rel);
4445
}
4546

@@ -51,5 +52,5 @@ protected AbstractIgniteConverterRule(Class<T> clazz, String descriptionPrefix)
5152
* @param rel Rel node.
5253
* @return Physical rel.
5354
*/
54-
protected abstract PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, T rel);
55+
protected abstract @Nullable PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, T rel);
5556
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919

2020
import org.apache.calcite.plan.RelOptCluster;
2121
import org.apache.calcite.plan.RelOptPlanner;
22-
import org.apache.calcite.plan.RelOptRule;
2322
import org.apache.calcite.plan.RelTraitSet;
2423
import org.apache.calcite.rel.PhysicalNode;
2524
import org.apache.calcite.rel.RelNode;
25+
import org.apache.calcite.rel.convert.ConverterRule;
2626
import org.apache.calcite.rel.logical.LogicalAggregate;
2727
import org.apache.calcite.rel.metadata.RelMetadataQuery;
2828
import org.apache.ignite.internal.processors.query.calcite.hint.HintUtils;
@@ -31,16 +31,17 @@
3131
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
3232
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
3333
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
34+
import org.jetbrains.annotations.Nullable;
3435

3536
/**
3637
*
3738
*/
3839
public class HashAggregateConverterRule {
3940
/** */
40-
public static final RelOptRule COLOCATED = new ColocatedHashAggregateConverterRule();
41+
public static final ConverterRule COLOCATED = new ColocatedHashAggregateConverterRule();
4142

4243
/** */
43-
public static final RelOptRule MAP_REDUCE = new MapReduceHashAggregateConverterRule();
44+
public static final ConverterRule MAP_REDUCE = new MapReduceHashAggregateConverterRule();
4445

4546
/** */
4647
private HashAggregateConverterRule() {
@@ -55,8 +56,11 @@ private static class ColocatedHashAggregateConverterRule extends AbstractIgniteC
5556
}
5657

5758
/** {@inheritDoc} */
58-
@Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
59-
LogicalAggregate agg) {
59+
@Override protected @Nullable PhysicalNode convert(
60+
RelOptPlanner planner,
61+
RelMetadataQuery mq,
62+
LogicalAggregate agg
63+
) {
6064
if (HintUtils.isExpandDistinctAggregate(agg))
6165
return null;
6266

@@ -84,8 +88,11 @@ private static class MapReduceHashAggregateConverterRule extends AbstractIgniteC
8488
}
8589

8690
/** {@inheritDoc} */
87-
@Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
88-
LogicalAggregate agg) {
91+
@Override protected @Nullable PhysicalNode convert(
92+
RelOptPlanner planner,
93+
RelMetadataQuery mq,
94+
LogicalAggregate agg
95+
) {
8996
if (HintUtils.isExpandDistinctAggregate(agg))
9097
return null;
9198

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectConverterRule.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
import java.util.Set;
2121
import org.apache.calcite.plan.RelOptCluster;
2222
import org.apache.calcite.plan.RelOptPlanner;
23-
import org.apache.calcite.plan.RelOptRule;
2423
import org.apache.calcite.plan.RelTraitSet;
2524
import org.apache.calcite.rel.PhysicalNode;
2625
import org.apache.calcite.rel.RelNode;
26+
import org.apache.calcite.rel.convert.ConverterRule;
2727
import org.apache.calcite.rel.core.CorrelationId;
2828
import org.apache.calcite.rel.logical.LogicalProject;
2929
import org.apache.calcite.rel.metadata.RelMetadataQuery;
@@ -39,7 +39,7 @@
3939
*/
4040
public class ProjectConverterRule extends AbstractIgniteConverterRule<LogicalProject> {
4141
/** */
42-
public static final RelOptRule INSTANCE = new ProjectConverterRule();
42+
public static final ConverterRule INSTANCE = new ProjectConverterRule();
4343

4444
/** */
4545
public ProjectConverterRule() {

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyConverterRule.java

Lines changed: 0 additions & 156 deletions
This file was deleted.

0 commit comments

Comments
 (0)