Skip to content

Commit e9ae7de

Browse files
committed
cleanup
1 parent 3644589 commit e9ae7de

File tree

6 files changed

+32
-152
lines changed

6 files changed

+32
-152
lines changed

batch.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -97,25 +97,25 @@ func (b *BatchNodeBuilder) Post(ctx context.Context, shared *SharedStore, prepRe
9797

9898
// Fluent configuration methods
9999
func (b *BatchNodeBuilder) WithMaxRetries(retries int) *BatchNodeBuilder {
100-
WithMaxRetries(retries)(b.BatchNode.CustomNode.BaseNode)
100+
WithMaxRetries(retries)(b.BaseNode)
101101
return b
102102
}
103103

104104
func (b *BatchNodeBuilder) WithWait(wait time.Duration) *BatchNodeBuilder {
105-
WithWait(wait)(b.BatchNode.CustomNode.BaseNode)
105+
WithWait(wait)(b.BaseNode)
106106
return b
107107
}
108108

109109
func (b *BatchNodeBuilder) WithBatchConcurrency(n int) *BatchNodeBuilder {
110-
b.BatchNode.CustomNode.BaseNode.batchConcurrency = n
110+
b.batchConcurrency = n
111111
return b
112112
}
113113

114114
func (b *BatchNodeBuilder) WithBatchErrorHandling(continueOnError bool) *BatchNodeBuilder {
115115
if continueOnError {
116-
b.BatchNode.CustomNode.BaseNode.batchErrorHandling = "continue"
116+
b.batchErrorHandling = "continue"
117117
} else {
118-
b.BatchNode.CustomNode.BaseNode.batchErrorHandling = "stop"
118+
b.batchErrorHandling = "stop"
119119
}
120120
return b
121121
}
@@ -126,23 +126,23 @@ func (b *BatchNodeBuilder) WithBatchErrorHandling(continueOnError bool) *BatchNo
126126
// - func(context.Context, *flyt.SharedStore, []flyt.Result, []flyt.Result) (flyt.Action, error)
127127

128128
func (b *BatchNodeBuilder) WithPrepFunc(fn func(context.Context, *SharedStore) ([]Result, error)) *BatchNodeBuilder {
129-
b.BatchNode.batchPrepFunc = fn
129+
b.batchPrepFunc = fn
130130
return b
131131
}
132132

133133
func (b *BatchNodeBuilder) WithExecFunc(fn func(context.Context, Result) (Result, error)) *BatchNodeBuilder {
134-
b.BatchNode.CustomNode.execFunc = fn
134+
b.execFunc = fn
135135
return b
136136
}
137137

138138
func (b *BatchNodeBuilder) WithPostFunc(fn func(context.Context, *SharedStore, []Result, []Result) (Action, error)) *BatchNodeBuilder {
139-
b.BatchNode.batchPostFunc = fn
139+
b.batchPostFunc = fn
140140
return b
141141
}
142142

143143
// Alternative: WithExecFuncAny for compatibility
144144
func (b *BatchNodeBuilder) WithExecFuncAny(fn func(context.Context, any) (any, error)) *BatchNodeBuilder {
145-
b.BatchNode.CustomNode.execFunc = func(ctx context.Context, prepResult Result) (Result, error) {
145+
b.execFunc = func(ctx context.Context, prepResult Result) (Result, error) {
146146
val, err := fn(ctx, prepResult.Value())
147147
if err != nil {
148148
return Result{}, err
@@ -190,20 +190,20 @@ func runBatch(ctx context.Context, node Node, shared *SharedStore) (Action, erro
190190

191191
// Get batch configuration
192192
var concurrency int
193-
var errorHandling string = "continue"
193+
var errorHandling = "continue"
194194

195195
if baseNode, ok := node.(*BaseNode); ok {
196196
concurrency = baseNode.GetBatchConcurrency()
197197
errorHandling = baseNode.GetBatchErrorHandling()
198198
} else if customNode, ok := node.(*CustomNode); ok {
199-
concurrency = customNode.BaseNode.GetBatchConcurrency()
200-
errorHandling = customNode.BaseNode.GetBatchErrorHandling()
199+
concurrency = customNode.GetBatchConcurrency()
200+
errorHandling = customNode.GetBatchErrorHandling()
201201
} else if batchNode, ok := node.(*BatchNode); ok {
202-
concurrency = batchNode.CustomNode.BaseNode.GetBatchConcurrency()
203-
errorHandling = batchNode.CustomNode.BaseNode.GetBatchErrorHandling()
202+
concurrency = batchNode.GetBatchConcurrency()
203+
errorHandling = batchNode.GetBatchErrorHandling()
204204
} else if batchBuilder, ok := node.(*BatchNodeBuilder); ok {
205-
concurrency = batchBuilder.BatchNode.CustomNode.BaseNode.GetBatchConcurrency()
206-
errorHandling = batchBuilder.BatchNode.CustomNode.BaseNode.GetBatchErrorHandling()
205+
concurrency = batchBuilder.GetBatchConcurrency()
206+
errorHandling = batchBuilder.GetBatchErrorHandling()
207207
}
208208

209209
// Execute items
@@ -303,7 +303,7 @@ func runBatchConcurrent(ctx context.Context, node Node, items []Result, results
303303

304304
func runExecWithRetries(ctx context.Context, node Node, item Result) (any, error) {
305305
// Get retry settings
306-
var maxRetries int = 1
306+
var maxRetries = 1
307307
var wait time.Duration = 0
308308

309309
if retryable, ok := node.(RetryableNode); ok {

builder.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,60 +36,60 @@ func (b *NodeBuilder) ExecFallback(prepResult any, err error) (any, error) {
3636

3737
// GetMaxRetries implements RetryableNode.GetMaxRetries by delegating to the embedded BaseNode
3838
func (b *NodeBuilder) GetMaxRetries() int {
39-
return b.CustomNode.BaseNode.GetMaxRetries()
39+
return b.CustomNode.GetMaxRetries()
4040
}
4141

4242
// GetWait implements RetryableNode.GetWait by delegating to the embedded BaseNode
4343
func (b *NodeBuilder) GetWait() time.Duration {
44-
return b.CustomNode.BaseNode.GetWait()
44+
return b.CustomNode.GetWait()
4545
}
4646

4747
// WithMaxRetries sets the maximum number of retries for the node's Exec phase.
4848
// Returns the builder for method chaining.
4949
func (b *NodeBuilder) WithMaxRetries(retries int) *NodeBuilder {
50-
WithMaxRetries(retries)(b.CustomNode.BaseNode)
50+
WithMaxRetries(retries)(b.BaseNode)
5151
return b
5252
}
5353

5454
// WithWait sets the wait duration between retries.
5555
// Returns the builder for method chaining.
5656
func (b *NodeBuilder) WithWait(wait time.Duration) *NodeBuilder {
57-
WithWait(wait)(b.CustomNode.BaseNode)
57+
WithWait(wait)(b.BaseNode)
5858
return b
5959
}
6060

6161
// WithPrepFunc sets a custom Prep implementation using Result types.
6262
// Returns the builder for method chaining.
6363
func (b *NodeBuilder) WithPrepFunc(fn func(context.Context, *SharedStore) (Result, error)) *NodeBuilder {
64-
b.CustomNode.prepFunc = fn
64+
b.prepFunc = fn
6565
return b
6666
}
6767

6868
// WithExecFunc sets a custom Exec implementation using Result types.
6969
// Returns the builder for method chaining.
7070
func (b *NodeBuilder) WithExecFunc(fn func(context.Context, Result) (Result, error)) *NodeBuilder {
71-
b.CustomNode.execFunc = fn
71+
b.execFunc = fn
7272
return b
7373
}
7474

7575
// WithPostFunc sets a custom Post implementation using Result types.
7676
// Returns the builder for method chaining.
7777
func (b *NodeBuilder) WithPostFunc(fn func(context.Context, *SharedStore, Result, Result) (Action, error)) *NodeBuilder {
78-
b.CustomNode.postFunc = fn
78+
b.postFunc = fn
7979
return b
8080
}
8181

8282
// WithExecFallbackFunc sets a custom ExecFallback implementation.
8383
// Returns the builder for method chaining.
8484
func (b *NodeBuilder) WithExecFallbackFunc(fn func(any, error) (any, error)) *NodeBuilder {
85-
b.CustomNode.execFallbackFunc = fn
85+
b.execFallbackFunc = fn
8686
return b
8787
}
8888

8989
// WithPrepFuncAny sets a custom Prep implementation using any types.
9090
// Returns the builder for method chaining.
9191
func (b *NodeBuilder) WithPrepFuncAny(fn func(context.Context, *SharedStore) (any, error)) *NodeBuilder {
92-
b.CustomNode.prepFunc = func(ctx context.Context, shared *SharedStore) (Result, error) {
92+
b.prepFunc = func(ctx context.Context, shared *SharedStore) (Result, error) {
9393
val, err := fn(ctx, shared)
9494
if err != nil {
9595
return Result{}, err
@@ -102,7 +102,7 @@ func (b *NodeBuilder) WithPrepFuncAny(fn func(context.Context, *SharedStore) (an
102102
// WithExecFuncAny sets a custom Exec implementation using any types.
103103
// Returns the builder for method chaining.
104104
func (b *NodeBuilder) WithExecFuncAny(fn func(context.Context, any) (any, error)) *NodeBuilder {
105-
b.CustomNode.execFunc = func(ctx context.Context, prepResult Result) (Result, error) {
105+
b.execFunc = func(ctx context.Context, prepResult Result) (Result, error) {
106106
val, err := fn(ctx, prepResult.Value())
107107
if err != nil {
108108
return Result{}, err
@@ -115,7 +115,7 @@ func (b *NodeBuilder) WithExecFuncAny(fn func(context.Context, any) (any, error)
115115
// WithPostFuncAny sets a custom Post implementation using any types.
116116
// Returns the builder for method chaining.
117117
func (b *NodeBuilder) WithPostFuncAny(fn func(context.Context, *SharedStore, any, any) (Action, error)) *NodeBuilder {
118-
b.CustomNode.postFunc = func(ctx context.Context, shared *SharedStore, prepResult, execResult Result) (Action, error) {
118+
b.postFunc = func(ctx context.Context, shared *SharedStore, prepResult, execResult Result) (Action, error) {
119119
return fn(ctx, shared, prepResult.Value(), execResult.Value())
120120
}
121121
return b
@@ -124,17 +124,17 @@ func (b *NodeBuilder) WithPostFuncAny(fn func(context.Context, *SharedStore, any
124124
// WithBatchConcurrency sets the concurrency level for batch processing.
125125
// Returns the builder for method chaining.
126126
func (b *NodeBuilder) WithBatchConcurrency(n int) *NodeBuilder {
127-
b.CustomNode.BaseNode.batchConcurrency = n
127+
b.batchConcurrency = n
128128
return b
129129
}
130130

131131
// WithBatchErrorHandling sets the error handling strategy for batch processing.
132132
// Returns the builder for method chaining.
133133
func (b *NodeBuilder) WithBatchErrorHandling(continueOnError bool) *NodeBuilder {
134134
if continueOnError {
135-
b.CustomNode.BaseNode.batchErrorHandling = "continue"
135+
b.batchErrorHandling = "continue"
136136
} else {
137-
b.CustomNode.BaseNode.batchErrorHandling = "stop"
137+
b.batchErrorHandling = "stop"
138138
}
139139
return b
140140
}
2.5 MB
Binary file not shown.
25.6 MB
Binary file not shown.

flyt.go

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -441,18 +441,6 @@ const (
441441
// DefaultAction is the default action if none is specified.
442442
// Flows use this when a node doesn't explicitly return an action.
443443
DefaultAction Action = "default"
444-
445-
// KeyItems is the shared store key for items to be processed.
446-
// Batch nodes look for items under this key by default.
447-
KeyItems = "items"
448-
449-
// KeyResults is the shared store key for processing results.
450-
// Batch nodes store their results under this key by default.
451-
KeyResults = "results"
452-
453-
// KeyBatchCount is the shared store key for batch count.
454-
// Batch flows store the number of iterations under this key.
455-
KeyBatchCount = "batch_count"
456444
)
457445

458446
// Node is the interface that all nodes must implement.
@@ -716,7 +704,7 @@ func Run(ctx context.Context, node Node, shared *SharedStore) (Action, error) {
716704
}
717705

718706
// Get retry settings if available
719-
var maxRetries int = 1
707+
var maxRetries = 1
720708
var wait time.Duration = 0
721709

722710
if retryable, ok := node.(RetryableNode); ok {
@@ -1087,22 +1075,6 @@ func ToSlice(v any) []any {
10871075
}
10881076
}
10891077

1090-
// FlowFactory creates new instances of a flow.
1091-
// This is used to create reusable flow definitions. Each call should return
1092-
// a new flow instance to avoid race conditions when running flows concurrently.
1093-
//
1094-
// Example:
1095-
//
1096-
// factory := func() *flyt.Flow {
1097-
// node1 := &ProcessNode{BaseNode: flyt.NewBaseNode()}
1098-
// node2 := &SaveNode{BaseNode: flyt.NewBaseNode()}
1099-
//
1100-
// flow := flyt.NewFlow(node1)
1101-
// flow.Connect(node1, flyt.DefaultAction, node2)
1102-
// return flow
1103-
// }
1104-
type FlowFactory func() *Flow
1105-
11061078
// CustomNode is a node implementation that uses custom functions
11071079
// for Prep, Exec, and Post phases. This allows creating nodes
11081080
// without defining new types, useful for simple operations.

generic.go

Lines changed: 0 additions & 92 deletions
This file was deleted.

0 commit comments

Comments
 (0)