diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java index af9df5b9338c..776575dc0a72 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java @@ -18,24 +18,39 @@ package org.apache.paimon.flink; +import org.apache.paimon.flink.lineage.LineageUtils; +import org.apache.paimon.table.Table; + import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.table.connector.ProviderContext; import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.data.RowData; import java.util.function.Function; -/** Paimon {@link DataStreamScanProvider}. */ -public class PaimonDataStreamScanProvider implements DataStreamScanProvider { +/** + * Paimon {@link DataStreamScanProvider} that also implements {@link LineageVertexProvider} so + * Flink's lineage graph discovers the Paimon source table. 
+ */ +public class PaimonDataStreamScanProvider implements DataStreamScanProvider, LineageVertexProvider { private final boolean isBounded; private final Function> producer; + private final String name; + private final Table table; public PaimonDataStreamScanProvider( - boolean isBounded, Function> producer) { + boolean isBounded, + Function> producer, + String name, + Table table) { this.isBounded = isBounded; this.producer = producer; + this.name = name; + this.table = table; } @Override @@ -48,4 +63,9 @@ public DataStream produceDataStream( public boolean isBounded() { return isBounded; } + + @Override + public LineageVertex getLineageVertex() { + return LineageUtils.sourceLineageVertex(name, isBounded, table); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java index 05eaacf5ab14..f9087b49fc88 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java @@ -18,21 +18,34 @@ package org.apache.paimon.flink; +import org.apache.paimon.flink.lineage.LineageUtils; +import org.apache.paimon.table.Table; + import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.table.connector.ProviderContext; import org.apache.flink.table.connector.sink.DataStreamSinkProvider; import org.apache.flink.table.data.RowData; import java.util.function.Function; -/** Paimon {@link DataStreamSinkProvider}. 
*/ -public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider { +/** + * Paimon {@link DataStreamSinkProvider} that also implements {@link LineageVertexProvider} so + * Flink's lineage graph discovers the Paimon sink table. + */ +public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider, LineageVertexProvider { private final Function, DataStreamSink> producer; + private final String name; + private final Table table; - public PaimonDataStreamSinkProvider(Function, DataStreamSink> producer) { + public PaimonDataStreamSinkProvider( + Function, DataStreamSink> producer, String name, Table table) { this.producer = producer; + this.name = name; + this.table = table; } @Override @@ -40,4 +53,9 @@ public DataStreamSink consumeDataStream( ProviderContext providerContext, DataStream dataStream) { return producer.apply(dataStream); } + + @Override + public LineageVertex getLineageVertex() { + return LineageUtils.sinkLineageVertex(name, table); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java new file mode 100644 index 000000000000..110365c76ee4 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lineage; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.table.Table; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Lineage utilities for building {@link SourceLineageVertex} and {@link LineageVertex} from a + * Paimon table name and its physical warehouse path (namespace). + */ +public class LineageUtils { + + private static final String PAIMON_DATASET_PREFIX = "paimon://"; + + private static final Set PAIMON_OPTION_KEYS = + CoreOptions.getOptions().stream().map(opt -> opt.key()).collect(Collectors.toSet()); + + /** + * Builds the config map for a dataset facet from a {@link Table}. Includes partition keys, + * primary keys, and the table options whose keys are Paimon {@link CoreOptions} keys. 
+ */ + private static Map buildConfigMap(Table table) { + Map config = new HashMap<>(); + config.put("partition-keys", String.join(",", table.partitionKeys())); + config.put("primary-keys", String.join(",", table.primaryKeys())); + + table.options().entrySet().stream() + .filter(e -> PAIMON_OPTION_KEYS.contains(e.getKey())) + .forEach(e -> config.put(e.getKey(), e.getValue())); + + return config; + } + + /** + * Returns the lineage namespace for a Paimon table. The namespace uses the {@code paimon://} + * scheme followed by the table's physical warehouse path, e.g. {@code + * "paimon://s3://my-bucket/warehouse/mydb.db/mytable"}. + */ + public static String getNamespace(Table table) { + return PAIMON_DATASET_PREFIX + CoreOptions.path(table.options()); + } + + /** + * Creates a {@link SourceLineageVertex} for a Paimon source table. + * + * @param name fully qualified table name, e.g. {@code "paimon.mydb.mytable"} + * @param isBounded whether the source is bounded (batch) or unbounded (streaming) + * @param table the Paimon table (namespace is derived from its {@code path} option) + */ + public static SourceLineageVertex sourceLineageVertex( + String name, boolean isBounded, Table table) { + LineageDataset dataset = + new PaimonLineageDataset(name, getNamespace(table), buildConfigMap(table)); + Boundedness boundedness = + isBounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED; + return new PaimonSourceLineageVertex(boundedness, Collections.singletonList(dataset)); + } + + /** + * Creates a {@link LineageVertex} for a Paimon sink table. + * + * @param name fully qualified table name, e.g. 
{@code "paimon.mydb.mytable"} + * @param table the Paimon table (namespace is derived from its {@code path} option) + */ + public static LineageVertex sinkLineageVertex(String name, Table table) { + LineageDataset dataset = + new PaimonLineageDataset(name, getNamespace(table), buildConfigMap(table)); + return new PaimonSinkLineageVertex(Collections.singletonList(dataset)); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java new file mode 100644 index 000000000000..5e99df0b2d4d --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.lineage; + +import org.apache.flink.streaming.api.lineage.DatasetConfigFacet; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; + +import java.util.HashMap; +import java.util.Map; + +/** + * A {@link LineageDataset} representing a Paimon table, identified by its fully qualified name and + * physical warehouse path as the namespace. + */ +public class PaimonLineageDataset implements LineageDataset { + + private final String name; + private final String namespace; + private final Map tableOptions; + + public PaimonLineageDataset(String name, String namespace, Map tableOptions) { + this.name = name; + this.namespace = namespace; + this.tableOptions = tableOptions; + } + + @Override + public String name() { + return name; + } + + @Override + public String namespace() { + return namespace; + } + + @Override + public Map facets() { + Map facets = new HashMap<>(); + facets.put( + "config", + new DatasetConfigFacet() { + @Override + public String name() { + return "config"; + } + + @Override + public Map config() { + return tableOptions; + } + }); + return facets; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java new file mode 100644 index 000000000000..40024da5e919 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lineage; + +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; + +import java.util.List; + +/** A {@link LineageVertex} representing a Paimon sink table. */ +public class PaimonSinkLineageVertex implements LineageVertex { + + private final List datasets; + + public PaimonSinkLineageVertex(List datasets) { + this.datasets = datasets; + } + + @Override + public List datasets() { + return datasets; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java new file mode 100644 index 000000000000..cbacce2f8abb --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lineage; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +import java.util.List; + +/** + * A {@link SourceLineageVertex} representing a Paimon source table. Carries the {@link Boundedness} + * to indicate whether the source is bounded (batch) or unbounded (streaming). + */ +public class PaimonSourceLineageVertex implements SourceLineageVertex { + + private final Boundedness boundedness; + private final List datasets; + + public PaimonSourceLineageVertex(Boundedness boundedness, List datasets) { + this.boundedness = boundedness; + this.datasets = datasets; + } + + @Override + public Boundedness boundedness() { + return boundedness; + } + + @Override + public List datasets() { + return datasets; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java index 361323f016ff..1c48602098e3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java @@ -60,7 +60,9 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { return new PaimonDataStreamSinkProvider( (dataStream) -> new FlinkFormatTableDataStreamSink(table, overwrite, 
staticPartitions) - .sinkFrom(dataStream)); + .sinkFrom(dataStream), + tableIdentifier.asSummaryString(), + table); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java index b7c37d525eff..33260bf388c1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java @@ -104,13 +104,17 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { throw new UnsupportedOperationException( "Paimon doesn't support streaming INSERT OVERWRITE."); } + String name = tableIdentifier.asSummaryString(); + if (table instanceof FormatTable) { FormatTable formatTable = (FormatTable) table; return new PaimonDataStreamSinkProvider( (dataStream) -> new FlinkFormatTableDataStreamSink( formatTable, overwrite, staticPartitions) - .sinkFrom(dataStream)); + .sinkFrom(dataStream), + name, + table); } Options conf = Options.fromMap(table.options()); @@ -134,7 +138,9 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { } conf.getOptional(SINK_PARALLELISM).ifPresent(builder::parallelism); return builder.build(); - }); + }, + name, + table); } protected FlinkSinkBuilder createSinkBuilder() { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java index 8e1ecfc9556e..5227fa0d270f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java @@ -195,14 +195,15 @@ public ScanRuntimeProvider 
getScanRuntimeProvider(ScanContext scanContext) { .limit(limit) .watermarkStrategy(watermarkStrategy) .dynamicPartitionFilteringFields(dynamicPartitionFilteringFields()); - return new PaimonDataStreamScanProvider( !unbounded, env -> sourceBuilder .sourceParallelism(inferSourceParallelism(env)) .env(env) - .build()); + .build(), + tableIdentifier.asSummaryString(), + table); } private ScanRuntimeProvider createCountStarScan() { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java index e99f265d03b4..2bbd749f58ca 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java @@ -130,7 +130,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { dataStreamSource.setParallelism(parallelism); } return dataStreamSource; - }); + }, + tableIdentifier.asSummaryString(), + table); } @Override diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java new file mode 100644 index 000000000000..62d601ec1b23 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lineage; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.flink.PaimonDataStreamScanProvider; +import org.apache.paimon.flink.PaimonDataStreamSinkProvider; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VarCharType; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.streaming.api.lineage.DatasetConfigFacet; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link LineageUtils}. 
*/ +class LineageUtilsTest { + + @TempDir java.nio.file.Path temp; + + private Path tablePath; + + @BeforeEach + void setUp() { + tablePath = new Path(temp.toUri().toString()); + } + + private FileStoreTable createTable( + Map options, + java.util.List partitionKeys, + java.util.List primaryKeys) + throws Exception { + new SchemaManager(LocalFileIO.create(), tablePath) + .createTable( + new Schema( + RowType.of(new IntType(), new VarCharType(100), new IntType()) + .getFields(), + partitionKeys, + primaryKeys, + options, + "")); + return FileStoreTableFactory.create(LocalFileIO.create(), tablePath); + } + + @Test + void testGetNamespace() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + String namespace = LineageUtils.getNamespace(table); + + assertThat(namespace).startsWith("paimon://"); + assertThat(namespace).contains(tablePath.toString()); + } + + @Test + void testSourceLineageVertexBounded() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + SourceLineageVertex vertex = LineageUtils.sourceLineageVertex("paimon.db.src", true, table); + + assertThat(vertex).isInstanceOf(PaimonSourceLineageVertex.class); + assertThat(vertex.boundedness()).isEqualTo(Boundedness.BOUNDED); + assertThat(vertex.datasets()).hasSize(1); + + LineageDataset dataset = vertex.datasets().get(0); + assertThat(dataset.name()).isEqualTo("paimon.db.src"); + assertThat(dataset.namespace()).startsWith("paimon://"); + } + + @Test + void testSourceLineageVertexUnbounded() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + SourceLineageVertex vertex = + LineageUtils.sourceLineageVertex("paimon.db.src", false, table); + + assertThat(vertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED); + } + + @Test + void testSinkLineageVertex() throws Exception { + FileStoreTable 
table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + LineageVertex vertex = LineageUtils.sinkLineageVertex("paimon.db.sink", table); + + assertThat(vertex).isInstanceOf(PaimonSinkLineageVertex.class); + assertThat(vertex.datasets()).hasSize(1); + + LineageDataset dataset = vertex.datasets().get(0); + assertThat(dataset.name()).isEqualTo("paimon.db.sink"); + assertThat(dataset.namespace()).startsWith("paimon://"); + } + + @Test + void testConfigFacetContainsPartitionAndPrimaryKeys() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Arrays.asList("f2"), Arrays.asList("f0", "f2")); + + LineageVertex vertex = LineageUtils.sinkLineageVertex("paimon.db.t", table); + LineageDataset dataset = vertex.datasets().get(0); + + Map facets = dataset.facets(); + assertThat(facets).containsKey("config"); + + DatasetConfigFacet configFacet = (DatasetConfigFacet) facets.get("config"); + Map config = configFacet.config(); + assertThat(config).containsEntry("partition-keys", "f2"); + assertThat(config).containsEntry("primary-keys", "f0,f2"); + } + + @Test + void testConfigFacetIncludesPaimonOptions() throws Exception { + Map options = new HashMap<>(); + options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup"); + + FileStoreTable table = createTable(options, Collections.emptyList(), Arrays.asList("f0")); + + SourceLineageVertex vertex = LineageUtils.sourceLineageVertex("paimon.db.t", true, table); + LineageDataset dataset = vertex.datasets().get(0); + + DatasetConfigFacet configFacet = (DatasetConfigFacet) dataset.facets().get("config"); + Map config = configFacet.config(); + assertThat(config).containsEntry(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup"); + } + + @Test + void testConfigFacetWithEmptyKeys() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Collections.emptyList()); + + LineageVertex vertex = LineageUtils.sinkLineageVertex("paimon.db.t", table); + 
LineageDataset dataset = vertex.datasets().get(0); + + DatasetConfigFacet configFacet = (DatasetConfigFacet) dataset.facets().get("config"); + Map config = configFacet.config(); + assertThat(config).containsEntry("partition-keys", ""); + assertThat(config).containsEntry("primary-keys", ""); + } + + @Test + void testScanProviderImplementsLineageVertexProvider() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + PaimonDataStreamScanProvider provider = + new PaimonDataStreamScanProvider(true, env -> null, "paimon.db.src", table); + + assertThat(provider).isInstanceOf(LineageVertexProvider.class); + LineageVertex vertex = provider.getLineageVertex(); + assertThat(vertex).isInstanceOf(SourceLineageVertex.class); + assertThat(vertex.datasets()).hasSize(1); + assertThat(vertex.datasets().get(0).name()).isEqualTo("paimon.db.src"); + } + + @Test + void testSinkProviderImplementsLineageVertexProvider() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + PaimonDataStreamSinkProvider provider = + new PaimonDataStreamSinkProvider(dataStream -> null, "paimon.db.sink", table); + + assertThat(provider).isInstanceOf(LineageVertexProvider.class); + LineageVertex vertex = provider.getLineageVertex(); + assertThat(vertex.datasets()).hasSize(1); + assertThat(vertex.datasets().get(0).name()).isEqualTo("paimon.db.sink"); + } +} diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/DatasetConfigFacet.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/DatasetConfigFacet.java new file mode 100644 index 000000000000..cbd25f6c59b5 --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/DatasetConfigFacet.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Map; + +/** Builtin config facet for dataset. Stub for Flink 1.x compatibility. */ +@PublicEvolving +public interface DatasetConfigFacet extends LineageDatasetFacet { + Map config(); +} diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java new file mode 100644 index 000000000000..4d49b5ca7775 --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Map; + +/** Lineage dataset represents the source or sink in the job. Stub for Flink 1.x compatibility. */ +@PublicEvolving +public interface LineageDataset { + String name(); + + String namespace(); + + Map facets(); +} diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDatasetFacet.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDatasetFacet.java new file mode 100644 index 000000000000..55bf57887a94 --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDatasetFacet.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +/** Facet interface for dataset. Stub for Flink 1.x compatibility. */ +@PublicEvolving +public interface LineageDatasetFacet { + String name(); +} diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertex.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertex.java new file mode 100644 index 000000000000..43c4f991e1a8 --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertex.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.List; + +/** Lineage vertex represents the connectors in lineage graph. Stub for Flink 1.x compatibility. 
*/ +@PublicEvolving +public interface LineageVertex { + List<LineageDataset> datasets(); +} diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertexProvider.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertexProvider.java new file mode 100644 index 000000000000..959e2e96255a --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertexProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Create lineage vertex for source and sink in DataStream. Stub for Flink 1.x compatibility. + * + *

On Flink 1.x the runtime never calls {@link #getLineageVertex()}, so the implementation is + * effectively a no-op. + */ +@PublicEvolving +public interface LineageVertexProvider { + LineageVertex getLineageVertex(); +} diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/SourceLineageVertex.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/SourceLineageVertex.java new file mode 100644 index 000000000000..ab36aaee4998 --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/SourceLineageVertex.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; + +/** Source lineage vertex exposing its {@link Boundedness}. Stub for Flink 1.x compatibility. */ +@PublicEvolving +public interface SourceLineageVertex extends LineageVertex { + Boundedness boundedness(); +}