Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
rootProject.name='temporal-java-sdk'
includeBuild('../nexus/nexus-sdk-java')
include 'temporal-bom'
include 'temporal-serviceclient'
include 'temporal-sdk'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@

package io.temporal.workflow

import io.nexusrpc.Operation
import io.nexusrpc.Service
import io.nexusrpc.handler.ServiceImpl
import io.temporal.client.WorkflowClientOptions
import io.temporal.client.WorkflowOptions
import io.temporal.common.converter.DefaultDataConverter
import io.temporal.common.converter.JacksonJsonPayloadConverter
import io.temporal.common.converter.KotlinObjectMapperFactory
import io.temporal.nexus.TemporalNexusClient
import io.temporal.nexus.TemporalOperation
import io.temporal.nexus.TemporalOperationResult
import io.temporal.nexus.TemporalOperationStartContext
import io.temporal.testing.internal.SDKTestWorkflowRule
import org.junit.Assert.assertEquals
import org.junit.Rule
import org.junit.Test
import java.time.Duration

class KotlinTemporalOperationTest {

@Rule
@JvmField
var testWorkflowRule: SDKTestWorkflowRule = SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(CallerWorkflowImpl::class.java)
.setNexusServiceImplementation(KotlinSugarServiceImpl())
.setWorkflowClientOptions(
WorkflowClientOptions.newBuilder()
.setDataConverter(DefaultDataConverter(JacksonJsonPayloadConverter(KotlinObjectMapperFactory.new())))
.build()
)
.build()

@Service
interface KotlinSugarService {
@Operation
fun greet(input: String): String
}

@ServiceImpl(service = KotlinSugarService::class)
class KotlinSugarServiceImpl {
@TemporalOperation
fun greet(
ctx: TemporalOperationStartContext,
client: TemporalNexusClient,
input: String
): TemporalOperationResult<String> {
return TemporalOperationResult.sync("kotlin-$input")
}
}

@WorkflowInterface
interface CallerWorkflow {
@WorkflowMethod
fun execute(arg: String): String
}

class CallerWorkflowImpl : CallerWorkflow {
override fun execute(arg: String): String {
val stub = Workflow.newNexusServiceStub(
KotlinSugarService::class.java,
NexusServiceOptions {
setOperationOptions(
NexusOperationOptions {
setScheduleToCloseTimeout(Duration.ofSeconds(10))
}
)
}
)
return stub.greet(arg)
}
}

@Test
fun temporalOperationSugar_endToEnd() {
val client = testWorkflowRule.workflowClient
val options = WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.taskQueue).build()
val workflowStub = client.newWorkflowStub(CallerWorkflow::class.java, options)
assertEquals("kotlin-hi", workflowStub.execute("hi"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ private void registerNexusService(Object nexusService) {
if (nexusService instanceof Class) {
throw new IllegalArgumentException("Nexus service object instance expected, not the class");
}
ServiceImplInstance instance = ServiceImplInstance.fromInstance(nexusService);
ServiceImplInstance instance = TemporalOperationProcessor.process(nexusService);
InternalUtils.checkMethodName(instance);
if (serviceImplInstances.put(instance.getDefinition().getName(), instance) != null) {
throw new TypeAlreadyRegisteredException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package io.temporal.internal.nexus;

import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Primitives;
import io.nexusrpc.OperationDefinition;
import io.nexusrpc.ServiceDefinition;
import io.nexusrpc.handler.MethodExtension;
import io.nexusrpc.handler.OperationImpl;
import io.nexusrpc.handler.ServiceImplInstance;
import io.temporal.nexus.TemporalNexusClient;
import io.temporal.nexus.TemporalOperation;
import io.temporal.nexus.TemporalOperationHandler;
import io.temporal.nexus.TemporalOperationResult;
import io.temporal.nexus.TemporalOperationStartContext;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.stream.Collectors;

/**
* Entry point for registering a Nexus service instance whose class may contain {@link
* TemporalOperation}-annotated methods. Delegates to {@link
* ServiceImplInstance#fromInstance(Object, java.util.List)} with a single {@link MethodExtension}
* that recognizes {@link TemporalOperation} alongside the built-in {@link OperationImpl}.
*/
public final class TemporalOperationProcessor {

private static final ImmutableList<MethodExtension> EXTENSIONS =
ImmutableList.of(new TemporalOperationExtension());

private TemporalOperationProcessor() {}

public static ServiceImplInstance process(Object instance) {
return ServiceImplInstance.fromInstance(instance, EXTENSIONS);
}

/** Recognizes {@link TemporalOperation}-annotated methods during nexusrpc service scanning. */
private static final class TemporalOperationExtension implements MethodExtension {
@Override
public Result extract(Object instance, Method method, ServiceDefinition serviceDefinition) {
if (method.getDeclaredAnnotation(TemporalOperation.class) == null) {
return null;
}
if (method.isAnnotationPresent(OperationImpl.class)) {
throw new IllegalArgumentException(
"@TemporalOperation and @OperationImpl cannot be combined on method "
+ method.getName());
}

validateSignature(method);

OperationDefinition operationDefinition =
serviceDefinition.getOperations().values().stream()
.filter(o -> method.getName().equals(o.getMethodName()))
.findFirst()
.orElseThrow(
() ->
new IllegalStateException(
"No matching @Operation on service "
+ serviceDefinition.getName()
+ " for @TemporalOperation method "
+ method.getName()));

validateTypes(method, operationDefinition);

MethodHandle handle;
try {
handle = MethodHandles.lookup().unreflect(method).bindTo(instance);
} catch (IllegalAccessException e) {
throw new RuntimeException(
"Failed to obtain method handle for @TemporalOperation method " + method.getName(), e);
}

TemporalOperationHandler.StartHandler<Object, Object> startHandler =
(ctx, client, input) -> invokeStartHandler(handle, ctx, client, input);

return new Result(
operationDefinition.getName(),
new TemporalOperationHandler<Object, Object>(startHandler) {});
}
}

private static void validateSignature(Method method) {
if (!Modifier.isPublic(method.getModifiers())) {
throw new IllegalArgumentException(
"@TemporalOperation method " + method.getName() + " must be public");
}
if (Modifier.isStatic(method.getModifiers())) {
throw new IllegalArgumentException(
"@TemporalOperation method " + method.getName() + " must not be static");
}
Class<?>[] paramTypes = method.getParameterTypes();
if (paramTypes.length != 3
|| !TemporalOperationStartContext.class.equals(paramTypes[0])
|| !TemporalNexusClient.class.equals(paramTypes[1])) {
throw new IllegalArgumentException(
"@TemporalOperation method "
+ method.getName()
+ " must accept (TemporalOperationStartContext, TemporalNexusClient, I); got "
+ describeSignature(method));
}
if (!TemporalOperationResult.class.equals(method.getReturnType())) {
throw new IllegalArgumentException(
"@TemporalOperation method "
+ method.getName()
+ " must return TemporalOperationResult<?>; got "
+ method.getGenericReturnType().getTypeName()
+ ". Use @OperationImpl for custom handler shapes.");
}
}

private static void validateTypes(Method method, OperationDefinition operationDefinition) {
Type expectedInputType = operationDefinition.getInputType();
Type declaredInputType = method.getGenericParameterTypes()[2];
if (!typesMatch(declaredInputType, expectedInputType)) {
throw new IllegalArgumentException(
"@TemporalOperation method "
+ method.getName()
+ " input type mismatch: expected "
+ expectedInputType.getTypeName()
+ " but got "
+ declaredInputType.getTypeName());
}
Type returnType = method.getGenericReturnType();
if (!(returnType instanceof ParameterizedType)) {
throw new IllegalArgumentException(
"@TemporalOperation method "
+ method.getName()
+ " must use parameterized TemporalOperationResult<R>, not the raw type.");
}
Type resultTypeArg = ((ParameterizedType) returnType).getActualTypeArguments()[0];
if (!typesMatch(resultTypeArg, operationDefinition.getOutputType())) {
throw new IllegalArgumentException(
"@TemporalOperation method "
+ method.getName()
+ " output type mismatch: expected "
+ operationDefinition.getOutputType().getTypeName()
+ " but got "
+ resultTypeArg.getTypeName());
}
}

// Package-private for testing.
@SuppressWarnings("unchecked")
static TemporalOperationResult<Object> invokeStartHandler(
MethodHandle handle,
TemporalOperationStartContext ctx,
TemporalNexusClient client,
Object input) {
try {
return (TemporalOperationResult<Object>) handle.invoke(ctx, client, input);
} catch (RuntimeException | Error e) {
throw e;
} catch (Throwable t) {
throw new RuntimeException("@TemporalOperation method threw checked exception", t);
}
}

private static boolean typesMatch(Type declared, Type expected) {
if (declared.equals(expected)) {
return true;
}
if (declared instanceof Class && expected instanceof Class) {
return Primitives.wrap((Class<?>) declared).equals(Primitives.wrap((Class<?>) expected));
}
return false;
}

private static String describeSignature(Method method) {
return Arrays.stream(method.getParameterTypes())
.map(Class::getSimpleName)
.collect(Collectors.joining(", ", "(", ")"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,36 @@
* Nexus-aware client wrapping {@link WorkflowClient}. Provides methods for interacting with
* Temporal from within a Nexus operation handler.
*
* <p>Obtained via the {@link TemporalOperationHandler.StartHandler} parameter.
* <p>Passed to {@link TemporalOperation}-annotated methods (and {@link
* TemporalOperationHandler.StartHandler} implementations) alongside the start context and input.
*
* <p>Example usage to start a workflow from an operation handler:
* <p>Example usage to start a workflow from an operation:
*
* <pre>{@code
* @OperationImpl
* public OperationHandler<TransferInput, TransferResult> startTransfer() {
* return TemporalOperationHandler.create((context, client, input) -> {
* return client.startWorkflow(
* TransferWorkflow.class,
* TransferWorkflow::transfer, input.getFromAccount(), input.getToAccount(),
* WorkflowOptions.newBuilder()
* .setWorkflowId("transfer-" + input.getTransferId())
* .build());
* });
* @TemporalOperation
* public TemporalOperationResult<TransferResult> startTransfer(
* TemporalOperationStartContext ctx, TemporalNexusClient client, TransferInput input) {
* return client.startWorkflow(
* TransferWorkflow.class,
* TransferWorkflow::transfer,
* input,
* WorkflowOptions.newBuilder()
* .setWorkflowId("transfer-" + input.getTransferId())
* .build());
* }
* }</pre>
*
* <p>For synchronous operations, use {@link #getWorkflowClient()} directly and return a {@link
* TemporalOperationResult#sync} result. For example, to send a signal:
*
* <pre>{@code
* @OperationImpl
* public OperationHandler<CancelOrderInput, Void> cancelOrder() {
* return TemporalOperationHandler.create((context, client, input) -> {
* client.getWorkflowClient()
* .newUntypedWorkflowStub("order-" + input.getOrderId())
* .signal("requestCancellation", input);
* return TemporalOperationResult.sync(null);
* });
* @TemporalOperation
* public TemporalOperationResult<Void> cancelOrder(
* TemporalOperationStartContext ctx, TemporalNexusClient client, CancelOrderInput input) {
* client.getWorkflowClient()
* .newUntypedWorkflowStub("order-" + input.getOrderId())
* .signal("requestCancellation", input);
* return TemporalOperationResult.sync(null);
* }
* }</pre>
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.temporal.nexus;

import io.nexusrpc.handler.OperationCancelDetails;
import io.nexusrpc.handler.OperationContext;
import io.nexusrpc.handler.OperationImpl;
import io.nexusrpc.handler.ServiceImpl;
import io.temporal.common.Experimental;
import java.lang.annotation.*;

/**
* Marks a method on a {@link ServiceImpl}-annotated class as a Temporal-backed Nexus operation. The
* method body <i>is</i> the start handler — the framework wraps it in a {@link
* TemporalOperationHandler} at registration time, with default cancel behavior matching {@link
* TemporalOperationHandler#cancel(OperationContext, OperationCancelDetails)}.
*
* <p>The method must:
*
* <ul>
* <li>be {@code public},
* <li>accept exactly three parameters: {@link TemporalOperationStartContext}, {@link
* TemporalNexusClient}, and the operation input type,
* <li>return {@link TemporalOperationResult}.
* </ul>
*
* <p>Workflow-run example:
*
* <pre>{@code
* @ServiceImpl(service = TransferService.class)
* public class TransferServiceImpl {
* @TemporalOperation
* public TemporalOperationResult<TransferResult> transfer(
* TemporalOperationStartContext ctx, TemporalNexusClient client, TransferInput input) {
* return client.startWorkflow(
* TransferWorkflow.class,
* TransferWorkflow::transfer,
* input,
* WorkflowOptions.newBuilder()
* .setWorkflowId("transfer-" + input.getTransferId())
* .build());
* }
* }
* }</pre>
*
* <p>For custom cancel, or any other handler composition, use {@link OperationImpl} with a {@link
* TemporalOperationHandler} subclass that overrides {@link
* TemporalOperationHandler#cancelWorkflowRun}. Both annotations can coexist on the same {@link
* ServiceImpl} class, but never on the same method.
*/
@Experimental
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface TemporalOperation {}
Loading
Loading