Skip to content

Commit 1bf965d

Browse files
authored
Added support for conversion of generic types (#179)
1 parent 7c4aa0e commit 1bf965d

31 files changed

+667
-155
lines changed

src/main/java/com/uber/cadence/client/WorkflowStub.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.uber.cadence.client;
1919

2020
import com.uber.cadence.WorkflowExecution;
21+
import java.lang.reflect.Type;
2122
import java.util.Optional;
2223
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.TimeUnit;
@@ -43,30 +44,64 @@ public interface WorkflowStub {
4344
* Returns workflow result potentially waiting for workflow to complete. Behind the scene this
4445
* call performs long poll on Cadence service waiting for workflow completion notification.
4546
*
46-
* @param returnType class of the workflow return value
47+
* @param resultClass class of the workflow return value
48+
* @param resultType type of the workflow return value. Differs from resultClass for generic
49+
* types.
4750
* @param <R> type of the workflow return value
4851
* @return workflow return value
4952
*/
50-
<R> R getResult(Class<R> returnType);
53+
<R> R getResult(Class<R> resultClass, Type resultType);
5154

52-
<R> CompletableFuture<R> getResultAsync(Class<R> returnType);
55+
<R> CompletableFuture<R> getResultAsync(Class<R> resultClass, Type resultType);
56+
57+
/**
58+
* Returns workflow result potentially waiting for workflow to complete. Behind the scene this
59+
* call performs long poll on Cadence service waiting for workflow completion notification.
60+
*
61+
* @param resultClass class of the workflow return value
62+
* @param <R> type of the workflow return value
63+
* @return workflow return value
64+
*/
65+
<R> R getResult(Class<R> resultClass);
66+
67+
<R> CompletableFuture<R> getResultAsync(Class<R> resultClass);
68+
69+
/**
70+
* Returns workflow result potentially waiting for workflow to complete. Behind the scene this
71+
* call performs long poll on Cadence service waiting for workflow completion notification.
72+
*
73+
* @param timeout maximum time to wait
74+
* @param unit unit of timeout
75+
* @param resultClass class of the workflow return value
76+
* @param resultType type of the workflow return value. Differs from resultClass for generic
77+
* @param <R> type of the workflow return value
78+
* @return workflow return value
79+
* @throws TimeoutException if workflow is not completed after the timeout time.
80+
*/
81+
<R> R getResult(long timeout, TimeUnit unit, Class<R> resultClass, Type resultType)
82+
throws TimeoutException;
5383

5484
/**
5585
* Returns workflow result potentially waiting for workflow to complete. Behind the scene this
5686
* call performs long poll on Cadence service waiting for workflow completion notification.
5787
*
5888
* @param timeout maximum time to wait
5989
* @param unit unit of timeout
60-
* @param returnType class of the workflow return value
90+
* @param resultClass class of the workflow return value
6191
* @param <R> type of the workflow return value
6292
* @return workflow return value
6393
* @throws TimeoutException if workflow is not completed after the timeout time.
6494
*/
65-
<R> R getResult(long timeout, TimeUnit unit, Class<R> returnType) throws TimeoutException;
95+
<R> R getResult(long timeout, TimeUnit unit, Class<R> resultClass) throws TimeoutException;
96+
97+
<R> CompletableFuture<R> getResultAsync(
98+
long timeout, TimeUnit unit, Class<R> resultClass, Type resultType);
99+
100+
<R> CompletableFuture<R> getResultAsync(long timeout, TimeUnit unit, Class<R> resultClass);
66101

67-
<R> CompletableFuture<R> getResultAsync(long timeout, TimeUnit unit, Class<R> returnType);
102+
<R> R query(String queryType, Class<R> resultClass, Object... args);
68103

69-
<R> R query(String queryType, Class<R> returnType, Object... args);
104+
<R> R query(String queryType, Class<R> resultClass, Type resultType, Object... args);
70105

71106
/** Request cancellation. */
72107
void cancel();

src/main/java/com/uber/cadence/converter/DataConverter.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package com.uber.cadence.converter;
1919

20+
import java.lang.reflect.Type;
21+
2022
/**
2123
* Used by the framework to serialize/deserialize method parameters that need to be sent over the
2224
* wire.
@@ -39,11 +41,13 @@ public interface DataConverter {
3941
* Implements conversion of a single value.
4042
*
4143
* @param content Serialized value to convert to a Java object.
44+
* @param valueClass
45+
* @param valueType
4246
* @return converted Java object
4347
* @throws DataConverterException if conversion of the data passed as parameter failed for any
4448
* reason.
4549
*/
46-
<T> T fromData(byte[] content, Class<T> valueType) throws DataConverterException;
50+
<T> T fromData(byte[] content, Class<T> valueClass, Type valueType) throws DataConverterException;
4751

4852
/**
4953
* Implements conversion of an array of values of different types. Useful for deserializing
@@ -54,5 +58,5 @@ public interface DataConverter {
5458
* @throws DataConverterException if conversion of the data passed as parameter failed for any
5559
* reason.
5660
*/
57-
Object[] fromDataArray(byte[] content, Class<?>... valueType) throws DataConverterException;
61+
Object[] fromDataArray(byte[] content, Type... valueType) throws DataConverterException;
5862
}

src/main/java/com/uber/cadence/converter/DataConverterException.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.converter;
1919

20+
import java.lang.reflect.Type;
2021
import java.nio.charset.StandardCharsets;
2122
import java.util.Arrays;
2223

@@ -27,11 +28,11 @@
2728
@SuppressWarnings("serial")
2829
public class DataConverterException extends RuntimeException {
2930

30-
private Class<?>[] valueTypes;
31+
private Type[] valueTypes;
3132

3233
private String content;
3334

34-
public DataConverterException(byte[] content, Class<?>[] valueTypes, Throwable cause) {
35+
public DataConverterException(byte[] content, Type[] valueTypes, Throwable cause) {
3536
super(cause);
3637
this.valueTypes = valueTypes;
3738
setContent(content);

src/main/java/com/uber/cadence/converter/JsonDataConverter.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.io.IOException;
3434
import java.io.PrintWriter;
3535
import java.io.StringWriter;
36+
import java.lang.reflect.Type;
3637
import java.nio.charset.StandardCharsets;
3738
import java.util.function.Function;
3839
import java.util.regex.Matcher;
@@ -128,14 +129,15 @@ public byte[] toData(Object... values) throws DataConverterException {
128129
}
129130

130131
@Override
131-
public <T> T fromData(byte[] content, Class<T> valueType) throws DataConverterException {
132+
public <T> T fromData(byte[] content, Class<T> valueClass, Type valueType)
133+
throws DataConverterException {
132134
if (content == null) {
133135
return null;
134136
}
135137
try {
136138
// Deserialize thrift values.
137-
if (TBase.class.isAssignableFrom(valueType)) {
138-
T instance = valueType.getConstructor().newInstance();
139+
if (TBase.class.isAssignableFrom(valueClass)) {
140+
T instance = valueClass.getConstructor().newInstance();
139141
newThriftDeserializer().deserialize((TBase) instance, content);
140142
return instance;
141143
}
@@ -146,15 +148,15 @@ public <T> T fromData(byte[] content, Class<T> valueType) throws DataConverterEx
146148
}
147149

148150
@Override
149-
public Object[] fromDataArray(byte[] content, Class<?>... valueType)
150-
throws DataConverterException {
151+
public Object[] fromDataArray(byte[] content, Type... valueType) throws DataConverterException {
151152
try {
152153
if ((content == null || content.length == 0)
153154
&& (valueType == null || valueType.length == 0)) {
154155
return EMPTY_OBJECT_ARRAY;
155156
}
156157
if (valueType.length == 1) {
157-
return new Object[] {fromData(content, valueType[0])};
158+
Object result = gson.fromJson(new String(content, StandardCharsets.UTF_8), valueType[0]);
159+
return new Object[] {result};
158160
}
159161
JsonArray array = parser.parse(new String(content, StandardCharsets.UTF_8)).getAsJsonArray();
160162
Object[] result = new Object[valueType.length];

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,8 @@ void handleMarkerRecorded(HistoryEvent event) {
195195
int getVersion(String changeId, DataConverter converter, int minSupported, int maxSupported) {
196196
Predicate<byte[]> changeIdEquals =
197197
(bytesInEvent) -> {
198-
MarkerData markerData = converter.fromData(bytesInEvent, MarkerData.class);
198+
MarkerData markerData =
199+
converter.fromData(bytesInEvent, MarkerData.class, MarkerData.class);
199200
return markerData.getId().equals(changeId);
200201
};
201202
decisions.addAllMissingVersionMarker(true, Optional.of(changeIdEquals));
@@ -214,7 +215,7 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m
214215
if (!result.isPresent()) {
215216
return WorkflowInternal.DEFAULT_VERSION;
216217
}
217-
int version = converter.fromData(result.get(), Integer.class);
218+
int version = converter.fromData(result.get(), Integer.class, Integer.class);
218219
validateVersion(changeId, version, minSupported, maxSupported);
219220
return version;
220221
}

src/main/java/com/uber/cadence/internal/replay/MarkerHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ private Optional<byte[]> getMarkerDataFromHistory(
143143
if (!markerName.equals(name)) {
144144
return Optional.empty();
145145
}
146-
MarkerData markerData = converter.fromData(attributes.getDetails(), MarkerData.class);
146+
MarkerData markerData =
147+
converter.fromData(attributes.getDetails(), MarkerData.class, MarkerData.class);
147148
// access count is used to not return data from the marker before the recorded number of calls
148149
if (!markerId.equals(markerData.getId())
149150
|| markerData.getAccessCount() > expectedAcccessCount) {

src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,10 @@ public Object invoke(Object proxy, Method method, Object[] args) {
8282

8383
ActivityOptions mergedOptions = ActivityOptions.merge(activityMethod, methodRetry, options);
8484
ActivityStub stub = ActivityStubImpl.newInstance(mergedOptions, activityExecutor);
85-
function = (a) -> stub.execute(activityName, method.getReturnType(), a);
85+
function =
86+
(a) ->
87+
stub.execute(
88+
activityName, method.getReturnType(), method.getGenericReturnType(), a);
8689
methodFunctions.put(method, function);
8790
} catch (NoSuchMethodException e) {
8891
throw Workflow.wrap(e);

src/main/java/com/uber/cadence/internal/sync/ActivityStubImpl.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.uber.cadence.workflow.ActivityStub;
2424
import com.uber.cadence.workflow.Promise;
2525
import com.uber.cadence.workflow.WorkflowInterceptor;
26+
import java.lang.reflect.Type;
2627

2728
/** Supports calling activity by name and arguments without its strongly typed interface. */
2829
class ActivityStubImpl implements ActivityStub {
@@ -42,11 +43,16 @@ private ActivityStubImpl(ActivityOptions options, WorkflowInterceptor activityEx
4243
}
4344

4445
@Override
45-
public <T> T execute(String activityName, Class<T> returnType, Object... args) {
46-
Promise<T> result = executeAsync(activityName, returnType, args);
46+
public <T> T execute(String activityName, Class<T> resultClass, Object... args) {
47+
return execute(activityName, resultClass, resultClass, args);
48+
}
49+
50+
@Override
51+
public <T> T execute(String activityName, Class<T> resultClass, Type resultType, Object... args) {
52+
Promise<T> result = executeAsync(activityName, resultClass, resultType, args);
4753
if (AsyncInternal.isAsync()) {
4854
AsyncInternal.setAsyncResult(result);
49-
return Defaults.defaultValue(returnType);
55+
return Defaults.defaultValue(resultClass);
5056
}
5157
try {
5258
return result.get();
@@ -60,7 +66,13 @@ public <T> T execute(String activityName, Class<T> returnType, Object... args) {
6066
}
6167

6268
@Override
63-
public <R> Promise<R> executeAsync(String activityName, Class<R> returnType, Object... args) {
64-
return activityExecutor.executeActivity(activityName, returnType, args, options);
69+
public <R> Promise<R> executeAsync(String activityName, Class<R> resultClass, Object... args) {
70+
return executeAsync(activityName, resultClass, resultClass, args);
71+
}
72+
73+
@Override
74+
public <R> Promise<R> executeAsync(
75+
String activityName, Class<R> resultClass, Type resultType, Object... args) {
76+
return activityExecutor.executeActivity(activityName, resultClass, resultType, args, options);
6577
}
6678
}

src/main/java/com/uber/cadence/internal/sync/ChildWorkflowInvocationHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public Object invoke(Object proxy, Method method, Object[] args) {
7070
+ "from @WorkflowMethod, @QueryMethod or @SignalMethod");
7171
}
7272
if (workflowMethod != null) {
73-
return stub.execute(method.getReturnType(), args);
73+
return stub.execute(method.getReturnType(), method.getGenericReturnType(), args);
7474
}
7575
if (queryMethod != null) {
7676
throw new UnsupportedOperationException(

src/main/java/com/uber/cadence/internal/sync/ChildWorkflowStubImpl.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.uber.cadence.workflow.Workflow;
2929
import com.uber.cadence.workflow.WorkflowInterceptor;
3030
import com.uber.cadence.workflow.WorkflowInterceptor.WorkflowResult;
31+
import java.lang.reflect.Type;
3132
import java.util.Objects;
3233

3334
class ChildWorkflowStubImpl implements ChildWorkflowStub {
@@ -61,11 +62,16 @@ public ChildWorkflowOptions getOptions() {
6162
}
6263

6364
@Override
64-
public <R> R execute(Class<R> returnType, Object... args) {
65-
Promise<R> result = executeAsync(returnType, args);
65+
public <R> R execute(Class<R> resultClass, Object... args) {
66+
return execute(resultClass, resultClass, args);
67+
}
68+
69+
@Override
70+
public <R> R execute(Class<R> resultClass, Type resultType, Object... args) {
71+
Promise<R> result = executeAsync(resultClass, resultType, args);
6672
if (AsyncInternal.isAsync()) {
6773
AsyncInternal.setAsyncResult(result);
68-
return Defaults.defaultValue(returnType);
74+
return Defaults.defaultValue(resultClass);
6975
}
7076
try {
7177
return result.get();
@@ -78,9 +84,14 @@ public <R> R execute(Class<R> returnType, Object... args) {
7884
}
7985

8086
@Override
81-
public <R> Promise<R> executeAsync(Class<R> returnType, Object... args) {
87+
public <R> Promise<R> executeAsync(Class<R> resultClass, Object... args) {
88+
return executeAsync(resultClass, resultClass, args);
89+
}
90+
91+
@Override
92+
public <R> Promise<R> executeAsync(Class<R> resultClass, Type resultType, Object... args) {
8293
WorkflowResult<R> result =
83-
decisionContext.executeChildWorkflow(workflowType, returnType, args, options);
94+
decisionContext.executeChildWorkflow(workflowType, resultClass, resultType, args, options);
8495
execution.completeFrom(result.getWorkflowExecution());
8596
return result.getResult();
8697
}

src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public ActivityTaskHandler.Result execute(
184184
ActivityExecutionContext context =
185185
new ActivityExecutionContextImpl(service, domain, task, dataConverter, heartbeatExecutor);
186186
byte[] input = task.getInput();
187-
Object[] args = dataConverter.fromDataArray(input, method.getParameterTypes());
187+
Object[] args = dataConverter.fromDataArray(input, method.getGenericParameterTypes());
188188
CurrentActivityExecutionContext.set(context);
189189
try {
190190
Object result = method.invoke(activity, args);

src/main/java/com/uber/cadence/internal/sync/POJOWorkflowImplementationFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ private class POJOWorkflowImplementation implements SyncWorkflowDefinition {
202202

203203
@Override
204204
public byte[] execute(byte[] input) throws CancellationException, WorkflowExecutionException {
205-
Object[] args = dataConverter.fromDataArray(input, workflowMethod.getParameterTypes());
205+
Object[] args = dataConverter.fromDataArray(input, workflowMethod.getGenericParameterTypes());
206206
try {
207207
newInstance();
208208
Object result = workflowMethod.invoke(workflow, args);
@@ -279,7 +279,7 @@ public void processSignal(String signalName, byte[] input, long eventId) {
279279
+ signalHandlers.keySet());
280280
return;
281281
}
282-
Object[] args = dataConverter.fromDataArray(input, signalMethod.getParameterTypes());
282+
Object[] args = dataConverter.fromDataArray(input, signalMethod.getGenericParameterTypes());
283283
try {
284284
newInstance();
285285
signalMethod.invoke(workflow, args);

0 commit comments

Comments
 (0)