Skip to content

Commit f729012

Browse files
authored
Refactoring run method (#236)
* Refactoring run method Signed-off-by: fjtirado <[email protected]> * Zaninis comments Signed-off-by: fjtirado <[email protected]> --------- Signed-off-by: fjtirado <[email protected]>
1 parent 45bb41e commit f729012

9 files changed

+164
-124
lines changed

impl/runner.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -137,12 +137,12 @@ func (wr *workflowRunnerImpl) Run(input interface{}) (output interface{}, err er
137137
wr.RunnerCtx.SetInput(input)
138138
// Run tasks sequentially
139139
wr.RunnerCtx.SetStatus(ctx.RunningStatus)
140-
doRunner, err := NewDoTaskRunner(wr.Workflow.Do, wr)
140+
doRunner, err := NewDoTaskRunner(wr.Workflow.Do)
141141
if err != nil {
142142
return nil, err
143143
}
144144
wr.RunnerCtx.SetStartedAt(time.Now())
145-
output, err = doRunner.Run(wr.RunnerCtx.GetInput())
145+
output, err = doRunner.Run(wr.RunnerCtx.GetInput(), wr)
146146
if err != nil {
147147
return nil, err
148148
}

impl/task_runner.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ var _ TaskRunner = &ForTaskRunner{}
2828
var _ TaskRunner = &DoTaskRunner{}
2929

3030
type TaskRunner interface {
31-
Run(input interface{}) (interface{}, error)
31+
Run(input interface{}, taskSupport TaskSupport) (interface{}, error)
3232
GetTaskName() string
3333
}
3434

impl/task_runner_call_http.go

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright 2025 The Serverless Workflow Specification Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package impl
16+
17+
import (
18+
"fmt"
19+
20+
"github.com/serverlessworkflow/sdk-go/v3/model"
21+
)
22+
23+
type CallHTTPTaskRunner struct {
24+
TaskName string
25+
}
26+
27+
func NewCallHttpRunner(taskName string, task *model.CallHTTP) (taskRunner *CallHTTPTaskRunner, err error) {
28+
if task == nil {
29+
err = model.NewErrValidation(fmt.Errorf("invalid For task %s", taskName), taskName)
30+
} else {
31+
taskRunner = new(CallHTTPTaskRunner)
32+
taskRunner.TaskName = taskName
33+
}
34+
return
35+
}
36+
37+
func (f *CallHTTPTaskRunner) Run(input interface{}, taskSupport TaskSupport) (interface{}, error) {
38+
return input, nil
39+
40+
}
41+
42+
func (f *CallHTTPTaskRunner) GetTaskName() string {
43+
return f.TaskName
44+
}

impl/task_runner_do.go

+50-50
Original file line numberDiff line numberDiff line change
@@ -23,110 +23,110 @@ import (
2323
)
2424

2525
// NewTaskRunner creates a TaskRunner instance based on the task type.
26-
func NewTaskRunner(taskName string, task model.Task, taskSupport TaskSupport) (TaskRunner, error) {
26+
func NewTaskRunner(taskName string, task model.Task, workflowDef *model.Workflow) (TaskRunner, error) {
2727
switch t := task.(type) {
2828
case *model.SetTask:
29-
return NewSetTaskRunner(taskName, t, taskSupport)
29+
return NewSetTaskRunner(taskName, t)
3030
case *model.RaiseTask:
31-
return NewRaiseTaskRunner(taskName, t, taskSupport)
31+
return NewRaiseTaskRunner(taskName, t, workflowDef)
3232
case *model.DoTask:
33-
return NewDoTaskRunner(t.Do, taskSupport)
33+
return NewDoTaskRunner(t.Do)
3434
case *model.ForTask:
35-
return NewForTaskRunner(taskName, t, taskSupport)
35+
return NewForTaskRunner(taskName, t)
36+
case *model.CallHTTP:
37+
return NewCallHttpRunner(taskName, t)
3638
default:
3739
return nil, fmt.Errorf("unsupported task type '%T' for task '%s'", t, taskName)
3840
}
3941
}
4042

41-
func NewDoTaskRunner(taskList *model.TaskList, taskSupport TaskSupport) (*DoTaskRunner, error) {
43+
func NewDoTaskRunner(taskList *model.TaskList) (*DoTaskRunner, error) {
4244
return &DoTaskRunner{
43-
TaskList: taskList,
44-
TaskSupport: taskSupport,
45+
TaskList: taskList,
4546
}, nil
4647
}
4748

4849
type DoTaskRunner struct {
49-
TaskList *model.TaskList
50-
TaskSupport TaskSupport
50+
TaskList *model.TaskList
5151
}
5252

53-
func (d *DoTaskRunner) Run(input interface{}) (output interface{}, err error) {
53+
func (d *DoTaskRunner) Run(input interface{}, taskSupport TaskSupport) (output interface{}, err error) {
5454
if d.TaskList == nil {
5555
return input, nil
5656
}
57-
return d.runTasks(input, d.TaskList)
57+
return d.runTasks(input, taskSupport)
5858
}
5959

6060
func (d *DoTaskRunner) GetTaskName() string {
6161
return ""
6262
}
6363

6464
// runTasks runs all defined tasks sequentially.
65-
func (d *DoTaskRunner) runTasks(input interface{}, tasks *model.TaskList) (output interface{}, err error) {
65+
func (d *DoTaskRunner) runTasks(input interface{}, taskSupport TaskSupport) (output interface{}, err error) {
6666
output = input
67-
if tasks == nil {
67+
if d.TaskList == nil {
6868
return output, nil
6969
}
7070

7171
idx := 0
72-
currentTask := (*tasks)[idx]
72+
currentTask := (*d.TaskList)[idx]
7373

7474
for currentTask != nil {
75-
if err = d.TaskSupport.SetTaskDef(currentTask); err != nil {
75+
if err = taskSupport.SetTaskDef(currentTask); err != nil {
7676
return nil, err
7777
}
78-
if err = d.TaskSupport.SetTaskReferenceFromName(currentTask.Key); err != nil {
78+
if err = taskSupport.SetTaskReferenceFromName(currentTask.Key); err != nil {
7979
return nil, err
8080
}
8181

82-
if shouldRun, err := d.shouldRunTask(input, currentTask); err != nil {
82+
if shouldRun, err := d.shouldRunTask(input, taskSupport, currentTask); err != nil {
8383
return output, err
8484
} else if !shouldRun {
85-
idx, currentTask = tasks.Next(idx)
85+
idx, currentTask = d.TaskList.Next(idx)
8686
continue
8787
}
8888

89-
d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.PendingStatus)
89+
taskSupport.SetTaskStatus(currentTask.Key, ctx.PendingStatus)
9090

9191
// Check if this task is a SwitchTask and handle it
9292
if switchTask, ok := currentTask.Task.(*model.SwitchTask); ok {
93-
flowDirective, err := d.evaluateSwitchTask(input, currentTask.Key, switchTask)
93+
flowDirective, err := d.evaluateSwitchTask(input, taskSupport, currentTask.Key, switchTask)
9494
if err != nil {
95-
d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.FaultedStatus)
95+
taskSupport.SetTaskStatus(currentTask.Key, ctx.FaultedStatus)
9696
return output, err
9797
}
98-
d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.CompletedStatus)
98+
taskSupport.SetTaskStatus(currentTask.Key, ctx.CompletedStatus)
9999

100100
// Process FlowDirective: update idx/currentTask accordingly
101-
idx, currentTask = tasks.KeyAndIndex(flowDirective.Value)
101+
idx, currentTask = d.TaskList.KeyAndIndex(flowDirective.Value)
102102
if currentTask == nil {
103103
return nil, fmt.Errorf("flow directive target '%s' not found", flowDirective.Value)
104104
}
105105
continue
106106
}
107107

108-
runner, err := NewTaskRunner(currentTask.Key, currentTask.Task, d.TaskSupport)
108+
runner, err := NewTaskRunner(currentTask.Key, currentTask.Task, taskSupport.GetWorkflowDef())
109109
if err != nil {
110110
return output, err
111111
}
112112

113-
d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.RunningStatus)
114-
if output, err = d.runTask(input, runner, currentTask.Task.GetBase()); err != nil {
115-
d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.FaultedStatus)
113+
taskSupport.SetTaskStatus(currentTask.Key, ctx.RunningStatus)
114+
if output, err = d.runTask(input, taskSupport, runner, currentTask.Task.GetBase()); err != nil {
115+
taskSupport.SetTaskStatus(currentTask.Key, ctx.FaultedStatus)
116116
return output, err
117117
}
118118

119-
d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.CompletedStatus)
119+
taskSupport.SetTaskStatus(currentTask.Key, ctx.CompletedStatus)
120120
input = deepCloneValue(output)
121-
idx, currentTask = tasks.Next(idx)
121+
idx, currentTask = d.TaskList.Next(idx)
122122
}
123123

124124
return output, nil
125125
}
126126

127-
func (d *DoTaskRunner) shouldRunTask(input interface{}, task *model.TaskItem) (bool, error) {
127+
func (d *DoTaskRunner) shouldRunTask(input interface{}, taskSupport TaskSupport, task *model.TaskItem) (bool, error) {
128128
if task.GetBase().If != nil {
129-
output, err := traverseAndEvaluateBool(task.GetBase().If.String(), input, d.TaskSupport.GetContext())
129+
output, err := traverseAndEvaluateBool(task.GetBase().If.String(), input, taskSupport.GetContext())
130130
if err != nil {
131131
return false, model.NewErrExpression(err, task.Key)
132132
}
@@ -135,15 +135,15 @@ func (d *DoTaskRunner) shouldRunTask(input interface{}, task *model.TaskItem) (b
135135
return true, nil
136136
}
137137

138-
func (d *DoTaskRunner) evaluateSwitchTask(input interface{}, taskKey string, switchTask *model.SwitchTask) (*model.FlowDirective, error) {
138+
func (d *DoTaskRunner) evaluateSwitchTask(input interface{}, taskSupport TaskSupport, taskKey string, switchTask *model.SwitchTask) (*model.FlowDirective, error) {
139139
var defaultThen *model.FlowDirective
140140
for _, switchItem := range switchTask.Switch {
141141
for _, switchCase := range switchItem {
142142
if switchCase.When == nil {
143143
defaultThen = switchCase.Then
144144
continue
145145
}
146-
result, err := traverseAndEvaluateBool(model.NormalizeExpr(switchCase.When.String()), input, d.TaskSupport.GetContext())
146+
result, err := traverseAndEvaluateBool(model.NormalizeExpr(switchCase.When.String()), input, taskSupport.GetContext())
147147
if err != nil {
148148
return nil, model.NewErrExpression(err, taskKey)
149149
}
@@ -162,39 +162,39 @@ func (d *DoTaskRunner) evaluateSwitchTask(input interface{}, taskKey string, swi
162162
}
163163

164164
// runTask executes an individual task.
165-
func (d *DoTaskRunner) runTask(input interface{}, runner TaskRunner, task *model.TaskBase) (output interface{}, err error) {
165+
func (d *DoTaskRunner) runTask(input interface{}, taskSupport TaskSupport, runner TaskRunner, task *model.TaskBase) (output interface{}, err error) {
166166
taskName := runner.GetTaskName()
167167

168-
d.TaskSupport.SetTaskStartedAt(time.Now())
169-
d.TaskSupport.SetTaskRawInput(input)
170-
d.TaskSupport.SetTaskName(taskName)
168+
taskSupport.SetTaskStartedAt(time.Now())
169+
taskSupport.SetTaskRawInput(input)
170+
taskSupport.SetTaskName(taskName)
171171

172172
if task.Input != nil {
173-
if input, err = d.processTaskInput(task, input, taskName); err != nil {
173+
if input, err = d.processTaskInput(task, input, taskSupport, taskName); err != nil {
174174
return nil, err
175175
}
176176
}
177177

178-
output, err = runner.Run(input)
178+
output, err = runner.Run(input, taskSupport)
179179
if err != nil {
180180
return nil, err
181181
}
182182

183-
d.TaskSupport.SetTaskRawOutput(output)
183+
taskSupport.SetTaskRawOutput(output)
184184

185-
if output, err = d.processTaskOutput(task, output, taskName); err != nil {
185+
if output, err = d.processTaskOutput(task, output, taskSupport, taskName); err != nil {
186186
return nil, err
187187
}
188188

189-
if err = d.processTaskExport(task, output, taskName); err != nil {
189+
if err = d.processTaskExport(task, output, taskSupport, taskName); err != nil {
190190
return nil, err
191191
}
192192

193193
return output, nil
194194
}
195195

196196
// processTaskInput processes task input validation and transformation.
197-
func (d *DoTaskRunner) processTaskInput(task *model.TaskBase, taskInput interface{}, taskName string) (output interface{}, err error) {
197+
func (d *DoTaskRunner) processTaskInput(task *model.TaskBase, taskInput interface{}, taskSupport TaskSupport, taskName string) (output interface{}, err error) {
198198
if task.Input == nil {
199199
return taskInput, nil
200200
}
@@ -203,20 +203,20 @@ func (d *DoTaskRunner) processTaskInput(task *model.TaskBase, taskInput interfac
203203
return nil, err
204204
}
205205

206-
if output, err = traverseAndEvaluate(task.Input.From, taskInput, taskName, d.TaskSupport.GetContext()); err != nil {
206+
if output, err = traverseAndEvaluate(task.Input.From, taskInput, taskName, taskSupport.GetContext()); err != nil {
207207
return nil, err
208208
}
209209

210210
return output, nil
211211
}
212212

213213
// processTaskOutput processes task output validation and transformation.
214-
func (d *DoTaskRunner) processTaskOutput(task *model.TaskBase, taskOutput interface{}, taskName string) (output interface{}, err error) {
214+
func (d *DoTaskRunner) processTaskOutput(task *model.TaskBase, taskOutput interface{}, taskSupport TaskSupport, taskName string) (output interface{}, err error) {
215215
if task.Output == nil {
216216
return taskOutput, nil
217217
}
218218

219-
if output, err = traverseAndEvaluate(task.Output.As, taskOutput, taskName, d.TaskSupport.GetContext()); err != nil {
219+
if output, err = traverseAndEvaluate(task.Output.As, taskOutput, taskName, taskSupport.GetContext()); err != nil {
220220
return nil, err
221221
}
222222

@@ -227,12 +227,12 @@ func (d *DoTaskRunner) processTaskOutput(task *model.TaskBase, taskOutput interf
227227
return output, nil
228228
}
229229

230-
func (d *DoTaskRunner) processTaskExport(task *model.TaskBase, taskOutput interface{}, taskName string) (err error) {
230+
func (d *DoTaskRunner) processTaskExport(task *model.TaskBase, taskOutput interface{}, taskSupport TaskSupport, taskName string) (err error) {
231231
if task.Export == nil {
232232
return nil
233233
}
234234

235-
output, err := traverseAndEvaluate(task.Export.As, taskOutput, taskName, d.TaskSupport.GetContext())
235+
output, err := traverseAndEvaluate(task.Export.As, taskOutput, taskName, taskSupport.GetContext())
236236
if err != nil {
237237
return err
238238
}
@@ -241,7 +241,7 @@ func (d *DoTaskRunner) processTaskExport(task *model.TaskBase, taskOutput interf
241241
return nil
242242
}
243243

244-
d.TaskSupport.SetWorkflowInstanceCtx(output)
244+
taskSupport.SetWorkflowInstanceCtx(output)
245245

246246
return nil
247247
}

0 commit comments

Comments
 (0)