
Commit 9f6eb1d

Log stack traces on data nodes before they are cleared for transport (#125732)
We recently started clearing stack traces on data nodes before transporting results back to the coordinating node when error_trace=false, to reduce unnecessary data transfer and memory use on the coordinating node (#118266). However, all exception logging happens on the coordinating node, so stack traces stopped appearing in any logs. This change logs stack traces directly on the data node when error_trace=false.
1 parent e4ce993 commit 9f6eb1d
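
Conceptually, the fix wraps the per-shard ActionListener so that a failure is logged on the data node while its stack trace is still attached, and only then stripped before the response travels back. Below is a minimal sketch of that idea using the same listener and exception helpers that appear in the SearchService diff further down; the class and method names are hypothetical, and this simplified version always logs at WARN, whereas the real change also checks the error_trace transport header and picks DEBUG vs WARN from the error status.

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;

// Sketch only: "log locally, then strip stack traces before transport".
final class DataNodeFailureLogging {
    static <T> ActionListener<T> logThenStripStackTrace(ActionListener<T> listener, Logger logger) {
        return listener.delegateResponse((l, e) -> {
            // 1. Log on the data node while the full stack trace is still available.
            logger.warn("failed to execute search request", e);
            // 2. Drop stack traces from the exception and all of its causes / suppressed exceptions.
            ExceptionsHelper.unwrapCausesAndSuppressed(e, err -> {
                err.setStackTrace(new StackTraceElement[0]);
                return false;
            });
            // 3. Forward the now-lightweight exception to the coordinating node.
            l.onFailure(e);
        });
    }
}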

7 files changed: +458 -7 lines

docs/changelog/125732.yaml

+5
@@ -0,0 +1,5 @@
+pr: 125732
+summary: Log stack traces on data nodes before they are cleared for transport
+area: Search
+type: bug
+issues: []

qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java

+117
@@ -11,6 +11,8 @@
 
 import org.apache.http.entity.ContentType;
 import org.apache.http.nio.entity.NByteArrayEntity;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.config.Configurator;
 import org.elasticsearch.action.search.MultiSearchRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.client.Request;
@@ -20,10 +22,12 @@
 import org.elasticsearch.search.ErrorTraceHelper;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.test.MockLog;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.xcontent.XContentType;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -40,6 +44,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
         return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), MockTransportService.TestPlugin.class);
     }
 
+    @BeforeClass
+    public static void setDebugLogLevel() {
+        Configurator.setLevel(SearchService.class, Level.DEBUG);
+    }
+
     @Before
     public void setupMessageListener() {
         hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());
@@ -118,6 +127,61 @@ public void testSearchFailingQueryErrorTraceFalse() throws IOException {
         assertFalse(hasStackTrace.getAsBoolean());
     }
 
+    public void testDataNodeDoesNotLogStackTraceWhenErrorTraceTrue() throws IOException {
+        setupIndexWithDocs();
+
+        Request searchRequest = new Request("POST", "/_search");
+        searchRequest.setJsonEntity("""
+            {
+                "query": {
+                    "simple_query_string" : {
+                        "query": "foo",
+                        "fields": ["field"]
+                    }
+                }
+            }
+            """);
+
+        String errorTriggeringIndex = "test2";
+        int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
+        try (var mockLog = MockLog.capture(SearchService.class)) {
+            ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
+
+            searchRequest.addParameter("error_trace", "true");
+            getRestClient().performRequest(searchRequest);
+            mockLog.assertAllExpectationsMatched();
+        }
+    }
+
+    public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmpty() throws IOException {
+        setupIndexWithDocs();
+
+        Request searchRequest = new Request("POST", "/_search");
+        searchRequest.setJsonEntity("""
+            {
+                "query": {
+                    "simple_query_string" : {
+                        "query": "foo",
+                        "fields": ["field"]
+                    }
+                }
+            }
+            """);
+
+        String errorTriggeringIndex = "test2";
+        int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
+        try (var mockLog = MockLog.capture(SearchService.class)) {
+            ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
+
+            // error_trace defaults to false so we can test both cases with some randomization
+            if (randomBoolean()) {
+                searchRequest.addParameter("error_trace", "false");
+            }
+            getRestClient().performRequest(searchRequest);
+            mockLog.assertAllExpectationsMatched();
+        }
+    }
+
     public void testMultiSearchFailingQueryErrorTraceDefault() throws IOException {
         setupIndexWithDocs();
 
@@ -168,4 +232,57 @@ public void testMultiSearchFailingQueryErrorTraceFalse() throws IOException {
 
         assertFalse(hasStackTrace.getAsBoolean());
     }
+
+    public void testDataNodeDoesNotLogStackTraceWhenErrorTraceTrueMultiSearch() throws IOException {
+        setupIndexWithDocs();
+
+        XContentType contentType = XContentType.JSON;
+        MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(
+            new SearchRequest("test*").source(new SearchSourceBuilder().query(simpleQueryStringQuery("foo").field("field")))
+        );
+        Request searchRequest = new Request("POST", "/_msearch");
+        byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
+        searchRequest.setEntity(
+            new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null))
+        );
+
+        searchRequest.addParameter("error_trace", "true");
+
+        String errorTriggeringIndex = "test2";
+        int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
+        try (var mockLog = MockLog.capture(SearchService.class)) {
+            ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
+
+            getRestClient().performRequest(searchRequest);
+            mockLog.assertAllExpectationsMatched();
+        }
+    }
+
+    public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmptyMultiSearch() throws IOException {
+        setupIndexWithDocs();
+
+        XContentType contentType = XContentType.JSON;
+        MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(
+            new SearchRequest("test*").source(new SearchSourceBuilder().query(simpleQueryStringQuery("foo").field("field")))
+        );
+        Request searchRequest = new Request("POST", "/_msearch");
+        byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
+        searchRequest.setEntity(
+            new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null))
+        );
+
+        // error_trace defaults to false so we can test both cases with some randomization
+        if (randomBoolean()) {
+            searchRequest.addParameter("error_trace", "false");
+        }
+
+        String errorTriggeringIndex = "test2";
+        int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
+        try (var mockLog = MockLog.capture(SearchService.class)) {
+            ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
+
+            getRestClient().performRequest(searchRequest);
+            mockLog.assertAllExpectationsMatched();
+        }
+    }
 }
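
These tests lean on two helpers added to ErrorTraceHelper in this commit, addSeenLoggingExpectations and addUnseenLoggingExpectations, whose bodies are not part of this view. As a rough, assumed sketch of what the "seen" variant could look like on top of MockLog: the class name, expectation name, log level, and message pattern below are guesses for illustration, not the committed code.

import org.apache.logging.log4j.Level;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.MockLog;

// Hypothetical sketch: expect one data-node log event per shard of the failing index,
// matching the message emitted by SearchService.maybeWrapListenerForStackTrace.
final class ErrorTraceHelperSketch {
    static void addSeenLoggingExpectations(int numShards, MockLog mockLog, String errorTriggeringIndex) {
        for (int shard = 0; shard < numShards; shard++) {
            mockLog.addExpectation(
                new MockLog.SeenEventExpectation(
                    "stack trace logged for [" + errorTriggeringIndex + "][" + shard + "]",
                    SearchService.class.getCanonicalName(),
                    Level.DEBUG, // assumes the triggering failure maps to a 4xx status, hence DEBUG
                    "*[" + errorTriggeringIndex + "][" + shard + "]: failed to execute search request*"
                )
            );
        }
    }
}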

server/src/main/java/org/elasticsearch/search/SearchService.java

+59 -5
@@ -156,6 +156,7 @@
 import java.util.function.Supplier;
 
 import static org.elasticsearch.TransportVersions.ERROR_TRACE_IN_TRANSPORT_HEADER;
+import static org.elasticsearch.common.Strings.format;
 import static org.elasticsearch.core.TimeValue.timeValueHours;
 import static org.elasticsearch.core.TimeValue.timeValueMillis;
 import static org.elasticsearch.core.TimeValue.timeValueMinutes;
@@ -563,12 +564,18 @@ protected void doClose() {
      * @param <T> the type of the response
      * @param listener the action listener to be wrapped
      * @param version channel version of the request
+     * @param nodeId id of the current node
+     * @param shardId id of the shard being searched
+     * @param taskId id of the task being executed
      * @param threadPool with context where to write the new header
      * @return the wrapped action listener
      */
     static <T> ActionListener<T> maybeWrapListenerForStackTrace(
         ActionListener<T> listener,
         TransportVersion version,
+        String nodeId,
+        ShardId shardId,
+        long taskId,
         ThreadPool threadPool
     ) {
         boolean header = true;
@@ -577,6 +584,18 @@ static <T> ActionListener<T> maybeWrapListenerForStackTrace(
         }
         if (header == false) {
             return listener.delegateResponse((l, e) -> {
+                org.apache.logging.log4j.util.Supplier<String> messageSupplier = () -> format(
+                    "[%s]%s: failed to execute search request for task [%d]",
+                    nodeId,
+                    shardId,
+                    taskId
+                );
+                // Keep this logic aligned with that of SUPPRESSED_ERROR_LOGGER in RestResponse
+                if (ExceptionsHelper.status(e).getStatus() < 500 || ExceptionsHelper.isNodeOrShardUnavailableTypeException(e)) {
+                    logger.debug(messageSupplier, e);
+                } else {
+                    logger.warn(messageSupplier, e);
+                }
                 ExceptionsHelper.unwrapCausesAndSuppressed(e, err -> {
                     err.setStackTrace(EMPTY_STACK_TRACE_ARRAY);
                     return false;
@@ -588,7 +607,14 @@ static <T> ActionListener<T> maybeWrapListenerForStackTrace(
     }
 
     public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
-        listener = maybeWrapListenerForStackTrace(listener, request.getChannelVersion(), threadPool);
+        listener = maybeWrapListenerForStackTrace(
+            listener,
+            request.getChannelVersion(),
+            clusterService.localNode().getId(),
+            request.shardId(),
+            task.getId(),
+            threadPool
+        );
         final IndexShard shard = getShard(request);
         rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, rewritten) -> {
             // fork the execution in the search thread pool
@@ -632,7 +658,14 @@ public void executeQueryPhase(ShardSearchRequest request, CancellableTask task,
         rewriteAndFetchShardRequest(
             shard,
             request,
-            maybeWrapListenerForStackTrace(listener, request.getChannelVersion(), threadPool).delegateFailure((l, orig) -> {
+            maybeWrapListenerForStackTrace(
+                listener,
+                request.getChannelVersion(),
+                clusterService.localNode().getId(),
+                request.shardId(),
+                task.getId(),
+                threadPool
+            ).delegateFailure((l, orig) -> {
                 // check if we can shortcut the query phase entirely.
                 if (orig.canReturnNullResponseIfMatchNoDocs()) {
                     assert orig.scroll() == null;
@@ -830,9 +863,16 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, Cancella
     }
 
     public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShardTask task, ActionListener<RankFeatureResult> listener) {
-        listener = maybeWrapListenerForStackTrace(listener, request.getShardSearchRequest().getChannelVersion(), threadPool);
         final ReaderContext readerContext = findReaderContext(request.contextId(), request);
         final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
+        listener = maybeWrapListenerForStackTrace(
+            listener,
+            shardSearchRequest.getChannelVersion(),
+            clusterService.localNode().getId(),
+            shardSearchRequest.shardId(),
+            task.getId(),
+            threadPool
+        );
         final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
         runAsync(getExecutor(readerContext.indexShard()), () -> {
             try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.RANK_FEATURE, false)) {
@@ -881,8 +921,15 @@ public void executeQueryPhase(
         ActionListener<ScrollQuerySearchResult> listener,
         TransportVersion version
     ) {
-        listener = maybeWrapListenerForStackTrace(listener, version, threadPool);
         final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
+        listener = maybeWrapListenerForStackTrace(
+            listener,
+            version,
+            clusterService.localNode().getId(),
+            readerContext.indexShard().shardId(),
+            task.getId(),
+            threadPool
+        );
         final Releasable markAsUsed;
         try {
             markAsUsed = readerContext.markAsUsed(getScrollKeepAlive(request.scroll()));
@@ -930,9 +977,16 @@ public void executeQueryPhase(
         ActionListener<QuerySearchResult> listener,
         TransportVersion version
    ) {
-        listener = maybeWrapListenerForStackTrace(listener, version, threadPool);
         final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest());
         final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest());
+        listener = maybeWrapListenerForStackTrace(
+            listener,
+            version,
+            clusterService.localNode().getId(),
+            shardSearchRequest.shardId(),
+            task.getId(),
+            threadPool
+        );
         final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
         rewriteAndFetchShardRequest(readerContext.indexShard(), shardSearchRequest, listener.delegateFailure((l, rewritten) -> {
             // fork the execution in the search thread pool
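
The level chosen for the new data-node log line mirrors the coordinating node's SUPPRESSED_ERROR_LOGGER policy in RestResponse: client-side failures (HTTP status below 500) and node- or shard-unavailable errors go to DEBUG, everything else to WARN; this also appears to be why the test class raises the SearchService logger to DEBUG in its @BeforeClass hook. For illustration only, the selection can be restated as a tiny helper; the wrapper class and method below are hypothetical, while the two ExceptionsHelper calls are the ones used in the hunk above.

import org.apache.logging.log4j.Level;
import org.elasticsearch.ExceptionsHelper;

// Hypothetical helper restating the log-level selection from the diff above.
final class DataNodeLogLevel {
    static Level forFailure(Exception e) {
        boolean lowSeverity = ExceptionsHelper.status(e).getStatus() < 500
            || ExceptionsHelper.isNodeOrShardUnavailableTypeException(e);
        return lowSeverity ? Level.DEBUG : Level.WARN;
    }
}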
