[Flink] Kvscan flink integration #3383

Open
polyzos wants to merge 13 commits into apache:main from polyzos:kvscan-flink-integration

@polyzos polyzos commented May 26, 2026

#3126 extended the Java client to support KV scan on the live RocksDB table.

This PR integrates that functionality into the Flink connector.

It introduces KvBatchSplit and KvBatchSplitState as a new split type in the Flink source, enabling the connector to perform bounded full-table scans on primary-key tables via the server-side KV scan API (FIP-17) rather than reading from snapshots.

@polyzos polyzos force-pushed the kvscan-flink-integration branch from ddb782e to ecdb181 Compare May 26, 2026 12:21

Copilot AI left a comment

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Copilot AI left a comment

Pull request overview

Copilot reviewed 17 out of 17 changed files in this pull request and generated 2 comments.

Comment thread website/docs/engine-flink/options.md Outdated
@polyzos polyzos requested review from loserwang1024 and wuchong May 27, 2026 06:20
Enable the feature by setting `client.scanner.kv.server-side.enabled = true` on the table or as a SQL hint:

- This is a **bounded** read. The source finishes once all buckets have been drained and does not continue reading the change-log.
- On task restart, each bucket is rescanned from scratch. Progress within a scan session is not checkpointed, because an expired or invalidated server-side session cannot be resumed from a mid-point.
Review comment (Contributor):

On task restart, each bucket is scanned again from the beginning. Progress within an active scan session is not checkpointed because expired or invalidated server-side sessions cannot be resumed from an intermediate position.


- This is a **bounded** read. The source finishes once all buckets have been drained and does not continue reading the change-log.
- On task restart, each bucket is rescanned from scratch. Progress within a scan session is not checkpointed, because an expired or invalidated server-side session cannot be resumed from a mid-point.
- The feature is disabled by default (`false`). Without it, unbounded (streaming) reads on primary-key tables work as usual; bounded reads require the data-lake integration to be enabled.
Review comment (Contributor):

When disabled, unbounded (streaming) reads on primary-key tables continue to work as usual. Bounded reads require data-lake integration unless server-side KV scanning is enabled.

```sql
SELECT * FROM pk_table;
```

You can also enable the feature dynamically without storing it in the table metadata:

Review comment (Contributor):

You can also enable server-side scanning dynamically without storing the option in table metadata:

### Limit Read
The Fluss source supports limiting reads for both primary-key tables and log tables, making it convenient to preview the latest `N` records in a table.
Review comment (Contributor):

making it easy to preview the latest N records in a table.


Fluss can perform a bounded full-table scan on a primary-key table directly via the server-side KV scan API.

Enable the feature by setting `client.scanner.kv.server-side.enabled = true` on the table or as a SQL hint:
Review comment (Contributor):

Enable this feature by setting client.scanner.kv.server-side.enabled = true in the table options or by using a SQL hint.
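
For readers of this thread, the two ways of enabling the option could look roughly like the sketch below. This is illustrative only: the `OPTIONS` hint is Flink's standard dynamic table options syntax and `pk_table` is the table name used in the documentation snippet under review, but whether the Fluss connector accepts this option through `ALTER TABLE ... SET` on an existing table is an assumption that should be verified against the connector.

```sql
-- Bounded reads run in batch mode.
SET 'execution.runtime-mode' = 'batch';

-- Alternative 1 (assumption: the option can be stored on an existing table).
ALTER TABLE pk_table SET ('client.scanner.kv.server-side.enabled' = 'true');
SELECT * FROM pk_table;

-- Alternative 2: enable it for a single query via a SQL hint,
-- without storing the option in the table metadata.
SELECT * FROM pk_table /*+ OPTIONS('client.scanner.kv.server-side.enabled' = 'true') */;
```

The hint form keeps the setting scoped to one query, which matches the "without storing it in the table metadata" wording in the docs being reviewed.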


Enable the feature by setting `client.scanner.kv.server-side.enabled = true` on the table or as a SQL hint:

- This is a **bounded** read. The source finishes once all buckets have been drained and does not continue reading the change-log.
Review comment (Contributor):

The source finishes after all buckets have been scanned and does not continue consuming the change log.

| scan.partition.discovery.interval | Duration | 1min | The time interval for the Fluss source to discover the new partitions for partitioned table while scanning. A non-positive value disables the partition discovery. The default value is 1 minute. Currently, since Fluss Admin#listPartitions(TablePath tablePath) requires a large number of requests to ZooKeeper in server, this option cannot be set too small, as a small value would cause frequent requests and increase server load. In the future, once list partitions is optimized, the default value of this parameter can be reduced. |
| scan.kv.snapshot.lease.id | String | UUID | The lease ID used to protect acquired KV snapshots from deletion. If specified, the snapshots will be retained until either the consumer finishes processing all of them or the lease duration expires. By default, this value is set to a randomly generated UUID string if not explicitly provided. |
| scan.kv.snapshot.lease.duration | Duration | 1day | The time period how long to wait before expiring the kv snapshot lease to avoid kv snapshot blocking to delete. |
| client.scanner.kv.server-side.enabled | Boolean | false | Master switch for using the server-side KV scan (FIP-17) in bounded reads of primary-key tables when no KV snapshot file is available. When false (default), bounded primary-key reads fall back to the prior behavior (log-only when lake is enabled, or fail when lake is disabled). See [Full Scan of Primary Key Tables](engine-flink/reads.md#full-scan-of-primary-key-tables) for details. |
Review comment (Contributor):

Enables server-side KV scanning (FIP-17) for bounded reads on primary-key tables when no KV snapshot file is available. When disabled (default), bounded reads fall back to the previous behavior: read from the log when data-lake integration is enabled, or fail when it is disabled.
