Skip to content

Commit 4a0216e

Browse files
committed
Refactoring run method
Signed-off-by: fjtirado <[email protected]>
1 parent 45bb41e commit 4a0216e

9 files changed

+167
-125
lines changed

impl/runner.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -137,12 +137,12 @@ func (wr *workflowRunnerImpl) Run(input interface{}) (output interface{}, err er
137137
wr.RunnerCtx.SetInput(input)
138138
// Run tasks sequentially
139139
wr.RunnerCtx.SetStatus(ctx.RunningStatus)
140-
doRunner, err := NewDoTaskRunner(wr.Workflow.Do, wr)
140+
doRunner, err := NewDoTaskRunner(wr.Workflow.Do, wr.GetWorkflowDef())
141141
if err != nil {
142142
return nil, err
143143
}
144144
wr.RunnerCtx.SetStartedAt(time.Now())
145-
output, err = doRunner.Run(wr.RunnerCtx.GetInput())
145+
output, err = doRunner.Run(wr.RunnerCtx.GetInput(), wr)
146146
if err != nil {
147147
return nil, err
148148
}

impl/task_runner.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ var _ TaskRunner = &ForTaskRunner{}
2828
var _ TaskRunner = &DoTaskRunner{}
2929

3030
type TaskRunner interface {
31-
Run(input interface{}) (interface{}, error)
31+
Run(input interface{}, taskSupport TaskSupport) (interface{}, error)
3232
GetTaskName() string
3333
}
3434

impl/task_runner_call_http.go

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright 2025 The Serverless Workflow Specification Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package impl
16+
17+
import (
18+
"fmt"
19+
20+
"github.com/serverlessworkflow/sdk-go/v3/model"
21+
)
22+
23+
type CallHTTPTaskRunner struct {
24+
TaskName string
25+
}
26+
27+
func NewCallHttpRunner(taskName string, task *model.CallHTTP) (taskRunner *CallHTTPTaskRunner, err error) {
28+
if task == nil {
29+
err = model.NewErrValidation(fmt.Errorf("invalid For task %s", taskName), taskName)
30+
} else {
31+
taskRunner = new(CallHTTPTaskRunner)
32+
taskRunner.TaskName = taskName
33+
}
34+
return
35+
}
36+
37+
func (f *CallHTTPTaskRunner) Run(input interface{}, taskSupport TaskSupport) (interface{}, error) {
38+
return input, nil
39+
40+
}
41+
42+
func (f *CallHTTPTaskRunner) GetTaskName() string {
43+
return f.TaskName
44+
}

impl/task_runner_do.go

+53-51
Original file line numberDiff line numberDiff line change
@@ -23,110 +23,112 @@ import (
2323
)
2424

2525
// NewTaskRunner creates a TaskRunner instance based on the task type.
26-
func NewTaskRunner(taskName string, task model.Task, taskSupport TaskSupport) (TaskRunner, error) {
26+
func NewTaskRunner(taskName string, task model.Task, workflowDef *model.Workflow) (TaskRunner, error) {
2727
switch t := task.(type) {
2828
case *model.SetTask:
29-
return NewSetTaskRunner(taskName, t, taskSupport)
29+
return NewSetTaskRunner(taskName, t)
3030
case *model.RaiseTask:
31-
return NewRaiseTaskRunner(taskName, t, taskSupport)
31+
return NewRaiseTaskRunner(taskName, t, workflowDef)
3232
case *model.DoTask:
33-
return NewDoTaskRunner(t.Do, taskSupport)
33+
return NewDoTaskRunner(t.Do, workflowDef)
3434
case *model.ForTask:
35-
return NewForTaskRunner(taskName, t, taskSupport)
35+
return NewForTaskRunner(taskName, t, workflowDef)
36+
case *model.CallHTTP:
37+
return NewCallHttpRunner(taskName, t)
3638
default:
3739
return nil, fmt.Errorf("unsupported task type '%T' for task '%s'", t, taskName)
3840
}
3941
}
4042

41-
func NewDoTaskRunner(taskList *model.TaskList, taskSupport TaskSupport) (*DoTaskRunner, error) {
43+
func NewDoTaskRunner(taskList *model.TaskList, workflowDef *model.Workflow) (*DoTaskRunner, error) {
4244
return &DoTaskRunner{
4345
TaskList: taskList,
44-
TaskSupport: taskSupport,
46+
WorkflowDef: workflowDef,
4547
}, nil
4648
}
4749

4850
type DoTaskRunner struct {
4951
TaskList *model.TaskList
50-
TaskSupport TaskSupport
52+
WorkflowDef *model.Workflow
5153
}
5254

53-
func (d *DoTaskRunner) Run(input interface{}) (output interface{}, err error) {
55+
func (d *DoTaskRunner) Run(input interface{}, taskSupport TaskSupport) (output interface{}, err error) {
5456
if d.TaskList == nil {
5557
return input, nil
5658
}
57-
return d.runTasks(input, d.TaskList)
59+
return d.runTasks(input, taskSupport)
5860
}
5961

6062
func (d *DoTaskRunner) GetTaskName() string {
6163
return ""
6264
}
6365

6466
// runTasks runs all defined tasks sequentially.
65-
func (d *DoTaskRunner) runTasks(input interface{}, tasks *model.TaskList) (output interface{}, err error) {
67+
func (d *DoTaskRunner) runTasks(input interface{}, taskSupport TaskSupport) (output interface{}, err error) {
6668
output = input
67-
if tasks == nil {
69+
if d.TaskList == nil {
6870
return output, nil
6971
}
7072

7173
idx := 0
72-
currentTask := (*tasks)[idx]
74+
currentTask := (*d.TaskList)[idx]
7375

7476
for currentTask != nil {
75-
if err = d.TaskSupport.SetTaskDef(currentTask); err != nil {
77+
if err = taskSupport.SetTaskDef(currentTask); err != nil {
7678
return nil, err
7779
}
78-
if err = d.TaskSupport.SetTaskReferenceFromName(currentTask.Key); err != nil {
80+
if err = taskSupport.SetTaskReferenceFromName(currentTask.Key); err != nil {
7981
return nil, err
8082
}
8183

82-
if shouldRun, err := d.shouldRunTask(input, currentTask); err != nil {
84+
if shouldRun, err := d.shouldRunTask(input, taskSupport, currentTask); err != nil {
8385
return output, err
8486
} else if !shouldRun {
85-
idx, currentTask = tasks.Next(idx)
87+
idx, currentTask = d.TaskList.Next(idx)
8688
continue
8789
}
8890

89-
d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.PendingStatus)
91+
taskSupport.SetTaskStatus(currentTask.Key, ctx.PendingStatus)
9092

9193
// Check if this task is a SwitchTask and handle it
9294
if switchTask, ok := currentTask.Task.(*model.SwitchTask); ok {
93-
flowDirective, err := d.evaluateSwitchTask(input, currentTask.Key, switchTask)
95+
flowDirective, err := d.evaluateSwitchTask(input, taskSupport, currentTask.Key, switchTask)
9496
if err != nil {
95-
d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.FaultedStatus)
97+
taskSupport.SetTaskStatus(currentTask.Key, ctx.FaultedStatus)
9698
return output, err
9799
}
98-
d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.CompletedStatus)
100+
taskSupport.SetTaskStatus(currentTask.Key, ctx.CompletedStatus)
99101

100102
// Process FlowDirective: update idx/currentTask accordingly
101-
idx, currentTask = tasks.KeyAndIndex(flowDirective.Value)
103+
idx, currentTask = d.TaskList.KeyAndIndex(flowDirective.Value)
102104
if currentTask == nil {
103105
return nil, fmt.Errorf("flow directive target '%s' not found", flowDirective.Value)
104106
}
105107
continue
106108
}
107109

108-
runner, err := NewTaskRunner(currentTask.Key, currentTask.Task, d.TaskSupport)
110+
runner, err := NewTaskRunner(currentTask.Key, currentTask.Task, taskSupport.GetWorkflowDef())
109111
if err != nil {
110112
return output, err
111113
}
112114

113-
d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.RunningStatus)
114-
if output, err = d.runTask(input, runner, currentTask.Task.GetBase()); err != nil {
115-
d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.FaultedStatus)
115+
taskSupport.SetTaskStatus(currentTask.Key, ctx.RunningStatus)
116+
if output, err = d.runTask(input, taskSupport, runner, currentTask.Task.GetBase()); err != nil {
117+
taskSupport.SetTaskStatus(currentTask.Key, ctx.FaultedStatus)
116118
return output, err
117119
}
118120

119-
d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.CompletedStatus)
121+
taskSupport.SetTaskStatus(currentTask.Key, ctx.CompletedStatus)
120122
input = deepCloneValue(output)
121-
idx, currentTask = tasks.Next(idx)
123+
idx, currentTask = d.TaskList.Next(idx)
122124
}
123125

124126
return output, nil
125127
}
126128

127-
func (d *DoTaskRunner) shouldRunTask(input interface{}, task *model.TaskItem) (bool, error) {
129+
func (d *DoTaskRunner) shouldRunTask(input interface{}, taskSupport TaskSupport, task *model.TaskItem) (bool, error) {
128130
if task.GetBase().If != nil {
129-
output, err := traverseAndEvaluateBool(task.GetBase().If.String(), input, d.TaskSupport.GetContext())
131+
output, err := traverseAndEvaluateBool(task.GetBase().If.String(), input, taskSupport.GetContext())
130132
if err != nil {
131133
return false, model.NewErrExpression(err, task.Key)
132134
}
@@ -135,15 +137,15 @@ func (d *DoTaskRunner) shouldRunTask(input interface{}, task *model.TaskItem) (b
135137
return true, nil
136138
}
137139

138-
func (d *DoTaskRunner) evaluateSwitchTask(input interface{}, taskKey string, switchTask *model.SwitchTask) (*model.FlowDirective, error) {
140+
func (d *DoTaskRunner) evaluateSwitchTask(input interface{}, taskSupport TaskSupport, taskKey string, switchTask *model.SwitchTask) (*model.FlowDirective, error) {
139141
var defaultThen *model.FlowDirective
140142
for _, switchItem := range switchTask.Switch {
141143
for _, switchCase := range switchItem {
142144
if switchCase.When == nil {
143145
defaultThen = switchCase.Then
144146
continue
145147
}
146-
result, err := traverseAndEvaluateBool(model.NormalizeExpr(switchCase.When.String()), input, d.TaskSupport.GetContext())
148+
result, err := traverseAndEvaluateBool(model.NormalizeExpr(switchCase.When.String()), input, taskSupport.GetContext())
147149
if err != nil {
148150
return nil, model.NewErrExpression(err, taskKey)
149151
}
@@ -162,86 +164,86 @@ func (d *DoTaskRunner) evaluateSwitchTask(input interface{}, taskKey string, swi
162164
}
163165

164166
// runTask executes an individual task.
165-
func (d *DoTaskRunner) runTask(input interface{}, runner TaskRunner, task *model.TaskBase) (output interface{}, err error) {
167+
func (d *DoTaskRunner) runTask(input interface{}, taskSupport TaskSupport, runner TaskRunner, task *model.TaskBase) (output interface{}, err error) {
166168
taskName := runner.GetTaskName()
167169

168-
d.TaskSupport.SetTaskStartedAt(time.Now())
169-
d.TaskSupport.SetTaskRawInput(input)
170-
d.TaskSupport.SetTaskName(taskName)
170+
taskSupport.SetTaskStartedAt(time.Now())
171+
taskSupport.SetTaskRawInput(input)
172+
taskSupport.SetTaskName(taskName)
171173

172174
if task.Input != nil {
173-
if input, err = d.processTaskInput(task, input, taskName); err != nil {
175+
if input, err = d.processTaskInput(task, input, taskSupport); err != nil {
174176
return nil, err
175177
}
176178
}
177179

178-
output, err = runner.Run(input)
180+
output, err = runner.Run(input, taskSupport)
179181
if err != nil {
180182
return nil, err
181183
}
182184

183-
d.TaskSupport.SetTaskRawOutput(output)
185+
taskSupport.SetTaskRawOutput(output)
184186

185-
if output, err = d.processTaskOutput(task, output, taskName); err != nil {
187+
if output, err = d.processTaskOutput(task, output, taskSupport); err != nil {
186188
return nil, err
187189
}
188190

189-
if err = d.processTaskExport(task, output, taskName); err != nil {
191+
if err = d.processTaskExport(task, output, taskSupport); err != nil {
190192
return nil, err
191193
}
192194

193195
return output, nil
194196
}
195197

196198
// processTaskInput processes task input validation and transformation.
197-
func (d *DoTaskRunner) processTaskInput(task *model.TaskBase, taskInput interface{}, taskName string) (output interface{}, err error) {
199+
func (d *DoTaskRunner) processTaskInput(task *model.TaskBase, taskInput interface{}, taskSupport TaskSupport) (output interface{}, err error) {
198200
if task.Input == nil {
199201
return taskInput, nil
200202
}
201203

202-
if err = validateSchema(taskInput, task.Input.Schema, taskName); err != nil {
204+
if err = validateSchema(taskInput, task.Input.Schema, d.GetTaskName()); err != nil {
203205
return nil, err
204206
}
205207

206-
if output, err = traverseAndEvaluate(task.Input.From, taskInput, taskName, d.TaskSupport.GetContext()); err != nil {
208+
if output, err = traverseAndEvaluate(task.Input.From, taskInput, d.GetTaskName(), taskSupport.GetContext()); err != nil {
207209
return nil, err
208210
}
209211

210212
return output, nil
211213
}
212214

213215
// processTaskOutput processes task output validation and transformation.
214-
func (d *DoTaskRunner) processTaskOutput(task *model.TaskBase, taskOutput interface{}, taskName string) (output interface{}, err error) {
216+
func (d *DoTaskRunner) processTaskOutput(task *model.TaskBase, taskOutput interface{}, taskSupport TaskSupport) (output interface{}, err error) {
215217
if task.Output == nil {
216218
return taskOutput, nil
217219
}
218220

219-
if output, err = traverseAndEvaluate(task.Output.As, taskOutput, taskName, d.TaskSupport.GetContext()); err != nil {
221+
if output, err = traverseAndEvaluate(task.Output.As, taskOutput, d.GetTaskName(), taskSupport.GetContext()); err != nil {
220222
return nil, err
221223
}
222224

223-
if err = validateSchema(output, task.Output.Schema, taskName); err != nil {
225+
if err = validateSchema(output, task.Output.Schema, d.GetTaskName()); err != nil {
224226
return nil, err
225227
}
226228

227229
return output, nil
228230
}
229231

230-
func (d *DoTaskRunner) processTaskExport(task *model.TaskBase, taskOutput interface{}, taskName string) (err error) {
232+
func (d *DoTaskRunner) processTaskExport(task *model.TaskBase, taskOutput interface{}, taskSupport TaskSupport) (err error) {
231233
if task.Export == nil {
232234
return nil
233235
}
234236

235-
output, err := traverseAndEvaluate(task.Export.As, taskOutput, taskName, d.TaskSupport.GetContext())
237+
output, err := traverseAndEvaluate(task.Export.As, taskOutput, d.GetTaskName(), taskSupport.GetContext())
236238
if err != nil {
237239
return err
238240
}
239241

240-
if err = validateSchema(output, task.Export.Schema, taskName); err != nil {
242+
if err = validateSchema(output, task.Export.Schema, d.GetTaskName()); err != nil {
241243
return nil
242244
}
243245

244-
d.TaskSupport.SetWorkflowInstanceCtx(output)
246+
taskSupport.SetWorkflowInstanceCtx(output)
245247

246248
return nil
247249
}

0 commit comments

Comments
 (0)