diff --git a/.gitmodules b/.gitmodules index 616dacf610a7..6808b5b57864 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "modules/accord"] path = modules/accord - url = https://github.com/apache/cassandra-accord.git - branch = trunk + url = https://github.com/belliottsmith/cassandra-accord.git + branch = consistent-expunge diff --git a/modules/accord b/modules/accord index 0a10cd056794..f7a5a0a9e261 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 0a10cd056794c05588114f45ce86d49d6d6538db +Subproject commit f7a5a0a9e261f4778edf2d7c7db83449369b8679 diff --git a/src/java/org/apache/cassandra/config/AccordConfig.java b/src/java/org/apache/cassandra/config/AccordConfig.java index 0f502ddc1f92..b9fe93b9a490 100644 --- a/src/java/org/apache/cassandra/config/AccordConfig.java +++ b/src/java/org/apache/cassandra/config/AccordConfig.java @@ -317,6 +317,9 @@ public enum CatchupMode // TODO (required): roll this back to catchup_on_start_exit_on_failure: true public boolean catchup_on_start_exit_on_failure = false; public CatchupMode catchup_on_start = NORMAL; + public boolean execute_waiting_on_start = true; + public DurationSpec.IntSecondsBound execute_waiting_on_start_timeout = new DurationSpec.IntSecondsBound(0); + public boolean execute_waiting_on_start_fail_on_timeout = false; public DurationSpec.IntSecondsBound shutdown_grace_period = new DurationSpec.IntSecondsBound(15 * 60); public enum RangeIndexMode { in_memory, journal_sai } diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java b/src/java/org/apache/cassandra/db/LivenessInfo.java index 22865b325bef..46456028b7f9 100644 --- a/src/java/org/apache/cassandra/db/LivenessInfo.java +++ b/src/java/org/apache/cassandra/db/LivenessInfo.java @@ -53,7 +53,7 @@ public interface LivenessInfo extends IMeasurableMemory LivenessInfo EMPTY = new ImmutableLivenessInfo(NO_TIMESTAMP); long UNSHARED_HEAP_SIZE = ObjectSizes.measure(EMPTY); - static LivenessInfo create(long 
timestamp, long nowInSec) + static LivenessInfo create(long timestamp) { return new ImmutableLivenessInfo(timestamp); } @@ -75,14 +75,14 @@ private static LivenessInfo expiring(long timestamp, int ttl, long nowInSec, boo private static LivenessInfo create(long timestamp, int ttl, long nowInSec, boolean applyOverflowPolicy) { return ttl == NO_TTL - ? create(timestamp, nowInSec) + ? create(timestamp) : expiring(timestamp, ttl, nowInSec, applyOverflowPolicy); } static LivenessInfo create(long timestamp, int ttl, long nowInSec) { return ttl == NO_TTL - ? create(timestamp, nowInSec) + ? create(timestamp) : expiring(timestamp, ttl, nowInSec); } diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 86f54bf8266a..334322425d30 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -63,6 +63,7 @@ import org.apache.cassandra.io.util.File; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.accord.AccordCommandStore; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.TokenRange; import org.apache.cassandra.service.accord.api.TokenKey; @@ -575,7 +576,7 @@ protected final CompactionController getCompactionController(Set // we should impose stronger guarantees on incoming streams (that they don't contain tombstones), and validate our CommandStore-derived bounds gcBeforeSeconds = node.durableBefore().foldlWithDefault(ranges, (e, v) -> e == null ? 
NO_GC : Math.min(e.universal.hlc(), v), null, Long.MAX_VALUE); StoreSelector selector = StoreFinder.selector(ranges, Long.MIN_VALUE, Long.MAX_VALUE); - gcBeforeSeconds = node.commandStores().mapReduceUnsafe(selector, (ignore, commandStore) -> commandStore.unsafeGetRedundantBefore().foldl(ranges, (bs, v) -> bs.maxBound(LOCALLY_APPLIED).hlc(), Long.MAX_VALUE), Math::min, gcBeforeSeconds); + gcBeforeSeconds = node.commandStores().mapReduceUnsafe(selector, (ignore, commandStore) -> ((AccordCommandStore)commandStore).safeGetRedundantBefore().foldl(ranges, (bs, v) -> bs.maxBound(LOCALLY_APPLIED).hlc(), Long.MAX_VALUE), Math::min, gcBeforeSeconds); if (gcBeforeSeconds == Long.MAX_VALUE) gcBeforeSeconds = NO_GC; else diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java index 7c338ff33db6..f80efa965cdf 100644 --- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java +++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java @@ -120,10 +120,7 @@ public static BTreeRow create(Clustering clustering, { long minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion.time())); if (minDeletionTime != Long.MIN_VALUE) - { - long result = BTree.accumulate(btree, (cd, l) -> Math.min(l, minDeletionTime(cd)) , minDeletionTime); - minDeletionTime = result; - } + minDeletionTime = BTree.accumulate(btree, (cd, l) -> Math.min(l, minDeletionTime(cd)) , minDeletionTime); return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime); } @@ -144,11 +141,17 @@ public static BTreeRow emptyRow(Clustering clustering) public static BTreeRow singleCellRow(Clustering clustering, Cell cell) { + return singleCellRow(clustering, LivenessInfo.EMPTY, cell); + } + + public static BTreeRow singleCellRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo, Cell cell) + { + long minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(cell)); if 
(cell.column().isSimple()) - return new BTreeRow(clustering, BTree.singleton(cell), minDeletionTime(cell)); + return new BTreeRow(clustering, primaryKeyLivenessInfo, Deletion.LIVE, BTree.singleton(cell), minDeletionTime); ComplexColumnData complexData = new ComplexColumnData(cell.column(), new Cell[]{ cell }, DeletionTime.LIVE); - return new BTreeRow(clustering, BTree.singleton(complexData), minDeletionTime(cell)); + return new BTreeRow(clustering, primaryKeyLivenessInfo, Deletion.LIVE, BTree.singleton(complexData), minDeletionTime); } public static BTreeRow emptyDeletedRow(Clustering clustering, Deletion deletion) diff --git a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java index 021fc1f7fdb1..d51b4211e521 100644 --- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java +++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java @@ -497,9 +497,9 @@ else if (cell.localDeletionTime() > biggestExpirationCell.localDeletionTime()) } } if (baseLiveness.isLive(nowInSec) && !baseLiveness.isExpiring()) - return LivenessInfo.create(timestamp, nowInSec); + return LivenessInfo.create(timestamp); if (hasNonExpiringLiveCell) - return LivenessInfo.create(timestamp, nowInSec); + return LivenessInfo.create(timestamp); if (biggestExpirationCell == null) return baseLiveness; if (biggestExpirationCell.localDeletionTime() > baseLiveness.localExpirationTime() diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractLazyVirtualTable.java b/src/java/org/apache/cassandra/db/virtual/AbstractLazyVirtualTable.java index d7a2664ad3be..1924cb6f8001 100644 --- a/src/java/org/apache/cassandra/db/virtual/AbstractLazyVirtualTable.java +++ b/src/java/org/apache/cassandra/db/virtual/AbstractLazyVirtualTable.java @@ -670,7 +670,7 @@ FilteredRow materialiseAndFilter(SimplePartition.SimpleRow parent) } Arrays.sort(columns, 0, columnCount, (a, b) -> ColumnData.comparator.compare((BufferCell)a, 
(BufferCell)b)); Object[] btree = BTree.build(BulkIterator.of(columns), columnCount, UpdateFunction.noOp); - BTreeRow row = BTreeRow.create(parent.clustering, LivenessInfo.create(timestampMicros, nowInSeconds), Row.Deletion.LIVE, btree); + BTreeRow row = BTreeRow.create(parent.clustering, LivenessInfo.create(timestampMicros), Row.Deletion.LIVE, btree); if (!rowFilter.isSatisfiedBy(metadata, parent.partitionKey(), row, nowInSeconds)) return null; return new FilteredRow(row); diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java index b593a5629914..8c63b5767e3d 100644 --- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java @@ -308,7 +308,7 @@ private ExecutorsTable() "CREATE TABLE %s (\n" + " executor_id int,\n" + " status text,\n" + - " position int,\n" + + " position bigint,\n" + " unique_position int,\n" + " description text,\n" + " command_store_id int,\n" + diff --git a/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualTable.java b/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualTable.java index a32e94ab078a..8382d3473a75 100644 --- a/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualTable.java +++ b/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualTable.java @@ -188,7 +188,6 @@ protected void collect(PartitionsCollector collector) advance = -1; } - PartitionCollector partition = collector.partition(id.id()); while (i != end) { List request = rebind(local, slices.get(i), dataRange.isReversed(), rowFilter, columnFilter); @@ -200,7 +199,7 @@ protected void collect(PartitionsCollector collector) else readCommand = PartitionRangeReadCommand.create(local, collector.nowInSeconds(), send.columnFilter, send.rowFilter, limits, send.dataRange); - RequestAndResponse rr = new RequestAndResponse(id, partition, readCommand); + RequestAndResponse rr = 
new RequestAndResponse(id, collector, readCommand); send(rr, endpoint); pending.addLast(rr); @@ -218,12 +217,12 @@ protected void collect(PartitionsCollector collector) private static class RequestAndResponse extends SyncPromise { final NodeId nodeId; - final PartitionCollector partition; + final PartitionsCollector partitions; final ReadCommand readCommand; - private RequestAndResponse(NodeId nodeId, PartitionCollector partition, ReadCommand readCommand) + private RequestAndResponse(NodeId nodeId, PartitionsCollector partitions, ReadCommand readCommand) { this.nodeId = nodeId; - this.partition = partition; + this.partitions = partitions; this.readCommand = readCommand; } } @@ -292,6 +291,7 @@ private void collect(PartitionsCollector collector, RequestAndResponse rr, Funct } int pkCount = local.partitionKeyColumns().size(); + PartitionCollector out = rr.partitions.partition(rr.nodeId.id()); try (UnfilteredPartitionIterator partitions = response.makeIterator(rr.readCommand)) { while (partitions.hasNext()) @@ -311,7 +311,7 @@ private void collect(PartitionsCollector collector, RequestAndResponse rr, Funct for (int j = 0 ; j < clustering.size(); ++j) clusterings[pkCount + j] = clustering.bufferAt(j); } - rr.partition.collect(rows -> { + out.collect(rows -> { rows.add((Object[])clusterings) .lazyCollect(columns -> { row.forEach(cd -> { diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index 1b13e1807fef..f4dd59a9c7ae 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -56,7 +56,7 @@ import accord.impl.progresslog.TxnState; import accord.local.Command; import accord.local.CommandStore; -import accord.local.CommandStores; +import accord.local.CommandStores.RangesForEpoch; import accord.local.CommandSummaries; import accord.local.MaxConflicts; import 
accord.local.MaxDecidedRX; @@ -234,32 +234,23 @@ public AccordCommandStore(int id, DataStore dataStore, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenerFactory, - EpochUpdateHolder epochUpdateHolder, + RangesForEpoch rangesForEpoch, Journal journal, AccordExecutor sharedExecutor) { - super(id, node, agent, dataStore, progressLogFactory, listenerFactory, epochUpdateHolder); + super(id, node, agent, dataStore, progressLogFactory, listenerFactory, rangesForEpoch); this.loggingId = String.format("[%s]", id); this.journal = journal; this.sharedExecutor = sharedExecutor; if (this.progressLog instanceof DefaultProgressLog) ((DefaultProgressLog)this.progressLog).unsafeSetConfig(DatabaseDescriptor.getAccordProgressLogConfig()); + maybeLoadRangesForEpoch(journal.loadRangesForEpoch(id())); maybeLoadRedundantBefore(journal.loadRedundantBefore(id())); maybeLoadBootstrapBeganAt(journal.loadBootstrapBeganAt(id())); maybeLoadSafeToRead(journal.loadSafeToRead(id())); - maybeLoadRangesForEpoch(journal.loadRangesForEpoch(id())); - - CommandStores.RangesForEpoch ranges = this.rangesForEpoch; - if (ranges == null || ranges.all().isEmpty()) - { - EpochUpdate update = epochUpdateHolder.get(); - if (update != null) - ranges = update.newRangesForEpoch; - Invariants.require(ranges != null, "CommandStore %d created with no ranges", id); - } - tableId = (TableId)ranges.all().stream().map(r -> r.start().prefix()).reduce((a, b) -> { + tableId = (TableId)rangesForEpoch.all().stream().map(r -> r.start().prefix()).reduce((a, b) -> { Invariants.require(a.equals(b), "CommandStore created with multiple distinct TableId (%s and %s)", a, b); return a; }).orElseThrow(() -> Invariants.illegalState("CommandStore %d created with no ranges", id)); @@ -610,8 +601,8 @@ public AccordCompactionInfo getCompactionInfo() RedundantBefore redundantBefore; if (safeRedundantBefore == null) redundantBefore = RedundantBefore.EMPTY; else redundantBefore = safeRedundantBefore.redundantBefore; - 
CommandStores.RangesForEpoch ranges = this.rangesForEpoch; - if (ranges == null) ranges = CommandStores.RangesForEpoch.EMPTY; + RangesForEpoch ranges = this.rangesForEpoch; + if (ranges == null) ranges = RangesForEpoch.EMPTY; return new AccordCompactionInfo(id, redundantBefore, ranges, tableId); } @@ -637,8 +628,8 @@ protected void ensureDurable(Ranges ranges, RedundantBefore onCommandStoreDurabl protected void ensureDurable() { - RedundantBefore forCommandStore = nonDurable(unsafeGetRedundantBefore(), LOCALLY_DURABLE_TO_COMMAND_STORE, LOCALLY_DURABLE_TO_COMMAND_STORE_ONLY); - RedundantBefore forDataStore = nonDurable(unsafeGetRedundantBefore(), LOCALLY_DURABLE_TO_DATA_STORE, LOCALLY_DURABLE_TO_DATA_STORE_ONLY); + RedundantBefore forCommandStore = nonDurable(safeGetRedundantBefore(), LOCALLY_DURABLE_TO_COMMAND_STORE, LOCALLY_DURABLE_TO_COMMAND_STORE_ONLY); + RedundantBefore forDataStore = nonDurable(safeGetRedundantBefore(), LOCALLY_DURABLE_TO_DATA_STORE, LOCALLY_DURABLE_TO_DATA_STORE_ONLY); this.ensureDurable(forCommandStore.ranges(Objects::nonNull), forCommandStore); dataStore.ensureDurable(this, forDataStore, 0); } @@ -659,7 +650,7 @@ private RedundantBefore nonDurable(RedundantBefore redundantBefore, Property dur protected void ensureDurable(@Nullable Ranges ranges, ReportDurable onCommandStoreDurable) { - if (node().isReplaying() && onCommandStoreDurable.flags == 0 && unsafeGetRedundantBefore().isAtLeast(onCommandStoreDurable.redundantBefore)) + if (node().isReplaying() && onCommandStoreDurable.flags == 0 && safeGetRedundantBefore().isAtLeast(onCommandStoreDurable.redundantBefore)) return; long reportId = nextDurabilityLoggingId.incrementAndGet(); @@ -722,13 +713,6 @@ public void unsafeUpsertRedundantBefore(RedundantBefore addRedundantBefore) super.unsafeUpsertRedundantBefore(addRedundantBefore); } - @VisibleForTesting - public void unsafeUpdateRangesForEpoch() - { - super.unsafeUpdateRangesForEpoch(); - safeRedundantBefore = new SafeRedundantBefore(0, 
unsafeGetRedundantBefore()); - } - public static class AccordCommandStoreReplayer extends AbstractReplayer { private final AccordCommandStore commandStore; @@ -760,18 +744,16 @@ public AsyncChain replay(TxnId txnId) * Replay/state reloading */ - void maybeLoadRedundantBefore(RedundantBefore redundantBefore) + protected void loadRedundantBefore(RedundantBefore redundantBefore) { - Invariants.require(safeRedundantBefore == null); - if (redundantBefore != null) - { + super.loadRedundantBefore(redundantBefore); + safeRedundantBefore = new SafeRedundantBefore(0, redundantBefore); + } + + protected void maybeLoadRedundantBefore(RedundantBefore redundantBefore) + { + if (redundantBefore != null && !redundantBefore.isEmpty()) loadRedundantBefore(redundantBefore); - safeRedundantBefore = new SafeRedundantBefore(0, redundantBefore); - } - else - { - safeRedundantBefore = new SafeRedundantBefore(0, this.unsafeGetRedundantBefore()); - } } void maybeLoadBootstrapBeganAt(NavigableMap bootstrapBeganAt) @@ -786,7 +768,7 @@ void maybeLoadSafeToRead(NavigableMap safeToRead) loadSafeToRead(safeToRead); } - void maybeLoadRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch) + void maybeLoadRangesForEpoch(RangesForEpoch rangesForEpoch) { if (rangesForEpoch != null) loadRangesForEpoch(rangesForEpoch); diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index 983ad31461ba..5123471acdeb 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -52,6 +52,7 @@ import org.apache.cassandra.db.DataRange; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.LivenessInfo; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.RegularAndStaticColumns; @@ -116,6 
+117,7 @@ import static java.lang.String.format; import static java.util.Collections.emptyMap; +import static java.util.concurrent.TimeUnit.MICROSECONDS; import static org.apache.cassandra.config.AccordConfig.RangeIndexMode.journal_sai; import static org.apache.cassandra.db.partitions.PartitionUpdate.singleRowUpdate; import static org.apache.cassandra.db.rows.BTreeRow.singleCellRow; @@ -269,7 +271,7 @@ public static CommandsForKey load(int commandStoreId, TokenKey key) static CommandsForKey unsafeLoad(CommandsForKeyAccessor accessor, int commandStoreId, TokenKey key) { long timestampMicros = TimeUnit.MILLISECONDS.toMicros(Global.currentTimeMillis()); - int nowInSeconds = (int) TimeUnit.MICROSECONDS.toSeconds(timestampMicros); + int nowInSeconds = (int) MICROSECONDS.toSeconds(timestampMicros); SinglePartitionReadCommand command = makeRead(accessor, commandStoreId, key, nowInSeconds); @@ -346,7 +348,9 @@ public static PartitionUpdate makeUpdate(int storeId, TokenKey key, long timesta { return singleRowUpdate(CFKAccessor.table, CommandsForKeyAccessor.makeSystemTableKey(storeId, key), - singleCellRow(Clustering.EMPTY, BufferCell.live(CFKAccessor.data, timestampMicros, bytes))); + singleCellRow(Clustering.EMPTY, + LivenessInfo.create(timestampMicros), + BufferCell.live(CFKAccessor.data, timestampMicros, bytes))); } public static Runnable systemTableUpdater(int storeId, TokenKey key, CommandsForKey update, Object serialized, long timestampMicros) diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java index 4626a8c6bb09..6a9817bc9766 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java @@ -68,7 +68,6 @@ private AccordSafeCommandStore(AccordTask task, this.task = task; this.commandsForRanges = commandsForRanges; this.commandStore = commandStore; - 
commandStore.updateRangesForEpoch(this); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 8d6dbaabb4bc..340760370c1d 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -622,11 +622,25 @@ public synchronized void localStartup() { replayJournal(minSegments); - logger.info("Try to execute pending transactions..."); - List> results = new ArrayList<>(); - node.commandStores().forAllUnsafe(commandStore -> results.add(commandStore.tryToExecuteListeningTxns(false))); - if (!results.isEmpty()) - getBlocking(AsyncResults.reduce(results, Reduce.toNull())); + if (getAccord().execute_waiting_on_start) + { + logger.info("Execute waiting transactions..."); + List> results = new ArrayList<>(); + node.commandStores().forAllUnsafe(commandStore -> results.add(commandStore.tryToExecuteListeningTxns(false))); + if (!results.isEmpty()) + { + Future future = toFuture(AsyncResults.reduce(results, Reduce.toNull())); + long timeoutSeconds = getAccord().execute_waiting_on_start_timeout.toSeconds(); + if (timeoutSeconds <= 0) future.awaitUninterruptibly().rethrowIfFailed(); + else if (!future.awaitUninterruptibly(timeoutSeconds, SECONDS)) + { + if (getAccord().execute_waiting_on_start_fail_on_timeout) + throw new RuntimeException("Timeout waiting to execute waiting transactions"); + logger.warn("Timeout waiting to execute waiting transactions"); + } + else future.rethrowIfFailed(); + } + } } } finally diff --git a/src/java/org/apache/cassandra/service/accord/journal/JournalRangeIndex.java b/src/java/org/apache/cassandra/service/accord/journal/JournalRangeIndex.java index ca43832657af..eba2e534b4b4 100644 --- a/src/java/org/apache/cassandra/service/accord/journal/JournalRangeIndex.java +++ b/src/java/org/apache/cassandra/service/accord/journal/JournalRangeIndex.java @@ -366,7 +366,7 @@ 
static Object[] toMap(TxnId txnId, RangeRoute route) public JournalRangeIndex.Loader loader(TxnId primaryTxnId, Timestamp primaryExecuteAt, LoadKeysFor loadKeysFor, Unseekables keysOrRanges) { - RedundantBefore redundantBefore = commandStore.unsafeGetRedundantBefore(); + RedundantBefore redundantBefore = commandStore.safeGetRedundantBefore(); MaxDecidedRX maxDecidedRX = commandStore.unsafeGetMaxDecidedRX(); return SummaryLoader.loader(redundantBefore, maxDecidedRX, primaryTxnId, primaryExecuteAt, loadKeysFor, keysOrRanges, this::newLoader); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCommandStoreTryExecuteListeningTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCommandStoreTryExecuteListeningTest.java deleted file mode 100644 index 0277095da484..000000000000 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCommandStoreTryExecuteListeningTest.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.cassandra.distributed.test.accord; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.stream.Stream; - -import org.assertj.core.api.Assertions; -import org.junit.Test; - -import accord.api.Journal; -import accord.local.Command; -import accord.local.Node; -import accord.local.PreLoadContext; -import accord.local.PreLoadContext.Empty; -import accord.local.StoreParticipants; -import accord.primitives.Ballot; -import accord.primitives.Deps; -import accord.primitives.FullRoute; -import accord.primitives.KeyDeps; -import accord.primitives.Range; -import accord.primitives.RangeDeps; -import accord.primitives.Ranges; -import accord.primitives.Routable; -import accord.primitives.RoutingKeys; -import accord.primitives.SaveStatus; -import accord.primitives.Status; -import accord.primitives.Txn; -import accord.primitives.TxnId; -import accord.topology.TopologyException; -import accord.utils.ImmutableBitSet; -import accord.utils.LargeBitSet; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.distributed.Cluster; -import org.apache.cassandra.distributed.api.TokenSupplier; -import org.apache.cassandra.distributed.shared.NetworkTopology; -import org.apache.cassandra.distributed.test.TestBaseImpl; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.service.accord.AccordCommandStore; -import org.apache.cassandra.service.accord.AccordService; -import org.apache.cassandra.service.accord.api.PartitionKey; -import org.apache.cassandra.service.accord.txn.TxnDataResult; -import org.apache.cassandra.utils.ByteBufferUtil; - -import static org.apache.cassandra.distributed.api.Feature.GOSSIP; -import static 
org.apache.cassandra.distributed.api.Feature.NETWORK; - -public class AccordCommandStoreTryExecuteListeningTest extends TestBaseImpl -{ - private static DecoratedKey dk(int key) - { - IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); - return partitioner.decorateKey(ByteBufferUtil.bytes(key)); - } - - private static PartitionKey pk(int key, String keyspace, String table) - { - TableId tid = Schema.instance.getTableMetadata(keyspace, table).id; - return new PartitionKey(tid, dk(key)); - } - - @Test - public void testTryExecuteListening() throws Throwable - { - try (Cluster cluster = Cluster.build().withNodes(1) - .withoutVNodes() - .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(1)) - .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(1, "dc0", "rack0")) - .withConfig(config -> config.set("accord.command_store_shard_count", 1) - .set("accord.queue_shard_count", 1) - .with(NETWORK, GOSSIP)) - .start()) - { - cluster.schemaChange("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor':1}"); - cluster.schemaChange("CREATE TABLE ks.tbl (k int, c int, v int, primary key(k, c)) WITH transactional_mode='full'"); - - cluster.get(1).runOnInstance(() -> { - AccordService service = (AccordService) AccordService.instance(); - - Node node = service.node(); - PartitionKey key = pk(1, "ks", "tbl"); - AccordCommandStore commandStore = (AccordCommandStore) node.commandStores().unsafeForKey(key.toUnseekable()); - - Command txn1a = executed(node, SaveStatus.Applied, 1); - Command txn1b = executed(node, SaveStatus.PreApplied, 2); - Command txn2a = executed(node, SaveStatus.PreApplied, 1, txn1a.txnId()); - Command txn2b = executed(node, SaveStatus.PreApplied, 2, txn1b.txnId()); - Command txn3 = executed(node, SaveStatus.PreApplied,1, txn1a.txnId(), 2, txn1b.txnId(), txn2b.txnId()); - Command txn4 = executed(node, SaveStatus.PreApplied, 1, txn1a.txnId(), txn3.txnId(), 2, txn1b.txnId(), txn3.txnId()); - Command[] commands = new 
Command[] { txn1a, txn1b, txn2a, txn2b, txn3, txn4 }; - - AccordService.getBlocking(commandStore.chain((Empty)() -> "Test", safeStore -> { - for (Command command : commands) - commandStore.journal.saveCommand(commandStore.id(), new Journal.CommandUpdate(null, command), () -> {}); - - commandStore.unsafeGetListeners().register(txn1a.txnId(), SaveStatus.Applied, txn2a.txnId()); - commandStore.unsafeGetListeners().register(txn3.txnId(), SaveStatus.Applied, txn4.txnId()); - })); - - AccordService.getBlocking(commandStore.tryToExecuteListeningTxns(true)); - - for (Command command : commands) - { - Command cmd = AccordService.getBlocking(commandStore.submit(PreLoadContext.contextFor(command.txnId(), "Test"), safeStore -> safeStore.unsafeGet(command.txnId()).current())); - Assertions.assertThat(cmd.saveStatus()).isEqualTo(SaveStatus.Applied); - } - }); - } - } - - private static Command executed(Node node, SaveStatus saveStatus, Object ... inputs) - { - int depCount; - Map> depsByInputKey = new TreeMap<>(); - TxnId[] txnIds; - { - PartitionKey k = null; - for (Object input : inputs) - { - if (input instanceof Integer) - { - k = keyN((Integer) input, node); - depsByInputKey.put(k, new ArrayList<>()); - } - else depsByInputKey.get(k).add((TxnId)input); - } - txnIds = depsByInputKey.values().stream().flatMap(Collection::stream).distinct().sorted().toArray(TxnId[]::new); - depCount = depsByInputKey.values().stream().mapToInt(Collection::size).sum(); - } - PartitionKey[] keys = depsByInputKey.keySet().toArray(PartitionKey[]::new); - Range[] ranges = Stream.of(keys).map(PartitionKey::asRange).toArray(Range[]::new); - - PartitionKey key = keys[0]; - AccordCommandStore commandStore = (AccordCommandStore) node.commandStores().unsafeForKey(key.toUnseekable()); - - Txn txn = node.agent().emptySystemTxn(Txn.Kind.ExclusiveSyncPoint, Routable.Domain.Range); - TxnId txnId = node.nextTxnId(txn); - FullRoute route; - try { route = node.computeRoute(txnId, Ranges.of(ranges)); } - catch 
(TopologyException e) { throw new RuntimeException(e); } - int[] rangesToTxnIds = new int[depCount + ranges.length]; - { - int offset = ranges.length; - for (int i = 0 ; i < ranges.length ; ++i) - { - for (TxnId dep : depsByInputKey.get(keys[i])) - rangesToTxnIds[offset++] = Arrays.binarySearch(txnIds, dep); - rangesToTxnIds[i] = offset; - } - } - Deps deps = new Deps(KeyDeps.NONE, RangeDeps.SerializerSupport.create(ranges, txnIds, rangesToTxnIds, null)); - Command.WaitingOn waitingOn; { - LargeBitSet waitingOnBits = new LargeBitSet(txnIds.length); - waitingOnBits.setRange(0, txnIds.length); - waitingOn = new Command.WaitingOn(RoutingKeys.EMPTY, deps.rangeDeps, new ImmutableBitSet(waitingOnBits), new ImmutableBitSet(txnIds.length)); - } - return Command.Executed.executed(txnId, saveStatus, Status.Durability.NotDurable, StoreParticipants.execute(commandStore.unsafeGetRangesForEpoch(), route, txnId, txnId.epoch()), Ballot.ZERO, txnId, txn.intersecting(route, true), deps.intersecting(route), Ballot.ZERO, waitingOn, null, TxnDataResult.PERSISTABLE); - } - - private static PartitionKey keyN(int n, Node node) - { - PartitionKey first = pk(1, "ks", "tbl"); - if (n == 1) - return first; - - AccordCommandStore commandStore = (AccordCommandStore) node.commandStores().unsafeForKey(first.toUnseekable()); - - int i = 2; - while (true) - { - PartitionKey next = pk(i, "ks", "tbl"); - if (commandStore.unsafeGetRangesForEpoch().all().contains(next) && --n == 0) - return next; - } - } - -} diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java index b0f339523cd7..cee395977dbf 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java @@ -116,14 +116,12 @@ public static void setUp() throws IOException 
.set("accord.shard_durability_max_splits", "16") .set("accord.shard_durability_cycle", "1m") .set("accord.queue_submission_model", "SIGNAL") -// .set("accord.queue_submission_model", "SEMI_SYNC") .set("accord.command_store_shard_count", "8") .set("accord.queue_thread_count", "4") .set("accord.queue_shard_count", "1") .set("accord.replica_execution", "ALL") .set("accord.send_stable", "TO_ALL_REPLICA_EXECUTABLE_ELSE_FOR_READS") .set("accord.send_minimal", "false") -// .set("accord.permit_fast_quorum_medium_path", "false") .set("accord.catchup_on_start_fail_latency", "2m"); }), nodeCount); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/AccordJournalConsistentExpungeTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/AccordJournalConsistentExpungeTest.java new file mode 100644 index 000000000000..8770df01c96f --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/AccordJournalConsistentExpungeTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test.accord.journal; + + +import java.util.Iterator; + +import org.junit.Test; + +import accord.local.Command; +import accord.local.Node; +import accord.primitives.SaveStatus; +import accord.primitives.TxnId; + +import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.accord.AccordCacheEntry; +import org.apache.cassandra.service.accord.AccordCommandStore; +import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.api.PartitionKey; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class AccordJournalConsistentExpungeTest extends TestBaseImpl +{ + private static DecoratedKey dk(int key) + { + IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); + return partitioner.decorateKey(ByteBufferUtil.bytes(key)); + } + + private static PartitionKey pk(int key, String keyspace, String table) + { + TableId tid = Schema.instance.getTableMetadata(keyspace, table).id; + return new PartitionKey(tid, dk(key)); + } + + @Test + public void loadCommandErasedTest() throws Throwable + { + try (Cluster cluster = Cluster.build().withNodes(3) + .withoutVNodes() + .withConfig(config -> config + .set("accord.shard_durability_cycle", "20s") + .set("accord.ephemeral_reads", false) + .with(NETWORK, GOSSIP)) + .start()) + { + cluster.schemaChange("CREATE KEYSPACE ks WITH 
REPLICATION={'class':'SimpleStrategy', 'replication_factor':3}"); + cluster.schemaChange("CREATE TABLE ks.tbl (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); + + cluster.get(1).executeInternal("BEGIN TRANSACTION \n" + + "SELECT * FROM ks.tbl WHERE k = 1; \n" + + "COMMIT TRANSACTION"); + + cluster.get(1).runOnInstance(() -> { + AccordService service = (AccordService) AccordService.instance(); + PartitionKey key = pk(1, "ks", "tbl"); + + Node node = service.node(); + AccordCommandStore commandStore = (AccordCommandStore) node.commandStores().unsafeForKey(key.toUnseekable()); + + Iterator> iterator = commandStore.cachesUnsafe().commands().iterator(); + + TxnId txnId = TxnId.NONE; + + while (iterator.hasNext()) + { + txnId = iterator.next().key(); + if (!txnId.isSystemTxn()) + break; + } + + assertFalse(txnId.isSystemTxn()); + + TxnId finalTxnId = txnId; + Util.spinUntilTrue(() -> commandStore.safeGetRedundantBefore().minGcBefore().compareTo(finalTxnId) >= 0, 25); + + service.journal().purge(service.node().commandStores(), node.topology()::minEpoch); + + assertEquals(SaveStatus.Erased, commandStore.loadCommand(txnId).saveStatus); + }); + } + } +} + diff --git a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java index e12ec5875235..0fce66a1f1d4 100644 --- a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java +++ b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java @@ -73,6 +73,7 @@ public static void setUp() throws Throwable } private AtomicInteger counter = new AtomicInteger(); + @Before public void beforeTest() throws Throwable { diff --git a/test/distributed/org/apache/cassandra/service/accord/journal/AccordJournalBurnTest.java b/test/distributed/org/apache/cassandra/service/accord/journal/AccordJournalBurnTest.java index ab4ee9b3960e..00d54bd3ce7d 100644 --- 
a/test/distributed/org/apache/cassandra/service/accord/journal/AccordJournalBurnTest.java +++ b/test/distributed/org/apache/cassandra/service/accord/journal/AccordJournalBurnTest.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -94,6 +95,7 @@ import org.apache.cassandra.utils.CloseableIterator; import static accord.impl.PrefixedIntHashKey.ranges; +import static accord.primitives.SaveStatus.Erased; import static org.apache.cassandra.config.AccordConfig.RangeIndexMode.journal_sai; import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; @@ -101,6 +103,11 @@ public class AccordJournalBurnTest extends BurnTestBase { private static final Logger logger = LoggerFactory.getLogger(AccordJournalBurnTest.class); + static + { + Cluster.RandomLoader.CMD_BASE_CHECK_CHANCE = 0.1f; + } + public static void setUp() throws Throwable { StorageService.instance.registerMBeans(); @@ -331,13 +338,21 @@ public void purge(CommandStores commandStores, EpochSupplier minEpoch) { Command b = e.getValue(); Command a = after.get(e.getKey()); + if (b != null && b.saveStatus == Erased) b = null; + if (a != null && a.saveStatus == Erased) a = null; Invariants.require(Objects.equals(a, b)); } if (before.size() != after.size()) { - for (Map.Entry e : after.entrySet()) - Invariants.require(null != before.get(e.getKey())); - Invariants.require(false); + for (Map m : Arrays.asList(before, after)) + { + for (JournalKey k : m.keySet()) + { + Command b = before.get(k); + Command a = after.get(k); + Invariants.require((a == null || a.saveStatus == Erased) == (b == null || b.saveStatus == Erased), "%s != %s", a, b); + } + } } Invariants.require(!orig.equals(table.getLiveSSTables())); } diff --git a/test/unit/org/apache/cassandra/db/partitions/SimplePartition.java b/test/unit/org/apache/cassandra/db/partitions/SimplePartition.java 
index d13987468785..1d14cbb474a8 100644 --- a/test/unit/org/apache/cassandra/db/partitions/SimplePartition.java +++ b/test/unit/org/apache/cassandra/db/partitions/SimplePartition.java @@ -113,12 +113,12 @@ public SimplePartition addEmpty(Clustering ck) public SimplePartition addEmptyAndLive(Clustering ck) { - return addEmptyAndLive(ck, DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP); + return addEmptyAndLive(ck, DEFAULT_TIMESTAMP); } - public SimplePartition addEmptyAndLive(Clustering ck, long timestamp, long nowInSec) + public SimplePartition addEmptyAndLive(Clustering ck, long timestamp) { - return add(ck).liveness(timestamp, nowInSec).build(); + return add(ck).liveness(timestamp).build(); } public RowIterator filtered() @@ -142,9 +142,9 @@ public RowBuilder timestamp(long timestamp) return this; } - public RowBuilder liveness(long timestamp, long nowInSec) + public RowBuilder liveness(long timestamp) { - builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(timestamp, nowInSec)); + builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(timestamp)); return this; } diff --git a/test/unit/org/apache/cassandra/db/rows/BTreeRowHasLiveDataTest.java b/test/unit/org/apache/cassandra/db/rows/BTreeRowHasLiveDataTest.java index 7eab66e2b0b7..64768ff63610 100644 --- a/test/unit/org/apache/cassandra/db/rows/BTreeRowHasLiveDataTest.java +++ b/test/unit/org/apache/cassandra/db/rows/BTreeRowHasLiveDataTest.java @@ -146,7 +146,7 @@ public void livePK_returnsTrue() long ts = timestampMicro(nowInSec); Row.Builder b = newBuilder(); - b.addPrimaryKeyLivenessInfo(LivenessInfo.create(ts, nowInSec)); + b.addPrimaryKeyLivenessInfo(LivenessInfo.create(ts)); // No cells. 
Row row = b.build(); diff --git a/test/unit/org/apache/cassandra/db/rows/RowsTest.java b/test/unit/org/apache/cassandra/db/rows/RowsTest.java index 899dae6a4376..06bf701a2f76 100644 --- a/test/unit/org/apache/cassandra/db/rows/RowsTest.java +++ b/test/unit/org/apache/cassandra/db/rows/RowsTest.java @@ -230,7 +230,7 @@ private static Row.Builder createBuilder(Clustering c, long now, ByteBuffer v { long ts = secondToTs(now); Row.Builder builder = createBuilder(c); - builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(ts, now)); + builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(ts)); if (vVal != null) { builder.addCell(BufferCell.live(v, ts, vVal)); @@ -251,7 +251,7 @@ public void collectStats() long ts = secondToTs(now); Row.Builder builder = BTreeRow.unsortedBuilder(); builder.newRow(c1); - LivenessInfo liveness = LivenessInfo.create(ts, now); + LivenessInfo liveness = LivenessInfo.create(ts); builder.addPrimaryKeyLivenessInfo(liveness); DeletionTime complexDeletion = DeletionTime.build(ts-1, now); builder.addComplexDeletion(m, complexDeletion); @@ -289,7 +289,7 @@ public void diff() long ts1 = secondToTs(now1); Row.Builder r1Builder = BTreeRow.unsortedBuilder(); r1Builder.newRow(c1); - LivenessInfo r1Liveness = LivenessInfo.create(ts1, now1); + LivenessInfo r1Liveness = LivenessInfo.create(ts1); r1Builder.addPrimaryKeyLivenessInfo(r1Liveness); DeletionTime r1ComplexDeletion = DeletionTime.build(ts1-1, now1); r1Builder.addComplexDeletion(m, r1ComplexDeletion); @@ -305,7 +305,7 @@ public void diff() long ts2 = secondToTs(now2); Row.Builder r2Builder = BTreeRow.unsortedBuilder(); r2Builder.newRow(c1); - LivenessInfo r2Liveness = LivenessInfo.create(ts2, now2); + LivenessInfo r2Liveness = LivenessInfo.create(ts2); r2Builder.addPrimaryKeyLivenessInfo(r2Liveness); Cell r2v = BufferCell.live(v, ts2, BB2); Cell r2m2 = BufferCell.live(m, ts2, BB1, CellPath.create(BB2)); @@ -365,7 +365,7 @@ public void diffEmptyMerged() long ts1 = secondToTs(now1); Row.Builder 
r1Builder = BTreeRow.unsortedBuilder(); r1Builder.newRow(c1); - LivenessInfo r1Liveness = LivenessInfo.create(ts1, now1); + LivenessInfo r1Liveness = LivenessInfo.create(ts1); r1Builder.addPrimaryKeyLivenessInfo(r1Liveness); // mergedData == null @@ -373,7 +373,7 @@ public void diffEmptyMerged() long ts2 = secondToTs(now2); Row.Builder r2Builder = BTreeRow.unsortedBuilder(); r2Builder.newRow(c1); - LivenessInfo r2Liveness = LivenessInfo.create(ts2, now2); + LivenessInfo r2Liveness = LivenessInfo.create(ts2); r2Builder.addPrimaryKeyLivenessInfo(r2Liveness); DeletionTime r2ComplexDeletion = DeletionTime.build(ts2-1, now2); r2Builder.addComplexDeletion(m, r2ComplexDeletion); @@ -419,7 +419,7 @@ public void diffEmptyInput() long ts1 = secondToTs(now1); Row.Builder r1Builder = BTreeRow.unsortedBuilder(); r1Builder.newRow(c1); - LivenessInfo r1Liveness = LivenessInfo.create(ts1, now1); + LivenessInfo r1Liveness = LivenessInfo.create(ts1); r1Builder.addPrimaryKeyLivenessInfo(r1Liveness); // mergedData == null @@ -427,7 +427,7 @@ public void diffEmptyInput() long ts2 = secondToTs(now2); Row.Builder r2Builder = BTreeRow.unsortedBuilder(); r2Builder.newRow(c1); - LivenessInfo r2Liveness = LivenessInfo.create(ts2, now2); + LivenessInfo r2Liveness = LivenessInfo.create(ts2); r2Builder.addPrimaryKeyLivenessInfo(r2Liveness); DeletionTime r2ComplexDeletion = DeletionTime.build(ts2-1, now2); r2Builder.addComplexDeletion(m, r2ComplexDeletion); @@ -484,7 +484,7 @@ public void merge() Row merged = Rows.merge(existingBuilder.build(), updateBuilder.build()); Assert.assertEquals(c1, merged.clustering()); - Assert.assertEquals(LivenessInfo.create(ts2, now2), merged.primaryKeyLivenessInfo()); + Assert.assertEquals(LivenessInfo.create(ts2), merged.primaryKeyLivenessInfo()); Iterator> iter = merged.cells().iterator(); Assert.assertTrue(iter.hasNext()); diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java 
b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java index f39dbaeb6295..ba38d9ca7d02 100644 --- a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java +++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java @@ -405,7 +405,7 @@ private static Clustering clusteringFor(int i) static Row emptyRowAt(int pos, IntUnaryOperator timeGenerator) { final Clustering clustering = clusteringFor(pos); - final LivenessInfo live = LivenessInfo.create(timeGenerator.applyAsInt(pos), nowInSec); + final LivenessInfo live = LivenessInfo.create(timeGenerator.applyAsInt(pos)); return BTreeRow.noCellLiveRow(clustering, live); } diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java index a4a5c35f53f8..0cb06cd02cda 100644 --- a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java +++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java @@ -235,14 +235,14 @@ public List parse(String input, int default_liveness) static Row emptyRowAt(int pos, IntUnaryOperator timeGenerator) { final Clustering clustering = clusteringFor(pos); - final LivenessInfo live = LivenessInfo.create(timeGenerator.applyAsInt(pos), UnfilteredRowIteratorsMergeTest.nowInSec); + final LivenessInfo live = LivenessInfo.create(timeGenerator.applyAsInt(pos)); return BTreeRow.noCellLiveRow(clustering, live); } static Row emptyRowAt(int pos, int time, long deletionTime) { final Clustering clustering = clusteringFor(pos); - final LivenessInfo live = LivenessInfo.create(time, UnfilteredRowIteratorsMergeTest.nowInSec); + final LivenessInfo live = LivenessInfo.create(time); final DeletionTime delTime = deletionTime == -1 ? 
DeletionTime.LIVE : DeletionTime.build(deletionTime, deletionTime); return BTreeRow.create(clustering, live, Row.Deletion.regular(delTime), BTree.empty()); } diff --git a/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java b/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java index 15ac67eb89e7..2dcae5945d93 100644 --- a/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java +++ b/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java @@ -212,7 +212,7 @@ public static Row makeRow(TableMetadata metadata, Object... clusteringValues) for (int i = 0; i < clusteringValues.length; i++) clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]); - return BTreeRow.noCellLiveRow(Clustering.make(clusteringByteBuffers), LivenessInfo.create(0, 0)); + return BTreeRow.noCellLiveRow(Clustering.make(clusteringByteBuffers), LivenessInfo.create(0)); } public static UnfilteredRowIterator partition(TableMetadata metadata, diff --git a/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java b/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java index b223c0843e25..125658d4c9b7 100644 --- a/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java +++ b/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java @@ -410,7 +410,7 @@ private Row row(long timestamp, Object... 
clusteringValues) for (int i = 0; i < clusteringValues.length; i++) clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]); - return BTreeRow.noCellLiveRow(Clustering.make(clusteringByteBuffers), LivenessInfo.create(timestamp, nowInSec)); + return BTreeRow.noCellLiveRow(Clustering.make(clusteringByteBuffers), LivenessInfo.create(timestamp)); } @SuppressWarnings("unchecked") diff --git a/test/unit/org/apache/cassandra/service/accord/AccordExpungeTest.java b/test/unit/org/apache/cassandra/service/accord/AccordExpungeTest.java new file mode 100644 index 000000000000..66514033a5fc --- /dev/null +++ b/test/unit/org/apache/cassandra/service/accord/AccordExpungeTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.service.accord; + +import java.nio.file.Files; +import java.util.concurrent.atomic.AtomicInteger; + +import org.assertj.core.api.SoftAssertions; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import accord.api.Journal; +import accord.local.Cleanup; +import accord.local.Command; +import accord.local.DurableBefore; +import accord.local.RedundantBefore; +import accord.primitives.Ranges; +import accord.primitives.Routable; +import accord.primitives.SaveStatus; +import accord.primitives.Timestamp; +import accord.primitives.Txn; +import accord.primitives.TxnId; +import accord.utils.AccordGens; +import accord.utils.Gen; +import accord.utils.Gens; +import accord.utils.TriConsumer; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.journal.TestParams; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.accord.journal.AccordJournal; +import org.apache.cassandra.service.accord.journal.CommandChanges; +import org.apache.cassandra.utils.AccordGenerators; +import org.apache.cassandra.utils.AccordGenerators.CommandBuilder; + +import static accord.local.Cleanup.Input.FULL; +import static accord.local.RedundantStatus.SomeStatus.GC_BEFORE_AND_LOCALLY_DURABLE; +import static accord.primitives.Routable.Domain.Range; +import static accord.primitives.Txn.Kind.ExclusiveSyncPoint; +import static accord.utils.Property.qt; +import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse; + +/** + * Regression tests for the interaction between Accord journal cleanup decisions + * (driven by RedundantBefore / DurableBefore) and the {@code loadCommand} path. + * + *

Symptom: a transaction that has been processed and erased (or that was past the + * GC boundary by the time the journal saw it) is read back as {@code NotDefined} + * instead of {@code Erased}/{@code Truncated}.

+ * + *

Mechanism under test: when the FULL-input cleanup path decides EXPUNGE, + * {@link accord.impl.CommandChange.Builder#construct} returns {@code null}. + * Downstream, {@link AccordSafeCommand#preExecute} maps {@code null} to a + * {@code Command.NotDefined.uninitialised(...)} — which is exactly the bogus + * NotDefined the user observed. The journal load API itself should never collapse + * "this txnId has been erased" into the same answer as "we have never heard of this + * txnId" once RedundantBefore / DurableBefore say it is past the GC boundary.

+ * + * @author Claude and Benedict + */ +public class AccordExpungeTest +{ + private static final int COMMAND_STORE_ID = 1; + + private final AtomicInteger counter = new AtomicInteger(); + + @BeforeClass + public static void setUpClass() throws Throwable + { + SchemaLoader.prepareServer(); + // a single keyspace + table is enough; AccordGenerators.commandsBuilder() will use + // ks.tbl as the table for the synthetic Txn it produces. + SchemaLoader.createKeyspace("ks", KeyspaceParams.simple(1), + parse("CREATE TABLE tbl (k int, c int, v int, primary key (k, c)) WITH transactional_mode='full'", "ks")); + StorageService.instance.initServer(); + } + + @Before + public void beforeTest() throws Throwable + { + File directory = new File(Files.createTempDirectory(Integer.toString(counter.incrementAndGet()))); + directory.deleteRecursiveOnExit(); + DatabaseDescriptor.setAccordJournalDirectory(directory.path()); + } + + private void validate(TriConsumer validate) + { + Gen saveStatusGen = Gens.enums().all(SaveStatus.class); + qt().forAll(commands().map((rs, b) -> b.build(saveStatusGen.next(rs))).filter(c -> !c.participants.touches().isEmpty())) + .check(before -> + { + Ranges ranges = before.participants.touches().toRanges(); + TxnId gcBound = gcBoundStrictlyAfter(before); + RedundantBefore rb = RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, gcBound, + GC_BEFORE_AND_LOCALLY_DURABLE); + DurableBefore db = DurableBefore.create(ranges, gcBound, gcBound); + + Cleanup decided = Cleanup.shouldCleanup(FULL, + before.txnId(), + before.executeAt(), + before.saveStatus(), + before.durability(), + before.participants(), + rb, db); + + Assert.assertEquals(Cleanup.EXPUNGE, decided); + + validate.accept(before, rb, db); + }); + } + + @Test + public void expungeReadsAsErased() + { + AccordJournal journal = newJournal(); + try + { + journal.start(null); + validate((before, rb, db) -> { + CommandChanges builder = new CommandChanges(before.txnId); + builder.maybeCleanup(true, 
FULL, rb, db); + Command reconstructed = builder.construct(rb); + Assert.assertTrue("Empty builder for txnId=" + before.txnId() + + " constructed a non-Erased command despite being past GC: " + reconstructed, + reconstructed != null && reconstructed.saveStatus() == SaveStatus.Erased); + + loadAndValidate(before, journal, rb, db); + journal.saveCommand(COMMAND_STORE_ID, new Journal.CommandUpdate(null, before), null); + loadAndValidate(before, journal, rb, db); + journal.closeCurrentSegmentForTestingIfNonEmpty(); + loadAndValidate(before, journal, rb, db); + }); + } + finally + { + journal.stop(); + } + } + + private static AccordJournal newJournal() + { + return new AccordJournal(new TestParams() + { + @Override public int segmentSize() { return 1 << 20; } + @Override public boolean enableCompaction() { return false; } + }); + } + + private void loadAndValidate(Command before, Journal journal, RedundantBefore rb, DurableBefore db) + { + SoftAssertions checks = new SoftAssertions(); + Command loaded = journal.loadCommand(COMMAND_STORE_ID, before.txnId, rb, db); + checks.assertThat(loaded).as("loadCommand returned null after RedundantBefore advance for %s; " + + "AccordSafeCommand will surface this as NotDefined", loaded).isNotNull(); + + checks.assertThat(loaded.saveStatus()) + .as("loadCommand did not return Erased for previously-written %s; " + + "loaded=%s, redundantBefore=%s", before, loaded, rb) + .isEqualTo(SaveStatus.Erased); + } + + private static Gen commands() + { + return AccordGenerators.commandsBuilder(AccordGens.txnIds(Gens.pick(Txn.Kind.Write, Txn.Kind.Read, ExclusiveSyncPoint, Txn.Kind.VisibilitySyncPoint))); + } + + /** + * Construct a TxnId strictly greater than {@code command.txnId()} and + * with an HLC strictly greater than {@code command.executeAt().hlc()} so that + * {@code Cleanup.expunge()} fires regardless of + * {@code dataStoreRequiresUniqueHlcs()} / {@code Write}-kind gating. + * + *

The bound must carry the {@link Timestamp.Flag#SHARD_BOUND} flag (see + * {@code RedundantBefore.Bounds} invariant); we also use + * {@link Txn.Kind#ExclusiveSyncPoint} and {@link Routable.Domain#Range} to match + * the way GC bounds are generated in production.

+ */ + private static TxnId gcBoundStrictlyAfter(Command command) + { + long hlc = command.txnId().hlc(); + long epoch = command.txnId().epoch(); + if (command.executeAt() != null) + { + hlc = Math.max(hlc, command.executeAt().hlc()); + epoch = Math.max(epoch, command.executeAt().epoch()); + } + + TxnId next = new TxnId(epoch, hlc + 1, ExclusiveSyncPoint, Range, command.txnId().node); + return next.addFlag(Timestamp.Flag.SHARD_BOUND); + } +} diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java index dcc631d776cf..fd2301046a44 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java @@ -49,7 +49,7 @@ import accord.impl.DefaultLocalListeners.NotifySink.NoOpNotifySink; import accord.local.Command; import accord.local.CommandStore; -import accord.local.CommandStores; +import accord.local.CommandStores.RangesForEpoch; import accord.local.DurableBefore; import accord.local.Node; import accord.local.Node.Id; @@ -404,15 +404,11 @@ public SequentialAsyncExecutor someSequentialExecutor() AccordJournal journal = new AccordJournal(spec); journal.start(null); - CommandStore.EpochUpdateHolder holder = new CommandStore.EpochUpdateHolder(); Ranges ranges = topology.rangesForNode(node); - holder.add(1, new CommandStores.RangesForEpoch(1, ranges), ranges); - AccordCommandStore result = new AccordCommandStore(0, time, agent, null, + return new AccordCommandStore(0, time, agent, null, cs -> new NoOpProgressLog(), cs -> new DefaultLocalListeners(null, new NoOpRemoteListeners(), new NoOpNotifySink()), - holder, journal, executor); - result.unsafeUpdateRangesForEpoch(); - return result; + new RangesForEpoch(1, ranges), journal, executor); } public static AccordCommandStore createAccordCommandStore( diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java index dd3e89d3fcb3..0fabb6735202 100644 --- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java +++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java @@ -52,6 +52,7 @@ import accord.local.Command; import accord.local.CommandStore; import accord.local.CommandStores; +import accord.local.CommandStores.RangesForEpoch; import accord.local.DurableBefore; import accord.local.Node; import accord.local.NodeCommandStoreService; @@ -118,7 +119,6 @@ public class SimulatedAccordCommandStore implements AutoCloseable { private final List failures = new ArrayList<>(); private final SimulatedExecutorFactory globalExecutor; - private final CommandStore.EpochUpdateHolder updateHolder; private final BooleanSupplier shouldEvict, shouldFlush, shouldCompact; public final NodeCommandStoreService storeService; @@ -166,14 +166,12 @@ public SimulatedAccordCommandStore(@Nullable TableId tableId, RandomSource rs, F stage.unsafeSetExecutor(globalExecutor.configureSequential("ignore").build()); this.nodeId = AccordTopology.tcmIdToAccord(ClusterMetadata.currentNullable().myNodeId()); - this.updateHolder = new CommandStore.EpochUpdateHolder(); this.topology = AccordTopology.createAccordTopology(ClusterMetadata.current()); this.topologies = new Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, topology); Ranges ranges = topology.ranges(); if (tableId != null) ranges = ranges.overlapping(Ranges.of(TokenRange.create(TokenKey.min(tableId, getPartitioner()), TokenKey.max(tableId, getPartitioner())))); - CommandStores.RangesForEpoch rangesForEpoch = new CommandStores.RangesForEpoch(topology.epoch(), ranges); - updateHolder.add(topology.epoch(), rangesForEpoch, ranges); + RangesForEpoch rangesForEpoch = new RangesForEpoch(topology.epoch(), ranges); this.storeService = new NodeCommandStoreService() { @@ -295,14 +293,13 @@ public void onException(Throwable t) 
@Override public void notify(SafeCommandStore safeStore, SafeCommand safeCommand, TxnId listener) {} @Override public boolean notify(SafeCommandStore safeStore, SafeCommand safeCommand, LocalListeners.ComplexListener listener) { return false; } }), - updateHolder, + rangesForEpoch, journal, new AccordExecutorSimple(0, CommandStore.class.getSimpleName() + '[' + 0 + ']', agent)); this.commandStore.executor().executeDirectlyWithLock(() -> { commandStore.executor().setCapacity(8 << 20); commandStore.executor().setWorkingSetSize(4 << 20); }); - commandStore.unsafeUpdateRangesForEpoch(); shouldEvict = boolSource(rs.fork()); { diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java index 016bde0fe965..138dbb4f12c9 100644 --- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java @@ -64,6 +64,7 @@ import accord.local.Command; import accord.local.CommandBuilder; import accord.local.CommandStore; +import accord.local.CommandStores.RangesForEpoch; import accord.local.DurableBefore; import accord.local.Node; import accord.local.NodeCommandStoreService; @@ -662,7 +663,7 @@ protected TestCommandStore() null, ignore -> new ProgressLog.NoOpProgressLog(), ignore -> new DefaultLocalListeners(null, new DefaultRemoteListeners((a, b, c, d, e)->{}), DefaultLocalListeners.DefaultNotifySink.INSTANCE), - new EpochUpdateHolder()); + new RangesForEpoch(1, Ranges.EMPTY)); } @Override public boolean inStore() { return true; } diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/LatestDepsSerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/LatestDepsSerializerTest.java index 96e3bc544ab2..7f3a25eb955c 100644 --- 
a/test/unit/org/apache/cassandra/service/accord/serializers/LatestDepsSerializerTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/LatestDepsSerializerTest.java @@ -99,12 +99,13 @@ private void testOne(IPartitioner partitioner, RandomSource rs) throws IOExcepti Arrays.sort(starts); for (int i = 0 ; i < size ; ++i) { - if (rs.nextBoolean()) continue; + if (size > 1 && rs.nextBoolean()) continue; entries[i] = new LatestDeps.LatestEntry(knownDeps.next(rs), rs.nextBoolean() ? rs.nextBoolean() ? Ballot.ZERO : Ballot.MAX : ballots.next(rs), rs.nextBoolean() ? null : deps.next(rs), rs.nextBoolean() ? null : deps.next(rs)); } + LatestDeps latestDeps = LatestDeps.SerializerSupport.create(starts, entries); DataOutputBuffer buf = new DataOutputBuffer(); Serializers.testSerde(buf, LatestDepsSerializers.latestDeps, latestDeps); diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java b/test/unit/org/apache/cassandra/utils/AccordGenerators.java index 3b936ece0b01..0e7d7a3685b8 100644 --- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java +++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java @@ -192,7 +192,12 @@ public enum RecoveryStatus { None, Started, Complete } public static Gen commandsBuilder() { - return commandsBuilder(AccordGens.txnIds(), Gens.bools().all(), Gens.enums().all(RecoveryStatus.class), (rs, txnId, txn) -> AccordGens.depsFor(txnId, txn).next(rs)); + return commandsBuilder(AccordGens.txnIds()); + } + + public static Gen commandsBuilder(Gen txnIdGen) + { + return commandsBuilder(txnIdGen, Gens.bools().all(), Gens.enums().all(RecoveryStatus.class), (rs, txnId, txn) -> AccordGens.depsFor(txnId, txn).next(rs)); } public static Gen commandsBuilder(Gen txnIdGen, Gen fastPath, Gen recover, TriFunction depsGen)