23 changes: 6 additions & 17 deletions java/Iceberg/S3TableSink/README.md
@@ -2,17 +2,15 @@

* Flink version: 1.19.0
* Flink API: DataStream API
* Iceberg 1.8.1
* Iceberg 1.9.1
* Language: Java (11)
* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
and [Iceberg](https://iceberg.apache.org/docs/latest/flink/)

This example demonstrates how to use
[Flink Iceberg Sink Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with S3 Tables.

For simplicity, the application generates synthetic data, random stock prices, internally.
Data is generated as AVRO Generic Record, simulating a real source, for example a Kafka Source, that receives records
serialized with AVRO.
For simplicity, the application internally generates synthetic data (random stock prices). Data is generated as an AVRO Generic Record, simulating a real source, e.g. a Kafka source, that receives records serialized with AVRO.
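
For illustration, a minimal sketch of a generator function producing AVRO Generic Records is shown below. The class name and the `symbol`/`price` field names are placeholders for this sketch; the actual schema and generator function are bundled with the application.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.datagen.source.GeneratorFunction;

import java.util.Random;

// Illustrative sketch only: emits one synthetic stock-price record per invocation.
public class StockTradeGeneratorSketch implements GeneratorFunction<Long, GenericRecord> {
    private final Schema avroSchema;
    private transient Random random;

    public StockTradeGeneratorSketch(Schema avroSchema) {
        this.avroSchema = avroSchema;
    }

    @Override
    public GenericRecord map(Long index) {
        if (random == null) {
            random = new Random();
        }
        GenericRecord record = new GenericData.Record(avroSchema);
        // Field names below are placeholders; they must match the bundled AVRO schema.
        record.put("symbol", "AMZN");
        record.put("price", random.nextDouble() * 100);
        return record;
    }
}
```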

### Prerequisites

@@ -49,10 +47,10 @@ The application must have IAM permissions to:

### Runtime configuration

When running on Amazon Managed Service for Apache Flink the runtime configuration is read from Runtime Properties.
When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from runtime properties. Make sure you pass the mandatory parameter `table.bucket.arn`.

When running locally, the configuration is read from the
[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file.
[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file. Make sure that the scope of the dependencies in `pom.xml` is set to `compile` when running locally.

Runtime parameters:

@@ -67,25 +65,17 @@ Runtime parameters:
| `Iceberg` | `upsert.equality.fields` | `symbol` | Comma-separated list of fields used for upsert. It must match the partition fields. Required if `operation` = `upsert`. |
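
For reference, a minimal sketch of what the local configuration file could contain. The table bucket ARN below is a placeholder, and only a subset of the parameters from the table above is shown.

```json
[
  {
    "PropertyGroupId": "Iceberg",
    "PropertyMap": {
      "table.bucket.arn": "arn:aws:s3tables:us-east-1:123456789012:bucket/example-table-bucket",
      "operation": "append"
    }
  },
  {
    "PropertyGroupId": "DataGen",
    "PropertyMap": {
      "records.per.sec": "10.0"
    }
  }
]
```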

### Checkpoints

Checkpointing must be enabled. Iceberg commits writes on checkpoint.

When running locally, the application enables checkpoints programmatically, every 10 seconds.
When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
Checkpointing must be enabled. Iceberg commits writes on checkpoint. When running locally, the application enables checkpoints programmatically, every 10 seconds. When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
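
As a rough sketch, local checkpointing can be enabled along these lines. The class and method names come from the standard Flink API; the check for a local environment is an assumption about how the example distinguishes the two modes.

```java
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing programmatically only when running locally;
        // when deployed, Managed Service for Apache Flink controls checkpointing
        // through the application configuration.
        if (env instanceof LocalStreamEnvironment) {
            env.enableCheckpointing(10_000); // Iceberg commits writes on each checkpoint (~every 10 seconds)
        }
    }
}
```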


### Known limitations

There are currently some limitations in the Flink Iceberg integration with S3 Tables:
* This example must use Flink 1.19; Flink 1.20 is not yet supported with the S3 Table Sink.
* Iceberg tables with hidden partitioning are not supported.
* Adding, removing, renaming, or changing columns is not supported.

### Schema and schema evolution

The application must "know" the AVRO schema on start.
The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry.
This is due to a limitation of the Flink Iceberg integration, that requires knowing the table schema upfront.
The application must "know" the AVRO schema on start. The schema cannot be dynamically inferred from the incoming records, for example using a schema registry. This is due to a limitation of the Flink Iceberg integration, which requires knowing the table schema upfront.

This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible.
Schema changes are not propagated to Iceberg.
@@ -98,7 +88,6 @@ It is technically possible to fetch the schema on application start from an exte
schema definition file in an S3 bucket. This is beyond the scope of this example.
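
For reference, a minimal sketch of reading a schema definition bundled as a classpath resource at startup. The resource name `price.avsc` is a placeholder; the project ships its own schema file.

```java
import org.apache.avro.Schema;

import java.io.IOException;
import java.io.InputStream;

public class SchemaLoaderSketch {
    // Loads the AVRO schema bundled with the application at startup.
    // The resource name is a placeholder for this sketch.
    public static Schema loadBundledSchema() throws IOException {
        try (InputStream in = SchemaLoaderSketch.class.getClassLoader()
                .getResourceAsStream("price.avsc")) {
            return new Schema.Parser().parse(in);
        }
    }
}
```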

### Running locally, in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.
148 changes: 94 additions & 54 deletions java/Iceberg/S3TableSink/pom.xml
@@ -14,12 +14,11 @@
<target.java.version>11</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>

<flink.version>1.19.0</flink.version>
<avro.version>1.11.3</avro.version>
<avro.version>1.12.0</avro.version>
<scala.version>2.12</scala.version>
<hadoop.version>3.4.0</hadoop.version>
<iceberg.version>1.6.1</iceberg.version>
<iceberg.version>1.9.1</iceberg.version>
<kda.runtime.version>1.2.0</kda.runtime.version>
<log4j.version>2.23.1</log4j.version>
<junit5.version>5.8.1</junit5.version>
@@ -35,46 +34,66 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<artifactId>flink-table-planner_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Flink Iceberg uses DropWizard metrics -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<artifactId>flink-connector-datagen</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Flink Table Dependencies -->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<!-- Flink-Avro Dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.version}</artifactId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Avro Dependencies -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>

<!-- MSF Dependencies -->
<dependency>
@@ -89,23 +108,29 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3tables</artifactId>
<version>2.29.26</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.s3tables</groupId>
<artifactId>s3-tables-catalog-for-iceberg</artifactId>
<version>0.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<version>0.1.5</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Hadoop Dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
@@ -116,52 +141,73 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Testing Dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit5.version}</version>
<scope>test</scope>
</dependency>

<!-- Iceberg Dependencies -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink</artifactId>
<artifactId>iceberg-aws</artifactId>
<version>${iceberg.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws-bundle</artifactId>
<artifactId>iceberg-flink-1.19</artifactId>
<version>${iceberg.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws</artifactId>
<artifactId>iceberg-api</artifactId>
<version>${iceberg.version}</version>
</dependency>

<!-- Testing Dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit5.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Logging Dependencies -->
@@ -179,13 +225,6 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-1.19</artifactId>
<version>1.7.0</version>
<scope>compile</scope>
</dependency>
</dependencies>

@@ -206,7 +245,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<version>3.5.0</version>
<executions>
<execution>
<phase>package</phase>
@@ -219,7 +258,7 @@
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
@@ -229,6 +268,7 @@
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/versions/**</exclude>
</excludes>
</filter>
</filters>
@@ -27,6 +27,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;

public class StreamingJob {
@@ -58,7 +59,7 @@ private static DataGeneratorSource<GenericRecord> createDataGenerator(Properties
double recordsPerSecond = Double.parseDouble(generatorProperties.getProperty("records.per.sec", "10.0"));
Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0");

LOG.info("Data generator: {} record/sec", recordsPerSecond);
LOG.info("Data generator: {} record/sec", Optional.of(recordsPerSecond));
return new DataGeneratorSource<>(
new AvroGenericStockTradeGeneratorFunction(avroSchema),
Long.MAX_VALUE,
@@ -79,7 +80,7 @@ public static void main(String[] args) throws Exception {
}

Map<String, Properties> applicationProperties = loadApplicationProperties(env);
icebergProperties = applicationProperties.get("Iceberg");
icebergProperties = applicationProperties.getOrDefault("Iceberg", new Properties());

// Get AVRO Schema from the definition bundled with the application
// Note that the application must "know" the AVRO schema upfront, i.e. the schema must be either embedded
@@ -98,14 +99,15 @@ public static void main(String[] args) throws Exception {

// Flink Sink Builder
FlinkSink.Builder icebergSinkBuilder = IcebergSinkBuilder.createBuilder(icebergProperties, genericRecordDataStream, avroSchema);

// Sink to Iceberg Table
icebergSinkBuilder.append();

env.execute("Flink S3 Table Sink");
}

private static DataStream<GenericRecord> createDataStream(StreamExecutionEnvironment env, Map<String, Properties> applicationProperties, Schema avroSchema) {
Properties dataGeneratorProperties = applicationProperties.get("DataGen");
Properties dataGeneratorProperties = applicationProperties.getOrDefault("DataGen", new Properties());
return env.fromSource(
createDataGenerator(dataGeneratorProperties, avroSchema),
WatermarkStrategy.noWatermarks(),
@@ -12,6 +12,7 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.*;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
@@ -25,7 +26,7 @@ public class IcebergSinkBuilder {
private static final String DEFAULT_S3_CATALOG_DB = "default";
private static final String DEFAULT_ICEBERG_TABLE_NAME = "prices_iceberg";
private static final String DEFAULT_ICEBERG_PARTITION_FIELDS = "symbol";
private static final String DEFAULT_ICEBERG_OPERATION = "upsert";
private static final String DEFAULT_ICEBERG_OPERATION = "append";
private static final String DEFAULT_ICEBERG_UPSERT_FIELDS = "symbol";

/**