From 6d270cccd18ae5936ebeeafdd596933337aeca94 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Fri, 12 Jun 2026 15:16:50 +0100 Subject: [PATCH] Introduce CMS_COMMIT stage for serialized commit handling Add a dedicated single-threaded CMS_COMMIT stage to prevent INTERNAL_METADATA pool exhaustion under concurrent TCM_COMMIT_REQ load. Multiple threads entering AbstractLocalProcessor.commit() simultaneously amplify LWT retry pressure. Switching to a single commit thread eliminates that amplification without reducing useful throughput, since Paxos serializes the actual log write anyway. The new routing is gated by the cassandra.tcm.serialize_cms_commits property and is disabled by default; when enabled, fetch-log request handling also moves from FETCH_METADATA to INTERNAL_METADATA. Patch by Jon Meredith; reviewed by Sam Tunnicliffe for CASSANDRA-21454 --- .../apache/cassandra/concurrent/Stage.java | 1 + .../config/CassandraRelevantProperties.java | 8 ++++ src/java/org/apache/cassandra/net/Verb.java | 29 +++++++++++++-- .../cassandra/tcm/ClusterMetadataService.java | 37 ++++++++++++++++++- 4 files changed, 69 insertions(+), 6 deletions(-) diff --git a/src/java/org/apache/cassandra/concurrent/Stage.java b/src/java/org/apache/cassandra/concurrent/Stage.java index 557c1ca0a48b..498aff9a31d8 100644 --- a/src/java/org/apache/cassandra/concurrent/Stage.java +++ b/src/java/org/apache/cassandra/concurrent/Stage.java @@ -59,6 +59,7 @@ public enum Stage PAXOS_REPAIR (false, "PaxosRepairStage", "internal", FBUtilities::getAvailableProcessors, null, Stage::multiThreadedStage), INTERNAL_METADATA (false, "InternalMetadataStage", "internal", FBUtilities::getAvailableProcessors, null, Stage::multiThreadedStage), FETCH_METADATA (false, "MetadataFetchLogStage", "internal", () -> 1, null, Stage::singleThreadedStage), + CMS_COMMIT (false, "CMSCommitStage", "internal", () -> 1, null, Stage::singleThreadedStage), ; public final String jmxName; private final Supplier executorSupplier; diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 9aad3a26de7c..91ffea1e8040 100644 --- 
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -606,6 +606,14 @@ public enum CassandraRelevantProperties // transactional cluster metadata relevant properties // TODO: not a fan of being forced to prefix these to satisfy the alphabetic ordering constraint // but it makes sense to group logically related properties together + /** + * Route TCM_COMMIT_REQ messages and local CMS commits through a dedicated single-threaded + * CMS_COMMIT stage rather than INTERNAL_METADATA, serializing concurrent commit attempts + * to eliminate LWT retry amplification. When enabled, fetch-log requests (TCM_FETCH_CMS_LOG_REQ and + * TCM_FETCH_PEER_LOG_REQ) are also handled on INTERNAL_METADATA instead of the single-threaded FETCH_METADATA stage. + * The value is read once when Verb and SwitchableProcessor are initialized. Default false (disabled) until qualified. + */ + TCM_SERIALIZE_CMS_COMMITS("cassandra.tcm.serialize_cms_commits", "false"), TCM_SHADOW_ROUND_MAX_ATTEMPTS("cassandra.shadow_round_max_attempts", "3"), TCM_SHADOW_ROUND_TIMEOUT("cassandra.shadow_round_timeout_millis", "15000"), /** diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index 1bb3b7a8900a..60abdcd1ea8a 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -31,6 +31,7 @@ import org.apache.cassandra.batchlog.BatchRemoveVerbHandler; import org.apache.cassandra.batchlog.BatchStoreVerbHandler; import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.CounterMutation; import org.apache.cassandra.db.CounterMutationVerbHandler; @@ -152,6 +153,7 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.concurrent.Stage.ANTI_ENTROPY; +import static org.apache.cassandra.concurrent.Stage.CMS_COMMIT; import static org.apache.cassandra.concurrent.Stage.COUNTER_MUTATION; import static 
org.apache.cassandra.concurrent.Stage.FETCH_METADATA; import static org.apache.cassandra.concurrent.Stage.GOSSIP; @@ -301,9 +303,10 @@ public enum Verb // transactional cluster metadata TCM_COMMIT_RSP (801, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::commitResultSerializer, RESPONSE_HANDLER ), - TCM_COMMIT_REQ (802, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::commitSerializer, () -> commitRequestHandler(), TCM_COMMIT_RSP ), - TCM_FETCH_CMS_LOG_RSP (803, P0, shortTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, RESPONSE_HANDLER ), - TCM_FETCH_CMS_LOG_REQ (804, P0, rpcTimeout, FETCH_METADATA, () -> FetchCMSLog.serializer, () -> fetchLogRequestHandler(), TCM_FETCH_CMS_LOG_RSP ), + TCM_COMMIT_REQ (802, P0, rpcTimeout, cmsCommitStage(), MessageSerializers::commitSerializer, () -> commitRequestHandler(), TCM_COMMIT_RSP ), + // Received log entries should always be processed serially on the log fetcher thread + TCM_FETCH_CMS_LOG_RSP (803, P0, shortTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, RESPONSE_HANDLER ), + TCM_FETCH_CMS_LOG_REQ (804, P0, rpcTimeout, cmsFetchLogStage(), () -> FetchCMSLog.serializer, () -> fetchLogRequestHandler(), TCM_FETCH_CMS_LOG_RSP ), TCM_REPLICATION (805, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::logStateSerializer, () -> replicationHandler() ), TCM_NOTIFY_RSP (806, P0, rpcTimeout, INTERNAL_METADATA, () -> Epoch.messageSerializer, RESPONSE_HANDLER ), TCM_NOTIFY_REQ (807, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::logStateSerializer, () -> logNotifyHandler(), TCM_NOTIFY_RSP ), @@ -313,8 +316,9 @@ public enum Verb TCM_ABORT_MIG (811, P0, rpcTimeout, INTERNAL_METADATA, () -> CMSInitializationRequest.Initiator.serializer,() -> Election.instance.abortHandler, TCM_INIT_MIG_RSP ), TCM_DISCOVER_RSP (812, P0, rpcTimeout, INTERNAL_METADATA, () -> Discovery.serializer, RESPONSE_HANDLER ), TCM_DISCOVER_REQ (813, P0, rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, 
() -> Discovery.instance.requestHandler, TCM_DISCOVER_RSP ), + // Received log entries should always be processed serially on the log fetcher thread TCM_FETCH_PEER_LOG_RSP (818, P0, shortTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, RESPONSE_HANDLER ), - TCM_FETCH_PEER_LOG_REQ (819, P0, rpcTimeout, FETCH_METADATA, () -> FetchPeerLog.serializer, () -> FetchPeerLog.Handler.instance, TCM_FETCH_PEER_LOG_RSP ), + TCM_FETCH_PEER_LOG_REQ (819, P0, rpcTimeout, cmsFetchLogStage(), () -> FetchPeerLog.serializer, () -> FetchPeerLog.Handler.instance, TCM_FETCH_PEER_LOG_RSP ), INITIATE_DATA_MOVEMENTS_RSP (814, P1, rpcTimeout, MISC, () -> NoPayload.serializer, RESPONSE_HANDLER ), INITIATE_DATA_MOVEMENTS_REQ (815, P1, rpcTimeout, MISC, () -> DataMovement.serializer, () -> DataMovementVerbHandler.instance, INITIATE_DATA_MOVEMENTS_RSP ), @@ -580,6 +584,23 @@ ToLongFunction unsafeSetExpiration(ToLongFunction expiration // could cause a conflict later when new normal verbs are added. private static final int MAX_CUSTOM_VERB_ID = 1000; + /** Selects the stage for TCM_COMMIT_REQ at class-load time based on the serialize_cms_commits property. */ + private static Stage cmsCommitStage() + { + return CassandraRelevantProperties.TCM_SERIALIZE_CMS_COMMITS.getBoolean() ? CMS_COMMIT : INTERNAL_METADATA; + } + + /** + * Selects the stage for fetch-log verbs at class-load time based on the serialize_cms_commits property. + * When commit serialization is enabled, commits are off INTERNAL_METADATA so fetch-log handlers can + * safely share it. When disabled, fetch-log keeps its own dedicated thread to avoid starvation from + * concurrent commit retry loops on INTERNAL_METADATA. + */ + private static Stage cmsFetchLogStage() + { + return CassandraRelevantProperties.TCM_SERIALIZE_CMS_COMMITS.getBoolean() ? 
INTERNAL_METADATA : FETCH_METADATA; + } + private static final Verb[] idToVerbMap; private static final Verb[] idToCustomVerbMap; private static final int minCustomId; diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index 9c2c4ca0bfe0..8dbebc79d665 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -39,6 +41,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.ExceptionCode; @@ -921,6 +924,8 @@ public boolean commitsPaused() @VisibleForTesting public static class SwitchableProcessor implements Processor { + private static final boolean SERIALIZE_CMS_COMMITS = CassandraRelevantProperties.TCM_SERIALIZE_CMS_COMMITS.getBoolean(); + private final Processor local; private final RemoteProcessor remote; private final GossipProcessor gossip; @@ -970,10 +975,41 @@ public Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch la try { Pair delegate = delegateInternal(); - Commit.Result result = delegate.right.commit(entryId, transform, lastKnown, retryPolicy); + Commit.Result result; ClusterMetadataService.State state = delegate.left; - if (state == LOCAL || state == RESET) + if ((state == LOCAL || state == RESET) && SERIALIZE_CMS_COMMITS) + { + try + { + result = Stage.CMS_COMMIT.executor().submit( + () -> 
delegate.right.commit(entryId, transform, lastKnown, retryPolicy)).get(); + } + catch (ExecutionException e) + { + Throwable cause = e.getCause(); + if (cause instanceof NotCMSException) throw (NotCMSException) cause; + if (cause instanceof RuntimeException) throw (RuntimeException) cause; + if (cause instanceof Error) throw (Error) cause; + throw new RuntimeException(cause); + } + catch (CancellationException e) + { + return Commit.Result.failed(ExceptionCode.SERVER_ERROR, "CMS commit executor shut down"); + } + catch (InterruptedException e) + { + // Restore the interrupt flag so callers further up the stack can still observe the interruption + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted waiting for CMS commit", e); + } replicator.send(result, null); + } + else + { + result = delegate.right.commit(entryId, transform, lastKnown, retryPolicy); + if (state == LOCAL || state == RESET) + replicator.send(result, null); + } return result; } catch (NotCMSException e)