From 9ab5b98f7891f68156fe8a85d2e8db707b299ca6 Mon Sep 17 00:00:00 2001 From: fhan Date: Tue, 26 May 2026 20:35:05 +0800 Subject: [PATCH 01/11] [lake/hudi] Introduce Hudi LakeCatalog to create table --- .../fluss/flink/lake/LakeFlinkCatalog.java | 153 ++++++++--- .../fluss/flink/lake/LakeTableFactory.java | 68 +++-- fluss-lake/fluss-lake-hudi/pom.xml | 87 +++++++ .../hudi/FlussDataTypeToHudiDataType.java | 180 +++++++++++++ .../fluss/lake/hudi/HudiLakeCatalog.java | 148 +++++++++++ .../lake/hudi/utils/HudiConversions.java | 237 ++++++++++++++++++ .../utils/catalog/CatalogDatabaseImpl.java | 62 +++++ .../hudi/utils/catalog/HudiCatalogUtils.java | 104 ++++++++ .../fluss/lake/hudi/HudiLakeCatalogTest.java | 174 +++++++++++++ 9 files changed, 1159 insertions(+), 54 deletions(-) create mode 100644 fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/FlussDataTypeToHudiDataType.java create mode 100644 fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java create mode 100644 fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java create mode 100644 fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/CatalogDatabaseImpl.java create mode 100644 fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java create mode 100644 fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java index f42b0c4fb2..420a9a63e0 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java @@ -1,3 +1,4 @@ + /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -33,6 +34,7 @@ import java.util.HashMap; import java.util.Map; +import static org.apache.fluss.metadata.DataLakeFormat.HUDI; import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG; import static org.apache.fluss.metadata.DataLakeFormat.PAIMON; import static org.apache.fluss.utils.Preconditions.checkNotNull; @@ -52,7 +54,7 @@ public LakeFlinkCatalog(String catalogName, ClassLoader classLoader) { } public Catalog getLakeCatalog( - Configuration tableOptions, Map lakeCatalogProperties) { + Configuration tableOptions, Map lakeCatalogProperties) { // TODO: Currently, a Fluss cluster only supports a single DataLake storage. // However, in the // future, it may support multiple DataLakes. The following code assumes @@ -64,31 +66,36 @@ public Catalog getLakeCatalog( synchronized (this) { if (catalog == null) { DataLakeFormat lakeFormat = - tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT); + tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT); if (lakeFormat == null) { throw new IllegalArgumentException( - "DataLake format is not specified in table options. " - + "Please ensure '" - + ConfigOptions.TABLE_DATALAKE_FORMAT.key() - + "' is set."); + "DataLake format is not specified in table options. " + + "Please ensure '" + + ConfigOptions.TABLE_DATALAKE_FORMAT.key() + + "' is set."); } Map catalogProperties = - new HashMap<>(DataLakeUtils.extractLakeCatalogProperties(tableOptions)); + new HashMap<>(DataLakeUtils.extractLakeCatalogProperties(tableOptions)); // properties in catalog are preferred catalogProperties.putAll( - PropertiesUtils.extractAndRemovePrefix( - lakeCatalogProperties, lakeFormat + ".")); + PropertiesUtils.extractAndRemovePrefix( + lakeCatalogProperties, lakeFormat + ".")); if (lakeFormat == PAIMON) { catalog = - PaimonCatalogFactory.create( - catalogName, catalogProperties, classLoader); + PaimonCatalogFactory.create( + catalogName, catalogProperties, classLoader); this.lakeFormat = PAIMON; } else if (lakeFormat == ICEBERG) { catalog = IcebergCatalogFactory.create(catalogName, catalogProperties); this.lakeFormat = ICEBERG; + } else if (lakeFormat == HUDI) { + catalog = + HudiCatalogFactory.create( + catalogName, catalogProperties, classLoader); + this.lakeFormat = HUDI; } else { throw new UnsupportedOperationException( - "Unsupported data lake format: " + lakeFormat); + "Unsupported data lake format: " + lakeFormat); } } } @@ -98,8 +105,8 @@ public Catalog getLakeCatalog( public DataLakeFormat getLakeFormat() { checkNotNull( - lakeFormat, - "DataLake format is null, must call getLakeCatalog first to initialize lake format."); + lakeFormat, + "DataLake format is null, must call getLakeCatalog first to initialize lake format."); return lakeFormat; } @@ -121,14 +128,14 @@ public static class PaimonCatalogFactory { private PaimonCatalogFactory() {} public static Catalog create( - String catalogName, - Map catalogProperties, - ClassLoader classLoader) { + String catalogName, + Map catalogProperties, + ClassLoader classLoader) { return FlinkCatalogFactory.createCatalog( - catalogName, - CatalogContext.create( - Options.fromMap(catalogProperties), null, new FlinkFileIOLoader()), - classLoader); + catalogName, + CatalogContext.create( + Options.fromMap(catalogProperties), null, new FlinkFileIOLoader()), + classLoader); } } @@ -150,19 +157,109 @@ public static Catalog create(String catalogName, Map catalogProp } try { Class flinkCatalogFactoryClass = - Class.forName("org.apache.iceberg.flink.FlinkCatalogFactory"); + Class.forName("org.apache.iceberg.flink.FlinkCatalogFactory"); Object factoryInstance = - flinkCatalogFactoryClass.getDeclaredConstructor().newInstance(); + flinkCatalogFactoryClass.getDeclaredConstructor().newInstance(); Method createCatalogMethod = - flinkCatalogFactoryClass.getMethod( - "createCatalog", String.class, Map.class); + flinkCatalogFactoryClass.getMethod( + "createCatalog", String.class, Map.class); return (Catalog) - createCatalogMethod.invoke(factoryInstance, catalogName, catalogProperties); + createCatalogMethod.invoke(factoryInstance, catalogName, catalogProperties); + } catch (Exception e) { + throw new RuntimeException( + "Failed to create Iceberg catalog using reflection. Please make sure iceberg-flink-runtime is on the classpath.", + e); + } + } + } + + /** + * Factory using reflection to create Hudi Catalog instances. + * + *

Hudi is intentionally NOT a compile-time dependency of fluss-flink-common to avoid + * dragging the shaded {@code hudi-flink-bundle} (which re-bundles hadoop / parquet / + * avro / jackson / guava) into every downstream consumer. The bundle is shipped as a plugin + * under {@code plugins/hudi/} and loaded via the plugin classloader at runtime, mirroring the + * Iceberg integration pattern. + * + *

Unlike Iceberg's {@code FlinkCatalogFactory}, which keeps a convenience overload {@code + * createCatalog(String, Map)}, Hudi's {@code HoodieCatalogFactory} only exposes the standard + * Flink SPI signature {@code createCatalog(CatalogFactory.Context)}. Since Fluss is not + * invoking the factory through Flink's SQL {@code CREATE CATALOG} path, no {@code Context} is + * provided to us — we have to build one ourselves. We do that by reflectively instantiating + * Flink's {@code FactoryUtil$DefaultCatalogContext}, the same internal implementation Flink + * itself uses in {@code FactoryUtil#createCatalog(...)} when bridging the legacy and new SPI + * stacks. + */ + public static class HudiCatalogFactory { + + private static final String HOODIE_CATALOG_FACTORY_CLASS = + "org.apache.hudi.table.catalog.HoodieCatalogFactory"; + private static final String FLINK_CATALOG_FACTORY_CONTEXT_CLASS = + "org.apache.flink.table.factories.CatalogFactory$Context"; + private static final String FLINK_DEFAULT_CATALOG_CONTEXT_CLASS = + "org.apache.flink.table.factories.FactoryUtil$DefaultCatalogContext"; + + private HudiCatalogFactory() {} + + public static Catalog create( + String catalogName, + Map catalogProperties, + ClassLoader classLoader) { + try { + // 1) Build Hudi's catalog factory instance via reflection. + Class hoodieCatalogFactoryClass = + Class.forName(HOODIE_CATALOG_FACTORY_CLASS, true, classLoader); + Object factoryInstance = + hoodieCatalogFactoryClass.getDeclaredConstructor().newInstance(); + + // 2) Build a CatalogFactory.Context via Flink's internal default impl. + // Constructor: DefaultCatalogContext(String name, + // Map options, + // ReadableConfig configuration, + // ClassLoader classLoader) + // We have no real Flink ReadableConfig in this code path, so we pass an + // empty Flink Configuration (which implements ReadableConfig) as a benign + // placeholder — Hudi's factory only consumes 'options' / 'name' / classloader. + Class defaultCatalogContextClass = + Class.forName(FLINK_DEFAULT_CATALOG_CONTEXT_CLASS, true, classLoader); + Class readableConfigClass = + Class.forName( + "org.apache.flink.configuration.ReadableConfig", true, classLoader); + Class flinkConfigurationClass = + Class.forName( + "org.apache.flink.configuration.Configuration", true, classLoader); + Object emptyFlinkConfiguration = + flinkConfigurationClass.getDeclaredConstructor().newInstance(); + + Object context = + defaultCatalogContextClass + .getDeclaredConstructor( + String.class, + Map.class, + readableConfigClass, + ClassLoader.class) + .newInstance( + catalogName, + catalogProperties, + emptyFlinkConfiguration, + classLoader); + + // 3) Invoke HoodieCatalogFactory#createCatalog(Context). + Class contextInterface = + Class.forName(FLINK_CATALOG_FACTORY_CONTEXT_CLASS, true, classLoader); + Method createCatalogMethod = + hoodieCatalogFactoryClass.getMethod("createCatalog", contextInterface); + return (Catalog) createCatalogMethod.invoke(factoryInstance, context); } catch (Exception e) { throw new RuntimeException( - "Failed to create Iceberg catalog using reflection. Please make sure iceberg-flink-runtime is on the classpath.", - e); + "Failed to create Hudi catalog using reflection. Please make sure " + + "hudi-flink-bundle (matching the current Flink version, " + + "Hudi 1.0+) is on the classpath, typically under " + + "plugins/hudi/, and that the catalog options include a valid " + + "'mode' (supported: 'hms' or 'dfs').", + e); } } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java index ad9918f389..6fb0671cd9 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java @@ -36,24 +36,24 @@ public LakeTableFactory(LakeFlinkCatalog lakeFlinkCatalog) { } public DynamicTableSource createDynamicTableSource( - DynamicTableFactory.Context context, String tableName) { + DynamicTableFactory.Context context, String tableName) { ObjectIdentifier originIdentifier = context.getObjectIdentifier(); ObjectIdentifier lakeIdentifier = - ObjectIdentifier.of( - originIdentifier.getCatalogName(), - originIdentifier.getDatabaseName(), - tableName); + ObjectIdentifier.of( + originIdentifier.getCatalogName(), + originIdentifier.getDatabaseName(), + tableName); - // For Iceberg and Paimon, pass the table name as-is to their factory. + // For Iceberg, Hudi and Paimon, pass the table name as-is to their factory. // Metadata tables will be handled internally by their respective factories. DynamicTableFactory.Context newContext = - new FactoryUtil.DefaultDynamicTableContext( - lakeIdentifier, - context.getCatalogTable(), - context.getEnrichmentOptions(), - context.getConfiguration(), - context.getClassLoader(), - context.isTemporary()); + new FactoryUtil.DefaultDynamicTableContext( + lakeIdentifier, + context.getCatalogTable(), + context.getEnrichmentOptions(), + context.getConfiguration(), + context.getClassLoader(), + context.isTemporary()); // Get the appropriate factory based on connector type DynamicTableSourceFactory factory = getLakeTableFactory(); @@ -66,11 +66,13 @@ private DynamicTableSourceFactory getLakeTableFactory() { return getPaimonFactory(); case ICEBERG: return getIcebergFactory(); + case HUDI: + return getHudiFactory(); default: throw new UnsupportedOperationException( - "Unsupported lake connector: " - + lakeFlinkCatalog.getLakeFormat() - + ". Only 'paimon' and 'iceberg' are supported."); + "Unsupported lake connector: " + + lakeFlinkCatalog.getLakeFormat() + + ". Only 'paimon', 'iceberg' and 'hudi' are supported."); } } @@ -82,23 +84,37 @@ private DynamicTableSourceFactory getIcebergFactory() { try { // Get catalog with explicit ICEBERG format org.apache.flink.table.catalog.Catalog catalog = - lakeFlinkCatalog.getLakeCatalog( - // we can pass empty configuration to get catalog - // since the catalog should already be initialized - new Configuration(), Collections.emptyMap()); + lakeFlinkCatalog.getLakeCatalog( + // we can pass empty configuration to get catalog + // since the catalog should already be initialized + new Configuration(), Collections.emptyMap()); // Create FlinkDynamicTableFactory with the catalog Class icebergFactoryClass = - Class.forName("org.apache.iceberg.flink.FlinkDynamicTableFactory"); + Class.forName("org.apache.iceberg.flink.FlinkDynamicTableFactory"); Class flinkCatalogClass = Class.forName("org.apache.iceberg.flink.FlinkCatalog"); return (DynamicTableSourceFactory) - icebergFactoryClass - .getDeclaredConstructor(flinkCatalogClass) - .newInstance(catalog); + icebergFactoryClass + .getDeclaredConstructor(flinkCatalogClass) + .newInstance(catalog); } catch (Exception e) { throw new RuntimeException( - "Failed to create Iceberg table factory. Please ensure iceberg-flink-runtime is on the classpath.", - e); + "Failed to create Iceberg table factory. Please ensure iceberg-flink-runtime is on the classpath.", + e); + } + } + + private DynamicTableSourceFactory getHudiFactory() { + try { + Class hudiFactoryClass = Class.forName("org.apache.hudi.table.HoodieTableFactory"); + return (DynamicTableSourceFactory) + hudiFactoryClass.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + throw new RuntimeException( + "Failed to create Hudi table factory. Please ensure hudi-flink-bundle " + + "(matching the current Flink version) is on the classpath, " + + "typically under plugins/hudi/.", + e); } } } diff --git a/fluss-lake/fluss-lake-hudi/pom.xml b/fluss-lake/fluss-lake-hudi/pom.xml index d4c5070e10..5a9ea7f47b 100644 --- a/fluss-lake/fluss-lake-hudi/pom.xml +++ b/fluss-lake/fluss-lake-hudi/pom.xml @@ -69,12 +69,99 @@ + + org.apache.hadoop + hadoop-common + ${fluss.hadoop.version} + provided + + + avro + org.apache.avro + + + log4j + log4j + + + slf4j-log4j12 + org.slf4j + + + ch.qos.reload4j + reload4j + + + org.slf4j + slf4j-reload4j + + + jdk.tools + jdk.tools + + + protobuf-java + com.google.protobuf + + + commons-io + commons-io + + + org.apache.commons + commons-lang3 + + + org.apache.curator + curator-client + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-recipes + + + org.apache.zookeeper + zookeeper + + + + org.apache.fluss fluss-common ${project.version} provided + + + org.apache.flink + flink-table-common + provided + + + org.apache.commons + commons-lang3 + + + + + + + org.apache.fluss + fluss-common + ${project.version} + test-jar + test + + + + org.apache.fluss + fluss-test-utils + \ No newline at end of file diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/FlussDataTypeToHudiDataType.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/FlussDataTypeToHudiDataType.java new file mode 100644 index 0000000000..9bb2a69764 --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/FlussDataTypeToHudiDataType.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.hudi; + +import org.apache.fluss.types.ArrayType; +import org.apache.fluss.types.BigIntType; +import org.apache.fluss.types.BinaryType; +import org.apache.fluss.types.BooleanType; +import org.apache.fluss.types.BytesType; +import org.apache.fluss.types.CharType; +import org.apache.fluss.types.DataTypeVisitor; +import org.apache.fluss.types.DateType; +import org.apache.fluss.types.DecimalType; +import org.apache.fluss.types.DoubleType; +import org.apache.fluss.types.FloatType; +import org.apache.fluss.types.IntType; +import org.apache.fluss.types.LocalZonedTimestampType; +import org.apache.fluss.types.MapType; +import org.apache.fluss.types.RowType; +import org.apache.fluss.types.SmallIntType; +import org.apache.fluss.types.StringType; +import org.apache.fluss.types.TimeType; +import org.apache.fluss.types.TimestampType; +import org.apache.fluss.types.TinyIntType; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; + +import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.FILE_SYSTEM_TYPE; +import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HIVA_META_STAORE_TYPE; + +/** Convert from Fluss's data type to Hudi's internal data type. */ +public class FlussDataTypeToHudiDataType implements DataTypeVisitor { + + private String catalogMode; + + public FlussDataTypeToHudiDataType(String catalogMode) { + this.catalogMode = catalogMode; + } + + public static final FlussDataTypeToHudiDataType DFS_INSTANCE = + new FlussDataTypeToHudiDataType(FILE_SYSTEM_TYPE); + public static final FlussDataTypeToHudiDataType HMS_INSTANCE = + new FlussDataTypeToHudiDataType(HIVA_META_STAORE_TYPE); + + @Override + public DataType visit(CharType charType) { + return withNullability(DataTypes.STRING(), charType.isNullable()); + } + + @Override + public DataType visit(StringType stringType) { + return withNullability(DataTypes.STRING(), stringType.isNullable()); + } + + @Override + public DataType visit(BooleanType booleanType) { + return withNullability(DataTypes.BOOLEAN(), booleanType.isNullable()); + } + + @Override + public DataType visit(BinaryType binaryType) { + return withNullability(DataTypes.BINARY(binaryType.getLength()), binaryType.isNullable()); + } + + @Override + public DataType visit(BytesType bytesType) { + return withNullability(DataTypes.BYTES(), bytesType.isNullable()); + } + + @Override + public DataType visit(DecimalType decimalType) { + return withNullability( + DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale()), + decimalType.isNullable()); + } + + @Override + public DataType visit(TinyIntType tinyIntType) { + // Hudi IntType covers 8/16/32-bit integers (consistent with internal conversion logic) + return withNullability(DataTypes.TINYINT(), tinyIntType.isNullable()); + } + + @Override + public DataType visit(SmallIntType smallIntType) { + return withNullability(DataTypes.SMALLINT(), smallIntType.isNullable()); + } + + @Override + public DataType visit(IntType intType) { + return withNullability(DataTypes.INT(), intType.isNullable()); + } + + @Override + public DataType visit(BigIntType bigIntType) { + return withNullability(DataTypes.BIGINT(), bigIntType.isNullable()); + } + + @Override + public DataType visit(FloatType floatType) { + return withNullability(DataTypes.FLOAT(), floatType.isNullable()); + } + + @Override + public DataType visit(DoubleType doubleType) { + return withNullability(DataTypes.DOUBLE(), doubleType.isNullable()); + } + + @Override + public DataType visit(DateType dateType) { + return withNullability(DataTypes.DATE(), dateType.isNullable()); + } + + @Override + public DataType visit(TimeType timeType) { + return withNullability(DataTypes.TIME(timeType.getPrecision()), timeType.isNullable()); + } + + @Override + public DataType visit(TimestampType timestampType) { + return withNullability( + DataTypes.TIMESTAMP(timestampType.getPrecision()), timestampType.isNullable()); + } + + @Override + public DataType visit(LocalZonedTimestampType localZonedTimestampType) { + return HIVA_META_STAORE_TYPE.equals(catalogMode) + ? withNullability(DataTypes.BIGINT(), localZonedTimestampType.isNullable()) + : withNullability( + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(), + localZonedTimestampType.isNullable()); + } + + @Override + public DataType visit(ArrayType arrayType) { + DataType elementType = arrayType.getElementType().accept(this); + return withNullability(DataTypes.ARRAY(elementType), arrayType.isNullable()); + } + + @Override + public DataType visit(MapType mapType) { + DataType keyType = mapType.getKeyType().accept(this); + DataType valueType = mapType.getValueType().accept(this); + return withNullability(DataTypes.MAP(keyType, valueType), mapType.isNullable()); + } + + @Override + public DataType visit(RowType rowType) { + DataTypes.Field[] fields = new DataTypes.Field[rowType.getFieldCount()]; + + for (int i = 0; i < rowType.getFieldCount(); i++) { + String fieldName = rowType.getFieldNames().get(i); + DataType fieldType = rowType.getTypeAt(i).accept(this); + fields[i] = DataTypes.FIELD(fieldName, fieldType); + } + return withNullability(DataTypes.ROW(fields), rowType.isNullable()); + } + + private DataType withNullability(DataType hudi, boolean nullable) { + if (hudi.getLogicalType().isNullable() != nullable) { + return nullable ? hudi.nullable() : hudi.notNull(); + } + return hudi; + } +} diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java new file mode 100644 index 0000000000..84d088df8b --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.hudi; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.lake.hudi.utils.HudiConversions; +import org.apache.fluss.lake.hudi.utils.catalog.CatalogDatabaseImpl; +import org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils; +import org.apache.fluss.lake.lakestorage.LakeCatalog; +import org.apache.fluss.metadata.TableChange; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.utils.IOUtils; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.types.DataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; + +import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HIVA_META_STAORE_TYPE; +import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HUDI_CATALOG_DEFAULT_NAME; +import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.MODE_CONFIG; +import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; +import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; +import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; + +/** Implementation of {@link LakeCatalog} for Hudi. */ +public class HudiLakeCatalog implements LakeCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(HudiLakeCatalog.class); + + public static final LinkedHashMap SYSTEM_COLUMNS = new LinkedHashMap<>(); + + static { + SYSTEM_COLUMNS.put(BUCKET_COLUMN_NAME, DataTypes.INT()); + SYSTEM_COLUMNS.put(OFFSET_COLUMN_NAME, DataTypes.BIGINT()); + SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP(6)); + } + + private final Catalog hudiCatalog; + private final String catalagMode; + + public HudiLakeCatalog(Configuration configuration) { + this.catalagMode = configuration.toMap().getOrDefault(MODE_CONFIG, HIVA_META_STAORE_TYPE); + this.hudiCatalog = HudiCatalogUtils.createHudiCatalog(configuration); + this.hudiCatalog.open(); + } + + @VisibleForTesting + protected Catalog getHudiCatalog() { + return hudiCatalog; + } + + @Override + public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context) + throws org.apache.fluss.exception.TableAlreadyExistException { + LOG.info("create the lake table for : {} with props: {}", tablePath, tableDescriptor); + + ObjectPath objectPath = HudiConversions.toHudiObjectPath(tablePath); + + boolean isPkTable = tableDescriptor.getSchema().getPrimaryKeyIndexes().length > 0; + + // Create Hudi catalog table + CatalogTable catalogTable = + HudiConversions.createHudiCatalogTable(tableDescriptor, isPkTable, catalagMode); + + // Create table in Hudi catalog + try { + createTable(objectPath, catalogTable); + } catch (DatabaseNotExistException e) { + createDatabase(tablePath.getDatabaseName()); + try { + createTable(objectPath, catalogTable); + } catch (DatabaseNotExistException t) { + // shouldn't happen in normal cases + throw new RuntimeException( + String.format( + "Fail to create table %s in Hudi, because " + + "Database %s still doesn't exist although create database " + + "successfully, please try again.", + tablePath, tablePath.getDatabaseName())); + } + } + } + + @Override + public void alterTable(TablePath tablePath, List tableChanges, Context context) + throws org.apache.fluss.exception.TableNotExistException { + throw new UnsupportedOperationException( + "Alter table is not supported for Hudi at the moment"); + } + + private void createTable(ObjectPath tablePath, CatalogBaseTable catalogTable) + throws DatabaseNotExistException { + try { + hudiCatalog.createTable(tablePath, catalogTable, true); + LOG.info("Table {} created successfully.", tablePath); + } catch (TableAlreadyExistException e) { + throw new org.apache.fluss.exception.TableAlreadyExistException( + "Table " + tablePath + " already exists."); + } + } + + @Override + public void close() { + if (hudiCatalog != null && hudiCatalog instanceof AutoCloseable) { + IOUtils.closeQuietly((AutoCloseable) hudiCatalog, HUDI_CATALOG_DEFAULT_NAME); + } + } + + public void createDatabase(String databaseName) { + try { + CatalogDatabase database = new CatalogDatabaseImpl(new HashMap<>(), "Hudi database"); + // ignore if exists + hudiCatalog.createDatabase(databaseName, database, true); + } catch (DatabaseAlreadyExistException e) { + // do nothing, shouldn't throw since ignoreIfExists + } + } +} diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java new file mode 100644 index 0000000000..6bab741755 --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.hudi.utils; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.exception.InvalidConfigException; +import org.apache.fluss.exception.InvalidTableException; +import org.apache.fluss.lake.hudi.FlussDataTypeToHudiDataType; +import org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.types.DataType; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.index.HoodieIndex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.fluss.lake.hudi.HudiLakeCatalog.SYSTEM_COLUMNS; +import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HIVA_META_STAORE_TYPE; + +/** Utils for conversion between Hudi and Fluss. */ +public class HudiConversions { + + private static final Logger LOG = LoggerFactory.getLogger(HudiConversions.class); + + // for fluss config + private static final String FLUSS_CONF_PREFIX = "fluss."; + // for hudi config + private static final String HUDI_CONF_PREFIX = "hudi."; + + private static final String DELIMITER = ","; + + /** Hudi config options set by Fluss should not be set by users. */ + @VisibleForTesting public static final Set HUDI_UNSETTABLE_OPTIONS = new HashSet<>(); + + /** + * Converts a Fluss TablePath to a Hudi ObjectPath. + * + * @param tablePath the Fluss table path + * @return the corresponding Hudi ObjectPath + */ + public static ObjectPath toHudiObjectPath(TablePath tablePath) { + return new ObjectPath(tablePath.getDatabaseName(), tablePath.getTableName()); + } + + public static ResolvedSchema convertToFlinkResolvedSchema( + TableDescriptor tableDescriptor, boolean isPkTable) { + // validate hudi options first + validateHudiOptions(tableDescriptor.getProperties()); + validateHudiOptions(tableDescriptor.getCustomProperties()); + + List columns = new ArrayList<>(); + + // Add regular columns + for (org.apache.fluss.metadata.Schema.Column column : + tableDescriptor.getSchema().getColumns()) { + String columnName = column.getName(); + if (SYSTEM_COLUMNS.containsKey(columnName)) { + throw new InvalidTableException( + "Column " + + columnName + + " conflicts with a system column name of hudi table, please rename the column."); + } + columns.add( + Column.physical( + columnName, + column.getDataType().accept(FlussDataTypeToHudiDataType.DFS_INSTANCE))); + } + + // add system metadata columns to schema + for (Map.Entry systemColumn : SYSTEM_COLUMNS.entrySet()) { + columns.add(Column.physical(systemColumn.getKey(), systemColumn.getValue())); + } + + UniqueConstraint constraint = null; + // Set primary key if this is a PK table + if (isPkTable && tableDescriptor.hasPrimaryKey()) { + List primaryKeys = new ArrayList<>(); + for (int pkIndex : tableDescriptor.getSchema().getPrimaryKeyIndexes()) { + primaryKeys.add(tableDescriptor.getSchema().getColumns().get(pkIndex).getName()); + } + constraint = UniqueConstraint.primaryKey("primaryKey", primaryKeys); + } + + return new ResolvedSchema(columns, Collections.emptyList(), constraint); + } + + /** + * Builds Hudi table properties from Fluss TableDescriptor. + * + * @param tableDescriptor the Fluss table descriptor + * @param isPkTable whether this is a primary key table + * @return map of Hudi table properties + */ + public static Map buildHudiTableProperties( + TableDescriptor tableDescriptor, boolean isPkTable) { + Map hudiProperties = new HashMap<>(); + // Set connector type + hudiProperties.put(FactoryUtil.CONNECTOR.key(), "hudi"); + hudiProperties.put("storageType", "hudi"); + + // Set table type based on whether it's a PK table + if (isPkTable) { + hudiProperties.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name()); + List primaryKeys = new ArrayList<>(); + for (int pkIndex : tableDescriptor.getSchema().getPrimaryKeyIndexes()) { + primaryKeys.add(tableDescriptor.getSchema().getColumns().get(pkIndex).getName()); + } + hudiProperties.put(FlinkOptions.RECORD_KEY_FIELD.key(), String.join(",", primaryKeys)); + } else { + hudiProperties.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name()); + // set primary key for Fluss Log Table. + String recordKeyField = + tableDescriptor + .getCustomProperties() + .get(HUDI_CONF_PREFIX + FlinkOptions.RECORD_KEY_FIELD.key()); + if (recordKeyField == null || recordKeyField.isEmpty()) { + throw new IllegalArgumentException("Record key field should be set."); + } + hudiProperties.put(FlinkOptions.RECORD_KEY_FIELD.key(), recordKeyField); + hudiProperties.put( + FlinkOptions.INDEX_KEY_FIELD.key(), + recordKeyField); // use primary key as index key + } + + // buket keys column + hudiProperties.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name()); + List bucketKeys = tableDescriptor.getBucketKeys(); + int numBuckets = + tableDescriptor + .getTableDistribution() + .flatMap(TableDescriptor.TableDistribution::getBucketCount) + .orElseThrow( + () -> new IllegalArgumentException("Bucket count should be set.")); + + if (!bucketKeys.isEmpty()) { + hudiProperties.put( + FlinkOptions.INDEX_KEY_FIELD.key(), String.join(DELIMITER, bucketKeys)); + } + hudiProperties.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), String.valueOf(numBuckets)); + + // partition keys column + List partitionKeys = tableDescriptor.getPartitionKeys(); + hudiProperties.put( + FlinkOptions.PARTITION_PATH_FIELD.key(), String.join(DELIMITER, partitionKeys)); + + // Convert Fluss properties to Hudi properties + tableDescriptor + .getProperties() + .forEach((k, v) -> setFlussPropertyToHudi(k, v, hudiProperties)); + tableDescriptor + .getCustomProperties() + .forEach((k, v) -> setFlussPropertyToHudi(k, v, hudiProperties)); + + return hudiProperties; + } + + /** + * Creates a CatalogTable for Hudi from Fluss TableDescriptor. + * + * @param tableDescriptor the Fluss table descriptor + * @param isPkTable whether this is a primary key table + * @return the created CatalogTable + */ + public static CatalogTable createHudiCatalogTable( + TableDescriptor tableDescriptor, boolean isPkTable, String catalogMode) { + ResolvedSchema resolvedSchema = convertToFlinkResolvedSchema(tableDescriptor, isPkTable); + Schema schema = Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(); + List partitionKeys = tableDescriptor.getPartitionKeys(); + Map options = buildHudiTableProperties(tableDescriptor, isPkTable); + LOG.info("Hudi table properties: {}", options); + + String comment = tableDescriptor.getComment().orElse("Hudi table created from Fluss"); + return HIVA_META_STAORE_TYPE.equals(catalogMode) + ? HudiCatalogUtils.createCatalogTable(schema, partitionKeys, options, comment) + : HudiCatalogUtils.createResolvedCatalogTable( + schema, partitionKeys, options, comment, resolvedSchema); + } + + private static void setFlussPropertyToHudi( + String key, String value, Map hudiProperties) { + if (key.startsWith(HUDI_CONF_PREFIX)) { + hudiProperties.put(key.substring(HUDI_CONF_PREFIX.length()), value); + } else { + hudiProperties.put(FLUSS_CONF_PREFIX + key, value); + } + } + + private static void validateHudiOptions(Map properties) { + properties.forEach( + (k, v) -> { + String hudiKey = k; + if (k.startsWith(HUDI_CONF_PREFIX)) { + hudiKey = k.substring(HUDI_CONF_PREFIX.length()); + } + if (HUDI_UNSETTABLE_OPTIONS.contains(hudiKey)) { + throw new InvalidConfigException( + String.format( + "The Hudi option %s will be set automatically by Fluss " + + "and should not be set manually.", + k)); + } + }); + } +} diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/CatalogDatabaseImpl.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/CatalogDatabaseImpl.java new file mode 100644 index 0000000000..b3a0d47f28 --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/CatalogDatabaseImpl.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.hudi.utils.catalog; + +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** Implement of catalog database. */ +public class CatalogDatabaseImpl implements CatalogDatabase { + private final Map properties; + private final String comment; + + public CatalogDatabaseImpl(Map properties, @Nullable String comment) { + this.properties = Preconditions.checkNotNull(properties, "properties cannot be null"); + this.comment = comment; + } + + public Map getProperties() { + return this.properties; + } + + public String getComment() { + return this.comment; + } + + public CatalogDatabase copy() { + return this.copy(this.getProperties()); + } + + public CatalogDatabase copy(Map properties) { + return new CatalogDatabaseImpl(new HashMap<>(properties), this.comment); + } + + public Optional getDescription() { + return Optional.ofNullable(this.comment); + } + + public Optional getDetailedDescription() { + return Optional.ofNullable(this.comment); + } +} diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java new file mode 100644 index 0000000000..948ceb3248 --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.hudi.utils.catalog; + +import org.apache.fluss.config.Configuration; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.hudi.exception.HoodieCatalogException; +import org.apache.hudi.table.catalog.CatalogOptions; +import org.apache.hudi.table.catalog.HoodieCatalog; +import org.apache.hudi.table.catalog.HoodieHiveCatalog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Locale; +import java.util.Map; + +/** Implementation of {@link HudiCatalogUtils} for Hudi. */ +public class HudiCatalogUtils { + + private static final Logger LOG = LoggerFactory.getLogger(HudiCatalogUtils.class); + + public static final String MODE_CONFIG = "mode"; + public static final String CATALOG_NAME_CONFIG = "name"; + + public static final String HUDI_CATALOG_DEFAULT_NAME = "fluss-hudi-catalog"; + public static final String HIVA_META_STAORE_TYPE = "hms"; + public static final String FILE_SYSTEM_TYPE = "dfs"; + + public static Catalog createHudiCatalog(Configuration configuration) { + Map hudiProps = configuration.toMap(); + configuration.setString(CatalogOptions.DEFAULT_DATABASE.key(), "tmp"); + configuration.setString(CatalogOptions.TABLE_EXTERNAL.key(), "true"); + String catalogName = hudiProps.getOrDefault(CATALOG_NAME_CONFIG, HUDI_CATALOG_DEFAULT_NAME); + return buildHudiCatalog( + catalogName, + hudiProps, + org.apache.flink.configuration.Configuration.fromMap(configuration.toMap())); + } + + public static Catalog buildHudiCatalog( + String catalogName, + Map hudiProps, + org.apache.flink.configuration.Configuration configuration) { + String catalogMode = + hudiProps.getOrDefault(CatalogOptions.MODE.key(), HIVA_META_STAORE_TYPE); + LOG.info( + "create hudi catalog: {}, mode: {}, configuration: {}", + catalogName, + catalogMode, + configuration); + switch (catalogMode.toLowerCase(Locale.ENGLISH)) { + case HIVA_META_STAORE_TYPE: + return new HoodieHiveCatalog(catalogName, configuration); + case FILE_SYSTEM_TYPE: + return new HoodieCatalog(catalogName, configuration); + default: + throw new HoodieCatalogException( + String.format( + "Invalid hudi-catalog mode: %s, supported modes: [hms, dfs].", + catalogMode)); + } + } + + public static CatalogTable createCatalogTable( + Schema schema, + List partitionKeys, + Map options, + @Nullable String comment) { + return CatalogTable.of(schema, comment, partitionKeys, options); + } + + public static ResolvedCatalogTable createResolvedCatalogTable( + Schema schema, + List partitionKeys, + Map options, + @Nullable String comment, + ResolvedSchema resolvedSchema) { + return new ResolvedCatalogTable( + CatalogTable.of(schema, comment, partitionKeys, options), resolvedSchema); + } +} diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java new file mode 100644 index 0000000000..1500f22c26 --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.hudi; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.lake.hudi.utils.HudiConversions; +import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.types.DataTypes; + +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit test for {@link HudiLakeCatalog}. */ +class HudiLakeCatalogTest { + + @TempDir private File tempWarehouseDir; + + private HudiLakeCatalog flussHudiLakeCatalog; + + @BeforeEach + public void setUp() { + Configuration configuration = new Configuration(); + configuration.setString("catalog.path", tempWarehouseDir.toURI().toString()); + configuration.setString("mode", "dfs"); + this.flussHudiLakeCatalog = new HudiLakeCatalog(configuration); + } + + /** Verify property prefix rewriting. */ + @Test + void testPropertyPrefixRewriting() throws TableNotExistException { + String database = "test_db"; + String tableName = "test_table"; + + Schema flussSchema = + Schema.newBuilder().column("id", DataTypes.BIGINT()).primaryKey("id").build(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(flussSchema) + .distributedBy(3) + .property("hudi.precombine.field", "id") + .property("table.datalake.freshness", "30s") + .build(); + + TablePath tablePath = TablePath.of(database, tableName); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context); + + CatalogBaseTable table = + flussHudiLakeCatalog + .getHudiCatalog() + .getTable(HudiConversions.toHudiObjectPath(tablePath)); + + // Verify property prefix rewriting + assertThat(table.getOptions()).containsEntry("precombine.field", "id"); + assertThat(table.getOptions()).containsEntry("fluss.table.datalake.freshness", "30s"); + assertThat(table.getOptions()) + .doesNotContainKeys("hudi.precombine.field", "table.datalake.freshness"); + } + + @Test + void testCreatePrimaryKeyTable() throws TableNotExistException { + String database = "test_db"; + String tableName = "pk_table"; + + Schema flussSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .withComment("pk_table") + .primaryKey("id") + .build(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(flussSchema).distributedBy(4, "id").build(); + + TablePath tablePath = TablePath.of(database, tableName); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context); + + ObjectPath objectPath = HudiConversions.toHudiObjectPath(tablePath); + CatalogBaseTable table = flussHudiLakeCatalog.getHudiCatalog().getTable(objectPath); + + assertThat(table).isNotNull(); + + List primaryKeys = new ArrayList<>(); + primaryKeys.add("id"); + TableSchema expectHudiSchema = + TableSchema.builder() + .field("id", org.apache.flink.table.api.DataTypes.INT().notNull()) + .field("name", org.apache.flink.table.api.DataTypes.STRING()) + .field("__bucket", org.apache.flink.table.api.DataTypes.INT()) + .field("__offset", org.apache.flink.table.api.DataTypes.BIGINT()) + .field("__timestamp", org.apache.flink.table.api.DataTypes.TIMESTAMP(6)) + .primaryKey("primaryKey", primaryKeys.toArray(new String[0])) + .build(); + + assertThat(table.getUnresolvedSchema()).isEqualTo(expectHudiSchema.toSchema()); + } + + @Test + void testCreateLogTable() throws TableNotExistException { + String database = "test_db"; + String tableName = "log_table"; + + Schema flussSchema = + Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("name", DataTypes.STRING()) + .build(); + + Map customProperties = new HashMap<>(); + customProperties.put("hudi.hoodie.datasource.write.recordkey.field", "id"); + + TableDescriptor td = + TableDescriptor.builder() + .schema(flussSchema) + .distributedBy(3) // no bucket key + .customProperties(customProperties) + .build(); + + TablePath tablePath = TablePath.of(database, tableName); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + flussHudiLakeCatalog.createTable(tablePath, td, context); + + ObjectPath objectPath = HudiConversions.toHudiObjectPath(tablePath); + CatalogBaseTable table = flussHudiLakeCatalog.getHudiCatalog().getTable(objectPath); + + List primaryKeys = new ArrayList<>(); + primaryKeys.add("id"); + TableSchema expectHudiSchema = + TableSchema.builder() + .field("id", org.apache.flink.table.api.DataTypes.BIGINT().notNull()) + .field("name", org.apache.flink.table.api.DataTypes.STRING()) + .field("__bucket", org.apache.flink.table.api.DataTypes.INT()) + .field("__offset", org.apache.flink.table.api.DataTypes.BIGINT()) + .field("__timestamp", org.apache.flink.table.api.DataTypes.TIMESTAMP(6)) + .primaryKey("PK_id", primaryKeys.toArray(new String[0])) + .build(); + + assertThat(table.getUnresolvedSchema()).isEqualTo(expectHudiSchema.toSchema()); + } +} From 0a21ea0d43bd84fb312beb4eccd6c9b8c38b243a Mon Sep 17 00:00:00 2001 From: fhan Date: Tue, 26 May 2026 21:52:35 +0800 Subject: [PATCH 02/11] [lake/hudi] fix HIVE_META_STORE_TYPE spelling error --- .../fluss/lake/hudi/FlussDataTypeToHudiDataType.java | 6 +++--- .../java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java | 8 ++++---- .../java/org/apache/fluss/lake/hudi/HudiLakeStorage.java | 5 +---- .../org/apache/fluss/lake/hudi/utils/HudiConversions.java | 4 ++-- .../fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java | 6 +++--- 5 files changed, 13 insertions(+), 16 deletions(-) diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/FlussDataTypeToHudiDataType.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/FlussDataTypeToHudiDataType.java index 9bb2a69764..2d62c07f5d 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/FlussDataTypeToHudiDataType.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/FlussDataTypeToHudiDataType.java @@ -42,7 +42,7 @@ import org.apache.flink.table.types.DataType; import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.FILE_SYSTEM_TYPE; -import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HIVA_META_STAORE_TYPE; +import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HIVE_META_STORE_TYPE; /** Convert from Fluss's data type to Hudi's internal data type. */ public class FlussDataTypeToHudiDataType implements DataTypeVisitor { @@ -56,7 +56,7 @@ public FlussDataTypeToHudiDataType(String catalogMode) { public static final FlussDataTypeToHudiDataType DFS_INSTANCE = new FlussDataTypeToHudiDataType(FILE_SYSTEM_TYPE); public static final FlussDataTypeToHudiDataType HMS_INSTANCE = - new FlussDataTypeToHudiDataType(HIVA_META_STAORE_TYPE); + new FlussDataTypeToHudiDataType(HIVE_META_STORE_TYPE); @Override public DataType visit(CharType charType) { @@ -139,7 +139,7 @@ public DataType visit(TimestampType timestampType) { @Override public DataType visit(LocalZonedTimestampType localZonedTimestampType) { - return HIVA_META_STAORE_TYPE.equals(catalogMode) + return HIVE_META_STORE_TYPE.equals(catalogMode) ? withNullability(DataTypes.BIGINT(), localZonedTimestampType.isNullable()) : withNullability( DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(), diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java index 84d088df8b..7d8b134d97 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java @@ -45,7 +45,7 @@ import java.util.LinkedHashMap; import java.util.List; -import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HIVA_META_STAORE_TYPE; +import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HIVE_META_STORE_TYPE; import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HUDI_CATALOG_DEFAULT_NAME; import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.MODE_CONFIG; import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; @@ -66,10 +66,10 @@ public class HudiLakeCatalog implements LakeCatalog { } private final Catalog hudiCatalog; - private final String catalagMode; + private final String catalogMode; public HudiLakeCatalog(Configuration configuration) { - this.catalagMode = configuration.toMap().getOrDefault(MODE_CONFIG, HIVA_META_STAORE_TYPE); + this.catalogMode = configuration.toMap().getOrDefault(MODE_CONFIG, HIVE_META_STORE_TYPE); this.hudiCatalog = HudiCatalogUtils.createHudiCatalog(configuration); this.hudiCatalog.open(); } @@ -90,7 +90,7 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co // Create Hudi catalog table CatalogTable catalogTable = - HudiConversions.createHudiCatalogTable(tableDescriptor, isPkTable, catalagMode); + HudiConversions.createHudiCatalogTable(tableDescriptor, isPkTable, catalogMode); // Create table in Hudi catalog try { diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeStorage.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeStorage.java index f0865de0ea..66db56057b 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeStorage.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeStorage.java @@ -43,10 +43,7 @@ public HudiLakeStorage(Configuration configuration) { @Override public LakeCatalog createLakeCatalog() { - throw new UnsupportedOperationException( - "HudiLakeStorage is currently a scaffold and does not support creating a " - + "LakeCatalog yet. Verify that Hudi lake storage was selected " - + "intentionally and that the required Hudi support/module is available."); + return new HudiLakeCatalog(hudiConfig); } @Override diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java index 6bab741755..41ccdffca1 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java @@ -48,7 +48,7 @@ import java.util.Set; import static org.apache.fluss.lake.hudi.HudiLakeCatalog.SYSTEM_COLUMNS; -import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HIVA_META_STAORE_TYPE; +import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HIVE_META_STORE_TYPE; /** Utils for conversion between Hudi and Fluss. */ public class HudiConversions { @@ -203,7 +203,7 @@ public static CatalogTable createHudiCatalogTable( LOG.info("Hudi table properties: {}", options); String comment = tableDescriptor.getComment().orElse("Hudi table created from Fluss"); - return HIVA_META_STAORE_TYPE.equals(catalogMode) + return HIVE_META_STORE_TYPE.equals(catalogMode) ? HudiCatalogUtils.createCatalogTable(schema, partitionKeys, options, comment) : HudiCatalogUtils.createResolvedCatalogTable( schema, partitionKeys, options, comment, resolvedSchema); diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java index 948ceb3248..e9de3b759d 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java @@ -46,7 +46,7 @@ public class HudiCatalogUtils { public static final String CATALOG_NAME_CONFIG = "name"; public static final String HUDI_CATALOG_DEFAULT_NAME = "fluss-hudi-catalog"; - public static final String HIVA_META_STAORE_TYPE = "hms"; + public static final String HIVE_META_STORE_TYPE = "hms"; public static final String FILE_SYSTEM_TYPE = "dfs"; public static Catalog createHudiCatalog(Configuration configuration) { @@ -65,14 +65,14 @@ public static Catalog buildHudiCatalog( Map hudiProps, org.apache.flink.configuration.Configuration configuration) { String catalogMode = - hudiProps.getOrDefault(CatalogOptions.MODE.key(), HIVA_META_STAORE_TYPE); + hudiProps.getOrDefault(CatalogOptions.MODE.key(), HIVE_META_STORE_TYPE); LOG.info( "create hudi catalog: {}, mode: {}, configuration: {}", catalogName, catalogMode, configuration); switch (catalogMode.toLowerCase(Locale.ENGLISH)) { - case HIVA_META_STAORE_TYPE: + case HIVE_META_STORE_TYPE: return new HoodieHiveCatalog(catalogName, configuration); case FILE_SYSTEM_TYPE: return new HoodieCatalog(catalogName, configuration); From c5594be99351ad01068601f5356d3e501c7a25ab Mon Sep 17 00:00:00 2001 From: fhan Date: Wed, 27 May 2026 11:56:03 +0800 Subject: [PATCH 03/11] [lake/hudi] add catalogMode in HudiConversions.convertToFlinkResolvedSchema() and add isCreatingFlussTable in HudiLakeCatalog.createTable() --- .../fluss/lake/hudi/HudiLakeCatalog.java | 89 +++++++++++++++++-- .../lake/hudi/utils/HudiConversions.java | 25 +++++- 2 files changed, 102 insertions(+), 12 deletions(-) diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java index 7d8b134d97..c1b294e1b8 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java @@ -19,6 +19,7 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.lake.hudi.utils.HudiConversions; import org.apache.fluss.lake.hudi.utils.catalog.CatalogDatabaseImpl; import org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils; @@ -34,9 +35,10 @@ import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; -import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.types.DataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +83,7 @@ protected Catalog getHudiCatalog() { @Override public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context) - throws org.apache.fluss.exception.TableAlreadyExistException { + throws TableAlreadyExistException { LOG.info("create the lake table for : {} with props: {}", tablePath, tableDescriptor); ObjectPath objectPath = HudiConversions.toHudiObjectPath(tablePath); @@ -94,11 +96,11 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co // Create table in Hudi catalog try { - createTable(objectPath, catalogTable); + createTable(objectPath, catalogTable, context.isCreatingFlussTable()); } catch (DatabaseNotExistException e) { createDatabase(tablePath.getDatabaseName()); try { - createTable(objectPath, catalogTable); + createTable(objectPath, catalogTable, context.isCreatingFlussTable()); } catch (DatabaseNotExistException t) { // shouldn't happen in normal cases throw new RuntimeException( @@ -118,15 +120,86 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont "Alter table is not supported for Hudi at the moment"); } - private void createTable(ObjectPath tablePath, CatalogBaseTable catalogTable) + private void createTable( + ObjectPath tablePath, CatalogBaseTable catalogTable, boolean isCreatingFlussTable) throws DatabaseNotExistException { try { - hudiCatalog.createTable(tablePath, catalogTable, true); + hudiCatalog.createTable(tablePath, catalogTable, false); LOG.info("Table {} created successfully.", tablePath); } catch (TableAlreadyExistException e) { - throw new org.apache.fluss.exception.TableAlreadyExistException( - "Table " + tablePath + " already exists."); + // table already exists, check schema compatibility for idempotency + try { + CatalogBaseTable existingTable = hudiCatalog.getTable(tablePath); + if (!isHudiSchemaCompatible(existingTable, catalogTable)) { + throw new TableAlreadyExistException( + String.format( + "The table %s already exists in Hudi catalog, but the table schema is not compatible. " + + "Please first drop the table in Hudi catalog or use a new table name.", + tablePath)); + } + // if creating a new fluss table, we should ensure the lake table is empty + // TODO: add emptiness check for Hudi table once LakeTieringFactory is implemented + if (isCreatingFlussTable) { + LOG.warn( + "Table {} already exists in Hudi catalog with compatible schema. " + + "Skipping creation as the table may not be empty.", + tablePath); + } + } catch (TableNotExistException tableNotExistException) { + // shouldn't happen in normal cases + throw new RuntimeException( + String.format( + "Failed to create table %s in Hudi. The table already existed " + + "during the initial creation attempt, but subsequently " + + "could not be found when trying to get it. " + + "Please check whether the Hudi table was manually deleted, and try again.", + tablePath)); + } + } + } + + /** + * Checks whether the existing Hudi table schema is compatible with the expected schema. + * + *

Compatibility means the column names, types, and nullability match. This is used for + * crash-recovery idempotency: if the table already exists with a compatible schema, the + * creation is considered successful. + */ + @VisibleForTesting + boolean isHudiSchemaCompatible(CatalogBaseTable existingTable, CatalogBaseTable expectedTable) { + ResolvedSchema existingSchema; + ResolvedSchema expectedSchema; + try { + existingSchema = existingTable.getResolvedSchema(); + expectedSchema = expectedTable.getResolvedSchema(); + } catch (Exception e) { + // Fallback: if resolved schema is not available, compare unresolved columns + List existingColumns = + existingTable.getSchema().getColumns().stream() + .map(org.apache.flink.table.api.Schema.UnresolvedColumn::getName) + .toList(); + List expectedColumns = + expectedTable.getSchema().getColumns().stream() + .map(org.apache.flink.table.api.Schema.UnresolvedColumn::getName) + .toList(); + return existingColumns.equals(expectedColumns); + } + + if (existingSchema.getColumns().size() != expectedSchema.getColumns().size()) { + return false; + } + + for (int i = 0; i < existingSchema.getColumns().size(); i++) { + org.apache.flink.table.catalog.Column existingCol = + existingSchema.getColumns().get(i); + org.apache.flink.table.catalog.Column expectedCol = + expectedSchema.getColumns().get(i); + if (!existingCol.getName().equals(expectedCol.getName()) + || !existingCol.getDataType().equals(expectedCol.getDataType())) { + return false; + } } + return true; } @Override diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java index 41ccdffca1..8098597943 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java @@ -63,7 +63,17 @@ public class HudiConversions { private static final String DELIMITER = ","; /** Hudi config options set by Fluss should not be set by users. */ - @VisibleForTesting public static final Set HUDI_UNSETTABLE_OPTIONS = new HashSet<>(); + @VisibleForTesting + public static final Set HUDI_UNSETTABLE_OPTIONS = new HashSet<>(); + + static { + HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.TABLE_TYPE.key()); + HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.RECORD_KEY_FIELD.key()); + HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.INDEX_TYPE.key()); + HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.INDEX_KEY_FIELD.key()); + HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key()); + HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.PARTITION_PATH_FIELD.key()); + } /** * Converts a Fluss TablePath to a Hudi ObjectPath. @@ -76,11 +86,17 @@ public static ObjectPath toHudiObjectPath(TablePath tablePath) { } public static ResolvedSchema convertToFlinkResolvedSchema( - TableDescriptor tableDescriptor, boolean isPkTable) { + TableDescriptor tableDescriptor, boolean isPkTable, String catalogMode) { // validate hudi options first validateHudiOptions(tableDescriptor.getProperties()); validateHudiOptions(tableDescriptor.getCustomProperties()); + // choose the correct converter based on catalog mode + FlussDataTypeToHudiDataType converter = + HIVE_META_STORE_TYPE.equals(catalogMode) + ? FlussDataTypeToHudiDataType.HMS_INSTANCE + : FlussDataTypeToHudiDataType.DFS_INSTANCE; + List columns = new ArrayList<>(); // Add regular columns @@ -96,7 +112,7 @@ public static ResolvedSchema convertToFlinkResolvedSchema( columns.add( Column.physical( columnName, - column.getDataType().accept(FlussDataTypeToHudiDataType.DFS_INSTANCE))); + column.getDataType().accept(converter))); } // add system metadata columns to schema @@ -196,7 +212,8 @@ public static Map buildHudiTableProperties( */ public static CatalogTable createHudiCatalogTable( TableDescriptor tableDescriptor, boolean isPkTable, String catalogMode) { - ResolvedSchema resolvedSchema = convertToFlinkResolvedSchema(tableDescriptor, isPkTable); + ResolvedSchema resolvedSchema = + convertToFlinkResolvedSchema(tableDescriptor, isPkTable, catalogMode); Schema schema = Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(); List partitionKeys = tableDescriptor.getPartitionKeys(); Map options = buildHudiTableProperties(tableDescriptor, isPkTable); From ca2da588b52f160ca9b8468c3e19d62ea4bd14cd Mon Sep 17 00:00:00 2001 From: fhan Date: Wed, 27 May 2026 14:02:32 +0800 Subject: [PATCH 04/11] [lake/hudi] refine primarykey logic in HudiConversions & use flink CatalogDatabaseImpl in HudiLakeCatalog & use copied Configuration in HudiCatalogUtils --- fluss-lake/fluss-lake-hudi/pom.xml | 6 ++ .../fluss/lake/hudi/HudiLakeCatalog.java | 5 +- .../lake/hudi/utils/HudiConversions.java | 30 ++++++--- .../utils/catalog/CatalogDatabaseImpl.java | 62 ------------------- .../hudi/utils/catalog/HudiCatalogUtils.java | 8 ++- 5 files changed, 34 insertions(+), 77 deletions(-) delete mode 100644 fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/CatalogDatabaseImpl.java diff --git a/fluss-lake/fluss-lake-hudi/pom.xml b/fluss-lake/fluss-lake-hudi/pom.xml index 5a9ea7f47b..e816c84c11 100644 --- a/fluss-lake/fluss-lake-hudi/pom.xml +++ b/fluss-lake/fluss-lake-hudi/pom.xml @@ -149,6 +149,12 @@ + + org.apache.flink + flink-table-api-java + provided + + org.apache.fluss diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java index c1b294e1b8..3404dbf807 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java @@ -21,7 +21,6 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.lake.hudi.utils.HudiConversions; -import org.apache.fluss.lake.hudi.utils.catalog.CatalogDatabaseImpl; import org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils; import org.apache.fluss.lake.lakestorage.LakeCatalog; import org.apache.fluss.metadata.TableChange; @@ -211,7 +210,9 @@ public void close() { public void createDatabase(String databaseName) { try { - CatalogDatabase database = new CatalogDatabaseImpl(new HashMap<>(), "Hudi database"); + CatalogDatabase database = + new org.apache.flink.table.catalog.CatalogDatabaseImpl( + new HashMap<>(), "Hudi database"); // ignore if exists hudiCatalog.createDatabase(databaseName, database, true); } catch (DatabaseAlreadyExistException e) { diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java index 8098597943..157e49eb1f 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java @@ -123,11 +123,9 @@ public static ResolvedSchema convertToFlinkResolvedSchema( UniqueConstraint constraint = null; // Set primary key if this is a PK table if (isPkTable && tableDescriptor.hasPrimaryKey()) { - List primaryKeys = new ArrayList<>(); - for (int pkIndex : tableDescriptor.getSchema().getPrimaryKeyIndexes()) { - primaryKeys.add(tableDescriptor.getSchema().getColumns().get(pkIndex).getName()); - } - constraint = UniqueConstraint.primaryKey("primaryKey", primaryKeys); + constraint = + UniqueConstraint.primaryKey( + "primaryKey", extractPrimaryKeyColumns(tableDescriptor)); } return new ResolvedSchema(columns, Collections.emptyList(), constraint); @@ -150,11 +148,9 @@ public static Map buildHudiTableProperties( // Set table type based on whether it's a PK table if (isPkTable) { hudiProperties.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name()); - List primaryKeys = new ArrayList<>(); - for (int pkIndex : tableDescriptor.getSchema().getPrimaryKeyIndexes()) { - primaryKeys.add(tableDescriptor.getSchema().getColumns().get(pkIndex).getName()); - } - hudiProperties.put(FlinkOptions.RECORD_KEY_FIELD.key(), String.join(",", primaryKeys)); + hudiProperties.put( + FlinkOptions.RECORD_KEY_FIELD.key(), + String.join(DELIMITER, extractPrimaryKeyColumns(tableDescriptor))); } else { hudiProperties.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name()); // set primary key for Fluss Log Table. @@ -251,4 +247,18 @@ private static void validateHudiOptions(Map properties) { } }); } + + /** + * Extracts the primary key column names from a Fluss TableDescriptor. + * + * @param tableDescriptor the Fluss table descriptor + * @return list of primary key column names + */ + private static List extractPrimaryKeyColumns(TableDescriptor tableDescriptor) { + List primaryKeys = new ArrayList<>(); + for (int pkIndex : tableDescriptor.getSchema().getPrimaryKeyIndexes()) { + primaryKeys.add(tableDescriptor.getSchema().getColumns().get(pkIndex).getName()); + } + return primaryKeys; + } } diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/CatalogDatabaseImpl.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/CatalogDatabaseImpl.java deleted file mode 100644 index b3a0d47f28..0000000000 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/CatalogDatabaseImpl.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.lake.hudi.utils.catalog; - -import org.apache.flink.table.catalog.CatalogDatabase; -import org.apache.flink.util.Preconditions; - -import javax.annotation.Nullable; - -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - -/** Implement of catalog database. */ -public class CatalogDatabaseImpl implements CatalogDatabase { - private final Map properties; - private final String comment; - - public CatalogDatabaseImpl(Map properties, @Nullable String comment) { - this.properties = Preconditions.checkNotNull(properties, "properties cannot be null"); - this.comment = comment; - } - - public Map getProperties() { - return this.properties; - } - - public String getComment() { - return this.comment; - } - - public CatalogDatabase copy() { - return this.copy(this.getProperties()); - } - - public CatalogDatabase copy(Map properties) { - return new CatalogDatabaseImpl(new HashMap<>(properties), this.comment); - } - - public Optional getDescription() { - return Optional.ofNullable(this.comment); - } - - public Optional getDetailedDescription() { - return Optional.ofNullable(this.comment); - } -} diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java index e9de3b759d..c4bd3413a3 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java @@ -51,13 +51,15 @@ public class HudiCatalogUtils { public static Catalog createHudiCatalog(Configuration configuration) { Map hudiProps = configuration.toMap(); - configuration.setString(CatalogOptions.DEFAULT_DATABASE.key(), "tmp"); - configuration.setString(CatalogOptions.TABLE_EXTERNAL.key(), "true"); + // copy the configuration to avoid modifying the original + Configuration copiedConfig = new Configuration(configuration); + copiedConfig.setString(CatalogOptions.DEFAULT_DATABASE.key(), "tmp"); + copiedConfig.setString(CatalogOptions.TABLE_EXTERNAL.key(), "true"); String catalogName = hudiProps.getOrDefault(CATALOG_NAME_CONFIG, HUDI_CATALOG_DEFAULT_NAME); return buildHudiCatalog( catalogName, hudiProps, - org.apache.flink.configuration.Configuration.fromMap(configuration.toMap())); + org.apache.flink.configuration.Configuration.fromMap(copiedConfig.toMap())); } public static Catalog buildHudiCatalog( From 9a12957d997058c0bee3d3799cc8404eee0b3343 Mon Sep 17 00:00:00 2001 From: fhan Date: Wed, 27 May 2026 14:41:05 +0800 Subject: [PATCH 05/11] [lake/hudi] optimize createDatabase() impl --- .../fluss/lake/hudi/HudiLakeCatalog.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java index 3404dbf807..272f061416 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java @@ -31,11 +31,9 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedSchema; -import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.types.DataType; @@ -210,13 +208,20 @@ public void close() { public void createDatabase(String databaseName) { try { - CatalogDatabase database = - new org.apache.flink.table.catalog.CatalogDatabaseImpl( - new HashMap<>(), "Hudi database"); - // ignore if exists - hudiCatalog.createDatabase(databaseName, database, true); - } catch (DatabaseAlreadyExistException e) { - // do nothing, shouldn't throw since ignoreIfExists + if (!hudiCatalog.databaseExists(databaseName)) { + CatalogDatabase database = + new org.apache.flink.table.catalog.CatalogDatabaseImpl( + new HashMap<>(), "Hudi database"); + hudiCatalog.createDatabase(databaseName, database, true); + } + } catch (UnsupportedOperationException e) { + throw new UnsupportedOperationException( + String.format( + "The underlying Hudi catalog does not support database operations for database '%s'. " + + "This typically occurs with a filesystem-based catalog (dfs mode). " + + "Consider using Hive Metastore (hms mode) instead.", + databaseName), + e); } } } From 47dc7c57b0eb010929be94ed04121d393a9b0ab8 Mon Sep 17 00:00:00 2001 From: fhan Date: Thu, 28 May 2026 18:16:10 +0800 Subject: [PATCH 06/11] [lake/hudi] add some basic tests --- .../fluss/lake/hudi/HudiLakeCatalogTest.java | 315 ++++++++++++++++++ 1 file changed, 315 insertions(+) diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java index 1500f22c26..9c96e51346 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java @@ -18,6 +18,9 @@ package org.apache.fluss.lake.hudi; import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.InvalidConfigException; +import org.apache.fluss.exception.InvalidTableException; +import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.lake.hudi.utils.HudiConversions; import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext; import org.apache.fluss.metadata.Schema; @@ -27,7 +30,10 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -35,11 +41,13 @@ import java.io.File; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Unit test for {@link HudiLakeCatalog}. */ class HudiLakeCatalogTest { @@ -171,4 +179,311 @@ void testCreateLogTable() throws TableNotExistException { assertThat(table.getUnresolvedSchema()).isEqualTo(expectHudiSchema.toSchema()); } + + // ------------------------------------------------------------------ + // isHudiSchemaCompatible() tests + // ------------------------------------------------------------------ + + @Test + void testIsHudiSchemaCompatibleWithSameSchema() { + // Build two catalog tables with identical schema + CatalogTable table1 = buildTestCatalogTable( + new String[]{"id", "name"}, + new org.apache.flink.table.api.DataTypes[]{ + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); + CatalogTable table2 = buildTestCatalogTable( + new String[]{"id", "name"}, + new org.apache.flink.table.api.DataTypes[]{ + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); + + assertThat(flussHudiLakeCatalog.isHudiSchemaCompatible(table1, table2)).isTrue(); + } + + @Test + void testIsHudiSchemaCompatibleWithDifferentColumnCount() { + CatalogTable table1 = buildTestCatalogTable( + new String[]{"id", "name"}, + new org.apache.flink.table.api.DataTypes[]{ + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); + CatalogTable table2 = buildTestCatalogTable( + new String[]{"id"}, + new org.apache.flink.table.api.DataTypes[]{ + org.apache.flink.table.api.DataTypes.INT().notNull() + }); + + assertThat(flussHudiLakeCatalog.isHudiSchemaCompatible(table1, table2)).isFalse(); + } + + @Test + void testIsHudiSchemaCompatibleWithDifferentColumnName() { + CatalogTable table1 = buildTestCatalogTable( + new String[]{"id", "name"}, + new org.apache.flink.table.api.DataTypes[]{ + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); + CatalogTable table2 = buildTestCatalogTable( + new String[]{"id", "value"}, + new org.apache.flink.table.api.DataTypes[]{ + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); + + assertThat(flussHudiLakeCatalog.isHudiSchemaCompatible(table1, table2)).isFalse(); + } + + @Test + void testIsHudiSchemaCompatibleWithDifferentColumnType() { + CatalogTable table1 = buildTestCatalogTable( + new String[]{"id", "name"}, + new org.apache.flink.table.api.DataTypes[]{ + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); + CatalogTable table2 = buildTestCatalogTable( + new String[]{"id", "name"}, + new org.apache.flink.table.api.DataTypes[]{ + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.BIGINT() + }); + + assertThat(flussHudiLakeCatalog.isHudiSchemaCompatible(table1, table2)).isFalse(); + } + + // ------------------------------------------------------------------ + // Duplicate table creation idempotency tests + // ------------------------------------------------------------------ + + @Test + void testCreateDuplicateTableWithCompatibleSchema() throws TableNotExistException { + String database = "idempotent_db"; + String tableName = "idempotent_table"; + + Schema flussSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .primaryKey("id") + .build(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(flussSchema).distributedBy(4, "id").build(); + + TablePath tablePath = TablePath.of(database, tableName); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + + // First creation should succeed + flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context); + + // Second creation with same schema should not throw + flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context); + + // Verify the table still exists and has correct schema + CatalogBaseTable table = + flussHudiLakeCatalog.getHudiCatalog().getTable(HudiConversions.toHudiObjectPath(tablePath)); + assertThat(table).isNotNull(); + } + + @Test + void testCreateDuplicateTableWithIncompatibleSchema() { + String database = "incompat_db"; + String tableName = "incompat_table"; + + // First: create a table with id(INT) + name(STRING) + Schema flussSchema1 = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .primaryKey("id") + .build(); + + TableDescriptor tableDescriptor1 = + TableDescriptor.builder().schema(flussSchema1).distributedBy(4, "id").build(); + + TablePath tablePath = TablePath.of(database, tableName); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + flussHudiLakeCatalog.createTable(tablePath, tableDescriptor1, context); + + // Second: try creating a table with id(INT) + name(BIGINT) - different type + Schema flussSchema2 = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.BIGINT()) + .primaryKey("id") + .build(); + + TableDescriptor tableDescriptor2 = + TableDescriptor.builder().schema(flussSchema2).distributedBy(4, "id").build(); + + assertThatThrownBy(() -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor2, context)) + .isInstanceOf(TableAlreadyExistException.class) + .hasMessageContaining("not compatible"); + } + + // ------------------------------------------------------------------ + // HUDI_UNSETTABLE_OPTIONS validation tests + // ------------------------------------------------------------------ + + @Test + void testUnsettableOptionInPropertiesThrowsException() { + Schema flussSchema = + Schema.newBuilder().column("id", DataTypes.INT()).primaryKey("id").build(); + + // Set a protected Hudi option via properties (without hudi. prefix) + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(flussSchema) + .distributedBy(4, "id") + .property("hudi.hoodie.datasource.write.table.type", "COPY_ON_WRITE") + .build(); + + TablePath tablePath = TablePath.of("test_db", "protected_option_table"); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + + assertThatThrownBy(() -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining("hoodie.datasource.write.table.type") + .hasMessageContaining("should not be set manually"); + } + + @Test + void testUnsettableOptionInCustomPropertiesThrowsException() { + Schema flussSchema = + Schema.newBuilder().column("id", DataTypes.INT()).primaryKey("id").build(); + + Map customProperties = new HashMap<>(); + // Set a protected Hudi option via customProperties + customProperties.put("hudi.hoodie.datasource.write.recordkey.field", "id"); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(flussSchema) + .distributedBy(4, "id") + .customProperties(customProperties) + .build(); + + TablePath tablePath = TablePath.of("test_db", "protected_custom_table"); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + + assertThatThrownBy(() -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining("hoodie.datasource.write.recordkey.field") + .hasMessageContaining("should not be set manually"); + } + + @Test + void testNonProtectedHudiOptionPassesValidation() throws TableNotExistException { + Schema flussSchema = + Schema.newBuilder().column("id", DataTypes.INT()).primaryKey("id").build(); + + // Set a non-protected Hudi option (e.g., precombine.field) — should work fine + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(flussSchema) + .distributedBy(4, "id") + .property("hudi.precombine.field", "id") + .build(); + + TablePath tablePath = TablePath.of("test_db", "non_protected_table"); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + + // Should not throw + flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context); + + CatalogBaseTable table = + flussHudiLakeCatalog.getHudiCatalog().getTable(HudiConversions.toHudiObjectPath(tablePath)); + assertThat(table).isNotNull(); + assertThat(table.getOptions()).containsEntry("precombine.field", "id"); + } + + // ------------------------------------------------------------------ + // System column name conflict tests + // ------------------------------------------------------------------ + + @Test + void testSystemColumnBucketConflictThrowsException() { + Schema flussSchema = + Schema.newBuilder() + .column("__bucket", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .primaryKey("__bucket") + .build(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(flussSchema).distributedBy(4, "__bucket").build(); + + TablePath tablePath = TablePath.of("test_db", "bucket_conflict_table"); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + + assertThatThrownBy(() -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) + .isInstanceOf(InvalidTableException.class) + .hasMessageContaining("__bucket") + .hasMessageContaining("system column"); + } + + @Test + void testSystemColumnOffsetConflictThrowsException() { + Schema flussSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("__offset", DataTypes.BIGINT()) + .primaryKey("id") + .build(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(flussSchema).distributedBy(4, "id").build(); + + TablePath tablePath = TablePath.of("test_db", "offset_conflict_table"); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + + assertThatThrownBy(() -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) + .isInstanceOf(InvalidTableException.class) + .hasMessageContaining("__offset") + .hasMessageContaining("system column"); + } + + @Test + void testSystemColumnTimestampConflictThrowsException() { + Schema flussSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("__timestamp", DataTypes.TIMESTAMP(6)) + .primaryKey("id") + .build(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(flussSchema).distributedBy(4, "id").build(); + + TablePath tablePath = TablePath.of("test_db", "timestamp_conflict_table"); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + + assertThatThrownBy(() -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) + .isInstanceOf(InvalidTableException.class) + .hasMessageContaining("__timestamp") + .hasMessageContaining("system column"); + } + + // ------------------------------------------------------------------ + // Helper methods + // ------------------------------------------------------------------ + + private CatalogTable buildTestCatalogTable( + String[] columnNames, org.apache.flink.table.api.DataTypes[] columnTypes) { + List columns = new ArrayList<>(); + for (int i = 0; i < columnNames.length; i++) { + columns.add(Column.physical(columnNames[i], columnTypes[i])); + } + ResolvedSchema resolvedSchema = new ResolvedSchema(columns, Collections.emptyList(), null); + org.apache.flink.table.api.Schema schema = + org.apache.flink.table.api.Schema.newBuilder() + .fromResolvedSchema(resolvedSchema) + .build(); + return CatalogTable.of(schema, null, Collections.emptyList(), new HashMap<>()); + } } From 233a8bd8b53227b761482d15125d29d6b432c40c Mon Sep 17 00:00:00 2001 From: fhan Date: Thu, 28 May 2026 19:05:17 +0800 Subject: [PATCH 07/11] [lake/hudi] add version info for flink-table-api-java dependency --- fluss-lake/fluss-lake-hudi/pom.xml | 1 + fluss-lake/pom.xml | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/fluss-lake/fluss-lake-hudi/pom.xml b/fluss-lake/fluss-lake-hudi/pom.xml index e816c84c11..caf4f7d8b1 100644 --- a/fluss-lake/fluss-lake-hudi/pom.xml +++ b/fluss-lake/fluss-lake-hudi/pom.xml @@ -152,6 +152,7 @@ org.apache.flink flink-table-api-java + ${flink.version} provided diff --git a/fluss-lake/pom.xml b/fluss-lake/pom.xml index 508206bfed..eab338f590 100644 --- a/fluss-lake/pom.xml +++ b/fluss-lake/pom.xml @@ -44,6 +44,11 @@ flink-table-common ${flink.version} + + org.apache.flink + flink-table-api-java + ${flink.version} + org.apache.flink flink-table-runtime From 1c7abb9f7948b3ba37f0937ca059bd45c5473ff9 Mon Sep 17 00:00:00 2001 From: fhan Date: Thu, 28 May 2026 19:08:13 +0800 Subject: [PATCH 08/11] [lake/hudi] fix format violations --- .../fluss/flink/lake/LakeFlinkCatalog.java | 127 +++++++++-------- .../fluss/flink/lake/LakeTableFactory.java | 60 ++++---- .../fluss/lake/hudi/HudiLakeCatalog.java | 6 +- .../lake/hudi/utils/HudiConversions.java | 8 +- .../fluss/lake/hudi/HudiLakeCatalogTest.java | 130 ++++++++++-------- 5 files changed, 172 insertions(+), 159 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java index 420a9a63e0..d80c391f10 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java @@ -1,4 +1,3 @@ - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -54,7 +53,7 @@ public LakeFlinkCatalog(String catalogName, ClassLoader classLoader) { } public Catalog getLakeCatalog( - Configuration tableOptions, Map lakeCatalogProperties) { + Configuration tableOptions, Map lakeCatalogProperties) { // TODO: Currently, a Fluss cluster only supports a single DataLake storage. // However, in the // future, it may support multiple DataLakes. The following code assumes @@ -66,36 +65,36 @@ public Catalog getLakeCatalog( synchronized (this) { if (catalog == null) { DataLakeFormat lakeFormat = - tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT); + tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT); if (lakeFormat == null) { throw new IllegalArgumentException( - "DataLake format is not specified in table options. " - + "Please ensure '" - + ConfigOptions.TABLE_DATALAKE_FORMAT.key() - + "' is set."); + "DataLake format is not specified in table options. " + + "Please ensure '" + + ConfigOptions.TABLE_DATALAKE_FORMAT.key() + + "' is set."); } Map catalogProperties = - new HashMap<>(DataLakeUtils.extractLakeCatalogProperties(tableOptions)); + new HashMap<>(DataLakeUtils.extractLakeCatalogProperties(tableOptions)); // properties in catalog are preferred catalogProperties.putAll( - PropertiesUtils.extractAndRemovePrefix( - lakeCatalogProperties, lakeFormat + ".")); + PropertiesUtils.extractAndRemovePrefix( + lakeCatalogProperties, lakeFormat + ".")); if (lakeFormat == PAIMON) { catalog = - PaimonCatalogFactory.create( - catalogName, catalogProperties, classLoader); + PaimonCatalogFactory.create( + catalogName, catalogProperties, classLoader); this.lakeFormat = PAIMON; } else if (lakeFormat == ICEBERG) { catalog = IcebergCatalogFactory.create(catalogName, catalogProperties); this.lakeFormat = ICEBERG; } else if (lakeFormat == HUDI) { catalog = - HudiCatalogFactory.create( - catalogName, catalogProperties, classLoader); + HudiCatalogFactory.create( + catalogName, catalogProperties, classLoader); this.lakeFormat = HUDI; } else { throw new UnsupportedOperationException( - "Unsupported data lake format: " + lakeFormat); + "Unsupported data lake format: " + lakeFormat); } } } @@ -105,8 +104,8 @@ public Catalog getLakeCatalog( public DataLakeFormat getLakeFormat() { checkNotNull( - lakeFormat, - "DataLake format is null, must call getLakeCatalog first to initialize lake format."); + lakeFormat, + "DataLake format is null, must call getLakeCatalog first to initialize lake format."); return lakeFormat; } @@ -128,14 +127,14 @@ public static class PaimonCatalogFactory { private PaimonCatalogFactory() {} public static Catalog create( - String catalogName, - Map catalogProperties, - ClassLoader classLoader) { + String catalogName, + Map catalogProperties, + ClassLoader classLoader) { return FlinkCatalogFactory.createCatalog( - catalogName, - CatalogContext.create( - Options.fromMap(catalogProperties), null, new FlinkFileIOLoader()), - classLoader); + catalogName, + CatalogContext.create( + Options.fromMap(catalogProperties), null, new FlinkFileIOLoader()), + classLoader); } } @@ -157,19 +156,19 @@ public static Catalog create(String catalogName, Map catalogProp } try { Class flinkCatalogFactoryClass = - Class.forName("org.apache.iceberg.flink.FlinkCatalogFactory"); + Class.forName("org.apache.iceberg.flink.FlinkCatalogFactory"); Object factoryInstance = - flinkCatalogFactoryClass.getDeclaredConstructor().newInstance(); + flinkCatalogFactoryClass.getDeclaredConstructor().newInstance(); Method createCatalogMethod = - flinkCatalogFactoryClass.getMethod( - "createCatalog", String.class, Map.class); + flinkCatalogFactoryClass.getMethod( + "createCatalog", String.class, Map.class); return (Catalog) - createCatalogMethod.invoke(factoryInstance, catalogName, catalogProperties); + createCatalogMethod.invoke(factoryInstance, catalogName, catalogProperties); } catch (Exception e) { throw new RuntimeException( - "Failed to create Iceberg catalog using reflection. Please make sure iceberg-flink-runtime is on the classpath.", - e); + "Failed to create Iceberg catalog using reflection. Please make sure iceberg-flink-runtime is on the classpath.", + e); } } } @@ -195,24 +194,24 @@ public static Catalog create(String catalogName, Map catalogProp public static class HudiCatalogFactory { private static final String HOODIE_CATALOG_FACTORY_CLASS = - "org.apache.hudi.table.catalog.HoodieCatalogFactory"; + "org.apache.hudi.table.catalog.HoodieCatalogFactory"; private static final String FLINK_CATALOG_FACTORY_CONTEXT_CLASS = - "org.apache.flink.table.factories.CatalogFactory$Context"; + "org.apache.flink.table.factories.CatalogFactory$Context"; private static final String FLINK_DEFAULT_CATALOG_CONTEXT_CLASS = - "org.apache.flink.table.factories.FactoryUtil$DefaultCatalogContext"; + "org.apache.flink.table.factories.FactoryUtil$DefaultCatalogContext"; private HudiCatalogFactory() {} public static Catalog create( - String catalogName, - Map catalogProperties, - ClassLoader classLoader) { + String catalogName, + Map catalogProperties, + ClassLoader classLoader) { try { // 1) Build Hudi's catalog factory instance via reflection. Class hoodieCatalogFactoryClass = - Class.forName(HOODIE_CATALOG_FACTORY_CLASS, true, classLoader); + Class.forName(HOODIE_CATALOG_FACTORY_CLASS, true, classLoader); Object factoryInstance = - hoodieCatalogFactoryClass.getDeclaredConstructor().newInstance(); + hoodieCatalogFactoryClass.getDeclaredConstructor().newInstance(); // 2) Build a CatalogFactory.Context via Flink's internal default impl. // Constructor: DefaultCatalogContext(String name, @@ -223,43 +222,43 @@ public static Catalog create( // empty Flink Configuration (which implements ReadableConfig) as a benign // placeholder — Hudi's factory only consumes 'options' / 'name' / classloader. Class defaultCatalogContextClass = - Class.forName(FLINK_DEFAULT_CATALOG_CONTEXT_CLASS, true, classLoader); + Class.forName(FLINK_DEFAULT_CATALOG_CONTEXT_CLASS, true, classLoader); Class readableConfigClass = - Class.forName( - "org.apache.flink.configuration.ReadableConfig", true, classLoader); + Class.forName( + "org.apache.flink.configuration.ReadableConfig", true, classLoader); Class flinkConfigurationClass = - Class.forName( - "org.apache.flink.configuration.Configuration", true, classLoader); + Class.forName( + "org.apache.flink.configuration.Configuration", true, classLoader); Object emptyFlinkConfiguration = - flinkConfigurationClass.getDeclaredConstructor().newInstance(); + flinkConfigurationClass.getDeclaredConstructor().newInstance(); Object context = - defaultCatalogContextClass - .getDeclaredConstructor( - String.class, - Map.class, - readableConfigClass, - ClassLoader.class) - .newInstance( - catalogName, - catalogProperties, - emptyFlinkConfiguration, - classLoader); + defaultCatalogContextClass + .getDeclaredConstructor( + String.class, + Map.class, + readableConfigClass, + ClassLoader.class) + .newInstance( + catalogName, + catalogProperties, + emptyFlinkConfiguration, + classLoader); // 3) Invoke HoodieCatalogFactory#createCatalog(Context). Class contextInterface = - Class.forName(FLINK_CATALOG_FACTORY_CONTEXT_CLASS, true, classLoader); + Class.forName(FLINK_CATALOG_FACTORY_CONTEXT_CLASS, true, classLoader); Method createCatalogMethod = - hoodieCatalogFactoryClass.getMethod("createCatalog", contextInterface); + hoodieCatalogFactoryClass.getMethod("createCatalog", contextInterface); return (Catalog) createCatalogMethod.invoke(factoryInstance, context); } catch (Exception e) { throw new RuntimeException( - "Failed to create Hudi catalog using reflection. Please make sure " - + "hudi-flink-bundle (matching the current Flink version, " - + "Hudi 1.0+) is on the classpath, typically under " - + "plugins/hudi/, and that the catalog options include a valid " - + "'mode' (supported: 'hms' or 'dfs').", - e); + "Failed to create Hudi catalog using reflection. Please make sure " + + "hudi-flink-bundle (matching the current Flink version, " + + "Hudi 1.0+) is on the classpath, typically under " + + "plugins/hudi/, and that the catalog options include a valid " + + "'mode' (supported: 'hms' or 'dfs').", + e); } } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java index 6fb0671cd9..3def9bbdc2 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java @@ -36,24 +36,24 @@ public LakeTableFactory(LakeFlinkCatalog lakeFlinkCatalog) { } public DynamicTableSource createDynamicTableSource( - DynamicTableFactory.Context context, String tableName) { + DynamicTableFactory.Context context, String tableName) { ObjectIdentifier originIdentifier = context.getObjectIdentifier(); ObjectIdentifier lakeIdentifier = - ObjectIdentifier.of( - originIdentifier.getCatalogName(), - originIdentifier.getDatabaseName(), - tableName); + ObjectIdentifier.of( + originIdentifier.getCatalogName(), + originIdentifier.getDatabaseName(), + tableName); // For Iceberg, Hudi and Paimon, pass the table name as-is to their factory. // Metadata tables will be handled internally by their respective factories. DynamicTableFactory.Context newContext = - new FactoryUtil.DefaultDynamicTableContext( - lakeIdentifier, - context.getCatalogTable(), - context.getEnrichmentOptions(), - context.getConfiguration(), - context.getClassLoader(), - context.isTemporary()); + new FactoryUtil.DefaultDynamicTableContext( + lakeIdentifier, + context.getCatalogTable(), + context.getEnrichmentOptions(), + context.getConfiguration(), + context.getClassLoader(), + context.isTemporary()); // Get the appropriate factory based on connector type DynamicTableSourceFactory factory = getLakeTableFactory(); @@ -70,9 +70,9 @@ private DynamicTableSourceFactory getLakeTableFactory() { return getHudiFactory(); default: throw new UnsupportedOperationException( - "Unsupported lake connector: " - + lakeFlinkCatalog.getLakeFormat() - + ". Only 'paimon', 'iceberg' and 'hudi' are supported."); + "Unsupported lake connector: " + + lakeFlinkCatalog.getLakeFormat() + + ". Only 'paimon', 'iceberg' and 'hudi' are supported."); } } @@ -84,23 +84,23 @@ private DynamicTableSourceFactory getIcebergFactory() { try { // Get catalog with explicit ICEBERG format org.apache.flink.table.catalog.Catalog catalog = - lakeFlinkCatalog.getLakeCatalog( - // we can pass empty configuration to get catalog - // since the catalog should already be initialized - new Configuration(), Collections.emptyMap()); + lakeFlinkCatalog.getLakeCatalog( + // we can pass empty configuration to get catalog + // since the catalog should already be initialized + new Configuration(), Collections.emptyMap()); // Create FlinkDynamicTableFactory with the catalog Class icebergFactoryClass = - Class.forName("org.apache.iceberg.flink.FlinkDynamicTableFactory"); + Class.forName("org.apache.iceberg.flink.FlinkDynamicTableFactory"); Class flinkCatalogClass = Class.forName("org.apache.iceberg.flink.FlinkCatalog"); return (DynamicTableSourceFactory) - icebergFactoryClass - .getDeclaredConstructor(flinkCatalogClass) - .newInstance(catalog); + icebergFactoryClass + .getDeclaredConstructor(flinkCatalogClass) + .newInstance(catalog); } catch (Exception e) { throw new RuntimeException( - "Failed to create Iceberg table factory. Please ensure iceberg-flink-runtime is on the classpath.", - e); + "Failed to create Iceberg table factory. Please ensure iceberg-flink-runtime is on the classpath.", + e); } } @@ -108,13 +108,13 @@ private DynamicTableSourceFactory getHudiFactory() { try { Class hudiFactoryClass = Class.forName("org.apache.hudi.table.HoodieTableFactory"); return (DynamicTableSourceFactory) - hudiFactoryClass.getDeclaredConstructor().newInstance(); + hudiFactoryClass.getDeclaredConstructor().newInstance(); } catch (Exception e) { throw new RuntimeException( - "Failed to create Hudi table factory. Please ensure hudi-flink-bundle " - + "(matching the current Flink version) is on the classpath, " - + "typically under plugins/hudi/.", - e); + "Failed to create Hudi table factory. Please ensure hudi-flink-bundle " + + "(matching the current Flink version) is on the classpath, " + + "typically under plugins/hudi/.", + e); } } } diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java index 272f061416..24485b48fa 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java @@ -187,10 +187,8 @@ boolean isHudiSchemaCompatible(CatalogBaseTable existingTable, CatalogBaseTable } for (int i = 0; i < existingSchema.getColumns().size(); i++) { - org.apache.flink.table.catalog.Column existingCol = - existingSchema.getColumns().get(i); - org.apache.flink.table.catalog.Column expectedCol = - expectedSchema.getColumns().get(i); + org.apache.flink.table.catalog.Column existingCol = existingSchema.getColumns().get(i); + org.apache.flink.table.catalog.Column expectedCol = expectedSchema.getColumns().get(i); if (!existingCol.getName().equals(expectedCol.getName()) || !existingCol.getDataType().equals(expectedCol.getDataType())) { return false; diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java index 157e49eb1f..9becbe20f1 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java @@ -63,8 +63,7 @@ public class HudiConversions { private static final String DELIMITER = ","; /** Hudi config options set by Fluss should not be set by users. */ - @VisibleForTesting - public static final Set HUDI_UNSETTABLE_OPTIONS = new HashSet<>(); + @VisibleForTesting public static final Set HUDI_UNSETTABLE_OPTIONS = new HashSet<>(); static { HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.TABLE_TYPE.key()); @@ -109,10 +108,7 @@ public static ResolvedSchema convertToFlinkResolvedSchema( + columnName + " conflicts with a system column name of hudi table, please rename the column."); } - columns.add( - Column.physical( - columnName, - column.getDataType().accept(converter))); + columns.add(Column.physical(columnName, column.getDataType().accept(converter))); } // add system metadata columns to schema diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java index 9c96e51346..42ace0b451 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java @@ -187,71 +187,79 @@ void testCreateLogTable() throws TableNotExistException { @Test void testIsHudiSchemaCompatibleWithSameSchema() { // Build two catalog tables with identical schema - CatalogTable table1 = buildTestCatalogTable( - new String[]{"id", "name"}, - new org.apache.flink.table.api.DataTypes[]{ - org.apache.flink.table.api.DataTypes.INT().notNull(), - org.apache.flink.table.api.DataTypes.STRING() - }); - CatalogTable table2 = buildTestCatalogTable( - new String[]{"id", "name"}, - new org.apache.flink.table.api.DataTypes[]{ - org.apache.flink.table.api.DataTypes.INT().notNull(), - org.apache.flink.table.api.DataTypes.STRING() - }); + CatalogTable table1 = + buildTestCatalogTable( + new String[] {"id", "name"}, + new org.apache.flink.table.api.DataTypes[] { + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); + CatalogTable table2 = + buildTestCatalogTable( + new String[] {"id", "name"}, + new org.apache.flink.table.api.DataTypes[] { + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); assertThat(flussHudiLakeCatalog.isHudiSchemaCompatible(table1, table2)).isTrue(); } @Test void testIsHudiSchemaCompatibleWithDifferentColumnCount() { - CatalogTable table1 = buildTestCatalogTable( - new String[]{"id", "name"}, - new org.apache.flink.table.api.DataTypes[]{ - org.apache.flink.table.api.DataTypes.INT().notNull(), - org.apache.flink.table.api.DataTypes.STRING() - }); - CatalogTable table2 = buildTestCatalogTable( - new String[]{"id"}, - new org.apache.flink.table.api.DataTypes[]{ - org.apache.flink.table.api.DataTypes.INT().notNull() - }); + CatalogTable table1 = + buildTestCatalogTable( + new String[] {"id", "name"}, + new org.apache.flink.table.api.DataTypes[] { + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); + CatalogTable table2 = + buildTestCatalogTable( + new String[] {"id"}, + new org.apache.flink.table.api.DataTypes[] { + org.apache.flink.table.api.DataTypes.INT().notNull() + }); assertThat(flussHudiLakeCatalog.isHudiSchemaCompatible(table1, table2)).isFalse(); } @Test void testIsHudiSchemaCompatibleWithDifferentColumnName() { - CatalogTable table1 = buildTestCatalogTable( - new String[]{"id", "name"}, - new org.apache.flink.table.api.DataTypes[]{ - org.apache.flink.table.api.DataTypes.INT().notNull(), - org.apache.flink.table.api.DataTypes.STRING() - }); - CatalogTable table2 = buildTestCatalogTable( - new String[]{"id", "value"}, - new org.apache.flink.table.api.DataTypes[]{ - org.apache.flink.table.api.DataTypes.INT().notNull(), - org.apache.flink.table.api.DataTypes.STRING() - }); + CatalogTable table1 = + buildTestCatalogTable( + new String[] {"id", "name"}, + new org.apache.flink.table.api.DataTypes[] { + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); + CatalogTable table2 = + buildTestCatalogTable( + new String[] {"id", "value"}, + new org.apache.flink.table.api.DataTypes[] { + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); assertThat(flussHudiLakeCatalog.isHudiSchemaCompatible(table1, table2)).isFalse(); } @Test void testIsHudiSchemaCompatibleWithDifferentColumnType() { - CatalogTable table1 = buildTestCatalogTable( - new String[]{"id", "name"}, - new org.apache.flink.table.api.DataTypes[]{ - org.apache.flink.table.api.DataTypes.INT().notNull(), - org.apache.flink.table.api.DataTypes.STRING() - }); - CatalogTable table2 = buildTestCatalogTable( - new String[]{"id", "name"}, - new org.apache.flink.table.api.DataTypes[]{ - org.apache.flink.table.api.DataTypes.INT().notNull(), - org.apache.flink.table.api.DataTypes.BIGINT() - }); + CatalogTable table1 = + buildTestCatalogTable( + new String[] {"id", "name"}, + new org.apache.flink.table.api.DataTypes[] { + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); + CatalogTable table2 = + buildTestCatalogTable( + new String[] {"id", "name"}, + new org.apache.flink.table.api.DataTypes[] { + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.BIGINT() + }); assertThat(flussHudiLakeCatalog.isHudiSchemaCompatible(table1, table2)).isFalse(); } @@ -286,7 +294,9 @@ void testCreateDuplicateTableWithCompatibleSchema() throws TableNotExistExceptio // Verify the table still exists and has correct schema CatalogBaseTable table = - flussHudiLakeCatalog.getHudiCatalog().getTable(HudiConversions.toHudiObjectPath(tablePath)); + flussHudiLakeCatalog + .getHudiCatalog() + .getTable(HudiConversions.toHudiObjectPath(tablePath)); assertThat(table).isNotNull(); } @@ -321,7 +331,10 @@ void testCreateDuplicateTableWithIncompatibleSchema() { TableDescriptor tableDescriptor2 = TableDescriptor.builder().schema(flussSchema2).distributedBy(4, "id").build(); - assertThatThrownBy(() -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor2, context)) + assertThatThrownBy( + () -> + flussHudiLakeCatalog.createTable( + tablePath, tableDescriptor2, context)) .isInstanceOf(TableAlreadyExistException.class) .hasMessageContaining("not compatible"); } @@ -346,7 +359,8 @@ void testUnsettableOptionInPropertiesThrowsException() { TablePath tablePath = TablePath.of("test_db", "protected_option_table"); TestingLakeCatalogContext context = new TestingLakeCatalogContext(); - assertThatThrownBy(() -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) + assertThatThrownBy( + () -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) .isInstanceOf(InvalidConfigException.class) .hasMessageContaining("hoodie.datasource.write.table.type") .hasMessageContaining("should not be set manually"); @@ -371,7 +385,8 @@ void testUnsettableOptionInCustomPropertiesThrowsException() { TablePath tablePath = TablePath.of("test_db", "protected_custom_table"); TestingLakeCatalogContext context = new TestingLakeCatalogContext(); - assertThatThrownBy(() -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) + assertThatThrownBy( + () -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) .isInstanceOf(InvalidConfigException.class) .hasMessageContaining("hoodie.datasource.write.recordkey.field") .hasMessageContaining("should not be set manually"); @@ -397,7 +412,9 @@ void testNonProtectedHudiOptionPassesValidation() throws TableNotExistException flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context); CatalogBaseTable table = - flussHudiLakeCatalog.getHudiCatalog().getTable(HudiConversions.toHudiObjectPath(tablePath)); + flussHudiLakeCatalog + .getHudiCatalog() + .getTable(HudiConversions.toHudiObjectPath(tablePath)); assertThat(table).isNotNull(); assertThat(table.getOptions()).containsEntry("precombine.field", "id"); } @@ -421,7 +438,8 @@ void testSystemColumnBucketConflictThrowsException() { TablePath tablePath = TablePath.of("test_db", "bucket_conflict_table"); TestingLakeCatalogContext context = new TestingLakeCatalogContext(); - assertThatThrownBy(() -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) + assertThatThrownBy( + () -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) .isInstanceOf(InvalidTableException.class) .hasMessageContaining("__bucket") .hasMessageContaining("system column"); @@ -442,7 +460,8 @@ void testSystemColumnOffsetConflictThrowsException() { TablePath tablePath = TablePath.of("test_db", "offset_conflict_table"); TestingLakeCatalogContext context = new TestingLakeCatalogContext(); - assertThatThrownBy(() -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) + assertThatThrownBy( + () -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) .isInstanceOf(InvalidTableException.class) .hasMessageContaining("__offset") .hasMessageContaining("system column"); @@ -463,7 +482,8 @@ void testSystemColumnTimestampConflictThrowsException() { TablePath tablePath = TablePath.of("test_db", "timestamp_conflict_table"); TestingLakeCatalogContext context = new TestingLakeCatalogContext(); - assertThatThrownBy(() -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) + assertThatThrownBy( + () -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) .isInstanceOf(InvalidTableException.class) .hasMessageContaining("__timestamp") .hasMessageContaining("system column"); From 5d7d654f0e19569582fc6f9d8841a12f8a06dcc4 Mon Sep 17 00:00:00 2001 From: fhan Date: Thu, 28 May 2026 19:33:19 +0800 Subject: [PATCH 09/11] [lake/hudi] fix build errors --- fluss-lake/fluss-lake-hudi/pom.xml | 4 +- .../fluss/lake/hudi/HudiLakeCatalog.java | 52 ++++++++----------- 2 files changed, 24 insertions(+), 32 deletions(-) diff --git a/fluss-lake/fluss-lake-hudi/pom.xml b/fluss-lake/fluss-lake-hudi/pom.xml index caf4f7d8b1..0f2bc76d3a 100644 --- a/fluss-lake/fluss-lake-hudi/pom.xml +++ b/fluss-lake/fluss-lake-hudi/pom.xml @@ -36,7 +36,7 @@ org.apache.hudi hudi-flink${flink.major.version}-bundle ${hudi.version} - test + provided log4j @@ -171,4 +171,4 @@ - \ No newline at end of file + diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java index 24485b48fa..6aa40311ac 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java @@ -19,7 +19,6 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.config.Configuration; -import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.lake.hudi.utils.HudiConversions; import org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils; import org.apache.fluss.lake.lakestorage.LakeCatalog; @@ -29,11 +28,13 @@ import org.apache.fluss.utils.IOUtils; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.types.DataType; @@ -80,7 +81,7 @@ protected Catalog getHudiCatalog() { @Override public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context) - throws TableAlreadyExistException { + throws org.apache.fluss.exception.TableAlreadyExistException { LOG.info("create the lake table for : {} with props: {}", tablePath, tableDescriptor); ObjectPath objectPath = HudiConversions.toHudiObjectPath(tablePath); @@ -123,16 +124,17 @@ private void createTable( try { hudiCatalog.createTable(tablePath, catalogTable, false); LOG.info("Table {} created successfully.", tablePath); - } catch (TableAlreadyExistException e) { + } catch (org.apache.flink.table.catalog.exceptions.TableAlreadyExistException e) { // table already exists, check schema compatibility for idempotency try { CatalogBaseTable existingTable = hudiCatalog.getTable(tablePath); if (!isHudiSchemaCompatible(existingTable, catalogTable)) { - throw new TableAlreadyExistException( + throw new org.apache.fluss.exception.TableAlreadyExistException( String.format( "The table %s already exists in Hudi catalog, but the table schema is not compatible. " + "Please first drop the table in Hudi catalog or use a new table name.", - tablePath)); + tablePath), + e); } // if creating a new fluss table, we should ensure the lake table is empty // TODO: add emptiness check for Hudi table once LakeTieringFactory is implemented @@ -164,33 +166,21 @@ private void createTable( */ @VisibleForTesting boolean isHudiSchemaCompatible(CatalogBaseTable existingTable, CatalogBaseTable expectedTable) { - ResolvedSchema existingSchema; - ResolvedSchema expectedSchema; - try { - existingSchema = existingTable.getResolvedSchema(); - expectedSchema = expectedTable.getResolvedSchema(); - } catch (Exception e) { - // Fallback: if resolved schema is not available, compare unresolved columns - List existingColumns = - existingTable.getSchema().getColumns().stream() - .map(org.apache.flink.table.api.Schema.UnresolvedColumn::getName) - .toList(); - List expectedColumns = - expectedTable.getSchema().getColumns().stream() - .map(org.apache.flink.table.api.Schema.UnresolvedColumn::getName) - .toList(); - return existingColumns.equals(expectedColumns); - } - - if (existingSchema.getColumns().size() != expectedSchema.getColumns().size()) { + TableSchema existingSchema = existingTable.getSchema(); + TableSchema expectedSchema = expectedTable.getSchema(); + String[] existingFieldNames = existingSchema.getFieldNames(); + String[] expectedFieldNames = expectedSchema.getFieldNames(); + DataType[] existingFieldDataTypes = existingSchema.getFieldDataTypes(); + DataType[] expectedFieldDataTypes = expectedSchema.getFieldDataTypes(); + + if (existingFieldNames.length != expectedFieldNames.length + || existingFieldDataTypes.length != expectedFieldDataTypes.length) { return false; } - for (int i = 0; i < existingSchema.getColumns().size(); i++) { - org.apache.flink.table.catalog.Column existingCol = existingSchema.getColumns().get(i); - org.apache.flink.table.catalog.Column expectedCol = expectedSchema.getColumns().get(i); - if (!existingCol.getName().equals(expectedCol.getName()) - || !existingCol.getDataType().equals(expectedCol.getDataType())) { + for (int i = 0; i < existingFieldNames.length; i++) { + if (!existingFieldNames[i].equals(expectedFieldNames[i]) + || !existingFieldDataTypes[i].equals(expectedFieldDataTypes[i])) { return false; } } @@ -212,6 +202,8 @@ public void createDatabase(String databaseName) { new HashMap<>(), "Hudi database"); hudiCatalog.createDatabase(databaseName, database, true); } + } catch (DatabaseAlreadyExistException e) { + LOG.debug("Database {} already exists in Hudi catalog.", databaseName, e); } catch (UnsupportedOperationException e) { throw new UnsupportedOperationException( String.format( From 05161e2a14402b4430bab1d13e18ffaab114d355 Mon Sep 17 00:00:00 2001 From: fhan Date: Thu, 28 May 2026 19:39:00 +0800 Subject: [PATCH 10/11] [lake/hudi] fix build errors in HudiLakeCatalogTest --- .../fluss/lake/hudi/HudiLakeCatalogTest.java | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java index 42ace0b451..66d517e0f4 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java @@ -35,6 +35,7 @@ import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.types.DataType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -190,14 +191,14 @@ void testIsHudiSchemaCompatibleWithSameSchema() { CatalogTable table1 = buildTestCatalogTable( new String[] {"id", "name"}, - new org.apache.flink.table.api.DataTypes[] { + new DataType[] { org.apache.flink.table.api.DataTypes.INT().notNull(), org.apache.flink.table.api.DataTypes.STRING() }); CatalogTable table2 = buildTestCatalogTable( new String[] {"id", "name"}, - new org.apache.flink.table.api.DataTypes[] { + new DataType[] { org.apache.flink.table.api.DataTypes.INT().notNull(), org.apache.flink.table.api.DataTypes.STRING() }); @@ -210,16 +211,14 @@ void testIsHudiSchemaCompatibleWithDifferentColumnCount() { CatalogTable table1 = buildTestCatalogTable( new String[] {"id", "name"}, - new org.apache.flink.table.api.DataTypes[] { + new DataType[] { org.apache.flink.table.api.DataTypes.INT().notNull(), org.apache.flink.table.api.DataTypes.STRING() }); CatalogTable table2 = buildTestCatalogTable( new String[] {"id"}, - new org.apache.flink.table.api.DataTypes[] { - org.apache.flink.table.api.DataTypes.INT().notNull() - }); + new DataType[] {org.apache.flink.table.api.DataTypes.INT().notNull()}); assertThat(flussHudiLakeCatalog.isHudiSchemaCompatible(table1, table2)).isFalse(); } @@ -229,14 +228,14 @@ void testIsHudiSchemaCompatibleWithDifferentColumnName() { CatalogTable table1 = buildTestCatalogTable( new String[] {"id", "name"}, - new org.apache.flink.table.api.DataTypes[] { + new DataType[] { org.apache.flink.table.api.DataTypes.INT().notNull(), org.apache.flink.table.api.DataTypes.STRING() }); CatalogTable table2 = buildTestCatalogTable( new String[] {"id", "value"}, - new org.apache.flink.table.api.DataTypes[] { + new DataType[] { org.apache.flink.table.api.DataTypes.INT().notNull(), org.apache.flink.table.api.DataTypes.STRING() }); @@ -249,14 +248,14 @@ void testIsHudiSchemaCompatibleWithDifferentColumnType() { CatalogTable table1 = buildTestCatalogTable( new String[] {"id", "name"}, - new org.apache.flink.table.api.DataTypes[] { + new DataType[] { org.apache.flink.table.api.DataTypes.INT().notNull(), org.apache.flink.table.api.DataTypes.STRING() }); CatalogTable table2 = buildTestCatalogTable( new String[] {"id", "name"}, - new org.apache.flink.table.api.DataTypes[] { + new DataType[] { org.apache.flink.table.api.DataTypes.INT().notNull(), org.apache.flink.table.api.DataTypes.BIGINT() }); @@ -493,8 +492,7 @@ void testSystemColumnTimestampConflictThrowsException() { // Helper methods // ------------------------------------------------------------------ - private CatalogTable buildTestCatalogTable( - String[] columnNames, org.apache.flink.table.api.DataTypes[] columnTypes) { + private CatalogTable buildTestCatalogTable(String[] columnNames, DataType[] columnTypes) { List columns = new ArrayList<>(); for (int i = 0; i < columnNames.length; i++) { columns.add(Column.physical(columnNames[i], columnTypes[i])); From f8696f145804d3d9b0424842fa6c104848163444 Mon Sep 17 00:00:00 2001 From: fhan Date: Thu, 28 May 2026 20:35:32 +0800 Subject: [PATCH 11/11] [lake/hudi] fix test failures in HudiLakeCatalogTest --- .../fluss/lake/hudi/HudiLakeCatalog.java | 73 +++++++++++++++---- .../lake/hudi/utils/HudiConversions.java | 25 +++++-- 2 files changed, 76 insertions(+), 22 deletions(-) diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java index 6aa40311ac..f3f543728c 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java @@ -28,22 +28,28 @@ import org.apache.fluss.utils.IOUtils; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.types.AbstractDataType; import org.apache.flink.table.types.DataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; +import java.util.Objects; import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HIVE_META_STORE_TYPE; import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HUDI_CATALOG_DEFAULT_NAME; @@ -166,25 +172,62 @@ private void createTable( */ @VisibleForTesting boolean isHudiSchemaCompatible(CatalogBaseTable existingTable, CatalogBaseTable expectedTable) { - TableSchema existingSchema = existingTable.getSchema(); - TableSchema expectedSchema = expectedTable.getSchema(); - String[] existingFieldNames = existingSchema.getFieldNames(); - String[] expectedFieldNames = expectedSchema.getFieldNames(); - DataType[] existingFieldDataTypes = existingSchema.getFieldDataTypes(); - DataType[] expectedFieldDataTypes = expectedSchema.getFieldDataTypes(); - - if (existingFieldNames.length != expectedFieldNames.length - || existingFieldDataTypes.length != expectedFieldDataTypes.length) { - return false; + return extractColumns(existingTable).equals(extractColumns(expectedTable)); + } + + private static List extractColumns(CatalogBaseTable table) { + if (table instanceof ResolvedCatalogBaseTable) { + ResolvedSchema resolvedSchema = + ((ResolvedCatalogBaseTable) table).getResolvedSchema(); + List columns = new ArrayList<>(); + for (Column column : resolvedSchema.getColumns()) { + columns.add(new ColumnSignature(column.getName(), column.getDataType())); + } + return columns; + } + + Schema schema = table.getUnresolvedSchema(); + List columns = new ArrayList<>(); + for (Schema.UnresolvedColumn column : schema.getColumns()) { + columns.add(new ColumnSignature(column.getName(), getDataType(column))); } + return columns; + } - for (int i = 0; i < existingFieldNames.length; i++) { - if (!existingFieldNames[i].equals(expectedFieldNames[i]) - || !existingFieldDataTypes[i].equals(expectedFieldDataTypes[i])) { + private static AbstractDataType getDataType(Schema.UnresolvedColumn column) { + if (column instanceof Schema.UnresolvedPhysicalColumn) { + return ((Schema.UnresolvedPhysicalColumn) column).getDataType(); + } else if (column instanceof Schema.UnresolvedMetadataColumn) { + return ((Schema.UnresolvedMetadataColumn) column).getDataType(); + } + return null; + } + + private static class ColumnSignature { + private final String name; + private final AbstractDataType dataType; + + private ColumnSignature(String name, AbstractDataType dataType) { + this.name = name; + this.dataType = dataType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ColumnSignature)) { return false; } + ColumnSignature that = (ColumnSignature) o; + return Objects.equals(name, that.name) && Objects.equals(dataType, that.dataType); + } + + @Override + public int hashCode() { + return Objects.hash(name, dataType); } - return true; } @Override diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java index 9becbe20f1..5f4df92873 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java @@ -61,12 +61,14 @@ public class HudiConversions { private static final String HUDI_CONF_PREFIX = "hudi."; private static final String DELIMITER = ","; + private static final String HUDI_TABLE_TYPE_KEY = "hoodie.datasource.write.table.type"; /** Hudi config options set by Fluss should not be set by users. */ @VisibleForTesting public static final Set HUDI_UNSETTABLE_OPTIONS = new HashSet<>(); static { HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.TABLE_TYPE.key()); + HUDI_UNSETTABLE_OPTIONS.add(HUDI_TABLE_TYPE_KEY); HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.RECORD_KEY_FIELD.key()); HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.INDEX_TYPE.key()); HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.INDEX_KEY_FIELD.key()); @@ -87,8 +89,8 @@ public static ObjectPath toHudiObjectPath(TablePath tablePath) { public static ResolvedSchema convertToFlinkResolvedSchema( TableDescriptor tableDescriptor, boolean isPkTable, String catalogMode) { // validate hudi options first - validateHudiOptions(tableDescriptor.getProperties()); - validateHudiOptions(tableDescriptor.getCustomProperties()); + validateHudiOptions(tableDescriptor.getProperties(), isPkTable); + validateHudiOptions(tableDescriptor.getCustomProperties(), isPkTable); // choose the correct converter based on catalog mode FlussDataTypeToHudiDataType converter = @@ -150,10 +152,7 @@ public static Map buildHudiTableProperties( } else { hudiProperties.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name()); // set primary key for Fluss Log Table. - String recordKeyField = - tableDescriptor - .getCustomProperties() - .get(HUDI_CONF_PREFIX + FlinkOptions.RECORD_KEY_FIELD.key()); + String recordKeyField = getRecordKeyField(tableDescriptor); if (recordKeyField == null || recordKeyField.isEmpty()) { throw new IllegalArgumentException("Record key field should be set."); } @@ -227,13 +226,25 @@ private static void setFlussPropertyToHudi( } } - private static void validateHudiOptions(Map properties) { + private static String getRecordKeyField(TableDescriptor tableDescriptor) { + String recordKeyOption = HUDI_CONF_PREFIX + FlinkOptions.RECORD_KEY_FIELD.key(); + String recordKeyField = tableDescriptor.getCustomProperties().get(recordKeyOption); + if (recordKeyField == null) { + recordKeyField = tableDescriptor.getProperties().get(recordKeyOption); + } + return recordKeyField; + } + + private static void validateHudiOptions(Map properties, boolean isPkTable) { properties.forEach( (k, v) -> { String hudiKey = k; if (k.startsWith(HUDI_CONF_PREFIX)) { hudiKey = k.substring(HUDI_CONF_PREFIX.length()); } + if (!isPkTable && FlinkOptions.RECORD_KEY_FIELD.key().equals(hudiKey)) { + return; + } if (HUDI_UNSETTABLE_OPTIONS.contains(hudiKey)) { throw new InvalidConfigException( String.format(