Skip to content

Commit 0e1916e

Browse files
authored
Add unit tests for History Iterator in Replayer (#980)
* Add unit tests for History Iterator in Replayer
1 parent 2da5fe4 commit 0e1916e

File tree

4 files changed

+256
-33
lines changed

4 files changed

+256
-33
lines changed

src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -231,14 +231,8 @@ public DecisionEvents next() {
231231
}
232232
decisionEvents.add(events.next());
233233
}
234-
DecisionEvents result =
235-
new DecisionEvents(
236-
newEvents,
237-
decisionEvents,
238-
replay,
239-
replayCurrentTimeMilliseconds,
240-
nextDecisionEventId);
241-
return result;
234+
return new DecisionEvents(
235+
newEvents, decisionEvents, replay, replayCurrentTimeMilliseconds, nextDecisionEventId);
242236
}
243237
}
244238

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

+13-23
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static com.uber.cadence.worker.NonDeterministicWorkflowPolicy.FailWorkflow;
2121

22+
import com.google.common.annotations.VisibleForTesting;
2223
import com.uber.cadence.EventType;
2324
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
2425
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
@@ -180,9 +181,9 @@ private void processEvent(HistoryEvent event) {
180181
context.handleChildWorkflowExecutionTimedOut(event);
181182
break;
182183
case DecisionTaskCompleted:
183-
// NOOP
184-
break;
185184
case DecisionTaskScheduled:
185+
case WorkflowExecutionTimedOut:
186+
case WorkflowExecutionTerminated:
186187
// NOOP
187188
break;
188189
case DecisionTaskStarted:
@@ -208,12 +209,6 @@ private void processEvent(HistoryEvent event) {
208209
case WorkflowExecutionStarted:
209210
handleWorkflowExecutionStarted(event);
210211
break;
211-
case WorkflowExecutionTerminated:
212-
// NOOP
213-
break;
214-
case WorkflowExecutionTimedOut:
215-
// NOOP
216-
break;
217212
case ActivityTaskScheduled:
218213
decisionsHelper.handleActivityTaskScheduled(event);
219214
break;
@@ -227,11 +222,8 @@ private void processEvent(HistoryEvent event) {
227222
context.handleMarkerRecorded(event);
228223
break;
229224
case WorkflowExecutionCompleted:
230-
break;
231225
case WorkflowExecutionFailed:
232-
break;
233226
case WorkflowExecutionCanceled:
234-
break;
235227
case WorkflowExecutionContinuedAsNew:
236228
break;
237229
case TimerStarted:
@@ -410,7 +402,7 @@ private Map<String, WorkflowQueryResult> getQueryResults(Map<String, WorkflowQue
410402
return queries
411403
.entrySet()
412404
.stream()
413-
.collect(Collectors.toMap(q -> q.getKey(), q -> queryWorkflow(q.getValue())));
405+
.collect(Collectors.toMap(Map.Entry::getKey, q -> queryWorkflow(q.getValue())));
414406
}
415407

416408
private WorkflowQueryResult queryWorkflow(WorkflowQuery query) {
@@ -632,9 +624,9 @@ private class DecisionTaskWithHistoryIteratorImpl implements DecisionTaskWithHis
632624
private final Duration retryServiceOperationInitialInterval = Duration.ofMillis(200);
633625
private final Duration retryServiceOperationMaxInterval = Duration.ofSeconds(4);
634626
private final Duration paginationStart = Duration.ofMillis(System.currentTimeMillis());
635-
private Duration decisionTaskStartToCloseTimeout;
627+
private final Duration decisionTaskStartToCloseTimeout;
636628

637-
private final Duration decisionTaskRemainingTime() {
629+
private Duration decisionTaskRemainingTime() {
638630
Duration passed = Duration.ofMillis(System.currentTimeMillis()).minus(paginationStart);
639631
return decisionTaskStartToCloseTimeout.minus(passed);
640632
}
@@ -643,6 +635,7 @@ private final Duration decisionTaskRemainingTime() {
643635
private Iterator<HistoryEvent> current;
644636
private byte[] nextPageToken;
645637

638+
@VisibleForTesting
646639
DecisionTaskWithHistoryIteratorImpl(
647640
PollForDecisionTaskResponse task, Duration decisionTaskStartToCloseTimeout) {
648641
this.task = Objects.requireNonNull(task);
@@ -692,7 +685,7 @@ public HistoryEvent next() {
692685
.setExpiration(decisionTaskRemainingTime)
693686
.setInitialInterval(retryServiceOperationInitialInterval)
694687
.setMaximumInterval(retryServiceOperationMaxInterval)
695-
.build();
688+
.validateBuildWithDefaults();
696689

697690
GetWorkflowExecutionHistoryRequest request = new GetWorkflowExecutionHistoryRequest();
698691
request
@@ -715,14 +708,11 @@ public HistoryEvent next() {
715708
}
716709
if (!current.hasNext()) {
717710
log.error(
718-
"GetWorkflowExecutionHistory returns an empty history, maybe a bug in server, workflowID:"
719-
+ request.execution.workflowId
720-
+ ", runID:"
721-
+ request.execution.runId
722-
+ ", domain:"
723-
+ request.domain
724-
+ " token:"
725-
+ Arrays.toString(request.getNextPageToken()));
711+
"GetWorkflowExecutionHistory returns an empty history, maybe a bug in server, workflowID:{}, runID:{}, domain:{} token:{}",
712+
request.execution.workflowId,
713+
request.execution.runId,
714+
request.domain,
715+
Arrays.toString(request.getNextPageToken()));
726716
throw new Error(
727717
"GetWorkflowExecutionHistory return empty history, maybe a bug in server");
728718
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.replay;
19+
20+
import static org.junit.Assert.*;
21+
import static org.mockito.Mockito.*;
22+
23+
import com.uber.cadence.*;
24+
import com.uber.cadence.client.WorkflowClientOptions;
25+
import com.uber.cadence.internal.worker.SingleWorkerOptions;
26+
import com.uber.cadence.serviceclient.IWorkflowService;
27+
import java.lang.reflect.Constructor;
28+
import java.lang.reflect.InvocationTargetException;
29+
import java.lang.reflect.Method;
30+
import java.time.Duration;
31+
import java.util.*;
32+
import org.apache.thrift.TException;
33+
import org.junit.Before;
34+
import org.junit.Test;
35+
import org.mockito.Mock;
36+
import org.mockito.MockitoAnnotations;
37+
38+
public class ReplaceDeciderDecisionTaskWithHistoryIteratorTest {
39+
@Mock private IWorkflowService mockService;
40+
41+
@Mock private DecisionContextImpl mockContext;
42+
43+
@Mock private DecisionsHelper mockedHelper;
44+
45+
private static final int MAXIMUM_PAGE_SIZE = 10000;
46+
private final String WORKFLOW_ID = "testWorkflowId";
47+
private final String RUN_ID = "testRunId";
48+
private final String DOMAIN = "testDomain";
49+
private final String START_PAGE_TOKEN = "testPageToken";
50+
private final WorkflowExecution WORKFLOW_EXECUTION =
51+
new WorkflowExecution().setWorkflowId(WORKFLOW_ID).setRunId(RUN_ID);
52+
private final HistoryEvent START_EVENT =
53+
new HistoryEvent()
54+
.setWorkflowExecutionStartedEventAttributes(new WorkflowExecutionStartedEventAttributes())
55+
.setEventId(1);
56+
private final History HISTORY = new History().setEvents(Collections.singletonList(START_EVENT));
57+
private final PollForDecisionTaskResponse task =
58+
new PollForDecisionTaskResponse()
59+
.setWorkflowExecution(WORKFLOW_EXECUTION)
60+
.setHistory(HISTORY)
61+
.setNextPageToken(START_PAGE_TOKEN.getBytes());
62+
63+
private Object iterator;
64+
65+
private void setupDecisionTaskWithHistoryIteratorImpl() {
66+
try {
67+
// Find the inner class first
68+
Class<?> innerClass = findDecisionTaskWithHistoryIteratorImplClass();
69+
70+
// Get the constructor with the specific parameter types
71+
Constructor<?> constructor =
72+
innerClass.getDeclaredConstructor(
73+
ReplayDecider.class, PollForDecisionTaskResponse.class, Duration.class);
74+
75+
when(mockedHelper.getTask()).thenReturn(task);
76+
when(mockContext.getDomain()).thenReturn(DOMAIN);
77+
78+
// Create an instance of the outer class
79+
ReplayDecider outerInstance =
80+
new ReplayDecider(
81+
mockService,
82+
DOMAIN,
83+
new WorkflowType().setName("testWorkflow"),
84+
null,
85+
mockedHelper,
86+
SingleWorkerOptions.newBuilder()
87+
.setMetricsScope(WorkflowClientOptions.defaultInstance().getMetricsScope())
88+
.build(),
89+
null);
90+
91+
// Create the instance
92+
iterator = constructor.newInstance(outerInstance, task, Duration.ofSeconds(10));
93+
} catch (Exception e) {
94+
e.printStackTrace();
95+
throw new RuntimeException("Failed to set up test: " + e.getMessage(), e);
96+
}
97+
}
98+
99+
// Helper method to find the inner class
100+
private Class<?> findDecisionTaskWithHistoryIteratorImplClass() {
101+
for (Class<?> declaredClass : ReplayDecider.class.getDeclaredClasses()) {
102+
if (declaredClass.getSimpleName().equals("DecisionTaskWithHistoryIteratorImpl")) {
103+
return declaredClass;
104+
}
105+
}
106+
throw new RuntimeException("Could not find DecisionTaskWithHistoryIteratorImpl inner class");
107+
}
108+
109+
@Before
110+
public void setUp() {
111+
MockitoAnnotations.openMocks(this);
112+
setupDecisionTaskWithHistoryIteratorImpl();
113+
}
114+
115+
@Test
116+
public void testGetHistoryWithSinglePageOfEvents()
117+
throws TException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
118+
// Arrange
119+
List<HistoryEvent> events = Arrays.asList(createMockHistoryEvent(2), createMockHistoryEvent(3));
120+
History mockHistory = new History().setEvents(events);
121+
when(mockService.GetWorkflowExecutionHistory(
122+
new GetWorkflowExecutionHistoryRequest()
123+
.setDomain(DOMAIN)
124+
.setNextPageToken(START_PAGE_TOKEN.getBytes())
125+
.setExecution(WORKFLOW_EXECUTION)
126+
.setMaximumPageSize(MAXIMUM_PAGE_SIZE)))
127+
.thenReturn(new GetWorkflowExecutionHistoryResponse().setHistory(mockHistory));
128+
129+
// Act & Assert
130+
Method wrapperMethod = iterator.getClass().getMethod("getHistory");
131+
132+
Object result = wrapperMethod.invoke(iterator);
133+
Iterator<HistoryEvent> historyIterator = (Iterator<HistoryEvent>) result;
134+
assertTrue(historyIterator.hasNext());
135+
assertEquals(START_EVENT.getEventId(), historyIterator.next().getEventId());
136+
assertTrue(historyIterator.hasNext());
137+
assertEquals(events.get(0).getEventId(), historyIterator.next().getEventId());
138+
assertTrue(historyIterator.hasNext());
139+
assertEquals(events.get(1).getEventId(), historyIterator.next().getEventId());
140+
assertFalse(historyIterator.hasNext());
141+
}
142+
143+
@Test
144+
public void testGetHistoryWithMultiplePages()
145+
throws TException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
146+
// First page events
147+
List<HistoryEvent> firstPageEvents =
148+
Arrays.asList(createMockHistoryEvent(1), createMockHistoryEvent(2));
149+
History firstHistory = new History().setEvents(firstPageEvents);
150+
String firstPageToken = "firstPageToken";
151+
when(mockService.GetWorkflowExecutionHistory(
152+
eq(
153+
new GetWorkflowExecutionHistoryRequest()
154+
.setDomain(DOMAIN)
155+
.setNextPageToken(START_PAGE_TOKEN.getBytes())
156+
.setExecution(WORKFLOW_EXECUTION)
157+
.setMaximumPageSize(MAXIMUM_PAGE_SIZE))))
158+
.thenReturn(
159+
new GetWorkflowExecutionHistoryResponse()
160+
.setHistory(firstHistory)
161+
.setNextPageToken(firstPageToken.getBytes()));
162+
163+
// Second page events
164+
List<HistoryEvent> secondPageEvents =
165+
Arrays.asList(createMockHistoryEvent(3), createMockHistoryEvent(4));
166+
History secondHistory = new History().setEvents(secondPageEvents);
167+
when(mockService.GetWorkflowExecutionHistory(
168+
eq(
169+
new GetWorkflowExecutionHistoryRequest()
170+
.setDomain(DOMAIN)
171+
.setNextPageToken(firstPageToken.getBytes())
172+
.setExecution(WORKFLOW_EXECUTION)
173+
.setMaximumPageSize(MAXIMUM_PAGE_SIZE))))
174+
.thenReturn(new GetWorkflowExecutionHistoryResponse().setHistory(secondHistory));
175+
176+
// Act & Assert
177+
Method wrapperMethod = iterator.getClass().getMethod("getHistory");
178+
179+
Object result = wrapperMethod.invoke(iterator);
180+
Iterator<HistoryEvent> historyIterator = (Iterator<HistoryEvent>) result;
181+
// Check first page events
182+
assertEquals(START_EVENT.getEventId(), historyIterator.next().getEventId());
183+
assertEquals(firstPageEvents.get(0).getEventId(), historyIterator.next().getEventId());
184+
assertEquals(firstPageEvents.get(1).getEventId(), historyIterator.next().getEventId());
185+
186+
// Check second page events
187+
assertEquals(secondPageEvents.get(0).getEventId(), historyIterator.next().getEventId());
188+
assertEquals(secondPageEvents.get(1).getEventId(), historyIterator.next().getEventId());
189+
190+
assertFalse(historyIterator.hasNext());
191+
}
192+
193+
@Test(expected = Error.class)
194+
public void testGetHistoryFailure()
195+
throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, TException {
196+
when(mockService.GetWorkflowExecutionHistory(
197+
new GetWorkflowExecutionHistoryRequest()
198+
.setDomain(DOMAIN)
199+
.setNextPageToken(START_PAGE_TOKEN.getBytes())
200+
.setExecution(WORKFLOW_EXECUTION)
201+
.setMaximumPageSize(MAXIMUM_PAGE_SIZE)))
202+
.thenThrow(new TException());
203+
204+
// Act & Assert
205+
Method wrapperMethod = iterator.getClass().getMethod("getHistory");
206+
207+
Object result = wrapperMethod.invoke(iterator);
208+
Iterator<HistoryEvent> historyIterator = (Iterator<HistoryEvent>) result;
209+
historyIterator.next();
210+
211+
historyIterator.next(); // This should throw an Error due to timeout
212+
}
213+
214+
@Test(expected = Error.class)
215+
public void testEmptyHistory()
216+
throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, TException {
217+
when(mockService.GetWorkflowExecutionHistory(
218+
new GetWorkflowExecutionHistoryRequest()
219+
.setDomain(DOMAIN)
220+
.setNextPageToken(START_PAGE_TOKEN.getBytes())
221+
.setExecution(WORKFLOW_EXECUTION)
222+
.setMaximumPageSize(MAXIMUM_PAGE_SIZE)))
223+
.thenReturn(
224+
new GetWorkflowExecutionHistoryResponse()
225+
.setHistory(new History().setEvents(new ArrayList<>())));
226+
227+
// Act & Assert
228+
Method wrapperMethod = iterator.getClass().getMethod("getHistory");
229+
230+
Object result = wrapperMethod.invoke(iterator);
231+
Iterator<HistoryEvent> historyIterator = (Iterator<HistoryEvent>) result;
232+
historyIterator.next();
233+
234+
historyIterator.next(); // This should throw an Error due to timeout
235+
}
236+
237+
// Helper method to create mock HistoryEvent
238+
private HistoryEvent createMockHistoryEvent(int eventId) {
239+
return new HistoryEvent().setEventId(eventId);
240+
}
241+
}

src/test/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternalTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,6 @@ public void testWorkflowServiceWrapperMethodDelegation() throws Exception {
126126
// Prepare test cases
127127
List<MethodTestCase> testCases = prepareMethodTestCases();
128128

129-
System.out.println(testCases);
130-
131129
// Test each method
132130
for (MethodTestCase testCase : testCases) {
133131
try {

0 commit comments

Comments
 (0)