Skip to content

Commit 57133bd

Browse files
authored
Support restartInstance and add restartPostUri in HttpManagementPayload (#125)
* Support restartInstance and add restartPostUri in HttpManagementPayload * Update CHANGELOG.md * Add func description and remove unused var in test * Remove unsupported resume and suspend post URIs * Add end to end tests
1 parent e872639 commit 57133bd

File tree

6 files changed

+133
-8
lines changed

6 files changed

+133
-8
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* Fix the potential NPE issue of `DurableTaskClient#terminate` method ([#104](https://github.com/microsoft/durabletask-java/issues/104))
1010
* Add waitForCompletionOrCreateCheckStatusResponse client API ([#115](https://github.com/microsoft/durabletask-java/pull/115))
1111
* Support long timers by breaking up into smaller timers ([#114](https://github.com/microsoft/durabletask-java/issues/114))
12+
* Support restartInstance and pass restartPostUri in HttpManagementPayload ([#108](https://github.com/microsoft/durabletask-java/issues/108))
1213

1314
## v1.0.0
1415

azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java

+12
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
public class HttpManagementPayload {
1313
private final String id;
1414
private final String purgeHistoryDeleteUri;
15+
private final String restartPostUri;
1516
private final String sendEventPostUri;
1617
private final String statusQueryGetUri;
1718
private final String terminatePostUri;
@@ -29,6 +30,7 @@ public HttpManagementPayload(
2930
String requiredQueryStringParameters) {
3031
this.id = instanceId;
3132
this.purgeHistoryDeleteUri = instanceStatusURL + "?" + requiredQueryStringParameters;
33+
this.restartPostUri = instanceStatusURL + "/restart?" + requiredQueryStringParameters;
3234
this.sendEventPostUri = instanceStatusURL + "/raiseEvent/{eventName}?" + requiredQueryStringParameters;
3335
this.statusQueryGetUri = instanceStatusURL + "?" + requiredQueryStringParameters;
3436
this.terminatePostUri = instanceStatusURL + "/terminate?reason={text}&" + requiredQueryStringParameters;
@@ -78,4 +80,14 @@ public String getTerminatePostUri() {
7880
public String getPurgeHistoryDeleteUri() {
7981
return this.purgeHistoryDeleteUri;
8082
}
83+
84+
/**
85+
* Gets the HTTP POST instance restart endpoint.
86+
*
87+
* @return The HTTP URL for posting instance restart commands.
88+
*/
89+
public String getRestartPostUri() {
90+
return restartPostUri;
91+
}
92+
8193
}

client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java

+10
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,16 @@ public abstract OrchestrationMetadata waitForInstanceCompletion(
282282
*/
283283
public abstract PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) throws TimeoutException;
284284

285+
/**
286+
* Restarts an existing orchestration instance with the original input.
287+
* @param instanceId the ID of the previously run orchestration instance to restart.
288+
* @param restartWithNewInstanceId <code>true</code> to restart the orchestration instance with a new instance ID
289+
* <code>false</code> to restart the orchestration instance with same instance ID
290+
* @return the ID of the scheduled orchestration instance, which is either <code>instanceId</code> or randomly
291+
* generated depending on the value of <code>restartWithNewInstanceId</code>
292+
*/
293+
public abstract String restartInstance(String instanceId, boolean restartWithNewInstanceId);
294+
285295
// /**
286296
// * Suspends a running orchestration instance.
287297
// * @param instanceId the ID of the orchestration instance to suspend

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java

+18
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,24 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t
303303
// this.sidecarClient.resumeInstance(resumeRequestBuilder.build());
304304
// }
305305

306+
@Override
307+
public String restartInstance(String instanceId, boolean restartWithNewInstanceId) {
308+
OrchestrationMetadata metadata = this.getInstanceMetadata(instanceId, true);
309+
if (!metadata.isInstanceFound()) {
310+
throw new IllegalArgumentException(new StringBuilder()
311+
.append("An orchestration with instanceId ")
312+
.append(instanceId)
313+
.append(" was not found.").toString());
314+
}
315+
316+
if (restartWithNewInstanceId) {
317+
return this.scheduleNewOrchestrationInstance(metadata.getName(), this.dataConverter.deserialize(metadata.getSerializedInput(), Object.class));
318+
}
319+
else {
320+
return this.scheduleNewOrchestrationInstance(metadata.getName(), this.dataConverter.deserialize(metadata.getSerializedInput(), Object.class), metadata.getInstanceId());
321+
}
322+
}
323+
306324
private PurgeResult toPurgeResult(PurgeInstancesResponse response){
307325
return new PurgeResult(response.getDeletedInstanceCount());
308326
}

client/src/test/java/com/microsoft/durabletask/IntegrationTests.java

+49
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,55 @@ void termination() throws TimeoutException {
408408
}
409409
}
410410

411+
@ParameterizedTest
412+
@ValueSource(booleans = {true, false})
413+
void restartOrchestrationWithNewInstanceId(boolean restartWithNewInstanceId) throws TimeoutException {
414+
final String orchestratorName = "restart";
415+
final Duration delay = Duration.ofSeconds(3);
416+
417+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
418+
.addOrchestrator(orchestratorName, ctx -> ctx.createTimer(delay).await())
419+
.buildAndStart();
420+
421+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
422+
try (worker; client) {
423+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, "RestartTest");
424+
client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
425+
String newInstanceId = client.restartInstance(instanceId, restartWithNewInstanceId);
426+
OrchestrationMetadata instance = client.waitForInstanceCompletion(newInstanceId, defaultTimeout, true);
427+
428+
if (restartWithNewInstanceId) {
429+
assertNotEquals(instanceId, newInstanceId);
430+
} else {
431+
assertEquals(instanceId, newInstanceId);
432+
}
433+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
434+
assertEquals("\"RestartTest\"", instance.getSerializedInput());
435+
}
436+
}
437+
438+
@Test
439+
void restartOrchestrationThrowsException() {
440+
final String orchestratorName = "restart";
441+
final Duration delay = Duration.ofSeconds(3);
442+
final String nonExistentId = "123";
443+
444+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
445+
.addOrchestrator(orchestratorName, ctx -> ctx.createTimer(delay).await())
446+
.buildAndStart();
447+
448+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
449+
try (worker; client) {
450+
client.scheduleNewOrchestrationInstance(orchestratorName, "RestartTest");
451+
452+
assertThrows(
453+
IllegalArgumentException.class,
454+
() -> client.restartInstance(nonExistentId, true)
455+
);
456+
}
457+
458+
}
459+
411460
// @Test
412461
// void suspendResumeOrchestration() throws TimeoutException, InterruptedException {
413462
// final String orchestratorName = "suspend";

samples-azure-functions/src/test/java/com/functions/EndToEndTests.java

+43-8
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import org.junit.jupiter.api.Order;
66
import org.junit.jupiter.api.Tag;
77
import org.junit.jupiter.api.Test;
8+
import org.junit.jupiter.params.ParameterizedTest;
9+
import org.junit.jupiter.params.provider.ValueSource;
810

911
import static io.restassured.RestAssured.get;
1012
import static io.restassured.RestAssured.post;
@@ -26,14 +28,7 @@ public void basicChain() throws InterruptedException {
2628
Response response = post(startOrchestrationPath);
2729
JsonPath jsonPath = response.jsonPath();
2830
String statusQueryGetUri = jsonPath.get("statusQueryGetUri");
29-
String runTimeStatus = null;
30-
for (int i = 0; i < 15; i++) {
31-
Response statusResponse = get(statusQueryGetUri);
32-
runTimeStatus = statusResponse.jsonPath().get("runtimeStatus");
33-
if (!"Completed".equals(runTimeStatus)) {
34-
Thread.sleep(1000);
35-
} else break;
36-
}
31+
String runTimeStatus = waitForCompletion(statusQueryGetUri);
3732
assertEquals("Completed", runTimeStatus);
3833
}
3934

@@ -59,4 +54,44 @@ public void continueAsNew() throws InterruptedException {
5954
runTimeStatus = statusResponse.jsonPath().get("runtimeStatus");
6055
assertEquals("Terminated", runTimeStatus);
6156
}
57+
58+
@ParameterizedTest
59+
@ValueSource(booleans = {true, false})
60+
public void restart(boolean restartWithNewInstanceId) throws InterruptedException {
61+
String startOrchestrationPath = "/api/StartOrchestration";
62+
Response response = post(startOrchestrationPath);
63+
JsonPath jsonPath = response.jsonPath();
64+
String statusQueryGetUri = jsonPath.get("statusQueryGetUri");
65+
String runTimeStatus = waitForCompletion(statusQueryGetUri);
66+
assertEquals("Completed", runTimeStatus);
67+
Response statusResponse = get(statusQueryGetUri);
68+
String instanceId = statusResponse.jsonPath().get("instanceId");
69+
70+
String restartPostUri = jsonPath.get("restartPostUri") + "&restartWithNewInstanceId=" + restartWithNewInstanceId;
71+
Response restartResponse = post(restartPostUri);
72+
JsonPath restartJsonPath = restartResponse.jsonPath();
73+
String restartStatusQueryGetUri = restartJsonPath.get("statusQueryGetUri");
74+
String restartRuntimeStatus = waitForCompletion(restartStatusQueryGetUri);
75+
assertEquals("Completed", restartRuntimeStatus);
76+
Response restartStatusResponse = get(restartStatusQueryGetUri);
77+
String newInstanceId = restartStatusResponse.jsonPath().get("instanceId");
78+
if (restartWithNewInstanceId) {
79+
assertNotEquals(instanceId, newInstanceId);
80+
} else {
81+
assertEquals(instanceId, newInstanceId);
82+
}
83+
}
84+
85+
private String waitForCompletion(String statusQueryGetUri) throws InterruptedException {
86+
String runTimeStatus = null;
87+
for (int i = 0; i < 15; i++) {
88+
Response statusResponse = get(statusQueryGetUri);
89+
runTimeStatus = statusResponse.jsonPath().get("runtimeStatus");
90+
if (!"Completed".equals(runTimeStatus)) {
91+
Thread.sleep(1000);
92+
} else break;
93+
}
94+
return runTimeStatus;
95+
}
96+
6297
}

0 commit comments

Comments
 (0)