Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ static LogSegment loadSegment(RaftStorage storage, File file, LogSegmentStartEnd
final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
final boolean isOpen = startEnd.isOpen();
final int entryCount = readSegmentFile(file, startEnd, maxOpSize, corruptionPolicy, raftLogMetrics, entry -> {
segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE);
segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE, true);
if (logConsumer != null) {
logConsumer.accept(entry);
}
Expand Down Expand Up @@ -353,24 +353,17 @@ CorruptionPolicy getLogCorruptionPolicy() {
return CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
}

void appendToOpenSegment(LogEntryProto entry, Op op) {
void appendToOpenSegment(LogEntryProto entry, Op op, boolean verifyEntryIndex) {
Preconditions.assertTrue(isOpen(), "The log segment %s is not open for append", this);
append(true, entry, op);
append(true, entry, op, verifyEntryIndex);
}

public static final String APPEND_RECORD = LogSegment.class.getSimpleName() + ".append";
private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) {
private void append(boolean keepEntryInCache, LogEntryProto entry, Op op, boolean verifyEntryIndex) {
Objects.requireNonNull(entry, "entry == null");
final LogRecord currentLast = records.getLast();
if (currentLast == null) {
Preconditions.assertTrue(entry.getIndex() == startIndex,
"gap between start index %s and first entry to append %s",
startIndex, entry.getIndex());
} else {
Preconditions.assertTrue(entry.getIndex() == currentLast.getTermIndex().getIndex() + 1,
"gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex());
if (verifyEntryIndex) {
verifyEntryIndex(entry.getIndex());
}

final LogRecord record = new LogRecord(totalFileSize, entry);
if (keepEntryInCache) {
// It is important to put the entry into the cache before appending the
Expand All @@ -385,6 +378,18 @@ private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) {
endIndex = entry.getIndex();
}

void verifyEntryIndex(long entryIndex) {
final LogRecord currentLast = records.getLast();
if (currentLast == null) {
Preconditions.assertTrue(entryIndex == startIndex,
"gap between start index %s and first entry to append %s",
startIndex, entryIndex);
} else {
Preconditions.assertTrue(entryIndex == currentLast.getTermIndex().getIndex() + 1,
"gap between entries %s and %s", entryIndex, currentLast.getTermIndex().getIndex());
}
}

LogEntryProto getEntryFromCache(TermIndex ti) {
return entryCache.get(ti);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry, Transacti
// If the entry has state machine data, then the entry should be inserted
// to statemachine first and then to the cache. Not following the order
// will leave a spurious entry in the cache.
cache.verifyAppendEntryIndex(entry);
CompletableFuture<Long> writeFuture =
fileLogWorker.writeLogEntry(entry, context).getFuture();
if (stateMachineCachingEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,11 +622,17 @@ TermIndex getLastTermIndex() {
}
}

void verifyAppendEntryIndex(LogEntryProto entry) {
// SegmentedRaftLog does the segment creation/rolling work.
Objects.requireNonNull(openSegment, "openSegment == null");
openSegment.verifyEntryIndex(entry.getIndex());
}

void appendEntry(LogEntryProto entry, LogSegment.Op op) {
// SegmentedRaftLog does the segment creation/rolling work. Here we just
// simply append the entry into the open segment.
Objects.requireNonNull(openSegment, "openSegment == null");
openSegment.appendToOpenSegment(entry, op);
openSegment.appendToOpenSegment(entry, op, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void testAppendEntries() throws Exception {
SimpleOperation op = new SimpleOperation("m" + i);
LogEntryProto entry = LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i++ + start);
size += getEntrySize(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
}

Assertions.assertTrue(segment.getTotalFileSize() >= max);
Expand Down Expand Up @@ -238,18 +238,18 @@ public void testAppendWithGap() throws Exception {
final StateMachineLogEntryProto m = op.getLogEntryContent();
try {
LogEntryProto entry = LogProtoUtils.toLogEntryProto(m, 0, 1001);
segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
Assertions.fail("should fail since the entry's index needs to be 1000");
} catch (IllegalStateException e) {
// the exception is expected.
}

LogEntryProto entry = LogProtoUtils.toLogEntryProto(m, 0, 1000);
segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);

try {
entry = LogProtoUtils.toLogEntryProto(m, 0, 1002);
segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
Assertions.fail("should fail since the entry's index needs to be 1001");
} catch (IllegalStateException e) {
// the exception is expected.
Expand All @@ -264,7 +264,7 @@ public void testTruncate() throws Exception {
for (int i = 0; i < 100; i++) {
LogEntryProto entry = LogProtoUtils.toLogEntryProto(
new SimpleOperation("m" + i).getLogEntryContent(), term, i + start);
segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
}

// truncate an open segment (remove 1080~1099)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand All @@ -83,6 +82,8 @@
import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogWorker.RUN_WORKER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;

Expand Down Expand Up @@ -231,7 +232,7 @@ public void testLoadLogSegments(Boolean useAsyncFlush, Boolean smSyncFlush) thro
// check if log entries are loaded correctly
for (LogEntryProto e : entries) {
LogEntryProto entry = raftLog.get(e.getIndex());
Assertions.assertEquals(e, entry);
assertEquals(e, entry);
}

final LogEntryHeader[] termIndices = raftLog.getEntries(0, 500);
Expand All @@ -245,7 +246,7 @@ public void testLoadLogSegments(Boolean useAsyncFlush, Boolean smSyncFlush) thro
})
.toArray(LogEntryProto[]::new);
Assertions.assertArrayEquals(entries, entriesFromLog);
Assertions.assertEquals(entries[entries.length - 1], getLastEntry(raftLog));
assertEquals(entries[entries.length - 1], getLastEntry(raftLog));

final RatisMetricRegistry metricRegistryForLogWorker = RaftLogMetricsBase.createRegistry(MEMBER_ID);

Expand Down Expand Up @@ -400,7 +401,7 @@ public void testAppendAndRoll(Boolean useAsyncFlush, Boolean smSyncFlush) throws
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// check if the raft log is correct
checkEntries(raftLog, entries, 0, entries.size());
Assertions.assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments());
assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments());
}
}

Expand Down Expand Up @@ -466,12 +467,12 @@ public void testPurgeAfterAppendEntry(Boolean useAsyncFlush, Boolean smSyncFlush
if(!tasksAdded.await(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit())) {
throw new TimeoutException();
}
Assertions.assertEquals(entries.size() + 1, tasksCount.get());
assertEquals(entries.size() + 1, tasksCount.get());

// check if the purge task is executed
final Long purged = purgeFuture.get().get();
LOG.info("purgeIndex = {}, purged = {}", endIndexOfClosedSegment, purged);
Assertions.assertEquals(endIndexOfClosedSegment, raftLog.getRaftLogCache().getStartIndex());
assertEquals(endIndexOfClosedSegment, raftLog.getRaftLogCache().getStartIndex());

// check if the appendEntry futures are done
JavaUtils.allOf(appendFutures).get(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit());
Expand Down Expand Up @@ -515,7 +516,7 @@ private void testTruncate(List<LogEntryProto> entries, long fromIndex)
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// check if the raft log is correct
if (fromIndex > 0) {
Assertions.assertEquals(entries.get((int) (fromIndex - 1)),
assertEquals(entries.get((int) (fromIndex - 1)),
getLastEntry(raftLog));
} else {
Assertions.assertNull(raftLog.getLastEntryTermIndex());
Expand All @@ -529,7 +530,7 @@ private void checkEntries(RaftLog raftLog, List<LogEntryProto> expected,
if (size > 0) {
for (int i = offset; i < size + offset; i++) {
LogEntryProto entry = raftLog.get(expected.get(i).getIndex());
Assertions.assertEquals(expected.get(i), entry);
assertEquals(expected.get(i), entry);
}
final LogEntryHeader[] termIndices = raftLog.getEntries(
expected.get(offset).getIndex(),
Expand Down Expand Up @@ -637,7 +638,7 @@ private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int pur
final CompletableFuture<Long> f = raftLog.purge(purgeIndex);
final Long purged = f.get();
LOG.info("purgeIndex = {}, purged = {}", purgeIndex, purged);
Assertions.assertEquals(expectedIndex, raftLog.getRaftLogCache().getStartIndex());
assertEquals(expectedIndex, raftLog.getRaftLogCache().getStartIndex());
}
}

Expand Down Expand Up @@ -681,9 +682,9 @@ public void testAppendEntriesWithInconsistency(Boolean useAsyncFlush, Boolean sm
checkFailedEntries(entries, 650, retryCache);
checkEntries(raftLog, entries, 0, 650);
checkEntries(raftLog, newEntries, 100, 100);
Assertions.assertEquals(newEntries.get(newEntries.size() - 1),
assertEquals(newEntries.get(newEntries.size() - 1),
getLastEntry(raftLog));
Assertions.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
raftLog.getFlushIndex());
}

Expand All @@ -693,13 +694,57 @@ public void testAppendEntriesWithInconsistency(Boolean useAsyncFlush, Boolean sm
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
checkEntries(raftLog, entries, 0, 650);
checkEntries(raftLog, newEntries, 100, 100);
Assertions.assertEquals(newEntries.get(newEntries.size() - 1),
assertEquals(newEntries.get(newEntries.size() - 1),
getLastEntry(raftLog));
Assertions.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
raftLog.getFlushIndex());

SegmentedRaftLogCache cache = raftLog.getRaftLogCache();
Assertions.assertEquals(5, cache.getNumOfSegments());
assertEquals(5, cache.getNumOfSegments());
}
}

@ParameterizedTest
@MethodSource("data")
public void testAppendEntriesWithGap(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {
RaftServerConfigKeys.Log.setAsyncFlushEnabled(properties, useAsyncFlush);
RaftServerConfigKeys.Log.StateMachineData.setSync(properties, smSyncFlush);
// prepare the log for truncation
List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0);
List<LogEntryProto> entries = prepareLogEntries(ranges, null);

final RetryCache retryCache = RetryCacheTestUtil.createRetryCache();
try (SegmentedRaftLog raftLog =
RetryCacheTestUtil.newSegmentedRaftLog(MEMBER_ID, retryCache, storage, properties)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
entries.forEach(entry -> RetryCacheTestUtil.createEntry(retryCache, entry));
// append entries to the raftlog
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
}

long lastIndex = ranges.get(ranges.size() - 1).end;
long snapshotIndex = lastIndex + 100;
LogEntryProto entryProto = prepareLogEntry(4, snapshotIndex + 1, null, false);
final LongSupplier getSnapshotIndexFromStateMachine = new LongSupplier() {
@Override
public long getAsLong() {
return snapshotIndex;
}
};
try (SegmentedRaftLog raftLog = newSegmentedRaftLog(getSnapshotIndexFromStateMachine)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// Assert the wrapped exception
IllegalStateException exception = assertThrows(IllegalStateException.class,
() -> raftLog.appendEntry(entryProto));
// Assert the original cause
assertTrue(exception.getMessage().contains("gap between entries"));
}

// load the raftlog again and check
try (SegmentedRaftLog raftLog =
RetryCacheTestUtil.newSegmentedRaftLog(MEMBER_ID, retryCache, storage, properties)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
Assertions.assertEquals(lastIndex, raftLog.getRaftLogCache().getEndIndex());
}
}

Expand Down Expand Up @@ -795,7 +840,7 @@ public void notifyLogFailed(Throwable cause, LogEntryProto entry) {
// SegmentedRaftLogWorker should catch TimeoutIOException
CompletableFuture<Long> f = raftLog.appendEntry(entry);
// Wait for async writeStateMachineData to finish
ex = Assertions.assertThrows(ExecutionException.class, f::get);
ex = assertThrows(ExecutionException.class, f::get);
}
Assertions.assertSame(LifeCycle.State.PAUSED, sm.getLifeCycleState());
Assertions.assertInstanceOf(TimeoutIOException.class, ex.getCause());
Expand All @@ -815,9 +860,9 @@ static Thread startAppendEntryThread(RaftLog raftLog, LogEntryProto entry) {

void assertIndices(RaftLog raftLog, long expectedFlushIndex, long expectedNextIndex) {
LOG.info("assert expectedFlushIndex={}", expectedFlushIndex);
Assertions.assertEquals(expectedFlushIndex, raftLog.getFlushIndex());
assertEquals(expectedFlushIndex, raftLog.getFlushIndex());
LOG.info("assert expectedNextIndex={}", expectedNextIndex);
Assertions.assertEquals(expectedNextIndex, raftLog.getNextIndex());
assertEquals(expectedNextIndex, raftLog.getNextIndex());
}

void assertIndicesMultipleAttempts(RaftLog raftLog, long expectedFlushIndex, long expectedNextIndex)
Expand Down Expand Up @@ -938,10 +983,10 @@ public void testConcurrentGetDuringAppend() throws Exception {

// When the reader's get() call completed, the append was fully finished,
// so it should have returned the correct entry.
Assertions.assertEquals(newEntry.getIndex(), raftLog.getLastEntryTermIndex().getIndex());
assertEquals(newEntry.getIndex(), raftLog.getLastEntryTermIndex().getIndex());
readEntry.set(raftLog.get(newEntry.getIndex()));
Assertions.assertNotNull(readEntry.get());
Assertions.assertEquals(newEntry, readEntry.get());
assertEquals(newEntry, readEntry.get());
} finally {
CodeInjectionForTesting.remove(LogSegment.APPEND_RECORD);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {
for (long i = start; i <= end; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
LogEntryProto entry = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
s.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
s.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
}
if (!isOpen) {
s.close();
Expand Down
Loading