From cd1a6ac755039fd11036a5f7f3926c5c3d820acf Mon Sep 17 00:00:00 2001 From: QuakeWang Date: Fri, 29 May 2026 14:17:01 +0800 Subject: [PATCH] [flink] Fail fast on batch virtual log table reads Signed-off-by: QuakeWang --- .../flink/catalog/FlinkTableFactory.java | 11 +++ .../flink/catalog/FlinkTableFactoryTest.java | 80 ++++++++++++++++++- .../source/BinlogVirtualTableITCase.java | 28 ++++++- .../source/ChangelogVirtualTableITCase.java | 29 ++++++- 4 files changed, 142 insertions(+), 6 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index 1b082855e6..ab64959abd 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -322,6 +322,15 @@ private static void validateSourceOptions(ReadableConfig tableOptions) { FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions); } + private static void validateVirtualLogTableRuntimeMode( + String virtualTableSuffix, boolean isStreamingMode) { + if (!isStreamingMode) { + throw new UnsupportedOperationException( + String.format( + "%s virtual tables only support streaming mode.", virtualTableSuffix)); + } + } + /** Creates a ChangelogFlinkTableSource for $changelog virtual tables. 
*/ private DynamicTableSource createChangelogTableSource( Context context, ObjectIdentifier tableIdentifier, String tableName) { @@ -333,6 +342,7 @@ private DynamicTableSource createChangelogTableSource( boolean isStreamingMode = context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; + validateVirtualLogTableRuntimeMode(FlinkCatalog.CHANGELOG_TABLE_SUFFIX, isStreamingMode); // tableOutputType includes metadata columns: [_change_type, _log_offset, _commit_timestamp, // data_cols...] @@ -395,6 +405,7 @@ private DynamicTableSource createBinlogTableSource( boolean isStreamingMode = context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; + validateVirtualLogTableRuntimeMode(FlinkCatalog.BINLOG_TABLE_SUFFIX, isStreamingMode); // tableOutputType: [_change_type, _log_offset, _commit_timestamp, before ROW<...>, after // ROW<...>] diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkTableFactoryTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkTableFactoryTest.java index 57bf8e8da6..c379eff9ab 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkTableFactoryTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkTableFactoryTest.java @@ -23,7 +23,9 @@ import org.apache.fluss.flink.source.lookup.FlinkAsyncLookupFunction; import org.apache.fluss.flink.source.lookup.FlinkLookupFunction; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; @@ -71,6 +73,12 @@ abstract class FlinkTableFactoryTest { public static final ObjectIdentifier OBJECT_IDENTIFIER = ObjectIdentifier.of("default", 
"default", "t1"); + public static final ObjectIdentifier CHANGELOG_TABLE_IDENTIFIER = + ObjectIdentifier.of("default", "default", "t1" + FlinkCatalog.CHANGELOG_TABLE_SUFFIX); + + public static final ObjectIdentifier BINLOG_TABLE_IDENTIFIER = + ObjectIdentifier.of("default", "default", "t1" + FlinkCatalog.BINLOG_TABLE_SUFFIX); + @Test void testTableSourceOptions() { ResolvedSchema schema = createBasicSchema(); @@ -179,6 +187,36 @@ void testLookupSource() { .hasMessageContaining("Full lookup caching is not supported yet."); } + @Test + void testVirtualLogTableSourceDoesNotSupportBatchMode() { + ResolvedSchema schema = createBasicSchema(); + Map properties = getBasicOptions(); + Configuration configuration = new Configuration(); + configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); + + assertThatThrownBy( + () -> + createTableSource( + CHANGELOG_TABLE_IDENTIFIER, + schema, + properties, + Collections.emptyMap(), + configuration)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("$changelog virtual tables only support streaming mode."); + + assertThatThrownBy( + () -> + createTableSource( + BINLOG_TABLE_IDENTIFIER, + createBinlogSchema(), + properties, + Collections.emptyMap(), + configuration)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("$binlog virtual tables only support streaming mode."); + } + @Test void testSink() { ResolvedSchema schema = createBasicSchema(); @@ -204,6 +242,30 @@ private ResolvedSchema createBasicSchema() { UniqueConstraint.primaryKey("PK_first_third", Arrays.asList("first", "third"))); } + private ResolvedSchema createBinlogSchema() { + return new ResolvedSchema( + Arrays.asList( + Column.physical( + "_change_type", DataTypes.STRING().notNull()), + Column.physical("_log_offset", DataTypes.BIGINT().notNull()), + Column.physical( + "_commit_timestamp", DataTypes.TIMESTAMP_LTZ(3).notNull()), + Column.physical( + "before", + DataTypes.ROW( + 
DataTypes.FIELD("first", DataTypes.STRING().notNull()), + DataTypes.FIELD("second", DataTypes.INT()), + DataTypes.FIELD("third", DataTypes.STRING().notNull()))), + Column.physical( + "after", + DataTypes.ROW( + DataTypes.FIELD("first", DataTypes.STRING().notNull()), + DataTypes.FIELD("second", DataTypes.INT()), + DataTypes.FIELD("third", DataTypes.STRING().notNull())))), + Collections.emptyList(), + null); + } + private static Map<String, String> getBasicOptions() { Map<String, String> options = new HashMap<>(); options.put("connector", "fluss"); @@ -226,10 +288,24 @@ private static DynamicTableSource createTableSource( ResolvedSchema schema, Map<String, String> options, Map<String, String> enrichmentOptions) { + return createTableSource( + OBJECT_IDENTIFIER, + schema, + options, + enrichmentOptions, + new Configuration()); + } + + private static DynamicTableSource createTableSource( + ObjectIdentifier objectIdentifier, + ResolvedSchema schema, + Map<String, String> options, + Map<String, String> enrichmentOptions, + Configuration configuration) { FlinkTableFactory tableFactory = createFlinkTableFactory(); FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext( - OBJECT_IDENTIFIER, + objectIdentifier, new ResolvedCatalogTable( CatalogTable.of( Schema.newBuilder().fromResolvedSchema(schema).build(), @@ -240,7 +316,7 @@ private static DynamicTableSource createTableSource( options), schema), enrichmentOptions, - new Configuration(), + configuration, Thread.currentThread().getContextClassLoader(), false); return tableFactory.createDynamicTableSource(context); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java index 619553acac..4787fe162d 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java +++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java @@ -131,6 +131,15 @@ protected void afterEach() throws Exception { // init table environment from savepointPath private StreamTableEnvironment initTableEnvironment(@Nullable String savepointPath) { + return initTableEnvironment(savepointPath, EnvironmentSettings.inStreamingMode()); + } + + private StreamTableEnvironment initBatchTableEnvironment() { + return initTableEnvironment(null, EnvironmentSettings.inBatchMode()); + } + + private StreamTableEnvironment initTableEnvironment( + @Nullable String savepointPath, EnvironmentSettings environmentSettings) { org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration(); if (savepointPath != null) { @@ -140,8 +149,7 @@ private StreamTableEnvironment initTableEnvironment(@Nullable String savepointPa StreamExecutionEnvironment.getExecutionEnvironment(conf); execEnv.setParallelism(1); execEnv.enableCheckpointing(1000); - StreamTableEnvironment tEnv = - StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode()); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, environmentSettings); String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS)); // crate catalog using sql tEnv.executeSql( @@ -227,6 +235,22 @@ public void testBinlogUnsupportedForLogTable() throws Exception { .hasMessageContaining("only supported for primary key tables"); } + @Test + public void testBatchReadBinlogTableFailsFast() throws Exception { + tEnv.executeSql( + "CREATE TABLE batch_binlog_test (" + + " id INT NOT NULL," + + " name STRING," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH ('bucket.num' = '1')"); + + tEnv = initBatchTableEnvironment(); + + assertThatThrownBy(() -> tEnv.explainSql("SELECT * FROM batch_binlog_test$binlog")) + .hasRootCauseInstanceOf(UnsupportedOperationException.class) + .hasRootCauseMessage("$binlog virtual 
tables only support streaming mode."); + } + @Test public void testBinlogWithAllChangeTypes() throws Exception { // Create a primary key table with 1 bucket for consistent log_offset numbers diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java index 1248976f8b..d9fbf9e5c1 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java @@ -64,6 +64,7 @@ import static org.apache.fluss.testutils.DataTestUtils.row; import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Integration test for $changelog virtual table functionality. 
*/ abstract class ChangelogVirtualTableITCase { @@ -128,6 +129,15 @@ protected void afterEach() throws Exception { // init table environment from savepointPath private StreamTableEnvironment initTableEnvironment(@Nullable String savepointPath) { + return initTableEnvironment(savepointPath, EnvironmentSettings.inStreamingMode()); + } + + private StreamTableEnvironment initBatchTableEnvironment() { + return initTableEnvironment(null, EnvironmentSettings.inBatchMode()); + } + + private StreamTableEnvironment initTableEnvironment( + @Nullable String savepointPath, EnvironmentSettings environmentSettings) { org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration(); if (savepointPath != null) { @@ -137,8 +147,7 @@ private StreamTableEnvironment initTableEnvironment(@Nullable String savepointPa StreamExecutionEnvironment.getExecutionEnvironment(conf); execEnv.setParallelism(1); execEnv.enableCheckpointing(1000); - StreamTableEnvironment tEnv = - StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode()); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, environmentSettings); String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS)); // crate catalog using sql tEnv.executeSql( @@ -239,6 +248,22 @@ public void testDescribeChangelogTable() throws Exception { + ")"); } + @Test + public void testBatchReadChangelogTableFailsFast() throws Exception { + tEnv.executeSql( + "CREATE TABLE batch_changelog_test (" + + " id INT NOT NULL," + + " name STRING," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH ('bucket.num' = '1')"); + + tEnv = initBatchTableEnvironment(); + + assertThatThrownBy(() -> tEnv.explainSql("SELECT * FROM batch_changelog_test$changelog")) + .hasRootCauseInstanceOf(UnsupportedOperationException.class) + .hasRootCauseMessage("$changelog virtual tables only support streaming mode."); + } + @Test public void testChangelogVirtualTableWithLogTable() 
throws Exception { // Create a log table (no primary key) with 1 bucket for predictable offsets