diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index dbf4d7b57143..27072f60a6b9 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -410,7 +410,7 @@ public Future applyFuture(Mutation mutation, boolean writeCommitLog, boolean if (mutation.id().isNone()) return applyInternal(mutation, writeCommitLog, updateIndexes, true, true, new AsyncPromise<>()); else - return applyInternalTracked(mutation, new AsyncPromise<>()); + return applyInternalTracked(mutation, false, new AsyncPromise<>()); } public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes) @@ -441,7 +441,10 @@ public void apply(final Mutation mutation, boolean isDroppable) { if (MigrationRouter.isFullyTracked(mutation)) - applyInternalTracked(mutation, null); + { + // makeDurable is ignored for tracked mutations, the mutation journal is required for replication + applyInternalTracked(mutation, false, null); + } else applyInternal(mutation, makeDurable, updateIndexes, isDroppable, false, null); } @@ -612,10 +615,23 @@ else if (isDeferrable) } } + /** + * Apply a tracked mutation read from the {@link org.apache.cassandra.replication.MutationJournal} + * during static-segment replay on startup. + * + * Compared to the normal write apply path, this skips the journal append (since we're replaying from it) + * and always writes to the memtable, even when {@link MutationTrackingService#startWriting} reports the offset + * as already witnessed. + */ + public void applyForReplay(Mutation mutation) + { + applyInternalTracked(mutation, true, null); + } + /** * Append the mutation to the mutation journal, then update memtables and indexes. 
*/ - private Future applyInternalTracked(Mutation mutation, Promise future) + private Future applyInternalTracked(Mutation mutation, boolean isReplay, Promise future) { MutationTrackingService.ensureEnabled(); if (!MigrationRouter.isFullyTracked(mutation) || mutation.id().isNone()) @@ -628,11 +644,11 @@ private Future applyInternalTracked(Mutation mutation, Promise future) throw new RuntimeException("Testing write failures"); boolean started; - try (WriteContext ctx = trackedWriteHandler.beginWrite(mutation, true)) + try (WriteContext ctx = trackedWriteHandler.beginWrite(mutation, !isReplay)) { started = MutationTrackingService.instance().startWriting(mutation); - if (started) + if (started || isReplay) { for (PartitionUpdate upd : mutation.getPartitionUpdates()) { diff --git a/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java b/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java index 6d8ed9014920..2be356538cc8 100644 --- a/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java +++ b/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java @@ -41,8 +41,12 @@ public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws Re MigrationRouter.validateTrackedMutation(mutation); - Tracing.trace("Appending to mutation journal"); - CommitLogPosition pointer = MutationJournal.instance().write(mutation.id(), mutation); + CommitLogPosition pointer = null; + if (makeDurable) + { + Tracing.trace("Appending to mutation journal"); + pointer = MutationJournal.instance().write(mutation.id(), mutation); + } return new CassandraWriteContext(group, pointer); } diff --git a/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java b/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java index 2b7650e36bad..9935a42bd4c4 100644 --- a/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java +++ b/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java 
@@ -48,6 +48,7 @@ public static MutationTrackingMetrics instance() public final Histogram readSummarySize; // Read summary sizes public final Gauge unreconciledMutationCount; // Number of unreconciled mutations public final Gauge journalDiskSpaceUsed; // Size of MutationJournal on disk + public final Gauge pendingClearReplaySize; // Static segments awaiting clearNeedsReplay @SuppressWarnings("Convert2MethodRef") private MutationTrackingMetrics() @@ -63,5 +64,9 @@ private MutationTrackingMetrics() factory.createMetricName("JournalDiskSpaceUsed"), () -> MutationJournal.instance().getDiskSpaceUsed() ); + pendingClearReplaySize = Metrics.register( + factory.createMetricName("PendingClearReplaySize"), + () -> MutationJournal.pendingClearReplaySize() + ); } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/replication/MutationJournal.java b/src/java/org/apache/cassandra/replication/MutationJournal.java index a5b762e7f032..1fa4909804af 100644 --- a/src/java/org/apache/cassandra/replication/MutationJournal.java +++ b/src/java/org/apache/cassandra/replication/MutationJournal.java @@ -20,7 +20,10 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -30,10 +33,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import org.agrona.collections.Long2LongHashMap; import org.agrona.collections.Long2ObjectHashMap; import org.jctools.maps.NonBlockingHashMapLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import accord.utils.Invariants; @@ -79,11 +85,27 @@ // TODO (required): handle table truncations public class MutationJournal { + private static final Logger logger = 
LoggerFactory.getLogger(MutationJournal.class); + + // opaque, immutable set of ids of the segments whose needs-replay flag should be cleared + public static class PendingClearReplay + { + private final ImmutableSet segments; + + public PendingClearReplay(ImmutableSet segments) + { + this.segments = segments; + } + } + private static final MutationJournal instance = DatabaseDescriptor.getMutationTrackingEnabled() ? new MutationJournal() : null; private final Journal journal; private final Map segmentStateTrackers; + // Ids of static segments whose needsReplay=false metadata has not yet been durably persisted. + private final Set pendingClearReplay = ConcurrentHashMap.newKeySet(); + // Most of the time during write, we will notify last known segment, so we optimistically cache last segment tracker, // without imposing any visibility guarantees. If we do not see the right segment in this field, we will look it up // in NBHM. @@ -176,20 +198,71 @@ public CommitLogPosition getCurrentPosition() } // If all Memtables associated with given segment were flushed by the time we have closed active segment - // and opened it as static, mark its metadata to indicate it does not need replay. It may happen that we - // crash before persisting this metadata, in which case we will unnecessarily replay the segment, which - // has no correctness implications. + // and opened it as static, the segment is eligible to be marked as not needing replay. The actual durable + // recording of needsReplay=false is deferred — we record the segment in pendingClearReplay and let the + // LogStatePersister drain the queue after it has written witnessed offsets to system.coordinator_logs. 
+ // + // See the comment in LogStatePersister or CASSANDRA-21443 for an explanation of why we do this private void maybeCleanupStaticSegment(Segment segment) { Invariants.require(segment.isStatic()); SegmentStateTracker tracker = segmentStateTrackers.get(segment.id()); if (tracker != null && tracker.removeCleanFromDirty()) + pendingClearReplay.add(segment.id()); + } + + /** + * Snapshot the current set of segments awaiting clearing of their needs replay flag. + */ + public PendingClearReplay snapshotPendingClearReplay() + { + return new PendingClearReplay(ImmutableSet.copyOf(pendingClearReplay)); + } + + /** + * Mark the given PendingClearReplay as not needing replay + * + * See the comment in LogStatePersister or CASSANDRA-21443 for an explanation of why we do this + */ + public void drainCleanup(PendingClearReplay toDrain) + { + for (long segId : toDrain.segments) { - segment.metadata().clearNeedsReplay(); - segment.persistMetadata(); + List> found = journal.getSegments(segId, segId); + if (found.isEmpty()) + { + // segment was dropped between enqueue and drain — nothing to persist. 
+ pendingClearReplay.remove(segId); + continue; + } + Segment segment = found.get(0); + try + { + segment.metadata().clearNeedsReplay(); + segment.persistMetadata(); + pendingClearReplay.remove(segId); + } + catch (Throwable t) + { + logger.warn("Deferred cleanup failed for segment {}; will retry next persister tick", segId, t); + // leave in live queue + } } } + @VisibleForTesting + public Set pendingCleanupForTesting() + { + return pendingClearReplay; + } + + public static int pendingClearReplaySize() + { + if (instance == null) + return 0; + return instance.pendingClearReplay.size(); + } + void startInternal() { journal.start(); @@ -328,7 +401,7 @@ protected void accept(long segmentId, int position, ShortMutationId key, Mutatio if (newPUCollector != null) { assert !newPUCollector.isEmpty(); - keyspace.apply(newPUCollector.build(), false, true, false); + keyspace.applyForReplay(newPUCollector.build()); } } }, getAvailableProcessors()); diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java index 5d1103e69a50..db87a88f010a 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java @@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.LongSupplier; import javax.annotation.Nonnull; @@ -147,11 +148,22 @@ public static void ensureEnabled() throw new IllegalStateException(DISABLED_MESSAGE); } - public static void start(ClusterMetadata metadata) + public static ClusterMetadata register(ChangeListener listener) + { + ClusterMetadataService.instance().log().addListener(listener); + return ClusterMetadata.current(); + } + + public static void start(Function register) { if (!isEnabled()) return; - 
instance().startInternal(metadata); + instance().startInternal(register); + } + + public static void start() + { + start(MutationTrackingService::register); } public static void shutdown() throws InterruptedException @@ -178,6 +190,11 @@ public static void shutdown() throws InterruptedException private ConcurrentHashMap log2ShardMap = new ConcurrentHashMap<>(); private final ChangeListener tcmListener; + // The highest TCM epoch we have applied to keyspaceShards via onNewClusterMetadata. + // Updates with next.epoch <= this value are skipped. Protects against state going + // backwards in time when events are delivered out of order + private volatile Epoch lastAppliedEpoch = Epoch.EMPTY; + // prevents a race between topology changes (shard recreation) and coordinator log creation. // // coordinator log creation can race with topology updates and be lost if shard recreation discards the old @@ -218,7 +235,7 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean }; } - private synchronized void startInternal(ClusterMetadata metadata) + private synchronized void startInternal(Function register) { if (started) return; @@ -227,6 +244,8 @@ private synchronized void startInternal(ClusterMetadata metadata) logger.info("Starting mutation tracking service. 
Previous host log id: {}", prevHostLogId); + ClusterMetadata metadata = register.apply(tcmListener); + if (metadata.myNodeId() != null) for (KeyspaceShards ks : KeyspaceShards.loadFromSystemTables(metadata, this::nextLogId, this::onNewLog)) keyspaceShards.put(ks.keyspace, ks); @@ -307,11 +326,6 @@ public ReconciledLogSnapshot snapshotReconciledLogs() return builder.build(); } - public void registerMetadataListener() - { - ClusterMetadataService.instance().log().addListener(tcmListener); - } - public synchronized boolean isStarted() { return started; @@ -322,7 +336,13 @@ private void shutdownBlocking() throws InterruptedException ClusterMetadataService.instance().log().removeListener(tcmListener); activeReconciler.shutdownBlocking(); executor.shutdown(); - executor.awaitTermination(1, TimeUnit.MINUTES); + if (!executor.awaitTermination(1, TimeUnit.MINUTES)) + logger.warn("Mutation tracking executor did not terminate within 1 minute; forcing shutdown"); + + // attempt to persist offsets and mark segments as + // not needing replay one last time before shutdown + if (isStarted()) + offsetsPersister.run(true); ExpiredStatePurger.instance.shutdownBlocking(); } @@ -883,12 +903,15 @@ public boolean isDurablyReconciled(ImmutableCoordinatorLogOffsets logOffsets) } } - private void onNewClusterMetadata(@Nullable ClusterMetadata prev, ClusterMetadata next) + private synchronized void onNewClusterMetadata(@Nullable ClusterMetadata prev, ClusterMetadata next) { if (logger.isTraceEnabled()) logger.trace("Processing cluster metadata change - epoch {} -> {}", prev != null ? 
prev.epoch : "none", next.epoch); + if (!next.epoch.isAfter(lastAppliedEpoch)) + return; + shardLock.readLock().lock(); try { @@ -905,6 +928,9 @@ private void onNewClusterMetadata(@Nullable ClusterMetadata prev, ClusterMetadat ConcurrentHashMap originalKeyspaceShards = keyspaceShards; try { + if (!next.epoch.isAfter(lastAppliedEpoch)) + return; + if (!shardUpdateNeeded(keyspaceShards, prev, next)) return; @@ -919,6 +945,8 @@ private void onNewClusterMetadata(@Nullable ClusterMetadata prev, ClusterMetadat if (!newKeyspaces.isEmpty()) logBackgroundReconciliationDisabledWarning(newKeyspaces); } + + lastAppliedEpoch = next.epoch; } catch (Throwable t) { @@ -1576,6 +1604,35 @@ private void run(Shard shard, boolean durable) } } + /** + * Persists per-log witnessed offsets, and durably marks needsReplay=false on any segments that have become eligible + * for it since the most recent run of this class. These 2 operations need to be performed in a specific sequence to avoid + * correctness problems. + * + * For background, mutation tracking needs to keep a record of every mutation id it's written locally. For correctness + * purposes, a node's view of the mutation ids it's written locally needs to exactly match the data it has on disk. + * Having data on disk you don't have an id for, or thinking you have ids on disk that you don't, breaks the mutation + * tracking consistency mechanism. + * + * To improve startup, we periodically save our view of mutation ids that we've witnessed to disk as part of this + * class. Any ids witnessed since the last time this class was run are reconstructed by replaying the journal. + * + * However, if an sstable is flushed after the most recent LogStatePersister run, AND it marks a segment as no + * longer needing replay, AND the node is stopped before the next LogStatePersister run, then the offsets witnessed + * between the LogStatePersister run and the sstable flush will be forgotten on startup. 
+ * + * This is a correctness problem for mutation tracking because it means that we will be returning data in reads that + * is not included in our mutation summaries, which breaks reconciliation and read monotonicity. + * + * To prevent this, witnessed offsets are flushed and segments are marked as not needing replay together in 3 steps. + * + * 1. Snapshot the set of journal segments that are eligible to have their needsReplay flag set to false (but have not yet been updated on disk) + * 2. Flush per-log witnessed offsets to the system table + * 3. Durably mark the snapshotted segments as not needing replay + * + * This guarantees that, on startup, we will always replay all segments that may contain offsets not persisted to + * system.coordinator_logs + */ private static class LogStatePersister implements Runnable { // TODO (expected): consider a different interval @@ -1583,20 +1640,46 @@ private static class LogStatePersister implements Runnable // private static final long PERSIST_INTERVAL_MILLIS = 60_000; private static final long PERSIST_INTERVAL_MILLIS = 1_000; + private volatile boolean isPaused = false; + void start() { executor.scheduleWithFixedDelay(this, PERSIST_INTERVAL_MILLIS, PERSIST_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); } + void pauseForTesting(boolean pause) + { + isPaused = pause; + } + @Override public void run() { + if (isPaused) + return; run(true); } private void run(boolean dropSegments) { - MutationTrackingService.instance().forEachKeyspace(this::run); + + MutationJournal.PendingClearReplay toDrain = MutationJournal.instance().snapshotPendingClearReplay(); + + boolean writesOk; + try + { + MutationTrackingService.instance().forEachKeyspace(this::run); + writesOk = true; + } + catch (Throwable t) + { + writesOk = false; + logger.error("LogStatePersister write to system.coordinator_logs failed; deferring segment cleanup drain to next tick", t); + } + + if (writesOk) + MutationJournal.instance().drainCleanup(toDrain); + if (dropSegments) 
MutationTrackingService.instance().truncateMutationJournal(); } @@ -1643,6 +1726,18 @@ public void resumeActiveReconciler() activeReconciler.resumeForTesting(); } + @VisibleForTesting + public void pauseOffsetsPersisterForTesting() + { + offsetsPersister.pauseForTesting(true); + } + + @VisibleForTesting + public void resumeOffsetsPersisterForTesting() + { + offsetsPersister.pauseForTesting(false); + } + /** * Pause only regular-priority (background write retry) delivery in the active reconciler. * High-priority tasks (needed by tracked read reconciliation) continue to be processed. diff --git a/src/java/org/apache/cassandra/schema/KeyspaceParams.java b/src/java/org/apache/cassandra/schema/KeyspaceParams.java index 6edd68d401d3..ab7575c89f34 100644 --- a/src/java/org/apache/cassandra/schema/KeyspaceParams.java +++ b/src/java/org/apache/cassandra/schema/KeyspaceParams.java @@ -25,6 +25,7 @@ import com.google.common.base.Objects; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.service.ClientState; @@ -180,6 +181,9 @@ public KeyspaceParams withSecurityLabel(String securityLabel) public void validate(String name, ClientState state, ClusterMetadata metadata) { + if (!durableWrites && replicationType.isTracked()) + throw new ConfigurationException(String.format("Keyspace %s cannot disable durable_writes with replication_type='tracked': " + + "the mutation journal is required for tracked replication", name)); replication.validate(name, state, metadata, replicationType); } diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 4db660fed67c..a0c7158a90e2 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -84,6 +84,7 @@ 
import org.apache.cassandra.metrics.DefaultNameFactory; import org.apache.cassandra.net.StartupClusterConnectivityChecker; import org.apache.cassandra.replication.MutationJournal; +import org.apache.cassandra.replication.MutationTrackingService; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.security.ThreadAwareSecurityManager; @@ -363,7 +364,10 @@ protected void setup() AccordService.localStartup(self); if (DatabaseDescriptor.getMutationTrackingEnabled()) + { + MutationTrackingService.start(); MutationJournal.instance().replayStaticSegments(); + } } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index 5a462665cce3..4a95ad29ca14 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -54,7 +54,6 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.replication.MutationJournal; -import org.apache.cassandra.replication.MutationTrackingService; import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Keyspaces; @@ -160,8 +159,7 @@ public static void initializeAsNonCmsNode(Function wrapPro LocalLog.LogSpec logSpec = LocalLog.logSpec() .withStorage(LogStorage.SystemKeyspace) .afterReplay(Startup::scrubDataDirectories, - (metadata) -> StorageService.instance.registerMBeans(), - MutationTrackingService::start) + (metadata) -> StorageService.instance.registerMBeans()) .withDefaultListeners(); ClusterMetadataService.setInstance(new ClusterMetadataService(new UniformRangePlacement(), wrapProcessor, @@ -169,9 +167,6 @@ public static void initializeAsNonCmsNode(Function wrapPro logSpec)); ClusterMetadataService.instance().log().ready(); - if (DatabaseDescriptor.getMutationTrackingEnabled()) - 
MutationTrackingService.instance().registerMetadataListener(); - NodeId nodeId = ClusterMetadata.current().myNodeId(); UUID currentHostId = SystemKeyspace.getLocalHostId(); if (nodeId != null && !Objects.equals(nodeId.toUUID(), currentHostId)) @@ -291,8 +286,7 @@ public static void initializeFromGossip(Function wrapProce LocalLog.LogSpec logSpec = LocalLog.logSpec() .withInitialState(emptyFromSystemTables) .afterReplay(Startup::scrubDataDirectories, - (metadata) -> StorageService.instance.registerMBeans(), - MutationTrackingService::start) + (metadata) -> StorageService.instance.registerMBeans()) .withStorage(LogStorage.SystemKeyspace) .withDefaultListeners(); @@ -303,9 +297,6 @@ public static void initializeFromGossip(Function wrapProce ClusterMetadataService.instance().log().ready(); - if (DatabaseDescriptor.getMutationTrackingEnabled()) - MutationTrackingService.instance().registerMetadataListener(); - initMessaging.run(); try { @@ -406,8 +397,7 @@ public static void reinitializeWithClusterMetadata(String fileName, Function StorageService.instance.registerMBeans(), - MutationTrackingService::start) + (_metadata) -> StorageService.instance.registerMBeans()) .withPreviousState(prev) .withInitialState(metadata) .withStorage(LogStorage.SystemKeyspace) @@ -421,9 +411,6 @@ public static void reinitializeWithClusterMetadata(String fileName, Function cluster) throws IOException, NoSuchFie AccordService.localStartup(self); if ((Boolean) config.get("mutation_tracking.enabled")) + { + MutationTrackingService.start(); MutationJournal.instance().replayStaticSegments(); + } } catch (IOException e) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java index d1015f3560a7..6f1c883f7ad3 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java +++ 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java @@ -75,7 +75,7 @@ private void bounceTest(Cluster cluster, int rf, int bounces) throws Throwable int tables = 10; int writesPerKey = 2; int pks = 100; - withRandom(rng -> { + withRandom(1509900183613458L, rng -> { cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': %d} " + "AND replication_type='tracked'", KEYSPACE, rf)); diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java index 85532e744afe..b95be6c14646 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java @@ -33,6 +33,7 @@ import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.shared.ClusterUtils; import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.hints.HintsService; @@ -40,6 +41,7 @@ import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.net.Verb; import org.apache.cassandra.replication.CoordinatorLogId; +import org.apache.cassandra.replication.MutationJournal; import org.apache.cassandra.replication.MutationSummary; import org.apache.cassandra.replication.MutationTrackingService; import org.apache.cassandra.replication.Offsets; @@ -107,6 +109,260 @@ public void testBasicWritePath() throws Throwable } } + private static int getOffsetCount(IInvokableInstance node, String keyspaceName, String tableName, int key) + { + return node.callOnInstance(() -> { + TableMetadata table = 
Schema.instance.getTableMetadata(keyspaceName, tableName); + DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(key)); + MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); + if (summary.isEmpty()) + return 0; + CoordinatorLogId logId = getOnlyLogId(summary); + return summaryIdSpace(summary.get(logId)).offsetCount(); + }); + + } + + private static int getOffsetCount(IInvokableInstance node, String keyspaceName, int key) + { + return getOffsetCount(node, keyspaceName, "tbl", key); + } + + /** + * Writes tracked mutations and deliberately doesn't flush, so the writes live only in the + * commit log and journal segments on the node, then asserts that MTS witness state after the bounce reflects the + * unflushed writes, confirming they're reconstructed by journal replay. + */ + @Test + public void testWitnessSurvivesBounceWithoutFlush() throws Throwable + { + final int key = 1; + final int writes = 10; + + try (Cluster cluster = Cluster.build(1) + .withConfig(cfg -> cfg.with(Feature.NETWORK).with(Feature.GOSSIP)) + .start()) + { + cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 1} " + + "AND replication_type='tracked';")); + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k int PRIMARY KEY, v int);")); + + // Pause the persister so writes never reach system.coordinator_logs SSTables — + // the only durable record of the witnesses on disk lives in the commit log. 
+ cluster.get(1).runOnInstance(() -> MutationTrackingService.instance().pauseOffsetsPersisterForTesting()); + + for (int i = 0; i < writes; i++) + cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (k, v) VALUES (?, ?)"), + ConsistencyLevel.QUORUM, key, i); + + String keyspaceName = KEYSPACE; + int preBounceOffsetCount = getOffsetCount(cluster.get(1), keyspaceName, key); + assertEquals("Pre-bounce witness count must equal write count", writes, preBounceOffsetCount); + + // Bounce without flushing. Witnesses live only in the commit log + journal segments + // (still active, needsReplay=true). On the way back up, MTS.start must run after + // CommitLog.recoverSegmentsOnDisk() so the journal replay path repopulates witnesses + // before any consumer queries MTS state. + ClusterUtils.stopUnchecked(cluster.get(1)); + cluster.get(1).startup(); + + cluster.get(1).runOnInstance(() -> MutationTrackingService.instance().pauseOffsetsPersisterForTesting()); + + int postBounceOffsetCount = getOffsetCount(cluster.get(1), keyspaceName, key); + + assertEquals("Witness state must survive bounce-without-flush: post-bounce offsets must match pre-bounce", + preBounceOffsetCount, postBounceOffsetCount); + } + } + + /** + * Regression test for the lost-witness-marker race (CASSANDRA-21443). + * + * When a memtable flush + segment close fires before the periodic LogStatePersister + * has written witnessed offsets to system.coordinator_logs, the segment metadata can be + * durably marked needsReplay=false while the witnesses for its mutations are still only + * in memory. A crash in this window leaves the node with data in SSTables but with + * witness state missing on restart, breaking mutation summaries and journal sync barrier + * guarantees. 
+ * + * The test pauses the persister, writes a known set of mutations, forces flush and + * segment close (triggering maybeCleanupStaticSegment), bounces the node, and asserts + * that the post-restart witness state matches the pre-bounce snapshot. + */ + @Test + public void testWitnessSurvivesCrashAfterFlushAndSegmentClose() throws Throwable + { + final int key = 1; + final int writes = 10; + + try (Cluster cluster = Cluster.build(1) + .withConfig(cfg -> cfg.with(Feature.NETWORK).with(Feature.GOSSIP)) + .start()) + { + cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 1} " + + "AND replication_type='tracked';")); + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k int PRIMARY KEY, v int);")); + + // Pause the persister so no witness state escapes to system.coordinator_logs + // for the duration of the test window. This mirrors the in-production hazard + // between persister ticks (currently 1s, planned 60s). + cluster.get(1).runOnInstance(() -> MutationTrackingService.instance().pauseOffsetsPersisterForTesting()); + + for (int i = 0; i < writes; i++) + cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (k, v) VALUES (?, ?)"), + ConsistencyLevel.QUORUM, key, i); + + String keyspaceName = KEYSPACE; + int preBounceOffsetCount = getOffsetCount(cluster.get(1), keyspaceName, key); + assertEquals("Pre-bounce witness count must equal write count", writes, preBounceOffsetCount); + + // Flush so notifyFlushed marks the active segment's interval clean. + cluster.get(1).nodetoolResult("flush", KEYSPACE).asserts().success(); + + // Roll the active segment to static. The cleanup callback fires + // (maybeCleanupStaticSegment) and — in the broken code — durably clears + // needsReplay=false even though witnesses are not persisted. 
+ cluster.get(1).runOnInstance(() -> MutationJournal.instance().closeCurrentSegmentForTestingIfNonEmpty()); + + // Bounce without running the persister. + ClusterUtils.stopUnchecked(cluster.get(1)); + cluster.get(1).startup(); + + // Re-pause on the freshly-restarted instance so any first persister tick + // cannot accidentally normalize state before we sample it. + cluster.get(1).runOnInstance(() -> MutationTrackingService.instance().pauseOffsetsPersisterForTesting()); + + int postBounceOffsetCount = getOffsetCount(cluster.get(1), keyspaceName, key); + + assertEquals("Witness state must survive crash: post-bounce offsets must match pre-bounce", + preBounceOffsetCount, postBounceOffsetCount); + } + } + + /** + * Companion to {@link #testWitnessSurvivesCrashAfterFlushAndSegmentClose}: confirms + * that the deferred-cleanup fix did not break segment cleanup itself. After a flush + * and segment close, the segment should sit in {@code pendingClearReplay} (not yet + * needsReplay=false on disk), and the next persister tick should drain it. + * + * If this test fails while the witness-survival test passes, it means we have + * accidentally turned segment cleanup into a no-op - segments would never be eligible + * for journal compaction, and disk would grow unbounded. + */ + @Test + public void testPersisterDrainsPendingSegmentCleanup() throws Throwable + { + try (Cluster cluster = Cluster.build(1) + .withConfig(cfg -> cfg.with(Feature.NETWORK).with(Feature.GOSSIP)) + .start()) + { + cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 1} " + + "AND replication_type='tracked';")); + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k int PRIMARY KEY, v int);")); + + // Pause the scheduled persister so the only persister run in this test is the + // explicit one below — otherwise the periodic tick could drain mid-assert. 
+ cluster.get(1).runOnInstance(() -> MutationTrackingService.instance().pauseOffsetsPersisterForTesting()); + + for (int i = 0; i < 10; i++) + cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (k, v) VALUES (?, ?)"), + ConsistencyLevel.QUORUM, 1, i); + + cluster.get(1).nodetoolResult("flush", KEYSPACE).asserts().success(); + cluster.get(1).runOnInstance(() -> MutationJournal.instance().closeCurrentSegmentForTestingIfNonEmpty()); + + // After flush + close, the segment should be queued for cleanup but not yet + // marked needsReplay=false on disk. + cluster.get(1).runOnInstance(() -> { + assertTrue("Expected at least one segment queued for cleanup after flush + close", + !MutationJournal.instance().pendingCleanupForTesting().isEmpty()); + }); + + // Run an explicit persister tick: writes coordinator_logs, drains the snapshot. + // Use the boolean variant to bypass the isPaused check. + cluster.get(1).runOnInstance(() -> MutationTrackingService.instance().persistLogStateForTesting(true)); + + // After the persister tick, the queue should be drained. + cluster.get(1).runOnInstance(() -> { + assertTrue("Expected pendingCleanup to be empty after persister tick", + MutationJournal.instance().pendingCleanupForTesting().isEmpty()); + }); + } + } + + /** + * Validates the clean-shutdown drain path (CASSANDRA-21443). + * + * Test setup pauses the periodic persister so that the only persister run is the + * shutdown-drain one. After a clean bounce, witnesses must survive and there must be no + * static segments on disk (they're truncated on drain). 
+ */ + @Test + public void testCleanShutdownDrainsPendingCleanup() throws Throwable + { + final int key = 1; + final int writes = 10; + + try (Cluster cluster = Cluster.build(1) + .withConfig(cfg -> cfg.with(Feature.NETWORK).with(Feature.GOSSIP)) + .start()) + { + cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 1} " + + "AND replication_type='tracked';")); + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k int PRIMARY KEY, v int);")); + + // Pause the periodic persister so the only run is the shutdown final tick. + cluster.get(1).runOnInstance(() -> MutationTrackingService.instance().pauseOffsetsPersisterForTesting()); + + for (int i = 0; i < writes; i++) + cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (k, v) VALUES (?, ?)"), + ConsistencyLevel.QUORUM, key, i); + + String keyspaceName = KEYSPACE; + int preBounceOffsetCount = getOffsetCount(cluster.get(1), keyspaceName, key); + assertEquals("Pre-bounce witness count must equal write count", writes, preBounceOffsetCount); + + // Flush and close the active segment so it becomes static and enters pendingCleanup. + cluster.get(1).nodetoolResult("flush", KEYSPACE).asserts().success(); + cluster.get(1).runOnInstance(() -> MutationJournal.instance().closeCurrentSegmentForTestingIfNonEmpty()); + + // Confirm pendingCleanup is non-empty before shutdown — this is the state the + // final tick is supposed to drain. + cluster.get(1).runOnInstance(() -> { + assertTrue("Expected at least one segment queued for cleanup before shutdown", + !MutationJournal.instance().pendingCleanupForTesting().isEmpty()); + }); + + // Clean (graceful) shutdown — runs MutationTrackingService.shutdownBlocking which + // performs the final persister tick. + ClusterUtils.stopUnchecked(cluster.get(1)); + cluster.get(1).startup(); + + // Re-pause so any first periodic tick post-restart cannot rewrite state before + // we sample. 
+ cluster.get(1).runOnInstance(() -> MutationTrackingService.instance().pauseOffsetsPersisterForTesting()); + + // Assertion 1: witness count survived. The final tick wrote coordinator_logs, + // so MTS.start's loadFromSystemTables on the next boot saw the witnesses. + int postBounceOffsetCount = getOffsetCount(cluster.get(1), keyspaceName, key); + assertEquals("Witness state must survive clean shutdown via the final-tick path", + preBounceOffsetCount, postBounceOffsetCount); + + // Assertion 2: no static segments on disk. The final tick's truncation step + // dropped the fully-reconciled segments. Without the final tick, the segment would + // still be present (replay reconstitutes witnesses but doesn't drop the segment). + cluster.get(1).runOnInstance(() -> { + int staticSegments = MutationJournal.instance().countStaticSegmentsForTesting(); + assertEquals("Expected zero static segments after clean shutdown final tick", + 0, staticSegments); + }); + } + } + @Test public void testWitnessPaxosV1Reads() throws Throwable { diff --git a/test/unit/org/apache/cassandra/db/virtual/MutationTrackingShardsTableTest.java b/test/unit/org/apache/cassandra/db/virtual/MutationTrackingShardsTableTest.java index 6f34bdba96c9..cd9900a77342 100644 --- a/test/unit/org/apache/cassandra/db/virtual/MutationTrackingShardsTableTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/MutationTrackingShardsTableTest.java @@ -36,7 +36,6 @@ import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.replication.MutationJournal; import org.apache.cassandra.replication.MutationTrackingService; -import org.apache.cassandra.tcm.ClusterMetadata; import static org.assertj.core.api.Assertions.assertThat; @@ -57,7 +56,7 @@ public void setUp() { // Start required services for mutation tracking MutationJournal.start(); - MutationTrackingService.start(ClusterMetadata.current()); + MutationTrackingService.start(); // Create a tracked keyspace schemaChange("CREATE KEYSPACE IF NOT EXISTS 
tracked_ks WITH replication = " + diff --git a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java index fdf57d5b616f..b51227904cdb 100644 --- a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java +++ b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java @@ -273,7 +273,7 @@ public void reconciledBounds() throws InterruptedException, ExecutionException { ClusterMetadataTestHelper.commit(new AlterSchema(SchemaTransformations.addTable(tableMetadata, false))); CommitLog.instance.start(); - MutationTrackingService.start(metadata); + MutationTrackingService.start(unused -> metadata); // Eventually, will also run perturbations before checking isReconciled (like log truncation, durability, etc.) // to ensure that we don't prune data required to check what's been reconciled diff --git a/test/unit/org/apache/cassandra/tcm/transformations/AlterSchemaMutationTrackingTest.java b/test/unit/org/apache/cassandra/tcm/transformations/AlterSchemaMutationTrackingTest.java index 5c9581b71d7a..81b8621e9902 100644 --- a/test/unit/org/apache/cassandra/tcm/transformations/AlterSchemaMutationTrackingTest.java +++ b/test/unit/org/apache/cassandra/tcm/transformations/AlterSchemaMutationTrackingTest.java @@ -33,6 +33,7 @@ import org.apache.cassandra.dht.NormalizedRanges; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.replication.MutationJournal; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo; @@ -44,6 +45,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests for AlterSchema auto-starting mutation tracking migration when 
replication type changes. @@ -372,4 +374,68 @@ private void assertStatesEqual(MutationTrackingMigrationState expected, Mutation assertEquals(expectedInfo, actualInfo); } } + + /** + * The mutation journal can't be disabled for tracked replication, so attempting to set durable_writes=false + * on tracked keyspaces needs to fail validation. + */ + @Test + public void testRejectDurableWritesFalseOnTrackedKeyspace() + { + // CREATE: tracked + durable_writes=false should be rejected + String createKs = nextKsName(); + Throwable createFailure = expectFailure(() -> + schemaChange("CREATE KEYSPACE " + createKs + + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}" + + " AND replication_type = 'tracked'" + + " AND durable_writes = false") + ); + assertTrue("Expected ConfigurationException root cause, got: " + createFailure, + rootCause(createFailure) instanceof ConfigurationException); + + // ALTER existing tracked keyspace to set durable_writes=false should be rejected + String alterKs = nextKsName(); + schemaChange("CREATE KEYSPACE " + alterKs + + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}" + + " AND replication_type = 'tracked'"); + Throwable alterTrackedFailure = expectFailure(() -> + schemaChange("ALTER KEYSPACE " + alterKs + " WITH durable_writes = false") + ); + assertTrue("Expected ConfigurationException root cause, got: " + alterTrackedFailure, + rootCause(alterTrackedFailure) instanceof ConfigurationException); + + // ALTER untracked keyspace with durable_writes=false to switch replication_type=tracked should be rejected + String migratedKs = nextKsName(); + schemaChange("CREATE KEYSPACE " + migratedKs + + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}" + + " AND replication_type = 'untracked'" + + " AND durable_writes = false"); + Throwable alterToTrackedFailure = expectFailure(() -> + schemaChange("ALTER KEYSPACE " + migratedKs + " WITH replication_type = 'tracked'") + ); + 
assertTrue("Expected ConfigurationException root cause, got: " + alterToTrackedFailure, + rootCause(alterToTrackedFailure) instanceof ConfigurationException); + } + + private static Throwable expectFailure(Runnable r) + { + try + { + r.run(); + } + catch (Throwable t) + { + return t; + } + fail("Expected exception but none was thrown"); + return null; + } + + private static Throwable rootCause(Throwable t) + { + Throwable cause = t; + while (cause.getCause() != null && cause.getCause() != cause) + cause = cause.getCause(); + return cause; + } }