diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java index f42b0c4fb2..d80c391f10 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java @@ -33,6 +33,7 @@ import java.util.HashMap; import java.util.Map; +import static org.apache.fluss.metadata.DataLakeFormat.HUDI; import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG; import static org.apache.fluss.metadata.DataLakeFormat.PAIMON; import static org.apache.fluss.utils.Preconditions.checkNotNull; @@ -86,6 +87,11 @@ public Catalog getLakeCatalog( } else if (lakeFormat == ICEBERG) { catalog = IcebergCatalogFactory.create(catalogName, catalogProperties); this.lakeFormat = ICEBERG; + } else if (lakeFormat == HUDI) { + catalog = + HudiCatalogFactory.create( + catalogName, catalogProperties, classLoader); + this.lakeFormat = HUDI; } else { throw new UnsupportedOperationException( "Unsupported data lake format: " + lakeFormat); @@ -166,4 +172,94 @@ public static Catalog create(String catalogName, Map catalogProp } } } + + /** + * Factory using reflection to create Hudi Catalog instances. + * + *

Hudi is intentionally NOT a compile-time dependency of fluss-flink-common to avoid + * dragging the shaded {@code hudi-flink-bundle} (which re-bundles hadoop / parquet / + * avro / jackson / guava) into every downstream consumer. The bundle is shipped as a plugin + * under {@code plugins/hudi/} and loaded via the plugin classloader at runtime, mirroring the + * Iceberg integration pattern. + * + *

Unlike Iceberg's {@code FlinkCatalogFactory}, which keeps a convenience overload {@code + * createCatalog(String, Map)}, Hudi's {@code HoodieCatalogFactory} only exposes the standard + * Flink SPI signature {@code createCatalog(CatalogFactory.Context)}. Since Fluss is not + * invoking the factory through Flink's SQL {@code CREATE CATALOG} path, no {@code Context} is + * provided to us — we have to build one ourselves. We do that by reflectively instantiating + * Flink's {@code FactoryUtil$DefaultCatalogContext}, the same internal implementation Flink + * itself uses in {@code FactoryUtil#createCatalog(...)} when bridging the legacy and new SPI + * stacks. + */ + public static class HudiCatalogFactory { + + private static final String HOODIE_CATALOG_FACTORY_CLASS = + "org.apache.hudi.table.catalog.HoodieCatalogFactory"; + private static final String FLINK_CATALOG_FACTORY_CONTEXT_CLASS = + "org.apache.flink.table.factories.CatalogFactory$Context"; + private static final String FLINK_DEFAULT_CATALOG_CONTEXT_CLASS = + "org.apache.flink.table.factories.FactoryUtil$DefaultCatalogContext"; + + private HudiCatalogFactory() {} + + public static Catalog create( + String catalogName, + Map catalogProperties, + ClassLoader classLoader) { + try { + // 1) Build Hudi's catalog factory instance via reflection. + Class hoodieCatalogFactoryClass = + Class.forName(HOODIE_CATALOG_FACTORY_CLASS, true, classLoader); + Object factoryInstance = + hoodieCatalogFactoryClass.getDeclaredConstructor().newInstance(); + + // 2) Build a CatalogFactory.Context via Flink's internal default impl. + // Constructor: DefaultCatalogContext(String name, + // Map options, + // ReadableConfig configuration, + // ClassLoader classLoader) + // We have no real Flink ReadableConfig in this code path, so we pass an + // empty Flink Configuration (which implements ReadableConfig) as a benign + // placeholder — Hudi's factory only consumes 'options' / 'name' / classloader. 
+ Class defaultCatalogContextClass = + Class.forName(FLINK_DEFAULT_CATALOG_CONTEXT_CLASS, true, classLoader); + Class readableConfigClass = + Class.forName( + "org.apache.flink.configuration.ReadableConfig", true, classLoader); + Class flinkConfigurationClass = + Class.forName( + "org.apache.flink.configuration.Configuration", true, classLoader); + Object emptyFlinkConfiguration = + flinkConfigurationClass.getDeclaredConstructor().newInstance(); + + Object context = + defaultCatalogContextClass + .getDeclaredConstructor( + String.class, + Map.class, + readableConfigClass, + ClassLoader.class) + .newInstance( + catalogName, + catalogProperties, + emptyFlinkConfiguration, + classLoader); + + // 3) Invoke HoodieCatalogFactory#createCatalog(Context). + Class contextInterface = + Class.forName(FLINK_CATALOG_FACTORY_CONTEXT_CLASS, true, classLoader); + Method createCatalogMethod = + hoodieCatalogFactoryClass.getMethod("createCatalog", contextInterface); + return (Catalog) createCatalogMethod.invoke(factoryInstance, context); + } catch (Exception e) { + throw new RuntimeException( + "Failed to create Hudi catalog using reflection. 
Please make sure " + + "hudi-flink-bundle (matching the current Flink version, " + + "Hudi 1.0+) is on the classpath, typically under " + + "plugins/hudi/, and that the catalog options include a valid " + + "'mode' (supported: 'hms' or 'dfs').", + e); + } + } + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java index ad9918f389..3def9bbdc2 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java @@ -44,7 +44,7 @@ public DynamicTableSource createDynamicTableSource( originIdentifier.getDatabaseName(), tableName); - // For Iceberg and Paimon, pass the table name as-is to their factory. + // For Iceberg, Hudi and Paimon, pass the table name as-is to their factory. // Metadata tables will be handled internally by their respective factories. DynamicTableFactory.Context newContext = new FactoryUtil.DefaultDynamicTableContext( @@ -66,11 +66,13 @@ private DynamicTableSourceFactory getLakeTableFactory() { return getPaimonFactory(); case ICEBERG: return getIcebergFactory(); + case HUDI: + return getHudiFactory(); default: throw new UnsupportedOperationException( "Unsupported lake connector: " + lakeFlinkCatalog.getLakeFormat() - + ". Only 'paimon' and 'iceberg' are supported."); + + ". Only 'paimon', 'iceberg' and 'hudi' are supported."); } } @@ -101,4 +103,18 @@ private DynamicTableSourceFactory getIcebergFactory() { e); } } + + private DynamicTableSourceFactory getHudiFactory() { + try { + Class hudiFactoryClass = Class.forName("org.apache.hudi.table.HoodieTableFactory"); + return (DynamicTableSourceFactory) + hudiFactoryClass.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + throw new RuntimeException( + "Failed to create Hudi table factory. 
Please ensure hudi-flink-bundle " + + "(matching the current Flink version) is on the classpath, " + + "typically under plugins/hudi/.", + e); + } + } } diff --git a/fluss-lake/fluss-lake-hudi/pom.xml b/fluss-lake/fluss-lake-hudi/pom.xml index d4c5070e10..0f2bc76d3a 100644 --- a/fluss-lake/fluss-lake-hudi/pom.xml +++ b/fluss-lake/fluss-lake-hudi/pom.xml @@ -36,7 +36,7 @@ org.apache.hudi hudi-flink${flink.major.version}-bundle ${hudi.version} - test + provided log4j @@ -69,12 +69,106 @@ + + org.apache.hadoop + hadoop-common + ${fluss.hadoop.version} + provided + + + avro + org.apache.avro + + + log4j + log4j + + + slf4j-log4j12 + org.slf4j + + + ch.qos.reload4j + reload4j + + + org.slf4j + slf4j-reload4j + + + jdk.tools + jdk.tools + + + protobuf-java + com.google.protobuf + + + commons-io + commons-io + + + org.apache.commons + commons-lang3 + + + org.apache.curator + curator-client + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-recipes + + + org.apache.zookeeper + zookeeper + + + + org.apache.fluss fluss-common ${project.version} provided + + + org.apache.flink + flink-table-common + provided + + + org.apache.commons + commons-lang3 + + + + + + org.apache.flink + flink-table-api-java + ${flink.version} + provided + + + + + org.apache.fluss + fluss-common + ${project.version} + test-jar + test + + + + org.apache.fluss + fluss-test-utils + - \ No newline at end of file + diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/FlussDataTypeToHudiDataType.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/FlussDataTypeToHudiDataType.java new file mode 100644 index 0000000000..2d62c07f5d --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/FlussDataTypeToHudiDataType.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.hudi; + +import org.apache.fluss.types.ArrayType; +import org.apache.fluss.types.BigIntType; +import org.apache.fluss.types.BinaryType; +import org.apache.fluss.types.BooleanType; +import org.apache.fluss.types.BytesType; +import org.apache.fluss.types.CharType; +import org.apache.fluss.types.DataTypeVisitor; +import org.apache.fluss.types.DateType; +import org.apache.fluss.types.DecimalType; +import org.apache.fluss.types.DoubleType; +import org.apache.fluss.types.FloatType; +import org.apache.fluss.types.IntType; +import org.apache.fluss.types.LocalZonedTimestampType; +import org.apache.fluss.types.MapType; +import org.apache.fluss.types.RowType; +import org.apache.fluss.types.SmallIntType; +import org.apache.fluss.types.StringType; +import org.apache.fluss.types.TimeType; +import org.apache.fluss.types.TimestampType; +import org.apache.fluss.types.TinyIntType; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; + +import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.FILE_SYSTEM_TYPE; +import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HIVE_META_STORE_TYPE; + +/** Convert from Fluss's data type to Hudi's internal data type. 
*/ +public class FlussDataTypeToHudiDataType implements DataTypeVisitor { + + // Immutable: the DFS_INSTANCE / HMS_INSTANCE converters below are shared statics. + private final String catalogMode; + + public FlussDataTypeToHudiDataType(String catalogMode) { + this.catalogMode = catalogMode; + } + + public static final FlussDataTypeToHudiDataType DFS_INSTANCE = + new FlussDataTypeToHudiDataType(FILE_SYSTEM_TYPE); + public static final FlussDataTypeToHudiDataType HMS_INSTANCE = + new FlussDataTypeToHudiDataType(HIVE_META_STORE_TYPE); + + @Override + public DataType visit(CharType charType) { + return withNullability(DataTypes.STRING(), charType.isNullable()); + } + + @Override + public DataType visit(StringType stringType) { + return withNullability(DataTypes.STRING(), stringType.isNullable()); + } + + @Override + public DataType visit(BooleanType booleanType) { + return withNullability(DataTypes.BOOLEAN(), booleanType.isNullable()); + } + + @Override + public DataType visit(BinaryType binaryType) { + return withNullability(DataTypes.BINARY(binaryType.getLength()), binaryType.isNullable()); + } + + @Override + public DataType visit(BytesType bytesType) { + return withNullability(DataTypes.BYTES(), bytesType.isNullable()); + } + + @Override + public DataType visit(DecimalType decimalType) { + return withNullability( + DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale()), + decimalType.isNullable()); + } + + @Override + public DataType visit(TinyIntType tinyIntType) { + // Hudi IntType covers 8/16/32-bit integers (consistent with internal conversion logic) + return withNullability(DataTypes.TINYINT(), tinyIntType.isNullable()); + } + + @Override + public DataType visit(SmallIntType smallIntType) { + return withNullability(DataTypes.SMALLINT(), smallIntType.isNullable()); + } + + @Override + public DataType visit(IntType intType) { + return withNullability(DataTypes.INT(), intType.isNullable()); + } + + @Override + public DataType visit(BigIntType bigIntType) { + return withNullability(DataTypes.BIGINT(), bigIntType.isNullable()); + } + + 
@Override + public DataType visit(FloatType floatType) { + return withNullability(DataTypes.FLOAT(), floatType.isNullable()); + } + + @Override + public DataType visit(DoubleType doubleType) { + return withNullability(DataTypes.DOUBLE(), doubleType.isNullable()); + } + + @Override + public DataType visit(DateType dateType) { + return withNullability(DataTypes.DATE(), dateType.isNullable()); + } + + @Override + public DataType visit(TimeType timeType) { + return withNullability(DataTypes.TIME(timeType.getPrecision()), timeType.isNullable()); + } + + @Override + public DataType visit(TimestampType timestampType) { + return withNullability( + DataTypes.TIMESTAMP(timestampType.getPrecision()), timestampType.isNullable()); + } + + @Override + public DataType visit(LocalZonedTimestampType localZonedTimestampType) { + return HIVE_META_STORE_TYPE.equals(catalogMode) + ? withNullability(DataTypes.BIGINT(), localZonedTimestampType.isNullable()) + : withNullability( + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(), + localZonedTimestampType.isNullable()); + } + + @Override + public DataType visit(ArrayType arrayType) { + DataType elementType = arrayType.getElementType().accept(this); + return withNullability(DataTypes.ARRAY(elementType), arrayType.isNullable()); + } + + @Override + public DataType visit(MapType mapType) { + DataType keyType = mapType.getKeyType().accept(this); + DataType valueType = mapType.getValueType().accept(this); + return withNullability(DataTypes.MAP(keyType, valueType), mapType.isNullable()); + } + + @Override + public DataType visit(RowType rowType) { + DataTypes.Field[] fields = new DataTypes.Field[rowType.getFieldCount()]; + + for (int i = 0; i < rowType.getFieldCount(); i++) { + String fieldName = rowType.getFieldNames().get(i); + DataType fieldType = rowType.getTypeAt(i).accept(this); + fields[i] = DataTypes.FIELD(fieldName, fieldType); + } + return withNullability(DataTypes.ROW(fields), rowType.isNullable()); + } + + private DataType 
withNullability(DataType hudi, boolean nullable) { + if (hudi.getLogicalType().isNullable() != nullable) { + return nullable ? hudi.nullable() : hudi.notNull(); + } + return hudi; + } +} diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java new file mode 100644 index 0000000000..f3f543728c --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeCatalog.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.lake.hudi; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.lake.hudi.utils.HudiConversions; +import org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils; +import org.apache.fluss.lake.lakestorage.LakeCatalog; +import org.apache.fluss.metadata.TableChange; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.utils.IOUtils; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.types.AbstractDataType; +import org.apache.flink.table.types.DataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Objects; + +import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HIVE_META_STORE_TYPE; +import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HUDI_CATALOG_DEFAULT_NAME; +import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.MODE_CONFIG; +import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; +import static 
org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; +import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; + +/** Implementation of {@link LakeCatalog} for Hudi. */ +public class HudiLakeCatalog implements LakeCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(HudiLakeCatalog.class); + + public static final LinkedHashMap SYSTEM_COLUMNS = new LinkedHashMap<>(); + + static { + SYSTEM_COLUMNS.put(BUCKET_COLUMN_NAME, DataTypes.INT()); + SYSTEM_COLUMNS.put(OFFSET_COLUMN_NAME, DataTypes.BIGINT()); + SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP(6)); + } + + private final Catalog hudiCatalog; + private final String catalogMode; + + public HudiLakeCatalog(Configuration configuration) { + this.catalogMode = configuration.toMap().getOrDefault(MODE_CONFIG, HIVE_META_STORE_TYPE); + this.hudiCatalog = HudiCatalogUtils.createHudiCatalog(configuration); + this.hudiCatalog.open(); + } + + @VisibleForTesting + protected Catalog getHudiCatalog() { + return hudiCatalog; + } + + @Override + public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context) + throws org.apache.fluss.exception.TableAlreadyExistException { + LOG.info("create the lake table for : {} with props: {}", tablePath, tableDescriptor); + + ObjectPath objectPath = HudiConversions.toHudiObjectPath(tablePath); + + boolean isPkTable = tableDescriptor.getSchema().getPrimaryKeyIndexes().length > 0; + + // Create Hudi catalog table + CatalogTable catalogTable = + HudiConversions.createHudiCatalogTable(tableDescriptor, isPkTable, catalogMode); + + // Create table in Hudi catalog + try { + createTable(objectPath, catalogTable, context.isCreatingFlussTable()); + } catch (DatabaseNotExistException e) { + createDatabase(tablePath.getDatabaseName()); + try { + createTable(objectPath, catalogTable, context.isCreatingFlussTable()); + } catch (DatabaseNotExistException t) { + // shouldn't happen in normal cases + throw new 
RuntimeException( + String.format( + "Fail to create table %s in Hudi, because " + + "Database %s still doesn't exist although create database " + + "successfully, please try again.", + tablePath, tablePath.getDatabaseName()), + t); + } + } + } + + @Override + public void alterTable(TablePath tablePath, List tableChanges, Context context) + throws org.apache.fluss.exception.TableNotExistException { + throw new UnsupportedOperationException( + "Alter table is not supported for Hudi at the moment"); + } + + /** + * Creates {@code catalogTable} at {@code tablePath} in the Hudi catalog. If the table already + * exists, it is accepted only when its schema is compatible with {@code catalogTable}; + * otherwise a Fluss {@code TableAlreadyExistException} is thrown. + */ + private void createTable( + ObjectPath tablePath, CatalogBaseTable catalogTable, boolean isCreatingFlussTable) + throws DatabaseNotExistException { + try { + hudiCatalog.createTable(tablePath, catalogTable, false); + LOG.info("Table {} created successfully.", tablePath); + } catch (org.apache.flink.table.catalog.exceptions.TableAlreadyExistException e) { + // table already exists, check schema compatibility for idempotency + try { + CatalogBaseTable existingTable = hudiCatalog.getTable(tablePath); + if (!isHudiSchemaCompatible(existingTable, catalogTable)) { + throw new org.apache.fluss.exception.TableAlreadyExistException( + String.format( + "The table %s already exists in Hudi catalog, but the table schema is not compatible. " + + "Please first drop the table in Hudi catalog or use a new table name.", + tablePath), + e); + } + // if creating a new fluss table, we should ensure the lake table is empty + // TODO: add emptiness check for Hudi table once LakeTieringFactory is implemented + if (isCreatingFlussTable) { + LOG.warn( + "Table {} already exists in Hudi catalog with compatible schema. " + + "Skipping creation as the table may not be empty.", + tablePath); + } + } catch (TableNotExistException tableNotExistException) { + // shouldn't happen in normal cases + throw new RuntimeException( + String.format( + "Failed to create table %s in Hudi. 
The table already existed " + + "during the initial creation attempt, but subsequently " + + "could not be found when trying to get it. " + + "Please check whether the Hudi table was manually deleted, and try again.", + tablePath), + tableNotExistException); + } + } + } + + /** + * Checks whether the existing Hudi table schema is compatible with the expected schema. + * + *

Compatibility means the column names, types, and nullability match. This is used for + * crash-recovery idempotency: if the table already exists with a compatible schema, the + * creation is considered successful. + */ + @VisibleForTesting + boolean isHudiSchemaCompatible(CatalogBaseTable existingTable, CatalogBaseTable expectedTable) { + return extractColumns(existingTable).equals(extractColumns(expectedTable)); + } + + private static List extractColumns(CatalogBaseTable table) { + if (table instanceof ResolvedCatalogBaseTable) { + ResolvedSchema resolvedSchema = + ((ResolvedCatalogBaseTable) table).getResolvedSchema(); + List columns = new ArrayList<>(); + for (Column column : resolvedSchema.getColumns()) { + columns.add(new ColumnSignature(column.getName(), column.getDataType())); + } + return columns; + } + + Schema schema = table.getUnresolvedSchema(); + List columns = new ArrayList<>(); + for (Schema.UnresolvedColumn column : schema.getColumns()) { + columns.add(new ColumnSignature(column.getName(), getDataType(column))); + } + return columns; + } + + private static AbstractDataType getDataType(Schema.UnresolvedColumn column) { + if (column instanceof Schema.UnresolvedPhysicalColumn) { + return ((Schema.UnresolvedPhysicalColumn) column).getDataType(); + } else if (column instanceof Schema.UnresolvedMetadataColumn) { + return ((Schema.UnresolvedMetadataColumn) column).getDataType(); + } + return null; + } + + private static class ColumnSignature { + private final String name; + private final AbstractDataType dataType; + + private ColumnSignature(String name, AbstractDataType dataType) { + this.name = name; + this.dataType = dataType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ColumnSignature)) { + return false; + } + ColumnSignature that = (ColumnSignature) o; + return Objects.equals(name, that.name) && Objects.equals(dataType, that.dataType); + } + + @Override + public int hashCode() { + 
return Objects.hash(name, dataType); + } + } + + @Override + public void close() { + // Flink's Catalog declares its own close() rather than implementing AutoCloseable, so an + // instanceof AutoCloseable guard would skip the close; adapt close() via a method reference. + if (hudiCatalog != null) { + IOUtils.closeQuietly((AutoCloseable) hudiCatalog::close, HUDI_CATALOG_DEFAULT_NAME); + } + } + + public void createDatabase(String databaseName) { + try { + if (!hudiCatalog.databaseExists(databaseName)) { + CatalogDatabase database = + new org.apache.flink.table.catalog.CatalogDatabaseImpl( + new HashMap<>(), "Hudi database"); + hudiCatalog.createDatabase(databaseName, database, true); + } + } catch (DatabaseAlreadyExistException e) { + LOG.debug("Database {} already exists in Hudi catalog.", databaseName, e); + } catch (UnsupportedOperationException e) { + throw new UnsupportedOperationException( + String.format( + "The underlying Hudi catalog does not support database operations for database '%s'. " + + "This typically occurs with a filesystem-based catalog (dfs mode). " + + "Consider using Hive Metastore (hms mode) instead.", + databaseName), + e); + } + } +} diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeStorage.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeStorage.java index f0865de0ea..66db56057b 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeStorage.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeStorage.java @@ -43,10 +43,7 @@ public HudiLakeStorage(Configuration configuration) { @Override public LakeCatalog createLakeCatalog() { - throw new UnsupportedOperationException( - "HudiLakeStorage is currently a scaffold and does not support creating a " - + "LakeCatalog yet. 
Verify that Hudi lake storage was selected " - + "intentionally and that the required Hudi support/module is available."); + return new HudiLakeCatalog(hudiConfig); } @Override diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java new file mode 100644 index 0000000000..5f4df92873 --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.lake.hudi.utils; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.exception.InvalidConfigException; +import org.apache.fluss.exception.InvalidTableException; +import org.apache.fluss.lake.hudi.FlussDataTypeToHudiDataType; +import org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.types.DataType; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.index.HoodieIndex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.fluss.lake.hudi.HudiLakeCatalog.SYSTEM_COLUMNS; +import static org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils.HIVE_META_STORE_TYPE; + +/** Utils for conversion between Hudi and Fluss. */ +public class HudiConversions { + + private static final Logger LOG = LoggerFactory.getLogger(HudiConversions.class); + + // for fluss config + private static final String FLUSS_CONF_PREFIX = "fluss."; + // for hudi config + private static final String HUDI_CONF_PREFIX = "hudi."; + + private static final String DELIMITER = ","; + private static final String HUDI_TABLE_TYPE_KEY = "hoodie.datasource.write.table.type"; + + /** Hudi config options set by Fluss should not be set by users. 
*/ + @VisibleForTesting public static final Set HUDI_UNSETTABLE_OPTIONS = new HashSet<>(); + + static { + HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.TABLE_TYPE.key()); + HUDI_UNSETTABLE_OPTIONS.add(HUDI_TABLE_TYPE_KEY); + HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.RECORD_KEY_FIELD.key()); + HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.INDEX_TYPE.key()); + HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.INDEX_KEY_FIELD.key()); + HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key()); + HUDI_UNSETTABLE_OPTIONS.add(FlinkOptions.PARTITION_PATH_FIELD.key()); + } + + /** + * Converts a Fluss TablePath to a Hudi ObjectPath. + * + * @param tablePath the Fluss table path + * @return the corresponding Hudi ObjectPath + */ + public static ObjectPath toHudiObjectPath(TablePath tablePath) { + return new ObjectPath(tablePath.getDatabaseName(), tablePath.getTableName()); + } + + public static ResolvedSchema convertToFlinkResolvedSchema( + TableDescriptor tableDescriptor, boolean isPkTable, String catalogMode) { + // validate hudi options first + validateHudiOptions(tableDescriptor.getProperties(), isPkTable); + validateHudiOptions(tableDescriptor.getCustomProperties(), isPkTable); + + // choose the correct converter based on catalog mode + FlussDataTypeToHudiDataType converter = + HIVE_META_STORE_TYPE.equals(catalogMode) + ? 
FlussDataTypeToHudiDataType.HMS_INSTANCE + : FlussDataTypeToHudiDataType.DFS_INSTANCE; + + List columns = new ArrayList<>(); + + // Add regular columns + for (org.apache.fluss.metadata.Schema.Column column : + tableDescriptor.getSchema().getColumns()) { + String columnName = column.getName(); + if (SYSTEM_COLUMNS.containsKey(columnName)) { + throw new InvalidTableException( + "Column " + + columnName + + " conflicts with a system column name of hudi table, please rename the column."); + } + columns.add(Column.physical(columnName, column.getDataType().accept(converter))); + } + + // add system metadata columns to schema + for (Map.Entry systemColumn : SYSTEM_COLUMNS.entrySet()) { + columns.add(Column.physical(systemColumn.getKey(), systemColumn.getValue())); + } + + UniqueConstraint constraint = null; + // Set primary key if this is a PK table + if (isPkTable && tableDescriptor.hasPrimaryKey()) { + constraint = + UniqueConstraint.primaryKey( + "primaryKey", extractPrimaryKeyColumns(tableDescriptor)); + } + + return new ResolvedSchema(columns, Collections.emptyList(), constraint); + } + + /** + * Builds Hudi table properties from Fluss TableDescriptor. 
+ * + * @param tableDescriptor the Fluss table descriptor + * @param isPkTable whether this is a primary key table + * @return map of Hudi table properties + */ + public static Map buildHudiTableProperties( + TableDescriptor tableDescriptor, boolean isPkTable) { + Map hudiProperties = new HashMap<>(); + // Set connector type + hudiProperties.put(FactoryUtil.CONNECTOR.key(), "hudi"); + hudiProperties.put("storageType", "hudi"); + + // Set table type based on whether it's a PK table + if (isPkTable) { + hudiProperties.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name()); + hudiProperties.put( + FlinkOptions.RECORD_KEY_FIELD.key(), + String.join(DELIMITER, extractPrimaryKeyColumns(tableDescriptor))); + } else { + hudiProperties.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name()); + // set primary key for Fluss Log Table. + String recordKeyField = getRecordKeyField(tableDescriptor); + if (recordKeyField == null || recordKeyField.isEmpty()) { + throw new IllegalArgumentException("Record key field should be set."); + } + hudiProperties.put(FlinkOptions.RECORD_KEY_FIELD.key(), recordKeyField); + hudiProperties.put( + FlinkOptions.INDEX_KEY_FIELD.key(), + recordKeyField); // use primary key as index key + } + + // bucket keys column + hudiProperties.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name()); + List bucketKeys = tableDescriptor.getBucketKeys(); + int numBuckets = + tableDescriptor + .getTableDistribution() + .flatMap(TableDescriptor.TableDistribution::getBucketCount) + .orElseThrow( + () -> new IllegalArgumentException("Bucket count should be set.")); + + // NOTE(review): for log tables this replaces the INDEX_KEY_FIELD written in the else + // branch above whenever bucket keys are present - confirm that override is intended. + if (!bucketKeys.isEmpty()) { + hudiProperties.put( + FlinkOptions.INDEX_KEY_FIELD.key(), String.join(DELIMITER, bucketKeys)); + } + hudiProperties.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), String.valueOf(numBuckets)); + + // partition keys column + List partitionKeys = tableDescriptor.getPartitionKeys(); + hudiProperties.put( + 
FlinkOptions.PARTITION_PATH_FIELD.key(), String.join(DELIMITER, partitionKeys)); + + // Convert Fluss properties to Hudi properties + tableDescriptor + .getProperties() + .forEach((k, v) -> setFlussPropertyToHudi(k, v, hudiProperties)); + tableDescriptor + .getCustomProperties() + .forEach((k, v) -> setFlussPropertyToHudi(k, v, hudiProperties)); + + return hudiProperties; + } + + /** + * Creates a CatalogTable for Hudi from Fluss TableDescriptor. + * + * @param tableDescriptor the Fluss table descriptor + * @param isPkTable whether this is a primary key table + * @return the created CatalogTable + */ + public static CatalogTable createHudiCatalogTable( + TableDescriptor tableDescriptor, boolean isPkTable, String catalogMode) { + ResolvedSchema resolvedSchema = + convertToFlinkResolvedSchema(tableDescriptor, isPkTable, catalogMode); + Schema schema = Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(); + List partitionKeys = tableDescriptor.getPartitionKeys(); + Map options = buildHudiTableProperties(tableDescriptor, isPkTable); + LOG.info("Hudi table properties: {}", options); + + String comment = tableDescriptor.getComment().orElse("Hudi table created from Fluss"); + return HIVE_META_STORE_TYPE.equals(catalogMode) + ? 
HudiCatalogUtils.createCatalogTable(schema, partitionKeys, options, comment) + : HudiCatalogUtils.createResolvedCatalogTable( + schema, partitionKeys, options, comment, resolvedSchema); + } + + private static void setFlussPropertyToHudi( + String key, String value, Map hudiProperties) { + if (key.startsWith(HUDI_CONF_PREFIX)) { + hudiProperties.put(key.substring(HUDI_CONF_PREFIX.length()), value); + } else { + hudiProperties.put(FLUSS_CONF_PREFIX + key, value); + } + } + + private static String getRecordKeyField(TableDescriptor tableDescriptor) { + String recordKeyOption = HUDI_CONF_PREFIX + FlinkOptions.RECORD_KEY_FIELD.key(); + String recordKeyField = tableDescriptor.getCustomProperties().get(recordKeyOption); + if (recordKeyField == null) { + recordKeyField = tableDescriptor.getProperties().get(recordKeyOption); + } + return recordKeyField; + } + + private static void validateHudiOptions(Map properties, boolean isPkTable) { + properties.forEach( + (k, v) -> { + String hudiKey = k; + if (k.startsWith(HUDI_CONF_PREFIX)) { + hudiKey = k.substring(HUDI_CONF_PREFIX.length()); + } + if (!isPkTable && FlinkOptions.RECORD_KEY_FIELD.key().equals(hudiKey)) { + return; + } + if (HUDI_UNSETTABLE_OPTIONS.contains(hudiKey)) { + throw new InvalidConfigException( + String.format( + "The Hudi option %s will be set automatically by Fluss " + + "and should not be set manually.", + k)); + } + }); + } + + /** + * Extracts the primary key column names from a Fluss TableDescriptor. 
+ * + * @param tableDescriptor the Fluss table descriptor + * @return list of primary key column names + */ + private static List extractPrimaryKeyColumns(TableDescriptor tableDescriptor) { + List primaryKeys = new ArrayList<>(); + for (int pkIndex : tableDescriptor.getSchema().getPrimaryKeyIndexes()) { + primaryKeys.add(tableDescriptor.getSchema().getColumns().get(pkIndex).getName()); + } + return primaryKeys; + } +} diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java new file mode 100644 index 0000000000..c4bd3413a3 --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/catalog/HudiCatalogUtils.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.lake.hudi.utils.catalog; + +import org.apache.fluss.config.Configuration; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.hudi.exception.HoodieCatalogException; +import org.apache.hudi.table.catalog.CatalogOptions; +import org.apache.hudi.table.catalog.HoodieCatalog; +import org.apache.hudi.table.catalog.HoodieHiveCatalog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Locale; +import java.util.Map; + +/** Utility methods for creating Hudi catalogs and Flink catalog tables. */ +public class HudiCatalogUtils { + + private static final Logger LOG = LoggerFactory.getLogger(HudiCatalogUtils.class); + + public static final String MODE_CONFIG = "mode"; + public static final String CATALOG_NAME_CONFIG = "name"; + + public static final String HUDI_CATALOG_DEFAULT_NAME = "fluss-hudi-catalog"; + public static final String HIVE_META_STORE_TYPE = "hms"; + public static final String FILE_SYSTEM_TYPE = "dfs"; + + public static Catalog createHudiCatalog(Configuration configuration) { + Map hudiProps = configuration.toMap(); + // copy the configuration to avoid modifying the original + Configuration copiedConfig = new Configuration(configuration); + copiedConfig.setString(CatalogOptions.DEFAULT_DATABASE.key(), "tmp"); + copiedConfig.setString(CatalogOptions.TABLE_EXTERNAL.key(), "true"); + String catalogName = hudiProps.getOrDefault(CATALOG_NAME_CONFIG, HUDI_CATALOG_DEFAULT_NAME); + return buildHudiCatalog( + catalogName, + hudiProps, + org.apache.flink.configuration.Configuration.fromMap(copiedConfig.toMap())); + } + + public static Catalog buildHudiCatalog( + String catalogName, + Map hudiProps, + org.apache.flink.configuration.Configuration 
configuration) { + String catalogMode = + hudiProps.getOrDefault(CatalogOptions.MODE.key(), HIVE_META_STORE_TYPE); + LOG.info( + "create hudi catalog: {}, mode: {}, configuration: {}", + catalogName, + catalogMode, + configuration); + switch (catalogMode.toLowerCase(Locale.ENGLISH)) { + case HIVE_META_STORE_TYPE: + return new HoodieHiveCatalog(catalogName, configuration); + case FILE_SYSTEM_TYPE: + return new HoodieCatalog(catalogName, configuration); + default: + throw new HoodieCatalogException( + String.format( + "Invalid hudi-catalog mode: %s, supported modes: [hms, dfs].", + catalogMode)); + } + } + + public static CatalogTable createCatalogTable( + Schema schema, + List partitionKeys, + Map options, + @Nullable String comment) { + return CatalogTable.of(schema, comment, partitionKeys, options); + } + + public static ResolvedCatalogTable createResolvedCatalogTable( + Schema schema, + List partitionKeys, + Map options, + @Nullable String comment, + ResolvedSchema resolvedSchema) { + return new ResolvedCatalogTable( + CatalogTable.of(schema, comment, partitionKeys, options), resolvedSchema); + } +} diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java new file mode 100644 index 0000000000..66d517e0f4 --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/HudiLakeCatalogTest.java @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.hudi; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.InvalidConfigException; +import org.apache.fluss.exception.InvalidTableException; +import org.apache.fluss.exception.TableAlreadyExistException; +import org.apache.fluss.lake.hudi.utils.HudiConversions; +import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.types.DataTypes; + +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.types.DataType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Unit test for {@link HudiLakeCatalog}. 
*/ +class HudiLakeCatalogTest { + + @TempDir private File tempWarehouseDir; + + private HudiLakeCatalog flussHudiLakeCatalog; + + @BeforeEach + public void setUp() { + Configuration configuration = new Configuration(); + configuration.setString("catalog.path", tempWarehouseDir.toURI().toString()); + configuration.setString("mode", "dfs"); + this.flussHudiLakeCatalog = new HudiLakeCatalog(configuration); + } + + /** Verify property prefix rewriting. */ + @Test + void testPropertyPrefixRewriting() throws TableNotExistException { + String database = "test_db"; + String tableName = "test_table"; + + Schema flussSchema = + Schema.newBuilder().column("id", DataTypes.BIGINT()).primaryKey("id").build(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(flussSchema) + .distributedBy(3) + .property("hudi.precombine.field", "id") + .property("table.datalake.freshness", "30s") + .build(); + + TablePath tablePath = TablePath.of(database, tableName); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context); + + CatalogBaseTable table = + flussHudiLakeCatalog + .getHudiCatalog() + .getTable(HudiConversions.toHudiObjectPath(tablePath)); + + // Verify property prefix rewriting + assertThat(table.getOptions()).containsEntry("precombine.field", "id"); + assertThat(table.getOptions()).containsEntry("fluss.table.datalake.freshness", "30s"); + assertThat(table.getOptions()) + .doesNotContainKeys("hudi.precombine.field", "table.datalake.freshness"); + } + + @Test + void testCreatePrimaryKeyTable() throws TableNotExistException { + String database = "test_db"; + String tableName = "pk_table"; + + Schema flussSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .withComment("pk_table") + .primaryKey("id") + .build(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(flussSchema).distributedBy(4, 
"id").build(); + + TablePath tablePath = TablePath.of(database, tableName); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context); + + ObjectPath objectPath = HudiConversions.toHudiObjectPath(tablePath); + CatalogBaseTable table = flussHudiLakeCatalog.getHudiCatalog().getTable(objectPath); + + assertThat(table).isNotNull(); + + List primaryKeys = new ArrayList<>(); + primaryKeys.add("id"); + TableSchema expectHudiSchema = + TableSchema.builder() + .field("id", org.apache.flink.table.api.DataTypes.INT().notNull()) + .field("name", org.apache.flink.table.api.DataTypes.STRING()) + .field("__bucket", org.apache.flink.table.api.DataTypes.INT()) + .field("__offset", org.apache.flink.table.api.DataTypes.BIGINT()) + .field("__timestamp", org.apache.flink.table.api.DataTypes.TIMESTAMP(6)) + .primaryKey("primaryKey", primaryKeys.toArray(new String[0])) + .build(); + + assertThat(table.getUnresolvedSchema()).isEqualTo(expectHudiSchema.toSchema()); + } + + @Test + void testCreateLogTable() throws TableNotExistException { + String database = "test_db"; + String tableName = "log_table"; + + Schema flussSchema = + Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("name", DataTypes.STRING()) + .build(); + + Map customProperties = new HashMap<>(); + customProperties.put("hudi.hoodie.datasource.write.recordkey.field", "id"); + + TableDescriptor td = + TableDescriptor.builder() + .schema(flussSchema) + .distributedBy(3) // no bucket key + .customProperties(customProperties) + .build(); + + TablePath tablePath = TablePath.of(database, tableName); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + flussHudiLakeCatalog.createTable(tablePath, td, context); + + ObjectPath objectPath = HudiConversions.toHudiObjectPath(tablePath); + CatalogBaseTable table = flussHudiLakeCatalog.getHudiCatalog().getTable(objectPath); + + List primaryKeys = new ArrayList<>(); + 
primaryKeys.add("id"); + TableSchema expectHudiSchema = + TableSchema.builder() + .field("id", org.apache.flink.table.api.DataTypes.BIGINT().notNull()) + .field("name", org.apache.flink.table.api.DataTypes.STRING()) + .field("__bucket", org.apache.flink.table.api.DataTypes.INT()) + .field("__offset", org.apache.flink.table.api.DataTypes.BIGINT()) + .field("__timestamp", org.apache.flink.table.api.DataTypes.TIMESTAMP(6)) + .primaryKey("PK_id", primaryKeys.toArray(new String[0])) + .build(); + + assertThat(table.getUnresolvedSchema()).isEqualTo(expectHudiSchema.toSchema()); + } + + // ------------------------------------------------------------------ + // isHudiSchemaCompatible() tests + // ------------------------------------------------------------------ + + @Test + void testIsHudiSchemaCompatibleWithSameSchema() { + // Build two catalog tables with identical schema + CatalogTable table1 = + buildTestCatalogTable( + new String[] {"id", "name"}, + new DataType[] { + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); + CatalogTable table2 = + buildTestCatalogTable( + new String[] {"id", "name"}, + new DataType[] { + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); + + assertThat(flussHudiLakeCatalog.isHudiSchemaCompatible(table1, table2)).isTrue(); + } + + @Test + void testIsHudiSchemaCompatibleWithDifferentColumnCount() { + CatalogTable table1 = + buildTestCatalogTable( + new String[] {"id", "name"}, + new DataType[] { + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); + CatalogTable table2 = + buildTestCatalogTable( + new String[] {"id"}, + new DataType[] {org.apache.flink.table.api.DataTypes.INT().notNull()}); + + assertThat(flussHudiLakeCatalog.isHudiSchemaCompatible(table1, table2)).isFalse(); + } + + @Test + void testIsHudiSchemaCompatibleWithDifferentColumnName() { + CatalogTable table1 = 
+ buildTestCatalogTable( + new String[] {"id", "name"}, + new DataType[] { + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); + CatalogTable table2 = + buildTestCatalogTable( + new String[] {"id", "value"}, + new DataType[] { + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); + + assertThat(flussHudiLakeCatalog.isHudiSchemaCompatible(table1, table2)).isFalse(); + } + + @Test + void testIsHudiSchemaCompatibleWithDifferentColumnType() { + CatalogTable table1 = + buildTestCatalogTable( + new String[] {"id", "name"}, + new DataType[] { + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.STRING() + }); + CatalogTable table2 = + buildTestCatalogTable( + new String[] {"id", "name"}, + new DataType[] { + org.apache.flink.table.api.DataTypes.INT().notNull(), + org.apache.flink.table.api.DataTypes.BIGINT() + }); + + assertThat(flussHudiLakeCatalog.isHudiSchemaCompatible(table1, table2)).isFalse(); + } + + // ------------------------------------------------------------------ + // Duplicate table creation idempotency tests + // ------------------------------------------------------------------ + + @Test + void testCreateDuplicateTableWithCompatibleSchema() throws TableNotExistException { + String database = "idempotent_db"; + String tableName = "idempotent_table"; + + Schema flussSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .primaryKey("id") + .build(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(flussSchema).distributedBy(4, "id").build(); + + TablePath tablePath = TablePath.of(database, tableName); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + + // First creation should succeed + flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context); + + // Second creation with same schema should not 
throw + flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context); + + // Verify the table still exists and has correct schema + CatalogBaseTable table = + flussHudiLakeCatalog + .getHudiCatalog() + .getTable(HudiConversions.toHudiObjectPath(tablePath)); + assertThat(table).isNotNull(); + } + + @Test + void testCreateDuplicateTableWithIncompatibleSchema() { + String database = "incompat_db"; + String tableName = "incompat_table"; + + // First: create a table with id(INT) + name(STRING) + Schema flussSchema1 = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .primaryKey("id") + .build(); + + TableDescriptor tableDescriptor1 = + TableDescriptor.builder().schema(flussSchema1).distributedBy(4, "id").build(); + + TablePath tablePath = TablePath.of(database, tableName); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + flussHudiLakeCatalog.createTable(tablePath, tableDescriptor1, context); + + // Second: try creating a table with id(INT) + name(BIGINT) - different type + Schema flussSchema2 = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.BIGINT()) + .primaryKey("id") + .build(); + + TableDescriptor tableDescriptor2 = + TableDescriptor.builder().schema(flussSchema2).distributedBy(4, "id").build(); + + assertThatThrownBy( + () -> + flussHudiLakeCatalog.createTable( + tablePath, tableDescriptor2, context)) + .isInstanceOf(TableAlreadyExistException.class) + .hasMessageContaining("not compatible"); + } + + // ------------------------------------------------------------------ + // HUDI_UNSETTABLE_OPTIONS validation tests + // ------------------------------------------------------------------ + + @Test + void testUnsettableOptionInPropertiesThrowsException() { + Schema flussSchema = + Schema.newBuilder().column("id", DataTypes.INT()).primaryKey("id").build(); + + // Set a protected Hudi option via properties (with the hudi. 
prefix) + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(flussSchema) + .distributedBy(4, "id") + .property("hudi.hoodie.datasource.write.table.type", "COPY_ON_WRITE") + .build(); + + TablePath tablePath = TablePath.of("test_db", "protected_option_table"); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + + assertThatThrownBy( + () -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining("hoodie.datasource.write.table.type") + .hasMessageContaining("should not be set manually"); + } + + @Test + void testUnsettableOptionInCustomPropertiesThrowsException() { + Schema flussSchema = + Schema.newBuilder().column("id", DataTypes.INT()).primaryKey("id").build(); + + Map customProperties = new HashMap<>(); + // Set a protected Hudi option via customProperties + customProperties.put("hudi.hoodie.datasource.write.recordkey.field", "id"); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(flussSchema) + .distributedBy(4, "id") + .customProperties(customProperties) + .build(); + + TablePath tablePath = TablePath.of("test_db", "protected_custom_table"); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + + assertThatThrownBy( + () -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining("hoodie.datasource.write.recordkey.field") + .hasMessageContaining("should not be set manually"); + } + + @Test + void testNonProtectedHudiOptionPassesValidation() throws TableNotExistException { + Schema flussSchema = + Schema.newBuilder().column("id", DataTypes.INT()).primaryKey("id").build(); + + // Set a non-protected Hudi option (e.g., precombine.field) — should work fine + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(flussSchema) + .distributedBy(4, "id") + .property("hudi.precombine.field", "id") + 
.build(); + + TablePath tablePath = TablePath.of("test_db", "non_protected_table"); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + + // Should not throw + flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context); + + CatalogBaseTable table = + flussHudiLakeCatalog + .getHudiCatalog() + .getTable(HudiConversions.toHudiObjectPath(tablePath)); + assertThat(table).isNotNull(); + assertThat(table.getOptions()).containsEntry("precombine.field", "id"); + } + + // ------------------------------------------------------------------ + // System column name conflict tests + // ------------------------------------------------------------------ + + @Test + void testSystemColumnBucketConflictThrowsException() { + Schema flussSchema = + Schema.newBuilder() + .column("__bucket", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .primaryKey("__bucket") + .build(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(flussSchema).distributedBy(4, "__bucket").build(); + + TablePath tablePath = TablePath.of("test_db", "bucket_conflict_table"); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + + assertThatThrownBy( + () -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) + .isInstanceOf(InvalidTableException.class) + .hasMessageContaining("__bucket") + .hasMessageContaining("system column"); + } + + @Test + void testSystemColumnOffsetConflictThrowsException() { + Schema flussSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("__offset", DataTypes.BIGINT()) + .primaryKey("id") + .build(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(flussSchema).distributedBy(4, "id").build(); + + TablePath tablePath = TablePath.of("test_db", "offset_conflict_table"); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + + assertThatThrownBy( + () -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) + 
.isInstanceOf(InvalidTableException.class) + .hasMessageContaining("__offset") + .hasMessageContaining("system column"); + } + + @Test + void testSystemColumnTimestampConflictThrowsException() { + Schema flussSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("__timestamp", DataTypes.TIMESTAMP(6)) + .primaryKey("id") + .build(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(flussSchema).distributedBy(4, "id").build(); + + TablePath tablePath = TablePath.of("test_db", "timestamp_conflict_table"); + TestingLakeCatalogContext context = new TestingLakeCatalogContext(); + + assertThatThrownBy( + () -> flussHudiLakeCatalog.createTable(tablePath, tableDescriptor, context)) + .isInstanceOf(InvalidTableException.class) + .hasMessageContaining("__timestamp") + .hasMessageContaining("system column"); + } + + // ------------------------------------------------------------------ + // Helper methods + // ------------------------------------------------------------------ + + private CatalogTable buildTestCatalogTable(String[] columnNames, DataType[] columnTypes) { + List columns = new ArrayList<>(); + for (int i = 0; i < columnNames.length; i++) { + columns.add(Column.physical(columnNames[i], columnTypes[i])); + } + ResolvedSchema resolvedSchema = new ResolvedSchema(columns, Collections.emptyList(), null); + org.apache.flink.table.api.Schema schema = + org.apache.flink.table.api.Schema.newBuilder() + .fromResolvedSchema(resolvedSchema) + .build(); + return CatalogTable.of(schema, null, Collections.emptyList(), new HashMap<>()); + } +} diff --git a/fluss-lake/pom.xml b/fluss-lake/pom.xml index 508206bfed..eab338f590 100644 --- a/fluss-lake/pom.xml +++ b/fluss-lake/pom.xml @@ -44,6 +44,11 @@ flink-table-common ${flink.version} + + org.apache.flink + flink-table-api-java + ${flink.version} + org.apache.flink flink-table-runtime