Skip to content

Commit 50ca31c

Browse files
ezynda3opencode
andcommitted
Fix BatchFlow sequential execution and improve clarity
- Fix bug where sequential batch flows shared state between iterations - Rename BatchParams to FlowInputs for better clarity - Add documentation explaining FlowInputs purpose - Enhance README with detailed batch flow example - Ensure each flow iteration gets its own isolated SharedStore Breaking change: BatchParams renamed to FlowInputs 🤖 Generated with [opencode](https://opencode.ai) Co-Authored-By: opencode <[email protected]>
1 parent 85bfdde commit 50ca31c

File tree

3 files changed

+53
-28
lines changed

3 files changed

+53
-28
lines changed

README.md

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -208,29 +208,55 @@ func (n *RateLimitedNode) Exec(ctx context.Context, prepResult any) (any, error)
208208
Process multiple items concurrently:
209209

210210
```go
211-
// Create a flow factory
211+
// Simple batch node for processing items
212+
processFunc := func(ctx context.Context, item any) (any, error) {
213+
// Process each item
214+
return fmt.Sprintf("processed: %v", item), nil
215+
}
216+
217+
batchNode := flyt.NewBatchNode(processFunc, true) // true for concurrent
218+
shared.Set("items", []string{"item1", "item2", "item3"})
219+
```
220+
221+
### Batch Flows
222+
223+
Run the same flow multiple times with different parameters:
224+
225+
```go
226+
// Create a flow factory - returns a new flow instance for each iteration
212227
flowFactory := func() *flyt.Flow {
213-
node := flyt.NewNode(
228+
validateNode := flyt.NewNode(
229+
flyt.WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) (any, error) {
230+
// Each flow has its own SharedStore with merged FlowInputs
231+
userID, _ := shared.Get("user_id")
232+
email, _ := shared.Get("email")
233+
return map[string]any{"user_id": userID, "email": email}, nil
234+
}),
214235
flyt.WithExecFunc(func(ctx context.Context, prepResult any) (any, error) {
215-
// Process individual item
216-
return processItem(prepResult)
236+
data := prepResult.(map[string]any)
237+
// Process user data
238+
return processUser(data), nil
217239
}),
218240
)
219-
return flyt.NewFlow(node)
241+
return flyt.NewFlow(validateNode)
220242
}
221243

222-
// Define batch parameters
223-
batchFunc := func(ctx context.Context, shared *flyt.SharedStore) ([]flyt.BatchParams, error) {
224-
return []flyt.BatchParams{
225-
{"id": 1, "data": "item1"},
226-
{"id": 2, "data": "item2"},
227-
{"id": 3, "data": "item3"},
244+
// Define input parameters for each flow iteration
245+
// Each FlowInputs map is merged into that flow's isolated SharedStore
246+
batchFunc := func(ctx context.Context, shared *flyt.SharedStore) ([]flyt.FlowInputs, error) {
247+
// Could fetch from database, API, etc.
248+
return []flyt.FlowInputs{
249+
{"user_id": 1, "email": "[email protected]"},
250+
{"user_id": 2, "email": "[email protected]"},
251+
{"user_id": 3, "email": "[email protected]"},
228252
}, nil
229253
}
230254

231255
// Create and run batch flow
232256
batchFlow := flyt.NewBatchFlow(flowFactory, batchFunc, true) // true for concurrent
233257
err := batchFlow.Run(ctx, shared)
258+
259+
// Each flow runs in isolation with its own SharedStore containing the FlowInputs
234260
```
235261

236262
### Nested Flows

batch.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -195,11 +195,12 @@ func (n *batchNode) Post(ctx context.Context, shared *SharedStore, prepResult, e
195195
return DefaultAction, nil
196196
}
197197

198-
// BatchParams holds parameters for a batch iteration
199-
type BatchParams map[string]any
198+
// FlowInputs holds input parameters for a flow iteration in batch processing.
199+
// These parameters are merged into each flow's isolated SharedStore.
200+
type FlowInputs map[string]any
200201

201-
// BatchFlowFunc returns parameters for each batch iteration
202-
type BatchFlowFunc func(ctx context.Context, shared *SharedStore) ([]BatchParams, error)
202+
// BatchFlowFunc returns input parameters for each flow iteration in batch processing
203+
type BatchFlowFunc func(ctx context.Context, shared *SharedStore) ([]FlowInputs, error)
203204

204205
// NewBatchFlow creates a flow that runs multiple times with different parameters.
205206
// The flowFactory must create new flow instances for concurrent execution to avoid
@@ -271,7 +272,7 @@ func (n *batchFlowNode) Exec(ctx context.Context, prepResult any) (any, error) {
271272
return nil, fmt.Errorf("batchFlowNode: exec failed: invalid prepResult type %T, expected map[string]any", prepResult)
272273
}
273274

274-
batchParams, ok := data["batchParams"].([]BatchParams)
275+
batchParams, ok := data["batchParams"].([]FlowInputs)
275276
if !ok {
276277
return nil, fmt.Errorf("batchFlowNode: exec failed: invalid batchParams type in prepResult")
277278
}
@@ -352,10 +353,6 @@ func (n *batchFlowNode) Exec(ctx context.Context, prepResult any) (any, error) {
352353
}
353354
} else {
354355
// Run flows sequentially
355-
// Create a shared store for sequential execution
356-
seqShared := NewSharedStore()
357-
seqShared.Merge(sharedData)
358-
359356
for i, params := range batchParams {
360357
// Check context
361358
if err := ctx.Err(); err != nil {
@@ -365,10 +362,12 @@ func (n *batchFlowNode) Exec(ctx context.Context, prepResult any) (any, error) {
365362
// Create a new flow instance for each iteration
366363
flow := n.flowFactory()
367364

368-
// Merge params into shared for the flow execution
369-
seqShared.Merge(params)
365+
// Create isolated shared store for each iteration
366+
iterShared := NewSharedStore()
367+
iterShared.Merge(sharedData)
368+
iterShared.Merge(params)
370369

371-
if err := flow.Run(ctx, seqShared); err != nil {
370+
if err := flow.Run(ctx, iterShared); err != nil {
372371
return nil, fmt.Errorf("batchFlowNode: flow %d failed: %w", i, err)
373372
}
374373
}

batch_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -198,10 +198,10 @@ func TestBatchFlowConcurrency(t *testing.T) {
198198
}
199199

200200
// Batch function returns parameters for each iteration
201-
batchFunc := func(ctx context.Context, shared *SharedStore) ([]BatchParams, error) {
202-
params := make([]BatchParams, 5)
201+
batchFunc := func(ctx context.Context, shared *SharedStore) ([]FlowInputs, error) {
202+
params := make([]FlowInputs, 5)
203203
for i := range params {
204-
params[i] = BatchParams{"index": i}
204+
params[i] = FlowInputs{"index": i}
205205
}
206206
return params, nil
207207
}
@@ -247,8 +247,8 @@ func TestBatchFlowCustomCountKey(t *testing.T) {
247247
}
248248

249249
// Batch function returns parameters
250-
batchFunc := func(ctx context.Context, shared *SharedStore) ([]BatchParams, error) {
251-
return []BatchParams{
250+
batchFunc := func(ctx context.Context, shared *SharedStore) ([]FlowInputs, error) {
251+
return []FlowInputs{
252252
{"id": 1},
253253
{"id": 2},
254254
{"id": 3},

0 commit comments

Comments
 (0)