diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreTimeoutException.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreTimeoutException.java new file mode 100644 index 0000000000..be861f94e6 --- /dev/null +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreTimeoutException.java @@ -0,0 +1,56 @@ +/* + * RecordCoreTimeoutException.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.record; + +import com.apple.foundationdb.annotation.API; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * This exception extends {@link RecordCoreException} and is specifically used to indicate + * timeout-related failures. + */ +@API(API.Status.UNSTABLE) +@SuppressWarnings("serial") +public class RecordCoreTimeoutException extends RecordCoreException { + public RecordCoreTimeoutException(@Nonnull String msg, @Nullable Object ... keyValues) { + super(msg, keyValues); + } + + public RecordCoreTimeoutException(Throwable cause) { + super(cause); + } + + public RecordCoreTimeoutException(@Nonnull String msg, @Nullable Throwable cause) { + super(msg, cause); + } + + public RecordCoreTimeoutException(@Nonnull String msg) { + super(msg); + } + + protected RecordCoreTimeoutException(String message, Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java index 5095076d57..b23228f837 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java @@ -20,7 +20,6 @@ package com.apple.foundationdb.record.provider.foundationdb; -import com.apple.foundationdb.FDBError; import com.apple.foundationdb.FDBException; import com.apple.foundationdb.MutationType; import com.apple.foundationdb.annotation.API; @@ -59,7 +58,6 @@ import java.time.DateTimeException; import java.time.Instant; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -1213,22 +1211,6 @@ protected static T findException(@Nullable Throwable ex, Class classT) { } return null; } - - protected static boolean shouldLessenWork(@Nullable FDBException ex) { - // These error codes represent a list of errors that can occur if there is too much work to be done - // in a single transaction. - if (ex == null) { - return false; - } - final Set lessenWorkCodes = new HashSet<>(Arrays.asList( - FDBError.TIMED_OUT.code(), - FDBError.TRANSACTION_TOO_OLD.code(), - FDBError.NOT_COMMITTED.code(), - FDBError.TRANSACTION_TIMED_OUT.code(), - FDBError.COMMIT_READ_INCOMPLETE.code(), - FDBError.TRANSACTION_TOO_LARGE.code())); - return lessenWorkCodes.contains(ex.getCode()); - } } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingMerger.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingMerger.java index b3a4cb6dc6..c118848f70 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingMerger.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingMerger.java @@ -22,6 +22,7 @@ import com.apple.foundationdb.FDBException; import com.apple.foundationdb.async.AsyncUtil; +import com.apple.foundationdb.record.RecordCoreTimeoutException; import com.apple.foundationdb.record.logging.KeyValueLogMessage; import com.apple.foundationdb.record.logging.LogMessageKeys; import com.apple.foundationdb.record.metadata.Index; @@ -37,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -148,11 +150,10 @@ private CompletableFuture handleFailure(final IndexDeferredMaintenanceC // merges. Not perfect, but as long as it's rare the impact should be minimal. mergeControl.mergeHadFailed(); // report to adjust stats - final FDBException ex = IndexingBase.findException(e, FDBException.class); - final IndexDeferredMaintenanceControl.LastStep lastStep = mergeControl.getLastStep(); - if (!IndexingBase.shouldLessenWork(ex)) { + if (shouldAbort(e)) { giveUpMerging(mergeControl, e); } + final IndexDeferredMaintenanceControl.LastStep lastStep = mergeControl.getLastStep(); switch (lastStep) { case REPARTITION: // Here: this exception might be resolved by reducing the number of documents to move during repartitioning @@ -175,6 +176,13 @@ private CompletableFuture handleFailure(final IndexDeferredMaintenanceC return AsyncUtil.READY_TRUE; // and retry } + private boolean shouldAbort(@Nullable Throwable e) { + return e == null || + (IndexingBase.findException(e, RecordCoreTimeoutException.class) == null && + IndexingBase.findException(e, TimeoutException.class) == null && + IndexingBase.findException(e, FDBException.class) == null); + } + private void handleRepartitioningFailure(final IndexDeferredMaintenanceControl mergeControl, Throwable e) { repartitionDocumentCount = mergeControl.getRepartitionDocumentCount(); if (repartitionDocumentCount == -1) { diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingThrottle.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingThrottle.java index c9e76ee1ef..eecb6ca7db 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingThrottle.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingThrottle.java @@ -20,6 +20,7 @@ package com.apple.foundationdb.record.provider.foundationdb; +import com.apple.foundationdb.FDBError; import com.apple.foundationdb.FDBException; import com.apple.foundationdb.annotation.API; import com.apple.foundationdb.async.AsyncUtil; @@ -133,7 +134,7 @@ boolean mayRetryAfterHandlingException(@Nullable FDBException fdbException, @Nullable List additionalLogMessageKeyValues, int currTries, final boolean adjustLimits) { - if (currTries >= common.config.getMaxRetries() || !IndexingBase.shouldLessenWork(fdbException)) { + if (currTries >= common.config.getMaxRetries() || !shouldLessenWork(fdbException)) { // Here: should not retry or no more retries. There is no real need to handle limits. return false; } @@ -144,6 +145,22 @@ boolean mayRetryAfterHandlingException(@Nullable FDBException fdbException, return true; } + private static boolean shouldLessenWork(@Nullable FDBException ex) { + // These error codes represent a list of errors that can occur if there is too much work to be done + // in a single transaction. + if (ex == null) { + return false; + } + final Set lessenWorkCodes = new HashSet<>(Arrays.asList( + FDBError.TIMED_OUT.code(), + FDBError.TRANSACTION_TOO_OLD.code(), + FDBError.NOT_COMMITTED.code(), + FDBError.TRANSACTION_TIMED_OUT.code(), + FDBError.COMMIT_READ_INCOMPLETE.code(), + FDBError.TRANSACTION_TOO_LARGE.code())); + return lessenWorkCodes.contains(ex.getCode()); + } + void decreaseLimit(@Nonnull FDBException fdbException, @Nullable List additionalLogMessageKeyValues) { // TODO: decrease the limit only for certain errors diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerMergeTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerMergeTest.java index 4aded5dcc7..9e6c91b9d3 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerMergeTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerMergeTest.java @@ -28,6 +28,7 @@ import com.apple.foundationdb.record.IndexEntry; import com.apple.foundationdb.record.IndexScanType; import com.apple.foundationdb.record.IsolationLevel; +import com.apple.foundationdb.record.RecordCoreTimeoutException; import com.apple.foundationdb.record.RecordCursor; import com.apple.foundationdb.record.RecordMetaData; import com.apple.foundationdb.record.RecordMetaDataBuilder; @@ -44,12 +45,14 @@ import com.apple.foundationdb.record.provider.foundationdb.properties.RecordLayerPropertyStorage; import com.apple.foundationdb.record.query.QueryToKeyMatcher; import com.apple.foundationdb.tuple.Tuple; +import com.apple.test.BooleanSource; import com.apple.test.Tags; import com.google.auto.service.AutoService; import com.google.protobuf.Message; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -58,6 +61,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -67,6 +71,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Tests specifically of {@link OnlineIndexer#mergeIndex()}. @@ -197,6 +202,185 @@ void testMergeTimeout() { assertEquals(repeat(0, mergeLimits.size()), repartitionLimits); } + /** + * Test that a RuntimeException (non-FDB, non-timeout) causes the merger to abort immediately. + */ + @Test + void testNonRetriableExceptionAborts() { + final String indexType = "nonRetriableExceptionIndex"; + AtomicInteger attemptCount = new AtomicInteger(0); + + TestFactory.register(indexType, state -> { + adjustMergeControl(state); + attemptCount.incrementAndGet(); + + // Throw a RuntimeException that is not retriable + final CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new IllegalStateException("Non-retriable error")); + return future; + }); + + final FDBRecordStore.Builder storeBuilder = createStore(indexType); + try (OnlineIndexer indexer = OnlineIndexer.newBuilder() + .setRecordStoreBuilder(storeBuilder) + .setTargetIndexesByName(List.of(INDEX_NAME)) + .setMaxAttempts(10) + .build()) { + Assertions.assertThrows(IllegalStateException.class, indexer::mergeIndex); + } + + // Should only attempt once, no retries + assertEquals(1, attemptCount.get()); + } + + /** + * Test that a TimeoutException causes retry behavior (not abort). + */ + @ParameterizedTest + @BooleanSource + void testTimeoutExceptionRetries(boolean customTimeout) { + final String indexType = "timeoutExceptionIndex"; + final String exceptionMessage = "my timeout"; + AtomicInteger attemptCount = new AtomicInteger(0); + + TestFactory.register(indexType, state -> { + adjustMergeControl(state); + + attemptCount.incrementAndGet(); + + // Throw a TimeoutException (not FDB timeout) + final CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally( + customTimeout ? + new CustomOperationTimeoutException(exceptionMessage) : + new java.util.concurrent.TimeoutException(exceptionMessage)); + return future; + }); + + final FDBRecordStore.Builder storeBuilder = createStore(indexType); + try (OnlineIndexer indexer = OnlineIndexer.newBuilder() + .setRecordStoreBuilder(storeBuilder) + .setTargetIndexesByName(List.of(INDEX_NAME)) + .setMaxAttempts(5) + .build()) { + Exception thrownException = Assertions.assertThrows(Exception.class, indexer::mergeIndex); + // Assert that the timeout exception is in the cause chain + var timeoutCause = + customTimeout ? + findCause(thrownException, CustomOperationTimeoutException.class) : + findCause(thrownException, TimeoutException.class); + Assertions.assertNotNull(timeoutCause); + Assertions.assertEquals(exceptionMessage, timeoutCause.getMessage()); + } + // Assert multiple retries + assertTrue(1 < attemptCount.get()); + } + + private static void adjustMergeControl(final IndexMaintainerState state) { + final IndexDeferredMaintenanceControl mergeControl = state.store.getIndexDeferredMaintenanceControl(); + mergeControl.setLastStep(IndexDeferredMaintenanceControl.LastStep.MERGE); + if (mergeControl.getMergesLimit() == 0) { + mergeControl.setMergesTried(10); + mergeControl.setMergesFound(10); + } else { + mergeControl.setMergesTried(mergeControl.getMergesLimit()); + mergeControl.setMergesFound(10); + } + } + + @SuppressWarnings("serial") + private static class CustomOperationTimeoutException extends RecordCoreTimeoutException { + public CustomOperationTimeoutException(String message) { + super(message); + } + } + + private static T findCause(Throwable throwable, Class causeType) { + Throwable current = throwable; + while (current != null) { + if (causeType.isInstance(current)) { + return causeType.cast(current); + } + current = current.getCause(); + } + return null; + } + + /** + * Test that an FDBException wrapped in another exception still triggers retry behavior. + */ + @Test + void testWrappedFDBExceptionRetries() { + final String indexType = "wrappedFdbExceptionIndex"; + AtomicInteger attemptCount = new AtomicInteger(0); + + TestFactory.register(indexType, state -> { + adjustMergeControl(state); + + attemptCount.incrementAndGet(); + + // Wrap FDBException in multiple layers to test deep unwrapping + final CompletableFuture future = new CompletableFuture<>(); + FDBException fdbEx = new FDBException("Transaction too old", FDBError.TRANSACTION_TOO_OLD.code()); + RuntimeException wrapper = new RuntimeException("Wrapper level 1", + new IllegalStateException("Wrapper level 2", fdbEx)); + future.completeExceptionally(wrapper); + return future; + }); + + final FDBRecordStore.Builder storeBuilder = createStore(indexType); + try (OnlineIndexer indexer = OnlineIndexer.newBuilder() + .setRecordStoreBuilder(storeBuilder) + .setTargetIndexesByName(List.of(INDEX_NAME)) + .setMaxAttempts(5) + .build()) { + + Exception thrownException = Assertions.assertThrows(Exception.class, indexer::mergeIndex); + + // Assert that FDBException is in the cause chain + FDBException fdbCause = findCause(thrownException, FDBException.class); + Assertions.assertNotNull(fdbCause); + assertEquals(FDBError.TRANSACTION_TOO_OLD.code(), fdbCause.getCode()); + } + + // Assert multiple retries + assertTrue(1 < attemptCount.get()); + } + + /** + * Test that a non-retriable exception during REPARTITION phase causes immediate abort. + */ + @Test + void testNonRetriableExceptionDuringRepartitionAborts() { + final String indexType = "nonRetriableRepartitionIndex"; + AtomicInteger attemptCount = new AtomicInteger(0); + + TestFactory.register(indexType, state -> { + final IndexDeferredMaintenanceControl mergeControl = state.store.getIndexDeferredMaintenanceControl(); + mergeControl.setLastStep(IndexDeferredMaintenanceControl.LastStep.REPARTITION); + mergeControl.setRepartitionDocumentCount(20); + + attemptCount.incrementAndGet(); + + // Throw a non-retriable exception during repartition + final CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new NullPointerException("Unexpected null during repartition")); + return future; + }); + + final FDBRecordStore.Builder storeBuilder = createStore(indexType); + try (OnlineIndexer indexer = OnlineIndexer.newBuilder() + .setRecordStoreBuilder(storeBuilder) + .setTargetIndexesByName(List.of(INDEX_NAME)) + .setMaxAttempts(10) + .build()) { + Assertions.assertThrows(NullPointerException.class, indexer::mergeIndex); + } + + // Should only attempt once, no retries + assertEquals(1, attemptCount.get()); + } + @Nonnull private FDBRecordStore.Builder createStore(@Nonnull final String indexType) { Index index = new Index(INDEX_NAME, Key.Expressions.field("num_value_2"), @@ -333,7 +517,15 @@ public CompletableFuture mergeIndex() { @Nonnull @Override public Iterable getIndexTypes() { - return List.of("repartitionTimeoutIndex", "mergeTimeoutIndex", "mergeLimitedIndex"); + return List.of( + "repartitionTimeoutIndex", + "mergeLimitedIndex", + "nonRetriableExceptionIndex", + "wrappedFdbExceptionIndex", + "mergeTimeoutIndex", + "timeoutExceptionIndex", + "nonRetriableRepartitionIndex" + ); } @Nonnull diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneConcurrency.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneConcurrency.java index 35de0b7cb5..7d5da16b86 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneConcurrency.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneConcurrency.java @@ -22,7 +22,7 @@ import com.apple.foundationdb.annotation.API; import com.apple.foundationdb.async.MoreAsyncUtil; -import com.apple.foundationdb.record.RecordCoreException; +import com.apple.foundationdb.record.RecordCoreTimeoutException; import com.apple.foundationdb.record.logging.LogMessageKeys; import com.apple.foundationdb.record.provider.common.StoreTimer; import com.apple.foundationdb.record.provider.foundationdb.FDBDatabase; @@ -119,7 +119,7 @@ private LuceneConcurrency() { /** * An exception that is thrown when the async to sync operation times out. */ - public static class AsyncToSyncTimeoutException extends RecordCoreException { + public static class AsyncToSyncTimeoutException extends RecordCoreTimeoutException { private static final long serialVersionUID = -1L; public AsyncToSyncTimeoutException(final String message, final Throwable cause) { diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/FDBLuceneIndexFailureTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/FDBLuceneIndexFailureTest.java index cf20e7ade8..2fbe10a946 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/FDBLuceneIndexFailureTest.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/FDBLuceneIndexFailureTest.java @@ -192,10 +192,11 @@ void repartitionGroupedTestWithExceptionMapping(boolean useLegacyAsyncToSync, bo } // run partitioning without failure - make sure the index is still in good shape + timer.reset(); explicitMergeIndex(index, contextProps, schemaSetup, false, 0); try (FDBRecordContext context = openContext(contextProps)) { schemaSetup.accept(context); - assertEquals(2, getCounter(context, LuceneEvents.Counts.LUCENE_REPARTITION_CALLS).getCount()); + assertEquals(1, getCounter(context, LuceneEvents.Counts.LUCENE_REPARTITION_CALLS).getCount()); } partitionInfos = getPartitionMeta(index, groupingKey, contextProps, schemaSetup); // It should first move 6 from the most-recent to a new, older partition, then move 6 again into a partition diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java index 7fbe62d057..9cc1e8a025 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java @@ -461,14 +461,18 @@ void flakyMerge(boolean isGrouped, return oldAsyncToSyncTimeout == null ? Duration.ofDays(1L) : oldAsyncToSyncTimeout.apply(wait); } }; - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 20; i++) { fdb.setAsyncToSyncTimeout(asyncToSyncTimeout); waitCounts.set(i); boolean success = false; try { LOGGER.info(KeyValueLogMessage.of("Merge started", "iteration", i)); - explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup); + try (FDBRecordContext context = openContext(contextProps)) { + FDBRecordStore recordStore = Objects.requireNonNull(dataModel.schemaSetup.apply(context)); + final CompletableFuture future = recordStore.getIndexMaintainer(Objects.requireNonNull(dataModel.index)).mergeIndex(); + fdb.asyncToSync(timer, FDBStoreTimer.Waits.WAIT_ONLINE_MERGE_INDEX, future); + } LOGGER.info(KeyValueLogMessage.of("Merge completed", "iteration", i)); assertFalse(requireFailure && i < 15, i + " merge should have failed");