Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.apple.foundationdb.record.logging.LogMessageKeys;
import com.apple.foundationdb.record.provider.foundationdb.FDBDatabase;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore;
import com.apple.foundationdb.record.provider.foundationdb.FormatVersion;
import com.apple.foundationdb.record.provider.foundationdb.runners.throttled.CursorFactory;
import com.apple.foundationdb.record.provider.foundationdb.runners.throttled.ThrottledRetryingIterator;
import com.apple.foundationdb.tuple.Tuple;
Expand Down Expand Up @@ -96,15 +97,18 @@ public enum ValidationKind { RECORD_VALUE, RECORD_VALUE_AND_VERSION }
private final ValidationKind validationKind;
@Nonnull
private final ThrottledRetryingIterator<Tuple> throttledIterator;
private final boolean allowRepair;

protected RecordRepair(@Nonnull final Builder config) {
protected RecordRepair(@Nonnull final Builder config, boolean allowRepair) {
this.database = config.database;
this.storeBuilder = config.getStoreBuilder();
this.validationKind = config.getValidationKind();
ThrottledRetryingIterator.Builder<Tuple> iteratorBuilder =
ThrottledRetryingIterator.builder(database, cursorFactory(), this::handleOneItem)
.withMdcContext(MDC.getCopyOfContextMap());
throttledIterator = configureThrottlingIterator(iteratorBuilder, config).build();
this.allowRepair = allowRepair;
// This will also ensure the transaction only commits when needed
throttledIterator = configureThrottlingIterator(iteratorBuilder, config, allowRepair).build();
}

/**
Expand Down Expand Up @@ -171,15 +175,16 @@ protected CompletableFuture<RecordRepairResult> validateInternal(@Nonnull final
});
}

private ThrottledRetryingIterator.Builder<Tuple> configureThrottlingIterator(ThrottledRetryingIterator.Builder<Tuple> builder, Builder config) {
private ThrottledRetryingIterator.Builder<Tuple> configureThrottlingIterator(ThrottledRetryingIterator.Builder<Tuple> builder, Builder config, boolean allowRepair) {
return builder
.withTransactionInitNotification(this::logStartTransaction)
.withTransactionSuccessNotification(this::logCommitTransaction)
.withTransactionTimeQuotaMillis(config.getTransactionTimeQuotaMillis())
.withMaxRecordsDeletesPerTransaction(config.getMaxRecordDeletesPerTransaction())
.withMaxRecordsScannedPerSec(config.getMaxRecordScannedPerSec())
.withMaxRecordsDeletesPerSec(config.getMaxRecordDeletesPerSec())
.withNumOfRetries(config.getNumOfRetries());
.withNumOfRetries(config.getNumOfRetries())
.withCommitWhenDone(allowRepair);
}

@SuppressWarnings("PMD.UnusedFormalParameter")
Expand All @@ -191,7 +196,8 @@ private void logStartTransaction(ThrottledRetryingIterator.QuotaManager quotaMan

private void logCommitTransaction(ThrottledRetryingIterator.QuotaManager quotaManager) {
if (logger.isDebugEnabled()) {
logger.debug(KeyValueLogMessage.of("RecordRepairRunner: transaction committed",
String message = allowRepair ? "RecordRepairRunner: transaction committed" : "RecordRepairRunner: transaction ended";
logger.debug(KeyValueLogMessage.of(message,
LogMessageKeys.RECORDS_SCANNED, quotaManager.getScannedCount(),
LogMessageKeys.RECORDS_DELETED, quotaManager.getDeletesCount()));
}
Expand All @@ -215,6 +221,8 @@ public static class Builder {
private int maxRecordScannedPerSec = 0;
private int maxRecordDeletesPerSec = 1000;
private int numOfRetries = 4;
private int userVersion;
private @Nullable FormatVersion minimumPossibleFormatVersion;

/**
* Constructor.
Expand All @@ -236,6 +244,7 @@ public RecordRepairStatsRunner buildStatsRunner() {

/**
* Finalize the build and create a repair runner.
* @param allowRepair whether to repair the found issues (TRUE) or run in read-only mode (FALSE)
* @return the newly created repair runner
*/
public RecordRepairValidateRunner buildRepairRunner(boolean allowRepair) {
Expand Down Expand Up @@ -329,14 +338,36 @@ public Builder withNumOfRetries(final int numOfRetries) {
return this;
}

/**
* Set the store header repair parameters.
* If set, the runner will try to repair the store header (See {@link FDBRecordStore.Builder#repairMissingHeader(int, FormatVersion)})
* as part of the repair operation in case the store fails to open.
* If the runner is running in dry run mode (repair not allowed) then the operation will be rolled back once the run
* is complete, making no change to the header.
* @param userVersion the user version for the header repair
* @param minimumPossibleFormatVersion the minimum store format version to use for the repair
* Default: null minimumPossibleFormatVersion will not attempt to repair the header
* @return this builder
*/
public Builder withHeaderRepairParameters(int userVersion, @Nullable FormatVersion minimumPossibleFormatVersion) {
this.userVersion = userVersion;
this.minimumPossibleFormatVersion = minimumPossibleFormatVersion;
return this;
}

@Nonnull
public FDBDatabase getDatabase() {
return database;
}

@Nonnull
public FDBRecordStore.Builder getStoreBuilder() {
return storeBuilder;
if (minimumPossibleFormatVersion != null) {
// override the store builder to repair the header if necessary
return new StoreBuilderWithRepair(storeBuilder, userVersion, minimumPossibleFormatVersion);
} else {
return storeBuilder;
}
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public class RecordRepairStatsRunner extends RecordRepair {
private final RepairStatsResults statsResult;

RecordRepairStatsRunner(@Nonnull final Builder config) {
super(config);
// stats runner never commits a transaction
super(config, false);
statsResult = new RepairStatsResults();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class RecordRepairValidateRunner extends RecordRepair {
private final AtomicBoolean earlyReturn;

RecordRepairValidateRunner(@Nonnull final Builder config, boolean allowRepair) {
super(config);
super(config, allowRepair);
this.allowRepair = allowRepair;
this.maxResultsReturned = config.getMaxResultsReturned();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* StoreBuilderWithRepair.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apple.foundationdb.record.provider.foundationdb.recordrepair;

import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore;
import com.apple.foundationdb.record.provider.foundationdb.FormatVersion;
import com.apple.foundationdb.record.util.pair.NonnullPair;

import javax.annotation.Nonnull;
import java.util.concurrent.CompletableFuture;

/**
* A flavor of {@link FDBRecordStore.Builder} that can handle the case where the store cannot be opened.
* In case the given store builder fails to open the store due to a missing header, the {@link FDBRecordStore.Builder#repairMissingHeader(int, FormatVersion)}
* method is called and the repaired store is returned.
*/
@API(API.Status.INTERNAL)
public class StoreBuilderWithRepair extends FDBRecordStore.Builder {
private final int userVersion;
private final FormatVersion minimumPossibleFormatVersion;

/**
* Constructor.
* @param other the source store builder to delegate to
* @param userVersion the userVersion to use for repairing the header if necessary
* @param minimumPossibleFormatVersion the minimumPossibleFormatVersion to use if necessary
*/
public StoreBuilderWithRepair(@Nonnull FDBRecordStore.Builder other,
final int userVersion,
@Nonnull FormatVersion minimumPossibleFormatVersion) {
super(other);
this.userVersion = userVersion;
this.minimumPossibleFormatVersion = minimumPossibleFormatVersion;
}

/**
* Override the {@link FDBRecordStore.Builder#openAsync()} method to add support for repairing the header.
* In case the store fails to be opened normally, try to repair it given the provided repair
* parameters.
*
* @return a future that will contain the opened store if successful
*/
@Nonnull
@Override
public CompletableFuture<FDBRecordStore> openAsync() {
return repairMissingHeader(userVersion, minimumPossibleFormatVersion)
.thenApply(NonnullPair::getRight);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,37 @@ public TransactionalRunner(@Nonnull FDBDatabase database,
@SuppressWarnings({"PMD.CloseResource", "PMD.UseTryWithResources"})
public <T> CompletableFuture<T> runAsync(final boolean clearWeakReadSemantics,
@Nonnull Function<? super FDBRecordContext, CompletableFuture<? extends T>> runnable) {
return runAsync(clearWeakReadSemantics, true, runnable);
}

/**
* A flavor of the {@link #runAsync(boolean, Function)} method that supports read-only transactions.
* @param clearWeakReadSemantics whether to clear the {@link FDBRecordContextConfig#getWeakReadSemantics()} before
* creating the transaction. These should be cleared if retrying a transaction, particularly in response to a
* conflict, because reusing the old read version would just cause it to re-conflict.
* @param commitWhenDone if FALSE the transaction will not be committed. If TRUE, behaves the same as described in {@link #runAsync(boolean, Function)}
* @param runnable some code to run that uses an {@link FDBRecordContext}
* @param <T> the type of the value returned by the future
* @return a future containing the result of the runnable, if successfully committed.
* Note: the future will not be {@code null}, but if the runnable returns a future containing {@code null} then
* so will the future returned here.
*/
@Nonnull
@SuppressWarnings({"PMD.CloseResource", "PMD.UseTryWithResources"})
public <T> CompletableFuture<T> runAsync(final boolean clearWeakReadSemantics,
boolean commitWhenDone,
@Nonnull Function<? super FDBRecordContext, CompletableFuture<? extends T>> runnable) {
FDBRecordContext context = openContext(clearWeakReadSemantics);
boolean returnedFuture = false;
try {
CompletableFuture<T> future = runnable.apply(context)
.thenCompose((T val) -> context.commitAsync().thenApply(vignore -> val));
.thenCompose((T val) -> {
if (commitWhenDone) {
return context.commitAsync().thenApply(vignore -> val);
} else {
return CompletableFuture.completedFuture(val);
}
});
returnedFuture = true;
return future.whenComplete((result, exception) -> context.close());
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public class ThrottledRetryingIterator<T> implements AutoCloseable {
@Nullable
private final Consumer<QuotaManager> transactionInitNotification;
private final int numOfRetries;
private final boolean commitWhenDone;

private boolean closed = false;
/** Starting time of the current/most-recent transaction. */
Expand All @@ -117,6 +118,7 @@ public ThrottledRetryingIterator(Builder<T> builder) {
this.transactionInitNotification = builder.transactionInitNotification;
this.cursorRowsLimit = 0;
this.numOfRetries = builder.numOfRetries;
this.commitWhenDone = builder.commitWhenDone;
futureManager = new FutureAutoClose();
}

Expand Down Expand Up @@ -175,7 +177,7 @@ private CompletableFuture<RecordCursorResult<T>> iterateOneRange(FDBRecordStore.
QuotaManager singleIterationQuotaManager) {
AtomicReference<RecordCursorResult<T>> cont = new AtomicReference<>();

return transactionalRunner.runAsync(true, transaction -> {
return transactionalRunner.runAsync(true, commitWhenDone, transaction -> {
// this layer returns last cursor result
singleIterationQuotaManager.init();

Expand Down Expand Up @@ -418,6 +420,7 @@ public static class Builder<T> {
private int maxRecordScannedPerSec;
private int maxRecordDeletesPerSec;
private int numOfRetries;
private boolean commitWhenDone;

private Builder(FDBDatabase database, FDBRecordContextConfig.Builder contextConfigBuilder, CursorFactory<T> cursorCreator, ItemHandler<T> singleItemHandler) {
// Mandatory fields are set in the constructor. Everything else is optional.
Expand All @@ -431,6 +434,7 @@ private Builder(FDBDatabase database, FDBRecordContextConfig.Builder contextConf
this.maxRecordScannedPerSec = 0;
this.maxRecordDeletesPerSec = 0;
this.numOfRetries = NUMBER_OF_RETRIES;
this.commitWhenDone = false;
}

/**
Expand Down Expand Up @@ -520,6 +524,19 @@ public Builder<T> withNumOfRetries(int numOfRetries) {
return this;
}

/**
* Set whether to commit the transaction when done.
* Setting this to TRUE will commit every transaction created before creating a new one. Setting to FALSE will
* roll back the transactions.
* Defaults to FALSE.
* @param commitWhenDone whether to commit or roll back the transactions created
* @return this builder
*/
public Builder<T> withCommitWhenDone(boolean commitWhenDone) {
this.commitWhenDone = commitWhenDone;
return this;
}

/**
* Set the MDC context for the runner/executor.
* This MDC context will be carried out into the runner and executor and will allow them to pass that down to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@
import com.apple.foundationdb.record.RecordMetaData;
import com.apple.foundationdb.record.RecordMetaDataBuilder;
import com.apple.foundationdb.record.RecordMetaDataProto;
import com.apple.foundationdb.record.RecordMetaDataProvider;
import com.apple.foundationdb.record.ScanProperties;
import com.apple.foundationdb.record.TestRecords1Proto;
import com.apple.foundationdb.record.TupleRange;
import com.apple.foundationdb.record.provider.foundationdb.FDBDatabaseRunner;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreKeyspace;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreTestBase;
import com.apple.foundationdb.record.provider.foundationdb.FDBStoredRecord;
import com.apple.foundationdb.record.provider.foundationdb.FormatVersion;
import com.apple.foundationdb.record.provider.foundationdb.RecordStoreNoInfoAndNotEmptyException;
import com.apple.foundationdb.record.provider.foundationdb.SplitHelper;
import com.apple.foundationdb.tuple.Tuple;
import com.apple.test.BooleanSource;
import com.apple.test.ParameterizedTestUtils;
import com.google.protobuf.Message;
import org.assertj.core.api.Assertions;
Expand Down Expand Up @@ -93,6 +97,49 @@ void testValidateRecordsNoIssue(boolean splitLongRecords, FormatVersion formatVe
validateNormalScan(hook, formatVersion, NUM_RECORDS, storeVersions);
}

@ParameterizedTest
@BooleanSource({"allowRepair", "repairHeader"})
void testCorruptStoreHeaderNoCorruptRecords(final boolean allowRepair, final boolean repairHeader) throws Exception {
final boolean splitLongRecords = true;
final FormatVersion storeVersion = FormatVersion.SAVE_VERSION_WITH_RECORD;
final RecordMetaDataHook hook = ValidationTestUtils.getRecordMetaDataHook(splitLongRecords, true);
saveRecords(splitLongRecords, storeVersion, hook);

FDBRecordStore.Builder storeBuilder;
try (FDBRecordContext context = openContext()) {
final FDBRecordStore store = openSimpleRecordStore(context, hook, storeVersion);
storeBuilder = store.asBuilder();
}
clearStoreHeader(simpleMetaData(hook));

RecordRepair.Builder builder = RecordRepair.builder(fdb, storeBuilder)
.withValidationKind(RecordRepair.ValidationKind.RECORD_VALUE_AND_VERSION);
if (repairHeader) {
// This will allow the runner to repair the header before repairing records
builder = builder.withHeaderRepairParameters(1, storeVersion);
}
// Run validation and repair
try (RecordRepairValidateRunner runner = builder.buildRepairRunner(allowRepair)) {
RepairValidationResults repairResults = runner.run().join();
if (repairHeader) {
ValidationTestUtils.assertCompleteResults(repairResults, NUM_RECORDS);
// Verify records: all is OK.
ValidationTestUtils.assertNoInvalidResults(repairResults.getInvalidResults());
} else {
Assertions.assertThat(repairResults.getCaughtException()).hasCauseInstanceOf(RecordStoreNoInfoAndNotEmptyException.class);
}
}

if (repairHeader && allowRepair) {
validateNormalScan(hook, storeVersion, NUM_RECORDS, true);
} else {
try (FDBRecordContext context = openContext()) {
Assertions.assertThatThrownBy(() -> openSimpleRecordStore(context, hook, storeVersion))
.isInstanceOf(RecordStoreNoInfoAndNotEmptyException.class);
}
}
}

public static Stream<Arguments> splitNumberFormatVersion() {
return ParameterizedTestUtils.cartesianProduct(
Stream.of(0, 1, 2, 3),
Expand Down Expand Up @@ -742,7 +789,6 @@ private List<FDBStoredRecord<Message>> saveRecords(int initialId, int totalRecor
return result;
}


private void validateNormalScan(final RecordMetaDataHook hook, final FormatVersion formatVersion, final int numRecords, Boolean hasVersion) throws Exception {
// Load the records again to make sure they are all there
try (FDBRecordContext context = openContext()) {
Expand All @@ -758,4 +804,12 @@ private void validateNormalScan(final RecordMetaDataHook hook, final FormatVersi
}
}
}

private void clearStoreHeader(final RecordMetaDataProvider metaData) {
try (FDBRecordContext context = openContext()) {
recordStore = getStoreBuilder(context, metaData, path).createOrOpen();
context.ensureActive().clear(recordStore.getSubspace().pack(FDBRecordStoreKeyspace.STORE_INFO.key()));
commit(context);
}
}
}
Loading
Loading