From 9f14327d158d5ebfbe4f22d963f4d8edbbf72ef6 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Thu, 28 May 2026 20:43:54 +0800 Subject: [PATCH] [lake] Fix DvTableReadableSnapshotRetriever scanning wrong snapshot due to ineffective options getBucketsWithoutL0AndWithL0() passed SCAN_SNAPSHOT_ID and BATCH_SCAN_MODE via table options to store().newScan(), but these options are only consumed by table-level scans (DataTableBatchScan), not by store-level scans (AbstractFileStoreScan). This means the scan always hit the latest snapshot instead of the specified one. Fix by using the direct .withSnapshot(snapshot) API on FileStoreScan, consistent with getBucketsWithFlushedL0() in the same file. Co-Authored-By: Claude Opus 4.6 --- .../utils/DvTableReadableSnapshotRetriever.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java index 54c3c7f948..3412829fa7 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java @@ -32,7 +32,6 @@ import org.apache.fluss.utils.ExceptionUtils; import org.apache.fluss.utils.types.Tuple2; -import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; @@ -495,16 +494,10 @@ private LakeSnapshot getOrFetchLakeSnapshot(long snapshotId, Map bucketsWithoutL0 = new HashSet<>(); Set bucketsWithL0 = new HashSet<>(); - // Scan the snapshot to get all splits including level0 - Map scanOptions = new HashMap<>(); - scanOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), String.valueOf(snapshot.id())); - // hacky: set batch scan mode to compact to make sure we can get l0 level files - scanOptions.put( - CoreOptions.BATCH_SCAN_MODE.key(), CoreOptions.BatchScanMode.COMPACT.getValue()); - + // Scan the snapshot to get all data files including L0 level files Map>> manifestsByBucket = FileStoreScan.Plan.groupByPartFiles( - fileStoreTable.copy(scanOptions).store().newScan().plan().files()); + fileStoreTable.store().newScan().withSnapshot(snapshot).plan().files()); for (Map.Entry>> manifestsByBucketEntry : manifestsByBucket.entrySet()) {