From e9424f5021f2c68c77029ff84f29a257cdded1cf Mon Sep 17 00:00:00 2001 From: lavishlohiya Date: Fri, 7 Nov 2025 20:30:24 +0530 Subject: [PATCH 1/3] docs: add Flink CDC integration section under Flink Engine --- website/docs/quickstart/flink.md | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/website/docs/quickstart/flink.md b/website/docs/quickstart/flink.md index 8faf764a3a..0c9d4ac69b 100644 --- a/website/docs/quickstart/flink.md +++ b/website/docs/quickstart/flink.md @@ -509,3 +509,31 @@ to stop all containers. ## Learn more Now that you're up and running with Fluss and Flink, check out the [Apache Flink Engine](engine-flink/getting-started.md) docs to learn more features with Flink or [this guide](/maintenance/observability/quickstart.md) to learn how to set up an observability stack for Fluss and Flink. + +## Flink CDC Integration + +Flink CDC (Change Data Capture) provides rich connectors to easily capture changes from databases like **PostgreSQL** and bring them into **Fluss**. + +For more details, see the official Flink CDC documentation: +🔗 https://nightlies.apache.org/flink/flink-cdc-docs-release-3.5/docs/connectors/pipeline-connectors/fluss/ + +You can use Flink CDC with Fluss in two ways: + +### 1. Flink CDC Pipeline Connector + +Use a **YAML pipeline** file to define synchronization between PostgreSQL and Fluss. 
+ +**Example:** + +```yaml +source: + type: postgresql + hostname: localhost + port: 5432 + database: mydb + username: user + password: pass + +sink: + type: fluss + topic: my-topic From 320b7cdf58f4ff11cd8f8e1d4932d63bbb8f40f9 Mon Sep 17 00:00:00 2001 From: Thorne <63960540@qq.com> Date: Tue, 19 May 2026 16:12:03 +0800 Subject: [PATCH 2/3] add Flink CDC integration section under Flink Engine --- .../engine-flink/flink-cdc-integration.md | 212 ++++++++++++++++++ website/docs/quickstart/flink.md | 30 +-- 2 files changed, 213 insertions(+), 29 deletions(-) create mode 100644 website/docs/engine-flink/flink-cdc-integration.md diff --git a/website/docs/engine-flink/flink-cdc-integration.md b/website/docs/engine-flink/flink-cdc-integration.md new file mode 100644 index 0000000000..8c583536c3 --- /dev/null +++ b/website/docs/engine-flink/flink-cdc-integration.md @@ -0,0 +1,212 @@ +--- +sidebar_label: Flink CDC +title: Flink CDC Integration +sidebar_position: 9 +--- + +# Flink CDC Integration + +[Flink CDC](https://nightlies.apache.org/flink/flink-cdc-docs-master/) is a streaming data integration tool built on top of Apache Flink that can capture real-time changes from various databases. Flink CDC supports Fluss as a [Pipeline Sink Connector](https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/pipeline-connectors/fluss/), making it straightforward to sync CDC data from databases like PostgreSQL, MySQL, and Oracle into Fluss. + +There are two ways to sync database changes into Fluss using Flink CDC: + +- **Flink SQL with CDC connectors** — Use SQL to define CDC source tables and write data into Fluss tables. Best for per-table synchronization with SQL-native workflows. +- **Flink CDC Pipeline Connector** — Use a YAML pipeline definition to sync entire databases (including multiple tables) into Fluss. Best for whole-database synchronization. + +## Prerequisites + +- A running **Fluss cluster** (CoordinatorServer + TabletServer). 
See [Deploying with Docker](../install-deploy/deploying-with-docker.md) for setup instructions. +- A running **Flink cluster** with the required connector JARs. See [Getting Started with Flink](getting-started.md) for Flink setup. +- The required connector JARs placed under `/lib/`. The examples below use MySQL as the source, but other databases (PostgreSQL, Oracle, etc.) are also supported — see [Further Reading](#further-reading) for the full list of connectors. + - For SQL approach: [flink-sql-connector-mysql-cdc](https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-mysql-cdc) and the [Fluss Flink connector](getting-started.md) + - For Pipeline approach: [flink-cdc-pipeline-connector-fluss](https://mvnrepository.com/artifact/org.apache.flink/flink-cdc-pipeline-connector-fluss) + +## Example 1: Sync MySQL to Fluss with Flink SQL + +This example shows how to capture changes from a MySQL table and write them into a Fluss primary-key table using Flink SQL. + +### Step 1: Start MySQL with Docker + +Start a MySQL instance with binlog enabled using the Debezium example image: + +```shell +docker run -d --name mysql \ + -e MYSQL_ROOT_PASSWORD=123456 \ + -e MYSQL_USER=mysqluser \ + -e MYSQL_PASSWORD=mysqlpw \ + -p 3306:3306 \ + debezium/example-mysql:1.1 +``` + +Wait for the container to start, then connect to MySQL: + +```shell +docker exec -it mysql mysql -uroot -p123456 +``` + +Create a sample database and table: + +```sql +CREATE DATABASE mydb; +USE mydb; +CREATE TABLE orders ( + order_id INT AUTO_INCREMENT PRIMARY KEY, + customer_name VARCHAR(255), + product VARCHAR(255), + quantity INT, + order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + +INSERT INTO orders (customer_name, product, quantity) VALUES + ('Alice', 'Laptop', 1), + ('Bob', 'Phone', 2), + ('Charlie', 'Tablet', 3); +``` + +### Step 2: Create a MySQL CDC Source Table in Flink SQL + +Open the Flink SQL CLI and create a CDC source table that captures changes from the MySQL `orders` 
table: + +```sql title="Flink SQL" +CREATE TABLE mysql_orders ( + order_id INT, + customer_name STRING, + product STRING, + quantity INT, + order_date TIMESTAMP(3), + PRIMARY KEY (order_id) NOT ENFORCED +) WITH ( + 'connector' = 'mysql-cdc', + 'hostname' = 'localhost', + 'port' = '3306', + 'username' = 'root', + 'password' = '123456', + 'database-name' = 'mydb', + 'table-name' = 'orders' +); +``` + +### Step 3: Create a Fluss Sink Table + +Create a Fluss Catalog and a primary-key table in Fluss to receive the CDC data: + +```sql title="Flink SQL" +CREATE CATALOG fluss_catalog WITH ( + 'type' = 'fluss', + 'bootstrap.servers' = 'localhost:9123' +); + +USE CATALOG fluss_catalog; + +CREATE DATABASE IF NOT EXISTS mydb; + +CREATE TABLE mydb.orders ( + order_id INT, + customer_name STRING, + product STRING, + quantity INT, + order_date TIMESTAMP(3), + PRIMARY KEY (order_id) NOT ENFORCED +); +``` + +### Step 4: Sync Data + +Switch back to the default catalog and start the synchronization job: + +```sql title="Flink SQL" +USE CATALOG default_catalog; + +INSERT INTO fluss_catalog.mydb.orders +SELECT * FROM mysql_orders; +``` + +This starts a streaming job that continuously captures changes from MySQL and writes them to Fluss. + +### Step 5: Query the Data in Fluss + +You can now query the synced data in Fluss: + +```sql title="Flink SQL" +-- Switch to Fluss catalog +USE CATALOG fluss_catalog; + +-- Point query by primary key +SELECT * FROM mydb.orders WHERE order_id = 1; + +-- Streaming read to observe real-time changes +SELECT * FROM mydb.orders; +``` + +Try inserting or updating rows in MySQL — changes will be captured and reflected in Fluss in real time. 
Open a MySQL client: + +```shell +docker exec -it mysql mysql -uroot -p123456 mydb +``` + +Then execute: + +```sql +INSERT INTO orders (customer_name, product, quantity) VALUES ('Dave', 'Monitor', 2); +UPDATE orders SET quantity = 5 WHERE customer_name = 'Alice'; +``` + +## Example 2: Sync MySQL to Fluss with Pipeline Connector + +For whole-database synchronization, the Flink CDC Pipeline Connector allows you to define a YAML pipeline that syncs all tables from a MySQL database into Fluss automatically — without writing any SQL. + +:::note +This example reuses the MySQL container started in [Example 1](#example-1-sync-mysql-to-fluss-with-flink-sql). If you haven't started it yet, follow [Step 1](#step-1-start-mysql-with-docker) first. +::: + +### Step 1: Define the Pipeline YAML + +Create a file named `mysql-to-fluss.yaml`: + +```yaml +source: + type: mysql + name: MySQL Source + hostname: 127.0.0.1 + port: 3306 + username: root + password: 123456 + tables: mydb.\.* + server-id: 5401-5404 + +sink: + type: fluss + name: Fluss Sink + bootstrap.servers: localhost:9123 + +pipeline: + name: MySQL to Fluss Pipeline + parallelism: 2 +``` + +### Step 2: Submit the Pipeline + +Submit the pipeline using the Flink CDC CLI: + +```shell +./bin/flink-cdc.sh mysql-to-fluss.yaml +``` + +This will automatically create the corresponding tables in Fluss and start syncing data from all matching MySQL tables. + +For the full list of Pipeline Connector options, see the [Fluss Pipeline Connector Documentation](https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/pipeline-connectors/fluss/). 
+ +## Clean Up + +After finishing the examples, stop and remove the MySQL container: + +```shell +docker stop mysql && docker rm mysql +``` + +## Further Reading + +- [Flink CDC Official Documentation](https://nightlies.apache.org/flink/flink-cdc-docs-master/) +- [Flink CDC Pipeline Connectors Overview](https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/pipeline-connectors/overview/) +- [Flink SQL CDC Source Connectors](https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/overview/) +- [Fluss Flink Engine Options](options.md) diff --git a/website/docs/quickstart/flink.md b/website/docs/quickstart/flink.md index 0c9d4ac69b..b7ddc57c2e 100644 --- a/website/docs/quickstart/flink.md +++ b/website/docs/quickstart/flink.md @@ -508,32 +508,4 @@ docker compose down -v to stop all containers. ## Learn more -Now that you're up and running with Fluss and Flink, check out the [Apache Flink Engine](engine-flink/getting-started.md) docs to learn more features with Flink or [this guide](/maintenance/observability/quickstart.md) to learn how to set up an observability stack for Fluss and Flink. - -## Flink CDC Integration - -Flink CDC (Change Data Capture) provides rich connectors to easily capture changes from databases like **PostgreSQL** and bring them into **Fluss**. - -For more details, see the official Flink CDC documentation: -🔗 https://nightlies.apache.org/flink/flink-cdc-docs-release-3.5/docs/connectors/pipeline-connectors/fluss/ - -You can use Flink CDC with Fluss in two ways: - -### 1. Flink CDC Pipeline Connector - -Use a **YAML pipeline** file to define synchronization between PostgreSQL and Fluss. 
- -**Example:** - -```yaml -source: - type: postgresql - hostname: localhost - port: 5432 - database: mydb - username: user - password: pass - -sink: - type: fluss - topic: my-topic +Now that you're up and running with Fluss and Flink, check out the [Apache Flink Engine](engine-flink/getting-started.md) docs to learn more features with Flink, [Flink CDC Integration](engine-flink/flink-cdc-integration.md) to sync database changes into Fluss, or [this guide](/maintenance/observability/quickstart.md) to learn how to set up an observability stack for Fluss and Flink. From 7910de71c6f7b4a508bb5e67a218464dd4e82149 Mon Sep 17 00:00:00 2001 From: Thorne <63960540@qq.com> Date: Tue, 26 May 2026 15:47:30 +0800 Subject: [PATCH 3/3] change source with pgsql --- .../engine-flink/flink-cdc-integration.md | 114 ++++++++++-------- 1 file changed, 65 insertions(+), 49 deletions(-) diff --git a/website/docs/engine-flink/flink-cdc-integration.md b/website/docs/engine-flink/flink-cdc-integration.md index 8c583536c3..9ae5969f32 100644 --- a/website/docs/engine-flink/flink-cdc-integration.md +++ b/website/docs/engine-flink/flink-cdc-integration.md @@ -1,12 +1,12 @@ --- sidebar_label: Flink CDC title: Flink CDC Integration -sidebar_position: 9 +sidebar_position: 10 --- # Flink CDC Integration -[Flink CDC](https://nightlies.apache.org/flink/flink-cdc-docs-master/) is a streaming data integration tool built on top of Apache Flink that can capture real-time changes from various databases. Flink CDC supports Fluss as a [Pipeline Sink Connector](https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/pipeline-connectors/fluss/), making it straightforward to sync CDC data from databases like PostgreSQL, MySQL, and Oracle into Fluss. +[Flink CDC](https://nightlies.apache.org/flink/flink-cdc-docs-master/) is a streaming data integration tool built on top of Apache Flink that can capture real-time changes from various databases. 
Flink CDC supports Fluss as a [Pipeline Sink Connector](https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/pipeline-connectors/fluss/), making it straightforward to sync CDC data from databases like MySQL, PostgreSQL, and Oracle into Fluss. There are two ways to sync database changes into Fluss using Flink CDC: @@ -17,40 +17,46 @@ There are two ways to sync database changes into Fluss using Flink CDC: - A running **Fluss cluster** (CoordinatorServer + TabletServer). See [Deploying with Docker](../install-deploy/deploying-with-docker.md) for setup instructions. - A running **Flink cluster** with the required connector JARs. See [Getting Started with Flink](getting-started.md) for Flink setup. -- The required connector JARs placed under `/lib/`. The examples below use MySQL as the source, but other databases (PostgreSQL, Oracle, etc.) are also supported — see [Further Reading](#further-reading) for the full list of connectors. - - For SQL approach: [flink-sql-connector-mysql-cdc](https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-mysql-cdc) and the [Fluss Flink connector](getting-started.md) +- The required connector JARs placed under `/lib/`. The examples below use PostgreSQL as the source, but other databases (MySQL, Oracle, etc.) are also supported — see [Further Reading](#further-reading) for the full list of connectors. + - For SQL approach: [flink-sql-connector-postgres-cdc](https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-postgres-cdc) - For Pipeline approach: [flink-cdc-pipeline-connector-fluss](https://mvnrepository.com/artifact/org.apache.flink/flink-cdc-pipeline-connector-fluss) -## Example 1: Sync MySQL to Fluss with Flink SQL +:::note +The Pipeline approach (Example 2) uses `./bin/flink-cdc.sh`, which is part of the **standalone Flink CDC distribution** (`flink-cdc--bin.tar.gz`) — it is **not** included in the standard Flink release. 
Download it from the [Flink CDC releases page](https://github.com/apache/flink-cdc/releases) and extract it. You can point it to your Flink installation via the `--flink-home` flag when submitting the pipeline. +::: -This example shows how to capture changes from a MySQL table and write them into a Fluss primary-key table using Flink SQL. +## Example 1: Sync PostgreSQL to Fluss with Flink SQL -### Step 1: Start MySQL with Docker +This example shows how to capture changes from a PostgreSQL table and write them into a Fluss primary-key table using Flink SQL. -Start a MySQL instance with binlog enabled using the Debezium example image: +### Step 1: Start PostgreSQL with Docker + +Start a PostgreSQL instance with logical replication enabled: ```shell -docker run -d --name mysql \ - -e MYSQL_ROOT_PASSWORD=123456 \ - -e MYSQL_USER=mysqluser \ - -e MYSQL_PASSWORD=mysqlpw \ - -p 3306:3306 \ - debezium/example-mysql:1.1 +docker run -d --name postgres \ + -e POSTGRES_PASSWORD=postgres \ + -p 5432:5432 \ + postgres:15 \ + postgres -c wal_level=logical ``` -Wait for the container to start, then connect to MySQL: +Wait for the container to start, then connect to PostgreSQL and create the database: ```shell -docker exec -it mysql mysql -uroot -p123456 +docker exec -it postgres psql -U postgres ``` -Create a sample database and table: - ```sql CREATE DATABASE mydb; -USE mydb; +``` + +Then connect to the new database with `\c mydb` and create the table: + + +```sql CREATE TABLE orders ( - order_id INT AUTO_INCREMENT PRIMARY KEY, + order_id SERIAL PRIMARY KEY, customer_name VARCHAR(255), product VARCHAR(255), quantity INT, @@ -63,12 +69,12 @@ INSERT INTO orders (customer_name, product, quantity) VALUES ('Charlie', 'Tablet', 3); ``` -### Step 2: Create a MySQL CDC Source Table in Flink SQL +### Step 2: Create a PostgreSQL CDC Source Table in Flink SQL -Open the Flink SQL CLI and create a CDC source table that captures changes from the MySQL `orders` table: +Open the Flink SQL CLI and create a CDC source table that captures
changes from the PostgreSQL `orders` table: ```sql title="Flink SQL" -CREATE TABLE mysql_orders ( +CREATE TABLE pg_orders ( order_id INT, customer_name STRING, product STRING, @@ -76,13 +82,15 @@ CREATE TABLE mysql_orders ( order_date TIMESTAMP(3), PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( - 'connector' = 'mysql-cdc', + 'connector' = 'postgres-cdc', 'hostname' = 'localhost', - 'port' = '3306', - 'username' = 'root', - 'password' = '123456', + 'port' = '5432', + 'username' = 'postgres', + 'password' = 'postgres', 'database-name' = 'mydb', - 'table-name' = 'orders' + 'schema-name' = 'public', + 'table-name' = 'orders', + 'slot.name' = 'flink_slot' ); ``` @@ -118,10 +126,10 @@ Switch back to the default catalog and start the synchronization job: USE CATALOG default_catalog; INSERT INTO fluss_catalog.mydb.orders -SELECT * FROM mysql_orders; +SELECT * FROM pg_orders; ``` -This starts a streaming job that continuously captures changes from MySQL and writes them to Fluss. +This starts a streaming job that continuously captures changes from PostgreSQL and writes them to Fluss. ### Step 5: Query the Data in Fluss @@ -138,10 +146,10 @@ SELECT * FROM mydb.orders WHERE order_id = 1; SELECT * FROM mydb.orders; ``` -Try inserting or updating rows in MySQL — changes will be captured and reflected in Fluss in real time. Open a MySQL client: +Try inserting or updating rows in PostgreSQL — changes will be captured and reflected in Fluss in real time. 
Open a PostgreSQL client: ```shell -docker exec -it mysql mysql -uroot -p123456 mydb +docker exec -it postgres psql -U postgres mydb ``` Then execute: @@ -151,28 +159,28 @@ INSERT INTO orders (customer_name, product, quantity) VALUES ('Dave', 'Monitor', UPDATE orders SET quantity = 5 WHERE customer_name = 'Alice'; ``` -## Example 2: Sync MySQL to Fluss with Pipeline Connector +## Example 2: Sync PostgreSQL to Fluss with Pipeline Connector -For whole-database synchronization, the Flink CDC Pipeline Connector allows you to define a YAML pipeline that syncs all tables from a MySQL database into Fluss automatically — without writing any SQL. +For whole-database synchronization, the Flink CDC Pipeline Connector allows you to define a YAML pipeline that syncs all tables from a PostgreSQL database into Fluss automatically — without writing any SQL. :::note -This example reuses the MySQL container started in [Example 1](#example-1-sync-mysql-to-fluss-with-flink-sql). If you haven't started it yet, follow [Step 1](#step-1-start-mysql-with-docker) first. +This example reuses the PostgreSQL container started in [Example 1](#example-1-sync-postgresql-to-fluss-with-flink-sql). If you haven't started it yet, follow [Step 1](#step-1-start-postgresql-with-docker) first. 
::: ### Step 1: Define the Pipeline YAML -Create a file named `mysql-to-fluss.yaml`: +Create a file named `postgres-to-fluss.yaml`: ```yaml source: - type: mysql - name: MySQL Source + type: postgres + name: PostgreSQL Source hostname: 127.0.0.1 - port: 3306 - username: root - password: 123456 - tables: mydb.\.* - server-id: 5401-5404 + port: 5432 + username: postgres + password: postgres + tables: public.\.* + slot.name: flink_pipeline_slot sink: type: fluss @@ -180,28 +188,36 @@ sink: bootstrap.servers: localhost:9123 pipeline: - name: MySQL to Fluss Pipeline + name: PostgreSQL to Fluss Pipeline parallelism: 2 ``` ### Step 2: Submit the Pipeline -Submit the pipeline using the Flink CDC CLI: +Submit the pipeline using the Flink CDC CLI, passing `--flink-home` to point to your Flink installation: ```shell -./bin/flink-cdc.sh mysql-to-fluss.yaml +./bin/flink-cdc.sh --flink-home /path/to/flink postgres-to-fluss.yaml ``` -This will automatically create the corresponding tables in Fluss and start syncing data from all matching MySQL tables. +This will automatically create the corresponding tables in Fluss and start syncing data from all matching PostgreSQL tables. For the full list of Pipeline Connector options, see the [Fluss Pipeline Connector Documentation](https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/pipeline-connectors/fluss/). ## Clean Up -After finishing the examples, stop and remove the MySQL container: +Before stopping the container, cancel the Flink jobs and then drop the replication slots created by the examples; a slot that is still in use cannot be dropped. PostgreSQL replication slots are not removed automatically when a Flink job stops, and a slot left behind forces PostgreSQL to retain WAL files, which can exhaust disk space.
+ +```sql +SELECT pg_drop_replication_slot(slot_name) +FROM pg_replication_slots +WHERE slot_name IN ('flink_slot', 'flink_pipeline_slot'); +``` + +Then stop and remove the container: ```shell -docker stop mysql && docker rm mysql +docker stop postgres && docker rm postgres ``` ## Further Reading