Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
url = https://github.com/apache/cassandra-accord.git
branch = trunk
url = https://github.com/belliottsmith/cassandra-accord.git
branch = consistent-expunge
Comment thread
ifesdjeen marked this conversation as resolved.
2 changes: 1 addition & 1 deletion modules/accord
Submodule accord updated 30 files
+6 −3 accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
+7 −9 accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
+25 −20 accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
+2 −2 accord-core/src/main/java/accord/coordinate/SynchronousRecoverAwait.java
+1 −1 accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
+15 −12 accord-core/src/main/java/accord/impl/CommandChange.java
+64 −69 accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
+10 −11 accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+2 −0 accord-core/src/main/java/accord/local/CommandBuilder.java
+17 −80 accord-core/src/main/java/accord/local/CommandStore.java
+23 −43 accord-core/src/main/java/accord/local/CommandStores.java
+3 −1 accord-core/src/main/java/accord/local/Commands.java
+12 −3 accord-core/src/main/java/accord/local/RedundantBefore.java
+34 −12 accord-core/src/main/java/accord/messages/Callback.java
+6 −3 accord-core/src/main/java/accord/topology/ActiveEpochs.java
+3 −22 accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java
+21 −2 accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java
+1 −0 accord-core/src/main/java/accord/utils/ReducingIntervalMap.java
+5 −0 accord-core/src/main/java/accord/utils/ReducingRangeMap.java
+28 −0 accord-core/src/main/java/accord/utils/btree/BTree.java
+15 −2 accord-core/src/main/java/accord/utils/btree/ReducingBTree.java
+1 −1 accord-core/src/test/java/accord/impl/RemoteListenersTest.java
+10 −6 accord-core/src/test/java/accord/impl/basic/Cluster.java
+12 −18 accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+29 −6 accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+408 −0 accord-core/src/test/java/accord/local/MaybeExecuteAdapterTest.java
+3 −2 accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+3 −2 accord-core/src/test/java/accord/utils/AccordGens.java
+31 −9 accord-core/src/test/java/accord/utils/BTreeReducingRangeMapTest.java
+6 −8 accord-core/src/test/java/accord/utils/ReducingRangeMapTest.java
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/config/AccordConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ public enum CatchupMode
// TODO (required): roll this back to catchup_on_start_exit_on_failure: true
public boolean catchup_on_start_exit_on_failure = false;
public CatchupMode catchup_on_start = NORMAL;
public boolean execute_waiting_on_start = true;
public DurationSpec.IntSecondsBound execute_waiting_on_start_timeout = new DurationSpec.IntSecondsBound(0);
public boolean execute_waiting_on_start_fail_on_timeout = false;
public DurationSpec.IntSecondsBound shutdown_grace_period = new DurationSpec.IntSecondsBound(15 * 60);

public enum RangeIndexMode { in_memory, journal_sai }
Expand Down
6 changes: 3 additions & 3 deletions src/java/org/apache/cassandra/db/LivenessInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public interface LivenessInfo extends IMeasurableMemory
LivenessInfo EMPTY = new ImmutableLivenessInfo(NO_TIMESTAMP);
long UNSHARED_HEAP_SIZE = ObjectSizes.measure(EMPTY);

static LivenessInfo create(long timestamp, long nowInSec)
static LivenessInfo create(long timestamp)
{
return new ImmutableLivenessInfo(timestamp);
}
Expand All @@ -75,14 +75,14 @@ private static LivenessInfo expiring(long timestamp, int ttl, long nowInSec, boo
private static LivenessInfo create(long timestamp, int ttl, long nowInSec, boolean applyOverflowPolicy)
{
return ttl == NO_TTL
? create(timestamp, nowInSec)
? create(timestamp)
: expiring(timestamp, ttl, nowInSec, applyOverflowPolicy);
}

static LivenessInfo create(long timestamp, int ttl, long nowInSec)
{
return ttl == NO_TTL
? create(timestamp, nowInSec)
? create(timestamp)
: expiring(timestamp, ttl, nowInSec);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.accord.AccordCommandStore;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.TokenRange;
import org.apache.cassandra.service.accord.api.TokenKey;
Expand Down Expand Up @@ -575,7 +576,7 @@ protected final CompactionController getCompactionController(Set<SSTableReader>
// we should impose stronger guarantees on incoming streams (that they don't contain tombstones), and validate our CommandStore-derived bounds
gcBeforeSeconds = node.durableBefore().foldlWithDefault(ranges, (e, v) -> e == null ? NO_GC : Math.min(e.universal.hlc(), v), null, Long.MAX_VALUE);
StoreSelector selector = StoreFinder.selector(ranges, Long.MIN_VALUE, Long.MAX_VALUE);
gcBeforeSeconds = node.commandStores().mapReduceUnsafe(selector, (ignore, commandStore) -> commandStore.unsafeGetRedundantBefore().foldl(ranges, (bs, v) -> bs.maxBound(LOCALLY_APPLIED).hlc(), Long.MAX_VALUE), Math::min, gcBeforeSeconds);
gcBeforeSeconds = node.commandStores().mapReduceUnsafe(selector, (ignore, commandStore) -> ((AccordCommandStore)commandStore).safeGetRedundantBefore().foldl(ranges, (bs, v) -> bs.maxBound(LOCALLY_APPLIED).hlc(), Long.MAX_VALUE), Math::min, gcBeforeSeconds);
if (gcBeforeSeconds == Long.MAX_VALUE)
gcBeforeSeconds = NO_GC;
else
Expand Down
15 changes: 9 additions & 6 deletions src/java/org/apache/cassandra/db/rows/BTreeRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,7 @@ public static BTreeRow create(Clustering<?> clustering,
{
long minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion.time()));
if (minDeletionTime != Long.MIN_VALUE)
{
long result = BTree.<ColumnData>accumulate(btree, (cd, l) -> Math.min(l, minDeletionTime(cd)) , minDeletionTime);
minDeletionTime = result;
}
minDeletionTime = BTree.<ColumnData>accumulate(btree, (cd, l) -> Math.min(l, minDeletionTime(cd)) , minDeletionTime);

return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
}
Expand All @@ -144,11 +141,17 @@ public static BTreeRow emptyRow(Clustering<?> clustering)

public static BTreeRow singleCellRow(Clustering<?> clustering, Cell<?> cell)
{
return singleCellRow(clustering, LivenessInfo.EMPTY, cell);
}

public static BTreeRow singleCellRow(Clustering<?> clustering, LivenessInfo primaryKeyLivenessInfo, Cell<?> cell)
{
long minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(cell));
if (cell.column().isSimple())
return new BTreeRow(clustering, BTree.singleton(cell), minDeletionTime(cell));
return new BTreeRow(clustering, primaryKeyLivenessInfo, Deletion.LIVE, BTree.singleton(cell), minDeletionTime);

ComplexColumnData complexData = new ComplexColumnData(cell.column(), new Cell<?>[]{ cell }, DeletionTime.LIVE);
return new BTreeRow(clustering, BTree.singleton(complexData), minDeletionTime(cell));
return new BTreeRow(clustering, primaryKeyLivenessInfo, Deletion.LIVE, BTree.singleton(complexData), minDeletionTime);
}

public static BTreeRow emptyDeletedRow(Clustering<?> clustering, Deletion deletion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,9 +497,9 @@ else if (cell.localDeletionTime() > biggestExpirationCell.localDeletionTime())
}
}
if (baseLiveness.isLive(nowInSec) && !baseLiveness.isExpiring())
return LivenessInfo.create(timestamp, nowInSec);
return LivenessInfo.create(timestamp);
if (hasNonExpiringLiveCell)
return LivenessInfo.create(timestamp, nowInSec);
return LivenessInfo.create(timestamp);
if (biggestExpirationCell == null)
return baseLiveness;
if (biggestExpirationCell.localDeletionTime() > baseLiveness.localExpirationTime()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ FilteredRow materialiseAndFilter(SimplePartition.SimpleRow parent)
}
Arrays.sort(columns, 0, columnCount, (a, b) -> ColumnData.comparator.compare((BufferCell)a, (BufferCell)b));
Object[] btree = BTree.build(BulkIterator.of(columns), columnCount, UpdateFunction.noOp);
BTreeRow row = BTreeRow.create(parent.clustering, LivenessInfo.create(timestampMicros, nowInSeconds), Row.Deletion.LIVE, btree);
BTreeRow row = BTreeRow.create(parent.clustering, LivenessInfo.create(timestampMicros), Row.Deletion.LIVE, btree);
if (!rowFilter.isSatisfiedBy(metadata, parent.partitionKey(), row, nowInSeconds))
return null;
return new FilteredRow(row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ private ExecutorsTable()
"CREATE TABLE %s (\n" +
" executor_id int,\n" +
" status text,\n" +
" position int,\n" +
" position bigint,\n" +
" unique_position int,\n" +
" description text,\n" +
" command_store_id int,\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ protected void collect(PartitionsCollector collector)
advance = -1;
}

PartitionCollector partition = collector.partition(id.id());
while (i != end)
{
List<Request> request = rebind(local, slices.get(i), dataRange.isReversed(), rowFilter, columnFilter);
Expand All @@ -200,7 +199,7 @@ protected void collect(PartitionsCollector collector)
else
readCommand = PartitionRangeReadCommand.create(local, collector.nowInSeconds(), send.columnFilter, send.rowFilter, limits, send.dataRange);

RequestAndResponse rr = new RequestAndResponse(id, partition, readCommand);
RequestAndResponse rr = new RequestAndResponse(id, collector, readCommand);
send(rr, endpoint);
pending.addLast(rr);

Expand All @@ -218,12 +217,12 @@ protected void collect(PartitionsCollector collector)
private static class RequestAndResponse extends SyncPromise<ReadResponse>
{
final NodeId nodeId;
final PartitionCollector partition;
final PartitionsCollector partitions;
final ReadCommand readCommand;
private RequestAndResponse(NodeId nodeId, PartitionCollector partition, ReadCommand readCommand)
private RequestAndResponse(NodeId nodeId, PartitionsCollector partitions, ReadCommand readCommand)
{
this.nodeId = nodeId;
this.partition = partition;
this.partitions = partitions;
this.readCommand = readCommand;
}
}
Expand Down Expand Up @@ -292,6 +291,7 @@ private void collect(PartitionsCollector collector, RequestAndResponse rr, Funct
}

int pkCount = local.partitionKeyColumns().size();
PartitionCollector out = rr.partitions.partition(rr.nodeId.id());
try (UnfilteredPartitionIterator partitions = response.makeIterator(rr.readCommand))
{
while (partitions.hasNext())
Expand All @@ -311,7 +311,7 @@ private void collect(PartitionsCollector collector, RequestAndResponse rr, Funct
for (int j = 0 ; j < clustering.size(); ++j)
clusterings[pkCount + j] = clustering.bufferAt(j);
}
rr.partition.collect(rows -> {
out.collect(rows -> {
rows.add((Object[])clusterings)
.lazyCollect(columns -> {
row.forEach(cd -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import accord.impl.progresslog.TxnState;
import accord.local.Command;
import accord.local.CommandStore;
import accord.local.CommandStores;
import accord.local.CommandStores.RangesForEpoch;
import accord.local.CommandSummaries;
import accord.local.MaxConflicts;
import accord.local.MaxDecidedRX;
Expand Down Expand Up @@ -234,32 +234,23 @@ public AccordCommandStore(int id,
DataStore dataStore,
ProgressLog.Factory progressLogFactory,
LocalListeners.Factory listenerFactory,
EpochUpdateHolder epochUpdateHolder,
RangesForEpoch rangesForEpoch,
Journal journal,
AccordExecutor sharedExecutor)
{
super(id, node, agent, dataStore, progressLogFactory, listenerFactory, epochUpdateHolder);
super(id, node, agent, dataStore, progressLogFactory, listenerFactory, rangesForEpoch);
this.loggingId = String.format("[%s]", id);
this.journal = journal;
this.sharedExecutor = sharedExecutor;
if (this.progressLog instanceof DefaultProgressLog)
((DefaultProgressLog)this.progressLog).unsafeSetConfig(DatabaseDescriptor.getAccordProgressLogConfig());

maybeLoadRangesForEpoch(journal.loadRangesForEpoch(id()));
maybeLoadRedundantBefore(journal.loadRedundantBefore(id()));
maybeLoadBootstrapBeganAt(journal.loadBootstrapBeganAt(id()));
maybeLoadSafeToRead(journal.loadSafeToRead(id()));
maybeLoadRangesForEpoch(journal.loadRangesForEpoch(id()));

CommandStores.RangesForEpoch ranges = this.rangesForEpoch;
if (ranges == null || ranges.all().isEmpty())
{
EpochUpdate update = epochUpdateHolder.get();
if (update != null)
ranges = update.newRangesForEpoch;
Invariants.require(ranges != null, "CommandStore %d created with no ranges", id);
}

tableId = (TableId)ranges.all().stream().map(r -> r.start().prefix()).reduce((a, b) -> {
tableId = (TableId)rangesForEpoch.all().stream().map(r -> r.start().prefix()).reduce((a, b) -> {
Invariants.require(a.equals(b), "CommandStore created with multiple distinct TableId (%s and %s)", a, b);
return a;
}).orElseThrow(() -> Invariants.illegalState("CommandStore %d created with no ranges", id));
Expand Down Expand Up @@ -610,8 +601,8 @@ public AccordCompactionInfo getCompactionInfo()
RedundantBefore redundantBefore;
if (safeRedundantBefore == null) redundantBefore = RedundantBefore.EMPTY;
else redundantBefore = safeRedundantBefore.redundantBefore;
CommandStores.RangesForEpoch ranges = this.rangesForEpoch;
if (ranges == null) ranges = CommandStores.RangesForEpoch.EMPTY;
RangesForEpoch ranges = this.rangesForEpoch;
if (ranges == null) ranges = RangesForEpoch.EMPTY;
return new AccordCompactionInfo(id, redundantBefore, ranges, tableId);
}

Expand All @@ -637,8 +628,8 @@ protected void ensureDurable(Ranges ranges, RedundantBefore onCommandStoreDurabl

protected void ensureDurable()
{
RedundantBefore forCommandStore = nonDurable(unsafeGetRedundantBefore(), LOCALLY_DURABLE_TO_COMMAND_STORE, LOCALLY_DURABLE_TO_COMMAND_STORE_ONLY);
RedundantBefore forDataStore = nonDurable(unsafeGetRedundantBefore(), LOCALLY_DURABLE_TO_DATA_STORE, LOCALLY_DURABLE_TO_DATA_STORE_ONLY);
RedundantBefore forCommandStore = nonDurable(safeGetRedundantBefore(), LOCALLY_DURABLE_TO_COMMAND_STORE, LOCALLY_DURABLE_TO_COMMAND_STORE_ONLY);
RedundantBefore forDataStore = nonDurable(safeGetRedundantBefore(), LOCALLY_DURABLE_TO_DATA_STORE, LOCALLY_DURABLE_TO_DATA_STORE_ONLY);
this.ensureDurable(forCommandStore.ranges(Objects::nonNull), forCommandStore);
dataStore.ensureDurable(this, forDataStore, 0);
}
Expand All @@ -659,7 +650,7 @@ private RedundantBefore nonDurable(RedundantBefore redundantBefore, Property dur

protected void ensureDurable(@Nullable Ranges ranges, ReportDurable onCommandStoreDurable)
{
if (node().isReplaying() && onCommandStoreDurable.flags == 0 && unsafeGetRedundantBefore().isAtLeast(onCommandStoreDurable.redundantBefore))
if (node().isReplaying() && onCommandStoreDurable.flags == 0 && safeGetRedundantBefore().isAtLeast(onCommandStoreDurable.redundantBefore))
return;

long reportId = nextDurabilityLoggingId.incrementAndGet();
Expand Down Expand Up @@ -722,13 +713,6 @@ public void unsafeUpsertRedundantBefore(RedundantBefore addRedundantBefore)
super.unsafeUpsertRedundantBefore(addRedundantBefore);
}

@VisibleForTesting
public void unsafeUpdateRangesForEpoch()
{
super.unsafeUpdateRangesForEpoch();
safeRedundantBefore = new SafeRedundantBefore(0, unsafeGetRedundantBefore());
}

public static class AccordCommandStoreReplayer extends AbstractReplayer
{
private final AccordCommandStore commandStore;
Expand Down Expand Up @@ -760,18 +744,16 @@ public AsyncChain<Route> replay(TxnId txnId)
* Replay/state reloading
*/

void maybeLoadRedundantBefore(RedundantBefore redundantBefore)
protected void loadRedundantBefore(RedundantBefore redundantBefore)
{
Invariants.require(safeRedundantBefore == null);
if (redundantBefore != null)
{
super.loadRedundantBefore(redundantBefore);
safeRedundantBefore = new SafeRedundantBefore(0, redundantBefore);
}

protected void maybeLoadRedundantBefore(RedundantBefore redundantBefore)
{
if (redundantBefore != null && !redundantBefore.isEmpty())
loadRedundantBefore(redundantBefore);
safeRedundantBefore = new SafeRedundantBefore(0, redundantBefore);
}
else
{
safeRedundantBefore = new SafeRedundantBefore(0, this.unsafeGetRedundantBefore());
}
}

void maybeLoadBootstrapBeganAt(NavigableMap<TxnId, Ranges> bootstrapBeganAt)
Expand All @@ -786,7 +768,7 @@ void maybeLoadSafeToRead(NavigableMap<Timestamp, Ranges> safeToRead)
loadSafeToRead(safeToRead);
}

void maybeLoadRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch)
void maybeLoadRangesForEpoch(RangesForEpoch rangesForEpoch)
{
if (rangesForEpoch != null)
loadRangesForEpoch(rangesForEpoch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.LivenessInfo;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.RegularAndStaticColumns;
Expand Down Expand Up @@ -116,6 +117,7 @@

import static java.lang.String.format;
import static java.util.Collections.emptyMap;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.apache.cassandra.config.AccordConfig.RangeIndexMode.journal_sai;
import static org.apache.cassandra.db.partitions.PartitionUpdate.singleRowUpdate;
import static org.apache.cassandra.db.rows.BTreeRow.singleCellRow;
Expand Down Expand Up @@ -269,7 +271,7 @@ public static CommandsForKey load(int commandStoreId, TokenKey key)
static CommandsForKey unsafeLoad(CommandsForKeyAccessor accessor, int commandStoreId, TokenKey key)
{
long timestampMicros = TimeUnit.MILLISECONDS.toMicros(Global.currentTimeMillis());
int nowInSeconds = (int) TimeUnit.MICROSECONDS.toSeconds(timestampMicros);
int nowInSeconds = (int) MICROSECONDS.toSeconds(timestampMicros);

SinglePartitionReadCommand command = makeRead(accessor, commandStoreId, key, nowInSeconds);

Expand Down Expand Up @@ -346,7 +348,9 @@ public static PartitionUpdate makeUpdate(int storeId, TokenKey key, long timesta
{
return singleRowUpdate(CFKAccessor.table,
CommandsForKeyAccessor.makeSystemTableKey(storeId, key),
singleCellRow(Clustering.EMPTY, BufferCell.live(CFKAccessor.data, timestampMicros, bytes)));
singleCellRow(Clustering.EMPTY,
LivenessInfo.create(timestampMicros),
BufferCell.live(CFKAccessor.data, timestampMicros, bytes)));
}

public static Runnable systemTableUpdater(int storeId, TokenKey key, CommandsForKey update, Object serialized, long timestampMicros)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ private AccordSafeCommandStore(AccordTask<?> task,
this.task = task;
this.commandsForRanges = commandsForRanges;
this.commandStore = commandStore;
commandStore.updateRangesForEpoch(this);
}

@Override
Expand Down
Loading