diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index 60773af2b27f1..c15ff255cfd29 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -45,7 +45,7 @@ Note: This version requires that your CDC data encodes updates using a full imag ```sql SELECT * FROM FROM_CHANGELOG( - input => TABLE source_table, + input => TABLE source_table [PARTITION BY key_col], [op => DESCRIPTOR(op_column_name),] [op_mapping => MAP[ 'c, r', 'INSERT', @@ -61,7 +61,7 @@ SELECT * FROM FROM_CHANGELOG( | Parameter | Required | Description | |:-------------|:---------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `input` | Yes | The input table. Must be append-only. | +| `input` | Yes | The input table. Must be append-only. Use `PARTITION BY` to ensure rows for the same key are processed together. This is required when downstream operators are keyed on that column. | | `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. | | `op_mapping` | No | A `MAP` mapping user-defined codes to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, `'u'`, `'d'`), values are Flink change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). 
Each change operation may appear at most once across all entries. | | `error_handling` | No | Controls behavior when an input row's operation code is `NULL` or not present in the `op_mapping`. Valid values: `FAIL` (default) — throw a `TableRuntimeException`, `SKIP` — silently drop the row. | @@ -127,6 +127,14 @@ SELECT * FROM FROM_CHANGELOG( -- The operation column named 'operation' is used instead of 'op' ``` +#### Partitioning by a key + +```sql +SELECT * FROM FROM_CHANGELOG( + input => TABLE cdc_stream PARTITION BY id +) +``` + #### Invalid operation code handling Two `error_handling` modes are supported. The job can either fail upon an invalid or unknown op code, or skip the row and continue processing. @@ -177,7 +185,7 @@ This is useful when you need to materialize changelog events into a downstream s ```sql SELECT * FROM TO_CHANGELOG( - input => TABLE source_table, + input => TABLE source_table [PARTITION BY key_col], [op => DESCRIPTOR(op_column_name),] [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]] ) @@ -185,10 +193,10 @@ SELECT * FROM TO_CHANGELOG( ### Parameters -| Parameter | Required | Description | -|:-------------|:---------|:------------| -| `input` | Yes | The input table. Accepts insert-only, retract, and upsert tables. | -| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. | +| Parameter | Required | Description | +|:-------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located and run in the same operator instance. Without `PARTITION BY`, each row is processed independently. 
Accepts insert-only, retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key should match or be a subset of the upsert key of the subquery. | +| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. | | `op_mapping` | No | A `MAP` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. | #### Default op_mapping diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java index eb61371329c2f..aa2417d4531ce 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java @@ -1467,6 +1467,17 @@ default TableResult executeInsert( * TableRuntimeException} when an input row's op code is {@code NULL} or not present in the * mapping; pass {@code error_handling => 'SKIP'} to silently drop those rows instead. * + *

By default, the input is processed with row semantics (each row independently). To + * co-locate rows with the same key in the same parallel operator instance, partition the input + * first via {@link #partitionBy(Expression...)} and invoke the function via {@link + * PartitionedTable#process(String, Object...)}: + * + *

{@code
+     * Table result = cdcStream
+     *     .partitionBy($("id"))
+     *     .process("FROM_CHANGELOG");
+     * }
+ * *

Optional arguments can be passed using named expressions: * *

{@code
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index bbe38ca13d5a5..8ecc43aa9827b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -36,6 +36,7 @@
 import org.apache.flink.table.types.inference.InputTypeStrategies;
 import org.apache.flink.table.types.inference.StaticArgument;
 import org.apache.flink.table.types.inference.StaticArgumentTrait;
+import org.apache.flink.table.types.inference.TraitCondition;
 import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.table.types.inference.strategies.ArrayOfStringArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies;
@@ -785,22 +786,22 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                     .name("TO_CHANGELOG")
                     .kind(PROCESS_TABLE)
                     .staticArguments(
-                            // Row semantics (no PARTITION BY). Accepts updating
-                            // inputs. The planner inserts ChangelogNormalize for
-                            // upsert sources to produce UPDATE_BEFORE and full
-                            // DELETE rows.
+                            // Row semantics (no PARTITION BY).
+                            // With PARTITION BY, switches to set
+                            // semantics for co-located parallel execution.
                             StaticArgument.table(
-                                    "input",
-                                    Row.class,
-                                    false,
-                                    EnumSet.of(
-                                            StaticArgumentTrait.TABLE,
-                                            StaticArgumentTrait.ROW_SEMANTIC_TABLE,
-                                            StaticArgumentTrait.SUPPORT_UPDATES,
-                                            StaticArgumentTrait.REQUIRE_UPDATE_BEFORE,
-                                            // Not strictly necessary but explicitly state that
-                                            // we require full deletes.
-                                            StaticArgumentTrait.REQUIRE_FULL_DELETE)),
+                                            "input",
+                                            Row.class,
+                                            false,
+                                            EnumSet.of(
+                                                    StaticArgumentTrait.TABLE,
+                                                    StaticArgumentTrait.ROW_SEMANTIC_TABLE,
+                                                    StaticArgumentTrait.SUPPORT_UPDATES,
+                                                    StaticArgumentTrait.REQUIRE_UPDATE_BEFORE,
+                                                    StaticArgumentTrait.REQUIRE_FULL_DELETE))
+                                    .withConditionalTrait(
+                                            StaticArgumentTrait.SET_SEMANTIC_TABLE,
+                                            TraitCondition.hasPartitionBy()),
                             StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
                             StaticArgument.scalar(
                                     "op_mapping",
@@ -817,13 +818,19 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                     .name("FROM_CHANGELOG")
                     .kind(PROCESS_TABLE)
                     .staticArguments(
+                            // Row semantics (no PARTITION BY).
+                            // With PARTITION BY, switches to set
+                            // semantics for co-located parallel execution.
                             StaticArgument.table(
-                                    "input",
-                                    Row.class,
-                                    false,
-                                    EnumSet.of(
-                                            StaticArgumentTrait.TABLE,
-                                            StaticArgumentTrait.ROW_SEMANTIC_TABLE)),
+                                            "input",
+                                            Row.class,
+                                            false,
+                                            EnumSet.of(
+                                                    StaticArgumentTrait.TABLE,
+                                                    StaticArgumentTrait.ROW_SEMANTIC_TABLE))
+                                    .withConditionalTrait(
+                                            StaticArgumentTrait.SET_SEMANTIC_TABLE,
+                                            TraitCondition.hasPartitionBy()),
                             StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
                             StaticArgument.scalar(
                                     "op_mapping",
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java
new file mode 100644
index 0000000000000..2a06191ad13ab
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+/**
+ * Internal value-comparable wrapper used by all built-in {@link TraitCondition} factories. Equality
+ * is keyed by {@code kind + args}; the {@code impl} predicate is reused but never compared, so two
+ * conditions built from the same factory inputs are equal.
+ *
+ * 

Lives outside {@link TraitCondition} because Java forbids {@code private} nested types in + * interfaces (they are implicitly {@code public static}); top-level package-private gives the same + * encapsulation. + */ +final class BuiltInCondition implements TraitCondition { + + /** Tag identifying which factory produced the condition. */ + enum Kind { + HAS_PARTITION_BY, + ARG_IS_EQUAL_TO, + NOT + } + + private final Kind kind; + private final List args; + private final Predicate impl; + + BuiltInCondition(final Kind kind, final List args, final Predicate impl) { + this.kind = kind; + this.args = args; + this.impl = impl; + } + + @Override + public boolean test(final TraitContext ctx) { + return impl.test(ctx); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof BuiltInCondition)) { + return false; + } + final BuiltInCondition that = (BuiltInCondition) o; + return kind == that.kind && args.equals(that.args); + } + + @Override + public int hashCode() { + return Objects.hash(kind, args); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java index 3f5c48db8d221..546f30a7f7ea8 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java @@ -31,10 +31,13 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.EnumSet; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Describes an argument in a static signature that is not overloaded and does not support varargs. 
@@ -57,6 +60,7 @@ public class StaticArgument { private final @Nullable Class conversionClass; private final boolean isOptional; private final EnumSet traits; + private final List conditionalTraits; private StaticArgument( String name, @@ -64,11 +68,22 @@ private StaticArgument( @Nullable Class conversionClass, boolean isOptional, EnumSet traits) { + this(name, dataType, conversionClass, isOptional, traits, List.of()); + } + + private StaticArgument( + String name, + @Nullable DataType dataType, + @Nullable Class conversionClass, + boolean isOptional, + EnumSet traits, + List conditionalTraits) { this.name = Preconditions.checkNotNull(name, "Name must not be null."); this.dataType = dataType; this.conversionClass = conversionClass; this.isOptional = isOptional; this.traits = Preconditions.checkNotNull(traits, "Traits must not be null."); + this.conditionalTraits = conditionalTraits; checkName(); checkTraits(traits); checkOptionalType(); @@ -196,6 +211,84 @@ public boolean is(StaticArgumentTrait trait) { return traits.contains(trait); } + /** + * Context-aware trait check. Evaluates conditional trait rules against the given context to + * determine the effective traits. + */ + public boolean is(StaticArgumentTrait trait, TraitContext ctx) { + return resolveTraits(ctx).contains(trait); + } + + /** + * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is + * added to the effective trait set when the condition evaluates to {@code true} at planning + * time. Only non-root traits (subtraits of TABLE, SCALAR, or MODEL) are allowed. + * + *

Multiple conditions for the same trait use OR semantics: the trait is activated if any of + * its conditions is met. + * + *

Example: + * + *

{@code
+     * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+     *         .withConditionalTrait(SET_SEMANTIC_TABLE, hasPartitionBy());
+     * }
+ */ + public StaticArgument withConditionalTrait( + final StaticArgumentTrait trait, final TraitCondition condition) { + if (trait.isRoot()) { + throw new IllegalArgumentException( + "Root traits (SCALAR, TABLE, MODEL) cannot be conditional."); + } + final List accumulated = new ArrayList<>(this.conditionalTraits); + accumulated.add(new ConditionalTrait(condition, trait)); + return new StaticArgument(name, dataType, conversionClass, isOptional, traits, accumulated); + } + + /** Whether this argument has conditional trait rules. */ + public boolean hasConditionalTraits() { + return !conditionalTraits.isEmpty(); + } + + /** Whether any conditional trait rule may add the given trait. */ + public boolean hasConditionalTrait(final StaticArgumentTrait trait) { + return conditionalTraits.stream().anyMatch(c -> c.trait == trait); + } + + /** + * Returns a new {@link StaticArgument} with conditional traits resolved against the given + * context. The returned argument has the effective traits baked in and no conditional rules. + */ + public StaticArgument applyConditionalTraits(final TraitContext ctx) { + if (conditionalTraits.isEmpty()) { + return this; + } + return new StaticArgument(name, dataType, conversionClass, isOptional, resolveTraits(ctx)); + } + + /** + * Resolves effective traits by evaluating conditional rules against the context. Returns the + * base traits combined with any conditional traits whose conditions are met. 
+ */ + public EnumSet resolveTraits(final TraitContext ctx) { + if (conditionalTraits.isEmpty()) { + return traits; + } + final EnumSet resolved = EnumSet.copyOf(traits); + for (final ConditionalTrait conditionalTrait : conditionalTraits) { + if (conditionalTrait.condition.test(ctx)) { + removeMutuallyExclusiveTraits(resolved, conditionalTrait.trait); + resolved.add(conditionalTrait.trait); + } + } + return resolved; + } + + private static void removeMutuallyExclusiveTraits( + final EnumSet traits, final StaticArgumentTrait adding) { + traits.removeAll(adding.getIncompatibleWith()); + } + @Override public String toString() { final StringBuilder s = new StringBuilder(); @@ -210,11 +303,13 @@ public String toString() { s.append(dataType); } if (!traits.equals(EnumSet.of(StaticArgumentTrait.SCALAR))) { + final Stream baseTraitNames = + traits.stream().map(Enum::name).map(n -> n.replace('_', ' ')); + final Stream conditionalTraitNames = + conditionalTraits.stream().map(c -> c.trait.name().replace('_', ' ')); s.append(" "); s.append( - traits.stream() - .map(Enum::name) - .map(n -> n.replace('_', ' ')) + Stream.concat(baseTraitNames, conditionalTraitNames) .collect(Collectors.joining(", ", "{", "}"))); } return s.toString(); @@ -233,12 +328,13 @@ public boolean equals(Object o) { && Objects.equals(name, that.name) && Objects.equals(dataType, that.dataType) && Objects.equals(conversionClass, that.conversionClass) - && Objects.equals(traits, that.traits); + && Objects.equals(traits, that.traits) + && Objects.equals(conditionalTraits, that.conditionalTraits); } @Override public int hashCode() { - return Objects.hash(name, dataType, conversionClass, isOptional, traits); + return Objects.hash(name, dataType, conversionClass, isOptional, traits, conditionalTraits); } private void checkName() { @@ -354,4 +450,32 @@ private void checkModelNotOptional() { throw new ValidationException("Model arguments must not be optional."); } } + + /** A trait that is conditionally added based 
on a {@link TraitCondition}. */ + private static final class ConditionalTrait { + private final TraitCondition condition; + private final StaticArgumentTrait trait; + + ConditionalTrait(final TraitCondition condition, final StaticArgumentTrait trait) { + this.condition = condition; + this.trait = trait; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ConditionalTrait that = (ConditionalTrait) o; + return Objects.equals(condition, that.condition) && trait == that.trait; + } + + @Override + public int hashCode() { + return Objects.hash(condition, trait); + } + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java index 647248ada2df4..7b0083ed2dcac 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java @@ -21,6 +21,8 @@ import org.apache.flink.annotation.PublicEvolving; import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; import java.util.Set; import java.util.stream.Collectors; @@ -51,6 +53,8 @@ public enum StaticArgumentTrait { REQUIRE_UPDATE_BEFORE(SUPPORT_UPDATES), REQUIRE_FULL_DELETE(SUPPORT_UPDATES); + private static final Set ROOTS = EnumSet.of(SCALAR, TABLE, MODEL); + private final Set requirements; StaticArgumentTrait(StaticArgumentTrait... requirements) { @@ -60,4 +64,24 @@ public enum StaticArgumentTrait { public Set getRequirements() { return requirements; } + + /** Whether this trait is one of the top-level roots (SCALAR, TABLE, MODEL). 
*/ + public boolean isRoot() { + return ROOTS.contains(this); + } + + /** + * Returns the traits that are mutually exclusive with this one. Adding this trait to a set + * implies removing all returned traits. Empty by default. + */ + public Set getIncompatibleWith() { + switch (this) { + case SET_SEMANTIC_TABLE: + return Collections.singleton(ROW_SEMANTIC_TABLE); + case ROW_SEMANTIC_TABLE: + return Collections.singleton(SET_SEMANTIC_TABLE); + default: + return Collections.emptySet(); + } + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java index 92fd79f407b67..c4c95a09127b2 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java @@ -182,6 +182,29 @@ private static void checkReservedArgs(List staticArgs) { } } + /** + * Resolves conditional traits (see {@link StaticArgument#withConditionalTrait}) on every static + * arg using the call's semantics and operands. Called once at the top of {@link + * SystemInputStrategy#inferInputTypes} and {@link SystemOutputStrategy#inferType}; downstream + * helpers receive the resolved list and iterate it directly. 
+ */ + private static List resolveStaticArgs( + final CallContext callContext, final List staticArgs) { + return IntStream.range(0, staticArgs.size()) + .mapToObj( + pos -> { + final StaticArgument arg = staticArgs.get(pos); + if (!arg.hasConditionalTraits()) { + return arg; + } + final TableSemantics semantics = + callContext.getTableSemantics(pos).orElse(null); + return arg.applyConditionalTraits( + TraitContext.of(semantics, callContext, staticArgs)); + }) + .collect(Collectors.toList()); + } + private static void checkMultipleTableArgs(List staticArgs) { final List tableArgs = staticArgs.stream() @@ -262,6 +285,10 @@ public Optional inferType(CallContext callContext) { return origin.inferType(callContext) .map( functionDataType -> { + // Resolve once so all helpers see the same effective signature + // (PARTITION BY / scalar literals applied to conditional traits). + final List resolvedArgs = + resolveStaticArgs(callContext, staticArgs); final List fields = new ArrayList<>(); // According to the SQL standard, pass-through columns should @@ -273,11 +300,11 @@ public Optional inferType(CallContext callContext) { // - Flink SESSION windows add pass-through columns at the beginning // - Oracle adds pass-through columns for all ROW semantics args, so // this whole topic is kind of vendor specific already - fields.addAll(derivePassThroughFields(callContext)); + fields.addAll(derivePassThroughFields(callContext, resolvedArgs)); fields.addAll(deriveFunctionOutputFields(functionDataType)); if (!disableSystemArgs) { - fields.addAll(deriveRowtimeField(callContext)); + fields.addAll(deriveRowtimeField(callContext, resolvedArgs)); } final List uniqueFields = makeFieldNamesUnique(fields); @@ -303,7 +330,8 @@ private List makeFieldNamesUnique(List fields) { .collect(Collectors.toList()); } - private List derivePassThroughFields(CallContext callContext) { + private List derivePassThroughFields( + CallContext callContext, List staticArgs) { if (functionKind != 
FunctionKind.PROCESS_TABLE) { return List.of(); } @@ -349,7 +377,8 @@ private List deriveFunctionOutputFields(DataType functionDataType) { .collect(Collectors.toList()); } - private List deriveRowtimeField(CallContext callContext) { + private List deriveRowtimeField( + CallContext callContext, List staticArgs) { if (this.functionKind != FunctionKind.PROCESS_TABLE) { return List.of(); } @@ -566,8 +595,10 @@ public Optional> inferInputTypes( + "that is not overloaded and doesn't contain varargs."); } + // Resolve once so the rest of validation iterates the effective signature. + final List resolvedArgs = resolveStaticArgs(callContext, staticArgs); try { - checkTableArgs(staticArgs, callContext); + checkTableArgs(resolvedArgs, callContext); if (!disableSystemArgs) { checkUidArg(callContext); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java new file mode 100644 index 0000000000000..93bdc5c8ce479 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.List; + +/** + * A condition that determines whether a conditional trait on a {@link StaticArgument} should be + * active for a given call. + * + *

Conditions are evaluated at planning time using the {@link TraitContext} which provides access + * to the SQL call's properties (PARTITION BY presence, scalar literal values, etc.). + * + *

Implementations must implement {@code hashCode} and {@code equals} for {@link + * StaticArgument#equals}/{@link StaticArgument#hashCode} to work correctly. The built-in factories + * below return value-comparable instances; user-supplied lambdas do not - prefer the factories. + * + *

{@code
+ * import static org.apache.flink.table.types.inference.TraitCondition.*;
+ *
+ * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
+ *         .withConditionalTrait(SET_SEMANTIC_TABLE, hasPartitionBy());
+ * }
+ */ +@PublicEvolving +@FunctionalInterface +public interface TraitCondition { + + /** Evaluates this condition against the given context. */ + boolean test(TraitContext ctx); + + /** True when PARTITION BY is provided on the table argument. */ + static TraitCondition hasPartitionBy() { + return new BuiltInCondition( + BuiltInCondition.Kind.HAS_PARTITION_BY, List.of(), TraitContext::hasPartitionBy); + } + + /** True when the named scalar argument equals the expected value. */ + @SuppressWarnings("unchecked") + static TraitCondition argIsEqualTo(final String name, final T expected) { + final Class clazz = (Class) expected.getClass(); + return new BuiltInCondition( + BuiltInCondition.Kind.ARG_IS_EQUAL_TO, + List.of(name, expected), + ctx -> ctx.getScalarArgument(name, clazz).map(expected::equals).orElse(false)); + } + + /** Negates the given condition. */ + static TraitCondition not(final TraitCondition condition) { + return new BuiltInCondition( + BuiltInCondition.Kind.NOT, List.of(condition), ctx -> !condition.test(ctx)); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java new file mode 100644 index 0000000000000..e58a4b36dceea --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.functions.TableSemantics; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; + +/** + * Read-only context provided to {@link TraitCondition} during trait resolution at planning time. + * + *

Allows conditions to inspect the SQL call (e.g., whether PARTITION BY was provided, or what + * value a scalar argument has) to decide whether a conditional trait should be active. + */ +@PublicEvolving +public interface TraitContext { + + /** Whether PARTITION BY was provided on this table argument. */ + boolean hasPartitionBy(); + + /** + * Reads a scalar argument value by name. + * + * @return the argument value, or empty if the argument was not provided, is not a literal, or + * cannot be converted to the requested type + */ + Optional getScalarArgument(String name, Class clazz); + + /** + * Builds a {@link TraitContext} from validation-time inputs. + * + *

Used by {@code SystemTypeInference} when wrapping a function's strategies. Planner-side + * code that has a {@code RexCall} should use the planner adapter in {@code BridgingSqlFunction} + * instead. + */ + static TraitContext of( + @Nullable final TableSemantics semantics, + final CallContext callContext, + final List staticArgs) { + return new TraitContext() { + @Override + public boolean hasPartitionBy() { + return semantics != null && semantics.partitionByColumns().length > 0; + } + + @Override + public Optional getScalarArgument(final String name, final Class clazz) { + for (int i = 0; i < staticArgs.size(); i++) { + final StaticArgument arg = staticArgs.get(i); + if (arg.is(StaticArgumentTrait.SCALAR) && arg.getName().equals(name)) { + if (!callContext.isArgumentLiteral(i)) { + return Optional.empty(); + } + return callContext.getArgumentValue(i, clazz); + } + } + return Optional.empty(); + } + }; + } +} diff --git a/flink-table/flink-table-planner/AGENTS.md b/flink-table/flink-table-planner/AGENTS.md index 969c5b366569d..8b45353ecbf6f 100644 --- a/flink-table/flink-table-planner/AGENTS.md +++ b/flink-table/flink-table-planner/AGENTS.md @@ -106,6 +106,22 @@ When bumping an ExecNode version, update the `@ExecNodeMetadata` annotation's `v New features often introduce `ExecutionConfigOptions` entries (in `flink-table-api-java`) for runtime tunability (e.g., cache sizes, timeouts, batch sizes). +### PTF conditional traits + +A *conditional trait* lets a PTF's table-argument traits depend on the call site instead of being fixed at declaration. Example for `TO_CHANGELOG`: the `input` argument is row-semantic by default (single stream, no PARTITION BY), but switches to set-semantic when the user writes `PARTITION BY` so the runtime can co-locate state per key. One declaration, two effective signatures depending on the call. 
+ +**Declaration.** Built-in functions add conditional rules in `BuiltInFunctionDefinitions` via `StaticArgument.withConditionalTrait(trait, condition)`. The condition (a `TraitCondition`) is a small value-comparable predicate evaluated against a `TraitContext`. Built-in factories live on `TraitCondition` (`hasPartitionBy()`, `argIsEqualTo(name, value)`, `not(c)`); under the hood they wrap into the package-private `BuiltInCondition` so equality cascades correctly through `StaticArgument.equals`. + +**Evaluation.** A `TraitCondition` reads two things: whether `PARTITION BY` is present on this table arg, and the literal value of named scalar args. Both come through `TraitContext`. There are two factories: `TraitContext.of(TableSemantics, CallContext, declared)` for the validation side (called from `SystemTypeInference.resolveStaticArgs`) and a planner-side adapter inside `BridgingSqlFunction.buildTraitContext` that sources the same data from a `RexCall` + `RexTableArgCall`. Same logical context, different inputs because the two layers don't share types. + +**Resolution.** Three call sites bake conditional traits into the operator's effective signature: + +1. **Validation** — `SystemTypeInference.resolveStaticArgs` runs once each from `inferInputTypes` and `inferType`. Twice per validation pass; can't dedupe across Calcite hooks because each gets a different `CallContext` instance. +2. **Planning** — `BridgingSqlFunction.resolveCallTraits` is called from `FlinkLogicalTableFunctionScan.Converter.convert`. It rewrites the operator on the `RexCall` so all downstream readers see the resolved view via plain `function.getTypeInference().getStaticArguments()`. +3. **Compiled-plan restore** — `BridgingSqlFunction.resolveCallTraits` is called again from `StreamExecProcessTableFunction.@JsonCreator`, because the JSON path skips the logical converter. Without this hook, restore would silently produce wrong results for any conditional-trait PTF. 
+ +The payoff: downstream rules, exec nodes, codegen, and changelog inference all use ordinary `staticArg.is(SET_SEMANTIC_TABLE)` checks. No consumer needs to know that conditional traits exist. Why three sites and not one? The three resolution points exist because they sit in different lifecycles that can't share state. + ## Testing Patterns Choose test types based on what you're changing: diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java index 1cad3c7ec86dc..059fcd6184d69 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.catalog.ContextResolvedFunction; import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.functions.BuiltInFunctionDefinition; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.FunctionIdentifier; @@ -30,17 +31,25 @@ import org.apache.flink.table.planner.calcite.FlinkRelBuilder; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.calcite.RexFactory; +import org.apache.flink.table.planner.calcite.RexTableArgCall; +import org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext; import org.apache.flink.table.planner.utils.ShortcutUtils; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.StaticArgument; import 
org.apache.flink.table.types.inference.StaticArgumentTrait; import org.apache.flink.table.types.inference.SystemTypeInference; +import org.apache.flink.table.types.inference.TraitContext; import org.apache.flink.table.types.inference.TypeInference; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rel.type.StructKind; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexCallBinding; +import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; @@ -50,8 +59,11 @@ import org.apache.calcite.tools.RelBuilder; import org.checkerframework.checker.nullness.qual.Nullable; +import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.flink.table.planner.functions.bridging.BridgingUtils.createName; import static org.apache.flink.table.planner.functions.bridging.BridgingUtils.createSqlFunctionCategory; @@ -216,6 +228,152 @@ public boolean isDeterministic() { return resolvedFunction.getDefinition().isDeterministic(); } + // -------------------------------------------------------------------------------------------- + // Conditional trait resolution + // -------------------------------------------------------------------------------------------- + + /** + * Rewrites {@code call} so that the operator's {@link StaticArgument}s have any conditional + * traits (see {@link StaticArgument#withConditionalTrait}) applied against the call site + * (PARTITION BY, scalar literals). Downstream consumers can then treat the operator's static + * arguments as the effective signature and use plain {@code arg.is(SET_SEMANTIC_TABLE)} checks. + * + *

Called from the two places where a planner-level {@link RexCall} for a PTF is first built + * for downstream consumption: {@code FlinkLogicalTableFunctionScan} converter (fresh planning) + * and {@code StreamExecProcessTableFunction.@JsonCreator} (compiled-plan restore). A no-op for + * non-PTF calls and for PTFs that declare no conditional traits. + */ + public static RexCall resolveCallTraits(RexCall call) { + if (!(call.getOperator() instanceof BridgingSqlFunction)) { + return call; + } + final BridgingSqlFunction function = (BridgingSqlFunction) call.getOperator(); + final List declared = + function.typeInference.getStaticArguments().orElse(null); + if (declared == null || declared.stream().noneMatch(StaticArgument::hasConditionalTraits)) { + return call; + } + final List operands = call.getOperands(); + final CallContext callContext = function.toCallContext(call); + final List resolved = + IntStream.range(0, declared.size()) + .mapToObj( + i -> + resolveArg( + declared.get(i), + declared, + operands, + i, + callContext)) + .collect(Collectors.toList()); + if (resolved.equals(declared)) { + return call; + } + final BridgingSqlFunction rewritten = function.withStaticArguments(resolved); + // Use a fresh RexBuilder from the function's own type factory so this can run from a + // Jackson @JsonCreator that has no planner context. 
+ return (RexCall) + new RexBuilder(function.typeFactory).makeCall(call.getType(), rewritten, operands); + } + + private static StaticArgument resolveArg( + StaticArgument declaredArg, + List declared, + List operands, + int index, + CallContext callContext) { + // We only resolve conditional traits for the Table Argument with conditional traits + if (!declaredArg.hasConditionalTraits() + || !(operands.get(index) instanceof RexTableArgCall)) { + return declaredArg; + } + return declaredArg.applyConditionalTraits( + buildTraitContext((RexTableArgCall) operands.get(index), declared, callContext)); + } + + /** + * Planner-side adapter to {@link TraitContext}. Sourced from a {@link RexCall}: PARTITION BY + * via the {@link RexTableArgCall} operand, scalar literals via the {@link CallContext} wrapper. + * Equivalent to {@link TraitContext#of} but takes its inputs from a planner-side call instead + * of validation-side {@link org.apache.flink.table.functions.TableSemantics}. + */ + private static TraitContext buildTraitContext( + RexTableArgCall tableArgCall, List declared, CallContext callContext) { + return new TraitContext() { + @Override + public boolean hasPartitionBy() { + return tableArgCall.getPartitionKeys().length > 0; + } + + @Override + public Optional getScalarArgument(String name, Class clazz) { + for (int i = 0; i < declared.size(); i++) { + final StaticArgument arg = declared.get(i); + if (arg.is(StaticArgumentTrait.SCALAR) && arg.getName().equals(name)) { + return callContext.getArgumentValue(i, clazz); + } + } + return Optional.empty(); + } + }; + } + + /** + * Builds a {@link CallContext} from the given {@link RexCall} for this function. Wraps the call + * in an {@link OperatorBindingCallContext} so consumers (trait resolution, codegen, etc.) read + * scalar arguments through the same coercion path as validation. 
+ */ + public CallContext toCallContext(RexCall call) { + return toCallContext(call, null, null, null); + } + + /** + * Variant of {@link #toCallContext(RexCall)} that additionally exposes the call's input time + * columns and changelog modes - needed by the streaming codegen path so PTFs can specialize + * themselves to the exact call. + */ + public CallContext toCallContext( + RexCall call, + @Nullable List inputTimeColumns, + @Nullable List inputChangelogModes, + @Nullable ChangelogMode outputChangelogMode) { + return new OperatorBindingCallContext( + dataTypeFactory, + getDefinition(), + RexCallBinding.create(typeFactory, call, Collections.emptyList()), + call.getType(), + inputTimeColumns, + inputChangelogModes, + outputChangelogMode); + } + + /** + * Returns a copy of this function whose {@link TypeInference} reports the given static + * arguments. The wrapped input/output strategies are reused unchanged - they ran at validation + * time and aren't invoked again afterwards. + */ + private BridgingSqlFunction withStaticArguments(List staticArguments) { + final TypeInference rewritten = + TypeInference.newBuilder() + .staticArguments(staticArguments) + .inputTypeStrategy(typeInference.getInputTypeStrategy()) + .stateTypeStrategies(typeInference.getStateTypeStrategies()) + .outputTypeStrategy(typeInference.getOutputTypeStrategy()) + .disableSystemArguments(typeInference.disableSystemArguments()) + .build(); + if (this instanceof WithTableFunction) { + return new WithTableFunction( + dataTypeFactory, + typeFactory, + rexFactory, + getKind(), + resolvedFunction, + rewritten); + } + return new BridgingSqlFunction( + dataTypeFactory, typeFactory, rexFactory, getKind(), resolvedFunction, rewritten); + } + // -------------------------------------------------------------------------------------------- // Table function extension // -------------------------------------------------------------------------------------------- @@ -273,10 +431,12 @@ public 
SqlReturnTypeInference getRowTypeInference() { } final StaticArgument arg = args.get(ordinal); final TableCharacteristic.Semantics semantics; - if (arg.is(StaticArgumentTrait.ROW_SEMANTIC_TABLE)) { - semantics = TableCharacteristic.Semantics.ROW; - } else if (arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)) { + // Report SET semantics if it may apply - which allows the use of PARTITION BY + if (arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE) + || arg.hasConditionalTrait(StaticArgumentTrait.SET_SEMANTIC_TABLE)) { semantics = TableCharacteristic.Semantics.SET; + } else if (arg.is(StaticArgumentTrait.ROW_SEMANTIC_TABLE)) { + semantics = TableCharacteristic.Semantics.ROW; } else { return null; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java index 12a20833068c8..3973329af7484 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java @@ -38,6 +38,7 @@ import org.apache.flink.table.planner.codegen.ProcessTableRunnerGenerator; import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator; import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; @@ -157,7 +158,10 @@ public StreamExecProcessTableFunction( @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) ChangelogMode outputChangelogMode) { super(id, context, 
persistedConfig, inputProperties, outputType, description); this.uid = uid; - this.invocation = (RexCall) invocation; + // Mirror the FlinkLogicalTableFunctionScan converter for the compiled-plan restore path: + // bake StaticArgument#withConditionalTrait rules into the operator's static args so + // downstream code can use plain arg.is(SET_SEMANTIC_TABLE) checks. + this.invocation = BridgingSqlFunction.resolveCallTraits((RexCall) invocation); this.inputChangelogModes = inputChangelogModes; this.outputChangelogMode = outputChangelogMode; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java index d478b3afcf072..17473ab19a7d7 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java @@ -22,6 +22,7 @@ import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.FunctionKind; import org.apache.flink.table.functions.TemporalTableFunction; +import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; import org.apache.flink.table.planner.plan.nodes.FlinkConventions; import org.apache.flink.table.planner.utils.ShortcutUtils; @@ -115,7 +116,14 @@ public boolean matches(RelOptRuleCall call) { functionScan.getInputs().stream() .map(input -> RelOptRule.convert(input, FlinkConventions.LOGICAL())) .collect(Collectors.toList()); - final RexCall rexCall = (RexCall) functionScan.getCall(); + + // Resolve any StaticArgument#withConditionalTrait rules on the operator against this + // call site (PARTITION BY, scalar literals). 
After this rewrite, downstream code sees a + // BridgingSqlFunction whose getStaticArguments() reports the effective signature, so + // simple staticArg.is(SET_SEMANTIC_TABLE) checks suffice. + final RexCall rexCall = + BridgingSqlFunction.resolveCallTraits((RexCall) functionScan.getCall()); + return new FlinkLogicalTableFunctionScan( functionScan.getCluster(), traitSet, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java index 754fe1b328492..56a2bf1f6fcab 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java @@ -22,13 +22,11 @@ import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.table.catalog.ContextResolvedFunction; import org.apache.flink.table.connector.ChangelogMode; -import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.FunctionIdentifier; import org.apache.flink.table.functions.ProcessTableFunction; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.calcite.RexTableArgCall; import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; -import org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction; @@ -36,7 +34,6 @@ import 
org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.planner.utils.ShortcutUtils; -import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.StaticArgument; import org.apache.flink.table.types.inference.StaticArgumentTrait; import org.apache.flink.table.types.inference.SystemTypeInference; @@ -56,7 +53,6 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexCallBinding; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexUtil; @@ -66,7 +62,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -512,22 +507,4 @@ public static Set toPartitionColumns(RexCall call) { } return ImmutableSet.copyOf(partitionColumnsPerArg); } - - public static CallContext toCallContext( - RexCall udfCall, - List inputTimeColumns, - List inputChangelogModes, - @Nullable ChangelogMode outputChangelogMode) { - final BridgingSqlFunction function = ShortcutUtils.unwrapBridgingSqlFunction(udfCall); - assert function != null; - final FunctionDefinition definition = ShortcutUtils.unwrapFunctionDefinition(udfCall); - return new OperatorBindingCallContext( - function.getDataTypeFactory(), - definition, - RexCallBinding.create(function.getTypeFactory(), udfCall, Collections.emptyList()), - udfCall.getType(), - inputTimeColumns, - inputChangelogModes, - outputChangelogMode); - } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala index 
4f1d8d65d419f..52df803d5c8f8 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala @@ -34,7 +34,6 @@ import org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil import org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil.{verifyFunctionAwareOutputType, DefaultExpressionEvaluatorFactory} import org.apache.flink.table.planner.delegation.PlannerBase import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction -import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalProcessTableFunction import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala import org.apache.flink.table.runtime.dataview.DataViewUtils import org.apache.flink.table.runtime.dataview.StateListView.KeyedStateListView @@ -77,11 +76,8 @@ object ProcessTableRunnerGenerator { // For specialized functions, this call context is able to provide the final changelog modes. // Thus, functions can reconfigure themselves for the exact use case. // Including updating their state layout. 
- val callContext = StreamPhysicalProcessTableFunction.toCallContext( - udfCall, - inputTimeColumns, - inputChangelogModes, - outputChangelogMode) + val callContext = + function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode) // Create the final UDF for runtime val udf = UserDefinedFunctionHelper.createSpecializedFunction( diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index b6767e68ac6e0..6839cc1c28c13 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -26,6 +26,7 @@ import org.apache.flink.table.connector.ChangelogMode import org.apache.flink.table.functions.{BuiltInFunctionDefinition, ChangelogFunction} import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexTableArgCall} +import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction import org.apache.flink.table.planner.plan.`trait`._ import org.apache.flink.table.planner.plan.`trait`.DeleteKindTrait.{deleteOnKeyOrNone, fullDeleteOrNone, DELETE_BY_KEY} import org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterOrNone, onlyAfterOrNone, BEFORE_AND_AFTER, ONLY_UPDATE_AFTER} @@ -852,10 +853,10 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti !modifyKindSet.isInsertOnly && tableArg.is( StaticArgumentTrait.SUPPORT_UPDATES) ) { - if (isPtfUpsert(tableArg, tableArgCall, child)) { - UpdateKindTrait.ONLY_UPDATE_AFTER - } else { + if 
(ptfRequiresUpdateBefore(tableArg, tableArgCall, child)) { UpdateKindTrait.BEFORE_AND_AFTER + } else { + UpdateKindTrait.ONLY_UPDATE_AFTER } } else { UpdateKindTrait.NONE @@ -1272,7 +1273,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti extractPtfTableArgComponents(process, child, inputArg) if ( tableArg.is(StaticArgumentTrait.SUPPORT_UPDATES) - && isPtfUpsert(tableArg, tableArgCall, child) + && !ptfRequiresUpdateBefore(tableArg, tableArgCall, child) && !tableArg.is(StaticArgumentTrait.REQUIRE_FULL_DELETE) ) { this @@ -1640,21 +1641,20 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti modeBuilder.build() } - private def isPtfUpsert( + /** + * Whether the PTF requires UPDATE_BEFORE from its input. Returns true unless partition keys cover + * the upsert keys (co-located) and the argument doesn't explicitly require UPDATE_BEFORE. + */ + private def ptfRequiresUpdateBefore( tableArg: StaticArgument, tableArgCall: RexTableArgCall, input: StreamPhysicalRel): Boolean = { val partitionKeys = ImmutableBitSet.of(tableArgCall.getPartitionKeys: _*) val fmq = FlinkRelMetadataQuery.reuseOrCreate(input.getCluster.getMetadataQuery) val upsertKeys = fmq.getUpsertKeys(input) - if ( - upsertKeys == null || partitionKeys.isEmpty || !upsertKeys.contains(partitionKeys) - || tableArg.is(StaticArgumentTrait.REQUIRE_UPDATE_BEFORE) - ) { - false - } else { - true - } + upsertKeys == null || partitionKeys.isEmpty || + !upsertKeys.contains(partitionKeys) || + tableArg.is(StaticArgumentTrait.REQUIRE_UPDATE_BEFORE) } private def extractPtfTableArgComponents( @@ -1674,11 +1674,9 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti outputChangelogMode: ChangelogMode): ChangelogContext = { val udfCall = StreamPhysicalProcessTableFunction.toUdfCall(process.getCall) val inputTimeColumns = StreamPhysicalProcessTableFunction.toInputTimeColumns(process.getCall) - val callContext = 
StreamPhysicalProcessTableFunction.toCallContext( - udfCall, - inputTimeColumns, - inputChangelogModes, - outputChangelogMode) + val function = udfCall.getOperator.asInstanceOf[BridgingSqlFunction] + val callContext = + function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode) // Expose a simplified context to let users focus on important characteristics. // If necessary, we can expose the full CallContext in the future. diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java index 4643a51be52e0..dc62330b257da 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java @@ -42,6 +42,7 @@ public List programs() { FromChangelogTestPrograms.DEFAULT_OP_MAPPING, FromChangelogTestPrograms.CUSTOM_OP_MAPPING, FromChangelogTestPrograms.CUSTOM_OP_NAME, + FromChangelogTestPrograms.SET_SEMANTICS_PARTITION_BY, FromChangelogTestPrograms.SKIP_INVALID_OP_HANDLING, FromChangelogTestPrograms.SKIP_NULL_OP_CODE, FromChangelogTestPrograms.TABLE_API_DEFAULT, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java index 1858aabeed019..2cd785a46b695 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java +++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java @@ -172,6 +172,44 @@ public class FromChangelogTestPrograms { + "op => DESCRIPTOR(operation))") .build(); + // -------------------------------------------------------------------------------------------- + // Set semantics with PARTITION BY + // -------------------------------------------------------------------------------------------- + + /** + * Verifies that {@code FROM_CHANGELOG(TABLE t PARTITION BY id)} produces the same logical + * output as the row-semantic call. The conditional {@code SET_SEMANTIC_TABLE} trait switches + * the execution to a co-located parallel mode but must not change row-level semantics. + */ + public static final TableTestProgram SET_SEMANTICS_PARTITION_BY = + TableTestProgram.of( + "from-changelog-set-semantics-partition-by", + "PARTITION BY enables set semantics without altering output rows") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema(SIMPLE_CDC_SCHEMA) + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob"), + Row.of(1, "UPDATE_BEFORE", "Alice"), + Row.of(1, "UPDATE_AFTER", "Alice2"), + Row.of(2, "DELETE", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("id INT", "name STRING") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream PARTITION BY id)") + .build(); + // -------------------------------------------------------------------------------------------- // Table API test // -------------------------------------------------------------------------------------------- diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java index ba2c1a5690cb6..392abda3cca4d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java @@ -75,4 +75,18 @@ void testCustomOpMapping() { + "error_handling => 'SKIP')", CHANGELOG_MODE); } + + @Test + void testSetSemanticsWithPartitionBy() { + util.tableEnv() + .executeSql( + "CREATE TABLE cdc_stream (" + + " id INT," + + " op STRING," + + " name STRING" + + ") WITH ('connector' = 'values')"); + util.verifyRelPlan( + "SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream PARTITION BY id)", + CHANGELOG_MODE); + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java index ee9576b2e758c..e98e28fc53af9 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java @@ -87,4 +87,21 @@ void testInsertOnlySource() { util.verifyRelPlan( "SELECT * FROM TO_CHANGELOG(input => TABLE insert_only_source)", CHANGELOG_MODE); } + + @Test + void testSetSemanticsWithPartitionBy() { + util.tableEnv() + .executeSql( + "CREATE TABLE retract_source (" + + " id INT," + + " name STRING," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'changelog-mode' = 'I,UB,UA,D'" + + ")"); + util.verifyRelPlan( + "SELECT * FROM TO_CHANGELOG(input => TABLE retract_source 
PARTITION BY id)", + CHANGELOG_MODE); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml index 614eb5456b9aa..a1ab91c2998fc 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml @@ -51,6 +51,26 @@ LogicalProject(id=[$0], name=[$1]) + + + + + TABLE cdc_stream PARTITION BY id)]]> + + + + + + diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml index 91c73153bee03..77133f4fe419c 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml @@ -51,6 +51,26 @@ LogicalProject(op=[$0], id=[$1], name=[$2]) + + + + + TABLE retract_source PARTITION BY id)]]> + + + + + +