diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index af7e996c8bef..a1b016fecfee 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -1187,6 +1187,12 @@ String When a batch job queries from a table, if a partition does not exist in the current branch, the reader will try to get this partition from this fallback branch. + +
scan.fallback-branch.read-fail-fast
+ false + Boolean + Whether to fail the read immediately when reading from a fallback branch throws. By default the failure is logged with the full stack trace and the reader falls through to the current branch, which can mask data issues. Set this to true to surface fallback branch errors to the caller instead. +
scan.fallback-delta-branch
(none) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 2f24db43f4db..3364ca4d3928 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1972,6 +1972,16 @@ public InlineElement getDescription() { "When a batch job queries from a table, if a partition does not exist in the current branch, " + "the reader will try to get this partition from this fallback branch."); + public static final ConfigOption SCAN_FALLBACK_BRANCH_READ_FAIL_FAST = + key("scan.fallback-branch.read-fail-fast") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to fail the read immediately when reading from a fallback branch throws. " + + "By default the failure is logged with the full stack trace and the reader " + + "falls through to the current branch, which can mask data issues. " + + "Set this to true to surface fallback branch errors to the caller instead."); + public static final ConfigOption SCAN_PRIMARY_BRANCH = key("scan.primary-branch") .stringType() diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index addca008d197..5c32135f764a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -573,12 +573,17 @@ private class Read implements InnerTableRead { private final InnerTableRead mainRead; private final InnerTableRead fallbackRead; + private final boolean fallbackReadFailFast; private Read() { FileStoreTable first = wrappedFirst ? wrapped : other; FileStoreTable second = wrappedFirst ? other : wrapped; this.mainRead = first.newRead(); this.fallbackRead = second.newRead(); + this.fallbackReadFailFast = + wrapped.coreOptions() + .toConfiguration() + .get(CoreOptions.SCAN_FALLBACK_BRANCH_READ_FAIL_FAST); } @Override @@ -623,10 +628,23 @@ public RecordReader createReader(Split split) throws IOException { if (fallbackSplit.isFallback()) { try { return fallbackRead.createReader(fallbackSplit.wrapped()); - } catch (Exception ignored) { + } catch (Exception e) { + if (fallbackReadFailFast) { + if (e instanceof IOException) { + throw (IOException) e; + } + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw new IOException( + "Failed to read fallback branch split: " + + fallbackSplit.wrapped(), + e); + } LOG.error( "Reading from supplemental branch has problems: {}", - fallbackSplit.wrapped()); + fallbackSplit.wrapped(), + e); } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java index 6a1f782fd2d6..7f586875a193 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java @@ -26,6 +26,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; @@ -34,7 +35,9 @@ import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.source.DataTableScan; +import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; @@ -42,10 +45,13 @@ import org.apache.paimon.utils.TraceableFileIO; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mockito; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -53,6 +59,7 @@ import static org.apache.paimon.table.SchemaEvolutionTableTestBase.rowData; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link FallbackReadFileStoreTable}. */ public class FallbackReadFileStoreTableTest { @@ -242,6 +249,89 @@ public void testWriteGoesToWrapped(boolean wrappedFirst) throws Exception { assertThat(mergedPartitions).containsExactlyInAnyOrder(1, 2, 3); } + @Test + public void testFallbackReadFailFastDefaultSwallowsException() throws Exception { + FallbackReadFileStoreTable table = setUpTableWithThrowingFallback(false); + Split split = onlyFallbackSplit(table); + + // Default behavior: the failing fallback read is swallowed and the reader + // falls through to the main branch, which has no data for partition 3 and + // either returns an empty reader or throws something other than the + // injected fallback exception. + try { + table.newRead().createReader(split); + } catch (Exception e) { + assertThat(e.getMessage()) + .as("fallback exception must not propagate when fail-fast is disabled") + .doesNotContain("injected fallback failure"); + } + } + + @Test + public void testFallbackReadFailFastPropagatesException() throws Exception { + FallbackReadFileStoreTable table = setUpTableWithThrowingFallback(true); + Split split = onlyFallbackSplit(table); + + assertThatThrownBy(() -> table.newRead().createReader(split)) + .hasMessageContaining("injected fallback failure"); + } + + private FallbackReadFileStoreTable setUpTableWithThrowingFallback(boolean failFast) + throws Exception { + String branchName = "bc"; + FileStoreTable mainTable = createTable(); + writeDataIntoTable(mainTable, 0, rowData(1, 10)); + mainTable.createBranch(branchName); + FileStoreTable branchTable = createTableFromBranch(mainTable, branchName); + writeDataIntoTable(branchTable, 0, rowData(3, 60)); + + Options overrides = new Options(); + overrides.set(CoreOptions.SCAN_FALLBACK_BRANCH_READ_FAIL_FAST, failFast); + FileStoreTable mainWithOption = mainTable.copy(overrides.toMap()); + + FileStoreTable spyBranch = Mockito.spy(branchTable); + InnerTableRead throwing = throwingInnerTableRead(); + Mockito.doReturn(throwing).when(spyBranch).newRead(); + + return new FallbackReadFileStoreTable(mainWithOption, spyBranch, true); + } + + private static Split onlyFallbackSplit(FallbackReadFileStoreTable table) { + DataTableScan scan = table.newScan(); + scan.withFilter(new PredicateBuilder(ROW_TYPE).equal(0, 3)); + List splits = scan.plan().splits(); + assertThat(splits).hasSize(1); + FallbackReadFileStoreTable.FallbackSplit fs = + (FallbackReadFileStoreTable.FallbackSplit) splits.get(0); + assertThat(fs.isFallback()).isTrue(); + return splits.get(0); + } + + private static InnerTableRead throwingInnerTableRead() { + return new InnerTableRead() { + @Override + public InnerTableRead withFilter(Predicate predicate) { + return this; + } + + @Override + public InnerTableRead withReadType(RowType readType) { + return this; + } + + @Override + public TableRead withIOManager(org.apache.paimon.disk.IOManager ioManager) { + return this; + } + + @Override + public org.apache.paimon.reader.RecordReader createReader(Split split) + throws IOException { + throw new IOException("injected fallback failure"); + } + }; + } + private void writeDataIntoTable( FileStoreTable table, long commitIdentifier, InternalRow... allData) throws Exception { StreamTableWrite write = table.newWrite(commitUser);