Skip to content

Commit 0becec9

Browse files
committed
draft work on trace prop for durable tasks
Signed-off-by: salaboy <[email protected]>
1 parent 688b059 commit 0becec9

22 files changed

+1734
-18
lines changed

client/build.gradle

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,15 @@ plugins {
1111
}
1212

1313
group 'io.dapr'
14-
version = '1.5.7'
14+
version = '1.5.8-SNAPSHOT'
1515
archivesBaseName = 'durabletask-client'
1616

1717
def grpcVersion = '1.69.0'
1818
def protocVersion = '3.25.5'
1919
def jacksonVersion = '2.15.3'
20+
def otelVersion = '1.51.0'
21+
def micrometerVersion = '1.5.1'
22+
2023
// When build on local, you need to set this value to your local jdk11 directory.
2124
// Java11 is used to compile and run all the tests.
2225
// Example for Windows: C:/Program Files/Java/openjdk-11.0.12_7/
@@ -29,6 +32,9 @@ dependencies {
2932
implementation "io.grpc:grpc-stub:${grpcVersion}"
3033
runtimeOnly "io.grpc:grpc-netty-shaded:${grpcVersion}"
3134

35+
implementation "io.opentelemetry:opentelemetry-api:${otelVersion}"
36+
implementation "io.opentelemetry:opentelemetry-context:${otelVersion}"
37+
3238
compileOnly "org.apache.tomcat:annotations-api:6.0.53"
3339

3440
// https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core

client/src/main/java/io/dapr/durabletask/DurableTaskClient.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License.
33
package io.dapr.durabletask;
44

5+
import io.opentelemetry.context.Context;
56
import javax.annotation.Nullable;
67
import java.time.Duration;
78
import java.util.concurrent.TimeoutException;
@@ -38,7 +39,7 @@ public void close() {
3839
* @return the randomly-generated instance ID of the scheduled orchestration instance
3940
*/
4041
public String scheduleNewOrchestrationInstance(String orchestratorName) {
41-
return this.scheduleNewOrchestrationInstance(orchestratorName, null, null);
42+
return this.scheduleNewOrchestrationInstance(orchestratorName, null, null, null);
4243
}
4344

4445
/**
@@ -67,6 +68,19 @@ public String scheduleNewOrchestrationInstance(String orchestratorName, Object i
6768
return this.scheduleNewOrchestrationInstance(orchestratorName, options);
6869
}
6970

71+
public String scheduleNewOrchestrationInstance(
72+
String orchestratorName,
73+
Object input, String instanceId, Context context){
74+
NewOrchestrationInstanceOptions options = new NewOrchestrationInstanceOptions()
75+
.setInput(input)
76+
.setInstanceId(instanceId);
77+
return this.scheduleNewOrchestrationInstance(orchestratorName, options, context);
78+
}
79+
80+
public abstract String scheduleNewOrchestrationInstance(
81+
String orchestratorName,
82+
NewOrchestrationInstanceOptions options, Context context);
83+
7084
/**
7185
* Schedules a new orchestration instance with a specified set of options for execution.
7286
*
@@ -98,6 +112,24 @@ public void raiseEvent(String instanceId, String eventName) {
98112
this.raiseEvent(instanceId, eventName, null);
99113
}
100114

115+
/**
116+
* Sends an event notification message to a waiting orchestration instance.
117+
* <p>
118+
* In order to handle the event, the target orchestration instance must be waiting for an event named
119+
* <code>eventName</code> using the {@link TaskOrchestrationContext#waitForExternalEvent(String)} method.
120+
* If the target orchestration instance is not yet waiting for an event named <code>eventName</code>,
121+
* then the event will be saved in the orchestration instance state and dispatched immediately when the
122+
* orchestrator calls {@link TaskOrchestrationContext#waitForExternalEvent(String)}. This event saving occurs even
123+
* if the orchestrator has canceled its wait operation before the event was received.
124+
* <p>
125+
* Raised events for a completed or non-existent orchestration instance will be silently discarded.
126+
*
127+
* @param instanceId the ID of the orchestration instance that will handle the event
128+
* @param eventName the case-insensitive name of the event
129+
* @param context Otel context for trace propagation.
130+
*/
131+
public abstract void raiseEvent(String instanceId, String eventName, @Nullable Object eventPayload, Context context);
132+
101133
/**
102134
* Sends an event notification message with a payload to a waiting orchestration instance.
103135
* <p>

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@
88
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
99
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*;
1010

11+
12+
import io.dapr.durabletask.interceptors.DaprWorkflowClientGrpcInterceptors;
1113
import io.grpc.*;
1214
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
1315
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
1416
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
17+
import io.opentelemetry.context.Context;
18+
1519
import java.io.FileInputStream;
1620
import java.io.InputStream;
1721

@@ -38,6 +42,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
3842
private final DataConverter dataConverter;
3943
private final ManagedChannel managedSidecarChannel;
4044
private final TaskHubSidecarServiceBlockingStub sidecarClient;
45+
private final DaprWorkflowClientGrpcInterceptors interceptors;
4146

4247
DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) {
4348
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
@@ -112,6 +117,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
112117
}
113118

114119
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
120+
this.interceptors = builder.interceptors;
115121
}
116122

117123
/**
@@ -133,6 +139,53 @@ public void close() {
133139
}
134140
}
135141

142+
/*
143+
* @salaboy TODO: refactor to avoid duplicated code
144+
*/
145+
@Override
146+
public String scheduleNewOrchestrationInstance(
147+
String orchestratorName,
148+
NewOrchestrationInstanceOptions options, Context context) {
149+
if (orchestratorName == null || orchestratorName.length() == 0) {
150+
throw new IllegalArgumentException("A non-empty orchestrator name must be specified.");
151+
}
152+
153+
Helpers.throwIfArgumentNull(options, "options");
154+
155+
CreateInstanceRequest.Builder builder = CreateInstanceRequest.newBuilder();
156+
builder.setName(orchestratorName);
157+
158+
String instanceId = options.getInstanceId();
159+
if (instanceId == null) {
160+
instanceId = UUID.randomUUID().toString();
161+
}
162+
builder.setInstanceId(instanceId);
163+
164+
String version = options.getVersion();
165+
if (version != null) {
166+
builder.setVersion(StringValue.of(version));
167+
}
168+
169+
Object input = options.getInput();
170+
if (input != null) {
171+
String serializedInput = this.dataConverter.serialize(input);
172+
builder.setInput(StringValue.of(serializedInput));
173+
}
174+
175+
Instant startTime = options.getStartTime();
176+
if (startTime != null) {
177+
Timestamp ts = DataConverter.getTimestampFromInstant(startTime);
178+
builder.setScheduledStartTimestamp(ts);
179+
}
180+
181+
CreateInstanceRequest request = builder.build();
182+
183+
CreateInstanceResponse response = interceptors.intercept(this.sidecarClient, context)
184+
.startInstance(request);
185+
return response.getInstanceId();
186+
187+
}
188+
136189
@Override
137190
public String scheduleNewOrchestrationInstance(
138191
String orchestratorName,
@@ -170,12 +223,25 @@ public String scheduleNewOrchestrationInstance(
170223
}
171224

172225
CreateInstanceRequest request = builder.build();
173-
CreateInstanceResponse response = this.sidecarClient.startInstance(request);
226+
227+
CreateInstanceResponse response = interceptors.intercept(this.sidecarClient)
228+
.startInstance(request);
174229
return response.getInstanceId();
175230
}
176231

177232
@Override
178233
public void raiseEvent(String instanceId, String eventName, Object eventPayload) {
234+
RaiseEventRequest request = raiseEventRequest(instanceId, eventName, eventPayload);
235+
this.sidecarClient.raiseEvent(request);
236+
}
237+
238+
@Override
239+
public void raiseEvent(String instanceId, String eventName, Object eventPayload, Context context) {
240+
RaiseEventRequest request = raiseEventRequest(instanceId, eventName, eventPayload);
241+
interceptors.intercept(this.sidecarClient, context).raiseEvent(request);
242+
}
243+
244+
private RaiseEventRequest raiseEventRequest(String instanceId, String eventName, Object eventPayload){
179245
Helpers.throwIfArgumentNull(instanceId, "instanceId");
180246
Helpers.throwIfArgumentNull(eventName, "eventName");
181247

@@ -186,11 +252,13 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload)
186252
String serializedPayload = this.dataConverter.serialize(eventPayload);
187253
builder.setInput(StringValue.of(serializedPayload));
188254
}
189-
190-
RaiseEventRequest request = builder.build();
191-
this.sidecarClient.raiseEvent(request);
255+
return builder.build();
192256
}
193257

258+
259+
260+
261+
194262
@Override
195263
public OrchestrationMetadata getInstanceMetadata(String instanceId, boolean getInputsAndOutputs) {
196264
GetInstanceRequest request = GetInstanceRequest.newBuilder()
@@ -295,7 +363,7 @@ private OrchestrationStatusQueryResult toQueryResult(QueryInstancesResponse quer
295363

296364
@Override
297365
public void createTaskHub(boolean recreateIfExists) {
298-
this.sidecarClient.createTaskHub(CreateTaskHubRequest.newBuilder().setRecreateIfExists(recreateIfExists).build());
366+
interceptors.intercept(this.sidecarClient).createTaskHub(CreateTaskHubRequest.newBuilder().setRecreateIfExists(recreateIfExists).build());
299367
}
300368

301369
@Override

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@
22
// Licensed under the MIT License.
33
package io.dapr.durabletask;
44

5+
import io.dapr.durabletask.interceptors.DaprWorkflowClientGrpcInterceptors;
56
import io.grpc.Channel;
7+
import io.grpc.ClientInterceptor;
8+
9+
import java.util.Arrays;
10+
import java.util.List;
611

712
/**
813
* Builder class for constructing new {@link DurableTaskClient} objects that communicate with a sidecar process
@@ -16,6 +21,7 @@ public final class DurableTaskGrpcClientBuilder {
1621
String tlsCertPath;
1722
String tlsKeyPath;
1823
boolean insecure;
24+
DaprWorkflowClientGrpcInterceptors interceptors;
1925

2026
/**
2127
* Sets the {@link DataConverter} to use for converting serializable data payloads.
@@ -106,6 +112,11 @@ public DurableTaskGrpcClientBuilder insecure(boolean insecure) {
106112
return this;
107113
}
108114

115+
public DurableTaskGrpcClientBuilder interceptor(DaprWorkflowClientGrpcInterceptors interceptors){
116+
this.interceptors = interceptors;
117+
return this;
118+
}
119+
109120
/**
110121
* Initializes a new {@link DurableTaskClient} object with the settings specified in the current builder object.
111122
* @return a new {@link DurableTaskClient} object

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,14 @@
99
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.WorkItem.RequestCase;
1010
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*;
1111

12+
13+
import io.dapr.durabletask.interceptors.DaprWorkflowClientGrpcInterceptors;
1214
import io.grpc.*;
1315

16+
import io.opentelemetry.context.Context;
17+
import io.opentelemetry.context.ContextKey;
18+
19+
import javax.annotation.Nullable;
1420
import java.time.Duration;
1521
import java.util.*;
1622
import java.util.concurrent.ExecutorService;
@@ -19,6 +25,7 @@
1925
import java.util.logging.Level;
2026
import java.util.logging.Logger;
2127

28+
2229
/**
2330
* Task hub worker that connects to a sidecar process over gRPC to execute
2431
* orchestrator and activity events.
@@ -41,6 +48,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
4148
private final boolean isExecutorServiceManaged;
4249
private volatile boolean isNormalShutdown = false;
4350
private Thread workerThread;
51+
private DaprWorkflowClientGrpcInterceptors interceptors;
4452

4553
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
4654
this.orchestrationFactories.putAll(builder.orchestrationFactories);
@@ -67,10 +75,11 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
6775
}
6876

6977
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
78+
this.interceptors = builder.interceptors;
7079
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
7180
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval
7281
: DEFAULT_MAXIMUM_TIMER_INTERVAL;
73-
this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool();
82+
this.workerPool = Context.taskWrapping(builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool());
7483
this.isExecutorServiceManaged = builder.executorService == null;
7584
}
7685

@@ -137,7 +146,7 @@ public void startAndBlock() {
137146
while (true) {
138147
try {
139148
GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
140-
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
149+
Iterator<WorkItem> workItemStream = interceptors.intercept(this.sidecarClient, Context.root()).getWorkItems(getWorkItemsRequest);
141150
while (workItemStream.hasNext()) {
142151
WorkItem workItem = workItemStream.next();
143152
RequestCase requestType = workItem.getRequestCase();
@@ -158,7 +167,7 @@ public void startAndBlock() {
158167
.build();
159168

160169
try {
161-
this.sidecarClient.completeOrchestratorTask(response);
170+
interceptors.intercept(this.sidecarClient, Context.root()).completeOrchestratorTask(response);
162171
} catch (StatusRuntimeException e) {
163172
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
164173
logger.log(Level.WARNING,
@@ -178,6 +187,7 @@ public void startAndBlock() {
178187
} else if (requestType == RequestCase.ACTIVITYREQUEST) {
179188
ActivityRequest activityRequest = workItem.getActivityRequest();
180189

190+
System.out.println(activityRequest);
181191
this.workerPool.submit(() -> {
182192
String output = null;
183193
TaskFailureDetails failureDetails = null;
@@ -186,7 +196,8 @@ public void startAndBlock() {
186196
activityRequest.getName(),
187197
activityRequest.getInput().getValue(),
188198
activityRequest.getTaskExecutionId(),
189-
activityRequest.getTaskId());
199+
activityRequest.getTaskId(),
200+
activityRequest.getParentTraceId());
190201
} catch (Throwable e) {
191202
failureDetails = TaskFailureDetails.newBuilder()
192203
.setErrorType(e.getClass().getName())
@@ -209,7 +220,11 @@ public void startAndBlock() {
209220
}
210221

211222
try {
212-
this.sidecarClient.completeActivityTask(responseBuilder.build());
223+
System.out.println(activityRequest);
224+
225+
226+
Context activityContext = Context.current().with(ContextKey.named("traceparent"), activityRequest.getParentTraceContext().getTraceParent());
227+
interceptors.intercept(this.sidecarClient, activityContext).completeActivityTask(responseBuilder.build());
213228
} catch (StatusRuntimeException e) {
214229
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
215230
logger.log(Level.WARNING,

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
// Licensed under the MIT License.
33
package io.dapr.durabletask;
44

5+
import io.dapr.durabletask.interceptors.DaprWorkflowClientGrpcInterceptors;
56
import io.grpc.Channel;
7+
import io.opentelemetry.context.Context;
68

79
import java.time.Duration;
810
import java.util.HashMap;
@@ -20,6 +22,7 @@ public final class DurableTaskGrpcWorkerBuilder {
2022
DataConverter dataConverter;
2123
Duration maximumTimerInterval;
2224
ExecutorService executorService;
25+
DaprWorkflowClientGrpcInterceptors interceptors;
2326

2427
/**
2528
* Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}.
@@ -127,6 +130,10 @@ public DurableTaskGrpcWorkerBuilder withExecutorService(ExecutorService executor
127130
return this;
128131
}
129132

133+
public DurableTaskGrpcWorkerBuilder interceptors(DaprWorkflowClientGrpcInterceptors interceptors){
134+
this.interceptors = interceptors;
135+
return this;
136+
}
130137
/**
131138
* Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object.
132139
* @return a new {@link DurableTaskGrpcWorker} object

client/src/main/java/io/dapr/durabletask/TaskActivityContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,11 @@ public interface TaskActivityContext {
3434
* @return the task id of the current task activity
3535
*/
3636
int getTaskId();
37+
38+
/**
39+
* Get the task parent trace id for Otel trace propagation.
40+
* @return the task parent traceId
41+
*
42+
*/
43+
String getParentTraceId();
3744
}

0 commit comments

Comments
 (0)