Skip to content

Commit ac1270f

Browse files
authored
fix: deadlock when using switch_if_error (#43)
1 parent a7bd3e9 commit ac1270f

File tree

4 files changed

+21
-49
lines changed

4 files changed

+21
-49
lines changed

hc/hc_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ import (
66
"net/http"
77
"testing"
88

9+
"github.com/stretchr/testify/assert"
10+
911
"github.com/jjeffcaii/reactor-go"
1012
"github.com/jjeffcaii/reactor-go/hc"
1113
"github.com/jjeffcaii/reactor-go/mono"
1214
"github.com/jjeffcaii/reactor-go/scheduler"
1315
"github.com/jjeffcaii/reactor-go/tuple"
14-
"github.com/stretchr/testify/assert"
1516
)
1617

1718
var httpBinUrl = "https://httpbin.org/anything"
@@ -89,7 +90,7 @@ func TestClient_Do(t *testing.T) {
8990
}
9091

9192
func TestDo_Failed(t *testing.T) {
92-
req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1/not-exists-path", nil)
93+
req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1:8080/not-exists-path", nil)
9394
_, err := hc.Do(req).Block(context.Background())
9495
assert.Error(t, err)
9596
}

mono/create.go

+6-33
Original file line numberDiff line numberDiff line change
@@ -2,38 +2,14 @@ package mono
22

33
import (
44
"context"
5-
"math"
6-
"sync"
75
"sync/atomic"
86

7+
"github.com/pkg/errors"
8+
99
"github.com/jjeffcaii/reactor-go"
1010
"github.com/jjeffcaii/reactor-go/hooks"
11-
"github.com/pkg/errors"
1211
)
1312

14-
var globalSinkPool sinkPool
15-
16-
type sinkPool struct {
17-
inner sync.Pool
18-
}
19-
20-
func (p *sinkPool) get() *sink {
21-
if exist, _ := p.inner.Get().(*sink); exist != nil {
22-
atomic.StoreInt32(&exist.stat, 0)
23-
return exist
24-
}
25-
return &sink{}
26-
}
27-
28-
func (p *sinkPool) put(s *sink) {
29-
if s == nil {
30-
return
31-
}
32-
atomic.StoreInt32(&s.stat, math.MinInt32)
33-
s.actual = nil
34-
p.inner.Put(s)
35-
}
36-
3713
type Sink interface {
3814
Success(Any)
3915
Error(error)
@@ -84,23 +60,21 @@ func (s *sink) Success(v Any) {
8460
s.Complete()
8561
}
8662

87-
func (s *sink) Request(n int) {
63+
func (s *sink) Request(_ int) {
8864
// ignore
8965
}
9066

9167
func (s *sink) Cancel() {
9268
if !atomic.CompareAndSwapInt32(&s.stat, 0, statCancel) {
9369
return
9470
}
95-
defer globalSinkPool.put(s)
9671
s.actual.OnError(reactor.ErrSubscribeCancelled)
9772
}
9873

9974
func (s *sink) Complete() {
10075
if !atomic.CompareAndSwapInt32(&s.stat, 0, statComplete) {
10176
return
10277
}
103-
defer globalSinkPool.put(s)
10478
s.actual.OnComplete()
10579
}
10680

@@ -109,7 +83,6 @@ func (s *sink) Error(err error) {
10983
hooks.Global().OnErrorDrop(err)
11084
return
11185
}
112-
defer globalSinkPool.put(s)
11386
s.actual.OnError(err)
11487
}
11588

@@ -129,8 +102,8 @@ func (s *sink) Next(v Any) {
129102
}
130103

131104
func (m monoCreate) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
132-
sk := globalSinkPool.get()
105+
var sk sink
133106
sk.actual = s
134-
s.OnSubscribe(ctx, sk)
135-
m.sinker(ctx, sk)
107+
s.OnSubscribe(ctx, &sk)
108+
m.sinker(ctx, &sk)
136109
}

mono/create_test.go

+1-7
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import (
66
"testing"
77

88
"github.com/golang/mock/gomock"
9+
910
"github.com/jjeffcaii/reactor-go"
10-
"github.com/stretchr/testify/assert"
1111
)
1212

1313
func TestMonoCreate_SubscribeWith(t *testing.T) {
@@ -97,9 +97,3 @@ func TestMonoCreate_Cancel(t *testing.T) {
9797
s.Success(1)
9898
}).SubscribeWith(context.Background(), s)
9999
}
100-
101-
func TestSinkPool_PutWithNilValue(t *testing.T) {
102-
assert.NotPanics(t, func() {
103-
globalSinkPool.put(nil)
104-
})
105-
}

mono/peek.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ import (
44
"context"
55
"sync/atomic"
66

7+
"github.com/pkg/errors"
8+
79
"github.com/jjeffcaii/reactor-go"
810
"github.com/jjeffcaii/reactor-go/internal"
9-
"github.com/pkg/errors"
1011
)
1112

1213
type monoPeek struct {
@@ -20,10 +21,11 @@ type monoPeek struct {
2021
}
2122

2223
type peekSubscriber struct {
23-
actual reactor.Subscriber
24-
parent *monoPeek
25-
s reactor.Subscription
26-
stat int32
24+
actual reactor.Subscriber
25+
parent *monoPeek
26+
s reactor.Subscription
27+
stat int32
28+
cancelled int32
2729
}
2830

2931
func newMonoPeek(source reactor.RawPublisher, first monoPeekOption, others ...monoPeekOption) *monoPeek {
@@ -52,7 +54,7 @@ func (p *peekSubscriber) Request(n int) {
5254
}
5355

5456
func (p *peekSubscriber) Cancel() {
55-
if !atomic.CompareAndSwapInt32(&p.stat, 0, statCancel) {
57+
if !atomic.CompareAndSwapInt32(&p.cancelled, 0, 1) {
5658
return
5759
}
5860
if call := p.parent.onCancelCall; call != nil {
@@ -73,7 +75,9 @@ func (p *peekSubscriber) OnComplete() {
7375

7476
func (p *peekSubscriber) OnError(err error) {
7577
if !atomic.CompareAndSwapInt32(&p.stat, 0, statError) {
76-
return
78+
if isCancelled := atomic.LoadInt32(&p.stat) == statCancel && err == reactor.ErrSubscribeCancelled; !isCancelled {
79+
return
80+
}
7781
}
7882

7983
defer func() {

0 commit comments

Comments
 (0)