From ffeb8f752aa2fcb247cda398bfd6118e2d1bd3d8 Mon Sep 17 00:00:00 2001 From: ohad Date: Wed, 26 Nov 2025 19:55:28 +0200 Subject: [PATCH 1/5] - Add read only mode to transactional runner and retrying iterator - Add read only mode to repair runner - Create repair-if-necessary store builder - Add read-only and repair ro repair runners --- .../recordrepair/RecordRepair.java | 43 +++++++-- .../recordrepair/RecordRepairStatsRunner.java | 3 +- .../RecordRepairValidateRunner.java | 2 +- .../recordrepair/StoreBuilderWithRepair.java | 95 +++++++++++++++++++ .../runners/TransactionalRunner.java | 28 +++++- .../throttled/ThrottledRetryingIterator.java | 19 +++- .../RecordValidateAndRepairTest.java | 56 ++++++++++- .../throttled/ThrottledIteratorTest.java | 62 ++++++++++++ 8 files changed, 297 insertions(+), 11 deletions(-) create mode 100644 fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java index 7ca244d8c2..1829156e4d 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java @@ -27,6 +27,7 @@ import com.apple.foundationdb.record.logging.LogMessageKeys; import com.apple.foundationdb.record.provider.foundationdb.FDBDatabase; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; +import com.apple.foundationdb.record.provider.foundationdb.FormatVersion; import com.apple.foundationdb.record.provider.foundationdb.runners.throttled.CursorFactory; import com.apple.foundationdb.record.provider.foundationdb.runners.throttled.ThrottledRetryingIterator; import com.apple.foundationdb.tuple.Tuple; @@ -95,13 +96,16 @@ public enum ValidationKind { RECORD_VALUE, RECORD_VALUE_AND_VERSION } private final ValidationKind validationKind; @Nonnull private final ThrottledRetryingIterator throttledIterator; + private final boolean allowRepair; - protected RecordRepair(@Nonnull final Builder config) { + protected RecordRepair(@Nonnull final Builder config, boolean allowRepair) { this.database = config.database; this.storeBuilder = config.getStoreBuilder(); this.validationKind = config.getValidationKind(); ThrottledRetryingIterator.Builder iteratorBuilder = ThrottledRetryingIterator.builder(database, cursorFactory(), this::handleOneItem); - throttledIterator = configureThrottlingIterator(iteratorBuilder, config).build(); + this.allowRepair = allowRepair; + // This will also ensure the transaction only commits when needed + throttledIterator = configureThrottlingIterator(iteratorBuilder, config, allowRepair).build(); } /** @@ -168,7 +172,7 @@ protected CompletableFuture validateInternal(@Nonnull final }); } - private ThrottledRetryingIterator.Builder configureThrottlingIterator(ThrottledRetryingIterator.Builder builder, Builder config) { + private ThrottledRetryingIterator.Builder configureThrottlingIterator(ThrottledRetryingIterator.Builder builder, Builder config, boolean allowRepair) { return builder .withTransactionInitNotification(this::logStartTransaction) .withTransactionSuccessNotification(this::logCommitTransaction) @@ -176,7 +180,8 @@ private ThrottledRetryingIterator.Builder configureThrottlingIterator(Thr .withMaxRecordsDeletesPerTransaction(config.getMaxRecordDeletesPerTransaction()) .withMaxRecordsScannedPerSec(config.getMaxRecordScannedPerSec()) .withMaxRecordsDeletesPerSec(config.getMaxRecordDeletesPerSec()) - .withNumOfRetries(config.getNumOfRetries()); + .withNumOfRetries(config.getNumOfRetries()) + .withCommitWhenDone(allowRepair); } @SuppressWarnings("PMD.UnusedFormalParameter") @@ -188,7 +193,8 @@ private void logStartTransaction(ThrottledRetryingIterator.QuotaManager quotaMan private void logCommitTransaction(ThrottledRetryingIterator.QuotaManager quotaManager) { if (logger.isDebugEnabled()) { - logger.debug(KeyValueLogMessage.of("RecordRepairRunner: transaction committed", + String message = allowRepair ? "RecordRepairRunner: transaction committed" : "RecordRepairRunner: transaction ended"; + logger.debug(KeyValueLogMessage.of(message, LogMessageKeys.RECORDS_SCANNED, quotaManager.getScannedCount(), LogMessageKeys.RECORDS_DELETED, quotaManager.getDeletesCount())); } @@ -212,6 +218,8 @@ public static class Builder { private int maxRecordScannedPerSec = 0; private int maxRecordDeletesPerSec = 1000; private int numOfRetries = 4; + private int userVersion; + private @Nullable FormatVersion minimumPossibleFormatVersion; /** * Constructor. @@ -233,6 +241,7 @@ public RecordRepairStatsRunner buildStatsRunner() { /** * Finalize the build and create a repair runner. + * @param allowRepair whether to repair the found issues (TRUE) or run in read-only mode (FALSE) * @return the newly created repair runner */ public RecordRepairValidateRunner buildRepairRunner(boolean allowRepair) { @@ -326,6 +335,23 @@ public Builder withNumOfRetries(final int numOfRetries) { return this; } + /** + * Set the store header repair parameters. + * If set, the runner will try to repair the store header (See {@link FDBRecordStore.Builder#repairMissingHeader(int, FormatVersion)}) + * as part of the repair operation in case the store fails to open. + * If the runner is running in dry run mode (repair not allowed) then the operation will be rolled back once the run + * is complete, making no change to the header. + * @param userVersion the user version for the header repair + * @param minimumPossibleFormatVersion the minimum store format version to use for the repair + * Default: null minimumPossibleFormatVersion will not attempt to repair the header + * @return this builder + */ + public Builder withHeaderRepairParameters(int userVersion, @Nullable FormatVersion minimumPossibleFormatVersion) { + this.userVersion = userVersion; + this.minimumPossibleFormatVersion = minimumPossibleFormatVersion; + return this; + } + @Nonnull public FDBDatabase getDatabase() { return database; @@ -333,7 +359,12 @@ public FDBDatabase getDatabase() { @Nonnull public FDBRecordStore.Builder getStoreBuilder() { - return storeBuilder; + if (minimumPossibleFormatVersion != null) { + // override the store builder to repair the header if necessary + return new StoreBuilderWithRepair(storeBuilder, userVersion, minimumPossibleFormatVersion); + } else { + return storeBuilder; + } } @Nonnull diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairStatsRunner.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairStatsRunner.java index c7c82728a0..921a0205ef 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairStatsRunner.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairStatsRunner.java @@ -38,7 +38,8 @@ public class RecordRepairStatsRunner extends RecordRepair { private final RepairStatsResults statsResult; RecordRepairStatsRunner(@Nonnull final Builder config) { - super(config); + // stats runner never commits a transaction + super(config, false); statsResult = new RepairStatsResults(); } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairValidateRunner.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairValidateRunner.java index c9b8c32771..5cfcb05268 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairValidateRunner.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairValidateRunner.java @@ -49,7 +49,7 @@ public class RecordRepairValidateRunner extends RecordRepair { private final AtomicBoolean earlyReturn; RecordRepairValidateRunner(@Nonnull final Builder config, boolean allowRepair) { - super(config); + super(config, allowRepair); this.allowRepair = allowRepair; this.maxResultsReturned = config.getMaxResultsReturned(); diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java new file mode 100644 index 0000000000..8b6d201186 --- /dev/null +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java @@ -0,0 +1,95 @@ +/* + * StoreBuilderWithRepair.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.record.provider.foundationdb.recordrepair; + +import com.apple.foundationdb.annotation.API; +import com.apple.foundationdb.async.AsyncUtil; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; +import com.apple.foundationdb.record.provider.foundationdb.FormatVersion; +import com.apple.foundationdb.record.provider.foundationdb.RecordStoreNoInfoAndNotEmptyException; +import com.apple.foundationdb.record.util.pair.NonnullPair; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +/** + * A flavor of {@link FDBRecordStore.Builder} that can handle the case where the store cannot be opened. + * In case the given store builder fails to open the store due to a missing header, the {@link FDBRecordStore.Builder#repairMissingHeader(int, FormatVersion)} + * method is called and the repaired store is returned. + */ +@API(API.Status.INTERNAL) +public class StoreBuilderWithRepair extends FDBRecordStore.Builder { + private final int userVersion; + private final FormatVersion minimumPossibleFormatVersion; + + /** + * Constructor. + * @param other the source store builder to delegate to + * @param userVersion the userVersion to use for repairing the header if necessary + * @param minimumPossibleFormatVersion the minimumPossibleFormatVersion to use if necessary + */ + public StoreBuilderWithRepair(@Nonnull FDBRecordStore.Builder other, + final int userVersion, + @Nonnull FormatVersion minimumPossibleFormatVersion) { + super(other); + this.userVersion = userVersion; + this.minimumPossibleFormatVersion = minimumPossibleFormatVersion; + } + + /** + * Override the {@link super#openAsync()} method to add support for repairing the header. + * In case the store fails to be opened normally, try to repair it given the provided repair + * parameters. + * + * @return a future that will contain the opened store if successful + */ + @Nonnull + @Override + public CompletableFuture openAsync() { + return AsyncUtil.composeHandle( + super.openAsync(), + (store, ex) -> { + if (ex == null) { + // succeeded in opening normally + return CompletableFuture.completedFuture(store); + } else if (isMissingStoreHeaderCaused(ex)) { + // Store header corrupt, try repairing + return repairMissingHeader(userVersion, minimumPossibleFormatVersion) + .thenApply(NonnullPair::getRight); + } else { + // Some other exception, cast to RuntimeException since this is the only one that can throws from openAsync + throw (RuntimeException)ex; + } + }); + } + + private boolean isMissingStoreHeaderCaused(Throwable ex) { + while (ex != null) { + if (ex instanceof RecordStoreNoInfoAndNotEmptyException) { + return true; + } + ex = ex.getCause(); + } + return false; + } +} diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/TransactionalRunner.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/TransactionalRunner.java index 9f1c52cb08..37e5865810 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/TransactionalRunner.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/TransactionalRunner.java @@ -98,11 +98,37 @@ public TransactionalRunner(@Nonnull FDBDatabase database, @SuppressWarnings({"PMD.CloseResource", "PMD.UseTryWithResources"}) public CompletableFuture runAsync(final boolean clearWeakReadSemantics, @Nonnull Function> runnable) { + return runAsync(clearWeakReadSemantics, runnable, true); + } + + /** + * A flavor of the {@link #runAsync(boolean, Function)} method that supports read-only transactions. + * @param clearWeakReadSemantics whether to clear the {@link FDBRecordContextConfig#getWeakReadSemantics()} before + * creating the transaction. These should be cleared if retrying a transaction, particularly in response to a + * conflict, because reusing the old read version would just cause it to re-conflict. + * @param runnable some code to run that uses an {@link FDBRecordContext} + * @param commitWhenDone if FALSE the transaction will not be committed. If TRUE, behaves the same as described in {@link #runAsync(boolean, Function)} + * @param the type of the value returned by the future + * @return a future containing the result of the runnable, if successfully committed. + * Note: the future will not be {@code null}, but if the runnable returns a future containing {@code null} then + * so will the future returned here. + */ + @Nonnull + @SuppressWarnings({"PMD.CloseResource", "PMD.UseTryWithResources"}) + public CompletableFuture runAsync(final boolean clearWeakReadSemantics, + @Nonnull Function> runnable, + boolean commitWhenDone) { FDBRecordContext context = openContext(clearWeakReadSemantics); boolean returnedFuture = false; try { CompletableFuture future = runnable.apply(context) - .thenCompose((T val) -> context.commitAsync().thenApply(vignore -> val)); + .thenCompose((T val) -> { + if (commitWhenDone) { + return context.commitAsync().thenApply(vignore -> val); + } else { + return CompletableFuture.completedFuture(val); + } + }); returnedFuture = true; return future.whenComplete((result, exception) -> context.close()); } finally { diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java index b47e2ba554..32cebc5f64 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java @@ -91,6 +91,7 @@ public class ThrottledRetryingIterator implements AutoCloseable { @Nullable private final Consumer transactionInitNotification; private final int numOfRetries; + private final boolean commitWhenDone; private boolean closed = false; /** Starting time of the current/most-recent transaction. */ @@ -116,6 +117,7 @@ public ThrottledRetryingIterator(Builder builder) { this.transactionInitNotification = builder.transactionInitNotification; this.cursorRowsLimit = 0; this.numOfRetries = builder.numOfRetries; + this.commitWhenDone = builder.commitWhenDone; futureManager = new FutureAutoClose(); } @@ -218,7 +220,7 @@ private CompletableFuture> iterateOneRange(FDBRecordStore. .whenComplete((r, e) -> cursor.close()); }); - }).thenApply(ignore -> cont.get()); + }, commitWhenDone).thenApply(ignore -> cont.get()); } private CompletableFuture handleSuccess(QuotaManager quotaManager) { @@ -421,6 +423,7 @@ public static class Builder { private int maxRecordScannedPerSec; private int maxRecordDeletesPerSec; private int numOfRetries; + private boolean commitWhenDone; /** * Constructor. @@ -441,6 +444,7 @@ private Builder(TransactionalRunner runner, Executor executor, ScheduledExecutor this.maxRecordScannedPerSec = 0; this.maxRecordDeletesPerSec = 0; this.numOfRetries = NUMBER_OF_RETRIES; + this.commitWhenDone = false; } private Builder(FDBDatabase database, FDBRecordContextConfig.Builder contextConfigBuilder, CursorFactory cursorCreator, ItemHandler singleItemHandler) { @@ -538,6 +542,19 @@ public Builder withNumOfRetries(int numOfRetries) { return this; } + /** + * Set whether to commit the transaction when done. + * Setting this to TRUE will commit every transaction created before creating a new one. Setting to FALSE will + * roll back the transactions. + * Defaults to FALSE. + * @param commitWhenDone whether to commit or roll back the transactions created + * @return this builder + */ + public Builder withCommitWhenDone(boolean commitWhenDone) { + this.commitWhenDone = commitWhenDone; + return this; + } + /** * Create the iterator. * @return the newly minted iterator diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidateAndRepairTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidateAndRepairTest.java index 191bc67c32..0392d0d22f 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidateAndRepairTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidateAndRepairTest.java @@ -23,17 +23,21 @@ import com.apple.foundationdb.record.RecordMetaData; import com.apple.foundationdb.record.RecordMetaDataBuilder; import com.apple.foundationdb.record.RecordMetaDataProto; +import com.apple.foundationdb.record.RecordMetaDataProvider; import com.apple.foundationdb.record.ScanProperties; import com.apple.foundationdb.record.TestRecords1Proto; import com.apple.foundationdb.record.TupleRange; import com.apple.foundationdb.record.provider.foundationdb.FDBDatabaseRunner; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreKeyspace; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreTestBase; import com.apple.foundationdb.record.provider.foundationdb.FDBStoredRecord; import com.apple.foundationdb.record.provider.foundationdb.FormatVersion; +import com.apple.foundationdb.record.provider.foundationdb.RecordStoreNoInfoAndNotEmptyException; import com.apple.foundationdb.record.provider.foundationdb.SplitHelper; import com.apple.foundationdb.tuple.Tuple; +import com.apple.test.BooleanSource; import com.apple.test.ParameterizedTestUtils; import com.google.protobuf.Message; import org.assertj.core.api.Assertions; @@ -93,6 +97,49 @@ void testValidateRecordsNoIssue(boolean splitLongRecords, FormatVersion formatVe validateNormalScan(hook, formatVersion, NUM_RECORDS, storeVersions); } + @ParameterizedTest + @BooleanSource({"allowRepair", "repairHeader"}) + void testCorruptStoreHeaderNoCorruptRecords(final boolean allowRepair, final boolean repairHeader) throws Exception { + final boolean splitLongRecords = true; + final FormatVersion storeVersion = FormatVersion.SAVE_VERSION_WITH_RECORD; + final RecordMetaDataHook hook = ValidationTestUtils.getRecordMetaDataHook(splitLongRecords, true); + saveRecords(splitLongRecords, storeVersion, hook); + + FDBRecordStore.Builder storeBuilder; + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = openSimpleRecordStore(context, hook, storeVersion); + storeBuilder = store.asBuilder(); + } + clearStoreHeader(simpleMetaData(hook)); + + RecordRepair.Builder builder = RecordRepair.builder(fdb, storeBuilder) + .withValidationKind(RecordRepair.ValidationKind.RECORD_VALUE_AND_VERSION); + if (repairHeader) { + // This will allow the runner to repair the header before repairing records + builder = builder.withHeaderRepairParameters(1, storeVersion); + } + // Run validation and repair + try (RecordRepairValidateRunner runner = builder.buildRepairRunner(allowRepair)) { + RepairValidationResults repairResults = runner.run().join(); + if (repairHeader) { + ValidationTestUtils.assertCompleteResults(repairResults, NUM_RECORDS); + // Verify records: all is OK. + ValidationTestUtils.assertNoInvalidResults(repairResults.getInvalidResults()); + } else { + Assertions.assertThat(repairResults.getCaughtException()).hasCauseInstanceOf(RecordStoreNoInfoAndNotEmptyException.class); + } + } + + if (repairHeader && allowRepair) { + validateNormalScan(hook, storeVersion, NUM_RECORDS, true); + } else { + try (FDBRecordContext context = openContext()) { + Assertions.assertThatThrownBy(() -> openSimpleRecordStore(context, hook, storeVersion)) + .isInstanceOf(RecordStoreNoInfoAndNotEmptyException.class); + } + } + } + public static Stream splitNumberFormatVersion() { return ParameterizedTestUtils.cartesianProduct( Stream.of(0, 1, 2, 3), @@ -742,7 +789,6 @@ private List> saveRecords(int initialId, int totalRecor return result; } - private void validateNormalScan(final RecordMetaDataHook hook, final FormatVersion formatVersion, final int numRecords, Boolean hasVersion) throws Exception { // Load the records again to make sure they are all there try (FDBRecordContext context = openContext()) { @@ -758,4 +804,12 @@ private void validateNormalScan(final RecordMetaDataHook hook, final FormatVersi } } } + + private void clearStoreHeader(final RecordMetaDataProvider metaData) { + try (FDBRecordContext context = openContext()) { + recordStore = getStoreBuilder(context, metaData, path).createOrOpen(); + context.ensureActive().clear(recordStore.getSubspace().pack(FDBRecordStoreKeyspace.STORE_INFO.key())); + commit(context); + } + } } diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java index aae40a62f0..5629fd5936 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java @@ -32,7 +32,10 @@ import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreTestBase; +import com.apple.foundationdb.record.provider.foundationdb.FDBStoredRecord; import com.apple.foundationdb.tuple.Tuple; +import com.apple.test.BooleanSource; +import com.google.protobuf.Message; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -577,6 +580,65 @@ void testWithRealRecords() throws Exception { assertThat(itemsScanned).isEqualTo(IntStream.range(0, numRecords).boxed().collect(Collectors.toList())); } + @ParameterizedTest + @BooleanSource + void testRollBackTransaction(boolean commitWhenDone) throws Exception { + final int numRecords = 50; + + final CursorFactory cursorFactory = (store, lastResult, rowLimit) -> { + final byte[] continuation = lastResult == null ? null : lastResult.getContinuation().toBytes(); + final ScanProperties scanProperties = ScanProperties.FORWARD_SCAN.with(executeProperties -> executeProperties.setReturnedRowLimit(rowLimit)); + return store.scanRecordKeys(continuation, scanProperties); + }; + + final ItemHandler itemHandler = (store, item, quotaManager) -> { + // delete all records + quotaManager.deleteCountInc(); + return store.deleteRecordAsync(item.get()).thenApply(ignore -> null); + }; + + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + for (int i = 0; i < numRecords; i++) { + final TestRecords1Proto.MySimpleRecord record = TestRecords1Proto.MySimpleRecord.newBuilder() + .setRecNo(i) + .setStrValueIndexed("Some text") + .setNumValue3Indexed(1415 + i * 7) + .build(); + recordStore.saveRecord(record); + } + commit(context); + } + + try (ThrottledRetryingIterator iterator = ThrottledRetryingIterator + .builder(fdb, cursorFactory, itemHandler) + .withNumOfRetries(2) + .withMaxRecordsDeletesPerTransaction(10) // ensure we create multiple transactions + .withCommitWhenDone(commitWhenDone) + .build()) { + CompletableFuture iterateAll; + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + iterateAll = iterator.iterateAll(recordStore.asBuilder()); + } + iterateAll.join(); + } + + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + final ScanProperties scanProperties = ScanProperties.FORWARD_SCAN; + final RecordCursor> cursor = recordStore.scanRecords(null, scanProperties); + final List> records = cursor.asList().get(); + if (commitWhenDone) { + // all records deleted + assertThat(records).isEmpty(); + } else { + // Transaction rolled back - no records deleted + assertThat(records).hasSize(50); + } + } + } + @Test void testLateCompleteFutures() throws Exception { // A test that completes the first future outside the transaction From 68dfaed72da7c39d7c295e4603ddf5eaf5e77a01 Mon Sep 17 00:00:00 2001 From: ohad Date: Wed, 26 Nov 2025 22:31:56 +0200 Subject: [PATCH 2/5] style --- .../foundationdb/recordrepair/StoreBuilderWithRepair.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java index 8b6d201186..93354968bb 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java @@ -28,9 +28,7 @@ import com.apple.foundationdb.record.util.pair.NonnullPair; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; /** * A flavor of {@link FDBRecordStore.Builder} that can handle the case where the store cannot be opened. @@ -57,7 +55,7 @@ public StoreBuilderWithRepair(@Nonnull FDBRecordStore.Builder other, } /** - * Override the {@link super#openAsync()} method to add support for repairing the header. + * Override the {@link FDBRecordStore.Builder#openAsync()} method to add support for repairing the header. * In case the store fails to be opened normally, try to repair it given the provided repair * parameters. * From 1ba00e4f83bdcbc60ac0d2195cd896dae6375e6e Mon Sep 17 00:00:00 2001 From: ohad Date: Fri, 5 Dec 2025 16:23:04 -0500 Subject: [PATCH 3/5] PR comments --- .../recordrepair/StoreBuilderWithRepair.java | 17 ++-------- .../runners/TransactionalRunner.java | 8 ++--- .../throttled/ThrottledRetryingIterator.java | 4 +-- .../throttled/ThrottledIteratorTest.java | 31 ++++++++++--------- 4 files changed, 24 insertions(+), 36 deletions(-) diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java index 93354968bb..41fe1cd1b6 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java @@ -64,21 +64,8 @@ public StoreBuilderWithRepair(@Nonnull FDBRecordStore.Builder other, @Nonnull @Override public CompletableFuture openAsync() { - return AsyncUtil.composeHandle( - super.openAsync(), - (store, ex) -> { - if (ex == null) { - // succeeded in opening normally - return CompletableFuture.completedFuture(store); - } else if (isMissingStoreHeaderCaused(ex)) { - // Store header corrupt, try repairing - return repairMissingHeader(userVersion, minimumPossibleFormatVersion) - .thenApply(NonnullPair::getRight); - } else { - // Some other exception, cast to RuntimeException since this is the only one that can throws from openAsync - throw (RuntimeException)ex; - } - }); + return repairMissingHeader(userVersion, minimumPossibleFormatVersion) + .thenApply(NonnullPair::getRight); } private boolean isMissingStoreHeaderCaused(Throwable ex) { diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/TransactionalRunner.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/TransactionalRunner.java index 37e5865810..bb0ae43d58 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/TransactionalRunner.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/TransactionalRunner.java @@ -98,7 +98,7 @@ public TransactionalRunner(@Nonnull FDBDatabase database, @SuppressWarnings({"PMD.CloseResource", "PMD.UseTryWithResources"}) public CompletableFuture runAsync(final boolean clearWeakReadSemantics, @Nonnull Function> runnable) { - return runAsync(clearWeakReadSemantics, runnable, true); + return runAsync(clearWeakReadSemantics, true, runnable); } /** @@ -106,8 +106,8 @@ public CompletableFuture runAsync(final boolean clearWeakReadSemantics, * @param clearWeakReadSemantics whether to clear the {@link FDBRecordContextConfig#getWeakReadSemantics()} before * creating the transaction. These should be cleared if retrying a transaction, particularly in response to a * conflict, because reusing the old read version would just cause it to re-conflict. - * @param runnable some code to run that uses an {@link FDBRecordContext} * @param commitWhenDone if FALSE the transaction will not be committed. If TRUE, behaves the same as described in {@link #runAsync(boolean, Function)} + * @param runnable some code to run that uses an {@link FDBRecordContext} * @param the type of the value returned by the future * @return a future containing the result of the runnable, if successfully committed. * Note: the future will not be {@code null}, but if the runnable returns a future containing {@code null} then @@ -116,8 +116,8 @@ public CompletableFuture runAsync(final boolean clearWeakReadSemantics, @Nonnull @SuppressWarnings({"PMD.CloseResource", "PMD.UseTryWithResources"}) public CompletableFuture runAsync(final boolean clearWeakReadSemantics, - @Nonnull Function> runnable, - boolean commitWhenDone) { + boolean commitWhenDone, + @Nonnull Function> runnable) { FDBRecordContext context = openContext(clearWeakReadSemantics); boolean returnedFuture = false; try { diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java index 32cebc5f64..988d892595 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java @@ -176,7 +176,7 @@ private CompletableFuture> iterateOneRange(FDBRecordStore. QuotaManager singleIterationQuotaManager) { AtomicReference> cont = new AtomicReference<>(); - return transactionalRunner.runAsync(true, transaction -> { + return transactionalRunner.runAsync(true, commitWhenDone, transaction -> { // this layer returns last cursor result singleIterationQuotaManager.init(); @@ -220,7 +220,7 @@ private CompletableFuture> iterateOneRange(FDBRecordStore. .whenComplete((r, e) -> cursor.close()); }); - }, commitWhenDone).thenApply(ignore -> cont.get()); + }).thenApply(ignore -> cont.get()); } private CompletableFuture handleSuccess(QuotaManager quotaManager) { diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java index 5629fd5936..3254428e1c 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java @@ -522,12 +522,7 @@ void testWithRealRecords() throws Exception { // A test with saved records, to see that future handling works final int numRecords = 50; List itemsScanned = new ArrayList<>(numRecords); - - final CursorFactory cursorFactory = (store, lastResult, rowLimit) -> { - final byte[] continuation = lastResult == null ? null : lastResult.getContinuation().toBytes(); - final ScanProperties scanProperties = ScanProperties.FORWARD_SCAN.with(executeProperties -> executeProperties.setReturnedRowLimit(rowLimit)); - return store.scanRecordKeys(continuation, scanProperties); - }; + final CursorFactory cursorFactory = createCursorFactory(); final ItemHandler itemHandler = (store, item, quotaManager) -> { return store.loadRecordAsync(item.get()).thenApply(rec -> { @@ -584,28 +579,25 @@ void testWithRealRecords() throws Exception { @BooleanSource void testRollBackTransaction(boolean commitWhenDone) throws Exception { final int numRecords = 50; - - final CursorFactory cursorFactory = (store, lastResult, rowLimit) -> { - final byte[] continuation = lastResult == null ? null : lastResult.getContinuation().toBytes(); - final ScanProperties scanProperties = ScanProperties.FORWARD_SCAN.with(executeProperties -> executeProperties.setReturnedRowLimit(rowLimit)); - return store.scanRecordKeys(continuation, scanProperties); - }; + final CursorFactory cursorFactory = createCursorFactory(); final ItemHandler itemHandler = (store, item, quotaManager) -> { - // delete all records + // mark records as deleted so that the 10 max deletions per transaction will force multiple transactions quotaManager.deleteCountInc(); + // Actually trying to delete the records here so that we can verify that the transaction was not committed + // as all the records remain return store.deleteRecordAsync(item.get()).thenApply(ignore -> null); }; try (FDBRecordContext context = openContext()) { openSimpleRecordStore(context); for (int i = 0; i < numRecords; i++) { - final TestRecords1Proto.MySimpleRecord record = TestRecords1Proto.MySimpleRecord.newBuilder() + final TestRecords1Proto.MySimpleRecord rec = TestRecords1Proto.MySimpleRecord.newBuilder() .setRecNo(i) .setStrValueIndexed("Some text") .setNumValue3Indexed(1415 + i * 7) .build(); - recordStore.saveRecord(record); + recordStore.saveRecord(rec); } commit(context); } @@ -825,6 +817,15 @@ private CursorFactory listCursor(List items, AtomicInteger limitRef) { }; } + @Nonnull + private static CursorFactory createCursorFactory() { + return (store, lastResult, rowLimit) -> { + final byte[] continuation = lastResult == null ? null : lastResult.getContinuation().toBytes(); + final ScanProperties scanProperties = ScanProperties.FORWARD_SCAN.with(executeProperties -> executeProperties.setReturnedRowLimit(rowLimit)); + return store.scanRecordKeys(continuation, scanProperties); + }; + } + private CompletableFuture futureFailure() { return CompletableFuture.failedFuture(new RuntimeException("intentionally failed while testing")); } From 65b8022822445d10247dec267ba0185a0d0939e3 Mon Sep 17 00:00:00 2001 From: ohad Date: Fri, 5 Dec 2025 16:26:13 -0500 Subject: [PATCH 4/5] Style --- .../foundationdb/recordrepair/StoreBuilderWithRepair.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java index 41fe1cd1b6..570ac0f27b 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java @@ -21,7 +21,6 @@ package com.apple.foundationdb.record.provider.foundationdb.recordrepair; import com.apple.foundationdb.annotation.API; -import com.apple.foundationdb.async.AsyncUtil; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; import com.apple.foundationdb.record.provider.foundationdb.FormatVersion; import com.apple.foundationdb.record.provider.foundationdb.RecordStoreNoInfoAndNotEmptyException; From 299fd82217e9aeab906c0d2582bffd394396e6f8 Mon Sep 17 00:00:00 2001 From: ohad Date: Fri, 5 Dec 2025 16:36:28 -0500 Subject: [PATCH 5/5] Remove unused method --- .../recordrepair/StoreBuilderWithRepair.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java index 570ac0f27b..c89bdc95e0 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java @@ -23,7 +23,6 @@ import com.apple.foundationdb.annotation.API; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; import com.apple.foundationdb.record.provider.foundationdb.FormatVersion; -import com.apple.foundationdb.record.provider.foundationdb.RecordStoreNoInfoAndNotEmptyException; import com.apple.foundationdb.record.util.pair.NonnullPair; import javax.annotation.Nonnull; @@ -66,14 +65,4 @@ public CompletableFuture openAsync() { return repairMissingHeader(userVersion, minimumPossibleFormatVersion) .thenApply(NonnullPair::getRight); } - - private boolean isMissingStoreHeaderCaused(Throwable ex) { - while (ex != null) { - if (ex instanceof RecordStoreNoInfoAndNotEmptyException) { - return true; - } - ex = ex.getCause(); - } - return false; - } }