Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,15 @@ private static void validateSourceOptions(ReadableConfig tableOptions) {
FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions);
}

private static void validateVirtualLogTableRuntimeMode(
String virtualTableSuffix, boolean isStreamingMode) {
if (!isStreamingMode) {
throw new UnsupportedOperationException(
String.format(
"%s virtual tables only support streaming mode.", virtualTableSuffix));
}
}

/** Creates a ChangelogFlinkTableSource for $changelog virtual tables. */
private DynamicTableSource createChangelogTableSource(
Context context, ObjectIdentifier tableIdentifier, String tableName) {
Expand All @@ -333,6 +342,7 @@ private DynamicTableSource createChangelogTableSource(
boolean isStreamingMode =
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
validateVirtualLogTableRuntimeMode(FlinkCatalog.CHANGELOG_TABLE_SUFFIX, isStreamingMode);

// tableOutputType includes metadata columns: [_change_type, _log_offset, _commit_timestamp,
// data_cols...]
Expand Down Expand Up @@ -395,6 +405,7 @@ private DynamicTableSource createBinlogTableSource(
boolean isStreamingMode =
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
validateVirtualLogTableRuntimeMode(FlinkCatalog.BINLOG_TABLE_SUFFIX, isStreamingMode);

// tableOutputType: [_change_type, _log_offset, _commit_timestamp, before ROW<...>, after
// ROW<...>]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.fluss.flink.source.lookup.FlinkAsyncLookupFunction;
import org.apache.fluss.flink.source.lookup.FlinkLookupFunction;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.ValidationException;
Expand Down Expand Up @@ -71,6 +73,12 @@ abstract class FlinkTableFactoryTest {
public static final ObjectIdentifier OBJECT_IDENTIFIER =
ObjectIdentifier.of("default", "default", "t1");

public static final ObjectIdentifier CHANGELOG_TABLE_IDENTIFIER =
ObjectIdentifier.of("default", "default", "t1" + FlinkCatalog.CHANGELOG_TABLE_SUFFIX);

public static final ObjectIdentifier BINLOG_TABLE_IDENTIFIER =
ObjectIdentifier.of("default", "default", "t1" + FlinkCatalog.BINLOG_TABLE_SUFFIX);

@Test
void testTableSourceOptions() {
ResolvedSchema schema = createBasicSchema();
Expand Down Expand Up @@ -179,6 +187,36 @@ void testLookupSource() {
.hasMessageContaining("Full lookup caching is not supported yet.");
}

@Test
void testVirtualLogTableSourceDoesNotSupportBatchMode() {
ResolvedSchema schema = createBasicSchema();
Map<String, String> properties = getBasicOptions();
Configuration configuration = new Configuration();
configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);

assertThatThrownBy(
() ->
createTableSource(
CHANGELOG_TABLE_IDENTIFIER,
schema,
properties,
Collections.emptyMap(),
configuration))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("$changelog virtual tables only support streaming mode.");

assertThatThrownBy(
() ->
createTableSource(
BINLOG_TABLE_IDENTIFIER,
createBinlogSchema(),
properties,
Collections.emptyMap(),
configuration))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("$binlog virtual tables only support streaming mode.");
}

@Test
void testSink() {
ResolvedSchema schema = createBasicSchema();
Expand All @@ -204,6 +242,30 @@ private ResolvedSchema createBasicSchema() {
UniqueConstraint.primaryKey("PK_first_third", Arrays.asList("first", "third")));
}

private ResolvedSchema createBinlogSchema() {
return new ResolvedSchema(
Arrays.asList(
Column.physical(
"_change_type", DataTypes.STRING().notNull()),
Column.physical("_log_offset", DataTypes.BIGINT().notNull()),
Column.physical(
"_commit_timestamp", DataTypes.TIMESTAMP_LTZ(3).notNull()),
Column.physical(
"before",
DataTypes.ROW(
DataTypes.FIELD("first", DataTypes.STRING().notNull()),
DataTypes.FIELD("second", DataTypes.INT()),
DataTypes.FIELD("third", DataTypes.STRING().notNull()))),
Column.physical(
"after",
DataTypes.ROW(
DataTypes.FIELD("first", DataTypes.STRING().notNull()),
DataTypes.FIELD("second", DataTypes.INT()),
DataTypes.FIELD("third", DataTypes.STRING().notNull())))),
Collections.emptyList(),
null);
}

private static Map<String, String> getBasicOptions() {
Map<String, String> options = new HashMap<>();
options.put("connector", "fluss");
Expand All @@ -226,10 +288,24 @@ private static DynamicTableSource createTableSource(
ResolvedSchema schema,
Map<String, String> options,
Map<String, String> enrichmentOptions) {
return createTableSource(
OBJECT_IDENTIFIER,
schema,
options,
enrichmentOptions,
new Configuration());
}

private static DynamicTableSource createTableSource(
ObjectIdentifier objectIdentifier,
ResolvedSchema schema,
Map<String, String> options,
Map<String, String> enrichmentOptions,
Configuration configuration) {
FlinkTableFactory tableFactory = createFlinkTableFactory();
FactoryUtil.DefaultDynamicTableContext context =
new FactoryUtil.DefaultDynamicTableContext(
OBJECT_IDENTIFIER,
objectIdentifier,
new ResolvedCatalogTable(
CatalogTable.of(
Schema.newBuilder().fromResolvedSchema(schema).build(),
Expand All @@ -240,7 +316,7 @@ private static DynamicTableSource createTableSource(
options),
schema),
enrichmentOptions,
new Configuration(),
configuration,
Thread.currentThread().getContextClassLoader(),
false);
return tableFactory.createDynamicTableSource(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,15 @@ protected void afterEach() throws Exception {

// init table environment from savepointPath
private StreamTableEnvironment initTableEnvironment(@Nullable String savepointPath) {
return initTableEnvironment(savepointPath, EnvironmentSettings.inStreamingMode());
}

private StreamTableEnvironment initBatchTableEnvironment() {
return initTableEnvironment(null, EnvironmentSettings.inBatchMode());
}

private StreamTableEnvironment initTableEnvironment(
@Nullable String savepointPath, EnvironmentSettings environmentSettings) {
org.apache.flink.configuration.Configuration conf =
new org.apache.flink.configuration.Configuration();
if (savepointPath != null) {
Expand All @@ -140,8 +149,7 @@ private StreamTableEnvironment initTableEnvironment(@Nullable String savepointPa
StreamExecutionEnvironment.getExecutionEnvironment(conf);
execEnv.setParallelism(1);
execEnv.enableCheckpointing(1000);
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode());
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, environmentSettings);
String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
// crate catalog using sql
tEnv.executeSql(
Expand Down Expand Up @@ -227,6 +235,22 @@ public void testBinlogUnsupportedForLogTable() throws Exception {
.hasMessageContaining("only supported for primary key tables");
}

@Test
public void testBatchReadBinlogTableFailsFast() throws Exception {
tEnv.executeSql(
"CREATE TABLE batch_binlog_test ("
+ " id INT NOT NULL,"
+ " name STRING,"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") WITH ('bucket.num' = '1')");

tEnv = initBatchTableEnvironment();

assertThatThrownBy(() -> tEnv.explainSql("SELECT * FROM batch_binlog_test$binlog"))
.hasRootCauseInstanceOf(UnsupportedOperationException.class)
.hasRootCauseMessage("$binlog virtual tables only support streaming mode.");
}

@Test
public void testBinlogWithAllChangeTypes() throws Exception {
// Create a primary key table with 1 bucket for consistent log_offset numbers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Integration test for $changelog virtual table functionality. */
abstract class ChangelogVirtualTableITCase {
Expand Down Expand Up @@ -128,6 +129,15 @@ protected void afterEach() throws Exception {

// init table environment from savepointPath
private StreamTableEnvironment initTableEnvironment(@Nullable String savepointPath) {
return initTableEnvironment(savepointPath, EnvironmentSettings.inStreamingMode());
}

private StreamTableEnvironment initBatchTableEnvironment() {
return initTableEnvironment(null, EnvironmentSettings.inBatchMode());
}

private StreamTableEnvironment initTableEnvironment(
@Nullable String savepointPath, EnvironmentSettings environmentSettings) {
org.apache.flink.configuration.Configuration conf =
new org.apache.flink.configuration.Configuration();
if (savepointPath != null) {
Expand All @@ -137,8 +147,7 @@ private StreamTableEnvironment initTableEnvironment(@Nullable String savepointPa
StreamExecutionEnvironment.getExecutionEnvironment(conf);
execEnv.setParallelism(1);
execEnv.enableCheckpointing(1000);
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode());
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, environmentSettings);
String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
// crate catalog using sql
tEnv.executeSql(
Expand Down Expand Up @@ -239,6 +248,22 @@ public void testDescribeChangelogTable() throws Exception {
+ ")");
}

@Test
public void testBatchReadChangelogTableFailsFast() throws Exception {
tEnv.executeSql(
"CREATE TABLE batch_changelog_test ("
+ " id INT NOT NULL,"
+ " name STRING,"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") WITH ('bucket.num' = '1')");

tEnv = initBatchTableEnvironment();

assertThatThrownBy(() -> tEnv.explainSql("SELECT * FROM batch_changelog_test$changelog"))
.hasRootCauseInstanceOf(UnsupportedOperationException.class)
.hasRootCauseMessage("$changelog virtual tables only support streaming mode.");
}

@Test
public void testChangelogVirtualTableWithLogTable() throws Exception {
// Create a log table (no primary key) with 1 bucket for predictable offsets
Expand Down