Skip to content

Commit a6e637b

Browse files
committed
feat: implement block subscriber pool (#36)
(cherry picked from commit 3addfdc)
1 parent b50119b commit a6e637b

File tree

3 files changed

+166
-48
lines changed

3 files changed

+166
-48
lines changed

internal/subscribers/block_subscriber.go

+71-33
Original file line numberDiff line numberDiff line change
@@ -2,66 +2,104 @@ package subscribers
22

33
import (
44
"context"
5+
"math"
6+
"sync"
7+
"sync/atomic"
58

69
"github.com/jjeffcaii/reactor-go"
710
"github.com/jjeffcaii/reactor-go/hooks"
811
)
912

10-
type blockSubscriber struct {
11-
done chan struct{}
12-
c chan<- reactor.Item
13+
var globalBlockSubscriberPool blockSubscriberPool
14+
15+
func BorrowBlockSubscriber() *BlockSubscriber {
16+
return globalBlockSubscriberPool.get()
17+
}
18+
19+
func ReturnBlockSubscriber(s *BlockSubscriber) {
20+
globalBlockSubscriberPool.put(s)
21+
}
22+
23+
type blockSubscriberPool struct {
24+
inner sync.Pool
25+
}
26+
27+
func (bp *blockSubscriberPool) get() *BlockSubscriber {
28+
if exist, _ := bp.inner.Get().(*BlockSubscriber); exist != nil {
29+
atomic.StoreInt32(&exist.done, 0)
30+
return exist
31+
}
32+
return &BlockSubscriber{
33+
doneChan: make(chan struct{}, 1),
34+
}
1335
}
1436

15-
func NewBlockSubscriber(done chan struct{}, c chan reactor.Item) reactor.Subscriber {
16-
return blockSubscriber{
17-
done: done,
18-
c: c,
37+
func (bp *blockSubscriberPool) put(s *BlockSubscriber) {
38+
if s == nil {
39+
return
1940
}
41+
s.Reset()
42+
bp.inner.Put(s)
2043
}
2144

22-
func (b blockSubscriber) OnComplete() {
23-
select {
24-
case <-b.done:
25-
default:
26-
close(b.done)
45+
type BlockSubscriber struct {
46+
reactor.Item
47+
doneChan chan struct{}
48+
ctxChan chan struct{}
49+
done int32
50+
}
51+
52+
func (b *BlockSubscriber) Reset() {
53+
b.V = nil
54+
b.E = nil
55+
b.ctxChan = nil
56+
atomic.StoreInt32(&b.done, math.MinInt32)
57+
}
58+
59+
func (b *BlockSubscriber) Done() <-chan struct{} {
60+
return b.doneChan
61+
}
62+
63+
func (b *BlockSubscriber) OnComplete() {
64+
if atomic.CompareAndSwapInt32(&b.done, 0, 1) {
65+
b.finish()
2766
}
2867
}
2968

30-
func (b blockSubscriber) OnError(err error) {
31-
select {
32-
case <-b.done:
69+
func (b *BlockSubscriber) OnError(err error) {
70+
if !atomic.CompareAndSwapInt32(&b.done, 0, 1) {
3371
hooks.Global().OnErrorDrop(err)
34-
default:
35-
select {
36-
case b.c <- reactor.Item{E: err}:
37-
default:
38-
hooks.Global().OnErrorDrop(err)
39-
}
40-
close(b.done)
72+
return
73+
}
74+
b.E = err
75+
b.finish()
76+
}
77+
78+
func (b *BlockSubscriber) finish() {
79+
if b.ctxChan != nil {
80+
close(b.ctxChan)
4181
}
82+
b.doneChan <- struct{}{}
4283
}
4384

44-
func (b blockSubscriber) OnNext(any reactor.Any) {
45-
select {
46-
case <-b.done:
85+
func (b *BlockSubscriber) OnNext(any reactor.Any) {
86+
if atomic.LoadInt32(&b.done) != 0 || b.V != nil || b.E != nil {
4787
hooks.Global().OnNextDrop(any)
48-
default:
49-
select {
50-
case b.c <- reactor.Item{V: any}:
51-
default:
52-
hooks.Global().OnNextDrop(any)
53-
}
88+
return
5489
}
90+
b.V = any
5591
}
5692

57-
func (b blockSubscriber) OnSubscribe(ctx context.Context, subscription reactor.Subscription) {
93+
func (b *BlockSubscriber) OnSubscribe(ctx context.Context, subscription reactor.Subscription) {
5894
// workaround: watch context
5995
if ctx != context.Background() && ctx != context.TODO() {
96+
ctxChan := make(chan struct{})
97+
b.ctxChan = ctxChan
6098
go func() {
6199
select {
62100
case <-ctx.Done():
63101
b.OnError(reactor.NewContextError(ctx.Err()))
64-
case <-b.done:
102+
case <-ctxChan:
65103
}
66104
}()
67105
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package subscribers
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
"github.com/jjeffcaii/reactor-go"
10+
"github.com/jjeffcaii/reactor-go/internal"
11+
"github.com/stretchr/testify/assert"
12+
)
13+
14+
func TestBlockSubscriber(t *testing.T) {
15+
fakeErr := errors.New("fake error")
16+
17+
// complete test
18+
s := BorrowBlockSubscriber()
19+
go func() {
20+
s.OnNext(1)
21+
s.OnComplete()
22+
}()
23+
24+
<-s.Done()
25+
26+
assert.NoError(t, s.E, "should not return error")
27+
assert.Equal(t, 1, s.V, "bad result")
28+
ReturnBlockSubscriber(s)
29+
30+
// error test
31+
s = BorrowBlockSubscriber()
32+
s.OnError(fakeErr)
33+
// omit
34+
s.OnNext(2)
35+
s.OnError(fakeErr)
36+
s.OnComplete()
37+
38+
<-s.Done()
39+
40+
assert.Equal(t, fakeErr, s.E, "should be fake error")
41+
assert.Nil(t, s.V)
42+
ReturnBlockSubscriber(s)
43+
44+
// empty test
45+
s = BorrowBlockSubscriber()
46+
s.OnComplete()
47+
// omit
48+
s.OnNext(2)
49+
s.OnError(fakeErr)
50+
51+
<-s.Done()
52+
53+
assert.NoError(t, s.E, "should not return error")
54+
assert.Nil(t, s.V)
55+
ReturnBlockSubscriber(s)
56+
}
57+
58+
func TestReturnBlockSubscriber(t *testing.T) {
59+
assert.NotPanics(t, func() {
60+
ReturnBlockSubscriber(nil)
61+
})
62+
}
63+
64+
func TestBlockSubscriber_OnSubscribe(t *testing.T) {
65+
s := BorrowBlockSubscriber()
66+
s.OnSubscribe(context.Background(), internal.EmptySubscription)
67+
ReturnBlockSubscriber(s)
68+
69+
s = BorrowBlockSubscriber()
70+
ctx, cancel := context.WithCancel(context.Background())
71+
cancel()
72+
s.OnSubscribe(ctx, internal.EmptySubscription)
73+
<-s.Done()
74+
assert.Error(t, s.E, "should return error")
75+
assert.True(t, reactor.IsCancelledError(s.E), "should be cancelled error")
76+
ReturnBlockSubscriber(s)
77+
78+
s = BorrowBlockSubscriber()
79+
ctx, cancel = context.WithCancel(context.Background())
80+
s.OnSubscribe(ctx, internal.EmptySubscription)
81+
s.OnComplete()
82+
time.Sleep(10 * time.Millisecond)
83+
cancel()
84+
85+
<-s.Done()
86+
}

mono/wrapper_utils.go

+9-15
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,16 @@ func IsSubscribeAsync(m Mono) bool {
3232
}
3333

3434
func block(ctx context.Context, publisher reactor.RawPublisher) (Any, error) {
35-
done := make(chan struct{})
36-
c := make(chan reactor.Item, 1)
37-
b := subscribers.NewBlockSubscriber(done, c)
38-
publisher.SubscribeWith(ctx, b)
39-
<-done
40-
defer close(c)
41-
42-
select {
43-
case result := <-c:
44-
if result.E != nil {
45-
return nil, result.E
46-
}
47-
return result.V, nil
48-
default:
49-
return nil, nil
35+
s := subscribers.BorrowBlockSubscriber()
36+
defer subscribers.ReturnBlockSubscriber(s)
37+
38+
publisher.SubscribeWith(ctx, s)
39+
<-s.Done()
40+
41+
if s.E != nil {
42+
return nil, s.E
5043
}
44+
return s.V, nil
5145
}
5246

5347
func mustProcessor(publisher reactor.RawPublisher) rawProcessor {

0 commit comments

Comments
 (0)