From 02cfea42483c12fc8e610c7dd53bea9a49558985 Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Tue, 10 Mar 2026 10:54:55 +0800 Subject: [PATCH] [core] Fix FallbackReadScan#plan mixing partition and data filters --- .../paimon/table/ChainGroupReadTable.java | 58 +++------- .../table/FallbackReadFileStoreTable.java | 100 ++++++++++++++++-- .../table/system/ReadOptimizedTable.java | 7 +- .../table/FallbackReadFileStoreTableTest.java | 63 +++++++++++ 4 files changed, 177 insertions(+), 51 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java index 36f9778ee381..23798820702c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java @@ -130,11 +130,9 @@ public static class ChainTableBatchScan extends FallbackReadScan { private final RowDataToObjectArrayConverter partitionConverter; private final InternalRowPartitionComputer partitionComputer; - private final TableSchema tableSchema; private final CoreOptions options; private final RecordComparator partitionComparator; private final ChainGroupReadTable chainGroupReadTable; - private PartitionPredicate partitionPredicate; private Predicate dataPredicate; private Filter bucketFilter; @@ -143,8 +141,12 @@ public ChainTableBatchScan( DataTableScan fallbackScan, TableSchema tableSchema, ChainGroupReadTable chainGroupReadTable) { - super(mainScan, fallbackScan); - this.tableSchema = tableSchema; + super( + mainScan, + fallbackScan, + chainGroupReadTable.wrapped, + chainGroupReadTable.fallback(), + tableSchema); this.options = CoreOptions.fromMap(tableSchema.options()); this.chainGroupReadTable = chainGroupReadTable; this.partitionConverter = @@ -169,7 +171,6 @@ public ChainTableBatchScan withFilter(Predicate predicate) { predicate, tableSchema.logicalRowType(), tableSchema.partitionKeys()); - 
setPartitionPredicate(pair.getLeft().orElse(null)); dataPredicate = pair.getRight().isEmpty() ? null : PredicateBuilder.and(pair.getRight()); } @@ -179,57 +180,30 @@ public ChainTableBatchScan withFilter(Predicate predicate) { @Override public ChainTableBatchScan withPartitionFilter(Map partitionSpec) { super.withPartitionFilter(partitionSpec); - if (partitionSpec != null) { - setPartitionPredicate( - PartitionPredicate.fromMap( - tableSchema.logicalPartitionType(), - partitionSpec, - options.partitionDefaultName())); - } return this; } @Override public ChainTableBatchScan withPartitionFilter(List partitions) { super.withPartitionFilter(partitions); - if (partitions != null) { - setPartitionPredicate( - PartitionPredicate.fromMultiple( - tableSchema.logicalPartitionType(), partitions)); - } return this; } @Override public ChainTableBatchScan withPartitionsFilter(List> partitions) { super.withPartitionsFilter(partitions); - if (partitions != null) { - setPartitionPredicate( - PartitionPredicate.fromMaps( - tableSchema.logicalPartitionType(), - partitions, - options.partitionDefaultName())); - } return this; } @Override public ChainTableBatchScan withPartitionFilter(PartitionPredicate partitionPredicate) { super.withPartitionFilter(partitionPredicate); - if (partitionPredicate != null) { - setPartitionPredicate(partitionPredicate); - } return this; } @Override public ChainTableBatchScan withPartitionFilter(Predicate partitionPredicate) { super.withPartitionFilter(partitionPredicate); - if (partitionPredicate != null) { - setPartitionPredicate( - PartitionPredicate.fromPredicate( - tableSchema.logicalPartitionType(), partitionPredicate)); - } return this; } @@ -252,6 +226,7 @@ public ChainTableBatchScan withBucketFilter(Filter bucketFilter) { @Override public Plan plan() { List splits = new ArrayList<>(); + PartitionPredicate partitionPredicate = getPartitionPredicate(); PredicateBuilder builder = new PredicateBuilder(tableSchema.logicalPartitionType()); for (Split 
split : mainScan.plan().splits()) { DataSplit dataSplit = (DataSplit) split; @@ -271,9 +246,11 @@ public Plan plan() { Set snapshotPartitions = new HashSet<>( - newPartitionListingScan(true, partitionPredicate).listPartitions()); + newChainPartitionListingScan(true, partitionPredicate) + .listPartitions()); - DataTableScan deltaPartitionScan = newPartitionListingScan(false, partitionPredicate); + DataTableScan deltaPartitionScan = + newChainPartitionListingScan(false, partitionPredicate); List deltaPartitions = deltaPartitionScan.listPartitions().stream() .filter(p -> !snapshotPartitions.contains(p)) @@ -292,7 +269,7 @@ public Plan plan() { PartitionPredicate.fromPredicate( tableSchema.logicalPartitionType(), snapshotPredicate); DataTableScan snapshotPartitionsScan = - newPartitionListingScan(true, snapshotPartitionPredicate); + newChainPartitionListingScan(true, snapshotPartitionPredicate); List candidateSnapshotPartitions = snapshotPartitionsScan.listPartitions(); candidateSnapshotPartitions = @@ -393,8 +370,9 @@ public Plan plan() { @Override public List listPartitionEntries() { - DataTableScan snapshotScan = newPartitionListingScan(true, partitionPredicate); - DataTableScan deltaScan = newPartitionListingScan(false, partitionPredicate); + PartitionPredicate partitionPredicate = getPartitionPredicate(); + DataTableScan snapshotScan = newChainPartitionListingScan(true, partitionPredicate); + DataTableScan deltaScan = newChainPartitionListingScan(false, partitionPredicate); List partitionEntries = new ArrayList<>(snapshotScan.listPartitionEntries()); Set partitions = @@ -408,11 +386,7 @@ public List listPartitionEntries() { return partitionEntries; } - private void setPartitionPredicate(PartitionPredicate predicate) { - this.partitionPredicate = predicate; - } - - private DataTableScan newPartitionListingScan( + private DataTableScan newChainPartitionListingScan( boolean snapshot, PartitionPredicate scanPartitionPredicate) { DataTableScan scan = snapshot diff 
--git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index 65e875c6a59f..aff2aa9a2348 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -47,6 +47,7 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Filter; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SegmentsCache; @@ -189,7 +190,8 @@ protected Map rewriteFallbackOptions(Map options @Override public DataTableScan newScan() { validateSchema(); - return new FallbackReadScan(wrapped.newScan(), fallback.newScan()); + return new FallbackReadScan( + wrapped.newScan(), fallback.newScan(), wrapped, fallback, wrapped.schema()); } protected void validateSchema() { @@ -356,10 +358,22 @@ public static class FallbackReadScan implements DataTableScan { protected final DataTableScan mainScan; protected final DataTableScan fallbackScan; - - public FallbackReadScan(DataTableScan mainScan, DataTableScan fallbackScan) { + protected final FileStoreTable wrappedTable; + protected final FileStoreTable fallbackTable; + protected final TableSchema tableSchema; + private PartitionPredicate partitionPredicate; + + public FallbackReadScan( + DataTableScan mainScan, + DataTableScan fallbackScan, + FileStoreTable wrappedTable, + FileStoreTable fallbackTable, + TableSchema tableSchema) { this.mainScan = mainScan; this.fallbackScan = fallbackScan; + this.wrappedTable = wrappedTable; + this.fallbackTable = fallbackTable; + this.tableSchema = tableSchema; } @Override @@ -373,6 +387,14 @@ public FallbackReadScan withShard(int indexOfThisSubtask, int numberOfParallelSu public FallbackReadScan withFilter(Predicate predicate) { mainScan.withFilter(predicate); 
fallbackScan.withFilter(predicate); + if (predicate != null) { + Pair, List> pair = + PartitionPredicate.splitPartitionPredicatesAndDataPredicates( + predicate, + tableSchema.logicalRowType(), + tableSchema.partitionKeys()); + setPartitionPredicate(pair.getLeft().orElse(null)); + } return this; } @@ -387,6 +409,13 @@ public FallbackReadScan withLimit(int limit) { public FallbackReadScan withPartitionFilter(Map partitionSpec) { mainScan.withPartitionFilter(partitionSpec); fallbackScan.withPartitionFilter(partitionSpec); + if (partitionSpec != null) { + setPartitionPredicate( + PartitionPredicate.fromMap( + tableSchema.logicalPartitionType(), + partitionSpec, + CoreOptions.fromMap(tableSchema.options()).partitionDefaultName())); + } return this; } @@ -394,6 +423,11 @@ public FallbackReadScan withPartitionFilter(Map partitionSpec) { public FallbackReadScan withPartitionFilter(List partitions) { mainScan.withPartitionFilter(partitions); fallbackScan.withPartitionFilter(partitions); + if (partitions != null) { + setPartitionPredicate( + PartitionPredicate.fromMultiple( + tableSchema.logicalPartitionType(), partitions)); + } return this; } @@ -401,6 +435,13 @@ public FallbackReadScan withPartitionFilter(List partitions) { public InnerTableScan withPartitionsFilter(List> partitions) { mainScan.withPartitionsFilter(partitions); fallbackScan.withPartitionsFilter(partitions); + if (partitions != null) { + setPartitionPredicate( + PartitionPredicate.fromMaps( + tableSchema.logicalPartitionType(), + partitions, + CoreOptions.fromMap(tableSchema.options()).partitionDefaultName())); + } return this; } @@ -408,6 +449,21 @@ public InnerTableScan withPartitionsFilter(List> partitions) public InnerTableScan withPartitionFilter(PartitionPredicate partitionPredicate) { mainScan.withPartitionFilter(partitionPredicate); fallbackScan.withPartitionFilter(partitionPredicate); + if (partitionPredicate != null) { + setPartitionPredicate(partitionPredicate); + } + return this; + } + + 
@Override + public FallbackReadScan withPartitionFilter(Predicate partitionPredicate) { + mainScan.withPartitionFilter(partitionPredicate); + fallbackScan.withPartitionFilter(partitionPredicate); + if (partitionPredicate != null) { + setPartitionPredicate( + PartitionPredicate.fromPredicate( + tableSchema.logicalPartitionType(), partitionPredicate)); + } return this; } @@ -446,18 +502,26 @@ public InnerTableScan dropStats() { return this; } + /** + * Builds a plan for fallback read. + * + *

<p>Partitions that exist in the main branch (based on partition predicates only) are + * treated as complete and are read from the main branch with the full predicate. Partitions + * that exist only in the fallback branch are read from the fallback branch. + */ @Override public TableScan.Plan plan() { List splits = new ArrayList<>(); - Set completePartitions = new HashSet<>(); + Set completePartitions = + new HashSet<>( + newPartitionListingScan(true, partitionPredicate).listPartitions()); for (Split split : mainScan.plan().splits()) { DataSplit dataSplit = (DataSplit) split; splits.add(toFallbackSplit(dataSplit, false)); - completePartitions.add(dataSplit.partition()); } List remainingPartitions = - fallbackScan.listPartitions().stream() + newPartitionListingScan(false, partitionPredicate).listPartitions().stream() .filter(p -> !completePartitions.contains(p)) .collect(Collectors.toList()); if (!remainingPartitions.isEmpty()) { @@ -471,18 +535,38 @@ public TableScan.Plan plan() { @Override public List listPartitionEntries() { + DataTableScan mainListingScan = newPartitionListingScan(true, partitionPredicate); + DataTableScan fallbackListingScan = newPartitionListingScan(false, partitionPredicate); List partitionEntries = - new ArrayList<>(mainScan.listPartitionEntries()); + new ArrayList<>(mainListingScan.listPartitionEntries()); Set partitions = partitionEntries.stream() .map(PartitionEntry::partition) .collect(Collectors.toSet()); - List fallBackPartitionEntries = fallbackScan.listPartitionEntries(); + List fallBackPartitionEntries = + fallbackListingScan.listPartitionEntries(); fallBackPartitionEntries.stream() .filter(e -> !partitions.contains(e.partition())) .forEach(partitionEntries::add); return partitionEntries; } + + protected void setPartitionPredicate(PartitionPredicate predicate) { + this.partitionPredicate = predicate; + } + + protected PartitionPredicate getPartitionPredicate() { + return partitionPredicate; + } + + private DataTableScan 
newPartitionListingScan( + boolean isMain, PartitionPredicate scanPartitionPredicate) { + DataTableScan scan = isMain ? wrappedTable.newScan() : fallbackTable.newScan(); + if (scanPartitionPredicate != null) { + scan.withPartitionFilter(scanPartitionPredicate); + } + return scan; + } } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java index 87d79bc53780..64ef8aa34e86 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java @@ -140,7 +140,12 @@ private SnapshotReader newSnapshotReader(FileStoreTable wrapped) { public DataTableScan newScan() { if (wrapped instanceof FallbackReadFileStoreTable) { FallbackReadFileStoreTable table = (FallbackReadFileStoreTable) wrapped; - return new FallbackReadScan(newScan(table.wrapped()), newScan(table.fallback())); + return new FallbackReadScan( + newScan(table.wrapped()), + newScan(table.fallback()), + table.wrapped(), + table.fallback(), + table.wrapped().schema()); } return newScan(wrapped); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java index 1b7eeddd9503..7163202a8ea6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java @@ -26,12 +26,15 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; import 
org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.table.source.DataTableScan; +import org.apache.paimon.table.source.Split; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; @@ -139,6 +142,66 @@ public void testListPartitionEntries() throws Exception { Pair.of(1, 2L), Pair.of(2, 1L), Pair.of(3, 1L), Pair.of(4, 1L)); } + /** + * Test that FallbackReadScan.plan() determines partition ownership based on partition + * predicates only, not mixed with data filters. If a partition exists in the main branch, it + * should never be read from fallback, regardless of the data filter. + * + *

<p>Without the fix, the old code built completePartitions from mainScan.plan() results which + * already had data filters applied. When the data filter excluded all files of a main partition + * via filterByStats, that partition was incorrectly treated as "not in main" and read from + * fallback. + */ + @Test + public void testPlanWithDataFilter() throws Exception { + String branchName = "bc"; + + FileStoreTable mainTable = createTable(); + + // Main branch: partition 1 (a=10), partition 2 (a=20) + writeDataIntoTable(mainTable, 0, rowData(1, 10), rowData(2, 20)); + + mainTable.createBranch(branchName); + + FileStoreTable branchTable = createTableFromBranch(mainTable, branchName); + + // Fallback branch: partition 1 already has a=10 (inherited), add a=100. + // Also add partition 3 (a=30) which is fallback-only. + writeDataIntoTable(branchTable, 1, rowData(1, 100), rowData(3, 30)); + + FallbackReadFileStoreTable fallbackTable = + new FallbackReadFileStoreTable(mainTable, branchTable); + PredicateBuilder builder = new PredicateBuilder(ROW_TYPE); + + // Case 1: WHERE pt = 1 AND a = 100 + // Partition 1 exists in main branch. Even though main has no a=100 data, + // we should never fall back for it. The result should contain no fallback splits. + DataTableScan scan1 = fallbackTable.newScan(); + scan1.withFilter(PredicateBuilder.and(builder.equal(0, 1), builder.equal(1, 100))); + List splits1 = scan1.plan().splits(); + + for (Split split : splits1) { + FallbackReadFileStoreTable.FallbackSplit fs = + (FallbackReadFileStoreTable.FallbackSplit) split; + assertThat(fs.isFallback()) + .as("Partition that exists in main branch should never be read from fallback") + .isFalse(); + } + + // Case 2: WHERE pt = 3 AND a = 30 + // Partition 3 only exists in fallback branch, so it should be read from fallback. 
+ DataTableScan scan2 = fallbackTable.newScan(); + scan2.withFilter(PredicateBuilder.and(builder.equal(0, 3), builder.equal(1, 30))); + List splits2 = scan2.plan().splits(); + + assertThat(splits2).hasSize(1); + FallbackReadFileStoreTable.FallbackSplit fs2 = + (FallbackReadFileStoreTable.FallbackSplit) splits2.get(0); + assertThat(fs2.isFallback()) + .as("Partition that only exists in fallback branch should be read from fallback") + .isTrue(); + } + private void writeDataIntoTable( FileStoreTable table, long commitIdentifier, InternalRow... allData) throws Exception { StreamTableWrite write = table.newWrite(commitUser);