Skip to content

Commit 6467b49

Browse files
authored
Add sample for cron (#33)
1 parent 822d26d commit 6467b49

File tree

4 files changed

+213
-2
lines changed

4 files changed

+213
-2
lines changed

README.md

+3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ These samples demonstrate various capabilities of Java Cadence client and server
2020
* **HelloQuery**: a query
2121
* **HelloSignal**: sending and handling a signal
2222
* **HelloPeriodic**: a sample workflow that executes an activity periodically forever
23+
* **HelloSearchAttributes**: how to use search attributes
24+
* **HelloCron**: a cron workflow
2325

2426
* **FileProcessing** demonstrates task routing features. The sample workflow downloads a file, processes it, and uploads
2527
the result to a destination. The first activity can be picked up by any worker. However, the second and third activities
@@ -97,6 +99,7 @@ To run the hello world samples:
9799
./gradlew -q execute -PmainClass=com.uber.cadence.samples.hello.HelloQuery
98100
./gradlew -q execute -PmainClass=com.uber.cadence.samples.hello.HelloSignal
99101
./gradlew -q execute -PmainClass=com.uber.cadence.samples.hello.HelloSearchAttributes
102+
./gradlew -q execute -PmainClass=com.uber.cadence.samples.hello.HelloCron
100103

101104
### File Processing
102105

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.samples.hello;
19+
20+
import static com.uber.cadence.samples.common.SampleConstants.DOMAIN;
21+
22+
import com.uber.cadence.TerminateWorkflowExecutionRequest;
23+
import com.uber.cadence.WorkflowExecution;
24+
import com.uber.cadence.WorkflowIdReusePolicy;
25+
import com.uber.cadence.activity.Activity;
26+
import com.uber.cadence.activity.ActivityOptions;
27+
import com.uber.cadence.client.WorkflowClient;
28+
import com.uber.cadence.common.CronSchedule;
29+
import com.uber.cadence.serviceclient.IWorkflowService;
30+
import com.uber.cadence.serviceclient.WorkflowServiceTChannel;
31+
import com.uber.cadence.worker.Worker;
32+
import com.uber.cadence.workflow.Workflow;
33+
import com.uber.cadence.workflow.WorkflowMethod;
34+
import java.time.Duration;
35+
36+
/**
37+
* Demonstrates a cron workflow that executes activity periodically. Requires a local instance of
38+
* Cadence server to be running.
39+
*/
40+
public class HelloCron {
41+
42+
static final String TASK_LIST = "HelloCron";
43+
static final String CRON_WORKFLOW_ID = "HelloCron";
44+
45+
public interface CronWorkflow {
46+
@WorkflowMethod(
47+
// At most one instance.
48+
workflowId = CRON_WORKFLOW_ID,
49+
taskList = TASK_LIST,
50+
// timeout for every run
51+
executionStartToCloseTimeoutSeconds = 30,
52+
// To allow starting workflow with the same ID after the previous one has terminated.
53+
workflowIdReusePolicy = WorkflowIdReusePolicy.AllowDuplicate
54+
)
55+
@CronSchedule("*/1 * * * *") // new workflow run every minute
56+
void greetPeriodically(String name);
57+
}
58+
59+
public interface GreetingActivities {
60+
void greet(String greeting);
61+
}
62+
63+
public static class CronWorkflowImpl implements CronWorkflow {
64+
65+
private final GreetingActivities activities =
66+
Workflow.newActivityStub(
67+
GreetingActivities.class,
68+
new ActivityOptions.Builder()
69+
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
70+
.build());
71+
72+
@Override
73+
public void greetPeriodically(String name) {
74+
activities.greet("Hello " + name + "!");
75+
}
76+
}
77+
78+
static class GreetingActivitiesImpl implements GreetingActivities {
79+
@Override
80+
public void greet(String greeting) {
81+
System.out.println("From " + Activity.getWorkflowExecution() + ": " + greeting);
82+
}
83+
}
84+
85+
public static void main(String[] args) throws InterruptedException {
86+
// Start a worker that hosts both workflow and activity implementations.
87+
Worker.Factory factory = new Worker.Factory(DOMAIN);
88+
Worker worker = factory.newWorker(TASK_LIST);
89+
// Workflows are stateful. So you need a type to create instances.
90+
worker.registerWorkflowImplementationTypes(CronWorkflowImpl.class);
91+
// Activities are stateless and thread safe. So a shared instance is used.
92+
worker.registerActivitiesImplementations(new GreetingActivitiesImpl());
93+
// Start listening to the workflow and activity task lists.
94+
factory.start();
95+
96+
// Start a workflow execution async. Usually this is done from another program.
97+
WorkflowClient workflowClient = WorkflowClient.newInstance(DOMAIN);
98+
CronWorkflow workflow = workflowClient.newWorkflowStub(CronWorkflow.class);
99+
WorkflowClient.start(workflow::greetPeriodically, "World");
100+
System.out.println("Cron workflow is running");
101+
102+
// Cron workflow will not stop until it is terminated or cancelled.
103+
// So we wait some time to see cron run twice then terminate the cron workflow.
104+
Thread.sleep(90000);
105+
106+
IWorkflowService cadenceService = new WorkflowServiceTChannel();
107+
// execution without RunID set will be used to terminate current run
108+
WorkflowExecution execution = new WorkflowExecution();
109+
execution.setWorkflowId(CRON_WORKFLOW_ID);
110+
TerminateWorkflowExecutionRequest request = new TerminateWorkflowExecutionRequest();
111+
request.setDomain(DOMAIN);
112+
request.setWorkflowExecution(execution);
113+
try {
114+
cadenceService.TerminateWorkflowExecution(request);
115+
System.out.println("Cron workflow is terminated");
116+
} catch (Exception e) {
117+
System.out.println(e);
118+
}
119+
System.exit(0);
120+
}
121+
}

src/main/java/com/uber/cadence/samples/hello/HelloPeriodic.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,11 @@
3535
import java.util.Optional;
3636

3737
/**
38-
* Demonstrates a "cron" workflow that executes activity periodically. Requires a local instance of
39-
* Cadence server to be running.
38+
* Demonstrates how to use sleep and ContinueAsNew to executes activity periodically. In most cases,
39+
* use @see com.uber.cadence.samples.hello.HelloCron cadence built-in support of cron workflow to
40+
* save efforts.
41+
*
42+
* <p>Requires a local instance of Cadence server to be running.
4043
*/
4144
public class HelloPeriodic {
4245

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.samples.hello;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertTrue;
22+
23+
import com.uber.cadence.ListClosedWorkflowExecutionsRequest;
24+
import com.uber.cadence.ListClosedWorkflowExecutionsResponse;
25+
import com.uber.cadence.WorkflowExecution;
26+
import com.uber.cadence.WorkflowExecutionCloseStatus;
27+
import com.uber.cadence.WorkflowExecutionFilter;
28+
import com.uber.cadence.WorkflowExecutionInfo;
29+
import com.uber.cadence.client.WorkflowClient;
30+
import com.uber.cadence.testing.TestWorkflowEnvironment;
31+
import com.uber.cadence.worker.Worker;
32+
import java.time.Duration;
33+
import org.apache.thrift.TException;
34+
import org.junit.After;
35+
import org.junit.Before;
36+
import org.junit.Test;
37+
38+
/** Unit test for {@link HelloCron}. Doesn't use an external Cadence service. */
39+
public class HelloCronTest {
40+
41+
private TestWorkflowEnvironment testEnv;
42+
private Worker worker;
43+
private WorkflowClient workflowClient;
44+
45+
@Before
46+
public void setUp() {
47+
testEnv = TestWorkflowEnvironment.newInstance();
48+
worker = testEnv.newWorker(HelloCron.TASK_LIST);
49+
worker.registerWorkflowImplementationTypes(HelloCron.CronWorkflowImpl.class);
50+
51+
workflowClient = testEnv.newWorkflowClient();
52+
}
53+
54+
@After
55+
public void tearDown() {
56+
testEnv.close();
57+
}
58+
59+
@Test
60+
public void testCron() throws TException {
61+
worker.registerActivitiesImplementations(new HelloCron.GreetingActivitiesImpl());
62+
testEnv.start();
63+
64+
// start cron workflow async
65+
HelloCron.CronWorkflow workflow = workflowClient.newWorkflowStub(HelloCron.CronWorkflow.class);
66+
WorkflowExecution execution = WorkflowClient.start(workflow::greetPeriodically, "World");
67+
assertEquals(HelloCron.CRON_WORKFLOW_ID, execution.getWorkflowId());
68+
69+
// Validate that workflow was continued as new at least once.
70+
// Use TestWorkflowEnvironment.sleep to execute the unit test without really sleeping.
71+
testEnv.sleep(Duration.ofMinutes(1));
72+
ListClosedWorkflowExecutionsRequest request =
73+
new ListClosedWorkflowExecutionsRequest()
74+
.setDomain(testEnv.getDomain())
75+
.setExecutionFilter(
76+
new WorkflowExecutionFilter().setWorkflowId(HelloCron.CRON_WORKFLOW_ID));
77+
ListClosedWorkflowExecutionsResponse listResponse =
78+
testEnv.getWorkflowService().ListClosedWorkflowExecutions(request);
79+
assertTrue(listResponse.getExecutions().size() > 1);
80+
for (WorkflowExecutionInfo e : listResponse.getExecutions()) {
81+
assertEquals(WorkflowExecutionCloseStatus.CONTINUED_AS_NEW, e.getCloseStatus());
82+
}
83+
}
84+
}

0 commit comments

Comments
 (0)