Skip to content

Commit a8ddba2

Browse files
Merge pull request #36 from longquanzheng/worker
Add a sample for how to use more options in worker
2 parents 195c172 + 2806add commit a8ddba2

File tree

1 file changed

+122
-0
lines changed

1 file changed

+122
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.samples.hello;
19+
20+
import com.uber.cadence.activity.ActivityMethod;
21+
import com.uber.cadence.client.WorkflowClient;
22+
import com.uber.cadence.worker.Worker;
23+
import com.uber.cadence.worker.WorkerOptions;
24+
import com.uber.cadence.workflow.Workflow;
25+
import com.uber.cadence.workflow.WorkflowMethod;
26+
27+
import static com.uber.cadence.samples.common.SampleConstants.DOMAIN;
28+
29+
/**
30+
* Hello World Cadence workflow that executes a single activity with full example of how to customize
31+
* a worker
32+
*/
33+
public class HelloWorkerSetup {
34+
35+
static final String TASK_LIST = "HelloActivity";
36+
37+
/** Workflow interface has to have at least one method annotated with @WorkflowMethod. */
38+
public interface GreetingWorkflow {
39+
/** @return greeting string */
40+
@WorkflowMethod(executionStartToCloseTimeoutSeconds = 10, taskList = TASK_LIST)
41+
String getGreeting(String name);
42+
}
43+
44+
/** Activity interface is just a POJI. */
45+
public interface GreetingActivities {
46+
@ActivityMethod(scheduleToCloseTimeoutSeconds = 2)
47+
String composeGreeting(String greeting, String name);
48+
}
49+
50+
/** GreetingWorkflow implementation that calls GreetingsActivities#composeGreeting. */
51+
public static class GreetingWorkflowImpl implements GreetingWorkflow {
52+
53+
/**
54+
* Activity stub implements activity interface and proxies calls to it to Cadence activity
55+
* invocations. Because activities are reentrant, only a single stub can be used for multiple
56+
* activity invocations.
57+
*/
58+
private final GreetingActivities activities =
59+
Workflow.newActivityStub(GreetingActivities.class);
60+
61+
@Override
62+
public String getGreeting(String name) {
63+
// This is a blocking call that returns only after the activity has completed.
64+
return activities.composeGreeting("Hello", name);
65+
}
66+
}
67+
68+
static class GreetingActivitiesImpl implements GreetingActivities {
69+
@Override
70+
public String composeGreeting(String greeting, String name) {
71+
return greeting + " " + name + "!";
72+
}
73+
}
74+
75+
public static void main(String[] args) {
76+
// Start a worker that hosts both workflow and activity implementations.
77+
78+
/**
79+
* If you see error "Not enough threads to execute workflows" exception it indicates that there are
80+
* not enough threads to execute currently running workflow tasks.
81+
*
82+
* For example, if each workflow uses two threads(using Asycn function) and maxConcurrentWorklfowExecutionSize is 100,
83+
* and assuming the factory only creates one worker. Then maxWorkflowThreads should be at least 200.
84+
* With such setup 0 workflows will be cached as all the threads would be consumed by the currently executing workflow tasks.
85+
* So in general it is better to keep maxWorkflowThreads much higher than maxConcurrentWorklfowExecutionSize to support caching.
86+
*
87+
* maxWorkflowThreads defines how many threads all currently executing and cached workflows can use.
88+
* It's a Factory level option, meaning that the thread pool is shared across all workers created by the factory.
89+
*
90+
* maxConcurrentWorklfowExecutionSize defines how many workflow tasks can execute in parallel.
91+
* It's a worker level option.
92+
*
93+
*
94+
*/
95+
Worker.Factory factory = new Worker.Factory(DOMAIN,
96+
new Worker.FactoryOptions.Builder()
97+
.setMaxWorkflowThreadCount(1000)
98+
.setCacheMaximumSize(100)
99+
.setDisableStickyExecution(false)
100+
.build());
101+
Worker worker = factory.newWorker(TASK_LIST,
102+
new WorkerOptions.Builder()
103+
.setMaxConcurrentActivityExecutionSize(100)
104+
.setMaxConcurrentWorkflowExecutionSize(100)
105+
.build());
106+
// Workflows are stateful. So you need a type to create instances.
107+
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
108+
// Activities are stateless and thread safe. So a shared instance is used.
109+
worker.registerActivitiesImplementations(new GreetingActivitiesImpl());
110+
// Start listening to the workflow and activity task lists.
111+
factory.start();
112+
113+
// Start a workflow execution. Usually this is done from another program.
114+
WorkflowClient workflowClient = WorkflowClient.newInstance(DOMAIN);
115+
// Get a workflow stub using the same task list the worker uses.
116+
GreetingWorkflow workflow = workflowClient.newWorkflowStub(GreetingWorkflow.class);
117+
// Execute a workflow waiting for it to complete.
118+
String greeting = workflow.getGreeting("World");
119+
System.out.println(greeting);
120+
System.exit(0);
121+
}
122+
}

0 commit comments

Comments
 (0)