Skip to content

[Fix #468] Try/raise implementation #488

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions api/src/main/resources/schema/workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -777,19 +777,22 @@ $defs:
errors:
type: object
title: CatchErrors
description: The configuration of a concept used to catch errors.
properties:
with:
$ref: '#/$defs/errorFilter'
description: static error filter
as:
type: string
title: CatchAs
description: The name of the runtime expression variable to save the error as. Defaults to 'error'.
when:
type: string
title: CatchWhen
description: A runtime expression used to determine whether or not to catch the filtered error.
description: A runtime expression used to determine whether to catch the filtered error.
exceptWhen:
type: string
title: CatchExceptWhen
description: A runtime expression used to determine whether or not to catch the filtered error.
description: A runtime expression used to determine whether not to catch the filtered error.
retry:
oneOf:
- $ref: '#/$defs/retryPolicy'
Expand Down Expand Up @@ -1152,6 +1155,27 @@ $defs:
title: ErrorDetails
description: A human-readable explanation specific to this occurrence of the error.
required: [ type, status ]
errorFilter:
type: object
title: ErrorFilter
description: Error filtering base on static values. For error filtering on dynamic values, use catch.when property
minProperties: 1
properties:
type:
type: string
description: if present, means this value should be used for filtering
status:
type: integer
description: if present, means this value should be used for filtering
instance:
type: string
description: if present, means this value should be used for filtering
title:
type: string
description: if present, means this value should be used for filtering
details:
type: string
description: if present, means this value should be used for filtering
uriTemplate:
title: UriTemplate
anyOf:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

import java.util.function.BiFunction;

@FunctionalInterface
public interface StringFilter extends BiFunction<WorkflowContext, TaskContext<?>, String> {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

public record WorkflowError(
String type, int status, String instance, String title, String details) {

private static final String ERROR_FORMAT = "https://serverlessworkflow.io/spec/1.0.0/errors/%s";
public static final String RUNTIME_TYPE = String.format(ERROR_FORMAT, "runtime");
public static final String COMM_TYPE = String.format(ERROR_FORMAT, "communication");

public static Builder error(String type, int status) {
return new Builder(type, status);
}

public static Builder communication(int status, TaskContext<?> context, Exception ex) {
return new Builder(COMM_TYPE, status)
.instance(context.position().jsonPointer())
.title(ex.getMessage());
}

public static Builder runtime(int status, TaskContext<?> context, Exception ex) {
return new Builder(RUNTIME_TYPE, status)
.instance(context.position().jsonPointer())
.title(ex.getMessage());
}

public static class Builder {

private final String type;
private int status;
private String instance;
private String title;
private String details;

private Builder(String type, int status) {
this.type = type;
this.status = status;
}

public Builder instance(String instance) {
this.instance = instance;
return this;
}

public Builder title(String title) {
this.title = title;
return this;
}

public Builder details(String details) {
this.details = details;
return this;
}

public WorkflowError build() {
return new WorkflowError(type, status, instance, title, details);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

public class WorkflowException extends RuntimeException {

private static final long serialVersionUID = 1L;

private final WorkflowError worflowError;

public WorkflowException(WorkflowError error) {
this(error, null);
}

public WorkflowException(WorkflowError error, Throwable cause) {
super(error.toString(), cause);
this.worflowError = error;
}

public WorkflowError getWorflowError() {
return worflowError;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public class WorkflowInstance {
.inputFilter()
.ifPresent(f -> taskContext.input(f.apply(workflowContext, taskContext, input)));
state = WorkflowState.STARTED;
taskContext.rawOutput(
WorkflowUtils.processTaskList(definition.workflow().getDo(), workflowContext, taskContext));
WorkflowUtils.processTaskList(definition.workflow().getDo(), workflowContext, taskContext);
definition
.outputFilter()
.ifPresent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@

import java.util.function.Supplier;

@FunctionalInterface
public interface WorkflowPositionFactory extends Supplier<WorkflowPosition> {}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,23 @@ public static Optional<WorkflowFilter> buildWorkflowFilter(
: Optional.empty();
}

public static StringFilter buildStringFilter(
ExpressionFactory exprFactory, String expression, String literal) {
return expression != null ? from(buildWorkflowFilter(exprFactory, expression)) : from(literal);
}

public static StringFilter buildStringFilter(ExpressionFactory exprFactory, String str) {
return ExpressionUtils.isExpr(str) ? from(buildWorkflowFilter(exprFactory, str)) : from(str);
}

public static StringFilter from(WorkflowFilter filter) {
return (w, t) -> filter.apply(w, t, t.input()).asText();
}

private static StringFilter from(String literal) {
return (w, t) -> literal;
}

private static WorkflowFilter buildWorkflowFilter(
ExpressionFactory exprFactory, String str, Object object) {
if (str != null) {
Expand Down Expand Up @@ -127,7 +144,7 @@ private static TaskItem findTaskByName(ListIterator<TaskItem> iter, String taskN
throw new IllegalArgumentException("Cannot find task with name " + taskName);
}

public static JsonNode processTaskList(
public static void processTaskList(
List<TaskItem> tasks, WorkflowContext context, TaskContext<?> parentTask) {
parentTask.position().addProperty("do");
TaskContext<? extends TaskBase> currentContext = parentTask;
Expand All @@ -136,7 +153,7 @@ public static JsonNode processTaskList(
TaskItem nextTask = iter.next();
while (nextTask != null) {
TaskItem task = nextTask;
parentTask.position().addIndex(iter.nextIndex()).addProperty(task.getName());
parentTask.position().addIndex(iter.previousIndex()).addProperty(task.getName());
context
.definition()
.listeners()
Expand Down Expand Up @@ -175,7 +192,7 @@ public static JsonNode processTaskList(
}
}
parentTask.position().back();
return currentContext.output();
parentTask.rawOutput(currentContext.output());
}

public static WorkflowFilter buildWorkflowFilter(ExpressionFactory exprFactory, String str) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public TaskExecutor<? extends TaskBase> getTaskExecutor(
return new SetExecutor(task.getSetTask(), definition);
} else if (task.getForTask() != null) {
return new ForExecutor(task.getForTask(), definition);
} else if (task.getRaiseTask() != null) {
return new RaiseExecutor(task.getRaiseTask(), definition);
} else if (task.getTryTask() != null) {
return new TryExecutor(task.getTryTask(), definition);
}
throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ protected DoExecutor(DoTask task, WorkflowDefinition definition) {

@Override
protected void internalExecute(WorkflowContext workflow, TaskContext<DoTask> taskContext) {
taskContext.rawOutput(WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext));
WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected void internalExecute(WorkflowContext workflow, TaskContext<ForTask> ta
JsonNode item = iter.next();
taskContext.variables().put(task.getFor().getEach(), item);
taskContext.variables().put(task.getFor().getAt(), i++);
taskContext.rawOutput(WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext));
WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors;

import io.serverlessworkflow.api.types.Error;
import io.serverlessworkflow.api.types.ErrorInstance;
import io.serverlessworkflow.api.types.ErrorType;
import io.serverlessworkflow.api.types.RaiseTask;
import io.serverlessworkflow.api.types.RaiseTaskError;
import io.serverlessworkflow.impl.StringFilter;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowError;
import io.serverlessworkflow.impl.WorkflowException;
import io.serverlessworkflow.impl.WorkflowUtils;
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

public class RaiseExecutor extends AbstractTaskExecutor<RaiseTask> {

private final BiFunction<WorkflowContext, TaskContext<RaiseTask>, WorkflowError> errorBuilder;

private final StringFilter typeFilter;
private final Optional<StringFilter> instanceFilter;
private final StringFilter titleFilter;
private final StringFilter detailFilter;

protected RaiseExecutor(RaiseTask task, WorkflowDefinition definition) {
super(task, definition);
RaiseTaskError raiseError = task.getRaise().getError();
Error error =
raiseError.getRaiseErrorDefinition() != null
? raiseError.getRaiseErrorDefinition()
: findError(definition, raiseError.getRaiseErrorReference());
this.typeFilter = getTypeFunction(definition.expressionFactory(), error.getType());
this.instanceFilter = getInstanceFunction(definition.expressionFactory(), error.getInstance());
this.titleFilter =
WorkflowUtils.buildStringFilter(definition.expressionFactory(), error.getTitle());
this.detailFilter =
WorkflowUtils.buildStringFilter(definition.expressionFactory(), error.getDetail());
this.errorBuilder = (w, t) -> buildError(error, w, t);
}

private static Error findError(WorkflowDefinition definition, String raiseErrorReference) {
Map<String, Error> errorsMap =
definition.workflow().getUse().getErrors().getAdditionalProperties();
Error error = errorsMap.get(raiseErrorReference);
if (error == null) {
throw new IllegalArgumentException("Error " + error + "is not defined in " + errorsMap);
}
return error;
}

private WorkflowError buildError(
Error error, WorkflowContext context, TaskContext<RaiseTask> taskContext) {
return WorkflowError.error(typeFilter.apply(context, taskContext), error.getStatus())
.instance(
instanceFilter
.map(f -> f.apply(context, taskContext))
.orElseGet(() -> taskContext.position().jsonPointer()))
.title(titleFilter.apply(context, taskContext))
.details(detailFilter.apply(context, taskContext))
.build();
}

private Optional<StringFilter> getInstanceFunction(
ExpressionFactory expressionFactory, ErrorInstance errorInstance) {
return errorInstance != null
? Optional.of(
WorkflowUtils.buildStringFilter(
expressionFactory,
errorInstance.getExpressionErrorInstance(),
errorInstance.getLiteralErrorInstance()))
: Optional.empty();
}

private StringFilter getTypeFunction(ExpressionFactory expressionFactory, ErrorType type) {
return WorkflowUtils.buildStringFilter(
expressionFactory,
type.getExpressionErrorType(),
type.getLiteralErrorType().get().toString());
}

@Override
protected void internalExecute(WorkflowContext workflow, TaskContext<RaiseTask> taskContext) {
throw new WorkflowException(errorBuilder.apply(workflow, taskContext));
}
}
Loading
Loading