From f8f18dae990c40c48b10bc1707ceb77d94b01479 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Thu, 13 Nov 2025 13:56:48 -0600 Subject: [PATCH 1/5] Adding nodeContext to TransportBroadcastByNodeAction --- .../TransportDataStreamsStatsAction.java | 6 ++- .../TransportReloadAnalyzersAction.java | 6 ++- .../TransportClearIndicesCacheAction.java | 6 ++- .../forcemerge/TransportForceMergeAction.java | 6 ++- .../recovery/TransportRecoveryAction.java | 10 ++++- .../TransportIndicesSegmentsAction.java | 6 ++- .../stats/TransportFieldUsageAction.java | 6 ++- .../stats/TransportIndicesStatsAction.java | 14 +++++- .../node/TransportBroadcastByNodeAction.java | 16 ++++--- .../TransportBroadcastByNodeActionTests.java | 45 ++++++++++++++++--- .../action/TransportForgetFollowerAction.java | 6 ++- ...actTransportSearchableSnapshotsAction.java | 10 ++++- 12 files changed, 105 insertions(+), 32 deletions(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java index e4e942ca79e7a..4ae89dac9b86c 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java @@ -52,7 +52,8 @@ public class TransportDataStreamsStatsAction extends TransportBroadcastByNodeAction< DataStreamsStatsAction.Request, DataStreamsStatsAction.Response, - DataStreamsStatsAction.DataStreamShardStats> { + DataStreamsStatsAction.DataStreamShardStats, + Void> { private final IndicesService indicesService; private final ProjectResolver projectResolver; @@ -114,7 +115,8 @@ protected void shardOperation( DataStreamsStatsAction.Request request, ShardRouting shardRouting, Task task, - ActionListener listener + ActionListener listener, + Void nodeContext ) { ActionListener.completeWith(listener, () -> { assert shardRouting.isSearchable() : "shard routing is not searchable: " + shardRouting; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java index 2d511f493aba7..fdb8a29a9870a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java @@ -50,7 +50,8 @@ public class TransportReloadAnalyzersAction extends TransportBroadcastByNodeAction< ReloadAnalyzersRequest, ReloadAnalyzersResponse, - TransportReloadAnalyzersAction.ReloadResult> { + TransportReloadAnalyzersAction.ReloadResult, + Void> { public static final ActionType TYPE = new ActionType<>("indices:admin/reload_analyzers"); private static final Logger logger = LogManager.getLogger(TransportReloadAnalyzersAction.class); @@ -122,7 +123,8 @@ protected void shardOperation( ReloadAnalyzersRequest request, ShardRouting shardRouting, Task task, - ActionListener listener + ActionListener listener, + Void nodeContext ) { ActionListener.completeWith(listener, () -> { logger.info("reloading analyzers for index shard " + shardRouting); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java index 65d8d43776c72..5ee4c83efdf57 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java @@ -37,7 +37,8 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAction< ClearIndicesCacheRequest, BroadcastResponse, - TransportBroadcastByNodeAction.EmptyResult> { + TransportBroadcastByNodeAction.EmptyResult, + Void> { public static final ActionType TYPE = new ActionType<>("indices:admin/cache/clear"); private final IndicesService indicesService; @@ -94,7 +95,8 @@ protected void shardOperation( ClearIndicesCacheRequest request, ShardRouting shardRouting, Task task, - ActionListener listener + ActionListener listener, + Void nodeContext ) { ActionListener.completeWith(listener, () -> { indicesService.clearIndexShardCache( diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java index 8f6df9a1c3a61..4c80e94826863 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java @@ -40,7 +40,8 @@ public class TransportForceMergeAction extends TransportBroadcastByNodeAction< ForceMergeRequest, BroadcastResponse, - TransportBroadcastByNodeAction.EmptyResult> { + TransportBroadcastByNodeAction.EmptyResult, + Void> { private final IndicesService indicesService; private final ThreadPool threadPool; @@ -94,7 +95,8 @@ protected void shardOperation( ForceMergeRequest request, ShardRouting shardRouting, Task task, - ActionListener listener + ActionListener listener, + Void nodeContext ) { assert (task instanceof CancellableTask) == false; // TODO: add cancellation handling here once the task supports it SubscribableListener.newForked(l -> { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java index 142493a738cfe..df498452d7e8e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java @@ -41,7 +41,7 @@ * Transport action for shard recovery operation. This transport action does not actually * perform shard recovery, it only reports on recoveries (both active and complete). */ -public class TransportRecoveryAction extends TransportBroadcastByNodeAction { +public class TransportRecoveryAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; private final ProjectResolver projectResolver; @@ -103,7 +103,13 @@ protected RecoveryRequest readRequestFrom(StreamInput in) throws IOException { } @Override - protected void shardOperation(RecoveryRequest request, ShardRouting shardRouting, Task task, ActionListener listener) { + protected void shardOperation( + RecoveryRequest request, + ShardRouting shardRouting, + Task task, + ActionListener listener, + Void nodeContext + ) { ActionListener.completeWith(listener, () -> { assert task instanceof CancellableTask; IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java index 76d0c0175f94d..376cc3502688b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java @@ -35,7 +35,8 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeAction< IndicesSegmentsRequest, IndicesSegmentResponse, - ShardSegments> { + ShardSegments, + Void> { private final IndicesService indicesService; private final ProjectResolver projectResolver; @@ -109,7 +110,8 @@ protected void shardOperation( IndicesSegmentsRequest request, ShardRouting shardRouting, Task task, - ActionListener listener + ActionListener listener, + Void nodeContext ) { ActionListener.completeWith(listener, () -> { assert task instanceof CancellableTask; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java index be9709a596e77..e46d8d59d314b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java @@ -38,7 +38,8 @@ public class TransportFieldUsageAction extends TransportBroadcastByNodeAction< FieldUsageStatsRequest, FieldUsageStatsResponse, - FieldUsageShardResponse> { + FieldUsageShardResponse, + Void> { private final IndicesService indicesService; private final ProjectResolver projectResolver; @@ -94,7 +95,8 @@ protected void shardOperation( FieldUsageStatsRequest request, ShardRouting shardRouting, Task task, - ActionListener listener + ActionListener listener, + Void nodeContext ) { ActionListener.completeWith(listener, () -> { final ShardId shardId = shardRouting.shardId(); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index 210fb42ca584b..ea81278798fc9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -36,7 +36,11 @@ import java.io.IOException; -public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction { +public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction< + IndicesStatsRequest, + IndicesStatsResponse, + ShardStats, + Void> { private final IndicesService indicesService; private final ProjectResolver projectResolver; @@ -109,7 +113,13 @@ protected IndicesStatsRequest readRequestFrom(StreamInput in) throws IOException } @Override - protected void shardOperation(IndicesStatsRequest request, ShardRouting shardRouting, Task task, ActionListener listener) { + protected void shardOperation( + IndicesStatsRequest request, + ShardRouting shardRouting, + Task task, + ActionListener listener, + Void nodeContext + ) { ActionListener.completeWith(listener, () -> { assert task instanceof CancellableTask; IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index 899f83f5f6fb1..b9d0d1800556c 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -74,7 +74,8 @@ public abstract class TransportBroadcastByNodeAction< Request extends BroadcastRequest, Response extends BaseBroadcastResponse, - ShardOperationResult extends Writeable> extends HandledTransportAction { + ShardOperationResult extends Writeable, + NodeContext> extends HandledTransportAction { private static final Logger logger = LogManager.getLogger(TransportBroadcastByNodeAction.class); @@ -184,9 +185,14 @@ protected abstract void shardOperation( Request request, ShardRouting shardRouting, Task task, - ActionListener listener + ActionListener listener, + NodeContext nodeContext ); + protected NodeContext createNodeContext() { + return null; + } + /** * Determines the shards on which this operation will be executed on. The operation is executed once per shard. * @@ -415,7 +421,7 @@ private void executeAsDataNode( ) { assert Transports.assertNotTransportThread("O(#shards) work must always fork to an appropriate executor"); logger.trace("[{}] executing operation on [{}] shards", actionName, shards.size()); - + NodeContext nodeContext = createNodeContext(); new CancellableFanOut() { final ArrayList results = new ArrayList<>(shards.size()); @@ -424,7 +430,7 @@ private void executeAsDataNode( @Override protected void sendItemRequest(ShardRouting shardRouting, ActionListener listener) { logger.trace(() -> format("[%s] executing operation for shard [%s]", actionName, shardRouting.shortSummary())); - ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, l)).run(); + ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, l, nodeContext)).run(); } @Override @@ -610,7 +616,7 @@ public void writeTo(StreamOutput out) throws IOException { } /** - * Can be used for implementations of {@link #shardOperation(BroadcastRequest, ShardRouting, Task, ActionListener) shardOperation} for + * Can be used for implementations of {@link #shardOperation(BroadcastRequest, ShardRouting, Task, ActionListener, NodeContext) shardOperation} for * which there is no shard-level return value. */ public static final class EmptyResult implements Writeable { diff --git a/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java index 8e86044a33673..990a66a2c54c4 100644 --- a/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -95,7 +95,10 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.object.HasToString.hasToString; public class TransportBroadcastByNodeActionTests extends ESTestCase { @@ -165,8 +168,9 @@ public ShardResult() {} public void writeTo(StreamOutput out) throws IOException {} } - class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction { + class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction { private final Map shards = new HashMap<>(); + private Integer expectedNodeContext = null; TestTransportBroadcastByNodeAction(String actionName) { super( @@ -201,7 +205,22 @@ protected Request readRequestFrom(StreamInput in) throws IOException { } @Override - protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener listener) { + public Integer createNodeContext() { + assertThat(expectedNodeContext, nullValue()); + expectedNodeContext = randomInt(); + return expectedNodeContext; + } + + @Override + protected void shardOperation( + Request request, + ShardRouting shardRouting, + Task task, + ActionListener listener, + Integer nodeContext + ) { + assertThat(expectedNodeContext, not(nullValue())); + assertThat(nodeContext, equalTo(expectedNodeContext)); ActionListener.completeWith(listener, () -> { if (rarely()) { shards.put(shardRouting, Boolean.TRUE); @@ -413,7 +432,7 @@ public void testNoShardOperationsExecutedIfTaskCancelled() throws Exception { shards.add(shard); } } - final TransportBroadcastByNodeAction.BroadcastByNodeTransportRequestHandler handler = + final TransportBroadcastByNodeAction.BroadcastByNodeTransportRequestHandler handler = action.new BroadcastByNodeTransportRequestHandler(); final PlainActionFuture future = new PlainActionFuture<>(); @@ -474,7 +493,7 @@ public void testOperationExecution() throws Exception { shards.add(shard); } } - final TransportBroadcastByNodeAction.BroadcastByNodeTransportRequestHandler handler = + final TransportBroadcastByNodeAction.BroadcastByNodeTransportRequestHandler handler = action.new BroadcastByNodeTransportRequestHandler(); final PlainActionFuture future = new PlainActionFuture<>(); @@ -568,7 +587,7 @@ public void testResultAggregation() throws ExecutionException, InterruptedExcept } } totalSuccessfulShards += shardResults.size(); - TransportBroadcastByNodeAction.NodeResponse nodeResponse = action.new NodeResponse( + TransportBroadcastByNodeAction.NodeResponse nodeResponse = action.new NodeResponse( entry.getKey(), shards.size(), shardResults, exceptions ); transport.handleResponse(requestId, nodeResponse); @@ -643,7 +662,13 @@ public void testShardLevelOperationsStopOnCancellation() throws Exception { int expectedShardId; @Override - protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener listener) { + protected void shardOperation( + Request request, + ShardRouting shardRouting, + Task task, + ActionListener listener, + Integer nodeContext + ) { // this test runs a node-level operation on three shards, cancelling the task some time during the execution on the second if (task instanceof CancellableTask cancellableTask) { assertEquals(expectedShardId++, shardRouting.shardId().id()); @@ -697,7 +722,13 @@ public void testShardResultsReleasedOnCancellation() throws Exception { action = new TestTransportBroadcastByNodeAction("indices:admin/shard_level_gc_test") { @Override - protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener listener) { + protected void shardOperation( + Request request, + ShardRouting shardRouting, + Task task, + ActionListener listener, + Integer nodeContext + ) { listeners.add(listener); } }; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java index cb5c6f9985753..5d1055d0fe17a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java @@ -44,7 +44,8 @@ public class TransportForgetFollowerAction extends TransportBroadcastByNodeAction< ForgetFollowerAction.Request, BroadcastResponse, - TransportBroadcastByNodeAction.EmptyResult> { + TransportBroadcastByNodeAction.EmptyResult, + Void> { private final IndicesService indicesService; @@ -96,7 +97,8 @@ protected void shardOperation( final ForgetFollowerAction.Request request, final ShardRouting shardRouting, Task task, - ActionListener listener + ActionListener listener, + Void nodeContext ) { final Index followerIndex = new Index(request.followerIndex(), request.followerIndexUUID()); final Index leaderIndex = clusterService.state().metadata().getProject().index(request.leaderIndex()).getIndex(); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java index 25b38e12a0680..8ab962d049de1 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java @@ -40,7 +40,7 @@ public abstract class AbstractTransportSearchableSnapshotsAction< Request extends BroadcastRequest, Response extends BaseBroadcastResponse, - ShardOperationResult extends Writeable> extends TransportBroadcastByNodeAction { + ShardOperationResult extends Writeable> extends TransportBroadcastByNodeAction { private final IndicesService indicesService; private final XPackLicenseState licenseState; @@ -106,7 +106,13 @@ protected ShardsIterator shards(ClusterState state, Request request, String[] co } @Override - protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener listener) { + protected void shardOperation( + Request request, + ShardRouting shardRouting, + Task task, + ActionListener listener, + Void nodeContext + ) { ActionListener.completeWith(listener, () -> { SearchableSnapshots.ensureValidLicense(licenseState); final IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.index()).getShard(shardRouting.id()); From 95aefceca3583cf6fa4e1ef713599993cd593926 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Thu, 13 Nov 2025 14:26:40 -0600 Subject: [PATCH 2/5] Moving listener to last in the args list --- .../action/TransportDataStreamsStatsAction.java | 4 ++-- .../analyze/TransportReloadAnalyzersAction.java | 4 ++-- .../clear/TransportClearIndicesCacheAction.java | 4 ++-- .../forcemerge/TransportForceMergeAction.java | 4 ++-- .../indices/recovery/TransportRecoveryAction.java | 4 ++-- .../segments/TransportIndicesSegmentsAction.java | 4 ++-- .../indices/stats/TransportFieldUsageAction.java | 4 ++-- .../indices/stats/TransportIndicesStatsAction.java | 4 ++-- .../node/TransportBroadcastByNodeAction.java | 9 +++++---- .../node/TransportBroadcastByNodeActionTests.java | 12 ++++++------ .../ccr/action/TransportForgetFollowerAction.java | 4 ++-- .../AbstractTransportSearchableSnapshotsAction.java | 4 ++-- 12 files changed, 31 insertions(+), 30 deletions(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java index 4ae89dac9b86c..c5026a660210f 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java @@ -115,8 +115,8 @@ protected void shardOperation( DataStreamsStatsAction.Request request, ShardRouting shardRouting, Task task, - ActionListener listener, - Void nodeContext + Void nodeContext, + ActionListener listener ) { ActionListener.completeWith(listener, () -> { assert shardRouting.isSearchable() : "shard routing is not searchable: " + shardRouting; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java index fdb8a29a9870a..ea1299f594e6e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java @@ -123,8 +123,8 @@ protected void shardOperation( ReloadAnalyzersRequest request, ShardRouting shardRouting, Task task, - ActionListener listener, - Void nodeContext + Void nodeContext, + ActionListener listener ) { ActionListener.completeWith(listener, () -> { logger.info("reloading analyzers for index shard " + shardRouting); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java index 5ee4c83efdf57..66aa4f734640c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java @@ -95,8 +95,8 @@ protected void shardOperation( ClearIndicesCacheRequest request, ShardRouting shardRouting, Task task, - ActionListener listener, - Void nodeContext + Void nodeContext, + ActionListener listener ) { ActionListener.completeWith(listener, () -> { indicesService.clearIndexShardCache( diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java index 4c80e94826863..eb6d8f973fc61 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java @@ -95,8 +95,8 @@ protected void shardOperation( ForceMergeRequest request, ShardRouting shardRouting, Task task, - ActionListener listener, - Void nodeContext + Void nodeContext, + ActionListener listener ) { assert (task instanceof CancellableTask) == false; // TODO: add cancellation handling here once the task supports it SubscribableListener.newForked(l -> { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java index df498452d7e8e..bbd29bb4d39ee 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java @@ -107,8 +107,8 @@ protected void shardOperation( RecoveryRequest request, ShardRouting shardRouting, Task task, - ActionListener listener, - Void nodeContext + Void nodeContext, + ActionListener listener ) { ActionListener.completeWith(listener, () -> { assert task instanceof CancellableTask; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java index 376cc3502688b..319b6ad73487c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java @@ -110,8 +110,8 @@ protected void shardOperation( IndicesSegmentsRequest request, ShardRouting shardRouting, Task task, - ActionListener listener, - Void nodeContext + Void nodeContext, + ActionListener listener ) { ActionListener.completeWith(listener, () -> { assert task instanceof CancellableTask; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java index e46d8d59d314b..1c0f970a6ff20 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java @@ -95,8 +95,8 @@ protected void shardOperation( FieldUsageStatsRequest request, ShardRouting shardRouting, Task task, - ActionListener listener, - Void nodeContext + Void nodeContext, + ActionListener listener ) { ActionListener.completeWith(listener, () -> { final ShardId shardId = shardRouting.shardId(); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index ea81278798fc9..77b6264671063 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -117,8 +117,8 @@ protected void shardOperation( IndicesStatsRequest request, ShardRouting shardRouting, Task task, - ActionListener listener, - Void nodeContext + Void nodeContext, + ActionListener listener ) { ActionListener.completeWith(listener, () -> { assert task instanceof CancellableTask; diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index b9d0d1800556c..709c8aae41c83 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -185,8 +185,8 @@ protected abstract void shardOperation( Request request, ShardRouting shardRouting, Task task, - ActionListener listener, - NodeContext nodeContext + NodeContext nodeContext, + ActionListener listener ); protected NodeContext createNodeContext() { @@ -430,7 +430,7 @@ private void executeAsDataNode( @Override protected void sendItemRequest(ShardRouting shardRouting, ActionListener listener) { logger.trace(() -> format("[%s] executing operation for shard [%s]", actionName, shardRouting.shortSummary())); - ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, l, nodeContext)).run(); + ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, nodeContext, l)).run(); } @Override @@ -616,7 +616,8 @@ public void writeTo(StreamOutput out) throws IOException { } /** - * Can be used for implementations of {@link #shardOperation(BroadcastRequest, ShardRouting, Task, ActionListener, NodeContext) shardOperation} for + * Can be used for implementations of + * {@linkTransportBroadcastByNodeAction#shardOperation(BroadcastRequest, ShardRouting, Task, Object, ActionListener) shardOperation} for * which there is no shard-level return value. */ public static final class EmptyResult implements Writeable { diff --git a/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java index 990a66a2c54c4..f3bfe979976a4 100644 --- a/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -216,8 +216,8 @@ protected void shardOperation( Request request, ShardRouting shardRouting, Task task, - ActionListener listener, - Integer nodeContext + Integer nodeContext, + ActionListener listener ) { assertThat(expectedNodeContext, not(nullValue())); assertThat(nodeContext, equalTo(expectedNodeContext)); @@ -666,8 +666,8 @@ protected void shardOperation( Request request, ShardRouting shardRouting, Task task, - ActionListener listener, - Integer nodeContext + Integer nodeContext, + ActionListener listener ) { // this test runs a node-level operation on three shards, cancelling the task some time during the execution on the second if (task instanceof CancellableTask cancellableTask) { @@ -726,8 +726,8 @@ protected void shardOperation( Request request, ShardRouting shardRouting, Task task, - ActionListener listener, - Integer nodeContext + Integer nodeContext, + ActionListener listener ) { listeners.add(listener); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java index 5d1055d0fe17a..bfbec3c72c010 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java @@ -97,8 +97,8 @@ protected void shardOperation( final ForgetFollowerAction.Request request, final ShardRouting shardRouting, Task task, - ActionListener listener, - Void nodeContext + Void nodeContext, + ActionListener listener ) { final Index followerIndex = new Index(request.followerIndex(), request.followerIndexUUID()); final Index leaderIndex = clusterService.state().metadata().getProject().index(request.leaderIndex()).getIndex(); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java index 8ab962d049de1..8af6db59691c4 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java @@ -110,8 +110,8 @@ protected void shardOperation( Request request, ShardRouting shardRouting, Task task, - ActionListener listener, - Void nodeContext + Void nodeContext, + ActionListener listener ) { ActionListener.completeWith(listener, () -> { SearchableSnapshots.ensureValidLicense(licenseState); From 8baf0a6f25967cb7e8ac417e2822a09e6bba1dfc Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Thu, 13 Nov 2025 14:30:19 -0600 Subject: [PATCH 3/5] updating javadocs --- .../broadcast/node/TransportBroadcastByNodeAction.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index 709c8aae41c83..b68d62937b582 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.FixForMultiProject; +import org.elasticsearch.core.Nullable; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.AbstractTransportRequest; @@ -70,6 +71,7 @@ * @param the underlying client request * @param the response to the client request * @param per-shard operation results + * @param the optional node context created by createNodeContext and passed to shardOperation */ public abstract class TransportBroadcastByNodeAction< Request extends BroadcastRequest, @@ -179,13 +181,14 @@ Response newResponse( * @param request the node-level request * @param shardRouting the shard on which to execute the operation * @param task the task for this node-level request + * @param nodeContext the context created by {{@link #createNodeContext()}} * @param listener the listener to notify with the result of the shard-level operation */ protected abstract void shardOperation( Request request, ShardRouting shardRouting, Task task, - NodeContext nodeContext, + @Nullable NodeContext nodeContext, ActionListener listener ); From df8f40522700683f5c907886d701b2d051fd6e8c Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Thu, 13 Nov 2025 15:18:54 -0600 Subject: [PATCH 4/5] fixing javadocs --- .../broadcast/node/TransportBroadcastByNodeAction.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index b68d62937b582..218d0af9a6c9d 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -620,8 +620,8 @@ public void writeTo(StreamOutput out) throws IOException { /** * Can be used for implementations of - * {@linkTransportBroadcastByNodeAction#shardOperation(BroadcastRequest, ShardRouting, Task, Object, ActionListener) shardOperation} for - * which there is no shard-level return value. + * {@link #shardOperation(BroadcastRequest, ShardRouting, Task, Object, ActionListener) shardOperation} for which there is no + * shard-level return value. */ public static final class EmptyResult implements Writeable { public static final EmptyResult INSTANCE = new EmptyResult(); From 219c64811252e03a046eb88c205aa558ebc99fdd Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 14 Nov 2025 13:22:53 -0600 Subject: [PATCH 5/5] Apply suggestions from code review Co-authored-by: David Turner --- .../node/TransportBroadcastByNodeAction.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index 218d0af9a6c9d..e8c0fc92cf544 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -71,7 +71,8 @@ * @param the underlying client request * @param the response to the client request * @param per-shard operation results - * @param the optional node context created by createNodeContext and passed to shardOperation + * @param an (optional) node context created by {@link #createNodeContext} on each node and passed to each call + * to {@link #shardOperation} */ public abstract class TransportBroadcastByNodeAction< Request extends BroadcastRequest, @@ -192,6 +193,10 @@ protected abstract void shardOperation( ActionListener listener ); + /** + * @return an (optional) node-level context for this operation, passed to each call to {@link #shardOperation}. + */ + @Nullable protected NodeContext createNodeContext() { return null; } @@ -424,7 +429,7 @@ private void executeAsDataNode( ) { assert Transports.assertNotTransportThread("O(#shards) work must always fork to an appropriate executor"); logger.trace("[{}] executing operation on [{}] shards", actionName, shards.size()); - NodeContext nodeContext = createNodeContext(); + final NodeContext nodeContext = createNodeContext(); new CancellableFanOut() { final ArrayList results = new ArrayList<>(shards.size()); @@ -619,9 +624,7 @@ public void writeTo(StreamOutput out) throws IOException { } /** - * Can be used for implementations of - * {@link #shardOperation(BroadcastRequest, ShardRouting, Task, Object, ActionListener) shardOperation} for which there is no - * shard-level return value. + * Can be used for implementations of {@link #shardOperation} for which there is no shard-level return value. */ public static final class EmptyResult implements Writeable { public static final EmptyResult INSTANCE = new EmptyResult();