diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java index d04b63031d8684..e398d6f905a51d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -159,6 +159,7 @@ private Plan bindOlapTableSink(MatchingContext> ctx) { Database database = pair.first; OlapTable table = pair.second; boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType() == KeysType.UNIQUE_KEYS; + boolean isDeletePartialUpdate = isPartialUpdate && sink.getDMLCommandType() == DMLCommandType.DELETE; TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = sink.getPartialUpdateNewRowPolicy(); LogicalPlan child = ((LogicalPlan) sink.child()); @@ -170,7 +171,7 @@ private Plan bindOlapTableSink(MatchingContext> ctx) { // 1. bind target columns: from sink's column names to target tables' Columns Pair, Integer> bindColumnsResult = bindTargetColumns(table, sink.getColNames(), childHasSeqCol, needExtraSeqCol, - sink.getDMLCommandType() == DMLCommandType.GROUP_COMMIT); + sink.getDMLCommandType() == DMLCommandType.GROUP_COMMIT, isDeletePartialUpdate); List bindColumns = bindColumnsResult.first; int extraColumnsNum = bindColumnsResult.second; @@ -255,7 +256,7 @@ private Plan bindOlapTableSink(MatchingContext> ctx) { } Map columnToOutput = getColumnToOutput( - ctx, table, isPartialUpdate, boundSink, child); + ctx, table, isPartialUpdate, isDeletePartialUpdate, boundSink, child); LogicalProject fullOutputProject = getOutputProjectByCoercion( table.getFullSchema(), child, columnToOutput); List columns = new ArrayList<>(table.getFullSchema().size()); @@ -347,7 +348,8 @@ private LogicalProject getOutputProjectByCoercion(List tableSchema, L private static Map getColumnToOutput( MatchingContext> ctx, - TableIf table, boolean isPartialUpdate, LogicalTableSink boundSink, LogicalPlan child) { + TableIf table, boolean isPartialUpdate, boolean isDeletePartialUpdate, + LogicalTableSink boundSink, LogicalPlan child) { // we need to insert all the columns of the target table // although some columns are not mentions. // so we add a projects to supply the default value. @@ -470,6 +472,18 @@ private static Map getColumnToOutput( // if processed in upper for loop, will lead to not found slot error // It's the same reason for moving the processing of materialized columns down. for (Column column : generatedColumns) { + if (isDeletePartialUpdate) { + NamedExpression childOutput = columnToChildOutput.get(column); + if (childOutput == null) { + continue; + } + Alias output = new Alias(TypeCoercionUtils.castIfNotSameType( + childOutput, DataType.fromCatalogType(column.getType())), column.getName()); + columnToOutput.put(column.getName(), output); + columnToReplaced.put(column.getName(), output.toSlot()); + replaceMap.put(output.toSlot(), output.child()); + continue; + } Map currentSessionVars = ctx.connectContext.getSessionVariable().getAffectQueryResultInPlanVariables(); try (AutoCloseSessionVariable autoClose = new AutoCloseSessionVariable(ctx.connectContext, @@ -594,7 +608,7 @@ private Plan bindHiveTableSink(MatchingContext> ctx) if (boundSink.getCols().size() != child.getOutput().size()) { throw new AnalysisException("insert into cols should be corresponding to the query output"); } - Map columnToOutput = getColumnToOutput(ctx, table, false, + Map columnToOutput = getColumnToOutput(ctx, table, false, false, boundSink, child); LogicalProject fullOutputProject = getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput); return boundSink.withChildAndUpdateOutput(fullOutputProject); @@ -635,7 +649,7 @@ private Plan bindIcebergTableSink(MatchingContext> if (boundSink.getCols().size() != child.getOutput().size()) { throw new AnalysisException("insert into cols should be corresponding to the query output"); } - Map columnToOutput = getColumnToOutput(ctx, table, false, + Map columnToOutput = getColumnToOutput(ctx, table, false, false, boundSink, child); LogicalProject fullOutputProject = getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput); return boundSink.withChildAndUpdateOutput(fullOutputProject); @@ -843,7 +857,7 @@ private List bindPartitionIds(OlapTable table, List partitions, bo // bindTargetColumns means bind sink node's target columns' names to target table's columns private Pair, Integer> bindTargetColumns(OlapTable table, List colsName, - boolean childHasSeqCol, boolean needExtraSeqCol, boolean isGroupCommit) { + boolean childHasSeqCol, boolean needExtraSeqCol, boolean isGroupCommit, boolean isDeletePartialUpdate) { // if the table set sequence column in stream load phase, the sequence map column is null, we query it. if (colsName.isEmpty()) { // ATTN: group commit without column list should return all base index column @@ -861,7 +875,7 @@ private Pair, Integer> bindTargetColumns(OlapTable table, List