Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

package com.apple.foundationdb.record.provider.foundationdb;

import com.apple.foundationdb.FDBError;
import com.apple.foundationdb.FDBException;
import com.apple.foundationdb.MutationType;
import com.apple.foundationdb.annotation.API;
Expand Down Expand Up @@ -59,7 +58,6 @@
import java.time.DateTimeException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -1213,22 +1211,6 @@ protected static <T> T findException(@Nullable Throwable ex, Class<T> classT) {
}
return null;
}

/**
 * FDB error codes that can occur if there is too much work to be done in a single transaction.
 * Built once when the class is initialized rather than on every call to
 * {@link #shouldLessenWork(FDBException)}.
 */
private static final Set<Integer> LESSEN_WORK_CODES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
        FDBError.TIMED_OUT.code(),
        FDBError.TRANSACTION_TOO_OLD.code(),
        FDBError.NOT_COMMITTED.code(),
        FDBError.TRANSACTION_TIMED_OUT.code(),
        FDBError.COMMIT_READ_INCOMPLETE.code(),
        FDBError.TRANSACTION_TOO_LARGE.code())));

/**
 * Check whether an FDB failure carries one of the error codes that can occur if there is too much
 * work to be done in a single transaction.
 *
 * @param ex the exception to classify; may be {@code null}
 * @return {@code true} if the code of {@code ex} is one of the "lessen work" codes;
 *         {@code false} if {@code ex} is {@code null} or carries any other code
 */
protected static boolean shouldLessenWork(@Nullable FDBException ex) {
    return ex != null && LESSEN_WORK_CODES.contains(ex.getCode());
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -148,9 +150,8 @@
// merges. Not perfect, but as long as it's rare the impact should be minimal.

mergeControl.mergeHadFailed(); // report to adjust stats
final FDBException ex = IndexingBase.findException(e, FDBException.class);
final IndexDeferredMaintenanceControl.LastStep lastStep = mergeControl.getLastStep();
if (!IndexingBase.shouldLessenWork(ex)) {
if (shouldAbort(e)) {
giveUpMerging(mergeControl, e);
}
switch (lastStep) {
Expand All @@ -175,6 +176,31 @@
return AsyncUtil.READY_TRUE; // and retry
}

private boolean shouldAbort(@Nullable Throwable e) {
if (e == null) {
return true;
}
// Expecting AsyncToSyncTimeoutException or an instance of TimeoutException. However, cannot
// refer to AsyncToSyncTimeoutException without creating a lucene dependency
// TODO: remove this kludge

Check warning on line 185 in fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingMerger.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingMerger.java#L185

TODO: remove this kludge https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3732%2Fjjezra%2Fmerger_retry_by_default%3AHEAD&id=38228D63267E2070D6B2FF80FD8FCC8F
Set<Throwable> seenSet = Collections.newSetFromMap(new IdentityHashMap<>());
for (Throwable t = e;
t != null && !seenSet.contains(t);
t = t.getCause()) {
if (t.getClass().getCanonicalName().contains("Timeout")) {
return false;
}
seenSet.add(t);
}

final FDBException ex = IndexingBase.findException(e, FDBException.class);
if (ex != null) {
return false;
}
// TODO: return true if the exception is clearly not retriable

Check warning on line 200 in fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingMerger.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingMerger.java#L200

TODO: return true if the exception is clearly not retriable https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3732%2Fjjezra%2Fmerger_retry_by_default%3AHEAD&id=8B69489D49D660B961E9007845B5895F
return true;
}

private void handleRepartitioningFailure(final IndexDeferredMaintenanceControl mergeControl, Throwable e) {
repartitionDocumentCount = mergeControl.getRepartitionDocumentCount();
if (repartitionDocumentCount == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package com.apple.foundationdb.record.provider.foundationdb;

import com.apple.foundationdb.FDBError;
import com.apple.foundationdb.FDBException;
import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.async.AsyncUtil;
Expand Down Expand Up @@ -133,7 +134,7 @@ boolean mayRetryAfterHandlingException(@Nullable FDBException fdbException,
@Nullable List<Object> additionalLogMessageKeyValues,
int currTries,
final boolean adjustLimits) {
if (currTries >= common.config.getMaxRetries() || !IndexingBase.shouldLessenWork(fdbException)) {
if (currTries >= common.config.getMaxRetries() || !shouldLessenWork(fdbException)) {
// Here: should not retry or no more retries. There is no real need to handle limits.
return false;
}
Expand All @@ -144,6 +145,22 @@ boolean mayRetryAfterHandlingException(@Nullable FDBException fdbException,
return true;
}

/**
 * Check whether an FDB failure carries one of the error codes listed below.
 *
 * @param ex the exception to classify; may be {@code null}
 * @return {@code true} if the code of {@code ex} is in the set built below; {@code false} if
 *         {@code ex} is {@code null} or carries any other code
 */
private static boolean shouldLessenWork(@Nullable FDBException ex) {
// These error codes represent a list of errors that can occur if there is too much work to be done
// in a single transaction.
if (ex == null) {
return false;
}
final Set<Integer> lessenWorkCodes = new HashSet<>(Arrays.asList(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This set can be declared as a static instance - no need to instantiate every time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wonder - wouldn't the JVM do the right thing here? It seems like an obvious optimization and I know that other languages are performing this optimization (and much more).

FDBError.TIMED_OUT.code(),
FDBError.TRANSACTION_TOO_OLD.code(),
FDBError.NOT_COMMITTED.code(),
FDBError.TRANSACTION_TIMED_OUT.code(),
FDBError.COMMIT_READ_INCOMPLETE.code(),
FDBError.TRANSACTION_TOO_LARGE.code()));
return lessenWorkCodes.contains(ex.getCode());
}

void decreaseLimit(@Nonnull FDBException fdbException,
@Nullable List<Object> additionalLogMessageKeyValues) {
// TODO: decrease the limit only for certain errors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@
import com.apple.foundationdb.record.provider.foundationdb.properties.RecordLayerPropertyStorage;
import com.apple.foundationdb.record.query.QueryToKeyMatcher;
import com.apple.foundationdb.tuple.Tuple;
import com.apple.test.BooleanSource;
import com.apple.test.Tags;
import com.google.auto.service.AutoService;
import com.google.protobuf.Message;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -58,6 +60,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand All @@ -67,6 +70,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Tests specifically of {@link OnlineIndexer#mergeIndex()}.
Expand Down Expand Up @@ -197,6 +201,186 @@ void testMergeTimeout() {
assertEquals(repeat(0, mergeLimits.size()), repartitionLimits);
}

/**
 * Verifies that a failure which is neither an FDB error nor a timeout (here an
 * {@link IllegalStateException}) makes the merger give up after a single attempt.
 */
@Test
void testNonRetriableExceptionAborts() {
    final String indexType = "nonRetriableExceptionIndex";
    final AtomicInteger mergeCalls = new AtomicInteger();

    // Every merge call is counted and then fails with an exception that must not be retried
    TestFactory.register(indexType, state -> {
        adjustMergeControl(state);
        mergeCalls.incrementAndGet();
        return CompletableFuture.<Void>failedFuture(new IllegalStateException("Non-retriable error"));
    });

    final FDBRecordStore.Builder recordStoreBuilder = createStore(indexType);
    try (OnlineIndexer onlineIndexer = OnlineIndexer.newBuilder()
            .setRecordStoreBuilder(recordStoreBuilder)
            .setTargetIndexesByName(List.of(INDEX_NAME))
            .setMaxAttempts(10)
            .build()) {
        Assertions.assertThrows(IllegalStateException.class, onlineIndexer::mergeIndex);
    }

    // Exactly one attempt: the failure was not retried
    assertEquals(1, mergeCalls.get());
}

/**
 * Verifies that a timeout failure that is not an FDB error is retried instead of aborting on the
 * first attempt. The test runs once with {@code java.util.concurrent.TimeoutException} and once
 * with {@code CustomOperationTimeoutException}.
 *
 * @param customTimeout whether to fail with the custom exception rather than {@code TimeoutException}
 */
@ParameterizedTest
@BooleanSource
void testTimeoutExceptionRetries(boolean customTimeout) {
    final String indexType = "timeoutExceptionIndex";
    final String exceptionMessage = "my timeout";
    final AtomicInteger mergeCalls = new AtomicInteger();

    TestFactory.register(indexType, state -> {
        adjustMergeControl(state);
        mergeCalls.incrementAndGet();

        // Fail with a timeout that does not originate from FDB
        final Throwable failure;
        if (customTimeout) {
            failure = new CustomOperationTimeoutException(exceptionMessage);
        } else {
            failure = new TimeoutException(exceptionMessage);
        }
        return CompletableFuture.<Void>failedFuture(failure);
    });

    final FDBRecordStore.Builder recordStoreBuilder = createStore(indexType);
    try (OnlineIndexer onlineIndexer = OnlineIndexer.newBuilder()
            .setRecordStoreBuilder(recordStoreBuilder)
            .setTargetIndexesByName(List.of(INDEX_NAME))
            .setMaxAttempts(5)
            .build()) {
        final Exception thrown = Assertions.assertThrows(Exception.class, onlineIndexer::mergeIndex);

        // The original timeout must still be reachable through the cause chain
        final Throwable timeoutCause;
        if (customTimeout) {
            timeoutCause = findCause(thrown, CustomOperationTimeoutException.class);
        } else {
            timeoutCause = findCause(thrown, TimeoutException.class);
        }
        Assertions.assertNotNull(timeoutCause);
        Assertions.assertEquals(exceptionMessage, timeoutCause.getMessage());
    }

    // More than one attempt means the failure was retried
    assertTrue(mergeCalls.get() > 1);
}

/**
 * Marks the store's deferred maintenance control as having last executed the {@code MERGE} step
 * and fills in its merge counters: ten merges found, and merges tried equal to the current merges
 * limit, or ten when that limit is zero.
 *
 * @param state the index maintainer state whose store holds the control to adjust
 */
private static void adjustMergeControl(final IndexMaintainerState state) {
    final IndexDeferredMaintenanceControl control = state.store.getIndexDeferredMaintenanceControl();
    control.setLastStep(IndexDeferredMaintenanceControl.LastStep.MERGE);
    if (control.getMergesLimit() != 0) {
        control.setMergesTried(control.getMergesLimit());
    } else {
        control.setMergesTried(10);
    }
    control.setMergesFound(10);
}

/**
 * Unchecked exception whose class name contains "Timeout" but which does not extend
 * {@code java.util.concurrent.TimeoutException}.
 */
private static class CustomOperationTimeoutException extends RuntimeException {
    private static final long serialVersionUID = -7034897190745766777L;

    /**
     * @param message the detail message reported by {@link #getMessage()}
     */
    public CustomOperationTimeoutException(final String message) {
        super(message);
    }
}

/**
 * Walks the cause chain of {@code throwable}, starting with the throwable itself, and returns the
 * first link that is an instance of {@code causeType}.
 *
 * @param throwable the head of the chain to search; a {@code null} value yields {@code null}
 * @param causeType the type to look for
 * @param <T> the type to look for
 * @return the first matching link, or {@code null} if the chain ends without a match
 */
private static <T extends Throwable> T findCause(Throwable throwable, Class<T> causeType) {
    for (Throwable link = throwable; link != null; link = link.getCause()) {
        if (causeType.isInstance(link)) {
            return causeType.cast(link);
        }
    }
    return null;
}

/**
 * Verifies that an {@link FDBException} buried under other exceptions is still recognized, so the
 * merge is retried instead of aborting on the first attempt.
 */
@Test
void testWrappedFDBExceptionRetries() {
    final String indexType = "wrappedFdbExceptionIndex";
    final AtomicInteger mergeCalls = new AtomicInteger();

    TestFactory.register(indexType, state -> {
        adjustMergeControl(state);
        mergeCalls.incrementAndGet();

        // Bury the FDBException two levels deep so it can only be found by walking the cause chain
        final FDBException fdbFailure =
                new FDBException("Transaction too old", FDBError.TRANSACTION_TOO_OLD.code());
        final IllegalStateException innerWrapper = new IllegalStateException("Wrapper level 2", fdbFailure);
        final RuntimeException outerWrapper = new RuntimeException("Wrapper level 1", innerWrapper);
        return CompletableFuture.<Void>failedFuture(outerWrapper);
    });

    final FDBRecordStore.Builder recordStoreBuilder = createStore(indexType);
    try (OnlineIndexer onlineIndexer = OnlineIndexer.newBuilder()
            .setRecordStoreBuilder(recordStoreBuilder)
            .setTargetIndexesByName(List.of(INDEX_NAME))
            .setMaxAttempts(5)
            .build()) {
        final Exception thrown = Assertions.assertThrows(Exception.class, onlineIndexer::mergeIndex);

        // The FDBException must still be reachable through the cause chain
        final FDBException fdbCause = findCause(thrown, FDBException.class);
        Assertions.assertNotNull(fdbCause);
        assertEquals(FDBError.TRANSACTION_TOO_OLD.code(), fdbCause.getCode());
    }

    // More than one attempt means the failure was retried
    assertTrue(mergeCalls.get() > 1);
}

/**
 * Verifies that a non-retriable failure (here a {@link NullPointerException}) reported while the
 * last step is {@code REPARTITION} stops the merge after a single attempt.
 */
@Test
void testNonRetriableExceptionDuringRepartitionAborts() {
    final String indexType = "nonRetriableRepartitionIndex";
    final AtomicInteger mergeCalls = new AtomicInteger();

    TestFactory.register(indexType, state -> {
        final IndexDeferredMaintenanceControl control = state.store.getIndexDeferredMaintenanceControl();
        control.setLastStep(IndexDeferredMaintenanceControl.LastStep.REPARTITION);
        control.setRepartitionDocumentCount(20);

        mergeCalls.incrementAndGet();

        // Fail the repartition step with an exception that must not be retried
        return CompletableFuture.<Void>failedFuture(
                new NullPointerException("Unexpected null during repartition"));
    });

    final FDBRecordStore.Builder recordStoreBuilder = createStore(indexType);
    try (OnlineIndexer onlineIndexer = OnlineIndexer.newBuilder()
            .setRecordStoreBuilder(recordStoreBuilder)
            .setTargetIndexesByName(List.of(INDEX_NAME))
            .setMaxAttempts(10)
            .build()) {
        Assertions.assertThrows(NullPointerException.class, onlineIndexer::mergeIndex);
    }

    // Exactly one attempt: the failure was not retried
    assertEquals(1, mergeCalls.get());
}

@Nonnull
private FDBRecordStore.Builder createStore(@Nonnull final String indexType) {
Index index = new Index(INDEX_NAME, Key.Expressions.field("num_value_2"),
Expand Down Expand Up @@ -333,7 +517,15 @@ public CompletableFuture<Void> mergeIndex() {
@Nonnull
@Override
public Iterable<String> getIndexTypes() {
return List.of("repartitionTimeoutIndex", "mergeTimeoutIndex", "mergeLimitedIndex");
return List.of(
"repartitionTimeoutIndex",
"mergeLimitedIndex",
"nonRetriableExceptionIndex",
"wrappedFdbExceptionIndex",
"mergeTimeoutIndex",
"timeoutExceptionIndex",
"nonRetriableRepartitionIndex"
);
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,11 @@ void repartitionGroupedTestWithExceptionMapping(boolean useLegacyAsyncToSync, bo
}

// run partitioning without failure - make sure the index is still in good shape
timer.reset();
explicitMergeIndex(index, contextProps, schemaSetup, false, 0);
try (FDBRecordContext context = openContext(contextProps)) {
schemaSetup.accept(context);
assertEquals(2, getCounter(context, LuceneEvents.Counts.LUCENE_REPARTITION_CALLS).getCount());
assertEquals(1, getCounter(context, LuceneEvents.Counts.LUCENE_REPARTITION_CALLS).getCount());
}
partitionInfos = getPartitionMeta(index, groupingKey, contextProps, schemaSetup);
// It should first move 6 from the most-recent to a new, older partition, then move 6 again into a partition
Expand Down
Loading