Skip to content

Commit f9ae4c3

Browse files
authored
Resharding Add Flush and Refresh functionality (#136880)
Add functionality to forward flush and refresh requests to target shards during a reshard operation.
1 parent 8a7f820 commit f9ae4c3

File tree

14 files changed

+261
-30
lines changed

14 files changed

+261
-30
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.action.support.ActiveShardCount;
1313
import org.elasticsearch.action.support.replication.ReplicationRequest;
14+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
1415
import org.elasticsearch.common.io.stream.StreamInput;
1516
import org.elasticsearch.common.io.stream.StreamOutput;
1617
import org.elasticsearch.index.shard.ShardId;
@@ -21,8 +22,13 @@ public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
2122

2223
private final FlushRequest request;
2324

24-
public ShardFlushRequest(FlushRequest request, ShardId shardId) {
25-
super(shardId);
25+
/**
26+
* Creates a request for a resolved shard id and SplitShardCountSummary (used
27+
* to determine if the request needs to be executed on a split shard not yet seen by the
28+
* coordinator that sent the request)
29+
*/
30+
public ShardFlushRequest(FlushRequest request, ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) {
31+
super(shardId, reshardSplitShardCountSummary);
2632
this.request = request;
2733
this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default
2834
}

server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.client.internal.node.NodeClient;
1919
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2020
import org.elasticsearch.cluster.project.ProjectResolver;
21+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
2122
import org.elasticsearch.cluster.service.ClusterService;
2223
import org.elasticsearch.index.shard.ShardId;
2324
import org.elasticsearch.injection.guice.Inject;
@@ -59,8 +60,8 @@ public TransportFlushAction(
5960
}
6061

6162
@Override
62-
protected ShardFlushRequest newShardRequest(FlushRequest request, ShardId shardId) {
63-
return new ShardFlushRequest(request, shardId);
63+
protected ShardFlushRequest newShardRequest(FlushRequest request, ShardId shardId, SplitShardCountSummary shardCountSummary) {
64+
return new ShardFlushRequest(request, shardId, shardCountSummary);
6465
}
6566

6667
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,16 @@
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.action.ActionType;
1515
import org.elasticsearch.action.support.ActionFilters;
16+
import org.elasticsearch.action.support.replication.ReplicationRequestSplitHelper;
1617
import org.elasticsearch.action.support.replication.ReplicationResponse;
1718
import org.elasticsearch.action.support.replication.TransportReplicationAction;
1819
import org.elasticsearch.cluster.action.shard.ShardStateAction;
20+
import org.elasticsearch.cluster.project.ProjectResolver;
1921
import org.elasticsearch.cluster.service.ClusterService;
2022
import org.elasticsearch.common.io.stream.StreamInput;
2123
import org.elasticsearch.common.io.stream.StreamOutput;
2224
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.core.Tuple;
2326
import org.elasticsearch.index.shard.IndexShard;
2427
import org.elasticsearch.index.shard.ShardId;
2528
import org.elasticsearch.indices.IndicesService;
@@ -32,12 +35,15 @@
3235
import org.elasticsearch.transport.TransportService;
3336

3437
import java.io.IOException;
38+
import java.util.Map;
3539

3640
public class TransportShardFlushAction extends TransportReplicationAction<ShardFlushRequest, ShardFlushRequest, ReplicationResponse> {
3741

3842
public static final String NAME = FlushAction.NAME + "[s]";
3943
public static final ActionType<ReplicationResponse> TYPE = new ActionType<>(NAME);
4044

45+
private final ProjectResolver projectResolver;
46+
4147
@Inject
4248
public TransportShardFlushAction(
4349
Settings settings,
@@ -46,7 +52,8 @@ public TransportShardFlushAction(
4652
IndicesService indicesService,
4753
ThreadPool threadPool,
4854
ShardStateAction shardStateAction,
49-
ActionFilters actionFilters
55+
ActionFilters actionFilters,
56+
ProjectResolver projectResolver
5057
) {
5158
super(
5259
settings,
@@ -64,6 +71,7 @@ public TransportShardFlushAction(
6471
PrimaryActionExecution.RejectOnOverload,
6572
ReplicaActionExecution.SubjectToCircuitBreaker
6673
);
74+
this.projectResolver = projectResolver;
6775
transportService.registerRequestHandler(
6876
PRE_SYNCED_FLUSH_ACTION_NAME,
6977
threadPool.executor(ThreadPool.Names.FLUSH),
@@ -89,6 +97,27 @@ protected void shardOperationOnPrimary(
8997
}));
9098
}
9199

100+
// We are here because there was a mismatch between the SplitShardCountSummary in the request
101+
// and that on the primary shard node. We assume that the request is exactly 1 reshard split behind
102+
// the current state.
103+
@Override
104+
protected Map<ShardId, ShardFlushRequest> splitRequestOnPrimary(ShardFlushRequest request) {
105+
return ReplicationRequestSplitHelper.splitRequest(
106+
request,
107+
projectResolver.getProjectMetadata(clusterService.state()),
108+
(targetShard, shardCountSummary) -> new ShardFlushRequest(request.getRequest(), targetShard, shardCountSummary)
109+
);
110+
}
111+
112+
@Override
113+
protected Tuple<ReplicationResponse, Exception> combineSplitResponses(
114+
ShardFlushRequest originalRequest,
115+
Map<ShardId, ShardFlushRequest> splitRequests,
116+
Map<ShardId, Tuple<ReplicationResponse, Exception>> responses
117+
) {
118+
return ReplicationRequestSplitHelper.combineSplitResponses(originalRequest, splitRequests, responses);
119+
}
120+
92121
@Override
93122
protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
94123
replica.flush(request.getRequest(), listener.map(flushed -> {

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.client.internal.node.NodeClient;
2020
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2121
import org.elasticsearch.cluster.project.ProjectResolver;
22+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
2223
import org.elasticsearch.cluster.service.ClusterService;
2324
import org.elasticsearch.index.shard.ShardId;
2425
import org.elasticsearch.injection.guice.Inject;
@@ -60,8 +61,8 @@ public TransportRefreshAction(
6061
}
6162

6263
@Override
63-
protected BasicReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId) {
64-
BasicReplicationRequest replicationRequest = new BasicReplicationRequest(shardId);
64+
protected BasicReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId, SplitShardCountSummary shardCountSummary) {
65+
BasicReplicationRequest replicationRequest = new BasicReplicationRequest(shardId, shardCountSummary);
6566
replicationRequest.waitForActiveShards(ActiveShardCount.NONE);
6667
return replicationRequest;
6768
}

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@
1616
import org.elasticsearch.action.support.ActionFilters;
1717
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
1818
import org.elasticsearch.action.support.replication.ReplicationOperation;
19+
import org.elasticsearch.action.support.replication.ReplicationRequestSplitHelper;
1920
import org.elasticsearch.action.support.replication.ReplicationResponse;
2021
import org.elasticsearch.action.support.replication.TransportReplicationAction;
2122
import org.elasticsearch.cluster.action.shard.ShardStateAction;
23+
import org.elasticsearch.cluster.project.ProjectResolver;
2224
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2325
import org.elasticsearch.cluster.service.ClusterService;
2426
import org.elasticsearch.common.io.stream.StreamInput;
2527
import org.elasticsearch.common.settings.Settings;
28+
import org.elasticsearch.core.Tuple;
2629
import org.elasticsearch.index.shard.IndexShard;
30+
import org.elasticsearch.index.shard.ShardId;
2731
import org.elasticsearch.indices.IndicesService;
2832
import org.elasticsearch.injection.guice.Inject;
2933
import org.elasticsearch.logging.LogManager;
@@ -32,6 +36,7 @@
3236
import org.elasticsearch.transport.TransportService;
3337

3438
import java.io.IOException;
39+
import java.util.Map;
3540
import java.util.concurrent.Executor;
3641

3742
public class TransportShardRefreshAction extends TransportReplicationAction<
@@ -46,6 +51,7 @@ public class TransportShardRefreshAction extends TransportReplicationAction<
4651
public static final String SOURCE_API = "api";
4752

4853
private final Executor refreshExecutor;
54+
private final ProjectResolver projectResolver;
4955

5056
@Inject
5157
public TransportShardRefreshAction(
@@ -55,7 +61,8 @@ public TransportShardRefreshAction(
5561
IndicesService indicesService,
5662
ThreadPool threadPool,
5763
ShardStateAction shardStateAction,
58-
ActionFilters actionFilters
64+
ActionFilters actionFilters,
65+
ProjectResolver projectResolver
5966
) {
6067
super(
6168
settings,
@@ -73,6 +80,7 @@ public TransportShardRefreshAction(
7380
PrimaryActionExecution.RejectOnOverload,
7481
ReplicaActionExecution.SubjectToCircuitBreaker
7582
);
83+
this.projectResolver = projectResolver;
7684
// registers the unpromotable version of shard refresh action
7785
new TransportUnpromotableShardRefreshAction(
7886
clusterService,
@@ -104,6 +112,27 @@ protected void shardOperationOnPrimary(
104112
}));
105113
}
106114

115+
// We are here because there was mismatch between the SplitShardCountSummary in the request
116+
// and that on the primary shard node. We assume that the request is exactly 1 reshard split behind
117+
// the current state.
118+
@Override
119+
protected Map<ShardId, BasicReplicationRequest> splitRequestOnPrimary(BasicReplicationRequest request) {
120+
return ReplicationRequestSplitHelper.splitRequest(
121+
request,
122+
projectResolver.getProjectMetadata(clusterService.state()),
123+
(targetShard, shardCountSummary) -> new BasicReplicationRequest(targetShard, shardCountSummary)
124+
);
125+
}
126+
127+
@Override
128+
protected Tuple<ReplicationResponse, Exception> combineSplitResponses(
129+
BasicReplicationRequest originalRequest,
130+
Map<ShardId, BasicReplicationRequest> splitRequests,
131+
Map<ShardId, Tuple<ReplicationResponse, Exception>> responses
132+
) {
133+
return ReplicationRequestSplitHelper.combineSplitResponses(originalRequest, splitRequests, responses);
134+
}
135+
107136
@Override
108137
protected void shardOperationOnReplica(ShardRefreshReplicaRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
109138
replica.externalRefresh(SOURCE_API, listener.safeMap(refreshResult -> {

server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -403,11 +403,9 @@ private void executeBulkRequestsByShard(
403403
final List<BulkItemRequest> requests = entry.getValue();
404404

405405
// Get effective shardCount for shardId and pass it on as parameter to new BulkShardRequest
406-
var indexMetadata = project.index(shardId.getIndexName());
407-
SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
408-
if (indexMetadata != null) {
409-
reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
410-
}
406+
var indexMetadata = project.getIndexSafe(shardId.getIndex());
407+
SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
408+
411409
BulkShardRequest bulkShardRequest = new BulkShardRequest(
412410
shardId,
413411
reshardSplitShardCountSummary,
@@ -416,7 +414,7 @@ private void executeBulkRequestsByShard(
416414
bulkRequest.isSimulated()
417415
);
418416

419-
if (indexMetadata != null && indexMetadata.getInferenceFields().isEmpty() == false) {
417+
if (indexMetadata.getInferenceFields().isEmpty() == false) {
420418
bulkShardRequest.setInferenceFieldMap(indexMetadata.getInferenceFields());
421419
}
422420
bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());

server/src/main/java/org/elasticsearch/action/bulk/ShardBulkSplitHelper.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
package org.elasticsearch.action.bulk;
1111

1212
import org.elasticsearch.action.DocWriteRequest;
13+
import org.elasticsearch.cluster.metadata.IndexMetadata;
1314
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1415
import org.elasticsearch.cluster.routing.IndexRouting;
16+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
1517
import org.elasticsearch.core.Tuple;
1618
import org.elasticsearch.index.Index;
1719
import org.elasticsearch.index.shard.ShardId;
@@ -33,7 +35,9 @@ private ShardBulkSplitHelper() {}
3335
public static Map<ShardId, BulkShardRequest> splitRequests(BulkShardRequest request, ProjectMetadata project) {
3436
final ShardId sourceShardId = request.shardId();
3537
final Index index = sourceShardId.getIndex();
36-
IndexRouting indexRouting = IndexRouting.fromIndexMetadata(project.getIndexSafe(index));
38+
IndexMetadata indexMetadata = project.getIndexSafe(index);
39+
IndexRouting indexRouting = IndexRouting.fromIndexMetadata(indexMetadata);
40+
SplitShardCountSummary shardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, request.shardId().getId());
3741

3842
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
3943
Map<ShardId, BulkShardRequest> bulkRequestsPerShard = new HashMap<>();
@@ -57,17 +61,26 @@ public static Map<ShardId, BulkShardRequest> splitRequests(BulkShardRequest requ
5761

5862
// All items belong to either the source shard or target shard.
5963
if (requestsByShard.size() == 1) {
60-
// Return the original request if no items were split to target.
64+
// Return the original request if no items were split to target. Note that
65+
// this original request still contains the stale SplitShardCountSummary.
66+
// This is alright because we hold primary indexing permits while calling this split
67+
// method and we execute this request on the primary without letting go of the indexing permits.
68+
// This means that a second split cannot occur in the meantime.
6169
if (requestsByShard.containsKey(sourceShardId)) {
6270
return Map.of(sourceShardId, request);
6371
}
6472
}
6573

74+
// Create a new BulkShardRequest(s) with the updated SplitShardCountSummary. This is because
75+
// we do not hold primary permits on the target shard, and hence it can proceed with
76+
// a second split operation while this request is still pending. We must verify the
77+
// SplitShardCountSummary again on the target.
6678
for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
6779
final ShardId shardId = entry.getKey();
6880
final List<BulkItemRequest> requests = entry.getValue();
6981
BulkShardRequest bulkShardRequest = new BulkShardRequest(
7082
shardId,
83+
shardCountSummary,
7184
request.getRefreshPolicy(),
7285
requests.toArray(new BulkItemRequest[0]),
7386
request.isSimulated()

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ private void handleMultiGetOnUnpromotableShard(
174174
ShardId shardId = indexShard.shardId();
175175
if (request.refresh()) {
176176
logger.trace("send refresh action for shard {}", shardId);
177+
// TODO: Do we need to pass in shardCountSummary here ?
177178
var refreshRequest = new BasicReplicationRequest(shardId);
178179
refreshRequest.setParentTask(request.getParentTask());
179180
client.executeLocally(

server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.action.support.replication;
1111

12+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
1213
import org.elasticsearch.common.io.stream.StreamInput;
1314
import org.elasticsearch.index.shard.ShardId;
1415

@@ -24,10 +25,20 @@ public class BasicReplicationRequest extends ReplicationRequest<BasicReplication
2425
/**
2526
* Creates a new request with resolved shard id
2627
*/
28+
// TODO: Check if callers of this need to be modified to pass in shardCountSummary and eventually remove this constructor
2729
public BasicReplicationRequest(ShardId shardId) {
2830
super(shardId);
2931
}
3032

33+
/**
34+
* Creates a new request with resolved shard id and SplitShardCountSummary (used
35+
* to determine if the request needs to be executed on a split shard not yet seen by the
36+
* coordinator that sent the request)
37+
*/
38+
public BasicReplicationRequest(ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) {
39+
super(shardId, reshardSplitShardCountSummary);
40+
}
41+
3142
public BasicReplicationRequest(StreamInput in) throws IOException {
3243
super(in);
3344
}

0 commit comments

Comments
 (0)