diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java index 3e114200fd..42b9fbe260 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepair.java @@ -27,6 +27,7 @@ import com.apple.foundationdb.record.logging.LogMessageKeys; import com.apple.foundationdb.record.provider.foundationdb.FDBDatabase; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; +import com.apple.foundationdb.record.provider.foundationdb.FormatVersion; import com.apple.foundationdb.record.provider.foundationdb.runners.throttled.CursorFactory; import com.apple.foundationdb.record.provider.foundationdb.runners.throttled.ThrottledRetryingIterator; import com.apple.foundationdb.tuple.Tuple; @@ -96,15 +97,18 @@ public enum ValidationKind { RECORD_VALUE, RECORD_VALUE_AND_VERSION } private final ValidationKind validationKind; @Nonnull private final ThrottledRetryingIterator throttledIterator; + private final boolean allowRepair; - protected RecordRepair(@Nonnull final Builder config) { + protected RecordRepair(@Nonnull final Builder config, boolean allowRepair) { this.database = config.database; this.storeBuilder = config.getStoreBuilder(); this.validationKind = config.getValidationKind(); ThrottledRetryingIterator.Builder iteratorBuilder = ThrottledRetryingIterator.builder(database, cursorFactory(), this::handleOneItem) .withMdcContext(MDC.getCopyOfContextMap()); - throttledIterator = configureThrottlingIterator(iteratorBuilder, config).build(); + this.allowRepair = allowRepair; + // This will also ensure the transaction only commits when needed + throttledIterator = configureThrottlingIterator(iteratorBuilder, config, allowRepair).build(); } /** @@ -171,7 +175,7 @@ protected CompletableFuture validateInternal(@Nonnull final }); } - private ThrottledRetryingIterator.Builder configureThrottlingIterator(ThrottledRetryingIterator.Builder builder, Builder config) { + private ThrottledRetryingIterator.Builder configureThrottlingIterator(ThrottledRetryingIterator.Builder builder, Builder config, boolean allowRepair) { return builder .withTransactionInitNotification(this::logStartTransaction) .withTransactionSuccessNotification(this::logCommitTransaction) @@ -179,7 +183,8 @@ private ThrottledRetryingIterator.Builder configureThrottlingIterator(Thr .withMaxRecordsDeletesPerTransaction(config.getMaxRecordDeletesPerTransaction()) .withMaxRecordsScannedPerSec(config.getMaxRecordScannedPerSec()) .withMaxRecordsDeletesPerSec(config.getMaxRecordDeletesPerSec()) - .withNumOfRetries(config.getNumOfRetries()); + .withNumOfRetries(config.getNumOfRetries()) + .withCommitWhenDone(allowRepair); } @SuppressWarnings("PMD.UnusedFormalParameter") @@ -191,7 +196,8 @@ private void logStartTransaction(ThrottledRetryingIterator.QuotaManager quotaMan private void logCommitTransaction(ThrottledRetryingIterator.QuotaManager quotaManager) { if (logger.isDebugEnabled()) { - logger.debug(KeyValueLogMessage.of("RecordRepairRunner: transaction committed", + String message = allowRepair ? "RecordRepairRunner: transaction committed" : "RecordRepairRunner: transaction ended"; + logger.debug(KeyValueLogMessage.of(message, LogMessageKeys.RECORDS_SCANNED, quotaManager.getScannedCount(), LogMessageKeys.RECORDS_DELETED, quotaManager.getDeletesCount())); } @@ -215,6 +221,8 @@ public static class Builder { private int maxRecordScannedPerSec = 0; private int maxRecordDeletesPerSec = 1000; private int numOfRetries = 4; + private int userVersion; + private @Nullable FormatVersion minimumPossibleFormatVersion; /** * Constructor. @@ -236,6 +244,7 @@ public RecordRepairStatsRunner buildStatsRunner() { /** * Finalize the build and create a repair runner. + * @param allowRepair whether to repair the found issues (TRUE) or run in read-only mode (FALSE) * @return the newly created repair runner */ public RecordRepairValidateRunner buildRepairRunner(boolean allowRepair) { @@ -329,6 +338,23 @@ public Builder withNumOfRetries(final int numOfRetries) { return this; } + /** + * Set the store header repair parameters. + * If set, the runner will try to repair the store header (See {@link FDBRecordStore.Builder#repairMissingHeader(int, FormatVersion)}) + * as part of the repair operation in case the store fails to open. + * If the runner is running in dry run mode (repair not allowed) then the operation will be rolled back once the run + * is complete, making no change to the header. + * @param userVersion the user version for the header repair + * @param minimumPossibleFormatVersion the minimum store format version to use for the repair + * Default: null minimumPossibleFormatVersion will not attempt to repair the header + * @return this builder + */ + public Builder withHeaderRepairParameters(int userVersion, @Nullable FormatVersion minimumPossibleFormatVersion) { + this.userVersion = userVersion; + this.minimumPossibleFormatVersion = minimumPossibleFormatVersion; + return this; + } + @Nonnull public FDBDatabase getDatabase() { return database; @@ -336,7 +362,12 @@ public FDBDatabase getDatabase() { @Nonnull public FDBRecordStore.Builder getStoreBuilder() { - return storeBuilder; + if (minimumPossibleFormatVersion != null) { + // override the store builder to repair the header if necessary + return new StoreBuilderWithRepair(storeBuilder, userVersion, minimumPossibleFormatVersion); + } else { + return storeBuilder; + } } @Nonnull diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairStatsRunner.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairStatsRunner.java index c7c82728a0..921a0205ef 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairStatsRunner.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairStatsRunner.java @@ -38,7 +38,8 @@ public class RecordRepairStatsRunner extends RecordRepair { private final RepairStatsResults statsResult; RecordRepairStatsRunner(@Nonnull final Builder config) { - super(config); + // stats runner never commits a transaction + super(config, false); statsResult = new RepairStatsResults(); } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairValidateRunner.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairValidateRunner.java index c9b8c32771..5cfcb05268 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairValidateRunner.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairValidateRunner.java @@ -49,7 +49,7 @@ public class RecordRepairValidateRunner extends RecordRepair { private final AtomicBoolean earlyReturn; RecordRepairValidateRunner(@Nonnull final Builder config, boolean allowRepair) { - super(config); + super(config, allowRepair); this.allowRepair = allowRepair; this.maxResultsReturned = config.getMaxResultsReturned(); diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java new file mode 100644 index 0000000000..c89bdc95e0 --- /dev/null +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/StoreBuilderWithRepair.java @@ -0,0 +1,68 @@ +/* + * StoreBuilderWithRepair.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.record.provider.foundationdb.recordrepair; + +import com.apple.foundationdb.annotation.API; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; +import com.apple.foundationdb.record.provider.foundationdb.FormatVersion; +import com.apple.foundationdb.record.util.pair.NonnullPair; + +import javax.annotation.Nonnull; +import java.util.concurrent.CompletableFuture; + +/** + * A flavor of {@link FDBRecordStore.Builder} that can handle the case where the store cannot be opened. + * In case the given store builder fails to open the store due to a missing header, the {@link FDBRecordStore.Builder#repairMissingHeader(int, FormatVersion)} + * method is called and the repaired store is returned. + */ +@API(API.Status.INTERNAL) +public class StoreBuilderWithRepair extends FDBRecordStore.Builder { + private final int userVersion; + private final FormatVersion minimumPossibleFormatVersion; + + /** + * Constructor. + * @param other the source store builder to delegate to + * @param userVersion the userVersion to use for repairing the header if necessary + * @param minimumPossibleFormatVersion the minimumPossibleFormatVersion to use if necessary + */ + public StoreBuilderWithRepair(@Nonnull FDBRecordStore.Builder other, + final int userVersion, + @Nonnull FormatVersion minimumPossibleFormatVersion) { + super(other); + this.userVersion = userVersion; + this.minimumPossibleFormatVersion = minimumPossibleFormatVersion; + } + + /** + * Override the {@link FDBRecordStore.Builder#openAsync()} method to add support for repairing the header. + * In case the store fails to be opened normally, try to repair it given the provided repair + * parameters. + * + * @return a future that will contain the opened store if successful + */ + @Nonnull + @Override + public CompletableFuture openAsync() { + return repairMissingHeader(userVersion, minimumPossibleFormatVersion) + .thenApply(NonnullPair::getRight); + } +} diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/TransactionalRunner.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/TransactionalRunner.java index 9f1c52cb08..bb0ae43d58 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/TransactionalRunner.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/TransactionalRunner.java @@ -98,11 +98,37 @@ public TransactionalRunner(@Nonnull FDBDatabase database, @SuppressWarnings({"PMD.CloseResource", "PMD.UseTryWithResources"}) public CompletableFuture runAsync(final boolean clearWeakReadSemantics, @Nonnull Function> runnable) { + return runAsync(clearWeakReadSemantics, true, runnable); + } + + /** + * A flavor of the {@link #runAsync(boolean, Function)} method that supports read-only transactions. + * @param clearWeakReadSemantics whether to clear the {@link FDBRecordContextConfig#getWeakReadSemantics()} before + * creating the transaction. These should be cleared if retrying a transaction, particularly in response to a + * conflict, because reusing the old read version would just cause it to re-conflict. + * @param commitWhenDone if FALSE the transaction will not be committed. If TRUE, behaves the same as described in {@link #runAsync(boolean, Function)} + * @param runnable some code to run that uses an {@link FDBRecordContext} + * @param the type of the value returned by the future + * @return a future containing the result of the runnable, if successfully committed. + * Note: the future will not be {@code null}, but if the runnable returns a future containing {@code null} then + * so will the future returned here. + */ + @Nonnull + @SuppressWarnings({"PMD.CloseResource", "PMD.UseTryWithResources"}) + public CompletableFuture runAsync(final boolean clearWeakReadSemantics, + boolean commitWhenDone, + @Nonnull Function> runnable) { FDBRecordContext context = openContext(clearWeakReadSemantics); boolean returnedFuture = false; try { CompletableFuture future = runnable.apply(context) - .thenCompose((T val) -> context.commitAsync().thenApply(vignore -> val)); + .thenCompose((T val) -> { + if (commitWhenDone) { + return context.commitAsync().thenApply(vignore -> val); + } else { + return CompletableFuture.completedFuture(val); + } + }); returnedFuture = true; return future.whenComplete((result, exception) -> context.close()); } finally { diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java index 66ed060a4d..a892a74283 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledRetryingIterator.java @@ -92,6 +92,7 @@ public class ThrottledRetryingIterator implements AutoCloseable { @Nullable private final Consumer transactionInitNotification; private final int numOfRetries; + private final boolean commitWhenDone; private boolean closed = false; /** Starting time of the current/most-recent transaction. */ @@ -117,6 +118,7 @@ public ThrottledRetryingIterator(Builder builder) { this.transactionInitNotification = builder.transactionInitNotification; this.cursorRowsLimit = 0; this.numOfRetries = builder.numOfRetries; + this.commitWhenDone = builder.commitWhenDone; futureManager = new FutureAutoClose(); } @@ -175,7 +177,7 @@ private CompletableFuture> iterateOneRange(FDBRecordStore. QuotaManager singleIterationQuotaManager) { AtomicReference> cont = new AtomicReference<>(); - return transactionalRunner.runAsync(true, transaction -> { + return transactionalRunner.runAsync(true, commitWhenDone, transaction -> { // this layer returns last cursor result singleIterationQuotaManager.init(); @@ -418,6 +420,7 @@ public static class Builder { private int maxRecordScannedPerSec; private int maxRecordDeletesPerSec; private int numOfRetries; + private boolean commitWhenDone; private Builder(FDBDatabase database, FDBRecordContextConfig.Builder contextConfigBuilder, CursorFactory cursorCreator, ItemHandler singleItemHandler) { // Mandatory fields are set in the constructor. Everything else is optional. @@ -431,6 +434,7 @@ private Builder(FDBDatabase database, FDBRecordContextConfig.Builder contextConf this.maxRecordScannedPerSec = 0; this.maxRecordDeletesPerSec = 0; this.numOfRetries = NUMBER_OF_RETRIES; + this.commitWhenDone = false; } /** @@ -520,6 +524,19 @@ public Builder withNumOfRetries(int numOfRetries) { return this; } + /** + * Set whether to commit the transaction when done. + * Setting this to TRUE will commit every transaction created before creating a new one. Setting to FALSE will + * roll back the transactions. + * Defaults to FALSE. + * @param commitWhenDone whether to commit or roll back the transactions created + * @return this builder + */ + public Builder withCommitWhenDone(boolean commitWhenDone) { + this.commitWhenDone = commitWhenDone; + return this; + } + /** * Set the MDC context for the runner/executor. * This MDC context will be carried out into the runner and executor and will allow them to pass that down to diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidateAndRepairTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidateAndRepairTest.java index 191bc67c32..0392d0d22f 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidateAndRepairTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidateAndRepairTest.java @@ -23,17 +23,21 @@ import com.apple.foundationdb.record.RecordMetaData; import com.apple.foundationdb.record.RecordMetaDataBuilder; import com.apple.foundationdb.record.RecordMetaDataProto; +import com.apple.foundationdb.record.RecordMetaDataProvider; import com.apple.foundationdb.record.ScanProperties; import com.apple.foundationdb.record.TestRecords1Proto; import com.apple.foundationdb.record.TupleRange; import com.apple.foundationdb.record.provider.foundationdb.FDBDatabaseRunner; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreKeyspace; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreTestBase; import com.apple.foundationdb.record.provider.foundationdb.FDBStoredRecord; import com.apple.foundationdb.record.provider.foundationdb.FormatVersion; +import com.apple.foundationdb.record.provider.foundationdb.RecordStoreNoInfoAndNotEmptyException; import com.apple.foundationdb.record.provider.foundationdb.SplitHelper; import com.apple.foundationdb.tuple.Tuple; +import com.apple.test.BooleanSource; import com.apple.test.ParameterizedTestUtils; import com.google.protobuf.Message; import org.assertj.core.api.Assertions; @@ -93,6 +97,49 @@ void testValidateRecordsNoIssue(boolean splitLongRecords, FormatVersion formatVe validateNormalScan(hook, formatVersion, NUM_RECORDS, storeVersions); } + @ParameterizedTest + @BooleanSource({"allowRepair", "repairHeader"}) + void testCorruptStoreHeaderNoCorruptRecords(final boolean allowRepair, final boolean repairHeader) throws Exception { + final boolean splitLongRecords = true; + final FormatVersion storeVersion = FormatVersion.SAVE_VERSION_WITH_RECORD; + final RecordMetaDataHook hook = ValidationTestUtils.getRecordMetaDataHook(splitLongRecords, true); + saveRecords(splitLongRecords, storeVersion, hook); + + FDBRecordStore.Builder storeBuilder; + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = openSimpleRecordStore(context, hook, storeVersion); + storeBuilder = store.asBuilder(); + } + clearStoreHeader(simpleMetaData(hook)); + + RecordRepair.Builder builder = RecordRepair.builder(fdb, storeBuilder) + .withValidationKind(RecordRepair.ValidationKind.RECORD_VALUE_AND_VERSION); + if (repairHeader) { + // This will allow the runner to repair the header before repairing records + builder = builder.withHeaderRepairParameters(1, storeVersion); + } + // Run validation and repair + try (RecordRepairValidateRunner runner = builder.buildRepairRunner(allowRepair)) { + RepairValidationResults repairResults = runner.run().join(); + if (repairHeader) { + ValidationTestUtils.assertCompleteResults(repairResults, NUM_RECORDS); + // Verify records: all is OK. + ValidationTestUtils.assertNoInvalidResults(repairResults.getInvalidResults()); + } else { + Assertions.assertThat(repairResults.getCaughtException()).hasCauseInstanceOf(RecordStoreNoInfoAndNotEmptyException.class); + } + } + + if (repairHeader && allowRepair) { + validateNormalScan(hook, storeVersion, NUM_RECORDS, true); + } else { + try (FDBRecordContext context = openContext()) { + Assertions.assertThatThrownBy(() -> openSimpleRecordStore(context, hook, storeVersion)) + .isInstanceOf(RecordStoreNoInfoAndNotEmptyException.class); + } + } + } + public static Stream splitNumberFormatVersion() { return ParameterizedTestUtils.cartesianProduct( Stream.of(0, 1, 2, 3), @@ -742,7 +789,6 @@ private List> saveRecords(int initialId, int totalRecor return result; } - private void validateNormalScan(final RecordMetaDataHook hook, final FormatVersion formatVersion, final int numRecords, Boolean hasVersion) throws Exception { // Load the records again to make sure they are all there try (FDBRecordContext context = openContext()) { @@ -758,4 +804,12 @@ private void validateNormalScan(final RecordMetaDataHook hook, final FormatVersi } } } + + private void clearStoreHeader(final RecordMetaDataProvider metaData) { + try (FDBRecordContext context = openContext()) { + recordStore = getStoreBuilder(context, metaData, path).createOrOpen(); + context.ensureActive().clear(recordStore.getSubspace().pack(FDBRecordStoreKeyspace.STORE_INFO.key())); + commit(context); + } + } } diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java index ab913268f0..c4fd292eee 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.java @@ -32,7 +32,10 @@ import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreTestBase; +import com.apple.foundationdb.record.provider.foundationdb.FDBStoredRecord; import com.apple.foundationdb.tuple.Tuple; +import com.apple.test.BooleanSource; +import com.google.protobuf.Message; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -524,12 +527,7 @@ void testWithRealRecords() throws Exception { // A test with saved records, to see that future handling works final int numRecords = 50; List itemsScanned = new ArrayList<>(numRecords); - - final CursorFactory cursorFactory = (store, lastResult, rowLimit) -> { - final byte[] continuation = lastResult == null ? null : lastResult.getContinuation().toBytes(); - final ScanProperties scanProperties = ScanProperties.FORWARD_SCAN.with(executeProperties -> executeProperties.setReturnedRowLimit(rowLimit)); - return store.scanRecordKeys(continuation, scanProperties); - }; + final CursorFactory cursorFactory = createCursorFactory(); final ItemHandler itemHandler = (store, item, quotaManager) -> { return store.loadRecordAsync(item.get()).thenApply(rec -> { @@ -582,6 +580,62 @@ void testWithRealRecords() throws Exception { assertThat(itemsScanned).isEqualTo(IntStream.range(0, numRecords).boxed().collect(Collectors.toList())); } + @ParameterizedTest + @BooleanSource + void testRollBackTransaction(boolean commitWhenDone) throws Exception { + final int numRecords = 50; + final CursorFactory cursorFactory = createCursorFactory(); + + final ItemHandler itemHandler = (store, item, quotaManager) -> { + // mark records as deleted so that the 10 max deletions per transaction will force multiple transactions + quotaManager.deleteCountInc(); + // Actually trying to delete the records here so that we can verify that the transaction was not committed + // as all the records remain + return store.deleteRecordAsync(item.get()).thenApply(ignore -> null); + }; + + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + for (int i = 0; i < numRecords; i++) { + final TestRecords1Proto.MySimpleRecord rec = TestRecords1Proto.MySimpleRecord.newBuilder() + .setRecNo(i) + .setStrValueIndexed("Some text") + .setNumValue3Indexed(1415 + i * 7) + .build(); + recordStore.saveRecord(rec); + } + commit(context); + } + + try (ThrottledRetryingIterator iterator = ThrottledRetryingIterator + .builder(fdb, cursorFactory, itemHandler) + .withNumOfRetries(2) + .withMaxRecordsDeletesPerTransaction(10) // ensure we create multiple transactions + .withCommitWhenDone(commitWhenDone) + .build()) { + CompletableFuture iterateAll; + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + iterateAll = iterator.iterateAll(recordStore.asBuilder()); + } + iterateAll.join(); + } + + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + final ScanProperties scanProperties = ScanProperties.FORWARD_SCAN; + final RecordCursor> cursor = recordStore.scanRecords(null, scanProperties); + final List> records = cursor.asList().get(); + if (commitWhenDone) { + // all records deleted + assertThat(records).isEmpty(); + } else { + // Transaction rolled back - no records deleted + assertThat(records).hasSize(50); + } + } + } + @Test void testLateCompleteFutures() throws Exception { // A test that completes the first future outside the transaction @@ -822,6 +876,15 @@ private CursorFactory listCursor(List items, AtomicInteger limitRef) { }; } + @Nonnull + private static CursorFactory createCursorFactory() { + return (store, lastResult, rowLimit) -> { + final byte[] continuation = lastResult == null ? null : lastResult.getContinuation().toBytes(); + final ScanProperties scanProperties = ScanProperties.FORWARD_SCAN.with(executeProperties -> executeProperties.setReturnedRowLimit(rowLimit)); + return store.scanRecordKeys(continuation, scanProperties); + }; + } + private CompletableFuture futureFailure() { return CompletableFuture.failedFuture(new RuntimeException("intentionally failed while testing")); }