Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,39 @@

package org.apache.paimon.flink;

import org.apache.paimon.flink.lineage.LineageUtils;
import org.apache.paimon.table.Table;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.data.RowData;

import java.util.function.Function;

/** Paimon {@link DataStreamScanProvider}. */
public class PaimonDataStreamScanProvider implements DataStreamScanProvider {
/**
* Paimon {@link DataStreamScanProvider} that also implements {@link LineageVertexProvider} so
* Flink's lineage graph discovers the Paimon source table.
*/
public class PaimonDataStreamScanProvider implements DataStreamScanProvider, LineageVertexProvider {

private final boolean isBounded;
private final Function<StreamExecutionEnvironment, DataStream<RowData>> producer;
private final String name;
private final Table table;

public PaimonDataStreamScanProvider(
boolean isBounded, Function<StreamExecutionEnvironment, DataStream<RowData>> producer) {
boolean isBounded,
Function<StreamExecutionEnvironment, DataStream<RowData>> producer,
String name,
Table table) {
this.isBounded = isBounded;
this.producer = producer;
this.name = name;
this.table = table;
}

@Override
Expand All @@ -48,4 +63,9 @@ public DataStream<RowData> produceDataStream(
public boolean isBounded() {
return isBounded;
}

@Override
public LineageVertex getLineageVertex() {
return LineageUtils.sourceLineageVertex(name, isBounded, table);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,44 @@

package org.apache.paimon.flink;

import org.apache.paimon.flink.lineage.LineageUtils;
import org.apache.paimon.table.Table;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.data.RowData;

import java.util.function.Function;

/** Paimon {@link DataStreamSinkProvider}. */
public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider {
/**
* Paimon {@link DataStreamSinkProvider} that also implements {@link LineageVertexProvider} so
* Flink's lineage graph discovers the Paimon sink table.
*/
public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider, LineageVertexProvider {

private final Function<DataStream<RowData>, DataStreamSink<?>> producer;
private final String name;
private final Table table;

public PaimonDataStreamSinkProvider(Function<DataStream<RowData>, DataStreamSink<?>> producer) {
public PaimonDataStreamSinkProvider(
Function<DataStream<RowData>, DataStreamSink<?>> producer, String name, Table table) {
this.producer = producer;
this.name = name;
this.table = table;
}

@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext, DataStream<RowData> dataStream) {
return producer.apply(dataStream);
}

@Override
public LineageVertex getLineageVertex() {
return LineageUtils.sinkLineageVertex(name, table);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.lineage;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.table.Table;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Lineage utilities for building {@link SourceLineageVertex} and {@link LineageVertex} from a
* Paimon table name and its physical warehouse path (namespace).
*/
public class LineageUtils {

private static final String PAIMON_DATASET_PREFIX = "paimon://";

private static final Set<String> PAIMON_OPTION_KEYS =
CoreOptions.getOptions().stream().map(opt -> opt.key()).collect(Collectors.toSet());

/**
* Builds the config map for a dataset facet from a {@link Table}. Includes filtered Paimon
* {@link CoreOptions}, partition keys, primary keys, and the table comment (if present).
*/
private static Map<String, String> buildConfigMap(Table table) {
Map<String, String> config = new HashMap<>();
config.put("partition-keys", String.join(",", table.partitionKeys()));
config.put("primary-keys", String.join(",", table.primaryKeys()));

table.options().entrySet().stream()
.filter(e -> PAIMON_OPTION_KEYS.contains(e.getKey()))
.forEach(e -> config.put(e.getKey(), e.getValue()));

return config;
}

/**
* Returns the lineage namespace for a Paimon table. The namespace uses the {@code paimon://}
* scheme followed by the table's physical warehouse path, e.g. {@code
* "paimon://s3://my-bucket/warehouse/mydb.db/mytable"}.
*/
public static String getNamespace(Table table) {
return PAIMON_DATASET_PREFIX + CoreOptions.path(table.options());
}

/**
* Creates a {@link SourceLineageVertex} for a Paimon source table.
*
* @param name fully qualified table name, e.g. {@code "paimon.mydb.mytable"}
* @param isBounded whether the source is bounded (batch) or unbounded (streaming)
* @param table the Paimon table (namespace is derived from its {@code path} option)
*/
public static SourceLineageVertex sourceLineageVertex(
String name, boolean isBounded, Table table) {
LineageDataset dataset =
new PaimonLineageDataset(name, getNamespace(table), buildConfigMap(table));
Boundedness boundedness =
isBounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
return new PaimonSourceLineageVertex(boundedness, Collections.singletonList(dataset));
}

/**
* Creates a {@link LineageVertex} for a Paimon sink table.
*
* @param name fully qualified table name, e.g. {@code "paimon.mydb.mytable"}
* @param table the Paimon table (namespace is derived from its {@code path} option)
*/
public static LineageVertex sinkLineageVertex(String name, Table table) {
LineageDataset dataset =
new PaimonLineageDataset(name, getNamespace(table), buildConfigMap(table));
return new PaimonSinkLineageVertex(Collections.singletonList(dataset));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.lineage;

import org.apache.flink.streaming.api.lineage.DatasetConfigFacet;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

import java.util.HashMap;
import java.util.Map;

/**
* A {@link LineageDataset} representing a Paimon table, identified by its fully qualified name and
* physical warehouse path as the namespace.
*/
public class PaimonLineageDataset implements LineageDataset {

private final String name;
private final String namespace;
private final Map<String, String> tableOptions;

public PaimonLineageDataset(String name, String namespace, Map<String, String> tableOptions) {
this.name = name;
this.namespace = namespace;
this.tableOptions = tableOptions;
}

@Override
public String name() {
return name;
}

@Override
public String namespace() {
return namespace;
}

@Override
public Map<String, LineageDatasetFacet> facets() {
Map<String, LineageDatasetFacet> facets = new HashMap<>();
facets.put(
"config",
new DatasetConfigFacet() {
@Override
public String name() {
return "config";
}

@Override
public Map<String, String> config() {
return tableOptions;
}
});
return facets;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.lineage;

import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;

import java.util.List;

/** A {@link LineageVertex} representing a Paimon sink table. */
public class PaimonSinkLineageVertex implements LineageVertex {

private final List<LineageDataset> datasets;

public PaimonSinkLineageVertex(List<LineageDataset> datasets) {
this.datasets = datasets;
}

@Override
public List<LineageDataset> datasets() {
return datasets;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.lineage;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;

import java.util.List;

/**
* A {@link SourceLineageVertex} representing a Paimon source table. Carries the {@link Boundedness}
* to indicate whether the source is bounded (batch) or unbounded (streaming).
*/
public class PaimonSourceLineageVertex implements SourceLineageVertex {

private final Boundedness boundedness;
private final List<LineageDataset> datasets;

public PaimonSourceLineageVertex(Boundedness boundedness, List<LineageDataset> datasets) {
this.boundedness = boundedness;
this.datasets = datasets;
}

@Override
public Boundedness boundedness() {
return boundedness;
}

@Override
public List<LineageDataset> datasets() {
return datasets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return new PaimonDataStreamSinkProvider(
(dataStream) ->
new FlinkFormatTableDataStreamSink(table, overwrite, staticPartitions)
.sinkFrom(dataStream));
.sinkFrom(dataStream),
tableIdentifier.asSummaryString(),
table);
}

@Override
Expand Down
Loading
Loading