@@ -2,6 +2,8 @@ package mono
2
2
3
3
import (
4
4
"context"
5
+ "math"
6
+ "sync/atomic"
5
7
"time"
6
8
7
9
"github.com/jjeffcaii/reactor-go"
@@ -13,6 +15,21 @@ type monoTimeout struct {
13
15
timeout time.Duration
14
16
}
15
17
18
+ func newMonoTimeout (source reactor.RawPublisher , timeout time.Duration ) * monoTimeout {
19
+ return & monoTimeout {
20
+ source : source ,
21
+ timeout : timeout ,
22
+ }
23
+ }
24
+
25
+ func (m * monoTimeout ) SubscribeWith (ctx context.Context , s reactor.Subscriber ) {
26
+ m .source .SubscribeWith (ctx , & timeoutSubscriber {
27
+ actual : s ,
28
+ timeout : m .timeout ,
29
+ done : make (chan struct {}),
30
+ })
31
+ }
32
+
16
33
func (m * monoTimeout ) Parent () reactor.RawPublisher {
17
34
return m .source
18
35
}
@@ -21,33 +38,37 @@ type timeoutSubscriber struct {
21
38
actual reactor.Subscriber
22
39
timeout time.Duration
23
40
done chan struct {}
41
+ closed int32
24
42
}
25
43
26
44
func (t * timeoutSubscriber ) OnComplete () {
27
- select {
28
- case <- t .done :
29
- default :
45
+ if atomic .CompareAndSwapInt32 (& t .closed , 0 , math .MaxInt32 ) || atomic .CompareAndSwapInt32 (& t .closed , 1 , math .MaxInt32 ) {
30
46
close (t .done )
31
47
t .actual .OnComplete ()
32
48
}
33
49
}
34
50
35
51
func (t * timeoutSubscriber ) OnError (err error ) {
36
- select {
37
- case <- t .done :
38
- hooks .Global ().OnErrorDrop (err )
39
- default :
52
+ if atomic .CompareAndSwapInt32 (& t .closed , 0 , - 1 ) {
40
53
close (t .done )
41
54
t .actual .OnError (err )
55
+ return
56
+ }
57
+
58
+ // item is emitted before error reach, should be processed as completed.
59
+ if atomic .CompareAndSwapInt32 (& t .closed , 1 , - 1 ) {
60
+ close (t .done )
61
+ t .actual .OnComplete ()
42
62
}
63
+
64
+ hooks .Global ().OnErrorDrop (err )
43
65
}
44
66
45
67
func (t * timeoutSubscriber ) OnNext (any reactor.Any ) {
46
- select {
47
- case <- t .done :
48
- hooks .Global ().OnNextDrop (any )
49
- default :
68
+ if atomic .CompareAndSwapInt32 (& t .closed , 0 , 1 ) {
50
69
t .actual .OnNext (any )
70
+ } else {
71
+ hooks .Global ().OnNextDrop (any )
51
72
}
52
73
}
53
74
@@ -63,18 +84,3 @@ func (t *timeoutSubscriber) OnSubscribe(ctx context.Context, subscription reacto
63
84
}()
64
85
t .actual .OnSubscribe (ctx , subscription )
65
86
}
66
-
67
- func (m * monoTimeout ) SubscribeWith (ctx context.Context , s reactor.Subscriber ) {
68
- m .source .SubscribeWith (ctx , & timeoutSubscriber {
69
- actual : s ,
70
- timeout : m .timeout ,
71
- done : make (chan struct {}),
72
- })
73
- }
74
-
75
- func newMonoTimeout (source reactor.RawPublisher , timeout time.Duration ) * monoTimeout {
76
- return & monoTimeout {
77
- source : source ,
78
- timeout : timeout ,
79
- }
80
- }
0 commit comments