Skip to content

Commit 78900a6

Browse files
authored
[mono] The processor will now handle correctly if item is published before being subscribed. (#12)
1 parent 14b2b2d commit 78900a6

File tree

3 files changed

+57
-42
lines changed

3 files changed

+57
-42
lines changed

mono/mono_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,19 +83,19 @@ func TestSwitchIfEmpty(t *testing.T) {
8383
}
8484

8585
func TestSuite(t *testing.T) {
86-
pc := mono.CreateProcessor()
86+
// TODO: processor
87+
//pc := mono.CreateProcessor()
88+
//go func() {
89+
// pc.Success(num)
90+
//}()
8791
all := map[string]mono.Mono{
8892
"Just": mono.Just(num),
8993
"Create": mono.Create(func(i context.Context, sink mono.Sink) {
9094
sink.Success(num)
9195
}),
92-
"Processor": pc,
96+
//"Processor": pc,
9397
}
9498

95-
go func() {
96-
pc.Success(num)
97-
}()
98-
9999
for k, v := range all {
100100
t.Run(k+"_Map", func(t *testing.T) {
101101
testMap(v, t)

mono/processor.go

Lines changed: 48 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,17 @@ import (
1111

1212
type processor struct {
1313
sync.RWMutex
14-
subs []reactor.Subscriber
15-
stat int32
16-
v Any
14+
subscriber reactor.Subscriber
15+
stat int32
16+
requested int32
17+
v Any
1718
}
1819

1920
type processorSubscriber struct {
20-
parent *processor
21-
actual reactor.Subscriber
22-
stat int32
23-
s reactor.Subscription
24-
requested int32
21+
parent *processor
22+
actual reactor.Subscriber
23+
stat int32
24+
s reactor.Subscription
2525
}
2626

2727
func (p *processor) getValue() Any {
@@ -36,26 +36,27 @@ func (p *processor) setValue(v Any) {
3636
p.v = v
3737
}
3838

39-
func (p *processor) getStat() (stat int32) {
40-
return atomic.LoadInt32(&p.stat)
41-
}
42-
4339
func (p *processor) Success(v Any) {
4440
if !atomic.CompareAndSwapInt32(&p.stat, 0, statComplete) {
4541
hooks.Global().OnNextDrop(v)
4642
return
4743
}
4844

49-
p.setValue(v)
50-
5145
p.RLock()
52-
defer p.RUnlock()
53-
for i, l := 0, len(p.subs); i < l; i++ {
54-
s := p.subs[i]
46+
if p.subscriber == nil {
47+
p.RUnlock()
48+
p.setValue(v)
49+
return
50+
}
51+
p.RUnlock()
52+
53+
if atomic.LoadInt32(&p.requested) > 0 {
5554
if v != nil {
56-
s.OnNext(v)
55+
p.subscriber.OnNext(v)
5756
}
58-
s.OnComplete()
57+
p.subscriber.OnComplete()
58+
} else {
59+
p.setValue(v)
5960
}
6061
}
6162

@@ -64,11 +65,18 @@ func (p *processor) Error(e error) {
6465
hooks.Global().OnErrorDrop(e)
6566
return
6667
}
67-
p.setValue(e)
68+
6869
p.RLock()
69-
defer p.RUnlock()
70-
for i, l := 0, len(p.subs); i < l; i++ {
71-
p.subs[i].OnError(e)
70+
if p.subscriber == nil {
71+
p.RUnlock()
72+
p.setValue(e)
73+
return
74+
}
75+
76+
if atomic.LoadInt32(&p.requested) > 0 {
77+
p.subscriber.OnError(e)
78+
} else {
79+
p.setValue(e)
7280
}
7381
}
7482

@@ -77,33 +85,45 @@ func (p *processor) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
7785
case <-ctx.Done():
7886
s.OnError(reactor.ErrSubscribeCancelled)
7987
default:
88+
p.Lock()
89+
if p.subscriber != nil {
90+
p.Unlock()
91+
panic("reactor: mono processor can only been subscribed once!")
92+
}
93+
p.subscriber = s
94+
p.Unlock()
8095
s.OnSubscribe(ctx, &processorSubscriber{
8196
actual: s,
8297
parent: p,
8398
})
84-
p.Lock()
85-
p.subs = append(p.subs, s)
86-
p.Unlock()
8799
}
88100
}
89101

102+
func (p *processor) getStat() (stat int32) {
103+
return atomic.LoadInt32(&p.stat)
104+
}
105+
90106
func (p *processorSubscriber) Request(n int) {
91107
if n < 1 {
92108
panic(reactor.ErrNegativeRequest)
93109
}
94-
if atomic.AddInt32(&p.requested, 1) != 1 {
110+
111+
if atomic.AddInt32(&p.parent.requested, 1) != 1 {
95112
return
96113
}
97-
v := p.parent.getValue()
114+
98115
switch p.parent.getStat() {
99116
case statError:
117+
v := p.parent.getValue()
100118
p.OnError(v.(error))
101119
case statComplete:
120+
v := p.parent.getValue()
102121
if v != nil {
103122
p.OnNext(v)
104123
}
105124
p.OnComplete()
106125
}
126+
107127
}
108128

109129
func (p *processorSubscriber) Cancel() {

mono/processor_test.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,9 @@ func TestProcessor(t *testing.T) {
2626
assert.NoError(t, err, "block failed")
2727
assert.Equal(t, 666, v.(int), "bad result")
2828

29-
var actual int
30-
p.
31-
DoOnNext(func(v Any) error {
32-
actual = v.(int)
33-
return nil
34-
}).
35-
Subscribe(context.Background())
36-
assert.Equal(t, 333, actual, "bad result")
29+
assert.Panics(t, func() {
30+
p.Subscribe(context.Background())
31+
})
3732
}
3833

3934
func TestProcessor_Context(t *testing.T) {

0 commit comments

Comments
 (0)