Skip to content

Commit 6444df3

Browse files
committed
Handling message too large error when sending query response. Various other changes.
1 parent 44b1e90 commit 6444df3

File tree

7 files changed

+268
-64
lines changed

7 files changed

+268
-64
lines changed

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
import com.uber.m3.tally.Scope;
1010
import com.uber.m3.tally.Stopwatch;
1111
import com.uber.m3.util.ImmutableMap;
12+
import io.grpc.StatusRuntimeException;
1213
import io.temporal.api.common.v1.WorkflowExecution;
14+
import io.temporal.api.enums.v1.QueryResultType;
1315
import io.temporal.api.enums.v1.TaskQueueKind;
1416
import io.temporal.api.enums.v1.WorkflowTaskFailedCause;
1517
import io.temporal.api.failure.v1.Failure;
@@ -405,8 +407,30 @@ public void handle(WorkflowTask task) throws Exception {
405407
RespondQueryTaskCompletedRequest queryCompleted = result.getQueryCompleted();
406408

407409
if (queryCompleted != null) {
408-
sendDirectQueryCompletedResponse(
409-
currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope);
410+
try {
411+
sendDirectQueryCompletedResponse(
412+
currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope);
413+
} catch (StatusRuntimeException e) {
414+
GrpcMessageTooLargeException tooLargeException =
415+
GrpcMessageTooLargeException.tryWrap(e);
416+
if (tooLargeException == null) {
417+
throw e;
418+
}
419+
Failure failure =
420+
grpcMessageTooLargeFailure(
421+
workflowExecution.getWorkflowId(),
422+
tooLargeException,
423+
"Failed to send query response");
424+
RespondQueryTaskCompletedRequest.Builder queryFailedBuilder =
425+
RespondQueryTaskCompletedRequest.newBuilder()
426+
.setTaskToken(currentTask.getTaskToken())
427+
.setNamespace(namespace)
428+
.setCompletedType(QueryResultType.QUERY_RESULT_TYPE_FAILED)
429+
.setErrorMessage(failure.getMessage())
430+
.setFailure(failure);
431+
sendDirectQueryCompletedResponse(
432+
currentTask.getTaskToken(), queryFailedBuilder, workflowTypeScope);
433+
}
410434
} else {
411435
try {
412436
if (taskCompleted != null) {
@@ -443,33 +467,28 @@ public void handle(WorkflowTask task) throws Exception {
443467
workflowTypeScope);
444468
}
445469
} catch (GrpcMessageTooLargeException e) {
470+
// Only fail workflow task on the first attempt; subsequent failures of the same
471+
// workflow task should time out.
472+
if (currentTask.getAttempt() > 1) {
473+
throw e;
474+
}
475+
446476
releaseReason = SlotReleaseReason.error(e);
447477
handleReportingFailure(
448478
e, currentTask, result, workflowExecution, workflowTypeScope);
449-
// replacing failure cause for metrics purposes
479+
// setting/replacing failure cause for metrics purposes
450480
taskFailedCause =
451481
WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE;
452482

453-
String message =
483+
String messagePrefix =
454484
String.format(
455-
"Failed to send workflow task %s: %s",
456-
taskFailed == null ? "completion" : "failure", e.getMessage());
457-
ApplicationFailure applicationFailure =
458-
ApplicationFailure.newBuilder()
459-
.setMessage(message)
460-
.setType("GrpcMessageTooLargeException")
461-
.setNonRetryable(true)
462-
.build();
463-
Failure failure =
464-
options
465-
.getDataConverter()
466-
.withContext(
467-
new WorkflowSerializationContext(
468-
namespace, workflowExecution.getWorkflowId()))
469-
.exceptionToFailure(applicationFailure);
485+
"Failed to send workflow task %s",
486+
taskFailed == null ? "completion" : "failure");
470487
RespondWorkflowTaskFailedRequest.Builder taskFailedBuilder =
471488
RespondWorkflowTaskFailedRequest.newBuilder()
472-
.setFailure(failure)
489+
.setFailure(
490+
grpcMessageTooLargeFailure(
491+
workflowExecution.getWorkflowId(), e, messagePrefix))
473492
.setCause(
474493
WorkflowTaskFailedCause
475494
.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE);
@@ -671,5 +690,19 @@ private void handleReportingFailure(
671690
cache.invalidate(
672691
workflowExecution, workflowTypeScope, "Failed result reporting to the server", e);
673692
}
693+
694+
private Failure grpcMessageTooLargeFailure(
695+
String workflowId, GrpcMessageTooLargeException e, String messagePrefix) {
696+
ApplicationFailure applicationFailure =
697+
ApplicationFailure.newBuilder()
698+
.setMessage(messagePrefix + ": " + e.getMessage())
699+
.setType(GrpcMessageTooLargeException.class.getSimpleName())
700+
.build();
701+
applicationFailure.setStackTrace(new StackTraceElement[0]); // don't serialize stack trace
702+
return options
703+
.getDataConverter()
704+
.withContext(new WorkflowSerializationContext(namespace, workflowId))
705+
.exceptionToFailure(applicationFailure);
706+
}
674707
}
675708
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.temporal.testUtils;
2+
3+
import ch.qos.logback.classic.Level;
4+
import ch.qos.logback.classic.Logger;
5+
import java.util.ArrayList;
6+
import java.util.Arrays;
7+
import java.util.List;
8+
import java.util.stream.Collectors;
9+
import org.slf4j.LoggerFactory;
10+
11+
public class LoggerUtils {
12+
public static SilenceLoggers silenceLoggers(Class<?>... classes) {
13+
return new SilenceLoggers(classes);
14+
}
15+
16+
public static class SilenceLoggers implements AutoCloseable {
17+
private final List<Logger> loggers;
18+
List<Level> oldLogLevels;
19+
20+
public SilenceLoggers(Class<?>... classes) {
21+
loggers =
22+
Arrays.stream(classes)
23+
.map(LoggerFactory::getLogger)
24+
.filter(Logger.class::isInstance)
25+
.map(Logger.class::cast)
26+
.collect(Collectors.toList());
27+
oldLogLevels = new ArrayList<>();
28+
for (Logger logger : loggers) {
29+
oldLogLevels.add(logger.getLevel());
30+
logger.setLevel(Level.OFF);
31+
}
32+
}
33+
34+
@Override
35+
public void close() {
36+
for (int i = 0; i < loggers.size(); i++) {
37+
loggers.get(i).setLevel(oldLogLevels.get(i));
38+
}
39+
}
40+
}
41+
}

temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java

Lines changed: 139 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,23 @@
77
import io.temporal.api.enums.v1.EventType;
88
import io.temporal.api.enums.v1.WorkflowTaskFailedCause;
99
import io.temporal.api.history.v1.HistoryEvent;
10-
import io.temporal.client.WorkflowClient;
11-
import io.temporal.client.WorkflowFailedException;
12-
import io.temporal.client.WorkflowOptions;
13-
import io.temporal.client.WorkflowServiceException;
10+
import io.temporal.client.*;
1411
import io.temporal.failure.ApplicationFailure;
1512
import io.temporal.failure.TimeoutFailure;
13+
import io.temporal.internal.replay.ReplayWorkflowTaskHandler;
1614
import io.temporal.internal.retryer.GrpcMessageTooLargeException;
15+
import io.temporal.internal.worker.PollerOptions;
16+
import io.temporal.testUtils.LoggerUtils;
1717
import io.temporal.testing.internal.SDKTestWorkflowRule;
1818
import io.temporal.workflow.shared.TestActivities;
19-
import io.temporal.workflow.shared.TestWorkflows;
2019
import java.time.Duration;
2120
import java.util.List;
2221
import org.junit.Rule;
2322
import org.junit.Test;
2423

2524
public class GrpcMessageTooLargeTest {
25+
private static final String QUERY_ERROR_MESSAGE =
26+
"Failed to send query response: RESOURCE_EXHAUSTED: grpc: received message larger than max";
2627
private static final String VERY_LARGE_DATA;
2728

2829
static {
@@ -36,16 +37,27 @@ public class GrpcMessageTooLargeTest {
3637
}
3738

3839
@Rule
39-
public SDKTestWorkflowRule testWorkflowRule =
40+
public SDKTestWorkflowRule activityStartWorkflowRule =
4041
SDKTestWorkflowRule.newBuilder()
41-
.setWorkflowTypes(TestWorkflowImpl.class)
42+
.setWorkflowTypes(ActivityStartWorkflowImpl.class)
4243
.setActivityImplementations(new TestActivityImpl())
4344
.build();
4445

46+
@Rule
47+
public SDKTestWorkflowRule failureWorkflowRule =
48+
SDKTestWorkflowRule.newBuilder().setWorkflowTypes(FailureWorkflowImpl.class).build();
49+
50+
@Rule
51+
public SDKTestWorkflowRule querySuccessWorkflowRule =
52+
SDKTestWorkflowRule.newBuilder().setWorkflowTypes(QuerySuccessWorkflowImpl.class).build();
53+
54+
@Rule
55+
public SDKTestWorkflowRule queryFailureWorkflowRule =
56+
SDKTestWorkflowRule.newBuilder().setWorkflowTypes(QueryFailureWorkflowImpl.class).build();
57+
4558
@Test
4659
public void workflowStartTooLarge() {
47-
TestWorkflows.TestWorkflowStringArg workflow =
48-
testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflowStringArg.class);
60+
TestWorkflow workflow = createWorkflowStub(TestWorkflow.class, activityStartWorkflowRule);
4961
WorkflowServiceException e =
5062
assertThrows(
5163
WorkflowServiceException.class,
@@ -55,40 +67,137 @@ public void workflowStartTooLarge() {
5567

5668
@Test
5769
public void activityStartTooLarge() {
58-
WorkflowOptions options =
59-
WorkflowOptions.newBuilder()
60-
.setWorkflowRunTimeout(Duration.ofSeconds(1))
61-
.setTaskQueue(testWorkflowRule.getTaskQueue())
62-
.build();
63-
TestWorkflows.TestWorkflowStringArg workflow =
64-
testWorkflowRule
65-
.getWorkflowClient()
66-
.newWorkflowStub(TestWorkflows.TestWorkflowStringArg.class, options);
70+
TestWorkflow workflow = createWorkflowStub(TestWorkflow.class, activityStartWorkflowRule);
6771

6872
WorkflowFailedException e =
6973
assertThrows(WorkflowFailedException.class, () -> workflow.execute(""));
7074
assertTrue(e.getCause() instanceof TimeoutFailure);
7175

76+
String workflowId = WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId();
77+
assertTrue(
78+
activityStartWorkflowRule
79+
.getHistoryEvents(workflowId, EventType.EVENT_TYPE_ACTIVITY_TASK_FAILED)
80+
.isEmpty());
7281
List<HistoryEvent> events =
73-
testWorkflowRule.getHistoryEvents(
74-
e.getExecution().getWorkflowId(), EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED);
75-
assertFalse(events.isEmpty());
76-
for (HistoryEvent event : events) {
82+
activityStartWorkflowRule.getHistoryEvents(
83+
workflowId, EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED);
84+
assertEquals(1, events.size());
85+
assertEquals(
86+
WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE,
87+
events.get(0).getWorkflowTaskFailedEventAttributes().getCause());
88+
}
89+
90+
@Test
91+
public void workflowFailureTooLarge() {
92+
// Avoiding logging exception with very large data
93+
try (LoggerUtils.SilenceLoggers sl =
94+
LoggerUtils.silenceLoggers(ReplayWorkflowTaskHandler.class, PollerOptions.class)) {
95+
TestWorkflow workflow = createWorkflowStub(TestWorkflow.class, failureWorkflowRule);
96+
97+
WorkflowFailedException e =
98+
assertThrows(WorkflowFailedException.class, () -> workflow.execute(""));
99+
100+
assertTrue(e.getCause() instanceof TimeoutFailure);
101+
String workflowId = WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId();
102+
List<HistoryEvent> events =
103+
failureWorkflowRule.getHistoryEvents(
104+
workflowId, EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED);
105+
assertEquals(1, events.size());
77106
assertEquals(
78107
WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE,
79-
event.getWorkflowTaskFailedEventAttributes().getCause());
108+
events.get(0).getWorkflowTaskFailedEventAttributes().getCause());
109+
}
110+
}
111+
112+
@Test
113+
public void queryResultTooLarge() {
114+
TestWorkflowWithQuery workflow =
115+
createWorkflowStub(TestWorkflowWithQuery.class, querySuccessWorkflowRule);
116+
workflow.execute();
117+
118+
WorkflowQueryException e = assertThrows(WorkflowQueryException.class, workflow::query);
119+
120+
assertNotNull(e.getCause());
121+
// The exception will not contain the original failure object, so instead of a type check we're
122+
// checking the message to ensure the correct error is being sent.
123+
assertTrue(e.getCause().getMessage().contains(QUERY_ERROR_MESSAGE));
124+
}
125+
126+
@Test
127+
public void queryErrorTooLarge() {
128+
TestWorkflowWithQuery workflow =
129+
createWorkflowStub(TestWorkflowWithQuery.class, queryFailureWorkflowRule);
130+
workflow.execute();
131+
132+
WorkflowQueryException e = assertThrows(WorkflowQueryException.class, workflow::query);
133+
134+
assertNotNull(e.getCause());
135+
assertTrue(e.getCause().getMessage().contains(QUERY_ERROR_MESSAGE));
136+
}
137+
138+
private static <T> T createWorkflowStub(Class<T> clazz, SDKTestWorkflowRule workflowRule) {
139+
WorkflowOptions options =
140+
WorkflowOptions.newBuilder()
141+
.setWorkflowRunTimeout(Duration.ofSeconds(1))
142+
.setWorkflowTaskTimeout(Duration.ofMillis(250))
143+
.setTaskQueue(workflowRule.getTaskQueue())
144+
.build();
145+
return workflowRule.getWorkflowClient().newWorkflowStub(clazz, options);
146+
}
147+
148+
@WorkflowInterface
149+
public interface TestWorkflow {
150+
@WorkflowMethod
151+
void execute(String arg);
152+
}
153+
154+
@WorkflowInterface
155+
public interface TestWorkflowWithQuery {
156+
@WorkflowMethod
157+
void execute();
158+
159+
@QueryMethod
160+
String query();
161+
}
162+
163+
public static class ActivityStartWorkflowImpl implements TestWorkflow {
164+
private final TestActivities.TestActivity1 activity =
165+
Workflow.newActivityStub(
166+
TestActivities.TestActivity1.class,
167+
ActivityOptions.newBuilder()
168+
.setStartToCloseTimeout(Duration.ofSeconds(1))
169+
.validateAndBuildWithDefaults());
170+
171+
@Override
172+
public void execute(String arg) {
173+
activity.execute(VERY_LARGE_DATA);
80174
}
81175
}
82176

83-
public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowStringArg {
177+
public static class FailureWorkflowImpl implements TestWorkflow {
84178
@Override
85179
public void execute(String arg) {
86-
Workflow.newActivityStub(
87-
TestActivities.TestActivity1.class,
88-
ActivityOptions.newBuilder()
89-
.setStartToCloseTimeout(Duration.ofSeconds(1))
90-
.validateAndBuildWithDefaults())
91-
.execute(VERY_LARGE_DATA);
180+
throw new RuntimeException(VERY_LARGE_DATA);
181+
}
182+
}
183+
184+
public static class QuerySuccessWorkflowImpl implements TestWorkflowWithQuery {
185+
@Override
186+
public void execute() {}
187+
188+
@Override
189+
public String query() {
190+
return VERY_LARGE_DATA;
191+
}
192+
}
193+
194+
public static class QueryFailureWorkflowImpl implements TestWorkflowWithQuery {
195+
@Override
196+
public void execute() {}
197+
198+
@Override
199+
public String query() {
200+
throw new RuntimeException(VERY_LARGE_DATA);
92201
}
93202
}
94203

0 commit comments

Comments
 (0)