From 41eb19646e3b1f5269dd5c30caff00b02866bfa0 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Fri, 12 Jun 2026 14:54:15 +0100 Subject: [PATCH] Unable to catch up TCM Log from peer with gaps in log sequence Patch by Jon Meredith and Marcus Eriksson; reviewed by Sam Tunnicliffe for CASSANDRA-21455 Co-authored-by: Marcus Eriksson --- .../apache/cassandra/tcm/log/LogReader.java | 11 ++- .../test/log/CatchupViaSnapshotTest.java | 87 +++++++++++++++++++ .../tcm/log/LocalStorageLogStateTest.java | 48 ++++++++++ 3 files changed, 145 insertions(+), 1 deletion(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/test/log/CatchupViaSnapshotTest.java diff --git a/src/java/org/apache/cassandra/tcm/log/LogReader.java b/src/java/org/apache/cassandra/tcm/log/LogReader.java index e7fcd2b30805..4be476475f81 100644 --- a/src/java/org/apache/cassandra/tcm/log/LogReader.java +++ b/src/java/org/apache/cassandra/tcm/log/LogReader.java @@ -79,7 +79,11 @@ default LogState getLogState(Epoch startEpoch, boolean allowSnapshots) if (snapshotEpochs.size() <= 1 || !allowSnapshots) { entries = getEntries(startEpoch); - if (entries.isContinuous()) + // Only return entries directly if they are continuous AND they reach at least as far as any + // known snapshot. If a node caught up via a ForceSnapshot (which is not written to local_metadata_log), + // its log will have a gap between the old entries and the snapshot epoch. In that case entries may + // appear continuous up to their last epoch, but there is a snapshot beyond them that is needed. 
+ if (entries.isContinuous() && (snapshotEpochs.isEmpty() || !snapshotEpochs.get(0).isAfter(entries.latestEpoch()))) return new LogState(null, entries.immutable()); else if (!allowSnapshots) throw new IllegalStateException("Can't construct a continuous log since " + startEpoch + " and inclusion of snapshots is disallowed"); @@ -136,6 +140,11 @@ public void add(Entry entry) entries.add(entry); } + private Epoch latestEpoch() + { + return entries.isEmpty() ? since : entries.last().epoch; + } + private boolean isContinuous() { Epoch prev = since; diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CatchupViaSnapshotTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/CatchupViaSnapshotTest.java new file mode 100644 index 000000000000..4393c927a8fc --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/log/CatchupViaSnapshotTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test.log; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.Epoch; + +import static org.junit.Assert.assertEquals; + +public class CatchupViaSnapshotTest extends TestBaseImpl +{ + @Test + public void catchupViaSnapshotTest() throws Exception + { + try (Cluster cluster = init(builder().withNodes(3) + .withConfig(c -> c.set("metadata_snapshot_frequency", "10")) + .start())) + { + cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class':'SimpleStrategy', 'replication_factor':3}")); + cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)")); + + // isolate node2 and node3 + cluster.filters().inbound().from(1).to(2,3).drop(); + + for (int i = 0; i < 30; i++) + cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='abc" + i + "'"), ConsistencyLevel.ONE); + + // Snapshot needs to be the last transformation in the log + cluster.get(1).nodetoolResult("cms", "snapshot").asserts().success(); + + // allow node2 to catch up: + cluster.filters().reset(); + cluster.filters().inbound().from(1).to(3).drop(); + String node1Address = cluster.get(1).config().broadcastAddress().getHostString(); + + fetchLogFromPeerAsync(cluster.get(2), node1Address); + // allow node3 to catch up + cluster.filters().reset(); + String node2Address = cluster.get(2).config().broadcastAddress().getHostString(); + // by now node2 has an incomplete log, it only caught up from the snapshot above + // this means the log is continuous, but its current epoch is beyond the last entry in the 
log + fetchLogFromPeerAsync(cluster.get(3), node2Address); + + long expectedEpoch = cluster.get(1).callOnInstance(() -> ClusterMetadata.current().epoch.getEpoch()); + for (IInvokableInstance i : cluster) + assertEquals(expectedEpoch, (long)i.callOnInstance(() -> ClusterMetadata.current().epoch.getEpoch())); + } + } + + private static void fetchLogFromPeerAsync(IInvokableInstance i, String address) + { + i.runOnInstance(() -> { + try + { + ClusterMetadataService.instance().fetchLogFromPeerAsync(InetAddressAndPort.getByNameUnchecked(address), Epoch.create(30)).get(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + } +} diff --git a/test/unit/org/apache/cassandra/tcm/log/LocalStorageLogStateTest.java b/test/unit/org/apache/cassandra/tcm/log/LocalStorageLogStateTest.java index 5bc6ec0fa831..3460468abbdb 100644 --- a/test/unit/org/apache/cassandra/tcm/log/LocalStorageLogStateTest.java +++ b/test/unit/org/apache/cassandra/tcm/log/LocalStorageLogStateTest.java @@ -21,13 +21,16 @@ import java.io.IOException; import org.junit.BeforeClass; +import org.junit.Test; import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.MetadataSnapshots; @@ -37,6 +40,8 @@ import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; import static org.apache.cassandra.db.SystemKeyspace.METADATA_LOG; import static org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; 
public class LocalStorageLogStateTest extends LogStateTestBase { @@ -109,4 +114,47 @@ public void dumpTables() throws IOException }; } + @Test + public void catchUpViaForceSnapshotLeavesGappedLog() throws Exception + { + // Simulates a non-CMS node that caught up via a ForceSnapshot (real or synthetic). + // LocalLog.append(LogState) converts the received baseState into a synthetic ForceSnapshot, + // which is processed but not written to local_metadata_log. + // The snapshot is stored in metadata_snapshots by MetadataSnapshotListener. + // The resulting on-disk state is: entries 1..X in local_metadata_log, snapshot at S sometime after X in + // metadata_snapshots, with no entries between X and S. + // Any peer requesting log since epoch <= X must receive the snapshot — not just the continuous + // run of entries up to X, which would leave it unable to advance past the gap. + MetadataSnapshots realSnapshots = new MetadataSnapshots.SystemKeyspaceMetadataSnapshots(); + LogStateSUT sut = getSystemUnderTest(realSnapshots); + sut.cleanup(); + ColumnFamilyStore.getIfExists(SYSTEM_KEYSPACE_NAME, SystemKeyspace.SNAPSHOT_TABLE_NAME) + .truncateBlockingWithoutSnapshot(); + + // insertRegularEntry inserts 2 entries on the first call (epoch 1 and 2) due to the + // Epoch.FIRST pre-init entry, then 1 per subsequent call. After 3 calls: epochs 1..4. + sut.insertRegularEntry(); + sut.insertRegularEntry(); + sut.insertRegularEntry(); + + // Simulate ForceSnapshot at epoch 50: snapshot stored, no intermediate log entries written + Epoch gapSnapshotEpoch = Epoch.create(50); + realSnapshots.storeSnapshot(ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance) + .forceEpoch(gapSnapshotEpoch)); + + // A peer at epoch 3 sees entry [4] which is continuous, but does not bridge to epoch 50. + // Must return the snapshot rather than just entry 4, which would leave the peer stuck. 
+ LogState state = sut.getLogState(Epoch.create(3)); + assertEquals(gapSnapshotEpoch, state.baseState.epoch); + assertTrue(state.entries.isEmpty()); + + // A peer already at epoch 4 (the last log entry) used to get an empty response and stall; it must now receive the snapshot as well. + state = sut.getLogState(Epoch.create(4)); + assertEquals(gapSnapshotEpoch, state.baseState.epoch); + assertTrue(state.entries.isEmpty()); + + ColumnFamilyStore.getIfExists(SYSTEM_KEYSPACE_NAME, SystemKeyspace.SNAPSHOT_TABLE_NAME) + .truncateBlockingWithoutSnapshot(); + } + }