
Commit b02a661

Handle index deletion while querying in ES|QL (#137702) (#138114)
If the index is deleted while querying, ES|QL might try to get the index routing from the latest cluster state. This could throw an index-not-found exception, which is not handled properly, causing the user to never receive a response even though execution has completed. This also results in a dangling task in the task manager.

Relates #126653
Closes #135863

(cherry picked from commit 0051df8)
1 parent fb0c52c commit b02a661
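The failure mode above comes down to a listener that is never completed when response handling itself throws. Here is a minimal, self-contained sketch of the pattern the fix applies (the `Listener` interface and `handleResponse` helper below are hypothetical stand-ins, not the actual ES|QL `NodeListener` or `ActionListener`): wrap the post-processing in try/catch and route any unexpected exception into the listener, so the caller always gets an answer.

```java
/**
 * Minimal sketch of the listener-completion pattern applied in this commit.
 * The Listener interface and handleResponse helper are hypothetical stand-ins,
 * not the real org.elasticsearch.action.ActionListener or NodeListener.
 */
final class GuardedListenerSketch {

    interface Listener<T> {
        void onResponse(T value);

        void onFailure(Exception e);
    }

    static <T> void handleResponse(T response, Runnable postProcessing, Listener<T> listener) {
        try {
            // Before the fix, an exception escaping here (e.g. index-not-found while
            // re-resolving shard routing) never reached the listener: the user got no
            // reply and the task lingered in the task manager.
            postProcessing.run();
        } catch (Exception ex) {
            listener.onFailure(ex);
            return;
        }
        listener.onResponse(response);
    }

    public static void main(String[] args) {
        handleResponse("done", () -> { throw new IllegalStateException("index [log-index-2] deleted"); }, new Listener<String>() {
            @Override
            public void onResponse(String value) {
                System.out.println("response: " + value);
            }

            @Override
            public void onFailure(Exception e) {
                // The caller still hears back, so no dangling task.
                System.out.println("failure: " + e.getMessage());
            }
        });
    }
}
```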

File tree: 3 files changed (+123, −21 lines)


docs/changelog/137702.yaml
Lines changed: 6 additions & 0 deletions

```diff
@@ -0,0 +1,6 @@
+pr: 137702
+summary: Handle index deletion while querying in ES|QL
+area: ES|QL
+type: bug
+issues:
+ - 135863
```

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java
Lines changed: 69 additions & 0 deletions

```diff
@@ -8,6 +8,9 @@
 package org.elasticsearch.xpack.esql.action;

 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.indices.IndicesService;
@@ -20,11 +23,14 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;

 import static org.elasticsearch.index.shard.IndexShardTestCase.closeShardNoCheck;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;

 public class EsqlRetryIT extends AbstractEsqlIntegTestCase {

@@ -59,6 +65,37 @@ public void testRetryOnShardFailures() throws Exception {
         }
     }

+    public void testQueryWhileDeletingIndices() {
+        populateIndices();
+        CountDownLatch waitForDeletion = new CountDownLatch(1);
+        try {
+            final AtomicBoolean deleted = new AtomicBoolean();
+            for (String node : internalCluster().getNodeNames()) {
+                MockTransportService.getInstance(node)
+                    .addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
+                        if (deleted.compareAndSet(false, true)) {
+                            deleteIndexCompletely("log-index-2");
+                            waitForDeletion.countDown();
+                        } else {
+                            assertTrue(waitForDeletion.await(10, TimeUnit.SECONDS));
+                        }
+                        handler.messageReceived(request, channel, task);
+                    });
+            }
+            EsqlQueryRequest request = new EsqlQueryRequest();
+            request.query("FROM log-* | STATS COUNT(timestamp) | LIMIT 1");
+            request.allowPartialResults(true);
+            try (var resp = run(request)) {
+                assertTrue(resp.isPartial());
+                assertThat(EsqlTestUtils.getValuesList(resp).getFirst().getFirst(), equalTo(4L));
+            }
+        } finally {
+            for (String node : internalCluster().getNodeNames()) {
+                MockTransportService.getInstance(node).clearAllRules();
+            }
+        }
+    }
+
     private void populateIndices() {
         internalCluster().ensureAtLeastNumDataNodes(2);
         assertAcked(prepareCreate("log-index-1").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));
@@ -88,4 +125,36 @@ private void closeOrFailShards(String nodeName) throws Exception {
             }
         }
     }
+
+    /**
+     * Deletes the given index and ensures it is completely removed from the cluster state and from all nodes
+     */
+    private void deleteIndexCompletely(String indexName) throws Exception {
+        assertAcked(indicesAdmin().prepareDelete(indexName));
+        String[] nodeNames = internalCluster().getNodeNames();
+        assertBusy(() -> {
+            for (String nodeName : nodeNames) {
+                ClusterState clusterState = internalCluster().getInstance(ClusterService.class, nodeName).state();
+                for (IndexMetadata imd : clusterState.metadata().indicesAllProjects()) {
+                    assertThat(
+                        "Index [" + indexName + "] still exists in the cluster state on [" + nodeName + "]",
+                        imd.getIndex().getName(),
+                        not(equalTo(indexName))
+                    );
+                }
+            }
+            for (String nodeName : nodeNames) {
+                final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
+                for (IndexService indexService : indicesService) {
+                    for (IndexShard indexShard : indexService) {
+                        assertThat(
+                            "Index [" + indexName + "] still exists on node [" + nodeName + "]",
+                            indexShard.shardId().getIndexName(),
+                            not(equalTo(indexName))
+                        );
+                    }
+                }
+            }
+        });
+    }
 }
```

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java
Lines changed: 48 additions & 21 deletions

```diff
@@ -29,6 +29,8 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.tasks.CancellableTask;
@@ -65,6 +67,8 @@
  */
 abstract class DataNodeRequestSender {

+    private static final Logger LOGGER = LogManager.getLogger(DataNodeRequestSender.class);
+
     /**
      * Query order according to the
      * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/node-roles-overview.html">node roles</a>.
@@ -274,38 +278,51 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu
         final ActionListener<DriverCompletionInfo> listener = computeListener.acquireCompute();
         sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {

-            void onAfter(DriverCompletionInfo info) {
+            void onAfterRequest() {
                 nodePermits.get(request.node).release();
                 if (concurrentRequests != null) {
                     concurrentRequests.release();
                 }
                 trySendingRequestsForPendingShards(targetShards, computeListener);
-                listener.onResponse(info);
             }

             @Override
             public void onResponse(DataNodeComputeResponse response) {
-                // remove failures of successful shards
-                for (ShardId shardId : request.shardIds()) {
-                    if (response.shardLevelFailures().containsKey(shardId) == false) {
-                        shardFailures.remove(shardId);
+                try {
+                    // remove failures of successful shards
+                    for (ShardId shardId : request.shardIds()) {
+                        if (response.shardLevelFailures().containsKey(shardId) == false) {
+                            shardFailures.remove(shardId);
+                        }
                     }
+                    for (var entry : response.shardLevelFailures().entrySet()) {
+                        final ShardId shardId = entry.getKey();
+                        trackShardLevelFailure(shardId, false, entry.getValue());
+                        pendingShardIds.add(shardId);
+                    }
+                    onAfterRequest();
+                } catch (Exception ex) {
+                    expectNoFailure("expect no failure while handling data node response", ex);
+                    listener.onFailure(ex);
+                    return;
                 }
-                for (var entry : response.shardLevelFailures().entrySet()) {
-                    final ShardId shardId = entry.getKey();
-                    trackShardLevelFailure(shardId, false, entry.getValue());
-                    pendingShardIds.add(shardId);
-                }
-                onAfter(response.completionInfo());
+                listener.onResponse(response.completionInfo());
             }

             @Override
             public void onFailure(Exception e, boolean receivedData) {
-                for (ShardId shardId : request.shardIds) {
-                    trackShardLevelFailure(shardId, receivedData, e);
-                    pendingShardIds.add(shardId);
+                try {
+                    for (ShardId shardId : request.shardIds) {
+                        trackShardLevelFailure(shardId, receivedData, e);
+                        pendingShardIds.add(shardId);
+                    }
+                    onAfterRequest();
+                } catch (Exception ex) {
+                    expectNoFailure("expect no failure while handling failure of data node request", ex);
+                    listener.onFailure(ex);
+                    return;
                 }
-                onAfter(DriverCompletionInfo.EMPTY);
+                listener.onResponse(DriverCompletionInfo.EMPTY);
             }

             @Override
@@ -317,6 +334,11 @@ public void onSkip() {
                     onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
                 }
             }
+
+            private void expectNoFailure(String message, Exception e) {
+                LOGGER.error(message, e);
+                assert false : new AssertionError(message, e);
+            }
         });
     }

@@ -507,15 +529,20 @@ Map<ShardId, List<DiscoveryNode>> resolveShards(Set<ShardId> shardIds) {
         var project = projectResolver.getProjectState(clusterService.state());
         var nodes = Maps.<ShardId, List<DiscoveryNode>>newMapWithExpectedSize(shardIds.size());
         for (var shardId : shardIds) {
-            nodes.put(
-                shardId,
-                project.routingTable()
+            List<DiscoveryNode> allocatedNodes;
+            try {
+                allocatedNodes = project.routingTable()
                     .shardRoutingTable(shardId)
                     .allShards()
                     .filter(shard -> shard.active() && shard.isSearchable())
                     .map(shard -> project.cluster().nodes().get(shard.currentNodeId()))
-                    .toList()
-            );
+                    .toList();
+            } catch (Exception ignored) {
+                // If the target index is deleted or the target shard is not found after the query has started,
+                // we skip resolving its new shard routing, and that shard will not be retried.
+                continue;
+            }
+            nodes.put(shardId, allocatedNodes);
         }
         return nodes;
     }
```
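To make the `resolveShards` change above concrete, here is a minimal sketch of the same skip-on-unresolvable-routing idea, with a hypothetical `Routing` interface and plain strings standing in for the real routing table and `DiscoveryNode` types: a shard whose routing lookup throws after its index was deleted is skipped rather than failing the whole resolution pass, so the query can finish, with partial results when the request allows them (as the new `EsqlRetryIT` test asserts via `allowPartialResults(true)`).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the shard routing lookup; not the real ES routing table API. */
interface Routing {
    List<String> allocatedNodes(String shardId) throws Exception;
}

final class SkipUnresolvableShardsSketch {

    /** Mirrors the control flow of the patched resolveShards: skip, do not fail. */
    static Map<String, List<String>> resolveShards(Set<String> shardIds, Routing routing) {
        Map<String, List<String>> nodes = new HashMap<>();
        for (String shardId : shardIds) {
            List<String> allocatedNodes;
            try {
                allocatedNodes = routing.allocatedNodes(shardId);
            } catch (Exception ignored) {
                // The index was deleted (or the shard vanished) after the query started:
                // skip re-resolving its routing; this shard will not be retried.
                continue;
            }
            nodes.put(shardId, allocatedNodes);
        }
        return nodes;
    }

    public static void main(String[] args) {
        Routing routing = shardId -> {
            if (shardId.startsWith("deleted-")) {
                throw new IllegalStateException("index not found for " + shardId);
            }
            return List.of("node-1");
        };
        // Only the resolvable shard is kept; the deleted one is silently skipped.
        System.out.println(resolveShards(Set.of("log-index-1[0]", "deleted-index[0]"), routing));
    }
}
```

Skipping the shard, rather than mapping it to an empty node list, means it is simply never retried; combined with the hardened listener handling above, the query terminates instead of hanging.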
