Skip to content

Commit da245ee

Browse files
committed
Add support for RunTask.shell task
Signed-off-by: Matheus Cruz <[email protected]>
1 parent b043e89 commit da245ee

File tree

16 files changed

+653
-1
lines changed

16 files changed

+653
-1
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors;
17+
18+
public record ProcessResult(int code, String stdout, String stderr) {}
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors;
17+
18+
import io.serverlessworkflow.api.types.RunShell;
19+
import io.serverlessworkflow.api.types.RunTaskConfiguration;
20+
import io.serverlessworkflow.api.types.Shell;
21+
import io.serverlessworkflow.impl.TaskContext;
22+
import io.serverlessworkflow.impl.WorkflowApplication;
23+
import io.serverlessworkflow.impl.WorkflowContext;
24+
import io.serverlessworkflow.impl.WorkflowDefinition;
25+
import io.serverlessworkflow.impl.WorkflowError;
26+
import io.serverlessworkflow.impl.WorkflowException;
27+
import io.serverlessworkflow.impl.WorkflowModel;
28+
import io.serverlessworkflow.impl.WorkflowModelFactory;
29+
import io.serverlessworkflow.impl.WorkflowUtils;
30+
import io.serverlessworkflow.impl.expressions.ExpressionUtils;
31+
import java.io.BufferedReader;
32+
import java.io.IOException;
33+
import java.io.InputStream;
34+
import java.io.InputStreamReader;
35+
import java.io.StringWriter;
36+
import java.nio.charset.StandardCharsets;
37+
import java.util.Map;
38+
import java.util.concurrent.CompletableFuture;
39+
40+
public class RunShellExecutor implements RunnableTask<RunShell> {
41+
42+
private ShellResultSupplier shellResultSupplier;
43+
private ProcessBuilderSupplier processBuilderSupplier;
44+
45+
@FunctionalInterface
46+
private interface ShellResultSupplier {
47+
WorkflowModel apply(
48+
TaskContext taskContext, WorkflowModel input, ProcessBuilder processBuilder);
49+
}
50+
51+
@FunctionalInterface
52+
private interface ProcessBuilderSupplier {
53+
ProcessBuilder apply(WorkflowContext workflowContext, TaskContext taskContext);
54+
}
55+
56+
@Override
57+
public CompletableFuture<WorkflowModel> apply(
58+
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
59+
ProcessBuilder processBuilder = this.processBuilderSupplier.apply(workflowContext, taskContext);
60+
WorkflowModel workflowModel =
61+
this.shellResultSupplier.apply(taskContext, input, processBuilder);
62+
return CompletableFuture.completedFuture(workflowModel);
63+
}
64+
65+
@Override
66+
public void init(RunShell taskConfiguration, WorkflowDefinition definition) {
67+
Shell shell = taskConfiguration.getShell();
68+
final String shellCommand = shell.getCommand();
69+
70+
if (shellCommand == null || shellCommand.isBlank()) {
71+
throw new IllegalStateException("Missing shell command in RunShell task configuration");
72+
}
73+
this.processBuilderSupplier =
74+
(workflowContext, taskContext) -> {
75+
WorkflowApplication application = definition.application();
76+
77+
String command =
78+
ExpressionUtils.isExpr(shellCommand)
79+
? WorkflowUtils.buildStringResolver(
80+
application, shellCommand, taskContext.input().asJavaObject())
81+
.apply(workflowContext, taskContext, taskContext.input())
82+
: shellCommand;
83+
84+
StringBuilder commandBuilder = new StringBuilder(command);
85+
86+
if (shell.getArguments() != null
87+
&& shell.getArguments().getAdditionalProperties() != null) {
88+
for (Map.Entry<String, Object> entry :
89+
shell.getArguments().getAdditionalProperties().entrySet()) {
90+
91+
String argKey =
92+
ExpressionUtils.isExpr(entry.getKey())
93+
? WorkflowUtils.buildStringResolver(
94+
application, entry.getKey(), taskContext.input().asJavaObject())
95+
.apply(workflowContext, taskContext, taskContext.input())
96+
: entry.getKey();
97+
98+
if (entry.getValue() == null) {
99+
commandBuilder.append(" ").append(argKey);
100+
continue;
101+
}
102+
103+
String argValue =
104+
ExpressionUtils.isExpr(entry.getValue())
105+
? WorkflowUtils.buildStringResolver(
106+
application,
107+
entry.getValue().toString(),
108+
taskContext.input().asJavaObject())
109+
.apply(workflowContext, taskContext, taskContext.input())
110+
: entry.getValue().toString();
111+
112+
commandBuilder.append(" ").append(argKey).append("=").append(argValue);
113+
}
114+
}
115+
116+
// TODO: support Windows cmd.exe
117+
ProcessBuilder builder = new ProcessBuilder("sh", "-c", commandBuilder.toString());
118+
119+
if (shell.getEnvironment() != null
120+
&& shell.getEnvironment().getAdditionalProperties() != null) {
121+
for (Map.Entry<String, Object> entry :
122+
shell.getEnvironment().getAdditionalProperties().entrySet()) {
123+
String value =
124+
ExpressionUtils.isExpr(entry.getValue())
125+
? WorkflowUtils.buildStringResolver(
126+
application,
127+
entry.getValue().toString(),
128+
taskContext.input().asJavaObject())
129+
.apply(workflowContext, taskContext, taskContext.input())
130+
: entry.getValue().toString();
131+
132+
// configure environments
133+
builder.environment().put(entry.getKey(), value);
134+
}
135+
}
136+
137+
return builder;
138+
};
139+
140+
this.shellResultSupplier =
141+
(taskContext, input, processBuilder) -> {
142+
try {
143+
Process process = processBuilder.start();
144+
145+
if (taskConfiguration.isAwait()) {
146+
return waitForResult(taskConfiguration, definition, process);
147+
} else {
148+
return input;
149+
}
150+
151+
} catch (IOException | InterruptedException e) {
152+
throw new WorkflowException(WorkflowError.runtime(taskContext, e).build(), e);
153+
}
154+
};
155+
}
156+
157+
private WorkflowModel waitForResult(
158+
RunShell taskConfiguration, WorkflowDefinition definition, Process process)
159+
throws IOException, InterruptedException {
160+
161+
CompletableFuture<String> futureStdout =
162+
CompletableFuture.supplyAsync(() -> readInputStream(process.getInputStream()));
163+
CompletableFuture<String> futureStderr =
164+
CompletableFuture.supplyAsync(() -> readInputStream(process.getErrorStream()));
165+
166+
int exitCode = process.waitFor();
167+
168+
CompletableFuture<Void> allStd = CompletableFuture.allOf(futureStdout, futureStderr);
169+
170+
allStd.join();
171+
172+
String stdout = futureStdout.join();
173+
String stderr = futureStderr.join();
174+
175+
RunTaskConfiguration.ProcessReturnType returnType = taskConfiguration.getReturn();
176+
177+
WorkflowModelFactory modelFactory = definition.application().modelFactory();
178+
179+
return switch (returnType) {
180+
case ALL -> modelFactory.fromAny(new ProcessResult(exitCode, stdout.trim(), stderr.trim()));
181+
case NONE -> modelFactory.fromNull();
182+
case CODE -> modelFactory.from(exitCode);
183+
case STDOUT -> modelFactory.from(stdout.trim());
184+
case STDERR -> modelFactory.from(stderr.trim());
185+
};
186+
}
187+
188+
@Override
189+
public boolean accept(Class<? extends RunTaskConfiguration> clazz) {
190+
return RunShell.class.equals(clazz);
191+
}
192+
193+
/**
194+
* Reads an InputStream and returns its content as a String. It keeps the original content using
195+
* UTF-8 encoding.
196+
*
197+
* @param inputStream {@link InputStream} to be read
198+
* @return {@link String} with the content of the InputStream
199+
*/
200+
public static String readInputStream(InputStream inputStream) {
201+
StringWriter writer = new StringWriter();
202+
try (BufferedReader reader =
203+
new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
204+
char[] buffer = new char[1024];
205+
int charsRead;
206+
while ((charsRead = reader.read(buffer)) != -1) {
207+
writer.write(buffer, 0, charsRead);
208+
}
209+
} catch (IOException e) {
210+
throw new RuntimeException(e);
211+
}
212+
return writer.toString();
213+
}
214+
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1-
io.serverlessworkflow.impl.executors.RunWorkflowExecutor
1+
io.serverlessworkflow.impl.executors.RunWorkflowExecutor
2+
io.serverlessworkflow.impl.executors.RunShellExecutor

0 commit comments

Comments
 (0)