From c1e883e170ffd4aed8f4927936bb29ece75bbd06 Mon Sep 17 00:00:00 2001 From: jkukreja Date: Wed, 4 Mar 2026 11:03:55 -0500 Subject: [PATCH 1/2] [flink] Implement FLIP-314 LineageVertexProvider for source and sink connectors --- .../flink/PaimonDataStreamScanProvider.java | 17 ++ .../flink/PaimonDataStreamSinkProvider.java | 14 ++ .../flink/sink/FlinkFormatTableSink.java | 6 +- .../paimon/flink/sink/FlinkTableSinkBase.java | 14 +- .../flink/source/BaseDataTableSource.java | 7 +- .../flink/source/SystemTableSource.java | 6 +- .../flink/source/DataTableSourceTest.java | 12 +- .../lineage/DataStreamProviderFactory.java | 40 +++ paimon-flink/paimon-flink2-common/pom.xml | 6 + .../lineage/DataStreamProviderFactory.java | 117 +++++++++ .../paimon/flink/lineage/LineageUtils.java | 98 ++++++++ .../flink/lineage/PaimonLineageDataset.java | 72 ++++++ .../lineage/PaimonSinkLineageVertex.java | 39 +++ .../lineage/PaimonSourceLineageVertex.java | 50 ++++ .../flink/lineage/LineageUtilsTest.java | 228 ++++++++++++++++++ 15 files changed, 709 insertions(+), 17 deletions(-) create mode 100644 paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/lineage/DataStreamProviderFactory.java create mode 100644 paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/DataStreamProviderFactory.java create mode 100644 paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java create mode 100644 paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java create mode 100644 paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java create mode 100644 paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java create mode 100644 paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java index af9df5b9338c..163bb8e6ec84 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java @@ -18,10 +18,14 @@ package org.apache.paimon.flink; +import org.apache.paimon.flink.lineage.DataStreamProviderFactory; +import org.apache.paimon.table.Table; + import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.connector.ProviderContext; import org.apache.flink.table.connector.source.DataStreamScanProvider; +import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; import org.apache.flink.table.data.RowData; import java.util.function.Function; @@ -38,6 +42,19 @@ public PaimonDataStreamScanProvider( this.producer = producer; } + /** + * Creates a {@link ScanRuntimeProvider} that may be enriched with lineage metadata when running + * on a Flink version that supports it. 
+ */ + public static ScanRuntimeProvider createProvider( + boolean isBounded, + Function> producer, + String name, + Table table) { + return DataStreamProviderFactory.getScanProvider( + new PaimonDataStreamScanProvider(isBounded, producer), name, table); + } + @Override public DataStream produceDataStream( ProviderContext context, StreamExecutionEnvironment env) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java index 05eaacf5ab14..7516e9b5b937 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java @@ -18,10 +18,14 @@ package org.apache.paimon.flink; +import org.apache.paimon.flink.lineage.DataStreamProviderFactory; +import org.apache.paimon.table.Table; + import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.table.connector.ProviderContext; import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.sink.DynamicTableSink.SinkRuntimeProvider; import org.apache.flink.table.data.RowData; import java.util.function.Function; @@ -35,6 +39,16 @@ public PaimonDataStreamSinkProvider(Function, DataStreamSink this.producer = producer; } + /** + * Creates a {@link SinkRuntimeProvider} that may be enriched with lineage metadata when running + * on a Flink version that supports it. 
+ */ + public static SinkRuntimeProvider createProvider( + Function, DataStreamSink> producer, String name, Table table) { + return DataStreamProviderFactory.getSinkProvider( + new PaimonDataStreamSinkProvider(producer), name, table); + } + @Override public DataStreamSink consumeDataStream( ProviderContext providerContext, DataStream dataStream) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java index 361323f016ff..695730b46d24 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java @@ -57,10 +57,12 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { @Override public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { - return new PaimonDataStreamSinkProvider( + return PaimonDataStreamSinkProvider.createProvider( (dataStream) -> new FlinkFormatTableDataStreamSink(table, overwrite, staticPartitions) - .sinkFrom(dataStream)); + .sinkFrom(dataStream), + tableIdentifier.asSummaryString(), + table); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java index b7c37d525eff..1430009ecf21 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java @@ -104,18 +104,22 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { throw new UnsupportedOperationException( "Paimon doesn't support streaming INSERT OVERWRITE."); } + String name = tableIdentifier.asSummaryString(); + if 
(table instanceof FormatTable) { FormatTable formatTable = (FormatTable) table; - return new PaimonDataStreamSinkProvider( + return PaimonDataStreamSinkProvider.createProvider( (dataStream) -> new FlinkFormatTableDataStreamSink( formatTable, overwrite, staticPartitions) - .sinkFrom(dataStream)); + .sinkFrom(dataStream), + name, + table); } Options conf = Options.fromMap(table.options()); // Do not sink to log store when overwrite mode - return new PaimonDataStreamSinkProvider( + return PaimonDataStreamSinkProvider.createProvider( (dataStream) -> { FlinkSinkBuilder builder = createSinkBuilder(); builder.forRowData( @@ -134,7 +138,9 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { } conf.getOptional(SINK_PARALLELISM).ifPresent(builder::parallelism); return builder.build(); - }); + }, + name, + table); } protected FlinkSinkBuilder createSinkBuilder() { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java index 8e1ecfc9556e..b0b5c2d296c1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java @@ -195,14 +195,15 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .limit(limit) .watermarkStrategy(watermarkStrategy) .dynamicPartitionFilteringFields(dynamicPartitionFilteringFields()); - - return new PaimonDataStreamScanProvider( + return PaimonDataStreamScanProvider.createProvider( !unbounded, env -> sourceBuilder .sourceParallelism(inferSourceParallelism(env)) .env(env) - .build()); + .build(), + tableIdentifier.asSummaryString(), + table); } private ScanRuntimeProvider createCountStarScan() { diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java index e99f265d03b4..bbe15cec96ee 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java @@ -117,7 +117,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { CoreOptions.BLOB_AS_DESCRIPTOR.key(), "false"))); } - return new PaimonDataStreamScanProvider( + return PaimonDataStreamScanProvider.createProvider( source.getBoundedness() == Boundedness.BOUNDED, env -> { Integer parallelism = inferSourceParallelism(env); @@ -130,7 +130,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { dataStreamSource.setParallelism(parallelism); } return dataStreamSource; - }); + }, + tableIdentifier.asSummaryString(), + table); } @Override diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java index 260da03f63a9..20a5ef5ac47e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java @@ -20,7 +20,6 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.flink.FlinkConnectorOptions; -import org.apache.paimon.flink.PaimonDataStreamScanProvider; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; @@ -43,6 +42,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import 
org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.data.RowData; @@ -75,7 +75,7 @@ void testInferScanParallelism() throws Exception { DataTableSource tableSource = new DataTableSource( ObjectIdentifier.of("cat", "db", "table"), fileStoreTable, true, null); - PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource); + DataStreamScanProvider runtimeProvider = runtimeProvider(tableSource); StreamExecutionEnvironment sEnv1 = StreamExecutionEnvironment.createLocalEnvironment(); sEnv1.setParallelism(-1); DataStream sourceStream1 = @@ -105,7 +105,7 @@ public void testInferStreamParallelism() throws Exception { DataTableSource tableSource = new DataTableSource( ObjectIdentifier.of("cat", "db", "table"), fileStoreTable, true, null); - PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource); + DataStreamScanProvider runtimeProvider = runtimeProvider(tableSource); StreamExecutionEnvironment sEnv1 = StreamExecutionEnvironment.createLocalEnvironment(); DataStream sourceStream1 = @@ -123,7 +123,7 @@ public void testSystemTableParallelism() throws Exception { SystemTableSource tableSource = new SystemTableSource(ro, false, ObjectIdentifier.of("cat", "db", "table")); - PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource); + DataStreamScanProvider runtimeProvider = runtimeProvider(tableSource); Configuration configuration = new Configuration(); configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); @@ -133,8 +133,8 @@ public void testSystemTableParallelism() throws Exception { assertThat(sourceStream1.getParallelism()).isEqualTo(3); } - private PaimonDataStreamScanProvider runtimeProvider(FlinkTableSource tableSource) { - return (PaimonDataStreamScanProvider) + private 
DataStreamScanProvider runtimeProvider(FlinkTableSource tableSource) { + return (DataStreamScanProvider) tableSource.getScanRuntimeProvider( new ScanTableSource.ScanContext() { @Override diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/lineage/DataStreamProviderFactory.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/lineage/DataStreamProviderFactory.java new file mode 100644 index 000000000000..ee8877d26bee --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/lineage/DataStreamProviderFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lineage; + +import org.apache.paimon.table.Table; + +import org.apache.flink.table.connector.sink.DynamicTableSink.SinkRuntimeProvider; +import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; + +/** Stub factory for Flink 1.x. Returns providers unchanged since lineage is not supported. */ +public class DataStreamProviderFactory { + + /** Returns the provider unchanged. Flink 1.x does not support lineage. 
*/ + public static ScanRuntimeProvider getScanProvider( + ScanRuntimeProvider provider, String name, Table table) { + return provider; + } + + /** Returns the provider unchanged. Flink 1.x does not support lineage. */ + public static SinkRuntimeProvider getSinkProvider( + SinkRuntimeProvider provider, String name, Table table) { + return provider; + } +} diff --git a/paimon-flink/paimon-flink2-common/pom.xml b/paimon-flink/paimon-flink2-common/pom.xml index c05486013382..1113ea8b457c 100644 --- a/paimon-flink/paimon-flink2-common/pom.xml +++ b/paimon-flink/paimon-flink2-common/pom.xml @@ -55,6 +55,12 @@ under the License. ${flink.version} provided + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + org.apache.flink flink-runtime diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/DataStreamProviderFactory.java b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/DataStreamProviderFactory.java new file mode 100644 index 000000000000..91fd59de7928 --- /dev/null +++ b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/DataStreamProviderFactory.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lineage; + +import org.apache.paimon.table.Table; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; +import org.apache.flink.table.connector.ProviderContext; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.sink.DynamicTableSink.SinkRuntimeProvider; +import org.apache.flink.table.connector.source.DataStreamScanProvider; +import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; +import org.apache.flink.table.data.RowData; + +/** + * Factory that wraps {@link DataStreamScanProvider} and {@link DataStreamSinkProvider} with {@link + * LineageVertexProvider} support for Flink 2.0+. + */ +public class DataStreamProviderFactory { + + /** + * Returns a {@link ScanRuntimeProvider} that also implements {@link LineageVertexProvider} so + * Flink's lineage graph discovers the Paimon source table. + */ + public static ScanRuntimeProvider getScanProvider( + ScanRuntimeProvider provider, String name, Table table) { + return new LineageAwarePaimonDataStreamScanProvider( + (DataStreamScanProvider) provider, name, table); + } + + /** + * Returns a {@link SinkRuntimeProvider} that also implements {@link LineageVertexProvider} so + * Flink's lineage graph discovers the Paimon sink table. 
+ */ + public static SinkRuntimeProvider getSinkProvider( + SinkRuntimeProvider provider, String name, Table table) { + return new LineageAwarePaimonDataStreamSinkProvider( + (DataStreamSinkProvider) provider, name, table); + } + + private static class LineageAwarePaimonDataStreamScanProvider + implements DataStreamScanProvider, LineageVertexProvider { + + private final DataStreamScanProvider delegate; + private final String name; + private final Table table; + + LineageAwarePaimonDataStreamScanProvider( + DataStreamScanProvider delegate, String name, Table table) { + this.delegate = delegate; + this.name = name; + this.table = table; + } + + @Override + public DataStream produceDataStream( + ProviderContext context, StreamExecutionEnvironment env) { + return delegate.produceDataStream(context, env); + } + + @Override + public boolean isBounded() { + return delegate.isBounded(); + } + + @Override + public LineageVertex getLineageVertex() { + return LineageUtils.sourceLineageVertex(name, delegate.isBounded(), table); + } + } + + private static class LineageAwarePaimonDataStreamSinkProvider + implements DataStreamSinkProvider, LineageVertexProvider { + + private final DataStreamSinkProvider delegate; + private final String name; + private final Table table; + + LineageAwarePaimonDataStreamSinkProvider( + DataStreamSinkProvider delegate, String name, Table table) { + this.delegate = delegate; + this.name = name; + this.table = table; + } + + @Override + public DataStreamSink consumeDataStream( + ProviderContext providerContext, DataStream dataStream) { + return delegate.consumeDataStream(providerContext, dataStream); + } + + @Override + public LineageVertex getLineageVertex() { + return LineageUtils.sinkLineageVertex(name, table); + } + } +} diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java new file mode 
100644 index 000000000000..978d5e7b6ca1 --- /dev/null +++ b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lineage; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.table.Table; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Lineage utilities for Flink 2.0+. Builds {@link SourceLineageVertex} and {@link LineageVertex} + * from a Paimon table name and its physical warehouse path (namespace). 
+ */ +public class LineageUtils { + + private static final String PAIMON_DATASET_PREFIX = "paimon://"; + + private static final Set PAIMON_OPTION_KEYS = + CoreOptions.getOptions().stream().map(opt -> opt.key()).collect(Collectors.toSet()); + + /** + * Builds the config map for a dataset facet from a {@link Table}. Includes the partition keys, + * the primary keys, and the table options whose keys are known Paimon {@link CoreOptions}. + */ + private static Map buildConfigMap(Table table) { + Map config = new HashMap<>(); + config.put("partition-keys", String.join(",", table.partitionKeys())); + config.put("primary-keys", String.join(",", table.primaryKeys())); + + table.options().entrySet().stream() + .filter(e -> PAIMON_OPTION_KEYS.contains(e.getKey())) + .forEach(e -> config.put(e.getKey(), e.getValue())); + + return config; + } + + /** + * Returns the lineage namespace for a Paimon table. The namespace uses the {@code paimon://} + * scheme followed by the table's physical warehouse path, e.g. {@code + * "paimon://s3://my-bucket/warehouse/mydb.db/mytable"}. + */ + public static String getNamespace(Table table) { + return PAIMON_DATASET_PREFIX + CoreOptions.path(table.options()); + } + + /** + * Creates a {@link SourceLineageVertex} for a Paimon source table. + * + * @param name fully qualified table name, e.g. {@code "paimon.mydb.mytable"} + * @param isBounded whether the source is bounded (batch) or unbounded (streaming) + * @param table the Paimon table (namespace is derived from its {@code path} option) + */ + public static SourceLineageVertex sourceLineageVertex( + String name, boolean isBounded, Table table) { + LineageDataset dataset = + new PaimonLineageDataset(name, getNamespace(table), buildConfigMap(table)); + Boundedness boundedness = + isBounded ? 
Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED; + return new PaimonSourceLineageVertex(boundedness, Collections.singletonList(dataset)); + } + + /** + * Creates a {@link LineageVertex} for a Paimon sink table. + * + * @param name fully qualified table name, e.g. {@code "paimon.mydb.mytable"} + * @param table the Paimon table (namespace is derived from its {@code path} option) + */ + public static LineageVertex sinkLineageVertex(String name, Table table) { + LineageDataset dataset = + new PaimonLineageDataset(name, getNamespace(table), buildConfigMap(table)); + return new PaimonSinkLineageVertex(Collections.singletonList(dataset)); + } +} diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java new file mode 100644 index 000000000000..5e99df0b2d4d --- /dev/null +++ b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.lineage; + +import org.apache.flink.streaming.api.lineage.DatasetConfigFacet; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; + +import java.util.HashMap; +import java.util.Map; + +/** + * A {@link LineageDataset} representing a Paimon table, identified by its fully qualified name and + * physical warehouse path as the namespace. + */ +public class PaimonLineageDataset implements LineageDataset { + + private final String name; + private final String namespace; + private final Map tableOptions; + + public PaimonLineageDataset(String name, String namespace, Map tableOptions) { + this.name = name; + this.namespace = namespace; + this.tableOptions = tableOptions; + } + + @Override + public String name() { + return name; + } + + @Override + public String namespace() { + return namespace; + } + + @Override + public Map facets() { + Map facets = new HashMap<>(); + facets.put( + "config", + new DatasetConfigFacet() { + @Override + public String name() { + return "config"; + } + + @Override + public Map config() { + return tableOptions; + } + }); + return facets; + } +} diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java new file mode 100644 index 000000000000..40024da5e919 --- /dev/null +++ b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lineage; + +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; + +import java.util.List; + +/** A {@link LineageVertex} representing a Paimon sink table. */ +public class PaimonSinkLineageVertex implements LineageVertex { + + private final List datasets; + + public PaimonSinkLineageVertex(List datasets) { + this.datasets = datasets; + } + + @Override + public List datasets() { + return datasets; + } +} diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java new file mode 100644 index 000000000000..cbacce2f8abb --- /dev/null +++ b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lineage; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +import java.util.List; + +/** + * A {@link SourceLineageVertex} representing a Paimon source table. Carries the {@link Boundedness} + * to indicate whether the source is bounded (batch) or unbounded (streaming). + */ +public class PaimonSourceLineageVertex implements SourceLineageVertex { + + private final Boundedness boundedness; + private final List datasets; + + public PaimonSourceLineageVertex(Boundedness boundedness, List datasets) { + this.boundedness = boundedness; + this.datasets = datasets; + } + + @Override + public Boundedness boundedness() { + return boundedness; + } + + @Override + public List datasets() { + return datasets; + } +} diff --git a/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java b/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java new file mode 100644 index 000000000000..2b9284985e4c --- /dev/null +++ b/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lineage; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VarCharType; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.lineage.DatasetConfigFacet; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.sink.DynamicTableSink.SinkRuntimeProvider; +import org.apache.flink.table.connector.source.DataStreamScanProvider; +import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; +import 
org.apache.flink.table.data.RowData; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link LineageUtils}. */ +class LineageUtilsTest { + + @TempDir java.nio.file.Path temp; + + private Path tablePath; + + @BeforeEach + void setUp() { + tablePath = new Path(temp.toUri().toString()); + } + + private FileStoreTable createTable( + Map options, + java.util.List partitionKeys, + java.util.List primaryKeys) + throws Exception { + new SchemaManager(LocalFileIO.create(), tablePath) + .createTable( + new Schema( + RowType.of(new IntType(), new VarCharType(100), new IntType()) + .getFields(), + partitionKeys, + primaryKeys, + options, + "")); + return FileStoreTableFactory.create(LocalFileIO.create(), tablePath); + } + + @Test + void testGetNamespace() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + String namespace = LineageUtils.getNamespace(table); + + assertThat(namespace).startsWith("paimon://"); + assertThat(namespace).contains(tablePath.toString()); + } + + @Test + void testSourceLineageVertexBounded() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + SourceLineageVertex vertex = LineageUtils.sourceLineageVertex("paimon.db.src", true, table); + + assertThat(vertex).isInstanceOf(PaimonSourceLineageVertex.class); + assertThat(vertex.boundedness()).isEqualTo(Boundedness.BOUNDED); + assertThat(vertex.datasets()).hasSize(1); + + LineageDataset dataset = vertex.datasets().get(0); + assertThat(dataset.name()).isEqualTo("paimon.db.src"); + assertThat(dataset.namespace()).startsWith("paimon://"); + } + + @Test + void testSourceLineageVertexUnbounded() throws Exception { + 
FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + SourceLineageVertex vertex = + LineageUtils.sourceLineageVertex("paimon.db.src", false, table); + + assertThat(vertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED); + } + + @Test + void testSinkLineageVertex() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + LineageVertex vertex = LineageUtils.sinkLineageVertex("paimon.db.sink", table); + + assertThat(vertex).isInstanceOf(PaimonSinkLineageVertex.class); + assertThat(vertex.datasets()).hasSize(1); + + LineageDataset dataset = vertex.datasets().get(0); + assertThat(dataset.name()).isEqualTo("paimon.db.sink"); + assertThat(dataset.namespace()).startsWith("paimon://"); + } + + @Test + void testConfigFacetContainsPartitionAndPrimaryKeys() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Arrays.asList("f2"), Arrays.asList("f0", "f2")); + + LineageVertex vertex = LineageUtils.sinkLineageVertex("paimon.db.t", table); + LineageDataset dataset = vertex.datasets().get(0); + + Map facets = dataset.facets(); + assertThat(facets).containsKey("config"); + + DatasetConfigFacet configFacet = (DatasetConfigFacet) facets.get("config"); + Map config = configFacet.config(); + assertThat(config).containsEntry("partition-keys", "f2"); + assertThat(config).containsEntry("primary-keys", "f0,f2"); + } + + @Test + void testConfigFacetIncludesPaimonOptions() throws Exception { + Map options = new HashMap<>(); + options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup"); + + FileStoreTable table = createTable(options, Collections.emptyList(), Arrays.asList("f0")); + + SourceLineageVertex vertex = LineageUtils.sourceLineageVertex("paimon.db.t", true, table); + LineageDataset dataset = vertex.datasets().get(0); + + DatasetConfigFacet configFacet = (DatasetConfigFacet) dataset.facets().get("config"); + Map config = 
configFacet.config(); + assertThat(config).containsEntry(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup"); + } + + @Test + void testConfigFacetWithEmptyKeys() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Collections.emptyList()); + + LineageVertex vertex = LineageUtils.sinkLineageVertex("paimon.db.t", table); + LineageDataset dataset = vertex.datasets().get(0); + + DatasetConfigFacet configFacet = (DatasetConfigFacet) dataset.facets().get("config"); + Map config = configFacet.config(); + assertThat(config).containsEntry("partition-keys", ""); + assertThat(config).containsEntry("primary-keys", ""); + } + + @Test + void testGetScanProviderImplementsLineageVertexProvider() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + DataStreamScanProvider stub = + new DataStreamScanProvider() { + @Override + public DataStream produceDataStream( + org.apache.flink.table.connector.ProviderContext context, + StreamExecutionEnvironment env) { + return null; + } + + @Override + public boolean isBounded() { + return true; + } + }; + ScanRuntimeProvider wrapped = + DataStreamProviderFactory.getScanProvider(stub, "paimon.db.src", table); + + assertThat(wrapped).isInstanceOf(LineageVertexProvider.class); + LineageVertex vertex = ((LineageVertexProvider) wrapped).getLineageVertex(); + assertThat(vertex).isInstanceOf(SourceLineageVertex.class); + assertThat(vertex.datasets()).hasSize(1); + assertThat(vertex.datasets().get(0).name()).isEqualTo("paimon.db.src"); + } + + @Test + void testGetSinkProviderImplementsLineageVertexProvider() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + DataStreamSinkProvider stub = (providerContext, dataStream) -> null; + SinkRuntimeProvider wrapped = + DataStreamProviderFactory.getSinkProvider(stub, "paimon.db.sink", table); + + 
assertThat(wrapped).isInstanceOf(LineageVertexProvider.class); + LineageVertex vertex = ((LineageVertexProvider) wrapped).getLineageVertex(); + assertThat(vertex.datasets()).hasSize(1); + assertThat(vertex.datasets().get(0).name()).isEqualTo("paimon.db.sink"); + } +} From 97881fef58a72388eee41565e22722e76d8a2ad0 Mon Sep 17 00:00:00 2001 From: jkukreja Date: Tue, 10 Mar 2026 19:20:00 -0400 Subject: [PATCH 2/2] copy over lineage class dependencies to flink-1 module and remove delegate pattern introduced to handle backward compatibility --- .../flink/PaimonDataStreamScanProvider.java | 35 +++--- .../flink/PaimonDataStreamSinkProvider.java | 34 ++--- .../paimon/flink/lineage/LineageUtils.java | 4 +- .../flink/lineage/PaimonLineageDataset.java | 0 .../lineage/PaimonSinkLineageVertex.java | 0 .../lineage/PaimonSourceLineageVertex.java | 0 .../flink/sink/FlinkFormatTableSink.java | 2 +- .../paimon/flink/sink/FlinkTableSinkBase.java | 4 +- .../flink/source/BaseDataTableSource.java | 2 +- .../flink/source/SystemTableSource.java | 2 +- .../flink/lineage/LineageUtilsTest.java | 46 ++----- .../flink/source/DataTableSourceTest.java | 12 +- .../api/lineage/DatasetConfigFacet.java | 29 +++++ .../streaming/api/lineage/LineageDataset.java | 33 +++++ .../api/lineage/LineageDatasetFacet.java | 27 ++++ .../streaming/api/lineage/LineageVertex.java | 29 +++++ .../api/lineage/LineageVertexProvider.java | 32 +++++ .../api/lineage/SourceLineageVertex.java | 28 +++++ .../lineage/DataStreamProviderFactory.java | 40 ------ paimon-flink/paimon-flink2-common/pom.xml | 6 - .../lineage/DataStreamProviderFactory.java | 117 ------------------ 21 files changed, 242 insertions(+), 240 deletions(-) rename paimon-flink/{paimon-flink2-common => paimon-flink-common}/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java (95%) rename paimon-flink/{paimon-flink2-common => paimon-flink-common}/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java (100%) rename 
paimon-flink/{paimon-flink2-common => paimon-flink-common}/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java (100%) rename paimon-flink/{paimon-flink2-common => paimon-flink-common}/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java (100%) rename paimon-flink/{paimon-flink2-common => paimon-flink-common}/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java (80%) create mode 100644 paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/DatasetConfigFacet.java create mode 100644 paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java create mode 100644 paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDatasetFacet.java create mode 100644 paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertex.java create mode 100644 paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertexProvider.java create mode 100644 paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/SourceLineageVertex.java delete mode 100644 paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/lineage/DataStreamProviderFactory.java delete mode 100644 paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/DataStreamProviderFactory.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java index 163bb8e6ec84..776575dc0a72 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java @@ -18,41 +18,39 @@ package 
org.apache.paimon.flink; -import org.apache.paimon.flink.lineage.DataStreamProviderFactory; +import org.apache.paimon.flink.lineage.LineageUtils; import org.apache.paimon.table.Table; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.table.connector.ProviderContext; import org.apache.flink.table.connector.source.DataStreamScanProvider; -import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; import org.apache.flink.table.data.RowData; import java.util.function.Function; -/** Paimon {@link DataStreamScanProvider}. */ -public class PaimonDataStreamScanProvider implements DataStreamScanProvider { +/** + * Paimon {@link DataStreamScanProvider} that also implements {@link LineageVertexProvider} so + * Flink's lineage graph discovers the Paimon source table. + */ +public class PaimonDataStreamScanProvider implements DataStreamScanProvider, LineageVertexProvider { private final boolean isBounded; private final Function> producer; + private final String name; + private final Table table; public PaimonDataStreamScanProvider( - boolean isBounded, Function> producer) { - this.isBounded = isBounded; - this.producer = producer; - } - - /** - * Creates a {@link ScanRuntimeProvider} that may be enriched with lineage metadata when running - * on a Flink version that supports it. 
- */ - public static ScanRuntimeProvider createProvider( boolean isBounded, Function> producer, String name, Table table) { - return DataStreamProviderFactory.getScanProvider( - new PaimonDataStreamScanProvider(isBounded, producer), name, table); + this.isBounded = isBounded; + this.producer = producer; + this.name = name; + this.table = table; } @Override @@ -65,4 +63,9 @@ public DataStream produceDataStream( public boolean isBounded() { return isBounded; } + + @Override + public LineageVertex getLineageVertex() { + return LineageUtils.sourceLineageVertex(name, isBounded, table); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java index 7516e9b5b937..f9087b49fc88 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java @@ -18,35 +18,34 @@ package org.apache.paimon.flink; -import org.apache.paimon.flink.lineage.DataStreamProviderFactory; +import org.apache.paimon.flink.lineage.LineageUtils; import org.apache.paimon.table.Table; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.table.connector.ProviderContext; import org.apache.flink.table.connector.sink.DataStreamSinkProvider; -import org.apache.flink.table.connector.sink.DynamicTableSink.SinkRuntimeProvider; import org.apache.flink.table.data.RowData; import java.util.function.Function; -/** Paimon {@link DataStreamSinkProvider}. 
*/ -public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider { +/** + * Paimon {@link DataStreamSinkProvider} that also implements {@link LineageVertexProvider} so + * Flink's lineage graph discovers the Paimon sink table. + */ +public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider, LineageVertexProvider { private final Function, DataStreamSink> producer; + private final String name; + private final Table table; - public PaimonDataStreamSinkProvider(Function, DataStreamSink> producer) { - this.producer = producer; - } - - /** - * Creates a {@link SinkRuntimeProvider} that may be enriched with lineage metadata when running - * on a Flink version that supports it. - */ - public static SinkRuntimeProvider createProvider( + public PaimonDataStreamSinkProvider( Function, DataStreamSink> producer, String name, Table table) { - return DataStreamProviderFactory.getSinkProvider( - new PaimonDataStreamSinkProvider(producer), name, table); + this.producer = producer; + this.name = name; + this.table = table; } @Override @@ -54,4 +53,9 @@ public DataStreamSink consumeDataStream( ProviderContext providerContext, DataStream dataStream) { return producer.apply(dataStream); } + + @Override + public LineageVertex getLineageVertex() { + return LineageUtils.sinkLineageVertex(name, table); + } } diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java similarity index 95% rename from paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java index 978d5e7b6ca1..110365c76ee4 100644 --- a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java @@ -33,8 +33,8 @@ import java.util.stream.Collectors; /** - * Lineage utilities for Flink 2.0+. Builds {@link SourceLineageVertex} and {@link LineageVertex} - * from a Paimon table name and its physical warehouse path (namespace). + * Lineage utilities for building {@link SourceLineageVertex} and {@link LineageVertex} from a + * Paimon table name and its physical warehouse path (namespace). */ public class LineageUtils { diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java similarity index 100% rename from paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java similarity index 100% rename from paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java similarity index 100% rename from paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java index 695730b46d24..1c48602098e3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java @@ -57,7 +57,7 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { @Override public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { - return PaimonDataStreamSinkProvider.createProvider( + return new PaimonDataStreamSinkProvider( (dataStream) -> new FlinkFormatTableDataStreamSink(table, overwrite, staticPartitions) .sinkFrom(dataStream), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java index 1430009ecf21..33260bf388c1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java @@ -108,7 +108,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { if (table instanceof FormatTable) { FormatTable formatTable = (FormatTable) table; - return PaimonDataStreamSinkProvider.createProvider( + return new PaimonDataStreamSinkProvider( (dataStream) -> new FlinkFormatTableDataStreamSink( formatTable, overwrite, staticPartitions) @@ -119,7 +119,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { Options conf = Options.fromMap(table.options()); // Do not sink to log store when overwrite mode - return PaimonDataStreamSinkProvider.createProvider( + return new PaimonDataStreamSinkProvider( (dataStream) -> { FlinkSinkBuilder 
builder = createSinkBuilder(); builder.forRowData( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java index b0b5c2d296c1..5227fa0d270f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java @@ -195,7 +195,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .limit(limit) .watermarkStrategy(watermarkStrategy) .dynamicPartitionFilteringFields(dynamicPartitionFilteringFields()); - return PaimonDataStreamScanProvider.createProvider( + return new PaimonDataStreamScanProvider( !unbounded, env -> sourceBuilder diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java index bbe15cec96ee..2bbd749f58ca 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java @@ -117,7 +117,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { CoreOptions.BLOB_AS_DESCRIPTOR.key(), "false"))); } - return PaimonDataStreamScanProvider.createProvider( + return new PaimonDataStreamScanProvider( source.getBoundedness() == Boundedness.BOUNDED, env -> { Integer parallelism = inferSourceParallelism(env); diff --git a/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java similarity index 80% rename from 
paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java index 2b9284985e4c..62d601ec1b23 100644 --- a/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java @@ -19,6 +19,8 @@ package org.apache.paimon.flink.lineage; import org.apache.paimon.CoreOptions; +import org.apache.paimon.flink.PaimonDataStreamScanProvider; +import org.apache.paimon.flink.PaimonDataStreamSinkProvider; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.schema.Schema; @@ -30,19 +32,12 @@ import org.apache.paimon.types.VarCharType; import org.apache.flink.api.connector.source.Boundedness; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.lineage.DatasetConfigFacet; import org.apache.flink.streaming.api.lineage.LineageDataset; import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; import org.apache.flink.streaming.api.lineage.LineageVertex; import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.streaming.api.lineage.SourceLineageVertex; -import org.apache.flink.table.connector.sink.DataStreamSinkProvider; -import org.apache.flink.table.connector.sink.DynamicTableSink.SinkRuntimeProvider; -import org.apache.flink.table.connector.source.DataStreamScanProvider; -import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; -import org.apache.flink.table.data.RowData; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -183,45 +178,30 @@ void testConfigFacetWithEmptyKeys() 
throws Exception { } @Test - void testGetScanProviderImplementsLineageVertexProvider() throws Exception { + void testScanProviderImplementsLineageVertexProvider() throws Exception { FileStoreTable table = createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); - DataStreamScanProvider stub = - new DataStreamScanProvider() { - @Override - public DataStream produceDataStream( - org.apache.flink.table.connector.ProviderContext context, - StreamExecutionEnvironment env) { - return null; - } - - @Override - public boolean isBounded() { - return true; - } - }; - ScanRuntimeProvider wrapped = - DataStreamProviderFactory.getScanProvider(stub, "paimon.db.src", table); - - assertThat(wrapped).isInstanceOf(LineageVertexProvider.class); - LineageVertex vertex = ((LineageVertexProvider) wrapped).getLineageVertex(); + PaimonDataStreamScanProvider provider = + new PaimonDataStreamScanProvider(true, env -> null, "paimon.db.src", table); + + assertThat(provider).isInstanceOf(LineageVertexProvider.class); + LineageVertex vertex = provider.getLineageVertex(); assertThat(vertex).isInstanceOf(SourceLineageVertex.class); assertThat(vertex.datasets()).hasSize(1); assertThat(vertex.datasets().get(0).name()).isEqualTo("paimon.db.src"); } @Test - void testGetSinkProviderImplementsLineageVertexProvider() throws Exception { + void testSinkProviderImplementsLineageVertexProvider() throws Exception { FileStoreTable table = createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); - DataStreamSinkProvider stub = (providerContext, dataStream) -> null; - SinkRuntimeProvider wrapped = - DataStreamProviderFactory.getSinkProvider(stub, "paimon.db.sink", table); + PaimonDataStreamSinkProvider provider = + new PaimonDataStreamSinkProvider(dataStream -> null, "paimon.db.sink", table); - assertThat(wrapped).isInstanceOf(LineageVertexProvider.class); - LineageVertex vertex = ((LineageVertexProvider) wrapped).getLineageVertex(); + 
assertThat(provider).isInstanceOf(LineageVertexProvider.class); + LineageVertex vertex = provider.getLineageVertex(); assertThat(vertex.datasets()).hasSize(1); assertThat(vertex.datasets().get(0).name()).isEqualTo("paimon.db.sink"); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java index 20a5ef5ac47e..260da03f63a9 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.PaimonDataStreamScanProvider; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; @@ -42,7 +43,6 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.data.RowData; @@ -75,7 +75,7 @@ void testInferScanParallelism() throws Exception { DataTableSource tableSource = new DataTableSource( ObjectIdentifier.of("cat", "db", "table"), fileStoreTable, true, null); - DataStreamScanProvider runtimeProvider = runtimeProvider(tableSource); + PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource); StreamExecutionEnvironment sEnv1 = StreamExecutionEnvironment.createLocalEnvironment(); sEnv1.setParallelism(-1); DataStream sourceStream1 = @@ -105,7 +105,7 @@ public void 
testInferStreamParallelism() throws Exception { DataTableSource tableSource = new DataTableSource( ObjectIdentifier.of("cat", "db", "table"), fileStoreTable, true, null); - DataStreamScanProvider runtimeProvider = runtimeProvider(tableSource); + PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource); StreamExecutionEnvironment sEnv1 = StreamExecutionEnvironment.createLocalEnvironment(); DataStream sourceStream1 = @@ -123,7 +123,7 @@ public void testSystemTableParallelism() throws Exception { SystemTableSource tableSource = new SystemTableSource(ro, false, ObjectIdentifier.of("cat", "db", "table")); - DataStreamScanProvider runtimeProvider = runtimeProvider(tableSource); + PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource); Configuration configuration = new Configuration(); configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); @@ -133,8 +133,8 @@ public void testSystemTableParallelism() throws Exception { assertThat(sourceStream1.getParallelism()).isEqualTo(3); } - private DataStreamScanProvider runtimeProvider(FlinkTableSource tableSource) { - return (DataStreamScanProvider) + private PaimonDataStreamScanProvider runtimeProvider(FlinkTableSource tableSource) { + return (PaimonDataStreamScanProvider) tableSource.getScanRuntimeProvider( new ScanTableSource.ScanContext() { @Override diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/DatasetConfigFacet.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/DatasetConfigFacet.java new file mode 100644 index 000000000000..cbd25f6c59b5 --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/DatasetConfigFacet.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Map; + +/** Builtin config facet for dataset. Stub for Flink 1.x compatibility. */ +@PublicEvolving +public interface DatasetConfigFacet extends LineageDatasetFacet { + Map config(); +} diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java new file mode 100644 index 000000000000..4d49b5ca7775 --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Map; + +/** Lineage dataset represents the source or sink in the job. Stub for Flink 1.x compatibility. */ +@PublicEvolving +public interface LineageDataset { + String name(); + + String namespace(); + + Map facets(); +} diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDatasetFacet.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDatasetFacet.java new file mode 100644 index 000000000000..55bf57887a94 --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDatasetFacet.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +/** Facet of a lineage dataset, exposing its name. Stub for Flink 1.x compatibility. */ +@PublicEvolving +public interface LineageDatasetFacet { + String name(); +} diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertex.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertex.java new file mode 100644 index 000000000000..43c4f991e1a8 --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertex.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.List; + +/** Lineage vertex represents the connectors in lineage graph. Stub for Flink 1.x compatibility.
*/ +@PublicEvolving +public interface LineageVertex { + List<LineageDataset> datasets(); +} diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertexProvider.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertexProvider.java new file mode 100644 index 000000000000..959e2e96255a --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertexProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Create lineage vertex for source and sink in DataStream. Stub for Flink 1.x compatibility. + * + *

<p>On Flink 1.x the runtime never calls {@link #getLineageVertex()}, so the implementation is + * effectively a no-op. + */ +@PublicEvolving +public interface LineageVertexProvider { + LineageVertex getLineageVertex(); +} diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/SourceLineageVertex.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/SourceLineageVertex.java new file mode 100644 index 000000000000..ab36aaee4998 --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/SourceLineageVertex.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; + +/** Lineage vertex for source which has {@link Boundedness}. Stub for Flink 1.x compatibility.
*/ +@PublicEvolving +public interface SourceLineageVertex extends LineageVertex { + Boundedness boundedness(); +} diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/lineage/DataStreamProviderFactory.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/lineage/DataStreamProviderFactory.java deleted file mode 100644 index ee8877d26bee..000000000000 --- a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/lineage/DataStreamProviderFactory.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.lineage; - -import org.apache.paimon.table.Table; - -import org.apache.flink.table.connector.sink.DynamicTableSink.SinkRuntimeProvider; -import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; - -/** Stub factory for Flink 1.x. Returns providers unchanged since lineage is not supported. */ -public class DataStreamProviderFactory { - - /** Returns the provider unchanged. Flink 1.x does not support lineage. 
*/ - public static ScanRuntimeProvider getScanProvider( - ScanRuntimeProvider provider, String name, Table table) { - return provider; - } - - /** Returns the provider unchanged. Flink 1.x does not support lineage. */ - public static SinkRuntimeProvider getSinkProvider( - SinkRuntimeProvider provider, String name, Table table) { - return provider; - } -} diff --git a/paimon-flink/paimon-flink2-common/pom.xml b/paimon-flink/paimon-flink2-common/pom.xml index 1113ea8b457c..c05486013382 100644 --- a/paimon-flink/paimon-flink2-common/pom.xml +++ b/paimon-flink/paimon-flink2-common/pom.xml @@ -55,12 +55,6 @@ under the License. <version>${flink.version}</version> <scope>provided</scope> </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-java-bridge</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime</artifactId> diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/DataStreamProviderFactory.java b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/DataStreamProviderFactory.java deleted file mode 100644 index 91fd59de7928..000000000000 --- a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/DataStreamProviderFactory.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.lineage; - -import org.apache.paimon.table.Table; - -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.lineage.LineageVertex; -import org.apache.flink.streaming.api.lineage.LineageVertexProvider; -import org.apache.flink.table.connector.ProviderContext; -import org.apache.flink.table.connector.sink.DataStreamSinkProvider; -import org.apache.flink.table.connector.sink.DynamicTableSink.SinkRuntimeProvider; -import org.apache.flink.table.connector.source.DataStreamScanProvider; -import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; -import org.apache.flink.table.data.RowData; - -/** - * Factory that wraps {@link DataStreamScanProvider} and {@link DataStreamSinkProvider} with {@link - * LineageVertexProvider} support for Flink 2.0+. - */ -public class DataStreamProviderFactory { - - /** - * Returns a {@link ScanRuntimeProvider} that also implements {@link LineageVertexProvider} so - * Flink's lineage graph discovers the Paimon source table. - */ - public static ScanRuntimeProvider getScanProvider( - ScanRuntimeProvider provider, String name, Table table) { - return new LineageAwarePaimonDataStreamScanProvider( - (DataStreamScanProvider) provider, name, table); - } - - /** - * Returns a {@link SinkRuntimeProvider} that also implements {@link LineageVertexProvider} so - * Flink's lineage graph discovers the Paimon sink table. 
- */ - public static SinkRuntimeProvider getSinkProvider( - SinkRuntimeProvider provider, String name, Table table) { - return new LineageAwarePaimonDataStreamSinkProvider( - (DataStreamSinkProvider) provider, name, table); - } - - private static class LineageAwarePaimonDataStreamScanProvider - implements DataStreamScanProvider, LineageVertexProvider { - - private final DataStreamScanProvider delegate; - private final String name; - private final Table table; - - LineageAwarePaimonDataStreamScanProvider( - DataStreamScanProvider delegate, String name, Table table) { - this.delegate = delegate; - this.name = name; - this.table = table; - } - - @Override - public DataStream<RowData> produceDataStream( - ProviderContext context, StreamExecutionEnvironment env) { - return delegate.produceDataStream(context, env); - } - - @Override - public boolean isBounded() { - return delegate.isBounded(); - } - - @Override - public LineageVertex getLineageVertex() { - return LineageUtils.sourceLineageVertex(name, delegate.isBounded(), table); - } - } - - private static class LineageAwarePaimonDataStreamSinkProvider - implements DataStreamSinkProvider, LineageVertexProvider { - - private final DataStreamSinkProvider delegate; - private final String name; - private final Table table; - - LineageAwarePaimonDataStreamSinkProvider( - DataStreamSinkProvider delegate, String name, Table table) { - this.delegate = delegate; - this.name = name; - this.table = table; - } - - @Override - public DataStreamSink<?> consumeDataStream( - ProviderContext providerContext, DataStream<RowData> dataStream) { - return delegate.consumeDataStream(providerContext, dataStream); - } - - @Override - public LineageVertex getLineageVertex() { - return LineageUtils.sinkLineageVertex(name, table); - } - } -}