Skip to content

Commit ac85f3a

Browse files
authored
Merge pull request #6 from Azure/haitao/withRetry
Haitao/with retry
2 parents 878c42a + a5643d6 commit ac85f3a

14 files changed

+759
-352
lines changed

README.md

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ AsyncJob aiming to help you organize code in dependencyGraph(DAG), instead of a
1010
**Step** is a individual code block which can be executed and have inputs, output.
1111
- a step would be started once all it's dependency is finished.
1212
- output of a step can be feed into next step as input, type is checked by go generics.
13-
- step is wrapped in [AsyncTask](github.com/Azure/go-asynctask) with strongType info preserved
13+
- step is wrapped in [AsyncTask](https://github.com/Azure/go-asynctask) with strongType info preserved
1414
- you can feed parameters as a step as well.
1515

1616
# Usage
@@ -38,40 +38,57 @@ AsyncJob aiming to help you organize code in dependencyGraph(DAG), instead of a
3838
# summarize
3939
StepAfterBoth(bCtx, job, "summarize", qery1ResultTask, qery2ResultTask, jobLib.SummarizeQueryResult)
4040

41-
# visualize the job
42-
dotGraph := job.Visualize()
43-
fmt.Println(dotGraph)
44-
4541
# execute job
4642
job.Start(context.Background())
4743
job.Wait(context.WithTimeout(context.Background(), 10*time.Second))
4844
```
4945

5046
### visualize of a job
51-
tried https://github.com/hashicorp/terraform/tree/main/internal/dag, which doesn't have own go module, but terraform go module have too much dependencies.
52-
baking a inhouse one.
47+
```
48+
# visualize the job
49+
dotGraph := job.Visualize()
50+
fmt.Println(dotGraph)
51+
```
5352

5453
```
5554
digraph {
56-
compound = "true"
5755
newrank = "true"
58-
subgraph "root" {
59-
"[root] [Start]" -> "[root] getConnection"
60-
"[root] [Start]" -> "[root] param_query1"
61-
"[root] [Start]" -> "[root] param_query2"
62-
"[root] [Start]" -> "[root] param_table1"
63-
"[root] [Start]" -> "[root] param_table2"
64-
"[root] getConnection" -> "[root] getTableClient1"
65-
"[root] getConnection" -> "[root] getTableClient2"
66-
"[root] getTableClient1" -> "[root] queryTable1"
67-
"[root] getTableClient2" -> "[root] queryTable2"
68-
"[root] param_query1" -> "[root] queryTable1"
69-
"[root] param_query2" -> "[root] queryTable2"
70-
"[root] param_table1" -> "[root] getTableClient1"
71-
"[root] param_table2" -> "[root] getTableClient2"
72-
"[root] queryTable1" -> "[root] summarize"
73-
"[root] queryTable2" -> "[root] summarize"
74-
}
56+
param_table1 [label="table1" shape=hexagon style=filled tooltip="Type: param\nName: table1\nState: completed\nStartAt: 2022-11-03T00:56:30.006196-07:00\nDuration: 12.657µs" fillcolor=green]
57+
param_query1 [label="query1" shape=hexagon style=filled tooltip="Type: param\nName: query1\nState: completed\nStartAt: 2022-11-03T00:56:30.0062-07:00\nDuration: 17.013µs" fillcolor=green]
58+
root_job [label="job" shape=triangle style=filled tooltip="Type: root\nName: job\nState: completed\nStartAt: 2022-11-03T00:56:30.006183-07:00\nDuration: 3.695µs" fillcolor=green]
59+
param_query2 [label="query2" shape=hexagon style=filled tooltip="Type: param\nName: query2\nState: completed\nStartAt: 2022-11-03T00:56:30.006197-07:00\nDuration: 13.781µs" fillcolor=green]
60+
task_getTableClient1 [label="getTableClient1" shape=box style=filled tooltip="Type: task\nName: getTableClient1\nState: completed\nStartAt: 2022-11-03T00:56:30.006304-07:00\nDuration: 34.652µs" fillcolor=green]
61+
task_queryTable1 [label="queryTable1" shape=box style=filled tooltip="Type: task\nName: queryTable1\nState: completed\nStartAt: 2022-11-03T00:56:30.006349-07:00\nDuration: 3.217443247s" fillcolor=green]
62+
param_table2 [label="table2" shape=hexagon style=filled tooltip="Type: param\nName: table2\nState: completed\nStartAt: 2022-11-03T00:56:30.006199-07:00\nDuration: 15.632µs" fillcolor=green]
63+
task_getTableClient2 [label="getTableClient2" shape=box style=filled tooltip="Type: task\nName: getTableClient2\nState: completed\nStartAt: 2022-11-03T00:56:30.00631-07:00\nDuration: 51.872µs" fillcolor=green]
64+
task_queryTable2 [label="queryTable2" shape=box style=filled tooltip="Type: task\nName: queryTable2\nState: completed\nStartAt: 2022-11-03T00:56:30.006377-07:00\nDuration: 67.814µs" fillcolor=green]
65+
task_emailNotification [label="emailNotification" shape=box style=filled tooltip="Type: task\nName: emailNotification\nState: completed\nStartAt: 2022-11-03T00:56:33.223952-07:00\nDuration: 3.92µs" fillcolor=green]
66+
param_serverName [label="serverName" shape=hexagon style=filled tooltip="Type: param\nName: serverName\nState: completed\nStartAt: 2022-11-03T00:56:30.006198-07:00\nDuration: 14.638µs" fillcolor=green]
67+
task_getConnection [label="getConnection" shape=box style=filled tooltip="Type: task\nName: getConnection\nState: completed\nStartAt: 2022-11-03T00:56:30.006231-07:00\nDuration: 62.234µs" fillcolor=green]
68+
task_checkAuth [label="checkAuth" shape=box style=filled tooltip="Type: task\nName: checkAuth\nState: completed\nStartAt: 2022-11-03T00:56:30.006212-07:00\nDuration: 650ns" fillcolor=green]
69+
task_summarize [label="summarize" shape=box style=filled tooltip="Type: task\nName: summarize\nState: completed\nStartAt: 2022-11-03T00:56:33.22392-07:00\nDuration: 4.325µs" fillcolor=green]
70+
71+
param_table1 -> task_getTableClient1 [style=bold tooltip="Time: 2022-11-03T00:56:30.006304-07:00" color=green]
72+
param_query1 -> task_queryTable1 [style=bold tooltip="Time: 2022-11-03T00:56:30.006349-07:00" color=green]
73+
param_table2 -> task_getTableClient2 [style=bold tooltip="Time: 2022-11-03T00:56:30.00631-07:00" color=green]
74+
task_getTableClient2 -> task_queryTable2 [style=bold tooltip="Time: 2022-11-03T00:56:30.006377-07:00" color=green]
75+
param_query2 -> task_queryTable2 [style=bold tooltip="Time: 2022-11-03T00:56:30.006377-07:00" color=green]
76+
task_queryTable2 -> task_summarize [style=bold tooltip="Time: 2022-11-03T00:56:33.22392-07:00" color=green]
77+
root_job -> param_serverName [style=bold tooltip="Time: 2022-11-03T00:56:30.006198-07:00" color=green]
78+
root_job -> task_checkAuth [style=bold tooltip="Time: 2022-11-03T00:56:30.006212-07:00" color=green]
79+
root_job -> param_table1 [style=bold tooltip="Time: 2022-11-03T00:56:30.006196-07:00" color=green]
80+
root_job -> param_query1 [style=bold tooltip="Time: 2022-11-03T00:56:30.0062-07:00" color=green]
81+
root_job -> param_table2 [style=bold tooltip="Time: 2022-11-03T00:56:30.006199-07:00" color=green]
82+
root_job -> param_query2 [style=bold tooltip="Time: 2022-11-03T00:56:30.006197-07:00" color=green]
83+
param_serverName -> task_getConnection [style=bold tooltip="Time: 2022-11-03T00:56:30.006231-07:00" color=green]
84+
task_getTableClient1 -> task_queryTable1 [style=bold tooltip="Time: 2022-11-03T00:56:30.006349-07:00" color=green]
85+
task_queryTable1 -> task_summarize [style=bold tooltip="Time: 2022-11-03T00:56:33.22392-07:00" color=green]
86+
task_summarize -> task_emailNotification [style=bold tooltip="Time: 2022-11-03T00:56:33.223952-07:00" color=green]
87+
task_getConnection -> task_getTableClient1 [style=bold tooltip="Time: 2022-11-03T00:56:30.006304-07:00" color=green]
88+
task_getConnection -> task_getTableClient2 [style=bold tooltip="Time: 2022-11-03T00:56:30.00631-07:00" color=green]
89+
task_checkAuth -> task_queryTable1 [style=bold tooltip="Time: 2022-11-03T00:56:30.006349-07:00" color=green]
90+
task_checkAuth -> task_queryTable2 [style=bold tooltip="Time: 2022-11-03T00:56:30.006377-07:00" color=green]
91+
7592
}
7693
```
77-
![visualize job graph](media/graphviz.svg)
94+
![visualize job graph](media/asyncjob.svg)

error.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package asyncjob
2+
3+
import (
4+
"fmt"
5+
)
6+
7+
type JobErrorCode string
8+
9+
const (
10+
ErrPrecedentStepFailure JobErrorCode = "precedent step failed"
11+
ErrStepFailed JobErrorCode = "current step failed"
12+
)
13+
14+
func (code JobErrorCode) Error() string {
15+
return string(code)
16+
}
17+
18+
type JobError struct {
19+
Code JobErrorCode
20+
StepError error
21+
StepName string
22+
Message string
23+
}
24+
25+
func newJobError(code JobErrorCode, message string) *JobError {
26+
return &JobError{Code: code, Message: message}
27+
}
28+
29+
func newStepError(stepName string, stepErr error) *JobError {
30+
return &JobError{Code: ErrStepFailed, StepName: stepName, StepError: stepErr}
31+
}
32+
33+
func (je *JobError) Error() string {
34+
if je.Code == ErrStepFailed && je.StepError != nil {
35+
return fmt.Sprintf("step %q failed: %s", je.StepName, je.StepError.Error())
36+
}
37+
return je.Code.Error() + ": " + je.Message
38+
}
39+
40+
func (je *JobError) Unwrap() error {
41+
if je.Code == ErrStepFailed {
42+
return je.StepError
43+
}
44+
45+
return je.Code
46+
}

graph/template.go

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,43 +10,9 @@ import (
1010
var digraphTemplate = template.Must(template.New("digraph").Parse(digraphTemplateText))
1111

1212
const digraphTemplateText = `digraph {
13-
compound = "true"
1413
newrank = "true"
15-
subgraph "root" {
1614
{{ range $node := $.Nodes}} {{$node.ID}} [label="{{$node.Name}}" shape={{$node.Shape}} style={{$node.Style}} tooltip="{{$node.Tooltip}}" fillcolor={{$node.FillColor}}]
1715
{{ end }}
1816
{{ range $edge := $.Edges}} {{$edge.FromNodeID}} -> {{$edge.ToNodeID}} [style={{$edge.Style}} tooltip="{{$edge.Tooltip}}" color={{$edge.Color}}]
19-
{{ end }}
20-
}
17+
{{ end }}
2118
}`
22-
23-
/* ideal output
24-
digraph G {
25-
jobroot [shape=triangle style=filled fillcolor=gold tooltip="State: Failed\nDuration:5s"]
26-
param_servername [label="servername" shape=doublecircle style=filled fillcolor=green tooltip="Value: dummy.server.io"]
27-
param_table1 [label="table1" shape=doublecircle style=filled fillcolor=green tooltip="Value: table1"]
28-
param_query1 [label="query1" shape=doublecircle style=filled fillcolor=green tooltip="Value: select * from table1"]
29-
param_table2 [label="table2" shape=doublecircle style=filled fillcolor=green tooltip="Value: table2"]
30-
param_query2 [label="query2" shape=doublecircle style=filled fillcolor=green tooltip="Value: select * from table2"]
31-
jobroot -> param_servername [tooltip="time:2022-10-28T21:16:07Z"]
32-
param_servername -> func_getConnection
33-
func_getConnection [label="getConnection" shape=ellipse style=filled fillcolor=green tooltip="State: Finished\nDuration:1s"]
34-
func_query1 [label="query1" shape=ellipse style=filled fillcolor=green tooltip="State: Finished\nDuration:2s"]
35-
func_query2 [label="query2" shape=ellipse style=filled fillcolor=red tooltip="State: Failed\nDuration:2s"]
36-
jobroot -> param_table1 [style=bold color=green tooltip="time:2022-10-28T21:16:07Z"]
37-
param_table1 -> func_query1 [tooltip="time:2022-10-28T21:16:07Z"]
38-
jobroot -> param_query1 [tooltip="time:2022-10-28T21:16:07Z"]
39-
param_query1 -> func_query1
40-
jobroot -> param_table2 [tooltip="time:2022-10-28T21:16:07Z"]
41-
param_table2 -> func_query2
42-
jobroot -> param_query2 [tooltip="time:2022-10-28T21:16:07Z"]
43-
param_query2 -> func_query2
44-
func_getConnection -> func_query1
45-
func_query1 -> func_summarize
46-
func_getConnection -> func_query2
47-
func_query2 -> func_summarize [color=red]
48-
func_summarize [label="summarize" shape=ellipse style=filled fillcolor=red tooltip="State: Blocked"]
49-
func_email [label="email" shape=ellipse style=filled tooltip="State: Pending"]
50-
func_summarize -> func_email [style=dotted]
51-
}
52-
*/

graph_node.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (sn *stepNode) getShape() string {
3333
case stepTypeRoot:
3434
return "triangle"
3535
case stepTypeParam:
36-
return "doublecircle"
36+
return "hexagon"
3737
case stepTypeTask:
3838
return "box"
3939
default:

job.go

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,18 +116,28 @@ func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asyn
116116
step.executionData.StartTime = time.Now()
117117
step.state = StepStateFailed
118118
step.executionData.Duration = 0 */
119-
return nil, err
119+
return nil, newJobError(ErrPrecedentStepFailure, "")
120120
}
121121
step.executionData.StartTime = time.Now()
122122
step.state = StepStateRunning
123-
result, err := stepFunc(j.runtimeCtx)
123+
124+
var result *T
125+
var err error
126+
if step.executionOptions.RetryPolicy != nil {
127+
step.executionData.Retried = &RetryReport{}
128+
result, err = newRetryer(step.executionOptions.RetryPolicy, step.executionData.Retried, func() (*T, error) { return stepFunc(j.runtimeCtx) }).Run()
129+
} else {
130+
result, err = stepFunc(j.runtimeCtx)
131+
}
132+
124133
if err != nil {
125134
step.state = StepStateFailed
126135
} else {
127136
step.state = StepStateCompleted
128137
}
138+
129139
step.executionData.Duration = time.Since(step.executionData.StartTime)
130-
return result, err
140+
return result, newStepError(stepName, err)
131141
}
132142

133143
step.task = asynctask.Start(bCtx, instrumentedFunc)
@@ -171,18 +181,27 @@ func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentSt
171181
step.executionData.StartTime = time.Now()
172182
step.state = StepStateFailed
173183
step.executionData.Duration = 0 */
174-
return nil, err
184+
return nil, newJobError(ErrPrecedentStepFailure, "")
175185
}
176186
step.executionData.StartTime = time.Now()
177187
step.state = StepStateRunning
178-
result, err := stepFunc(j.runtimeCtx, t)
188+
var result *S
189+
var err error
190+
if step.executionOptions.RetryPolicy != nil {
191+
step.executionData.Retried = &RetryReport{}
192+
result, err = newRetryer(step.executionOptions.RetryPolicy, step.executionData.Retried, func() (*S, error) { return stepFunc(j.runtimeCtx, t) }).Run()
193+
} else {
194+
result, err = stepFunc(j.runtimeCtx, t)
195+
}
196+
179197
if err != nil {
180198
step.state = StepStateFailed
181199
} else {
182200
step.state = StepStateCompleted
183201
}
202+
184203
step.executionData.Duration = time.Since(step.executionData.StartTime)
185-
return result, err
204+
return result, newStepError(stepName, err)
186205
}
187206

188207
step.task = asynctask.ContinueWith(bCtx, parentStep.task, instrumentedFunc)
@@ -228,19 +247,29 @@ func StepAfterBoth[T, S, R any](bCtx context.Context, j *Job, stepName string, p
228247
step.executionData.StartTime = time.Now()
229248
step.state = StepStateFailed
230249
step.executionData.Duration = 0 */
231-
return nil, err
250+
return nil, newJobError(ErrPrecedentStepFailure, "")
232251
}
233252

234253
step.executionData.StartTime = time.Now()
235254
step.state = StepStateRunning
236-
result, err := stepFunc(j.runtimeCtx, t, s)
255+
256+
var result *R
257+
var err error
258+
if step.executionOptions.RetryPolicy != nil {
259+
step.executionData.Retried = &RetryReport{}
260+
result, err = newRetryer(step.executionOptions.RetryPolicy, step.executionData.Retried, func() (*R, error) { return stepFunc(j.runtimeCtx, t, s) }).Run()
261+
} else {
262+
result, err = stepFunc(j.runtimeCtx, t, s)
263+
}
264+
237265
if err != nil {
238266
step.state = StepStateFailed
239267
} else {
240268
step.state = StepStateCompleted
241269
}
270+
242271
step.executionData.Duration = time.Since(step.executionData.StartTime)
243-
return result, err
272+
return result, newStepError(stepName, err)
244273
}
245274

246275
step.task = asynctask.AfterBoth(bCtx, parentStepT.task, parentStepS.task, instrumentedFunc)

0 commit comments

Comments
 (0)