From c826edde1c71981fbefc58edbb28e4def573007c Mon Sep 17 00:00:00 2001 From: Benedict Elliott Smith Date: Thu, 4 Jun 2026 19:29:58 +0100 Subject: [PATCH] Expunged records may be resurrected: CommandChanges.shouldCleanup short-circuits to NO if there is no data, but this is incorrect as Cleanup.EXPUNGE may have dropped the data and the record must receive cleanup EXPUNGE and be reported as ERASED. Also Fix: - Populate CFK system table row markers so we can stop reading sstables as soon as we have data - system_accord_debug.executors has wrong clustering type - RemoteToLocalVirtualTable should lazily allocate the partition collector to avoid LIMIT clause filtering removing it before it's populated - ActiveEpochs.withNewEpochs should handle transition from 0 -> more than 1 - RedundantBefore.minGcBefore should be NONE if empty - Update RangesForEpoch directly, so that we cannot have race conditions where the ownership is unknown - Avoid reentrancy on local callbacks - Ensure ReadCoordinator callbacks are invoked on owning thread - Avoid deadlock when notifying ComplexListener(s) - Release IntrusivePriorityHeap memory from large capacity heaps when empty - Prevent SynchronousRecoverAwait reentrancy when invoking onDone (by exposing and invoking invokeOnDone that first sets isDone) - maybeExecute must invoke either notWaiting or notifyWaiting to ensure tryExecuteListening terminates Also Improve: - Configurable execute_waiting_on_start patch by Benedict; reviewed by Alan Wang and Alex Petrov for CASSANDRA-21440 --- .gitmodules | 4 +- modules/accord | 2 +- .../apache/cassandra/config/AccordConfig.java | 3 + .../org/apache/cassandra/db/LivenessInfo.java | 6 +- .../db/compaction/CompactionTask.java | 3 +- .../apache/cassandra/db/rows/BTreeRow.java | 15 +- .../db/view/ViewUpdateGenerator.java | 4 +- .../db/virtual/AbstractLazyVirtualTable.java | 2 +- .../db/virtual/AccordDebugKeyspace.java | 2 +- .../db/virtual/RemoteToLocalVirtualTable.java | 12 +- 
.../service/accord/AccordCommandStore.java | 56 ++--- .../service/accord/AccordKeyspace.java | 8 +- .../accord/AccordSafeCommandStore.java | 1 - .../service/accord/AccordService.java | 24 +- .../accord/journal/JournalRangeIndex.java | 2 +- ...rdCommandStoreTryExecuteListeningTest.java | 202 ----------------- .../test/accord/AccordLoadTest.java | 2 - .../AccordJournalConsistentExpungeTest.java | 112 ++++++++++ .../accord/AccordJournalCompactionTest.java | 1 + .../accord/journal/AccordJournalBurnTest.java | 21 +- .../db/partitions/SimplePartition.java | 10 +- .../db/rows/BTreeRowHasLiveDataTest.java | 2 +- .../apache/cassandra/db/rows/RowsTest.java | 18 +- .../rows/UnfilteredRowIteratorsMergeTest.java | 2 +- .../db/rows/UnfilteredRowsGenerator.java | 4 +- .../db/transform/DuplicateRowCheckerTest.java | 2 +- .../db/transform/RTTransformationsTest.java | 2 +- .../service/accord/AccordExpungeTest.java | 211 ++++++++++++++++++ .../service/accord/AccordTestUtils.java | 10 +- .../accord/SimulatedAccordCommandStore.java | 9 +- .../CommandsForKeySerializerTest.java | 3 +- .../serializers/LatestDepsSerializerTest.java | 3 +- .../cassandra/utils/AccordGenerators.java | 7 +- 33 files changed, 453 insertions(+), 312 deletions(-) delete mode 100644 test/distributed/org/apache/cassandra/distributed/test/accord/AccordCommandStoreTryExecuteListeningTest.java create mode 100644 test/distributed/org/apache/cassandra/distributed/test/accord/journal/AccordJournalConsistentExpungeTest.java create mode 100644 test/unit/org/apache/cassandra/service/accord/AccordExpungeTest.java diff --git a/.gitmodules b/.gitmodules index 616dacf610a7..6808b5b57864 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "modules/accord"] path = modules/accord - url = https://github.com/apache/cassandra-accord.git - branch = trunk + url = https://github.com/belliottsmith/cassandra-accord.git + branch = consistent-expunge diff --git a/modules/accord b/modules/accord index 
0a10cd056794..f7a5a0a9e261 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 0a10cd056794c05588114f45ce86d49d6d6538db +Subproject commit f7a5a0a9e261f4778edf2d7c7db83449369b8679 diff --git a/src/java/org/apache/cassandra/config/AccordConfig.java b/src/java/org/apache/cassandra/config/AccordConfig.java index 0f502ddc1f92..b9fe93b9a490 100644 --- a/src/java/org/apache/cassandra/config/AccordConfig.java +++ b/src/java/org/apache/cassandra/config/AccordConfig.java @@ -317,6 +317,9 @@ public enum CatchupMode // TODO (required): roll this back to catchup_on_start_exit_on_failure: true public boolean catchup_on_start_exit_on_failure = false; public CatchupMode catchup_on_start = NORMAL; + public boolean execute_waiting_on_start = true; + public DurationSpec.IntSecondsBound execute_waiting_on_start_timeout = new DurationSpec.IntSecondsBound(0); + public boolean execute_waiting_on_start_fail_on_timeout = false; public DurationSpec.IntSecondsBound shutdown_grace_period = new DurationSpec.IntSecondsBound(15 * 60); public enum RangeIndexMode { in_memory, journal_sai } diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java b/src/java/org/apache/cassandra/db/LivenessInfo.java index 22865b325bef..46456028b7f9 100644 --- a/src/java/org/apache/cassandra/db/LivenessInfo.java +++ b/src/java/org/apache/cassandra/db/LivenessInfo.java @@ -53,7 +53,7 @@ public interface LivenessInfo extends IMeasurableMemory LivenessInfo EMPTY = new ImmutableLivenessInfo(NO_TIMESTAMP); long UNSHARED_HEAP_SIZE = ObjectSizes.measure(EMPTY); - static LivenessInfo create(long timestamp, long nowInSec) + static LivenessInfo create(long timestamp) { return new ImmutableLivenessInfo(timestamp); } @@ -75,14 +75,14 @@ private static LivenessInfo expiring(long timestamp, int ttl, long nowInSec, boo private static LivenessInfo create(long timestamp, int ttl, long nowInSec, boolean applyOverflowPolicy) { return ttl == NO_TTL - ? create(timestamp, nowInSec) + ? 
create(timestamp) : expiring(timestamp, ttl, nowInSec, applyOverflowPolicy); } static LivenessInfo create(long timestamp, int ttl, long nowInSec) { return ttl == NO_TTL - ? create(timestamp, nowInSec) + ? create(timestamp) : expiring(timestamp, ttl, nowInSec); } diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 86f54bf8266a..334322425d30 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -63,6 +63,7 @@ import org.apache.cassandra.io.util.File; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.accord.AccordCommandStore; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.TokenRange; import org.apache.cassandra.service.accord.api.TokenKey; @@ -575,7 +576,7 @@ protected final CompactionController getCompactionController(Set // we should impose stronger guarantees on incoming streams (that they don't contain tombstones), and validate our CommandStore-derived bounds gcBeforeSeconds = node.durableBefore().foldlWithDefault(ranges, (e, v) -> e == null ? 
NO_GC : Math.min(e.universal.hlc(), v), null, Long.MAX_VALUE); StoreSelector selector = StoreFinder.selector(ranges, Long.MIN_VALUE, Long.MAX_VALUE); - gcBeforeSeconds = node.commandStores().mapReduceUnsafe(selector, (ignore, commandStore) -> commandStore.unsafeGetRedundantBefore().foldl(ranges, (bs, v) -> bs.maxBound(LOCALLY_APPLIED).hlc(), Long.MAX_VALUE), Math::min, gcBeforeSeconds); + gcBeforeSeconds = node.commandStores().mapReduceUnsafe(selector, (ignore, commandStore) -> ((AccordCommandStore)commandStore).safeGetRedundantBefore().foldl(ranges, (bs, v) -> bs.maxBound(LOCALLY_APPLIED).hlc(), Long.MAX_VALUE), Math::min, gcBeforeSeconds); if (gcBeforeSeconds == Long.MAX_VALUE) gcBeforeSeconds = NO_GC; else diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java index 7c338ff33db6..f80efa965cdf 100644 --- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java +++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java @@ -120,10 +120,7 @@ public static BTreeRow create(Clustering clustering, { long minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion.time())); if (minDeletionTime != Long.MIN_VALUE) - { - long result = BTree.accumulate(btree, (cd, l) -> Math.min(l, minDeletionTime(cd)) , minDeletionTime); - minDeletionTime = result; - } + minDeletionTime = BTree.accumulate(btree, (cd, l) -> Math.min(l, minDeletionTime(cd)) , minDeletionTime); return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime); } @@ -144,11 +141,17 @@ public static BTreeRow emptyRow(Clustering clustering) public static BTreeRow singleCellRow(Clustering clustering, Cell cell) { + return singleCellRow(clustering, LivenessInfo.EMPTY, cell); + } + + public static BTreeRow singleCellRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo, Cell cell) + { + long minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(cell)); if 
(cell.column().isSimple()) - return new BTreeRow(clustering, BTree.singleton(cell), minDeletionTime(cell)); + return new BTreeRow(clustering, primaryKeyLivenessInfo, Deletion.LIVE, BTree.singleton(cell), minDeletionTime); ComplexColumnData complexData = new ComplexColumnData(cell.column(), new Cell[]{ cell }, DeletionTime.LIVE); - return new BTreeRow(clustering, BTree.singleton(complexData), minDeletionTime(cell)); + return new BTreeRow(clustering, primaryKeyLivenessInfo, Deletion.LIVE, BTree.singleton(complexData), minDeletionTime); } public static BTreeRow emptyDeletedRow(Clustering clustering, Deletion deletion) diff --git a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java index 021fc1f7fdb1..d51b4211e521 100644 --- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java +++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java @@ -497,9 +497,9 @@ else if (cell.localDeletionTime() > biggestExpirationCell.localDeletionTime()) } } if (baseLiveness.isLive(nowInSec) && !baseLiveness.isExpiring()) - return LivenessInfo.create(timestamp, nowInSec); + return LivenessInfo.create(timestamp); if (hasNonExpiringLiveCell) - return LivenessInfo.create(timestamp, nowInSec); + return LivenessInfo.create(timestamp); if (biggestExpirationCell == null) return baseLiveness; if (biggestExpirationCell.localDeletionTime() > baseLiveness.localExpirationTime() diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractLazyVirtualTable.java b/src/java/org/apache/cassandra/db/virtual/AbstractLazyVirtualTable.java index d7a2664ad3be..1924cb6f8001 100644 --- a/src/java/org/apache/cassandra/db/virtual/AbstractLazyVirtualTable.java +++ b/src/java/org/apache/cassandra/db/virtual/AbstractLazyVirtualTable.java @@ -670,7 +670,7 @@ FilteredRow materialiseAndFilter(SimplePartition.SimpleRow parent) } Arrays.sort(columns, 0, columnCount, (a, b) -> ColumnData.comparator.compare((BufferCell)a, 
(BufferCell)b)); Object[] btree = BTree.build(BulkIterator.of(columns), columnCount, UpdateFunction.noOp); - BTreeRow row = BTreeRow.create(parent.clustering, LivenessInfo.create(timestampMicros, nowInSeconds), Row.Deletion.LIVE, btree); + BTreeRow row = BTreeRow.create(parent.clustering, LivenessInfo.create(timestampMicros), Row.Deletion.LIVE, btree); if (!rowFilter.isSatisfiedBy(metadata, parent.partitionKey(), row, nowInSeconds)) return null; return new FilteredRow(row); diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java index b593a5629914..8c63b5767e3d 100644 --- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java @@ -308,7 +308,7 @@ private ExecutorsTable() "CREATE TABLE %s (\n" + " executor_id int,\n" + " status text,\n" + - " position int,\n" + + " position bigint,\n" + " unique_position int,\n" + " description text,\n" + " command_store_id int,\n" + diff --git a/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualTable.java b/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualTable.java index a32e94ab078a..8382d3473a75 100644 --- a/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualTable.java +++ b/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualTable.java @@ -188,7 +188,6 @@ protected void collect(PartitionsCollector collector) advance = -1; } - PartitionCollector partition = collector.partition(id.id()); while (i != end) { List request = rebind(local, slices.get(i), dataRange.isReversed(), rowFilter, columnFilter); @@ -200,7 +199,7 @@ protected void collect(PartitionsCollector collector) else readCommand = PartitionRangeReadCommand.create(local, collector.nowInSeconds(), send.columnFilter, send.rowFilter, limits, send.dataRange); - RequestAndResponse rr = new RequestAndResponse(id, partition, readCommand); + RequestAndResponse rr = 
new RequestAndResponse(id, collector, readCommand); send(rr, endpoint); pending.addLast(rr); @@ -218,12 +217,12 @@ protected void collect(PartitionsCollector collector) private static class RequestAndResponse extends SyncPromise { final NodeId nodeId; - final PartitionCollector partition; + final PartitionsCollector partitions; final ReadCommand readCommand; - private RequestAndResponse(NodeId nodeId, PartitionCollector partition, ReadCommand readCommand) + private RequestAndResponse(NodeId nodeId, PartitionsCollector partitions, ReadCommand readCommand) { this.nodeId = nodeId; - this.partition = partition; + this.partitions = partitions; this.readCommand = readCommand; } } @@ -292,6 +291,7 @@ private void collect(PartitionsCollector collector, RequestAndResponse rr, Funct } int pkCount = local.partitionKeyColumns().size(); + PartitionCollector out = rr.partitions.partition(rr.nodeId.id()); try (UnfilteredPartitionIterator partitions = response.makeIterator(rr.readCommand)) { while (partitions.hasNext()) @@ -311,7 +311,7 @@ private void collect(PartitionsCollector collector, RequestAndResponse rr, Funct for (int j = 0 ; j < clustering.size(); ++j) clusterings[pkCount + j] = clustering.bufferAt(j); } - rr.partition.collect(rows -> { + out.collect(rows -> { rows.add((Object[])clusterings) .lazyCollect(columns -> { row.forEach(cd -> { diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index 1b13e1807fef..f4dd59a9c7ae 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -56,7 +56,7 @@ import accord.impl.progresslog.TxnState; import accord.local.Command; import accord.local.CommandStore; -import accord.local.CommandStores; +import accord.local.CommandStores.RangesForEpoch; import accord.local.CommandSummaries; import accord.local.MaxConflicts; import 
accord.local.MaxDecidedRX; @@ -234,32 +234,23 @@ public AccordCommandStore(int id, DataStore dataStore, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenerFactory, - EpochUpdateHolder epochUpdateHolder, + RangesForEpoch rangesForEpoch, Journal journal, AccordExecutor sharedExecutor) { - super(id, node, agent, dataStore, progressLogFactory, listenerFactory, epochUpdateHolder); + super(id, node, agent, dataStore, progressLogFactory, listenerFactory, rangesForEpoch); this.loggingId = String.format("[%s]", id); this.journal = journal; this.sharedExecutor = sharedExecutor; if (this.progressLog instanceof DefaultProgressLog) ((DefaultProgressLog)this.progressLog).unsafeSetConfig(DatabaseDescriptor.getAccordProgressLogConfig()); + maybeLoadRangesForEpoch(journal.loadRangesForEpoch(id())); maybeLoadRedundantBefore(journal.loadRedundantBefore(id())); maybeLoadBootstrapBeganAt(journal.loadBootstrapBeganAt(id())); maybeLoadSafeToRead(journal.loadSafeToRead(id())); - maybeLoadRangesForEpoch(journal.loadRangesForEpoch(id())); - - CommandStores.RangesForEpoch ranges = this.rangesForEpoch; - if (ranges == null || ranges.all().isEmpty()) - { - EpochUpdate update = epochUpdateHolder.get(); - if (update != null) - ranges = update.newRangesForEpoch; - Invariants.require(ranges != null, "CommandStore %d created with no ranges", id); - } - tableId = (TableId)ranges.all().stream().map(r -> r.start().prefix()).reduce((a, b) -> { + tableId = (TableId)rangesForEpoch.all().stream().map(r -> r.start().prefix()).reduce((a, b) -> { Invariants.require(a.equals(b), "CommandStore created with multiple distinct TableId (%s and %s)", a, b); return a; }).orElseThrow(() -> Invariants.illegalState("CommandStore %d created with no ranges", id)); @@ -610,8 +601,8 @@ public AccordCompactionInfo getCompactionInfo() RedundantBefore redundantBefore; if (safeRedundantBefore == null) redundantBefore = RedundantBefore.EMPTY; else redundantBefore = safeRedundantBefore.redundantBefore; - 
CommandStores.RangesForEpoch ranges = this.rangesForEpoch; - if (ranges == null) ranges = CommandStores.RangesForEpoch.EMPTY; + RangesForEpoch ranges = this.rangesForEpoch; + if (ranges == null) ranges = RangesForEpoch.EMPTY; return new AccordCompactionInfo(id, redundantBefore, ranges, tableId); } @@ -637,8 +628,8 @@ protected void ensureDurable(Ranges ranges, RedundantBefore onCommandStoreDurabl protected void ensureDurable() { - RedundantBefore forCommandStore = nonDurable(unsafeGetRedundantBefore(), LOCALLY_DURABLE_TO_COMMAND_STORE, LOCALLY_DURABLE_TO_COMMAND_STORE_ONLY); - RedundantBefore forDataStore = nonDurable(unsafeGetRedundantBefore(), LOCALLY_DURABLE_TO_DATA_STORE, LOCALLY_DURABLE_TO_DATA_STORE_ONLY); + RedundantBefore forCommandStore = nonDurable(safeGetRedundantBefore(), LOCALLY_DURABLE_TO_COMMAND_STORE, LOCALLY_DURABLE_TO_COMMAND_STORE_ONLY); + RedundantBefore forDataStore = nonDurable(safeGetRedundantBefore(), LOCALLY_DURABLE_TO_DATA_STORE, LOCALLY_DURABLE_TO_DATA_STORE_ONLY); this.ensureDurable(forCommandStore.ranges(Objects::nonNull), forCommandStore); dataStore.ensureDurable(this, forDataStore, 0); } @@ -659,7 +650,7 @@ private RedundantBefore nonDurable(RedundantBefore redundantBefore, Property dur protected void ensureDurable(@Nullable Ranges ranges, ReportDurable onCommandStoreDurable) { - if (node().isReplaying() && onCommandStoreDurable.flags == 0 && unsafeGetRedundantBefore().isAtLeast(onCommandStoreDurable.redundantBefore)) + if (node().isReplaying() && onCommandStoreDurable.flags == 0 && safeGetRedundantBefore().isAtLeast(onCommandStoreDurable.redundantBefore)) return; long reportId = nextDurabilityLoggingId.incrementAndGet(); @@ -722,13 +713,6 @@ public void unsafeUpsertRedundantBefore(RedundantBefore addRedundantBefore) super.unsafeUpsertRedundantBefore(addRedundantBefore); } - @VisibleForTesting - public void unsafeUpdateRangesForEpoch() - { - super.unsafeUpdateRangesForEpoch(); - safeRedundantBefore = new SafeRedundantBefore(0, 
unsafeGetRedundantBefore()); - } - public static class AccordCommandStoreReplayer extends AbstractReplayer { private final AccordCommandStore commandStore; @@ -760,18 +744,16 @@ public AsyncChain replay(TxnId txnId) * Replay/state reloading */ - void maybeLoadRedundantBefore(RedundantBefore redundantBefore) + protected void loadRedundantBefore(RedundantBefore redundantBefore) { - Invariants.require(safeRedundantBefore == null); - if (redundantBefore != null) - { + super.loadRedundantBefore(redundantBefore); + safeRedundantBefore = new SafeRedundantBefore(0, redundantBefore); + } + + protected void maybeLoadRedundantBefore(RedundantBefore redundantBefore) + { + if (redundantBefore != null && !redundantBefore.isEmpty()) loadRedundantBefore(redundantBefore); - safeRedundantBefore = new SafeRedundantBefore(0, redundantBefore); - } - else - { - safeRedundantBefore = new SafeRedundantBefore(0, this.unsafeGetRedundantBefore()); - } } void maybeLoadBootstrapBeganAt(NavigableMap bootstrapBeganAt) @@ -786,7 +768,7 @@ void maybeLoadSafeToRead(NavigableMap safeToRead) loadSafeToRead(safeToRead); } - void maybeLoadRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch) + void maybeLoadRangesForEpoch(RangesForEpoch rangesForEpoch) { if (rangesForEpoch != null) loadRangesForEpoch(rangesForEpoch); diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index 983ad31461ba..5123471acdeb 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -52,6 +52,7 @@ import org.apache.cassandra.db.DataRange; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.LivenessInfo; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.RegularAndStaticColumns; @@ -116,6 
+117,7 @@ import static java.lang.String.format; import static java.util.Collections.emptyMap; +import static java.util.concurrent.TimeUnit.MICROSECONDS; import static org.apache.cassandra.config.AccordConfig.RangeIndexMode.journal_sai; import static org.apache.cassandra.db.partitions.PartitionUpdate.singleRowUpdate; import static org.apache.cassandra.db.rows.BTreeRow.singleCellRow; @@ -269,7 +271,7 @@ public static CommandsForKey load(int commandStoreId, TokenKey key) static CommandsForKey unsafeLoad(CommandsForKeyAccessor accessor, int commandStoreId, TokenKey key) { long timestampMicros = TimeUnit.MILLISECONDS.toMicros(Global.currentTimeMillis()); - int nowInSeconds = (int) TimeUnit.MICROSECONDS.toSeconds(timestampMicros); + int nowInSeconds = (int) MICROSECONDS.toSeconds(timestampMicros); SinglePartitionReadCommand command = makeRead(accessor, commandStoreId, key, nowInSeconds); @@ -346,7 +348,9 @@ public static PartitionUpdate makeUpdate(int storeId, TokenKey key, long timesta { return singleRowUpdate(CFKAccessor.table, CommandsForKeyAccessor.makeSystemTableKey(storeId, key), - singleCellRow(Clustering.EMPTY, BufferCell.live(CFKAccessor.data, timestampMicros, bytes))); + singleCellRow(Clustering.EMPTY, + LivenessInfo.create(timestampMicros), + BufferCell.live(CFKAccessor.data, timestampMicros, bytes))); } public static Runnable systemTableUpdater(int storeId, TokenKey key, CommandsForKey update, Object serialized, long timestampMicros) diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java index 4626a8c6bb09..6a9817bc9766 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java @@ -68,7 +68,6 @@ private AccordSafeCommandStore(AccordTask task, this.task = task; this.commandsForRanges = commandsForRanges; this.commandStore = commandStore; - 
commandStore.updateRangesForEpoch(this); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 8d6dbaabb4bc..340760370c1d 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -622,11 +622,25 @@ public synchronized void localStartup() { replayJournal(minSegments); - logger.info("Try to execute pending transactions..."); - List> results = new ArrayList<>(); - node.commandStores().forAllUnsafe(commandStore -> results.add(commandStore.tryToExecuteListeningTxns(false))); - if (!results.isEmpty()) - getBlocking(AsyncResults.reduce(results, Reduce.toNull())); + if (getAccord().execute_waiting_on_start) + { + logger.info("Execute waiting transactions..."); + List> results = new ArrayList<>(); + node.commandStores().forAllUnsafe(commandStore -> results.add(commandStore.tryToExecuteListeningTxns(false))); + if (!results.isEmpty()) + { + Future future = toFuture(AsyncResults.reduce(results, Reduce.toNull())); + long timeoutSeconds = getAccord().execute_waiting_on_start_timeout.toSeconds(); + if (timeoutSeconds <= 0) future.awaitUninterruptibly().rethrowIfFailed(); + else if (!future.awaitUninterruptibly(timeoutSeconds, SECONDS)) + { + if (getAccord().execute_waiting_on_start_fail_on_timeout) + throw new RuntimeException("Timeout waiting to execute waiting transactions"); + logger.warn("Timeout waiting to execute waiting transactions"); + } + else future.rethrowIfFailed(); + } + } } } finally diff --git a/src/java/org/apache/cassandra/service/accord/journal/JournalRangeIndex.java b/src/java/org/apache/cassandra/service/accord/journal/JournalRangeIndex.java index ca43832657af..eba2e534b4b4 100644 --- a/src/java/org/apache/cassandra/service/accord/journal/JournalRangeIndex.java +++ b/src/java/org/apache/cassandra/service/accord/journal/JournalRangeIndex.java @@ -366,7 +366,7 @@ 
static Object[] toMap(TxnId txnId, RangeRoute route) public JournalRangeIndex.Loader loader(TxnId primaryTxnId, Timestamp primaryExecuteAt, LoadKeysFor loadKeysFor, Unseekables keysOrRanges) { - RedundantBefore redundantBefore = commandStore.unsafeGetRedundantBefore(); + RedundantBefore redundantBefore = commandStore.safeGetRedundantBefore(); MaxDecidedRX maxDecidedRX = commandStore.unsafeGetMaxDecidedRX(); return SummaryLoader.loader(redundantBefore, maxDecidedRX, primaryTxnId, primaryExecuteAt, loadKeysFor, keysOrRanges, this::newLoader); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCommandStoreTryExecuteListeningTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCommandStoreTryExecuteListeningTest.java deleted file mode 100644 index 0277095da484..000000000000 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCommandStoreTryExecuteListeningTest.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.cassandra.distributed.test.accord; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.stream.Stream; - -import org.assertj.core.api.Assertions; -import org.junit.Test; - -import accord.api.Journal; -import accord.local.Command; -import accord.local.Node; -import accord.local.PreLoadContext; -import accord.local.PreLoadContext.Empty; -import accord.local.StoreParticipants; -import accord.primitives.Ballot; -import accord.primitives.Deps; -import accord.primitives.FullRoute; -import accord.primitives.KeyDeps; -import accord.primitives.Range; -import accord.primitives.RangeDeps; -import accord.primitives.Ranges; -import accord.primitives.Routable; -import accord.primitives.RoutingKeys; -import accord.primitives.SaveStatus; -import accord.primitives.Status; -import accord.primitives.Txn; -import accord.primitives.TxnId; -import accord.topology.TopologyException; -import accord.utils.ImmutableBitSet; -import accord.utils.LargeBitSet; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.distributed.Cluster; -import org.apache.cassandra.distributed.api.TokenSupplier; -import org.apache.cassandra.distributed.shared.NetworkTopology; -import org.apache.cassandra.distributed.test.TestBaseImpl; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.service.accord.AccordCommandStore; -import org.apache.cassandra.service.accord.AccordService; -import org.apache.cassandra.service.accord.api.PartitionKey; -import org.apache.cassandra.service.accord.txn.TxnDataResult; -import org.apache.cassandra.utils.ByteBufferUtil; - -import static org.apache.cassandra.distributed.api.Feature.GOSSIP; -import static 
org.apache.cassandra.distributed.api.Feature.NETWORK; - -public class AccordCommandStoreTryExecuteListeningTest extends TestBaseImpl -{ - private static DecoratedKey dk(int key) - { - IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); - return partitioner.decorateKey(ByteBufferUtil.bytes(key)); - } - - private static PartitionKey pk(int key, String keyspace, String table) - { - TableId tid = Schema.instance.getTableMetadata(keyspace, table).id; - return new PartitionKey(tid, dk(key)); - } - - @Test - public void testTryExecuteListening() throws Throwable - { - try (Cluster cluster = Cluster.build().withNodes(1) - .withoutVNodes() - .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(1)) - .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(1, "dc0", "rack0")) - .withConfig(config -> config.set("accord.command_store_shard_count", 1) - .set("accord.queue_shard_count", 1) - .with(NETWORK, GOSSIP)) - .start()) - { - cluster.schemaChange("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor':1}"); - cluster.schemaChange("CREATE TABLE ks.tbl (k int, c int, v int, primary key(k, c)) WITH transactional_mode='full'"); - - cluster.get(1).runOnInstance(() -> { - AccordService service = (AccordService) AccordService.instance(); - - Node node = service.node(); - PartitionKey key = pk(1, "ks", "tbl"); - AccordCommandStore commandStore = (AccordCommandStore) node.commandStores().unsafeForKey(key.toUnseekable()); - - Command txn1a = executed(node, SaveStatus.Applied, 1); - Command txn1b = executed(node, SaveStatus.PreApplied, 2); - Command txn2a = executed(node, SaveStatus.PreApplied, 1, txn1a.txnId()); - Command txn2b = executed(node, SaveStatus.PreApplied, 2, txn1b.txnId()); - Command txn3 = executed(node, SaveStatus.PreApplied,1, txn1a.txnId(), 2, txn1b.txnId(), txn2b.txnId()); - Command txn4 = executed(node, SaveStatus.PreApplied, 1, txn1a.txnId(), txn3.txnId(), 2, txn1b.txnId(), txn3.txnId()); - Command[] commands = new 
Command[] { txn1a, txn1b, txn2a, txn2b, txn3, txn4 }; - - AccordService.getBlocking(commandStore.chain((Empty)() -> "Test", safeStore -> { - for (Command command : commands) - commandStore.journal.saveCommand(commandStore.id(), new Journal.CommandUpdate(null, command), () -> {}); - - commandStore.unsafeGetListeners().register(txn1a.txnId(), SaveStatus.Applied, txn2a.txnId()); - commandStore.unsafeGetListeners().register(txn3.txnId(), SaveStatus.Applied, txn4.txnId()); - })); - - AccordService.getBlocking(commandStore.tryToExecuteListeningTxns(true)); - - for (Command command : commands) - { - Command cmd = AccordService.getBlocking(commandStore.submit(PreLoadContext.contextFor(command.txnId(), "Test"), safeStore -> safeStore.unsafeGet(command.txnId()).current())); - Assertions.assertThat(cmd.saveStatus()).isEqualTo(SaveStatus.Applied); - } - }); - } - } - - private static Command executed(Node node, SaveStatus saveStatus, Object ... inputs) - { - int depCount; - Map> depsByInputKey = new TreeMap<>(); - TxnId[] txnIds; - { - PartitionKey k = null; - for (Object input : inputs) - { - if (input instanceof Integer) - { - k = keyN((Integer) input, node); - depsByInputKey.put(k, new ArrayList<>()); - } - else depsByInputKey.get(k).add((TxnId)input); - } - txnIds = depsByInputKey.values().stream().flatMap(Collection::stream).distinct().sorted().toArray(TxnId[]::new); - depCount = depsByInputKey.values().stream().mapToInt(Collection::size).sum(); - } - PartitionKey[] keys = depsByInputKey.keySet().toArray(PartitionKey[]::new); - Range[] ranges = Stream.of(keys).map(PartitionKey::asRange).toArray(Range[]::new); - - PartitionKey key = keys[0]; - AccordCommandStore commandStore = (AccordCommandStore) node.commandStores().unsafeForKey(key.toUnseekable()); - - Txn txn = node.agent().emptySystemTxn(Txn.Kind.ExclusiveSyncPoint, Routable.Domain.Range); - TxnId txnId = node.nextTxnId(txn); - FullRoute route; - try { route = node.computeRoute(txnId, Ranges.of(ranges)); } - catch 
(TopologyException e) { throw new RuntimeException(e); } - int[] rangesToTxnIds = new int[depCount + ranges.length]; - { - int offset = ranges.length; - for (int i = 0 ; i < ranges.length ; ++i) - { - for (TxnId dep : depsByInputKey.get(keys[i])) - rangesToTxnIds[offset++] = Arrays.binarySearch(txnIds, dep); - rangesToTxnIds[i] = offset; - } - } - Deps deps = new Deps(KeyDeps.NONE, RangeDeps.SerializerSupport.create(ranges, txnIds, rangesToTxnIds, null)); - Command.WaitingOn waitingOn; { - LargeBitSet waitingOnBits = new LargeBitSet(txnIds.length); - waitingOnBits.setRange(0, txnIds.length); - waitingOn = new Command.WaitingOn(RoutingKeys.EMPTY, deps.rangeDeps, new ImmutableBitSet(waitingOnBits), new ImmutableBitSet(txnIds.length)); - } - return Command.Executed.executed(txnId, saveStatus, Status.Durability.NotDurable, StoreParticipants.execute(commandStore.unsafeGetRangesForEpoch(), route, txnId, txnId.epoch()), Ballot.ZERO, txnId, txn.intersecting(route, true), deps.intersecting(route), Ballot.ZERO, waitingOn, null, TxnDataResult.PERSISTABLE); - } - - private static PartitionKey keyN(int n, Node node) - { - PartitionKey first = pk(1, "ks", "tbl"); - if (n == 1) - return first; - - AccordCommandStore commandStore = (AccordCommandStore) node.commandStores().unsafeForKey(first.toUnseekable()); - - int i = 2; - while (true) - { - PartitionKey next = pk(i, "ks", "tbl"); - if (commandStore.unsafeGetRangesForEpoch().all().contains(next) && --n == 0) - return next; - } - } - -} diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java index b0f339523cd7..cee395977dbf 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java @@ -116,14 +116,12 @@ public static void setUp() throws IOException 
.set("accord.shard_durability_max_splits", "16") .set("accord.shard_durability_cycle", "1m") .set("accord.queue_submission_model", "SIGNAL") -// .set("accord.queue_submission_model", "SEMI_SYNC") .set("accord.command_store_shard_count", "8") .set("accord.queue_thread_count", "4") .set("accord.queue_shard_count", "1") .set("accord.replica_execution", "ALL") .set("accord.send_stable", "TO_ALL_REPLICA_EXECUTABLE_ELSE_FOR_READS") .set("accord.send_minimal", "false") -// .set("accord.permit_fast_quorum_medium_path", "false") .set("accord.catchup_on_start_fail_latency", "2m"); }), nodeCount); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/AccordJournalConsistentExpungeTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/AccordJournalConsistentExpungeTest.java new file mode 100644 index 000000000000..8770df01c96f --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/AccordJournalConsistentExpungeTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test.accord.journal; + + +import java.util.Iterator; + +import org.junit.Test; + +import accord.local.Command; +import accord.local.Node; +import accord.primitives.SaveStatus; +import accord.primitives.TxnId; + +import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.accord.AccordCacheEntry; +import org.apache.cassandra.service.accord.AccordCommandStore; +import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.api.PartitionKey; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class AccordJournalConsistentExpungeTest extends TestBaseImpl +{ + private static DecoratedKey dk(int key) + { + IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); + return partitioner.decorateKey(ByteBufferUtil.bytes(key)); + } + + private static PartitionKey pk(int key, String keyspace, String table) + { + TableId tid = Schema.instance.getTableMetadata(keyspace, table).id; + return new PartitionKey(tid, dk(key)); + } + + @Test + public void loadCommandErasedTest() throws Throwable + { + try (Cluster cluster = Cluster.build().withNodes(3) + .withoutVNodes() + .withConfig(config -> config + .set("accord.shard_durability_cycle", "20s") + .set("accord.ephemeral_reads", false) + .with(NETWORK, GOSSIP)) + .start()) + { + cluster.schemaChange("CREATE KEYSPACE ks WITH 
REPLICATION={'class':'SimpleStrategy', 'replication_factor':3}"); + cluster.schemaChange("CREATE TABLE ks.tbl (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); + + cluster.get(1).executeInternal("BEGIN TRANSACTION \n" + + "SELECT * FROM ks.tbl WHERE k = 1; \n" + + "COMMIT TRANSACTION"); + + cluster.get(1).runOnInstance(() -> { + AccordService service = (AccordService) AccordService.instance(); + PartitionKey key = pk(1, "ks", "tbl"); + + Node node = service.node(); + AccordCommandStore commandStore = (AccordCommandStore) node.commandStores().unsafeForKey(key.toUnseekable()); + + Iterator> iterator = commandStore.cachesUnsafe().commands().iterator(); + + TxnId txnId = TxnId.NONE; + + while (iterator.hasNext()) + { + txnId = iterator.next().key(); + if (!txnId.isSystemTxn()) + break; + } + + assertFalse(txnId.isSystemTxn()); + + TxnId finalTxnId = txnId; + Util.spinUntilTrue(() -> commandStore.safeGetRedundantBefore().minGcBefore().compareTo(finalTxnId) >= 0, 25); + + service.journal().purge(service.node().commandStores(), node.topology()::minEpoch); + + assertEquals(SaveStatus.Erased, commandStore.loadCommand(txnId).saveStatus); + }); + } + } +} + diff --git a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java index e12ec5875235..0fce66a1f1d4 100644 --- a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java +++ b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java @@ -73,6 +73,7 @@ public static void setUp() throws Throwable } private AtomicInteger counter = new AtomicInteger(); + @Before public void beforeTest() throws Throwable { diff --git a/test/distributed/org/apache/cassandra/service/accord/journal/AccordJournalBurnTest.java b/test/distributed/org/apache/cassandra/service/accord/journal/AccordJournalBurnTest.java index ab4ee9b3960e..00d54bd3ce7d 100644 --- 
a/test/distributed/org/apache/cassandra/service/accord/journal/AccordJournalBurnTest.java +++ b/test/distributed/org/apache/cassandra/service/accord/journal/AccordJournalBurnTest.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -94,6 +95,7 @@ import org.apache.cassandra.utils.CloseableIterator; import static accord.impl.PrefixedIntHashKey.ranges; +import static accord.primitives.SaveStatus.Erased; import static org.apache.cassandra.config.AccordConfig.RangeIndexMode.journal_sai; import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; @@ -101,6 +103,11 @@ public class AccordJournalBurnTest extends BurnTestBase { private static final Logger logger = LoggerFactory.getLogger(AccordJournalBurnTest.class); + static + { + Cluster.RandomLoader.CMD_BASE_CHECK_CHANCE = 0.1f; + } + public static void setUp() throws Throwable { StorageService.instance.registerMBeans(); @@ -331,13 +338,21 @@ public void purge(CommandStores commandStores, EpochSupplier minEpoch) { Command b = e.getValue(); Command a = after.get(e.getKey()); + if (b != null && b.saveStatus == Erased) b = null; + if (a != null && a.saveStatus == Erased) a = null; Invariants.require(Objects.equals(a, b)); } if (before.size() != after.size()) { - for (Map.Entry e : after.entrySet()) - Invariants.require(null != before.get(e.getKey())); - Invariants.require(false); + for (Map m : Arrays.asList(before, after)) + { + for (JournalKey k : m.keySet()) + { + Command b = before.get(k); + Command a = after.get(k); + Invariants.require((a == null || a.saveStatus == Erased) == (b == null || b.saveStatus == Erased), "%s != %s", a, b); + } + } } Invariants.require(!orig.equals(table.getLiveSSTables())); } diff --git a/test/unit/org/apache/cassandra/db/partitions/SimplePartition.java b/test/unit/org/apache/cassandra/db/partitions/SimplePartition.java 
index d13987468785..1d14cbb474a8 100644 --- a/test/unit/org/apache/cassandra/db/partitions/SimplePartition.java +++ b/test/unit/org/apache/cassandra/db/partitions/SimplePartition.java @@ -113,12 +113,12 @@ public SimplePartition addEmpty(Clustering ck) public SimplePartition addEmptyAndLive(Clustering ck) { - return addEmptyAndLive(ck, DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP); + return addEmptyAndLive(ck, DEFAULT_TIMESTAMP); } - public SimplePartition addEmptyAndLive(Clustering ck, long timestamp, long nowInSec) + public SimplePartition addEmptyAndLive(Clustering ck, long timestamp) { - return add(ck).liveness(timestamp, nowInSec).build(); + return add(ck).liveness(timestamp).build(); } public RowIterator filtered() @@ -142,9 +142,9 @@ public RowBuilder timestamp(long timestamp) return this; } - public RowBuilder liveness(long timestamp, long nowInSec) + public RowBuilder liveness(long timestamp) { - builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(timestamp, nowInSec)); + builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(timestamp)); return this; } diff --git a/test/unit/org/apache/cassandra/db/rows/BTreeRowHasLiveDataTest.java b/test/unit/org/apache/cassandra/db/rows/BTreeRowHasLiveDataTest.java index 7eab66e2b0b7..64768ff63610 100644 --- a/test/unit/org/apache/cassandra/db/rows/BTreeRowHasLiveDataTest.java +++ b/test/unit/org/apache/cassandra/db/rows/BTreeRowHasLiveDataTest.java @@ -146,7 +146,7 @@ public void livePK_returnsTrue() long ts = timestampMicro(nowInSec); Row.Builder b = newBuilder(); - b.addPrimaryKeyLivenessInfo(LivenessInfo.create(ts, nowInSec)); + b.addPrimaryKeyLivenessInfo(LivenessInfo.create(ts)); // No cells. 
Row row = b.build(); diff --git a/test/unit/org/apache/cassandra/db/rows/RowsTest.java b/test/unit/org/apache/cassandra/db/rows/RowsTest.java index 899dae6a4376..06bf701a2f76 100644 --- a/test/unit/org/apache/cassandra/db/rows/RowsTest.java +++ b/test/unit/org/apache/cassandra/db/rows/RowsTest.java @@ -230,7 +230,7 @@ private static Row.Builder createBuilder(Clustering c, long now, ByteBuffer v { long ts = secondToTs(now); Row.Builder builder = createBuilder(c); - builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(ts, now)); + builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(ts)); if (vVal != null) { builder.addCell(BufferCell.live(v, ts, vVal)); @@ -251,7 +251,7 @@ public void collectStats() long ts = secondToTs(now); Row.Builder builder = BTreeRow.unsortedBuilder(); builder.newRow(c1); - LivenessInfo liveness = LivenessInfo.create(ts, now); + LivenessInfo liveness = LivenessInfo.create(ts); builder.addPrimaryKeyLivenessInfo(liveness); DeletionTime complexDeletion = DeletionTime.build(ts-1, now); builder.addComplexDeletion(m, complexDeletion); @@ -289,7 +289,7 @@ public void diff() long ts1 = secondToTs(now1); Row.Builder r1Builder = BTreeRow.unsortedBuilder(); r1Builder.newRow(c1); - LivenessInfo r1Liveness = LivenessInfo.create(ts1, now1); + LivenessInfo r1Liveness = LivenessInfo.create(ts1); r1Builder.addPrimaryKeyLivenessInfo(r1Liveness); DeletionTime r1ComplexDeletion = DeletionTime.build(ts1-1, now1); r1Builder.addComplexDeletion(m, r1ComplexDeletion); @@ -305,7 +305,7 @@ public void diff() long ts2 = secondToTs(now2); Row.Builder r2Builder = BTreeRow.unsortedBuilder(); r2Builder.newRow(c1); - LivenessInfo r2Liveness = LivenessInfo.create(ts2, now2); + LivenessInfo r2Liveness = LivenessInfo.create(ts2); r2Builder.addPrimaryKeyLivenessInfo(r2Liveness); Cell r2v = BufferCell.live(v, ts2, BB2); Cell r2m2 = BufferCell.live(m, ts2, BB1, CellPath.create(BB2)); @@ -365,7 +365,7 @@ public void diffEmptyMerged() long ts1 = secondToTs(now1); Row.Builder 
r1Builder = BTreeRow.unsortedBuilder(); r1Builder.newRow(c1); - LivenessInfo r1Liveness = LivenessInfo.create(ts1, now1); + LivenessInfo r1Liveness = LivenessInfo.create(ts1); r1Builder.addPrimaryKeyLivenessInfo(r1Liveness); // mergedData == null @@ -373,7 +373,7 @@ public void diffEmptyMerged() long ts2 = secondToTs(now2); Row.Builder r2Builder = BTreeRow.unsortedBuilder(); r2Builder.newRow(c1); - LivenessInfo r2Liveness = LivenessInfo.create(ts2, now2); + LivenessInfo r2Liveness = LivenessInfo.create(ts2); r2Builder.addPrimaryKeyLivenessInfo(r2Liveness); DeletionTime r2ComplexDeletion = DeletionTime.build(ts2-1, now2); r2Builder.addComplexDeletion(m, r2ComplexDeletion); @@ -419,7 +419,7 @@ public void diffEmptyInput() long ts1 = secondToTs(now1); Row.Builder r1Builder = BTreeRow.unsortedBuilder(); r1Builder.newRow(c1); - LivenessInfo r1Liveness = LivenessInfo.create(ts1, now1); + LivenessInfo r1Liveness = LivenessInfo.create(ts1); r1Builder.addPrimaryKeyLivenessInfo(r1Liveness); // mergedData == null @@ -427,7 +427,7 @@ public void diffEmptyInput() long ts2 = secondToTs(now2); Row.Builder r2Builder = BTreeRow.unsortedBuilder(); r2Builder.newRow(c1); - LivenessInfo r2Liveness = LivenessInfo.create(ts2, now2); + LivenessInfo r2Liveness = LivenessInfo.create(ts2); r2Builder.addPrimaryKeyLivenessInfo(r2Liveness); DeletionTime r2ComplexDeletion = DeletionTime.build(ts2-1, now2); r2Builder.addComplexDeletion(m, r2ComplexDeletion); @@ -484,7 +484,7 @@ public void merge() Row merged = Rows.merge(existingBuilder.build(), updateBuilder.build()); Assert.assertEquals(c1, merged.clustering()); - Assert.assertEquals(LivenessInfo.create(ts2, now2), merged.primaryKeyLivenessInfo()); + Assert.assertEquals(LivenessInfo.create(ts2), merged.primaryKeyLivenessInfo()); Iterator> iter = merged.cells().iterator(); Assert.assertTrue(iter.hasNext()); diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java 
b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java index f39dbaeb6295..ba38d9ca7d02 100644 --- a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java +++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java @@ -405,7 +405,7 @@ private static Clustering clusteringFor(int i) static Row emptyRowAt(int pos, IntUnaryOperator timeGenerator) { final Clustering clustering = clusteringFor(pos); - final LivenessInfo live = LivenessInfo.create(timeGenerator.applyAsInt(pos), nowInSec); + final LivenessInfo live = LivenessInfo.create(timeGenerator.applyAsInt(pos)); return BTreeRow.noCellLiveRow(clustering, live); } diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java index a4a5c35f53f8..0cb06cd02cda 100644 --- a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java +++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java @@ -235,14 +235,14 @@ public List parse(String input, int default_liveness) static Row emptyRowAt(int pos, IntUnaryOperator timeGenerator) { final Clustering clustering = clusteringFor(pos); - final LivenessInfo live = LivenessInfo.create(timeGenerator.applyAsInt(pos), UnfilteredRowIteratorsMergeTest.nowInSec); + final LivenessInfo live = LivenessInfo.create(timeGenerator.applyAsInt(pos)); return BTreeRow.noCellLiveRow(clustering, live); } static Row emptyRowAt(int pos, int time, long deletionTime) { final Clustering clustering = clusteringFor(pos); - final LivenessInfo live = LivenessInfo.create(time, UnfilteredRowIteratorsMergeTest.nowInSec); + final LivenessInfo live = LivenessInfo.create(time); final DeletionTime delTime = deletionTime == -1 ? 
DeletionTime.LIVE : DeletionTime.build(deletionTime, deletionTime); return BTreeRow.create(clustering, live, Row.Deletion.regular(delTime), BTree.empty()); } diff --git a/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java b/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java index 15ac67eb89e7..2dcae5945d93 100644 --- a/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java +++ b/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java @@ -212,7 +212,7 @@ public static Row makeRow(TableMetadata metadata, Object... clusteringValues) for (int i = 0; i < clusteringValues.length; i++) clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]); - return BTreeRow.noCellLiveRow(Clustering.make(clusteringByteBuffers), LivenessInfo.create(0, 0)); + return BTreeRow.noCellLiveRow(Clustering.make(clusteringByteBuffers), LivenessInfo.create(0)); } public static UnfilteredRowIterator partition(TableMetadata metadata, diff --git a/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java b/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java index b223c0843e25..125658d4c9b7 100644 --- a/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java +++ b/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java @@ -410,7 +410,7 @@ private Row row(long timestamp, Object... 
clusteringValues) for (int i = 0; i < clusteringValues.length; i++) clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]); - return BTreeRow.noCellLiveRow(Clustering.make(clusteringByteBuffers), LivenessInfo.create(timestamp, nowInSec)); + return BTreeRow.noCellLiveRow(Clustering.make(clusteringByteBuffers), LivenessInfo.create(timestamp)); } @SuppressWarnings("unchecked") diff --git a/test/unit/org/apache/cassandra/service/accord/AccordExpungeTest.java b/test/unit/org/apache/cassandra/service/accord/AccordExpungeTest.java new file mode 100644 index 000000000000..66514033a5fc --- /dev/null +++ b/test/unit/org/apache/cassandra/service/accord/AccordExpungeTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.service.accord; + +import java.nio.file.Files; +import java.util.concurrent.atomic.AtomicInteger; + +import org.assertj.core.api.SoftAssertions; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import accord.api.Journal; +import accord.local.Cleanup; +import accord.local.Command; +import accord.local.DurableBefore; +import accord.local.RedundantBefore; +import accord.primitives.Ranges; +import accord.primitives.Routable; +import accord.primitives.SaveStatus; +import accord.primitives.Timestamp; +import accord.primitives.Txn; +import accord.primitives.TxnId; +import accord.utils.AccordGens; +import accord.utils.Gen; +import accord.utils.Gens; +import accord.utils.TriConsumer; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.journal.TestParams; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.accord.journal.AccordJournal; +import org.apache.cassandra.service.accord.journal.CommandChanges; +import org.apache.cassandra.utils.AccordGenerators; +import org.apache.cassandra.utils.AccordGenerators.CommandBuilder; + +import static accord.local.Cleanup.Input.FULL; +import static accord.local.RedundantStatus.SomeStatus.GC_BEFORE_AND_LOCALLY_DURABLE; +import static accord.primitives.Routable.Domain.Range; +import static accord.primitives.Txn.Kind.ExclusiveSyncPoint; +import static accord.utils.Property.qt; +import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse; + +/** + * Regression tests for the interaction between Accord journal cleanup decisions + * (driven by RedundantBefore / DurableBefore) and the {@code loadCommand} path. + * + *

<p>Symptom: a transaction that has been processed and erased (or that was past the + * GC boundary by the time the journal saw it) is read back as {@code NotDefined} + * instead of {@code Erased}/{@code Truncated}.</p>

+ * + *

<p>Mechanism under test: when the FULL-input cleanup path decides EXPUNGE, + * {@link accord.impl.CommandChange.Builder#construct} returns {@code null}. + * Downstream, {@link AccordSafeCommand#preExecute} maps {@code null} to a + * {@code Command.NotDefined.uninitialised(...)} — which is exactly the bogus + * NotDefined the user observed. The journal load API itself should never collapse + * "this txnId has been erased" into the same answer as "we have never heard of this + * txnId" once RedundantBefore / DurableBefore say it is past the GC boundary.</p>

+ * + * @author Claude and Benedict + */ +public class AccordExpungeTest +{ + private static final int COMMAND_STORE_ID = 1; + + private final AtomicInteger counter = new AtomicInteger(); + + @BeforeClass + public static void setUpClass() throws Throwable + { + SchemaLoader.prepareServer(); + // a single keyspace + table is enough; AccordGenerators.commandsBuilder() will use + // ks.tbl as the table for the synthetic Txn it produces. + SchemaLoader.createKeyspace("ks", KeyspaceParams.simple(1), + parse("CREATE TABLE tbl (k int, c int, v int, primary key (k, c)) WITH transactional_mode='full'", "ks")); + StorageService.instance.initServer(); + } + + @Before + public void beforeTest() throws Throwable + { + File directory = new File(Files.createTempDirectory(Integer.toString(counter.incrementAndGet()))); + directory.deleteRecursiveOnExit(); + DatabaseDescriptor.setAccordJournalDirectory(directory.path()); + } + + private void validate(TriConsumer validate) + { + Gen saveStatusGen = Gens.enums().all(SaveStatus.class); + qt().forAll(commands().map((rs, b) -> b.build(saveStatusGen.next(rs))).filter(c -> !c.participants.touches().isEmpty())) + .check(before -> + { + Ranges ranges = before.participants.touches().toRanges(); + TxnId gcBound = gcBoundStrictlyAfter(before); + RedundantBefore rb = RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, gcBound, + GC_BEFORE_AND_LOCALLY_DURABLE); + DurableBefore db = DurableBefore.create(ranges, gcBound, gcBound); + + Cleanup decided = Cleanup.shouldCleanup(FULL, + before.txnId(), + before.executeAt(), + before.saveStatus(), + before.durability(), + before.participants(), + rb, db); + + Assert.assertEquals(Cleanup.EXPUNGE, decided); + + validate.accept(before, rb, db); + }); + } + + @Test + public void expungeReadsAsErased() + { + AccordJournal journal = newJournal(); + try + { + journal.start(null); + validate((before, rb, db) -> { + CommandChanges builder = new CommandChanges(before.txnId); + builder.maybeCleanup(true, 
FULL, rb, db); + Command reconstructed = builder.construct(rb); + Assert.assertTrue("Empty builder for txnId=" + before.txnId() + " constructed a non-Erased command despite being past GC: " + reconstructed, + reconstructed != null && reconstructed.saveStatus() == SaveStatus.Erased); + + loadAndValidate(before, journal, rb, db); + journal.saveCommand(COMMAND_STORE_ID, new Journal.CommandUpdate(null, before), null); + loadAndValidate(before, journal, rb, db); + journal.closeCurrentSegmentForTestingIfNonEmpty(); + loadAndValidate(before, journal, rb, db); + }); + } + finally + { + journal.stop(); + } + } + + private static AccordJournal newJournal() + { + return new AccordJournal(new TestParams() + { + @Override public int segmentSize() { return 1 << 20; } + @Override public boolean enableCompaction() { return false; } + }); + } + + private void loadAndValidate(Command before, Journal journal, RedundantBefore rb, DurableBefore db) + { /* Reloads before.txnId from the journal under (rb, db) and requires a non-null command whose SaveStatus is Erased. */ + SoftAssertions checks = new SoftAssertions(); /* failures are only collected here; they are raised by assertAll() at the end */ + Command loaded = journal.loadCommand(COMMAND_STORE_ID, before.txnId, rb, db); + checks.assertThat(loaded).as("loadCommand returned null after RedundantBefore advance for %s; " + "AccordSafeCommand will surface this as NotDefined", before.txnId()).isNotNull(); + /* guarded so a null load is reported by the check above rather than as an NPE */ if (loaded != null) checks.assertThat(loaded.saveStatus()) + .as("loadCommand did not return Erased for previously-written %s; " + "loaded=%s, redundantBefore=%s", before, loaded, rb) + .isEqualTo(SaveStatus.Erased); + checks.assertAll(); + } + + private static Gen commands() + { + return AccordGenerators.commandsBuilder(AccordGens.txnIds(Gens.pick(Txn.Kind.Write, Txn.Kind.Read, ExclusiveSyncPoint, Txn.Kind.VisibilitySyncPoint))); + } + + /** + * Construct a TxnId strictly greater than {@code command.txnId()} and + * with an HLC strictly greater than {@code command.executeAt().hlc()} so that + * {@code Cleanup.expunge()} fires regardless of + * {@code dataStoreRequiresUniqueHlcs()} / {@code Write}-kind gating. + * + *

<p>The bound must carry the {@link Timestamp.Flag#SHARD_BOUND} flag (see + * {@code RedundantBefore.Bounds} invariant); we also use + * {@link Txn.Kind#ExclusiveSyncPoint} and {@link Routable.Domain#Range} to match + * the way GC bounds are generated in production.</p>

+ */ + private static TxnId gcBoundStrictlyAfter(Command command) + { + long hlc = command.txnId().hlc(); + long epoch = command.txnId().epoch(); + if (command.executeAt() != null) + { + hlc = Math.max(hlc, command.executeAt().hlc()); + epoch = Math.max(epoch, command.executeAt().epoch()); + } + + TxnId next = new TxnId(epoch, hlc + 1, ExclusiveSyncPoint, Range, command.txnId().node); + return next.addFlag(Timestamp.Flag.SHARD_BOUND); + } +} diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java index dcc631d776cf..fd2301046a44 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java @@ -49,7 +49,7 @@ import accord.impl.DefaultLocalListeners.NotifySink.NoOpNotifySink; import accord.local.Command; import accord.local.CommandStore; -import accord.local.CommandStores; +import accord.local.CommandStores.RangesForEpoch; import accord.local.DurableBefore; import accord.local.Node; import accord.local.Node.Id; @@ -404,15 +404,11 @@ public SequentialAsyncExecutor someSequentialExecutor() AccordJournal journal = new AccordJournal(spec); journal.start(null); - CommandStore.EpochUpdateHolder holder = new CommandStore.EpochUpdateHolder(); Ranges ranges = topology.rangesForNode(node); - holder.add(1, new CommandStores.RangesForEpoch(1, ranges), ranges); - AccordCommandStore result = new AccordCommandStore(0, time, agent, null, + return new AccordCommandStore(0, time, agent, null, cs -> new NoOpProgressLog(), cs -> new DefaultLocalListeners(null, new NoOpRemoteListeners(), new NoOpNotifySink()), - holder, journal, executor); - result.unsafeUpdateRangesForEpoch(); - return result; + new RangesForEpoch(1, ranges), journal, executor); } public static AccordCommandStore createAccordCommandStore( diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java index dd3e89d3fcb3..0fabb6735202 100644 --- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java +++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java @@ -52,6 +52,7 @@ import accord.local.Command; import accord.local.CommandStore; import accord.local.CommandStores; +import accord.local.CommandStores.RangesForEpoch; import accord.local.DurableBefore; import accord.local.Node; import accord.local.NodeCommandStoreService; @@ -118,7 +119,6 @@ public class SimulatedAccordCommandStore implements AutoCloseable { private final List failures = new ArrayList<>(); private final SimulatedExecutorFactory globalExecutor; - private final CommandStore.EpochUpdateHolder updateHolder; private final BooleanSupplier shouldEvict, shouldFlush, shouldCompact; public final NodeCommandStoreService storeService; @@ -166,14 +166,12 @@ public SimulatedAccordCommandStore(@Nullable TableId tableId, RandomSource rs, F stage.unsafeSetExecutor(globalExecutor.configureSequential("ignore").build()); this.nodeId = AccordTopology.tcmIdToAccord(ClusterMetadata.currentNullable().myNodeId()); - this.updateHolder = new CommandStore.EpochUpdateHolder(); this.topology = AccordTopology.createAccordTopology(ClusterMetadata.current()); this.topologies = new Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, topology); Ranges ranges = topology.ranges(); if (tableId != null) ranges = ranges.overlapping(Ranges.of(TokenRange.create(TokenKey.min(tableId, getPartitioner()), TokenKey.max(tableId, getPartitioner())))); - CommandStores.RangesForEpoch rangesForEpoch = new CommandStores.RangesForEpoch(topology.epoch(), ranges); - updateHolder.add(topology.epoch(), rangesForEpoch, ranges); + RangesForEpoch rangesForEpoch = new RangesForEpoch(topology.epoch(), ranges); this.storeService = new NodeCommandStoreService() { @@ -295,14 +293,13 @@ public void onException(Throwable t) 
@Override public void notify(SafeCommandStore safeStore, SafeCommand safeCommand, TxnId listener) {} @Override public boolean notify(SafeCommandStore safeStore, SafeCommand safeCommand, LocalListeners.ComplexListener listener) { return false; } }), - updateHolder, + rangesForEpoch, journal, new AccordExecutorSimple(0, CommandStore.class.getSimpleName() + '[' + 0 + ']', agent)); this.commandStore.executor().executeDirectlyWithLock(() -> { commandStore.executor().setCapacity(8 << 20); commandStore.executor().setWorkingSetSize(4 << 20); }); - commandStore.unsafeUpdateRangesForEpoch(); shouldEvict = boolSource(rs.fork()); { diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java index 016bde0fe965..138dbb4f12c9 100644 --- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java @@ -64,6 +64,7 @@ import accord.local.Command; import accord.local.CommandBuilder; import accord.local.CommandStore; +import accord.local.CommandStores.RangesForEpoch; import accord.local.DurableBefore; import accord.local.Node; import accord.local.NodeCommandStoreService; @@ -662,7 +663,7 @@ protected TestCommandStore() null, ignore -> new ProgressLog.NoOpProgressLog(), ignore -> new DefaultLocalListeners(null, new DefaultRemoteListeners((a, b, c, d, e)->{}), DefaultLocalListeners.DefaultNotifySink.INSTANCE), - new EpochUpdateHolder()); + new RangesForEpoch(1, Ranges.EMPTY)); } @Override public boolean inStore() { return true; } diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/LatestDepsSerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/LatestDepsSerializerTest.java index 96e3bc544ab2..7f3a25eb955c 100644 --- 
a/test/unit/org/apache/cassandra/service/accord/serializers/LatestDepsSerializerTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/LatestDepsSerializerTest.java @@ -99,12 +99,13 @@ private void testOne(IPartitioner partitioner, RandomSource rs) throws IOExcepti Arrays.sort(starts); for (int i = 0 ; i < size ; ++i) { - if (rs.nextBoolean()) continue; + if (size > 1 && rs.nextBoolean()) continue; entries[i] = new LatestDeps.LatestEntry(knownDeps.next(rs), rs.nextBoolean() ? rs.nextBoolean() ? Ballot.ZERO : Ballot.MAX : ballots.next(rs), rs.nextBoolean() ? null : deps.next(rs), rs.nextBoolean() ? null : deps.next(rs)); } + LatestDeps latestDeps = LatestDeps.SerializerSupport.create(starts, entries); DataOutputBuffer buf = new DataOutputBuffer(); Serializers.testSerde(buf, LatestDepsSerializers.latestDeps, latestDeps); diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java b/test/unit/org/apache/cassandra/utils/AccordGenerators.java index 3b936ece0b01..0e7d7a3685b8 100644 --- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java +++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java @@ -192,7 +192,12 @@ public enum RecoveryStatus { None, Started, Complete } public static Gen commandsBuilder() { - return commandsBuilder(AccordGens.txnIds(), Gens.bools().all(), Gens.enums().all(RecoveryStatus.class), (rs, txnId, txn) -> AccordGens.depsFor(txnId, txn).next(rs)); + return commandsBuilder(AccordGens.txnIds()); + } + + public static Gen commandsBuilder(Gen txnIdGen) + { + return commandsBuilder(txnIdGen, Gens.bools().all(), Gens.enums().all(RecoveryStatus.class), (rs, txnId, txn) -> AccordGens.depsFor(txnId, txn).next(rs)); } public static Gen commandsBuilder(Gen txnIdGen, Gen fastPath, Gen recover, TriFunction depsGen)