From 0738e4df78258df08ef870afd26ce15c85686af9 Mon Sep 17 00:00:00 2001
From: Felix John
Date: Mon, 10 Nov 2025 18:12:08 +0100
Subject: [PATCH 1/4] Upgrade to Iceberg 1.9.1

---
 java/Iceberg/S3TableSink/README.md | 18 +--
 java/Iceberg/S3TableSink/pom.xml | 152 +++++++++++-------
 .../amazonaws/services/msf/StreamingJob.java | 4 +-
 .../msf/iceberg/IcebergSinkBuilder.java | 1 +
 .../flink-application-properties-dev.json | 2 +-
 5 files changed, 104 insertions(+), 73 deletions(-)

diff --git a/java/Iceberg/S3TableSink/README.md b/java/Iceberg/S3TableSink/README.md
index 07ef1e3..5cc9b04 100644
--- a/java/Iceberg/S3TableSink/README.md
+++ b/java/Iceberg/S3TableSink/README.md
@@ -2,7 +2,7 @@
* Flink version: 1.19.0
* Flink API: DataStream API
-* Iceberg 1.8.1
+* Iceberg 1.9.1
* Language: Java (11)
* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/) and [Iceberg](https://iceberg.apache.org/docs/latest/flink/)
@@ -10,9 +10,7 @@
This example demonstrate how to use [Flink Iceberg Sink Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with S3 Tables.
-For simplicity, the application generates synthetic data, random stock prices, internally.
-Data is generated as AVRO Generic Record, simulating a real source, for example a Kafka Source, that receives records
-serialized with AVRO.
+For simplicity, the application generates synthetic data, random stock prices, internally. Data is generated as AVRO Generic Record, simulating a real source, e.g. a Kafka Source, that receives records serialized with AVRO.
### Prerequisites
@@ -49,8 +47,7 @@ The application must have IAM permissions to:
### Runtime configuration
-When running on Amazon Managed Service for Apache Flink the runtime configuration is read from Runtime Properties.
-
+When running on Amazon Managed Service for Apache Flink the runtime configuration is read from runtime properties.
When running locally, the configuration is read from the
[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file.
@@ -68,10 +65,7 @@ Runtime parameters:
### Checkpoints
-Checkpointing must be enabled. Iceberg commits writes on checkpoint.
-
-When running locally, the application enables checkpoints programmatically, every 10 seconds.
-When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
+Checkpointing must be enabled. Iceberg commits writes on checkpoint. When running locally, the application enables checkpoints programmatically, every 10 seconds. When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
### Known limitations
@@ -83,9 +77,7 @@ At the moment there are current limitations concerning Flink Iceberg integration
### Schema and schema evolution
-The application must "know" the AVRO schema on start.
-The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry.
-This is due to a limitation of the Flink Iceberg integration, that requires knowing the table schema upfront.
+The application must "know" the AVRO schema on start. The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry. This is due to a limitation of the Flink Iceberg integration, that requires knowing the table schema upfront.
This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible. Schema changes are not propagated to Iceberg. diff --git a/java/Iceberg/S3TableSink/pom.xml b/java/Iceberg/S3TableSink/pom.xml index f43c091..1e29c83 100644 --- a/java/Iceberg/S3TableSink/pom.xml +++ b/java/Iceberg/S3TableSink/pom.xml @@ -14,12 +14,11 @@ 11 ${target.java.version} ${target.java.version} - 1.19.0 - 1.11.3 + 1.12.0 2.12 3.4.0 - 1.6.1 + 1.9.1 1.2.0 2.23.1 5.8.1 @@ -31,57 +30,66 @@ org.apache.flink flink-runtime-web ${flink.version} - provided + + + org.apache.flink + flink-table-planner_${scala.version} + ${flink.version} + + + org.apache.flink + flink-metrics-dropwizard + ${flink.version} org.apache.flink flink-streaming-java ${flink.version} - provided org.apache.flink flink-table-runtime ${flink.version} - provided org.apache.flink - flink-table-api-java-bridge + flink-connector-datagen ${flink.version} org.apache.flink - flink-table-common + flink-table-api-java-bridge ${flink.version} org.apache.flink - flink-metrics-dropwizard + flink-table-common ${flink.version} org.apache.flink flink-avro ${flink.version} + + + org.apache.avro + avro + + - - + - org.apache.flink - flink-table-planner_${scala.version} - ${flink.version} - provided + org.apache.avro + avro + ${avro.version} - com.amazonaws aws-kinesisanalytics-runtime ${kda.runtime.version} - provided @@ -89,23 +97,29 @@ software.amazon.awssdk s3tables 2.29.26 + + + ch.qos.logback + logback-classic + + software.amazon.s3tables s3-tables-catalog-for-iceberg - 0.1.3 - - - org.apache.flink - flink-connector-files - ${flink.version} - provided + 0.1.5 + + + ch.qos.logback + logback-classic + + org.apache.hadoop - hadoop-client + hadoop-common ${hadoop.version} @@ -116,17 +130,27 @@ org.slf4j slf4j-reload4j + + org.slf4j + slf4j-log4j12 + + + ch.qos.logback + logback-classic + + + ch.qos.logback + logback-core + + + - org.apache.hadoop - hadoop-common - ${hadoop.version} - - - org.apache.hadoop - hadoop-mapreduce-client-core - ${hadoop.version} + org.junit.jupiter + junit-jupiter + ${junit5.version} + test @@ -134,34 +158,45 @@ org.apache.iceberg iceberg-core ${iceberg.version} + + + org.apache.avro + avro + + org.apache.iceberg - iceberg-flink - ${iceberg.version} - - - org.apache.iceberg - iceberg-flink + iceberg-aws ${iceberg.version} + + + org.apache.avro + avro + + org.apache.iceberg - iceberg-aws-bundle + iceberg-flink-1.19 ${iceberg.version} + + + org.apache.avro + avro + + org.apache.iceberg - iceberg-aws + iceberg-api ${iceberg.version} - - - - - org.junit.jupiter - junit-jupiter - ${junit5.version} - test + + + org.apache.avro + avro + + @@ -179,13 +214,6 @@ org.apache.logging.log4j log4j-core ${log4j.version} - runtime - - - org.apache.iceberg - iceberg-flink-1.19 - 1.7.0 - compile @@ -206,7 +234,7 @@ org.apache.maven.plugins maven-shade-plugin - 3.2.1 + 3.5.0 package @@ -219,7 +247,7 @@ org.apache.flink:force-shading com.google.code.findbugs:jsr305 org.slf4j:* - log4j:* + org.apache.logging.log4j:* @@ -229,9 +257,17 @@ META-INF/*.SF META-INF/*.DSA META-INF/*.RSA + META-INF/versions/** + + + + org.apache.avro + shaded.org.apache.avro + + diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java index ff7a960..2dc93f0 100644 --- a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java +++ 
b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Properties; public class StreamingJob { @@ -58,7 +59,7 @@ private static DataGeneratorSource createDataGenerator(Properties double recordsPerSecond = Double.parseDouble(generatorProperties.getProperty("records.per.sec", "10.0")); Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0"); - LOG.info("Data generator: {} record/sec", recordsPerSecond); + LOG.info("Data generator: {} record/sec", Optional.of(recordsPerSecond)); return new DataGeneratorSource<>( new AvroGenericStockTradeGeneratorFunction(avroSchema), Long.MAX_VALUE, @@ -98,6 +99,7 @@ public static void main(String[] args) throws Exception { // Flink Sink Builder FlinkSink.Builder icebergSinkBuilder = IcebergSinkBuilder.createBuilder(icebergProperties, genericRecordDataStream, avroSchema); + // Sink to Iceberg Table icebergSinkBuilder.append(); diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java index 923cab4..68939f7 100644 --- a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java +++ b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java @@ -12,6 +12,7 @@ import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.*; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper; import org.apache.iceberg.flink.sink.FlinkSink; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; diff --git a/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json index b941e5a..be206ec 100644 --- a/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json +++ b/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json @@ -9,7 +9,7 @@ "PropertyGroupId": "Iceberg", "PropertyMap": { - "table.bucket.arn": "<>", + "table.bucket.arn": "your-table-arn", "catalog.db": "default", "catalog.table": "prices_s3table", "partition.fields": "symbol", From 17a49276bd747e352b446f1c9875089c2d8ca858 Mon Sep 17 00:00:00 2001 From: Felix John Date: Tue, 11 Nov 2025 17:41:30 +0100 Subject: [PATCH 2/4] Fix --- java/Iceberg/S3TableSink/README.md | 9 +++------ java/Iceberg/S3TableSink/pom.xml | 18 +++++++++++------- .../amazonaws/services/msf/StreamingJob.java | 4 ++-- .../msf/iceberg/IcebergSinkBuilder.java | 2 +- .../flink-application-properties-dev.json | 2 +- 5 files changed, 18 insertions(+), 17 deletions(-) diff --git a/java/Iceberg/S3TableSink/README.md b/java/Iceberg/S3TableSink/README.md index 5cc9b04..e490f3c 100644 --- a/java/Iceberg/S3TableSink/README.md +++ b/java/Iceberg/S3TableSink/README.md @@ -47,9 +47,10 @@ The application must have IAM permissions to: ### Runtime configuration -When running on Amazon Managed Service for Apache Flink the runtime configuration is read from runtime properties. +When running on Amazon Managed Service for Apache Flink the runtime configuration is read from runtime properties. Make sure that you pass the mandatory parameter `table.bucket.arn`. 
+
When running locally, the configuration is read from the
-[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file.
+[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file. Make sure that the scope of the dependencies in `pom.xml` is set to `default` when running locally.
Runtime parameters:
@@ -64,19 +65,16 @@ Runtime parameters:
| `Iceberg` | `upsert.equality.fields` | `symbol` | Comma separated list of fields used for upsert. It must match partition fields. Required if `operation` = `upsert`. |
### Checkpoints
-
Checkpointing must be enabled. Iceberg commits writes on checkpoint. When running locally, the application enables checkpoints programmatically, every 10 seconds. When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
### Known limitations
-
At the moment there are current limitations concerning Flink Iceberg integration with S3 Tables:
*
* Currently, this example needs to be in Flink v1.19, v1.20 isn't supported with the S3 Table Sink yet.
* Doesn't support Iceberg Table with hidden partitioning
* Doesn't support adding columns, removing columns, renaming columns or changing columns.
### Schema and schema evolution
-
The application must "know" the AVRO schema on start. The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry. This is due to a limitation of the Flink Iceberg integration, that requires knowing the table schema upfront.
This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible.
@@ -90,7 +88,6 @@ It is technically possible to fetch the schema on application start from an exte
schema definition file in an S3 bucket. This is beyond the scope of this example.
### Running locally, in IntelliJ
-
You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.
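
Illustration (editorial note, not part of the patch): the README section above says that checkpoints are enabled programmatically when the job runs locally, because Iceberg only commits data files on checkpoint. A minimal sketch of that pattern, assuming the usual `StreamExecutionEnvironment` setup of these examples rather than the exact code in `StreamingJob`, could look like this:

```java
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalCheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Iceberg only makes data visible when a checkpoint completes, so a local run
        // (e.g. in IntelliJ) must enable checkpointing itself. When deployed to Managed
        // Service for Apache Flink, checkpointing is controlled by the application configuration.
        if (env instanceof LocalStreamEnvironment) {
            env.enableCheckpointing(10_000L); // every 10 seconds, as described in the README
        }

        // ... define the DataGen source, the AVRO-to-RowData mapping and the Iceberg sink here ...
        // env.execute("S3 Table Sink sketch");
    }
}
```
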
diff --git a/java/Iceberg/S3TableSink/pom.xml b/java/Iceberg/S3TableSink/pom.xml index 1e29c83..df782be 100644 --- a/java/Iceberg/S3TableSink/pom.xml +++ b/java/Iceberg/S3TableSink/pom.xml @@ -30,12 +30,15 @@ org.apache.flink flink-runtime-web ${flink.version} + provided org.apache.flink flink-table-planner_${scala.version} ${flink.version} + provided + org.apache.flink flink-metrics-dropwizard @@ -45,27 +48,34 @@ org.apache.flink flink-streaming-java ${flink.version} + provided org.apache.flink flink-table-runtime ${flink.version} + provided org.apache.flink flink-connector-datagen ${flink.version} + provided org.apache.flink flink-table-api-java-bridge ${flink.version} + provided org.apache.flink flink-table-common ${flink.version} + provided + + org.apache.flink flink-avro @@ -90,6 +100,7 @@ com.amazonaws aws-kinesisanalytics-runtime ${kda.runtime.version} + provided @@ -261,13 +272,6 @@ - - - - org.apache.avro - shaded.org.apache.avro - - diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java index 2dc93f0..de40d29 100644 --- a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java +++ b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java @@ -80,7 +80,7 @@ public static void main(String[] args) throws Exception { } Map applicationProperties = loadApplicationProperties(env); - icebergProperties = applicationProperties.get("Iceberg"); + icebergProperties = applicationProperties.getOrDefault("Iceberg", new Properties()); // Get AVRO Schema from the definition bundled with the application // Note that the application must "knows" the AVRO schema upfront, i.e. the schema must be either embedded @@ -107,7 +107,7 @@ public static void main(String[] args) throws Exception { } private static DataStream createDataStream(StreamExecutionEnvironment env, Map applicationProperties, Schema avroSchema) { - Properties dataGeneratorProperties = applicationProperties.get("DataGen"); + Properties dataGeneratorProperties = applicationProperties.getOrDefault("DataGen", new Properties()); return env.fromSource( createDataGenerator(dataGeneratorProperties, avroSchema), WatermarkStrategy.noWatermarks(), diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java index 68939f7..04d2bf7 100644 --- a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java +++ b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java @@ -26,7 +26,7 @@ public class IcebergSinkBuilder { private static final String DEFAULT_S3_CATALOG_DB = "default"; private static final String DEFAULT_ICEBERG_TABLE_NAME = "prices_iceberg"; private static final String DEFAULT_ICEBERG_PARTITION_FIELDS = "symbol"; - private static final String DEFAULT_ICEBERG_OPERATION = "upsert"; + private static final String DEFAULT_ICEBERG_OPERATION = "append"; private static final String DEFAULT_ICEBERG_UPSERT_FIELDS = "symbol"; /** diff --git a/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json index be206ec..7021ac1 100644 --- a/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json +++ 
b/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json
@@ -9,7 +9,7 @@
"PropertyGroupId": "Iceberg",
"PropertyMap": {
- "table.bucket.arn": "your-table-arn",
+ "table.bucket.arn": "",
"catalog.db": "default",
"catalog.table": "prices_s3table",
"partition.fields": "symbol",

From 56b87aa22236d2572f9be2871df9739a00cc3f66 Mon Sep 17 00:00:00 2001
From: Felix John
Date: Tue, 11 Nov 2025 17:57:18 +0100
Subject: [PATCH 3/4] Fix

---
 java/Iceberg/S3TableSink/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/java/Iceberg/S3TableSink/README.md b/java/Iceberg/S3TableSink/README.md
index e490f3c..480e318 100644
--- a/java/Iceberg/S3TableSink/README.md
+++ b/java/Iceberg/S3TableSink/README.md
@@ -50,7 +50,7 @@ The application must have IAM permissions to:
When running on Amazon Managed Service for Apache Flink the runtime configuration is read from runtime properties. Make sure that you pass the mandatory parameter `table.bucket.arn`.
When running locally, the configuration is read from the
-[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file. Make sure that the scope of the dependencies in `pom.xml` is set to `default` when running locally.
+[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file. Make sure that the scope of the dependencies in `pom.xml` is set to `compile` when running locally.
Runtime parameters:

From 6d3776b85715dca05f641dc8aede355235e0b24b Mon Sep 17 00:00:00 2001
From: Felix John
Date: Fri, 14 Nov 2025 10:17:44 +0100
Subject: [PATCH 4/4] Updated catalog and table versions to latest

---
 java/Iceberg/S3TableSink/pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/java/Iceberg/S3TableSink/pom.xml b/java/Iceberg/S3TableSink/pom.xml
index df782be..6da10e1 100644
--- a/java/Iceberg/S3TableSink/pom.xml
+++ b/java/Iceberg/S3TableSink/pom.xml
@@ -107,7 +107,7 @@
software.amazon.awssdk
s3tables
- 2.29.26
+ 2.38.2
ch.qos.logback
logback-classic
@@ -118,7 +118,7 @@
software.amazon.s3tables
s3-tables-catalog-for-iceberg
- 0.1.5
+ 0.1.8
ch.qos.logback
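
Illustration (editorial note, not part of the patch series): patch 2 changes the default Iceberg operation from `upsert` to `append`, and the README documents the `operation` and `upsert.equality.fields` runtime parameters. The sketch below shows how an AVRO `GenericRecord` stream is typically wired into an Iceberg `FlinkSink` for these two modes, following the upstream Iceberg Flink API. It is not taken verbatim from `IcebergSinkBuilder`; the equality field `symbol` and the already-constructed `TableLoader` and Iceberg schema are assumptions for illustration.

```java
import java.util.Arrays;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;

public class IcebergSinkSketch {

    /**
     * Sketch: attach an Iceberg sink to a stream of AVRO GenericRecords.
     * The Iceberg schema and the TableLoader are assumed to already exist
     * (in the example they come from the S3 Tables catalog).
     */
    static void appendOrUpsert(DataStream<GenericRecord> stream,
                               org.apache.avro.Schema avroSchema,
                               Schema icebergSchema,
                               TableLoader tableLoader,
                               boolean upsert) {
        FlinkSink.Builder builder = FlinkSink
                .builderFor(
                        stream,
                        // Maps AVRO GenericRecord to Flink RowData using the AVRO schema
                        AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema),
                        // Type information derived from the Iceberg schema
                        FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(icebergSchema)))
                .tableLoader(tableLoader);

        if (upsert) {
            // Upsert mode needs equality fields, and they must match the partition fields
            // (see the `upsert.equality.fields` runtime parameter in the README).
            builder = builder.upsert(true).equalityFieldColumns(Arrays.asList("symbol"));
        }

        // The sink is attached to the job graph when append() is called.
        builder.append();
    }
}
```

Defaulting to `append` (as patch 2 does) is the safer choice for an example: upsert writes require equality fields aligned with the table's partitioning, while append works for any schema.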