Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,18 @@ archivesBaseName = 'durabletask-client'
def grpcVersion = '1.69.0'
def protocVersion = '3.25.5'
def jacksonVersion = '2.15.3'

def otelVersion = '1.51.0'
def micrometerVersion = '1.5.1'

// When build on local, you need to set this value to your local jdk11 directory.
// Java11 is used to compile and run all the tests.
// Example for Windows: C:/Program Files/Java/openjdk-11.0.12_7/
def PATH_TO_TEST_JAVA_RUNTIME = System.env.JDK_11 ?: System.getProperty("java.home")

// Java 11 is now the minimum required version for both compilation and testing


dependencies {

// https://github.com/grpc/grpc-java#download
Expand All @@ -27,6 +37,9 @@ dependencies {
implementation 'com.google.protobuf:protobuf-java:3.25.5'
runtimeOnly "io.grpc:grpc-netty-shaded:${grpcVersion}"

implementation "io.opentelemetry:opentelemetry-api:${otelVersion}"
implementation "io.opentelemetry:opentelemetry-context:${otelVersion}"

compileOnly "org.apache.tomcat:annotations-api:6.0.53"

// https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core
Expand Down
34 changes: 33 additions & 1 deletion client/src/main/java/io/dapr/durabletask/DurableTaskClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.
package io.dapr.durabletask;

import io.opentelemetry.context.Context;
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -38,7 +39,7 @@ public void close() {
* @return the randomly-generated instance ID of the scheduled orchestration instance
*/
public String scheduleNewOrchestrationInstance(String orchestratorName) {
return this.scheduleNewOrchestrationInstance(orchestratorName, null, null);
return this.scheduleNewOrchestrationInstance(orchestratorName, null, null, null);
}

/**
Expand Down Expand Up @@ -67,6 +68,19 @@ public String scheduleNewOrchestrationInstance(String orchestratorName, Object i
return this.scheduleNewOrchestrationInstance(orchestratorName, options);
}

public String scheduleNewOrchestrationInstance(
String orchestratorName,
Object input, String instanceId, Context context){
NewOrchestrationInstanceOptions options = new NewOrchestrationInstanceOptions()
.setInput(input)
.setInstanceId(instanceId);
return this.scheduleNewOrchestrationInstance(orchestratorName, options, context);
}

public abstract String scheduleNewOrchestrationInstance(
String orchestratorName,
NewOrchestrationInstanceOptions options, Context context);

/**
* Schedules a new orchestration instance with a specified set of options for execution.
*
Expand Down Expand Up @@ -98,6 +112,24 @@ public void raiseEvent(String instanceId, String eventName) {
this.raiseEvent(instanceId, eventName, null);
}

/**
* Sends an event notification message to a waiting orchestration instance.
* <p>
* In order to handle the event, the target orchestration instance must be waiting for an event named
* <code>eventName</code> using the {@link TaskOrchestrationContext#waitForExternalEvent(String)} method.
* If the target orchestration instance is not yet waiting for an event named <code>eventName</code>,
* then the event will be saved in the orchestration instance state and dispatched immediately when the
* orchestrator calls {@link TaskOrchestrationContext#waitForExternalEvent(String)}. This event saving occurs even
* if the orchestrator has canceled its wait operation before the event was received.
* <p>
* Raised events for a completed or non-existent orchestration instance will be silently discarded.
*
* @param instanceId the ID of the orchestration instance that will handle the event
* @param eventName the case-insensitive name of the event
* @param context Otel context for trace propagation.
*/
public abstract void raiseEvent(String instanceId, String eventName, @Nullable Object eventPayload, Context context);

/**
* Sends an event notification message with a payload to a waiting orchestration instance.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*;


import io.dapr.durabletask.interceptors.DaprWorkflowClientGrpcInterceptors;
import io.grpc.*;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.opentelemetry.context.Context;

import java.io.FileInputStream;
import java.io.InputStream;

Expand All @@ -38,6 +42,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
private final DataConverter dataConverter;
private final ManagedChannel managedSidecarChannel;
private final TaskHubSidecarServiceBlockingStub sidecarClient;
private final DaprWorkflowClientGrpcInterceptors interceptors;

DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) {
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
Expand Down Expand Up @@ -112,6 +117,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
}

this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
this.interceptors = builder.interceptors;
}

/**
Expand All @@ -133,6 +139,53 @@ public void close() {
}
}

/*
* @salaboy TODO: refactor to avoid duplicated code
*/
@Override
public String scheduleNewOrchestrationInstance(
String orchestratorName,
NewOrchestrationInstanceOptions options, Context context) {
if (orchestratorName == null || orchestratorName.length() == 0) {
throw new IllegalArgumentException("A non-empty orchestrator name must be specified.");
}

Helpers.throwIfArgumentNull(options, "options");

CreateInstanceRequest.Builder builder = CreateInstanceRequest.newBuilder();
builder.setName(orchestratorName);

String instanceId = options.getInstanceId();
if (instanceId == null) {
instanceId = UUID.randomUUID().toString();
}
builder.setInstanceId(instanceId);

String version = options.getVersion();
if (version != null) {
builder.setVersion(StringValue.of(version));
}

Object input = options.getInput();
if (input != null) {
String serializedInput = this.dataConverter.serialize(input);
builder.setInput(StringValue.of(serializedInput));
}

Instant startTime = options.getStartTime();
if (startTime != null) {
Timestamp ts = DataConverter.getTimestampFromInstant(startTime);
builder.setScheduledStartTimestamp(ts);
}

CreateInstanceRequest request = builder.build();

CreateInstanceResponse response = interceptors.intercept(this.sidecarClient, context)
.startInstance(request);
return response.getInstanceId();

}

@Override
public String scheduleNewOrchestrationInstance(
String orchestratorName,
Expand Down Expand Up @@ -170,12 +223,25 @@ public String scheduleNewOrchestrationInstance(
}

CreateInstanceRequest request = builder.build();
CreateInstanceResponse response = this.sidecarClient.startInstance(request);

CreateInstanceResponse response = interceptors.intercept(this.sidecarClient)
.startInstance(request);
return response.getInstanceId();
}

@Override
public void raiseEvent(String instanceId, String eventName, Object eventPayload) {
RaiseEventRequest request = raiseEventRequest(instanceId, eventName, eventPayload);
this.sidecarClient.raiseEvent(request);
}

@Override
public void raiseEvent(String instanceId, String eventName, Object eventPayload, Context context) {
RaiseEventRequest request = raiseEventRequest(instanceId, eventName, eventPayload);
interceptors.intercept(this.sidecarClient, context).raiseEvent(request);
}

private RaiseEventRequest raiseEventRequest(String instanceId, String eventName, Object eventPayload){
Helpers.throwIfArgumentNull(instanceId, "instanceId");
Helpers.throwIfArgumentNull(eventName, "eventName");

Expand All @@ -186,11 +252,13 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload)
String serializedPayload = this.dataConverter.serialize(eventPayload);
builder.setInput(StringValue.of(serializedPayload));
}

RaiseEventRequest request = builder.build();
this.sidecarClient.raiseEvent(request);
return builder.build();
}





@Override
public OrchestrationMetadata getInstanceMetadata(String instanceId, boolean getInputsAndOutputs) {
GetInstanceRequest request = GetInstanceRequest.newBuilder()
Expand Down Expand Up @@ -295,7 +363,7 @@ private OrchestrationStatusQueryResult toQueryResult(QueryInstancesResponse quer

@Override
public void createTaskHub(boolean recreateIfExists) {
this.sidecarClient.createTaskHub(CreateTaskHubRequest.newBuilder().setRecreateIfExists(recreateIfExists).build());
interceptors.intercept(this.sidecarClient).createTaskHub(CreateTaskHubRequest.newBuilder().setRecreateIfExists(recreateIfExists).build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
// Licensed under the MIT License.
package io.dapr.durabletask;

import io.dapr.durabletask.interceptors.DaprWorkflowClientGrpcInterceptors;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;

import java.util.Arrays;
import java.util.List;

/**
* Builder class for constructing new {@link DurableTaskClient} objects that communicate with a sidecar process
Expand All @@ -16,6 +21,7 @@ public final class DurableTaskGrpcClientBuilder {
String tlsCertPath;
String tlsKeyPath;
boolean insecure;
DaprWorkflowClientGrpcInterceptors interceptors;

/**
* Sets the {@link DataConverter} to use for converting serializable data payloads.
Expand Down Expand Up @@ -106,6 +112,11 @@ public DurableTaskGrpcClientBuilder insecure(boolean insecure) {
return this;
}

public DurableTaskGrpcClientBuilder interceptor(DaprWorkflowClientGrpcInterceptors interceptors){
this.interceptors = interceptors;
return this;
}

/**
* Initializes a new {@link DurableTaskClient} object with the settings specified in the current builder object.
* @return a new {@link DurableTaskClient} object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.WorkItem.RequestCase;
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*;


import io.dapr.durabletask.interceptors.DaprWorkflowClientGrpcInterceptors;
import io.grpc.*;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;

import javax.annotation.Nullable;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
Expand All @@ -19,6 +25,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;


/**
* Task hub worker that connects to a sidecar process over gRPC to execute
* orchestrator and activity events.
Expand All @@ -42,6 +49,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
private final boolean isExecutorServiceManaged;
private volatile boolean isNormalShutdown = false;
private Thread workerThread;
private DaprWorkflowClientGrpcInterceptors interceptors;

DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
this.orchestrationFactories.putAll(builder.orchestrationFactories);
Expand Down Expand Up @@ -69,10 +77,11 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
}

this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
this.interceptors = builder.interceptors;
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval
: DEFAULT_MAXIMUM_TIMER_INTERVAL;
this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool();
this.workerPool = Context.taskWrapping(builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool());
this.isExecutorServiceManaged = builder.executorService == null;
}

Expand Down Expand Up @@ -140,7 +149,7 @@ public void startAndBlock() {
while (true) {
try {
GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
Iterator<WorkItem> workItemStream = interceptors.intercept(this.sidecarClient, Context.root()).getWorkItems(getWorkItemsRequest);
while (workItemStream.hasNext()) {
WorkItem workItem = workItemStream.next();
RequestCase requestType = workItem.getRequestCase();
Expand All @@ -164,10 +173,13 @@ public void startAndBlock() {
.build();

try {
interceptors.intercept(this.sidecarClient, Context.root()).completeOrchestratorTask(response);

this.sidecarClient.completeOrchestratorTask(response);
logger.log(Level.FINEST,
"Completed orchestrator request for instance: {0}",
orchestratorRequest.getInstanceId());

} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
logger.log(Level.WARNING,
Expand All @@ -191,7 +203,9 @@ public void startAndBlock() {
activityRequest.getName(),
activityRequest.getOrchestrationInstance().getInstanceId()));


// TODO: Error handling

this.workerPool.submit(() -> {
String output = null;
TaskFailureDetails failureDetails = null;
Expand All @@ -200,7 +214,8 @@ public void startAndBlock() {
activityRequest.getName(),
activityRequest.getInput().getValue(),
activityRequest.getTaskExecutionId(),
activityRequest.getTaskId());
activityRequest.getTaskId(),
activityRequest.getParentTraceId());
} catch (Throwable e) {
failureDetails = TaskFailureDetails.newBuilder()
.setErrorType(e.getClass().getName())
Expand All @@ -223,7 +238,11 @@ public void startAndBlock() {
}

try {
this.sidecarClient.completeActivityTask(responseBuilder.build());
System.out.println(activityRequest);


Context activityContext = Context.current().with(ContextKey.named("traceparent"), activityRequest.getParentTraceContext().getTraceParent());
interceptors.intercept(this.sidecarClient, activityContext).completeActivityTask(responseBuilder.build());
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
logger.log(Level.WARNING,
Expand Down
Loading
Loading