@@ -52,32 +52,33 @@ func (p *Plexer) Run() {
52
52
return
53
53
}
54
54
55
- if ! handledMsg {
56
- // if after ranging over all the channels we didn't have any work, that means
57
- // that there were no messages on any of the channels. We now want to block
58
- // here and wait for a message from ANY channel
55
+ if handledMsg {
56
+ continue
57
+ }
59
58
60
- cases := make ([]reflect.SelectCase , 0 , len (p .channels ))
61
- for _ , ch := range p .channels {
62
- if ch == nil {
63
- continue
64
- }
65
- cases = append (cases , reflect.SelectCase {Dir : reflect .SelectRecv , Chan : reflect .ValueOf (ch )})
59
+ // if after ranging over all the channels we didn't have any work, that means
60
+ // that there were no messages on any of the channels. We now want to block
61
+ // here and wait for a message from ANY channel
62
+ cases := make ([]reflect.SelectCase , 0 , len (p .channels ))
63
+ for _ , ch := range p .channels {
64
+ if ch == nil {
65
+ continue
66
66
}
67
+ cases = append (cases , reflect.SelectCase {Dir : reflect .SelectRecv , Chan : reflect .ValueOf (ch )})
68
+ }
67
69
68
- remaining := len (cases )
69
- for remaining > 0 {
70
- chosen , value , ok := reflect .Select (cases )
71
- if ! ok {
72
- // The chosen channel has been closed, so zero out the channel to disable the case
73
- cases [chosen ].Chan = reflect .ValueOf (nil )
74
- remaining --
75
- continue
76
- }
77
-
78
- p .output <- value .Bytes ()
79
- break
70
+ remaining := len (cases )
71
+ for remaining > 0 {
72
+ chosen , value , ok := reflect .Select (cases )
73
+ if ! ok {
74
+ // The chosen channel has been closed, so zero out the channel to disable the case
75
+ cases [chosen ].Chan = reflect .ValueOf (nil )
76
+ remaining --
77
+ continue
80
78
}
79
+
80
+ p .output <- value .Bytes ()
81
+ break
81
82
}
82
83
}
83
84
}
0 commit comments