@@ -83,6 +83,9 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
8383 private static final Logger logger = LogManager .getLogger (SearchQueryThenFetchAsyncAction .class );
8484
8585 private static final TransportVersion BATCHED_QUERY_PHASE_VERSION = TransportVersion .fromName ("batched_query_phase_version" );
    /**
     * From this transport version on, a batched query response may carry a serialized node-level
     * reduction failure in place of the merge result / top-docs stats section (see the
     * {@code NodeQueryResponse} read and write logic for the corresponding wire format).
     */
    private static final TransportVersion BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE = TransportVersion.fromName(
        "batched_response_might_include_reduction_failure"
    );
8689
8790 private final SearchProgressListener progressListener ;
8891
@@ -221,20 +224,32 @@ public static final class NodeQueryResponse extends TransportResponse {
        private final RefCounted refCounted = LeakTracker.wrap(new SimpleRefCounted());

        // Per-shard outcomes: each element is either a QuerySearchResult or an Exception for that shard
        // (see the constructor's read logic).
        private final Object[] results;
        // Non-null only when the data node failed during partial reduction; in that case
        // topDocsStats and mergeResult are both null (the three are mutually exclusive by construction).
        private final Exception reductionFailure;
        private final SearchPhaseController.TopDocsStats topDocsStats;
        private final QueryPhaseResultConsumer.MergeResult mergeResult;
226230
227231 NodeQueryResponse (StreamInput in ) throws IOException {
228232 this .results = in .readArray (i -> i .readBoolean () ? new QuerySearchResult (i ) : i .readException (), Object []::new );
229- this .mergeResult = QueryPhaseResultConsumer .MergeResult .readFrom (in );
230- this .topDocsStats = SearchPhaseController .TopDocsStats .readFrom (in );
233+ if (in .getTransportVersion ().supports (BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE ) && in .readBoolean ()) {
234+ this .reductionFailure = in .readException ();
235+ this .mergeResult = null ;
236+ this .topDocsStats = null ;
237+ } else {
238+ this .reductionFailure = null ;
239+ this .mergeResult = QueryPhaseResultConsumer .MergeResult .readFrom (in );
240+ this .topDocsStats = SearchPhaseController .TopDocsStats .readFrom (in );
241+ }
231242 }
232243
        /**
         * The per-shard results array; each element is either a {@link QuerySearchResult} or an
         * {@link Exception} recorded for that shard (see the constructor's read logic).
         * Public for tests.
         */
        public Object[] getResults() {
            return results;
        }
237248
        /**
         * The node-level reduction failure, or {@code null} when partial reduction succeeded on the
         * data node. When non-null, {@code mergeResult} and {@code topDocsStats} are both {@code null}.
         */
        Exception getReductionFailure() {
            return reductionFailure;
        }
252+
238253 @ Override
239254 public void writeTo (StreamOutput out ) throws IOException {
240255 out .writeVInt (results .length );
@@ -245,7 +260,17 @@ public void writeTo(StreamOutput out) throws IOException {
245260 writePerShardResult (out , (QuerySearchResult ) result );
246261 }
247262 }
248- writeMergeResult (out , mergeResult , topDocsStats );
263+ if (out .getTransportVersion ().supports (BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE )) {
264+ boolean hasReductionFailure = reductionFailure != null ;
265+ out .writeBoolean (hasReductionFailure );
266+ if (hasReductionFailure ) {
267+ out .writeException (reductionFailure );
268+ } else {
269+ writeMergeResult (out , mergeResult , topDocsStats );
270+ }
271+ } else {
272+ writeMergeResult (out , mergeResult , topDocsStats );
273+ }
249274 }
250275
251276 @ Override
@@ -498,7 +523,12 @@ public Executor executor() {
498523 @ Override
499524 public void handleResponse (NodeQueryResponse response ) {
500525 if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer ) {
501- queryPhaseResultConsumer .addBatchedPartialResult (response .topDocsStats , response .mergeResult );
526+ Exception reductionFailure = response .getReductionFailure ();
527+ if (reductionFailure != null ) {
528+ queryPhaseResultConsumer .failure .compareAndSet (null , reductionFailure );
529+ } else {
530+ queryPhaseResultConsumer .addBatchedPartialResult (response .topDocsStats , response .mergeResult );
531+ }
502532 }
503533 for (int i = 0 ; i < response .results .length ; i ++) {
504534 var s = request .shards .get (i );
@@ -520,6 +550,21 @@ public void handleResponse(NodeQueryResponse response) {
520550
521551 @ Override
522552 public void handleException (TransportException e ) {
553+ if (connection .getTransportVersion ().supports (BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE ) == false ) {
554+ bwcHandleException (e );
555+ return ;
556+ }
557+ Exception cause = (Exception ) ExceptionsHelper .unwrapCause (e );
558+ logger .debug ("handling node search exception coming from [" + nodeId + "]" , cause );
559+ onNodeQueryFailure (e , request , routing );
560+ }
561+
562+ /**
563+ * This code is strictly for _snapshot_ backwards compatibility. The feature flag
564+ * {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version
565+ * {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced.
566+ */
567+ private void bwcHandleException (TransportException e ) {
523568 Exception cause = (Exception ) ExceptionsHelper .unwrapCause (e );
524569 logger .debug ("handling node search exception coming from [" + nodeId + "]" , cause );
525570 if (e instanceof SendRequestTransportException || cause instanceof TaskCancelledException ) {
@@ -791,13 +836,101 @@ void onShardDone() {
791836 if (countDown .countDown () == false ) {
792837 return ;
793838 }
839+ if (channel .getVersion ().supports (BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE ) == false ) {
840+ bwcRespond ();
841+ return ;
842+ }
843+ var channelListener = new ChannelActionListener <>(channel );
844+ RecyclerBytesStreamOutput out = dependencies .transportService .newNetworkBytesStream ();
845+ out .setTransportVersion (channel .getVersion ());
846+ try (queryPhaseResultConsumer ) {
847+ Exception reductionFailure = queryPhaseResultConsumer .failure .get ();
848+ if (reductionFailure == null ) {
849+ writeSuccessfulResponse (out );
850+ } else {
851+ writeReductionFailureResponse (out , reductionFailure );
852+ }
853+ } catch (IOException e ) {
854+ releaseAllResultsContexts ();
855+ channelListener .onFailure (e );
856+ return ;
857+ }
858+ ActionListener .respondAndRelease (
859+ channelListener ,
860+ new BytesTransportResponse (out .moveToBytesReference (), out .getTransportVersion ())
861+ );
862+ }
863+
        // Writes the "successful" response (see NodeQueryResponse for the corresponding read logic)
        private void writeSuccessfulResponse(RecyclerBytesStreamOutput out) throws IOException {
            final QueryPhaseResultConsumer.MergeResult mergeResult;
            try {
                mergeResult = Objects.requireNonNullElse(
                    queryPhaseResultConsumer.consumePartialMergeResultDataNode(),
                    EMPTY_PARTIAL_MERGE_RESULT
                );
            } catch (Exception e) {
                // Reduction failed while consuming the partial merge result. Nothing has been written
                // to `out` yet, so we can still switch to the reduction-failure wire format.
                writeReductionFailureResponse(out, e);
                return;
            }
            // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments,
            // also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other
            // indices without a roundtrip to the coordinating node
            final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size());
            if (mergeResult.reducedTopDocs() != null) {
                for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) {
                    final int localIndex = scoreDoc.shardIndex;
                    // NOTE: mutates the reduced top docs in place before serialization.
                    scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex;
                    relevantShardIndices.set(localIndex);
                }
            }
            final int resultCount = queryPhaseResultConsumer.getNumShards();
            out.writeVInt(resultCount);
            for (int i = 0; i < resultCount; i++) {
                var result = queryPhaseResultConsumer.results.get(i);
                if (result == null) {
                    // No result for this shard: serialize the failure recorded for it instead.
                    NodeQueryResponse.writePerShardException(out, failures.remove(i));
                } else {
                    // free context id and remove it from the result right away in case we don't need it anymore
                    maybeFreeContext(result, relevantShardIndices, namedWriteableRegistry);
                    NodeQueryResponse.writePerShardResult(out, result);
                }
            }
            out.writeBoolean(false); // does not have a reduction failure
            NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats);
        }
902+
903+ // Writes the "reduction failure" response (see NodeQueryResponse for the corresponding read logic)
904+ private void writeReductionFailureResponse (RecyclerBytesStreamOutput out , Exception reductionFailure ) throws IOException {
905+ final int resultCount = queryPhaseResultConsumer .getNumShards ();
906+ out .writeVInt (resultCount );
907+ for (int i = 0 ; i < resultCount ; i ++) {
908+ var result = queryPhaseResultConsumer .results .get (i );
909+ if (result == null ) {
910+ NodeQueryResponse .writePerShardException (out , failures .remove (i ));
911+ } else {
912+ NodeQueryResponse .writePerShardResult (out , result );
913+ }
914+ }
915+ out .writeBoolean (true ); // does have a reduction failure
916+ out .writeException (reductionFailure );
917+ releaseAllResultsContexts ();
918+ }
919+
920+ /**
921+ * This code is strictly for _snapshot_ backwards compatibility. The feature flag
922+ * {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version
923+ * {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced.
924+ */
925+ void bwcRespond () {
794926 RecyclerBytesStreamOutput out = null ;
795927 boolean success = false ;
796928 var channelListener = new ChannelActionListener <>(channel );
797929 try (queryPhaseResultConsumer ) {
798930 var failure = queryPhaseResultConsumer .failure .get ();
799931 if (failure != null ) {
800- handleMergeFailure (failure , channelListener , namedWriteableRegistry );
932+ releaseAllResultsContexts ();
933+ channelListener .onFailure (failure );
801934 return ;
802935 }
803936 final QueryPhaseResultConsumer .MergeResult mergeResult ;
@@ -807,7 +940,8 @@ void onShardDone() {
807940 EMPTY_PARTIAL_MERGE_RESULT
808941 );
809942 } catch (Exception e ) {
810- handleMergeFailure (e , channelListener , namedWriteableRegistry );
943+ releaseAllResultsContexts ();
944+ channelListener .onFailure (e );
811945 return ;
812946 }
813947 // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments,
@@ -839,7 +973,8 @@ void onShardDone() {
839973 NodeQueryResponse .writeMergeResult (out , mergeResult , queryPhaseResultConsumer .topDocsStats );
840974 success = true ;
841975 } catch (IOException e ) {
842- handleMergeFailure (e , channelListener , namedWriteableRegistry );
976+ releaseAllResultsContexts ();
977+ channelListener .onFailure (e );
843978 return ;
844979 }
845980 } finally {
@@ -868,11 +1003,7 @@ && isPartOfPIT(searchRequest.searchRequest, q.getContextId(), namedWriteableRegi
8681003 }
8691004 }
8701005
871- private void handleMergeFailure (
872- Exception e ,
873- ChannelActionListener <TransportResponse > channelListener ,
874- NamedWriteableRegistry namedWriteableRegistry
875- ) {
1006+ private void releaseAllResultsContexts () {
8761007 queryPhaseResultConsumer .getSuccessfulResults ()
8771008 .forEach (
8781009 searchPhaseResult -> releaseLocalContext (
@@ -882,7 +1013,6 @@ private void handleMergeFailure(
8821013 namedWriteableRegistry
8831014 )
8841015 );
885- channelListener .onFailure (e );
8861016 }
8871017
8881018 void consumeResult (QuerySearchResult queryResult ) {
0 commit comments