Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.HashMap;
import java.util.Map;

import static org.apache.fluss.metadata.DataLakeFormat.HUDI;
import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
Expand Down Expand Up @@ -86,6 +87,11 @@ public Catalog getLakeCatalog(
} else if (lakeFormat == ICEBERG) {
catalog = IcebergCatalogFactory.create(catalogName, catalogProperties);
this.lakeFormat = ICEBERG;
} else if (lakeFormat == HUDI) {
catalog =
HudiCatalogFactory.create(
catalogName, catalogProperties, classLoader);
this.lakeFormat = HUDI;
} else {
throw new UnsupportedOperationException(
"Unsupported data lake format: " + lakeFormat);
Expand Down Expand Up @@ -166,4 +172,94 @@ public static Catalog create(String catalogName, Map<String, String> catalogProp
}
}
}

/**
 * Factory using reflection to create Hudi Catalog instances.
 *
 * <p>Hudi is intentionally NOT a compile-time dependency of fluss-flink-common to avoid
 * dragging the shaded {@code hudi-flink<major>-bundle} (which re-bundles hadoop / parquet /
 * avro / jackson / guava) into every downstream consumer. The bundle is shipped as a plugin
 * under {@code plugins/hudi/} and loaded via the plugin classloader at runtime, mirroring the
 * Iceberg integration pattern.
 *
 * <p>Unlike Iceberg's {@code FlinkCatalogFactory}, which keeps a convenience overload {@code
 * createCatalog(String, Map)}, Hudi's {@code HoodieCatalogFactory} only exposes the standard
 * Flink SPI signature {@code createCatalog(CatalogFactory.Context)}. Since Fluss is not
 * invoking the factory through Flink's SQL {@code CREATE CATALOG} path, no {@code Context} is
 * provided to us — we have to build one ourselves. We do that by reflectively instantiating
 * Flink's {@code FactoryUtil$DefaultCatalogContext}, the same internal implementation Flink
 * itself uses in {@code FactoryUtil#createCatalog(...)} when bridging the legacy and new SPI
 * stacks.
 *
 * <p>Every failure is reported as a {@link RuntimeException}. If the failure was raised by
 * reflectively invoked code (a constructor or {@code createCatalog} itself), the
 * {@code InvocationTargetException} wrapper added by the reflection API is stripped so that
 * the underlying exception is the direct cause of the reported error.
 */
public static class HudiCatalogFactory {

    private static final String HOODIE_CATALOG_FACTORY_CLASS =
            "org.apache.hudi.table.catalog.HoodieCatalogFactory";
    private static final String FLINK_CATALOG_FACTORY_CONTEXT_CLASS =
            "org.apache.flink.table.factories.CatalogFactory$Context";
    private static final String FLINK_DEFAULT_CATALOG_CONTEXT_CLASS =
            "org.apache.flink.table.factories.FactoryUtil$DefaultCatalogContext";
    private static final String FLINK_READABLE_CONFIG_CLASS =
            "org.apache.flink.configuration.ReadableConfig";
    private static final String FLINK_CONFIGURATION_CLASS =
            "org.apache.flink.configuration.Configuration";

    private HudiCatalogFactory() {}

    /**
     * Creates a Hudi catalog by reflectively calling {@code
     * HoodieCatalogFactory#createCatalog(CatalogFactory.Context)}.
     *
     * @param catalogName name handed to the catalog context
     * @param catalogProperties catalog options handed to the catalog context
     * @param classLoader class loader used to resolve every Hudi / Flink class by name and
     *     also passed into the catalog context
     * @return the catalog returned by Hudi's factory
     * @throws RuntimeException if any class cannot be resolved or instantiated, or if the
     *     Hudi factory itself fails; the cause is the underlying exception
     */
    public static Catalog create(
            String catalogName,
            Map<String, String> catalogProperties,
            ClassLoader classLoader) {
        try {
            // 1) Build Hudi's catalog factory instance via reflection.
            Class<?> hoodieCatalogFactoryClass =
                    Class.forName(HOODIE_CATALOG_FACTORY_CLASS, true, classLoader);
            Object factoryInstance =
                    hoodieCatalogFactoryClass.getDeclaredConstructor().newInstance();

            // 2) Build a CatalogFactory.Context via Flink's internal default impl.
            Object context = newCatalogContext(catalogName, catalogProperties, classLoader);

            // 3) Invoke HoodieCatalogFactory#createCatalog(Context).
            Class<?> contextInterface =
                    Class.forName(FLINK_CATALOG_FACTORY_CONTEXT_CLASS, true, classLoader);
            Method createCatalogMethod =
                    hoodieCatalogFactoryClass.getMethod("createCatalog", contextInterface);
            return (Catalog) createCatalogMethod.invoke(factoryInstance, context);
        } catch (Exception e) {
            throw new RuntimeException(
                    "Failed to create Hudi catalog using reflection. Please make sure "
                            + "hudi-flink-bundle (matching the current Flink version, "
                            + "Hudi 1.0+) is on the classpath, typically under "
                            + "plugins/hudi/, and that the catalog options include a valid "
                            + "'mode' (supported: 'hms' or 'dfs').",
                    unwrapInvocationTarget(e));
        }
    }

    /**
     * Reflectively instantiates Flink's {@code FactoryUtil$DefaultCatalogContext}.
     *
     * <p>Constructor used: {@code DefaultCatalogContext(String name, Map<String,String>
     * options, ReadableConfig configuration, ClassLoader classLoader)}. We have no real Flink
     * {@code ReadableConfig} in this code path, so an empty Flink {@code Configuration} (which
     * implements {@code ReadableConfig}) is passed as a benign placeholder — Hudi's factory
     * only consumes 'options' / 'name' / classloader.
     */
    private static Object newCatalogContext(
            String catalogName, Map<String, String> catalogProperties, ClassLoader classLoader)
            throws ReflectiveOperationException {
        Class<?> defaultCatalogContextClass =
                Class.forName(FLINK_DEFAULT_CATALOG_CONTEXT_CLASS, true, classLoader);
        Class<?> readableConfigClass =
                Class.forName(FLINK_READABLE_CONFIG_CLASS, true, classLoader);
        Class<?> flinkConfigurationClass =
                Class.forName(FLINK_CONFIGURATION_CLASS, true, classLoader);
        Object emptyFlinkConfiguration =
                flinkConfigurationClass.getDeclaredConstructor().newInstance();

        return defaultCatalogContextClass
                .getDeclaredConstructor(
                        String.class, Map.class, readableConfigClass, ClassLoader.class)
                .newInstance(
                        catalogName, catalogProperties, emptyFlinkConfiguration, classLoader);
    }

    /**
     * Returns the exception thrown by reflectively invoked code when {@code e} is merely the
     * reflection wrapper around it; otherwise returns {@code e} unchanged.
     */
    private static Throwable unwrapInvocationTarget(Exception e) {
        // Fully qualified so that this block does not require an additional file-level import.
        if (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) {
            return e.getCause();
        }
        return e;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public DynamicTableSource createDynamicTableSource(
originIdentifier.getDatabaseName(),
tableName);

// For Iceberg, Hudi and Paimon, pass the table name as-is to their factory.
// Metadata tables will be handled internally by their respective factories.
DynamicTableFactory.Context newContext =
new FactoryUtil.DefaultDynamicTableContext(
Expand All @@ -66,11 +66,13 @@ private DynamicTableSourceFactory getLakeTableFactory() {
return getPaimonFactory();
case ICEBERG:
return getIcebergFactory();
case HUDI:
return getHudiFactory();
default:
throw new UnsupportedOperationException(
"Unsupported lake connector: "
+ lakeFlinkCatalog.getLakeFormat()
+ ". Only 'paimon' and 'iceberg' are supported.");
+ ". Only 'paimon', 'iceberg' and 'hudi' are supported.");
}
}

Expand Down Expand Up @@ -101,4 +103,18 @@ private DynamicTableSourceFactory getIcebergFactory() {
e);
}
}

/**
 * Instantiates Hudi's Flink table factory reflectively through its no-arg constructor.
 *
 * <p>The class is looked up with {@code Class.forName(String)}, i.e. through the class loader
 * that defined this class, so no compile-time dependency on Hudi is needed here.
 *
 * @return a new {@code org.apache.hudi.table.HoodieTableFactory} instance
 * @throws RuntimeException if the class cannot be found, instantiated, or is not a {@code
 *     DynamicTableSourceFactory}
 */
private DynamicTableSourceFactory getHudiFactory() {
    final String hudiTableFactoryName = "org.apache.hudi.table.HoodieTableFactory";
    try {
        Object instance =
                Class.forName(hudiTableFactoryName).getDeclaredConstructor().newInstance();
        // Cast inside the try so a type mismatch is reported with the same hint as a
        // missing class.
        return (DynamicTableSourceFactory) instance;
    } catch (Exception cause) {
        throw new RuntimeException(
                "Failed to create Hudi table factory. Please ensure hudi-flink-bundle "
                        + "(matching the current Flink version) is on the classpath, "
                        + "typically under plugins/hudi/.",
                cause);
    }
}
}
98 changes: 96 additions & 2 deletions fluss-lake/fluss-lake-hudi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink${flink.major.version}-bundle</artifactId>
<version>${hudi.version}</version>
<scope>test</scope>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
Expand Down Expand Up @@ -69,12 +69,106 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${fluss.hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>avro</artifactId>
<groupId>org.apache.avro</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<groupId>ch.qos.reload4j</groupId>
<artifactId>reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<artifactId>jdk.tools</artifactId>
<groupId>jdk.tools</groupId>
</exclusion>
<exclusion>
<artifactId>protobuf-java</artifactId>
<groupId>com.google.protobuf</groupId>
</exclusion>
<exclusion>
<artifactId>commons-io</artifactId>
<groupId>commons-io</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- test dependency -->
<dependency>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-test-utils</artifactId>
</dependency>
</dependencies>

</project>
</project>
Loading