Skip to content

Commit e6518a1

Browse files
authored
Merge pull request #41 from salaboy/weird-timers
CreateTimerChain was comparing the same timestamp for future validation
2 parents f72440d + 075f346 commit e6518a1

File tree

2 files changed

+181
-29
lines changed

2 files changed

+181
-29
lines changed

client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,10 @@ private class ContextImplTask implements TaskOrchestrationContext {
8888
private String appId;
8989

9090
// LinkedHashMap to maintain insertion order when returning the list of pending actions
91-
private final LinkedHashMap<Integer, OrchestratorAction> pendingActions = new LinkedHashMap<>();
92-
private final HashMap<Integer, TaskRecord<?>> openTasks = new HashMap<>();
93-
private final LinkedHashMap<String, Queue<TaskRecord<?>>> outstandingEvents = new LinkedHashMap<>();
94-
private final LinkedList<HistoryEvent> unprocessedEvents = new LinkedList<>();
91+
private final Map<Integer, OrchestratorAction> pendingActions = new LinkedHashMap<>();
92+
private final Map<Integer, TaskRecord<?>> openTasks = new HashMap<>();
93+
private final Map<String, Queue<TaskRecord<?>>> outstandingEvents = new LinkedHashMap<>();
94+
private final List<HistoryEvent> unprocessedEvents = new LinkedList<>();
9595
private final Queue<HistoryEvent> eventsWhileSuspended = new ArrayDeque<>();
9696
private final DataConverter dataConverter = TaskOrchestrationExecutor.this.dataConverter;
9797
private final Duration maximumTimerInterval = TaskOrchestrationExecutor.this.maximumTimerInterval;
@@ -303,12 +303,10 @@ public <V> Task<V> callActivity(
303303
}
304304
TaskFactory<V> taskFactory = () -> {
305305
int id = this.sequenceNumber++;
306-
307306
ScheduleTaskAction scheduleTaskAction = scheduleTaskBuilder.build();
308307
OrchestratorAction.Builder actionBuilder = OrchestratorAction.newBuilder()
309308
.setId(id)
310309
.setScheduleTask(scheduleTaskBuilder);
311-
312310
if (options != null && options.hasAppID()) {
313311
String targetAppId = options.getAppID();
314312
TaskRouter actionRouter = TaskRouter.newBuilder()
@@ -317,7 +315,6 @@ public <V> Task<V> callActivity(
317315
.build();
318316
actionBuilder.setRouter(actionRouter);
319317
}
320-
321318
this.pendingActions.put(id, actionBuilder.build());
322319

323320
if (!this.isReplaying) {
@@ -410,7 +407,7 @@ public <V> Task<V> callSubOrchestrator(
410407
if (input instanceof TaskOptions) {
411408
throw new IllegalArgumentException("TaskOptions cannot be used as an input. Did you call the wrong method overload?");
412409
}
413-
410+
414411
String serializedInput = this.dataConverter.serialize(input);
415412
CreateSubOrchestrationAction.Builder createSubOrchestrationActionBuilder = CreateSubOrchestrationAction.newBuilder().setName(name);
416413
if (serializedInput != null) {
@@ -466,7 +463,7 @@ public <V> Task<V> waitForExternalEvent(String name, Duration timeout, Class<V>
466463
int id = this.sequenceNumber++;
467464

468465
CompletableTask<V> eventTask = new ExternalEventTask<>(name, id, timeout);
469-
466+
470467
// Check for a previously received event with the same name
471468
for (HistoryEvent e : this.unprocessedEvents) {
472469
EventRaisedEvent existing = e.getEventRaised();
@@ -648,8 +645,7 @@ public Task<Void> createTimer(ZonedDateTime zonedDateTime) {
648645
}
649646

650647
private Task<Void> createTimer(Instant finalFireAt) {
651-
TimerTask timer = new TimerTask(finalFireAt);
652-
return timer;
648+
return new TimerTask(finalFireAt);
653649
}
654650

655651
private CompletableTask<Void> createInstantTimer(int id, Instant fireAt) {
@@ -660,7 +656,7 @@ private CompletableTask<Void> createInstantTimer(int id, Instant fireAt) {
660656
.build());
661657

662658
if (!this.isReplaying) {
663-
// TODO: Log timer creation, including the expected fire-time
659+
logger.finer(() -> String.format("Creating Instant Timer with id: %s, fireAt: %s ", id, fireAt));
664660
}
665661

666662
CompletableTask<Void> timerTask = new CompletableTask<>();
@@ -701,7 +697,10 @@ public void handleTimerFired(HistoryEvent e) {
701697
}
702698

703699
if (!this.isReplaying) {
704-
// TODO: Log timer fired, including the scheduled fire-time
700+
this.logger.finer(() ->
701+
String.format("Firing timer by completing task: %s expected fire at time: %s", timerEventId,
702+
Instant.ofEpochSecond(timerFiredEvent.getFireAt().getSeconds(),
703+
timerFiredEvent.getFireAt().getNanos())));
705704
}
706705

707706
CompletableTask<?> task = record.getTask();
@@ -851,7 +850,7 @@ private void addCarryoverEvents(CompleteOrchestrationAction.Builder builder) {
851850

852851
externalEvents.forEach(builder::addCarryoverEvents);
853852
}
854-
853+
855854
private boolean waitingForEvents() {
856855
return this.outstandingEvents.size() > 0;
857856
}
@@ -894,7 +893,7 @@ private void processEvent(HistoryEvent e) {
894893
if (factory == null) {
895894
throw new IllegalStateException("No factory found for orchestrator: " + executionStarted.getName());
896895
}
897-
896+
898897
TaskOrchestration orchestrator = factory.create();
899898
orchestrator.run(this);
900899
break;
@@ -1038,11 +1037,12 @@ public TimerTask(Instant finalFireAt) {
10381037
// if necessary. Otherwise, we return and no more sub-timers are created.
10391038
private CompletableFuture<Void> createTimerChain(Instant finalFireAt, CompletableFuture<Void> currentFuture) {
10401039
return currentFuture.thenRun(() -> {
1041-
if (currentInstant.compareTo(finalFireAt) > 0) {
1040+
Instant currentInstsanceMinusNanos = currentInstant.minusNanos(currentInstant.getNano());
1041+
Instant finalFireAtMinusNanos = finalFireAt.minusNanos(finalFireAt.getNano());
1042+
if (currentInstsanceMinusNanos.compareTo(finalFireAtMinusNanos) >= 0) {
10421043
return;
10431044
}
10441045
Task<Void> nextTimer = createTimerTask(finalFireAt);
1045-
10461046
createTimerChain(finalFireAt, nextTimer.future);
10471047
});
10481048
}
@@ -1062,7 +1062,9 @@ private CompletableTask<Void> createTimerTask(Instant finalFireAt) {
10621062

10631063
private void handleSubTimerSuccess() {
10641064
// check if it is the last timer
1065-
if (currentInstant.compareTo(finalFireAt) >= 0) {
1065+
Instant currentInstantMinusNanos = currentInstant.minusNanos(currentInstant.getNano());
1066+
Instant finalFireAtMinusNanos = finalFireAt.minusNanos(finalFireAt.getNano());
1067+
if (currentInstantMinusNanos.compareTo(finalFireAtMinusNanos) >= 0) {
10661068
this.complete(null);
10671069
}
10681070
}

client/src/test/java/io/dapr/durabletask/IntegrationTests.java

Lines changed: 161 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,14 @@ void emptyOrchestration() throws TimeoutException {
8787
void singleTimer() throws IOException, TimeoutException {
8888
final String orchestratorName = "SingleTimer";
8989
final Duration delay = Duration.ofSeconds(3);
90+
AtomicReferenceArray<LocalDateTime> timestamps = new AtomicReferenceArray<>(2);
91+
AtomicInteger counter = new AtomicInteger();
9092
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
91-
.addOrchestrator(orchestratorName, ctx -> ctx.createTimer(delay).await())
93+
.addOrchestrator(orchestratorName, ctx -> {
94+
timestamps.set(counter.get(), LocalDateTime.now());
95+
counter.incrementAndGet();
96+
ctx.createTimer(delay).await();
97+
})
9298
.buildAndStart();
9399

94100
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
@@ -103,11 +109,130 @@ void singleTimer() throws IOException, TimeoutException {
103109
long expectedCompletionSecond = instance.getCreatedAt().plus(delay).getEpochSecond();
104110
long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond();
105111
assertTrue(expectedCompletionSecond <= actualCompletionSecond);
112+
113+
// Verify that the correct number of timers were created
114+
// This should yield 2 (first invocation + replay invocations for internal timers)
115+
assertEquals(2, counter.get());
116+
117+
// Verify that each timer is the expected length
118+
int[] secondsElapsed = new int[1];
119+
for (int i = 0; i < timestamps.length() - 1; i++) {
120+
secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond();
121+
}
122+
assertEquals(3, secondsElapsed[0]);
123+
124+
}
125+
}
126+
127+
128+
@Test
129+
void loopWithTimer() throws IOException, TimeoutException {
130+
final String orchestratorName = "LoopWithTimer";
131+
final Duration delay = Duration.ofSeconds(2);
132+
AtomicReferenceArray<LocalDateTime> timestamps = new AtomicReferenceArray<>(4);
133+
AtomicInteger counter = new AtomicInteger();
134+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
135+
.addOrchestrator(orchestratorName, ctx -> {
136+
for(int i = 0 ; i < 3; i++) {
137+
if(!ctx.getIsReplaying()) {
138+
timestamps.set(counter.get(), LocalDateTime.now());
139+
counter.incrementAndGet();
140+
}
141+
ctx.createTimer(delay).await();
142+
}
143+
})
144+
.buildAndStart();
145+
146+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
147+
try (worker; client) {
148+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
149+
Duration timeout = delay.plus(defaultTimeout);
150+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false);
151+
assertNotNull(instance);
152+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
153+
154+
// Verify that the delay actually happened
155+
long expectedCompletionSecond = instance.getCreatedAt().plus(delay).getEpochSecond();
156+
long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond();
157+
assertTrue(expectedCompletionSecond <= actualCompletionSecond);
158+
159+
// Verify that the correct number of timers were created
160+
assertEquals(3, counter.get());
161+
162+
// Verify that each timer is the expected length
163+
int[] secondsElapsed = new int[timestamps.length()];
164+
for (int i = 0; i < timestamps.length() - 1; i++) {
165+
if(timestamps.get(i + 1) != null && timestamps.get(i) != null ) {
166+
secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond();
167+
}else{
168+
secondsElapsed[i] = -1;
169+
}
170+
}
171+
assertEquals(2, secondsElapsed[0]);
172+
assertEquals(2, secondsElapsed[1]);
173+
assertEquals(-1, secondsElapsed[2]);
174+
175+
176+
}
177+
}
178+
179+
@Test
180+
void loopWithWaitForEvent() throws IOException, TimeoutException {
181+
final String orchestratorName = "LoopWithTimer";
182+
final Duration delay = Duration.ofSeconds(2);
183+
AtomicReferenceArray<LocalDateTime> timestamps = new AtomicReferenceArray<>(4);
184+
AtomicInteger counter = new AtomicInteger();
185+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
186+
.addOrchestrator(orchestratorName, ctx -> {
187+
for(int i = 0 ; i < 4; i++) {
188+
try{
189+
ctx.waitForExternalEvent("HELLO", delay).await();
190+
}catch(TaskCanceledException tce ){
191+
if(!ctx.getIsReplaying()){
192+
timestamps.set(counter.get(), LocalDateTime.now());
193+
counter.incrementAndGet();
194+
}
195+
196+
}
197+
}
198+
})
199+
.buildAndStart();
200+
201+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
202+
try (worker; client) {
203+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
204+
Duration timeout = delay.plus(defaultTimeout);
205+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false);
206+
assertNotNull(instance);
207+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
208+
209+
// Verify that the delay actually happened
210+
long expectedCompletionSecond = instance.getCreatedAt().plus(delay).getEpochSecond();
211+
long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond();
212+
assertTrue(expectedCompletionSecond <= actualCompletionSecond);
213+
214+
// Verify that the correct number of timers were created
215+
assertEquals(4, counter.get());
216+
217+
// Verify that each timer is the expected length
218+
int[] secondsElapsed = new int[timestamps.length()];
219+
for (int i = 0; i < timestamps.length() - 1; i++) {
220+
if(timestamps.get(i + 1) != null && timestamps.get(i) != null ) {
221+
secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond();
222+
}else{
223+
secondsElapsed[i] = -1;
224+
}
225+
}
226+
assertEquals(2, secondsElapsed[0]);
227+
assertEquals(2, secondsElapsed[1]);
228+
assertEquals(2, secondsElapsed[2]);
229+
assertEquals(0, secondsElapsed[3]);
230+
231+
106232
}
107233
}
108234

109235
@Test
110-
@Disabled("Test is disabled for investigation, fixing the test retry pattern exposed the failure (could be timer creation issue)")
111236
void longTimer() throws TimeoutException {
112237
final String orchestratorName = "LongTimer";
113238
final Duration delay = Duration.ofSeconds(7);
@@ -128,7 +253,7 @@ void longTimer() throws TimeoutException {
128253
Duration timeout = delay.plus(defaultTimeout);
129254
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false);
130255
assertNotNull(instance);
131-
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus(),
256+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus(),
132257
String.format("Orchestration failed with error: %s", instance.getFailureDetails().getErrorMessage()));
133258

134259
// Verify that the delay actually happened
@@ -285,6 +410,31 @@ void singleTimeStampTimer() throws IOException, TimeoutException {
285410
}
286411
}
287412

413+
414+
@Test
415+
void singleTimeStampCreateTimer() throws IOException, TimeoutException {
416+
final String orchestratorName = "SingleTimeStampTimer";
417+
final Duration delay = Duration.ofSeconds(3);
418+
final ZonedDateTime zonedDateTime = ZonedDateTime.of(LocalDateTime.now().plusSeconds(delay.getSeconds()), ZoneId.systemDefault());
419+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
420+
.addOrchestrator(orchestratorName, ctx -> ctx.createTimer(zonedDateTime).await())
421+
.buildAndStart();
422+
423+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
424+
try (worker; client) {
425+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
426+
Duration timeout = delay.plus(defaultTimeout);
427+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false);
428+
assertNotNull(instance);
429+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
430+
431+
// Verify that the delay actually happened
432+
long expectedCompletionSecond = zonedDateTime.toInstant().getEpochSecond();
433+
long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond();
434+
assertTrue(expectedCompletionSecond <= actualCompletionSecond);
435+
}
436+
}
437+
288438
@Test
289439
void isReplaying() throws IOException, InterruptedException, TimeoutException {
290440
final String orchestratorName = "SingleTimer";
@@ -884,13 +1034,13 @@ void multiInstanceQuery() throws TimeoutException{
8841034
// Test CreatedTimeTo filter
8851035
query.setCreatedTimeTo(startTime.minus(Duration.ofSeconds(1)));
8861036
result = client.queryInstances(query);
887-
assertTrue(result.getOrchestrationState().isEmpty(),
888-
"Result should be empty but found " + result.getOrchestrationState().size() + " instances: " +
1037+
assertTrue(result.getOrchestrationState().isEmpty(),
1038+
"Result should be empty but found " + result.getOrchestrationState().size() + " instances: " +
8891039
"Start time: " + startTime + ", " +
8901040
result.getOrchestrationState().stream()
891-
.map(state -> String.format("\nID: %s, Status: %s, Created: %s",
892-
state.getInstanceId(),
893-
state.getRuntimeStatus(),
1041+
.map(state -> String.format("\nID: %s, Status: %s, Created: %s",
1042+
state.getInstanceId(),
1043+
state.getRuntimeStatus(),
8941044
state.getCreatedAt()))
8951045
.collect(Collectors.joining(", ")));
8961046

@@ -1203,7 +1353,7 @@ void waitForInstanceStartThrowsException() {
12031353
client.scheduleNewOrchestrationInstance(orchestratorName, null, instanceId);
12041354
});
12051355
thread.start();
1206-
1356+
12071357
assertThrows(TimeoutException.class, () -> client.waitForInstanceStart(instanceId, Duration.ofSeconds(2)) );
12081358
}
12091359
}
@@ -1591,8 +1741,8 @@ public void taskExecutionIdTest() {
15911741

15921742
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
15931743
.addOrchestrator(orchestratorName, ctx -> {
1594-
ctx.callActivity(retryActivityName,null,taskOptions).await();
1595-
ctx.callActivity(retryActivityName,null,taskOptions).await();
1744+
ctx.callActivity(retryActivityName,null,taskOptions).await();
1745+
ctx.callActivity(retryActivityName,null,taskOptions).await();
15961746
ctx.complete(true);
15971747
})
15981748
.addActivity(retryActivityName, ctx -> {

0 commit comments

Comments
 (0)