155
155
import java .util .function .Supplier ;
156
156
157
157
import static org .elasticsearch .TransportVersions .ERROR_TRACE_IN_TRANSPORT_HEADER ;
158
+ import static org .elasticsearch .common .Strings .format ;
158
159
import static org .elasticsearch .core .TimeValue .timeValueHours ;
159
160
import static org .elasticsearch .core .TimeValue .timeValueMillis ;
160
161
import static org .elasticsearch .core .TimeValue .timeValueMinutes ;
@@ -519,12 +520,18 @@ protected void doClose() {
519
520
* @param <T> the type of the response
520
521
* @param listener the action listener to be wrapped
521
522
* @param version channel version of the request
523
+ * @param nodeId id of the current node
524
+ * @param shardId id of the shard being searched
525
+ * @param taskId id of the task being executed
522
526
* @param threadPool with context where to write the new header
523
527
* @return the wrapped action listener
524
528
*/
525
529
static <T > ActionListener <T > maybeWrapListenerForStackTrace (
526
530
ActionListener <T > listener ,
527
531
TransportVersion version ,
532
+ String nodeId ,
533
+ ShardId shardId ,
534
+ long taskId ,
528
535
ThreadPool threadPool
529
536
) {
530
537
boolean header = true ;
@@ -533,6 +540,18 @@ static <T> ActionListener<T> maybeWrapListenerForStackTrace(
533
540
}
534
541
if (header == false ) {
535
542
return listener .delegateResponse ((l , e ) -> {
543
+ org .apache .logging .log4j .util .Supplier <String > messageSupplier = () -> format (
544
+ "[%s]%s: failed to execute search request for task [%d]" ,
545
+ nodeId ,
546
+ shardId ,
547
+ taskId
548
+ );
549
+ // Keep this logic aligned with that of SUPPRESSED_ERROR_LOGGER in RestResponse
550
+ if (ExceptionsHelper .status (e ).getStatus () < 500 || ExceptionsHelper .isNodeOrShardUnavailableTypeException (e )) {
551
+ logger .debug (messageSupplier , e );
552
+ } else {
553
+ logger .warn (messageSupplier , e );
554
+ }
536
555
ExceptionsHelper .unwrapCausesAndSuppressed (e , err -> {
537
556
err .setStackTrace (EMPTY_STACK_TRACE_ARRAY );
538
557
return false ;
@@ -544,7 +563,14 @@ static <T> ActionListener<T> maybeWrapListenerForStackTrace(
544
563
}
545
564
546
565
public void executeDfsPhase (ShardSearchRequest request , SearchShardTask task , ActionListener <SearchPhaseResult > listener ) {
547
- listener = maybeWrapListenerForStackTrace (listener , request .getChannelVersion (), threadPool );
566
+ listener = maybeWrapListenerForStackTrace (
567
+ listener ,
568
+ request .getChannelVersion (),
569
+ clusterService .localNode ().getId (),
570
+ request .shardId (),
571
+ task .getId (),
572
+ threadPool
573
+ );
548
574
final IndexShard shard = getShard (request );
549
575
rewriteAndFetchShardRequest (shard , request , listener .delegateFailure ((l , rewritten ) -> {
550
576
// fork the execution in the search thread pool
@@ -582,7 +608,14 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea
582
608
}
583
609
584
610
public void executeQueryPhase (ShardSearchRequest request , SearchShardTask task , ActionListener <SearchPhaseResult > listener ) {
585
- ActionListener <SearchPhaseResult > finalListener = maybeWrapListenerForStackTrace (listener , request .getChannelVersion (), threadPool );
611
+ ActionListener <SearchPhaseResult > finalListener = maybeWrapListenerForStackTrace (
612
+ listener ,
613
+ request .getChannelVersion (),
614
+ clusterService .localNode ().getId (),
615
+ request .shardId (),
616
+ task .getId (),
617
+ threadPool
618
+ );
586
619
assert request .canReturnNullResponseIfMatchNoDocs () == false || request .numberOfShards () > 1
587
620
: "empty responses require more than one shard" ;
588
621
final IndexShard shard = getShard (request );
@@ -775,9 +808,16 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
775
808
}
776
809
777
810
public void executeRankFeaturePhase (RankFeatureShardRequest request , SearchShardTask task , ActionListener <RankFeatureResult > listener ) {
778
- listener = maybeWrapListenerForStackTrace (listener , request .getShardSearchRequest ().getChannelVersion (), threadPool );
779
811
final ReaderContext readerContext = findReaderContext (request .contextId (), request );
780
812
final ShardSearchRequest shardSearchRequest = readerContext .getShardSearchRequest (request .getShardSearchRequest ());
813
+ listener = maybeWrapListenerForStackTrace (
814
+ listener ,
815
+ shardSearchRequest .getChannelVersion (),
816
+ clusterService .localNode ().getId (),
817
+ shardSearchRequest .shardId (),
818
+ task .getId (),
819
+ threadPool
820
+ );
781
821
final Releasable markAsUsed = readerContext .markAsUsed (getKeepAlive (shardSearchRequest ));
782
822
runAsync (getExecutor (readerContext .indexShard ()), () -> {
783
823
try (SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .RANK_FEATURE , false )) {
@@ -822,8 +862,15 @@ public void executeQueryPhase(
822
862
ActionListener <ScrollQuerySearchResult > listener ,
823
863
TransportVersion version
824
864
) {
825
- listener = maybeWrapListenerForStackTrace (listener , version , threadPool );
826
865
final LegacyReaderContext readerContext = (LegacyReaderContext ) findReaderContext (request .contextId (), request );
866
+ listener = maybeWrapListenerForStackTrace (
867
+ listener ,
868
+ version ,
869
+ clusterService .localNode ().getId (),
870
+ readerContext .indexShard ().shardId (),
871
+ task .getId (),
872
+ threadPool
873
+ );
827
874
final Releasable markAsUsed ;
828
875
try {
829
876
markAsUsed = readerContext .markAsUsed (getScrollKeepAlive (request .scroll ()));
@@ -864,9 +911,16 @@ public void executeQueryPhase(
864
911
ActionListener <QuerySearchResult > listener ,
865
912
TransportVersion version
866
913
) {
867
- listener = maybeWrapListenerForStackTrace (listener , version , threadPool );
868
914
final ReaderContext readerContext = findReaderContext (request .contextId (), request .shardSearchRequest ());
869
915
final ShardSearchRequest shardSearchRequest = readerContext .getShardSearchRequest (request .shardSearchRequest ());
916
+ listener = maybeWrapListenerForStackTrace (
917
+ listener ,
918
+ version ,
919
+ clusterService .localNode ().getId (),
920
+ shardSearchRequest .shardId (),
921
+ task .getId (),
922
+ threadPool
923
+ );
870
924
final Releasable markAsUsed = readerContext .markAsUsed (getKeepAlive (shardSearchRequest ));
871
925
rewriteAndFetchShardRequest (readerContext .indexShard (), shardSearchRequest , listener .delegateFailure ((l , rewritten ) -> {
872
926
// fork the execution in the search thread pool
0 commit comments