Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/concurrent/Stage.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public enum Stage
PAXOS_REPAIR (false, "PaxosRepairStage", "internal", FBUtilities::getAvailableProcessors, null, Stage::multiThreadedStage),
INTERNAL_METADATA (false, "InternalMetadataStage", "internal", FBUtilities::getAvailableProcessors, null, Stage::multiThreadedStage),
FETCH_METADATA (false, "MetadataFetchLogStage", "internal", () -> 1, null, Stage::singleThreadedStage),
CMS_COMMIT (false, "CMSCommitStage", "internal", () -> 1, null, Stage::singleThreadedStage),
;
public final String jmxName;
private final Supplier<ExecutorPlus> executorSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,14 @@ public enum CassandraRelevantProperties
// transactional cluster metadata relevant properties
// TODO: not a fan of being forced to prefix these to satisfy the alphabetic ordering constraint
// but it makes sense to group logically related properties together
/**
* Route TCM_COMMIT_REQ messages and local CMS commits through a dedicated single-threaded
* CMS_COMMIT stage rather than INTERNAL_METADATA, serializing concurrent commit attempts
* to eliminate LWT retry amplification. When enabled, fetch-log request handling also moves from the
* single-threaded FETCH_METADATA stage to the shared INTERNAL_METADATA pool.
* Default false (disabled) until qualified.
*/
TCM_SERIALIZE_CMS_COMMITS("cassandra.tcm.serialize_cms_commits", "false"),
TCM_SHADOW_ROUND_MAX_ATTEMPTS("cassandra.shadow_round_max_attempts", "3"),
TCM_SHADOW_ROUND_TIMEOUT("cassandra.shadow_round_timeout_millis", "15000"),
/**
Expand Down
29 changes: 25 additions & 4 deletions src/java/org/apache/cassandra/net/Verb.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.cassandra.batchlog.BatchRemoveVerbHandler;
import org.apache.cassandra.batchlog.BatchStoreVerbHandler;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.db.CounterMutationVerbHandler;
Expand Down Expand Up @@ -152,6 +153,7 @@

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.concurrent.Stage.ANTI_ENTROPY;
import static org.apache.cassandra.concurrent.Stage.CMS_COMMIT;
import static org.apache.cassandra.concurrent.Stage.COUNTER_MUTATION;
import static org.apache.cassandra.concurrent.Stage.FETCH_METADATA;
import static org.apache.cassandra.concurrent.Stage.GOSSIP;
Expand Down Expand Up @@ -301,9 +303,10 @@ public enum Verb

// transactional cluster metadata
TCM_COMMIT_RSP (801, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::commitResultSerializer, RESPONSE_HANDLER ),
TCM_COMMIT_REQ (802, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::commitSerializer, () -> commitRequestHandler(), TCM_COMMIT_RSP ),
TCM_FETCH_CMS_LOG_RSP (803, P0, shortTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, RESPONSE_HANDLER ),
TCM_FETCH_CMS_LOG_REQ (804, P0, rpcTimeout, FETCH_METADATA, () -> FetchCMSLog.serializer, () -> fetchLogRequestHandler(), TCM_FETCH_CMS_LOG_RSP ),
TCM_COMMIT_REQ (802, P0, rpcTimeout, cmsCommitStage(), MessageSerializers::commitSerializer, () -> commitRequestHandler(), TCM_COMMIT_RSP ),
// Received log entries should always be processed serially on the log fetcher thread
TCM_FETCH_CMS_LOG_RSP (803, P0, shortTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, RESPONSE_HANDLER ),
TCM_FETCH_CMS_LOG_REQ (804, P0, rpcTimeout, cmsFetchLogStage(), () -> FetchCMSLog.serializer, () -> fetchLogRequestHandler(), TCM_FETCH_CMS_LOG_RSP ),
TCM_REPLICATION (805, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::logStateSerializer, () -> replicationHandler() ),
TCM_NOTIFY_RSP (806, P0, rpcTimeout, INTERNAL_METADATA, () -> Epoch.messageSerializer, RESPONSE_HANDLER ),
TCM_NOTIFY_REQ (807, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::logStateSerializer, () -> logNotifyHandler(), TCM_NOTIFY_RSP ),
Expand All @@ -313,8 +316,9 @@ public enum Verb
TCM_ABORT_MIG (811, P0, rpcTimeout, INTERNAL_METADATA, () -> CMSInitializationRequest.Initiator.serializer,() -> Election.instance.abortHandler, TCM_INIT_MIG_RSP ),
TCM_DISCOVER_RSP (812, P0, rpcTimeout, INTERNAL_METADATA, () -> Discovery.serializer, RESPONSE_HANDLER ),
TCM_DISCOVER_REQ (813, P0, rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, () -> Discovery.instance.requestHandler, TCM_DISCOVER_RSP ),
// Received log entries should always be processed serially on the log fetcher thread
TCM_FETCH_PEER_LOG_RSP (818, P0, shortTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, RESPONSE_HANDLER ),
TCM_FETCH_PEER_LOG_REQ (819, P0, rpcTimeout, FETCH_METADATA, () -> FetchPeerLog.serializer, () -> FetchPeerLog.Handler.instance, TCM_FETCH_PEER_LOG_RSP ),
TCM_FETCH_PEER_LOG_REQ (819, P0, rpcTimeout, cmsFetchLogStage(), () -> FetchPeerLog.serializer, () -> FetchPeerLog.Handler.instance, TCM_FETCH_PEER_LOG_RSP ),

INITIATE_DATA_MOVEMENTS_RSP (814, P1, rpcTimeout, MISC, () -> NoPayload.serializer, RESPONSE_HANDLER ),
INITIATE_DATA_MOVEMENTS_REQ (815, P1, rpcTimeout, MISC, () -> DataMovement.serializer, () -> DataMovementVerbHandler.instance, INITIATE_DATA_MOVEMENTS_RSP ),
Expand Down Expand Up @@ -580,6 +584,23 @@ ToLongFunction<TimeUnit> unsafeSetExpiration(ToLongFunction<TimeUnit> expiration
// could cause a conflict later when new normal verbs are added.
private static final int MAX_CUSTOM_VERB_ID = 1000;

/**
 * Picks the stage on which TCM_COMMIT_REQ is handled. The serialize_cms_commits property is
 * consulted at class-load time, when the verb's stage argument is evaluated.
 *
 * @return CMS_COMMIT when commit serialization is enabled, otherwise INTERNAL_METADATA
 */
private static Stage cmsCommitStage()
{
    if (CassandraRelevantProperties.TCM_SERIALIZE_CMS_COMMITS.getBoolean())
        return CMS_COMMIT;
    return INTERNAL_METADATA;
}

/**
 * Picks the stage for the fetch-log verbs. The serialize_cms_commits property is consulted at
 * class-load time.
 * <p>
 * With commit serialization enabled, commits no longer run on INTERNAL_METADATA, so the fetch-log
 * handlers can safely share that stage. With it disabled, fetch-log stays on its own dedicated
 * thread so that it is not starved by concurrent commit retry loops on INTERNAL_METADATA.
 *
 * @return INTERNAL_METADATA when commit serialization is enabled, otherwise FETCH_METADATA
 */
private static Stage cmsFetchLogStage()
{
    boolean serializeCommits = CassandraRelevantProperties.TCM_SERIALIZE_CMS_COMMITS.getBoolean();
    if (!serializeCommits)
        return FETCH_METADATA;
    return INTERNAL_METADATA;
}

private static final Verb[] idToVerbMap;
private static final Verb[] idToCustomVerbMap;
private static final int minCustomId;
Expand Down
37 changes: 35 additions & 2 deletions src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -39,6 +41,7 @@
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.ExceptionCode;
Expand Down Expand Up @@ -921,6 +924,8 @@ public boolean commitsPaused()
@VisibleForTesting
public static class SwitchableProcessor implements Processor
{
private static final boolean SERIALIZE_CMS_COMMITS = CassandraRelevantProperties.TCM_SERIALIZE_CMS_COMMITS.getBoolean();

private final Processor local;
private final RemoteProcessor remote;
private final GossipProcessor gossip;
Expand Down Expand Up @@ -970,10 +975,38 @@ public Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch la
try
{
Pair<State, Processor> delegate = delegateInternal();
Commit.Result result = delegate.right.commit(entryId, transform, lastKnown, retryPolicy);
Commit.Result result;
ClusterMetadataService.State state = delegate.left;
if (state == LOCAL || state == RESET)
if ((state == LOCAL || state == RESET) && SERIALIZE_CMS_COMMITS)
{
try
{
result = Stage.CMS_COMMIT.executor().submit(
() -> delegate.right.commit(entryId, transform, lastKnown, retryPolicy)).get();
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
if (cause instanceof NotCMSException) throw (NotCMSException) cause;
if (cause instanceof RuntimeException) throw (RuntimeException) cause;
throw new RuntimeException(cause);
}
catch (CancellationException e)
{
return Commit.Result.failed(ExceptionCode.SERVER_ERROR, "CMS commit executor shut down");
}
catch (InterruptedException e)
{
throw new RuntimeException("Interrupted waiting for CMS commit", e);
}
replicator.send(result, null);
}
else
{
result = delegate.right.commit(entryId, transform, lastKnown, retryPolicy);
if (state == LOCAL || state == RESET)
replicator.send(result, null);
}
return result;
}
catch (NotCMSException e)
Expand Down