Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/java/org/apache/cassandra/tcm/log/LogReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ default LogState getLogState(Epoch startEpoch, boolean allowSnapshots)
if (snapshotEpochs.size() <= 1 || !allowSnapshots)
{
entries = getEntries(startEpoch);
if (entries.isContinuous())
// Only return entries directly if they are continuous AND they reach at least as far as any
// known snapshot. If a node caught up via a ForceSnapshot (which is not written to local_metadata_log),
// its log will have a gap between the old entries and the snapshot epoch. In that case entries may
// appear continuous up to their last epoch, but there is a snapshot beyond them that is needed.
if (entries.isContinuous() && (snapshotEpochs.isEmpty() || !snapshotEpochs.get(0).isAfter(entries.latestEpoch())))
return new LogState(null, entries.immutable());
else if (!allowSnapshots)
throw new IllegalStateException("Can't construct a continuous log since " + startEpoch + " and inclusion of snapshots is disallowed");
Expand Down Expand Up @@ -136,6 +140,11 @@ public void add(Entry entry)
entries.add(entry);
}

private Epoch latestEpoch()
{
return entries.isEmpty() ? since : entries.last().epoch;
}

private boolean isContinuous()
{
Epoch prev = since;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test.log;

import org.junit.Test;

import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;

import static org.junit.Assert.assertEquals;

public class CatchupViaSnapshotTest extends TestBaseImpl
{
@Test
public void catchupViaSnapshotTest() throws Exception
{
try (Cluster cluster = init(builder().withNodes(3)
.withConfig(c -> c.set("metadata_snapshot_frequency", "10"))
.start()))
{
cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class':'SimpleStrategy', 'replication_factor':3}"));
cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)"));

// isolate node2 and node3
cluster.filters().inbound().from(1).to(2,3).drop();

for (int i = 0; i < 30; i++)
cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='abc" + i + "'"), ConsistencyLevel.ONE);

// Snapshot needs to be the last transformation in the log
cluster.get(1).nodetoolResult("cms", "snapshot").asserts().success();

// allow node2 to catch up:
cluster.filters().reset();
cluster.filters().inbound().from(1).to(3).drop();
String node1Address = cluster.get(1).config().broadcastAddress().getHostString();

fetchLogFromPeerAsync(cluster.get(2), node1Address);
// allow node3 to catch up
cluster.filters().reset();
String node2Address = cluster.get(2).config().broadcastAddress().getHostString();
// by now node2 has an incomplete log, it only caught up from the snapshot above
// this means the log is continuous, but its current epoch is beyond the last entry in the log
fetchLogFromPeerAsync(cluster.get(3), node2Address);

long expectedEpoch = cluster.get(1).callOnInstance(() -> ClusterMetadata.current().epoch.getEpoch());
for (IInvokableInstance i : cluster)
assertEquals(expectedEpoch, (long)i.callOnInstance(() -> ClusterMetadata.current().epoch.getEpoch()));
}
}

private static void fetchLogFromPeerAsync(IInvokableInstance i, String address)
{
i.runOnInstance(() -> {
try
{
ClusterMetadataService.instance().fetchLogFromPeerAsync(InetAddressAndPort.getByNameUnchecked(address), Epoch.create(30)).get();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
import java.io.IOException;

import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.MetadataSnapshots;
Expand All @@ -37,6 +40,8 @@
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.db.SystemKeyspace.METADATA_LOG;
import static org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class LocalStorageLogStateTest extends LogStateTestBase
{
Expand Down Expand Up @@ -109,4 +114,47 @@ public void dumpTables() throws IOException
};
}

@Test
public void catchUpViaForceSnapshotLeavesGappedLog() throws Exception
{
// Simulates a non-CMS node that caught up via a ForceSnapshot (real or synthetic).
// LocalLog.append(LogState) converts the received baseState into a synthetic ForceSnapshot,
// which is processed but not written to local_metadata_log.
// The snapshot is stored in metadata_snapshots by MetadataSnapshotListener.
// The resulting on-disk state is: entries 1..X in local_metadata_log, snapshot at S sometime after X in
// metadata_snapshots, with no entries between X and S.
// Any peer requesting log since epoch <= X must receive the snapshot — not just the continuous
// run of entries up to X, which would leave it unable to advance past the gap.
MetadataSnapshots realSnapshots = new MetadataSnapshots.SystemKeyspaceMetadataSnapshots();
LogStateSUT sut = getSystemUnderTest(realSnapshots);
sut.cleanup();
ColumnFamilyStore.getIfExists(SYSTEM_KEYSPACE_NAME, SystemKeyspace.SNAPSHOT_TABLE_NAME)
.truncateBlockingWithoutSnapshot();

// insertRegularEntry inserts 2 entries on the first call (epoch 1 and 2) due to the
// Epoch.FIRST pre-init entry, then 1 per subsequent call. After 3 calls: epochs 1..4.
sut.insertRegularEntry();
sut.insertRegularEntry();
sut.insertRegularEntry();

// Simulate ForceSnapshot at epoch 50: snapshot stored, no intermediate log entries written
Epoch gapSnapshotEpoch = Epoch.create(50);
realSnapshots.storeSnapshot(ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance)
.forceEpoch(gapSnapshotEpoch));

// A peer at epoch 3 sees entry [4] which is continuous, but does not bridge to epoch 50.
// Must return the snapshot rather than just entry 4, which would leave the peer stuck.
LogState state = sut.getLogState(Epoch.create(3));
assertEquals(gapSnapshotEpoch, state.baseState.epoch);
assertTrue(state.entries.isEmpty());

// A peer already at epoch 4 (the last log entry) previously got an empty response and stalled.
state = sut.getLogState(Epoch.create(4));
assertEquals(gapSnapshotEpoch, state.baseState.epoch);
assertTrue(state.entries.isEmpty());

ColumnFamilyStore.getIfExists(SYSTEM_KEYSPACE_NAME, SystemKeyspace.SNAPSHOT_TABLE_NAME)
.truncateBlockingWithoutSnapshot();
}

}