From c455b2f719cc56d8318c1d19dc4b00a9a2edd47a Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Fri, 24 Apr 2026 13:57:02 +0800 Subject: [PATCH 1/3] RATIS-2507. Fix java.lang.IllegalStateException: gap between entries --- .../server/raftlog/segmented/LogSegment.java | 29 ++++--- .../raftlog/segmented/SegmentedRaftLog.java | 1 + .../segmented/SegmentedRaftLogCache.java | 6 ++ .../segmented/TestSegmentedRaftLog.java | 83 ++++++++++++++----- 4 files changed, 88 insertions(+), 31 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index e9cb2e50f9..644d9b2bf2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -202,7 +202,7 @@ static LogSegment loadSegment(RaftStorage storage, File file, LogSegmentStartEnd final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy); final boolean isOpen = startEnd.isOpen(); final int entryCount = readSegmentFile(file, startEnd, maxOpSize, corruptionPolicy, raftLogMetrics, entry -> { - segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE); + segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE, true); if (logConsumer != null) { logConsumer.accept(entry); } @@ -355,22 +355,15 @@ CorruptionPolicy getLogCorruptionPolicy() { void appendToOpenSegment(LogEntryProto entry, Op op) { Preconditions.assertTrue(isOpen(), "The log segment %s is not open for append", this); - append(true, entry, op); + append(true, entry, op, false); } public static final String APPEND_RECORD = LogSegment.class.getSimpleName() + ".append"; - private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) { + private void append(boolean keepEntryInCache, LogEntryProto entry, Op op, boolean verifyEntryIndex) { Objects.requireNonNull(entry, "entry == null"); - final LogRecord currentLast = records.getLast(); - if (currentLast == null) { - Preconditions.assertTrue(entry.getIndex() == startIndex, - "gap between start index %s and first entry to append %s", - startIndex, entry.getIndex()); - } else { - Preconditions.assertTrue(entry.getIndex() == currentLast.getTermIndex().getIndex() + 1, - "gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex()); + if (verifyEntryIndex) { + verifyEntryIndex(entry.getIndex()); } - final LogRecord record = new LogRecord(totalFileSize, entry); if (keepEntryInCache) { // It is important to put the entry into the cache before appending the @@ -385,6 +378,18 @@ private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) { endIndex = entry.getIndex(); } + void verifyEntryIndex(long entryIndex) { + final LogRecord currentLast = records.getLast(); + if (currentLast == null) { + Preconditions.assertTrue(entryIndex == startIndex, + "gap between start index %s and first entry to append %s", + startIndex, entryIndex); + } else { + Preconditions.assertTrue(entryIndex == currentLast.getTermIndex().getIndex() + 1, + "gap between entries %s and %s", entryIndex, currentLast.getTermIndex().getIndex()); + } + } + LogEntryProto getEntryFromCache(TermIndex ti) { return entryCache.get(ti); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index 6bcc3f8e1c..a6ea6e3caf 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -430,6 +430,7 @@ protected CompletableFuture appendEntryImpl(LogEntryProto entry, Transacti // If the entry has state machine data, then the entry should be inserted // to statemachine first and then to the cache. Not following the order // will leave a spurious entry in the cache. + cache.verifyAppendEntryIndex(entry); CompletableFuture writeFuture = fileLogWorker.writeLogEntry(entry, context).getFuture(); if (stateMachineCachingEnabled) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java index 46acbcc3d8..2e23796af2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java @@ -622,6 +622,12 @@ TermIndex getLastTermIndex() { } } + void verifyAppendEntryIndex(LogEntryProto entry) { + // SegmentedRaftLog does the segment creation/rolling work. + Objects.requireNonNull(openSegment, "openSegment == null"); + openSegment.verifyEntryIndex(entry.getIndex()); + } + void appendEntry(LogEntryProto entry, LogSegment.Op op) { // SegmentedRaftLog does the segment creation/rolling work. Here we just // simply append the entry into the open segment. diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java index 43aafc8967..181d1fa430 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java @@ -67,7 +67,6 @@ import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Stream; -import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -83,6 +82,8 @@ import static java.lang.Boolean.FALSE; import static java.lang.Boolean.TRUE; import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogWorker.RUN_WORKER; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.params.provider.Arguments.arguments; @@ -231,7 +232,7 @@ public void testLoadLogSegments(Boolean useAsyncFlush, Boolean smSyncFlush) thro // check if log entries are loaded correctly for (LogEntryProto e : entries) { LogEntryProto entry = raftLog.get(e.getIndex()); - Assertions.assertEquals(e, entry); + assertEquals(e, entry); } final LogEntryHeader[] termIndices = raftLog.getEntries(0, 500); @@ -245,7 +246,7 @@ public void testLoadLogSegments(Boolean useAsyncFlush, Boolean smSyncFlush) thro }) .toArray(LogEntryProto[]::new); Assertions.assertArrayEquals(entries, entriesFromLog); - Assertions.assertEquals(entries[entries.length - 1], getLastEntry(raftLog)); + assertEquals(entries[entries.length - 1], getLastEntry(raftLog)); final RatisMetricRegistry metricRegistryForLogWorker = RaftLogMetricsBase.createRegistry(MEMBER_ID); @@ -400,7 +401,7 @@ public void testAppendAndRoll(Boolean useAsyncFlush, Boolean smSyncFlush) throws raftLog.open(RaftLog.INVALID_LOG_INDEX, null); // check if the raft log is correct checkEntries(raftLog, entries, 0, entries.size()); - Assertions.assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments()); + assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments()); } } @@ -466,12 +467,12 @@ public void testPurgeAfterAppendEntry(Boolean useAsyncFlush, Boolean smSyncFlush if(!tasksAdded.await(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit())) { throw new TimeoutException(); } - Assertions.assertEquals(entries.size() + 1, tasksCount.get()); + assertEquals(entries.size() + 1, tasksCount.get()); // check if the purge task is executed final Long purged = purgeFuture.get().get(); LOG.info("purgeIndex = {}, purged = {}", endIndexOfClosedSegment, purged); - Assertions.assertEquals(endIndexOfClosedSegment, raftLog.getRaftLogCache().getStartIndex()); + assertEquals(endIndexOfClosedSegment, raftLog.getRaftLogCache().getStartIndex()); // check if the appendEntry futures are done JavaUtils.allOf(appendFutures).get(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit()); @@ -515,7 +516,7 @@ private void testTruncate(List entries, long fromIndex) raftLog.open(RaftLog.INVALID_LOG_INDEX, null); // check if the raft log is correct if (fromIndex > 0) { - Assertions.assertEquals(entries.get((int) (fromIndex - 1)), + assertEquals(entries.get((int) (fromIndex - 1)), getLastEntry(raftLog)); } else { Assertions.assertNull(raftLog.getLastEntryTermIndex()); @@ -529,7 +530,7 @@ private void checkEntries(RaftLog raftLog, List expected, if (size > 0) { for (int i = offset; i < size + offset; i++) { LogEntryProto entry = raftLog.get(expected.get(i).getIndex()); - Assertions.assertEquals(expected.get(i), entry); + assertEquals(expected.get(i), entry); } final LogEntryHeader[] termIndices = raftLog.getEntries( expected.get(offset).getIndex(), @@ -637,7 +638,7 @@ private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int pur final CompletableFuture f = raftLog.purge(purgeIndex); final Long purged = f.get(); LOG.info("purgeIndex = {}, purged = {}", purgeIndex, purged); - Assertions.assertEquals(expectedIndex, raftLog.getRaftLogCache().getStartIndex()); + assertEquals(expectedIndex, raftLog.getRaftLogCache().getStartIndex()); } } @@ -681,9 +682,9 @@ public void testAppendEntriesWithInconsistency(Boolean useAsyncFlush, Boolean sm checkFailedEntries(entries, 650, retryCache); checkEntries(raftLog, entries, 0, 650); checkEntries(raftLog, newEntries, 100, 100); - Assertions.assertEquals(newEntries.get(newEntries.size() - 1), + assertEquals(newEntries.get(newEntries.size() - 1), getLastEntry(raftLog)); - Assertions.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(), + assertEquals(newEntries.get(newEntries.size() - 1).getIndex(), raftLog.getFlushIndex()); } @@ -693,13 +694,57 @@ public void testAppendEntriesWithInconsistency(Boolean useAsyncFlush, Boolean sm raftLog.open(RaftLog.INVALID_LOG_INDEX, null); checkEntries(raftLog, entries, 0, 650); checkEntries(raftLog, newEntries, 100, 100); - Assertions.assertEquals(newEntries.get(newEntries.size() - 1), + assertEquals(newEntries.get(newEntries.size() - 1), getLastEntry(raftLog)); - Assertions.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(), + assertEquals(newEntries.get(newEntries.size() - 1).getIndex(), raftLog.getFlushIndex()); SegmentedRaftLogCache cache = raftLog.getRaftLogCache(); - Assertions.assertEquals(5, cache.getNumOfSegments()); + assertEquals(5, cache.getNumOfSegments()); + } + } + + @ParameterizedTest + @MethodSource("data") + public void testAppendEntriesWithGap(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception { + RaftServerConfigKeys.Log.setAsyncFlushEnabled(properties, useAsyncFlush); + RaftServerConfigKeys.Log.StateMachineData.setSync(properties, smSyncFlush); + // prepare the log for truncation + List ranges = prepareRanges(0, 5, 200, 0); + List entries = prepareLogEntries(ranges, null); + + final RetryCache retryCache = RetryCacheTestUtil.createRetryCache(); + try (SegmentedRaftLog raftLog = + RetryCacheTestUtil.newSegmentedRaftLog(MEMBER_ID, retryCache, storage, properties)) { + raftLog.open(RaftLog.INVALID_LOG_INDEX, null); + entries.forEach(entry -> RetryCacheTestUtil.createEntry(retryCache, entry)); + // append entries to the raftlog + entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join); + } + + long lastIndex = ranges.get(ranges.size() - 1).end; + long snapshotIndex = lastIndex + 100; + LogEntryProto entryProto = prepareLogEntry(4, snapshotIndex + 1, null, false); + final LongSupplier getSnapshotIndexFromStateMachine = new LongSupplier() { + @Override + public long getAsLong() { + return snapshotIndex; + } + }; + try (SegmentedRaftLog raftLog = newSegmentedRaftLog(getSnapshotIndexFromStateMachine)) { + raftLog.open(RaftLog.INVALID_LOG_INDEX, null); + // Assert the wrapped exception + IllegalStateException exception = assertThrows(IllegalStateException.class, + () -> raftLog.appendEntry(entryProto)); + // Assert the original cause + assertTrue(exception.getMessage().contains("gap between entries")); + } + + // load the raftlog again and check + try (SegmentedRaftLog raftLog = + RetryCacheTestUtil.newSegmentedRaftLog(MEMBER_ID, retryCache, storage, properties)) { + raftLog.open(RaftLog.INVALID_LOG_INDEX, null); + Assertions.assertEquals(lastIndex, raftLog.getRaftLogCache().getEndIndex()); } } @@ -795,7 +840,7 @@ public void notifyLogFailed(Throwable cause, LogEntryProto entry) { // SegmentedRaftLogWorker should catch TimeoutIOException CompletableFuture f = raftLog.appendEntry(entry); // Wait for async writeStateMachineData to finish - ex = Assertions.assertThrows(ExecutionException.class, f::get); + ex = assertThrows(ExecutionException.class, f::get); } Assertions.assertSame(LifeCycle.State.PAUSED, sm.getLifeCycleState()); Assertions.assertInstanceOf(TimeoutIOException.class, ex.getCause()); @@ -815,9 +860,9 @@ static Thread startAppendEntryThread(RaftLog raftLog, LogEntryProto entry) { void assertIndices(RaftLog raftLog, long expectedFlushIndex, long expectedNextIndex) { LOG.info("assert expectedFlushIndex={}", expectedFlushIndex); - Assertions.assertEquals(expectedFlushIndex, raftLog.getFlushIndex()); + assertEquals(expectedFlushIndex, raftLog.getFlushIndex()); LOG.info("assert expectedNextIndex={}", expectedNextIndex); - Assertions.assertEquals(expectedNextIndex, raftLog.getNextIndex()); + assertEquals(expectedNextIndex, raftLog.getNextIndex()); } void assertIndicesMultipleAttempts(RaftLog raftLog, long expectedFlushIndex, long expectedNextIndex) @@ -938,10 +983,10 @@ public void testConcurrentGetDuringAppend() throws Exception { // When the reader's get() call completed, the append was fully finished, // so it should have returned the correct entry. - Assertions.assertEquals(newEntry.getIndex(), raftLog.getLastEntryTermIndex().getIndex()); + assertEquals(newEntry.getIndex(), raftLog.getLastEntryTermIndex().getIndex()); readEntry.set(raftLog.get(newEntry.getIndex())); Assertions.assertNotNull(readEntry.get()); - Assertions.assertEquals(newEntry, readEntry.get()); + assertEquals(newEntry, readEntry.get()); } finally { CodeInjectionForTesting.remove(LogSegment.APPEND_RECORD); } From ce8800c61e1a10fd58be77549639f05e8a47ff3f Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Wed, 29 Apr 2026 17:12:58 +0800 Subject: [PATCH 2/3] fix failed UT --- .../ratis/server/raftlog/segmented/LogSegment.java | 4 ++-- .../raftlog/segmented/SegmentedRaftLogCache.java | 2 +- .../ratis/server/raftlog/segmented/TestLogSegment.java | 10 +++++----- .../raftlog/segmented/TestSegmentedRaftLogCache.java | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index 644d9b2bf2..bb2bde7edb 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -353,9 +353,9 @@ CorruptionPolicy getLogCorruptionPolicy() { return CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy); } - void appendToOpenSegment(LogEntryProto entry, Op op) { + void appendToOpenSegment(LogEntryProto entry, Op op, boolean verifyEntryIndex) { Preconditions.assertTrue(isOpen(), "The log segment %s is not open for append", this); - append(true, entry, op, false); + append(true, entry, op, verifyEntryIndex); } public static final String APPEND_RECORD = LogSegment.class.getSimpleName() + ".append"; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java index 2e23796af2..714943c49c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java @@ -632,7 +632,7 @@ void appendEntry(LogEntryProto entry, LogSegment.Op op) { // SegmentedRaftLog does the segment creation/rolling work. Here we just // simply append the entry into the open segment. Objects.requireNonNull(openSegment, "openSegment == null"); - openSegment.appendToOpenSegment(entry, op); + openSegment.appendToOpenSegment(entry, op, false); } /** diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java index a95c683c88..6a75dfb36e 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java @@ -206,7 +206,7 @@ public void testAppendEntries() throws Exception { SimpleOperation op = new SimpleOperation("m" + i); LogEntryProto entry = LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i++ + start); size += getEntrySize(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); - segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); + segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true); } Assertions.assertTrue(segment.getTotalFileSize() >= max); @@ -238,18 +238,18 @@ public void testAppendWithGap() throws Exception { final StateMachineLogEntryProto m = op.getLogEntryContent(); try { LogEntryProto entry = LogProtoUtils.toLogEntryProto(m, 0, 1001); - segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); + segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true); Assertions.fail("should fail since the entry's index needs to be 1000"); } catch (IllegalStateException e) { // the exception is expected. } LogEntryProto entry = LogProtoUtils.toLogEntryProto(m, 0, 1000); - segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); + segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true); try { entry = LogProtoUtils.toLogEntryProto(m, 0, 1002); - segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); + segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true); Assertions.fail("should fail since the entry's index needs to be 1001"); } catch (IllegalStateException e) { // the exception is expected. @@ -264,7 +264,7 @@ public void testTruncate() throws Exception { for (int i = 0; i < 100; i++) { LogEntryProto entry = LogProtoUtils.toLogEntryProto( new SimpleOperation("m" + i).getLogEntryContent(), term, i + start); - segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); + segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true); } // truncate an open segment (remove 1080~1099) diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java index 532e32c87d..3133fb36f6 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java @@ -63,7 +63,7 @@ private LogSegment prepareLogSegment(long start, long end, boolean isOpen) { for (long i = start; i <= end; i++) { SimpleOperation m = new SimpleOperation("m" + i); LogEntryProto entry = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); - s.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); + s.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true); } if (!isOpen) { s.close(); From a094265e63ce3a2c4f2a5e6d713e46ae969a98cb Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Wed, 29 Apr 2026 20:03:32 +0800 Subject: [PATCH 3/3] trigger CI