Skip to content

Commit fa74c42

Browse files
committed
add chaining to flow.Connect
1 parent bbd9fce commit fa74c42

File tree

2 files changed

+151
-1
lines changed

2 files changed

+151
-1
lines changed

flyt.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,10 @@ func Run(ctx context.Context, node Node, shared *SharedStore) (Action, error) {
728728
// flow.Connect(validateNode, ActionSuccess, processNode)
729729
// flow.Connect(validateNode, ActionFail, errorNode)
730730
//
731+
// // Or with chaining:
732+
// flow.Connect(validateNode, ActionSuccess, processNode)
733+
// .Connect(validateNode, ActionFail, errorNode)
734+
//
731735
// // Run flow
732736
// err := flow.Run(ctx, shared)
733737
type Flow struct {
@@ -757,6 +761,7 @@ func NewFlow(start Node) *Flow {
757761
// Connect adds a transition from one node to another based on an action.
758762
// When the 'from' node returns the specified action, execution continues
759763
// with the 'to' node. Multiple actions can be connected from a single node.
764+
// Returns the flow instance for method chaining.
760765
//
761766
// Parameters:
762767
// - from: The source node
@@ -768,11 +773,18 @@ func NewFlow(start Node) *Flow {
768773
// flow.Connect(nodeA, "success", nodeB)
769774
// flow.Connect(nodeA, "retry", nodeA) // Self-loop for retry
770775
// flow.Connect(nodeA, "fail", errorNode)
771-
func (f *Flow) Connect(from Node, action Action, to Node) {
776+
//
777+
// Example with chaining:
778+
//
779+
// flow.Connect(nodeA, "success", nodeB)
780+
// .Connect(nodeB, "success", finalNode)
781+
// .Connect(nodeB, "fail", errorNode)
782+
func (f *Flow) Connect(from Node, action Action, to Node) *Flow {
772783
if f.transitions[from] == nil {
773784
f.transitions[from] = make(map[Action]Node)
774785
}
775786
f.transitions[from][action] = to
787+
return f
776788
}
777789

778790
// Run executes the flow starting from the start node.

flyt_test.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -689,4 +689,142 @@ func BenchmarkSharedStoreConcurrentAccess(b *testing.B) {
689689
})
690690
}
691691

692+
// TestFlowConnectChaining tests that Connect method can be chained
693+
func TestFlowConnectChaining(t *testing.T) {
694+
// Create nodes
695+
node1 := &TestNode{
696+
BaseNode: NewBaseNode(),
697+
name: "node1",
698+
action: "next",
699+
}
700+
701+
node2 := &TestNode{
702+
BaseNode: NewBaseNode(),
703+
name: "node2",
704+
action: "final",
705+
}
706+
707+
node3 := &TestNode{
708+
BaseNode: NewBaseNode(),
709+
name: "node3",
710+
action: DefaultAction,
711+
}
712+
713+
// Test chaining
714+
flow := NewFlow(node1)
715+
flow.Connect(node1, "next", node2).
716+
Connect(node2, "final", node3)
717+
718+
// Run the flow
719+
ctx := context.Background()
720+
shared := NewSharedStore()
721+
err := flow.Run(ctx, shared)
722+
if err != nil {
723+
t.Fatalf("unexpected error: %v", err)
724+
}
725+
726+
// Verify execution order
727+
if !node1.executed {
728+
t.Error("node1 not executed")
729+
}
730+
if !node2.executed {
731+
t.Error("node2 not executed")
732+
}
733+
if !node3.executed {
734+
t.Error("node3 not executed")
735+
}
736+
}
737+
738+
// TestFlowConnectChainingWithMultipleActions tests chaining with multiple actions from same node
739+
func TestFlowConnectChainingWithMultipleActions(t *testing.T) {
740+
// Create nodes
741+
decisionNode := &TestNode{
742+
BaseNode: NewBaseNode(),
743+
name: "decision",
744+
action: "branch",
745+
}
746+
747+
successNode := &TestNode{
748+
BaseNode: NewBaseNode(),
749+
name: "success",
750+
action: DefaultAction,
751+
}
752+
753+
failNode := &TestNode{
754+
BaseNode: NewBaseNode(),
755+
name: "fail",
756+
action: DefaultAction,
757+
}
758+
759+
retryNode := &TestNode{
760+
BaseNode: NewBaseNode(),
761+
name: "retry",
762+
action: DefaultAction,
763+
}
764+
765+
// Test chaining with multiple actions from same node
766+
flow := NewFlow(decisionNode)
767+
flow.Connect(decisionNode, "branch", successNode).
768+
Connect(decisionNode, "fail", failNode).
769+
Connect(decisionNode, "retry", retryNode)
770+
771+
// Run the flow
772+
ctx := context.Background()
773+
shared := NewSharedStore()
774+
err := flow.Run(ctx, shared)
775+
if err != nil {
776+
t.Fatalf("unexpected error: %v", err)
777+
}
778+
779+
// Verify execution
780+
if !decisionNode.executed {
781+
t.Error("decisionNode not executed")
782+
}
783+
if !successNode.executed {
784+
t.Error("successNode not executed")
785+
}
786+
if failNode.executed {
787+
t.Error("failNode should not be executed")
788+
}
789+
if retryNode.executed {
790+
t.Error("retryNode should not be executed")
791+
}
792+
}
793+
794+
// TestFlowConnectChainingBackwardsCompatibility tests that traditional usage still works
795+
func TestFlowConnectChainingBackwardsCompatibility(t *testing.T) {
796+
// Create nodes
797+
node1 := &TestNode{
798+
BaseNode: NewBaseNode(),
799+
name: "node1",
800+
action: "next",
801+
}
802+
803+
node2 := &TestNode{
804+
BaseNode: NewBaseNode(),
805+
name: "node2",
806+
action: DefaultAction,
807+
}
808+
809+
// Test that traditional non-chaining usage still works
810+
flow := NewFlow(node1)
811+
flow.Connect(node1, "next", node2) // Traditional usage without chaining
812+
813+
// Run the flow
814+
ctx := context.Background()
815+
shared := NewSharedStore()
816+
err := flow.Run(ctx, shared)
817+
if err != nil {
818+
t.Fatalf("unexpected error: %v", err)
819+
}
820+
821+
// Verify execution
822+
if !node1.executed {
823+
t.Error("node1 not executed")
824+
}
825+
if !node2.executed {
826+
t.Error("node2 not executed")
827+
}
828+
}
829+
692830
// BenchmarkBatchNodeProcessing benchmarks batch processing

0 commit comments

Comments
 (0)