Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ private class ContextImplTask implements TaskOrchestrationContext {
private String appId;

// LinkedHashMap to maintain insertion order when returning the list of pending actions
private final LinkedHashMap<Integer, OrchestratorAction> pendingActions = new LinkedHashMap<>();
private final HashMap<Integer, TaskRecord<?>> openTasks = new HashMap<>();
private final LinkedHashMap<String, Queue<TaskRecord<?>>> outstandingEvents = new LinkedHashMap<>();
private final LinkedList<HistoryEvent> unprocessedEvents = new LinkedList<>();
private final Map<Integer, OrchestratorAction> pendingActions = new LinkedHashMap<>();
private final Map<Integer, TaskRecord<?>> openTasks = new HashMap<>();
private final Map<String, Queue<TaskRecord<?>>> outstandingEvents = new LinkedHashMap<>();
private final List<HistoryEvent> unprocessedEvents = new LinkedList<>();
private final Queue<HistoryEvent> eventsWhileSuspended = new ArrayDeque<>();
private final DataConverter dataConverter = TaskOrchestrationExecutor.this.dataConverter;
private final Duration maximumTimerInterval = TaskOrchestrationExecutor.this.maximumTimerInterval;
Expand Down Expand Up @@ -303,12 +303,10 @@ public <V> Task<V> callActivity(
}
TaskFactory<V> taskFactory = () -> {
int id = this.sequenceNumber++;

ScheduleTaskAction scheduleTaskAction = scheduleTaskBuilder.build();
OrchestratorAction.Builder actionBuilder = OrchestratorAction.newBuilder()
.setId(id)
.setScheduleTask(scheduleTaskBuilder);

if (options != null && options.hasAppID()) {
String targetAppId = options.getAppID();
TaskRouter actionRouter = TaskRouter.newBuilder()
Expand All @@ -317,7 +315,6 @@ public <V> Task<V> callActivity(
.build();
actionBuilder.setRouter(actionRouter);
}

this.pendingActions.put(id, actionBuilder.build());

if (!this.isReplaying) {
Expand Down Expand Up @@ -410,7 +407,7 @@ public <V> Task<V> callSubOrchestrator(
if (input instanceof TaskOptions) {
throw new IllegalArgumentException("TaskOptions cannot be used as an input. Did you call the wrong method overload?");
}

String serializedInput = this.dataConverter.serialize(input);
CreateSubOrchestrationAction.Builder createSubOrchestrationActionBuilder = CreateSubOrchestrationAction.newBuilder().setName(name);
if (serializedInput != null) {
Expand Down Expand Up @@ -466,7 +463,7 @@ public <V> Task<V> waitForExternalEvent(String name, Duration timeout, Class<V>
int id = this.sequenceNumber++;

CompletableTask<V> eventTask = new ExternalEventTask<>(name, id, timeout);

// Check for a previously received event with the same name
for (HistoryEvent e : this.unprocessedEvents) {
EventRaisedEvent existing = e.getEventRaised();
Expand Down Expand Up @@ -648,8 +645,7 @@ public Task<Void> createTimer(ZonedDateTime zonedDateTime) {
}

private Task<Void> createTimer(Instant finalFireAt) {
TimerTask timer = new TimerTask(finalFireAt);
return timer;
return new TimerTask(finalFireAt);
}

private CompletableTask<Void> createInstantTimer(int id, Instant fireAt) {
Expand All @@ -660,7 +656,7 @@ private CompletableTask<Void> createInstantTimer(int id, Instant fireAt) {
.build());

if (!this.isReplaying) {
// TODO: Log timer creation, including the expected fire-time
logger.finer(() -> String.format("Creating Instant Timer with id: %s, fireAt: %s ", id, fireAt));
}

CompletableTask<Void> timerTask = new CompletableTask<>();
Expand Down Expand Up @@ -701,7 +697,10 @@ public void handleTimerFired(HistoryEvent e) {
}

if (!this.isReplaying) {
// TODO: Log timer fired, including the scheduled fire-time
this.logger.finer(() ->
String.format("Firing timer by completing task: %s expected fire at time: %s", timerEventId,
Instant.ofEpochSecond(timerFiredEvent.getFireAt().getSeconds(),
timerFiredEvent.getFireAt().getNanos())));
}

CompletableTask<?> task = record.getTask();
Expand Down Expand Up @@ -851,7 +850,7 @@ private void addCarryoverEvents(CompleteOrchestrationAction.Builder builder) {

externalEvents.forEach(builder::addCarryoverEvents);
}

private boolean waitingForEvents() {
return this.outstandingEvents.size() > 0;
}
Expand Down Expand Up @@ -894,7 +893,7 @@ private void processEvent(HistoryEvent e) {
if (factory == null) {
throw new IllegalStateException("No factory found for orchestrator: " + executionStarted.getName());
}

TaskOrchestration orchestrator = factory.create();
orchestrator.run(this);
break;
Expand Down Expand Up @@ -1038,11 +1037,12 @@ public TimerTask(Instant finalFireAt) {
// if necessary. Otherwise, we return and no more sub-timers are created.
private CompletableFuture<Void> createTimerChain(Instant finalFireAt, CompletableFuture<Void> currentFuture) {
return currentFuture.thenRun(() -> {
if (currentInstant.compareTo(finalFireAt) > 0) {
Instant currentInstsanceMinusNanos = currentInstant.minusNanos(currentInstant.getNano());
Instant finalFireAtMinusNanos = finalFireAt.minusNanos(finalFireAt.getNano());
if (currentInstsanceMinusNanos.compareTo(finalFireAtMinusNanos) >= 0) {
return;
}
Task<Void> nextTimer = createTimerTask(finalFireAt);

createTimerChain(finalFireAt, nextTimer.future);
});
}
Expand All @@ -1062,7 +1062,9 @@ private CompletableTask<Void> createTimerTask(Instant finalFireAt) {

private void handleSubTimerSuccess() {
// check if it is the last timer
if (currentInstant.compareTo(finalFireAt) >= 0) {
Instant currentInstantMinusNanos = currentInstant.minusNanos(currentInstant.getNano());
Instant finalFireAtMinusNanos = finalFireAt.minusNanos(finalFireAt.getNano());
if (currentInstantMinusNanos.compareTo(finalFireAtMinusNanos) >= 0) {
this.complete(null);
}
}
Expand Down
172 changes: 161 additions & 11 deletions client/src/test/java/io/dapr/durabletask/IntegrationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,14 @@ void emptyOrchestration() throws TimeoutException {
void singleTimer() throws IOException, TimeoutException {
final String orchestratorName = "SingleTimer";
final Duration delay = Duration.ofSeconds(3);
AtomicReferenceArray<LocalDateTime> timestamps = new AtomicReferenceArray<>(2);
AtomicInteger counter = new AtomicInteger();
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(orchestratorName, ctx -> ctx.createTimer(delay).await())
.addOrchestrator(orchestratorName, ctx -> {
timestamps.set(counter.get(), LocalDateTime.now());
counter.incrementAndGet();
ctx.createTimer(delay).await();
})
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
Expand All @@ -103,11 +109,130 @@ void singleTimer() throws IOException, TimeoutException {
long expectedCompletionSecond = instance.getCreatedAt().plus(delay).getEpochSecond();
long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond();
assertTrue(expectedCompletionSecond <= actualCompletionSecond);

// Verify that the correct number of timers were created
// This should yield 2 (first invocation + replay invocations for internal timers)
assertEquals(2, counter.get());

// Verify that each timer is the expected length
int[] secondsElapsed = new int[1];
for (int i = 0; i < timestamps.length() - 1; i++) {
secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond();
}
assertEquals(3, secondsElapsed[0]);

}
}


@Test
void loopWithTimer() throws IOException, TimeoutException {
final String orchestratorName = "LoopWithTimer";
final Duration delay = Duration.ofSeconds(2);
AtomicReferenceArray<LocalDateTime> timestamps = new AtomicReferenceArray<>(4);
AtomicInteger counter = new AtomicInteger();
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(orchestratorName, ctx -> {
for(int i = 0 ; i < 3; i++) {
if(!ctx.getIsReplaying()) {
timestamps.set(counter.get(), LocalDateTime.now());
counter.incrementAndGet();
}
ctx.createTimer(delay).await();
}
})
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
Duration timeout = delay.plus(defaultTimeout);
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());

// Verify that the delay actually happened
long expectedCompletionSecond = instance.getCreatedAt().plus(delay).getEpochSecond();
long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond();
assertTrue(expectedCompletionSecond <= actualCompletionSecond);

// Verify that the correct number of timers were created
assertEquals(3, counter.get());

// Verify that each timer is the expected length
int[] secondsElapsed = new int[timestamps.length()];
for (int i = 0; i < timestamps.length() - 1; i++) {
if(timestamps.get(i + 1) != null && timestamps.get(i) != null ) {
secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond();
}else{
secondsElapsed[i] = -1;
}
}
assertEquals(2, secondsElapsed[0]);
assertEquals(2, secondsElapsed[1]);
assertEquals(-1, secondsElapsed[2]);


}
}

@Test
void loopWithWaitForEvent() throws IOException, TimeoutException {
final String orchestratorName = "LoopWithTimer";
final Duration delay = Duration.ofSeconds(2);
AtomicReferenceArray<LocalDateTime> timestamps = new AtomicReferenceArray<>(4);
AtomicInteger counter = new AtomicInteger();
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(orchestratorName, ctx -> {
for(int i = 0 ; i < 4; i++) {
try{
ctx.waitForExternalEvent("HELLO", delay).await();
}catch(TaskCanceledException tce ){
if(!ctx.getIsReplaying()){
timestamps.set(counter.get(), LocalDateTime.now());
counter.incrementAndGet();
}

}
}
})
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
Duration timeout = delay.plus(defaultTimeout);
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());

// Verify that the delay actually happened
long expectedCompletionSecond = instance.getCreatedAt().plus(delay).getEpochSecond();
long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond();
assertTrue(expectedCompletionSecond <= actualCompletionSecond);

// Verify that the correct number of timers were created
assertEquals(4, counter.get());

// Verify that each timer is the expected length
int[] secondsElapsed = new int[timestamps.length()];
for (int i = 0; i < timestamps.length() - 1; i++) {
if(timestamps.get(i + 1) != null && timestamps.get(i) != null ) {
secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond();
}else{
secondsElapsed[i] = -1;
}
}
assertEquals(2, secondsElapsed[0]);
assertEquals(2, secondsElapsed[1]);
assertEquals(2, secondsElapsed[2]);
assertEquals(0, secondsElapsed[3]);


}
}

@Test
@Disabled("Test is disabled for investigation, fixing the test retry pattern exposed the failure (could be timer creation issue)")
void longTimer() throws TimeoutException {
final String orchestratorName = "LongTimer";
final Duration delay = Duration.ofSeconds(7);
Expand All @@ -128,7 +253,7 @@ void longTimer() throws TimeoutException {
Duration timeout = delay.plus(defaultTimeout);
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus(),
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus(),
String.format("Orchestration failed with error: %s", instance.getFailureDetails().getErrorMessage()));

// Verify that the delay actually happened
Expand Down Expand Up @@ -285,6 +410,31 @@ void singleTimeStampTimer() throws IOException, TimeoutException {
}
}


@Test
void singleTimeStampCreateTimer() throws IOException, TimeoutException {
final String orchestratorName = "SingleTimeStampTimer";
final Duration delay = Duration.ofSeconds(3);
final ZonedDateTime zonedDateTime = ZonedDateTime.of(LocalDateTime.now().plusSeconds(delay.getSeconds()), ZoneId.systemDefault());
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(orchestratorName, ctx -> ctx.createTimer(zonedDateTime).await())
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
Duration timeout = delay.plus(defaultTimeout);
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());

// Verify that the delay actually happened
long expectedCompletionSecond = zonedDateTime.toInstant().getEpochSecond();
long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond();
assertTrue(expectedCompletionSecond <= actualCompletionSecond);
}
}

@Test
void isReplaying() throws IOException, InterruptedException, TimeoutException {
final String orchestratorName = "SingleTimer";
Expand Down Expand Up @@ -884,13 +1034,13 @@ void multiInstanceQuery() throws TimeoutException{
// Test CreatedTimeTo filter
query.setCreatedTimeTo(startTime.minus(Duration.ofSeconds(1)));
result = client.queryInstances(query);
assertTrue(result.getOrchestrationState().isEmpty(),
"Result should be empty but found " + result.getOrchestrationState().size() + " instances: " +
assertTrue(result.getOrchestrationState().isEmpty(),
"Result should be empty but found " + result.getOrchestrationState().size() + " instances: " +
"Start time: " + startTime + ", " +
result.getOrchestrationState().stream()
.map(state -> String.format("\nID: %s, Status: %s, Created: %s",
state.getInstanceId(),
state.getRuntimeStatus(),
.map(state -> String.format("\nID: %s, Status: %s, Created: %s",
state.getInstanceId(),
state.getRuntimeStatus(),
state.getCreatedAt()))
.collect(Collectors.joining(", ")));

Expand Down Expand Up @@ -1203,7 +1353,7 @@ void waitForInstanceStartThrowsException() {
client.scheduleNewOrchestrationInstance(orchestratorName, null, instanceId);
});
thread.start();

assertThrows(TimeoutException.class, () -> client.waitForInstanceStart(instanceId, Duration.ofSeconds(2)) );
}
}
Expand Down Expand Up @@ -1591,8 +1741,8 @@ public void taskExecutionIdTest() {

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(orchestratorName, ctx -> {
ctx.callActivity(retryActivityName,null,taskOptions).await();
ctx.callActivity(retryActivityName,null,taskOptions).await();
ctx.callActivity(retryActivityName,null,taskOptions).await();
ctx.callActivity(retryActivityName,null,taskOptions).await();
ctx.complete(true);
})
.addActivity(retryActivityName, ctx -> {
Expand Down
Loading