Skip to content

Commit 3caffbe

Browse files
authored
fix: mono block race problem (#26)
1 parent 1bef933 commit 3caffbe

16 files changed

+178
-77
lines changed

flux/flux_create.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func newFluxCreate(c func(ctx context.Context, sink Sink), options ...CreateOpti
4343
func (fc fluxCreate) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
4444
select {
4545
case <-ctx.Done():
46-
s.OnError(ctx.Err())
46+
s.OnError(reactor.NewContextError(ctx.Err()))
4747
default:
4848
var sink interface {
4949
reactor.Subscription

flux/flux_interval.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,7 @@ func (is *intervalSubscription) run(ctx context.Context) {
5757
for {
5858
select {
5959
case <-ctx.Done():
60-
if err := ctx.Err(); err != nil {
61-
is.actual.OnError(err)
62-
}
60+
is.actual.OnError(reactor.NewContextError(ctx.Err()))
6361
return
6462
case <-is.done:
6563
return

internal/buffer/buffer.go internal/buffer/unbounded.go

+36-8
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
// Package buffer provides an implementation of an unbounded buffer.
1919
package buffer
2020

21-
import "sync"
21+
import (
22+
"sync"
23+
)
2224

2325
// Unbounded is an implementation of an unbounded buffer which does not use
2426
// extra goroutines. This is typically used for passing updates from one entity
@@ -34,9 +36,10 @@ import "sync"
3436
// defining a new type specific implementation of this buffer is preferred. See
3537
// internal/transport/transport.go for an example of this.
3638
type Unbounded struct {
37-
c chan interface{}
38-
mu sync.Mutex
39-
backlog []interface{}
39+
c chan interface{}
40+
mu sync.Mutex
41+
backlog []interface{}
42+
disposed bool
4043
}
4144

4245
// NewUnbounded returns a new instance of Unbounded.
@@ -47,35 +50,46 @@ func NewUnbounded() *Unbounded {
4750
// Put adds t to the unbounded buffer.
4851
func (b *Unbounded) Put(t interface{}) (ok bool) {
4952
b.mu.Lock()
50-
defer func() {
51-
ok = recover() == nil
53+
54+
if b.disposed {
5255
b.mu.Unlock()
53-
}()
56+
return
57+
}
58+
59+
ok = true
60+
5461
if len(b.backlog) == 0 {
5562
select {
5663
case b.c <- t:
64+
b.mu.Unlock()
5765
return
5866
default:
5967
}
6068
}
6169
b.backlog = append(b.backlog, t)
70+
b.mu.Unlock()
6271
return
6372
}
6473

6574
// Load sends the earliest buffered data, if any, onto the read channel
6675
// returned by Get(). Users are expected to call this every time they read a
6776
// value from the read channel.
68-
func (b *Unbounded) Load() {
77+
func (b *Unbounded) Load() (n int) {
6978
b.mu.Lock()
7079
if len(b.backlog) > 0 {
7180
select {
7281
case b.c <- b.backlog[0]:
7382
b.backlog[0] = nil
7483
b.backlog = b.backlog[1:]
84+
n = 1
7585
default:
7686
}
87+
} else if b.disposed {
88+
b.close()
89+
n = -1
7790
}
7891
b.mu.Unlock()
92+
return
7993
}
8094

8195
// Get returns a read channel on which values added to the buffer, via Put(),
@@ -87,6 +101,20 @@ func (b *Unbounded) Get() <-chan interface{} {
87101
return b.c
88102
}
89103

104+
// Dispose mark current Unbounded as disposed.
90105
func (b *Unbounded) Dispose() {
106+
b.mu.Lock()
107+
b.disposed = true
108+
if len(b.backlog) == 0 {
109+
b.close()
110+
}
111+
b.mu.Unlock()
112+
}
113+
114+
func (b *Unbounded) close() (ok bool) {
115+
defer func() {
116+
ok = recover() == nil
117+
}()
91118
close(b.c)
119+
return
92120
}

internal/buffer/unbounded_test.go

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package buffer_test
2+
3+
import (
4+
"strings"
5+
"sync/atomic"
6+
"testing"
7+
8+
"github.com/jjeffcaii/reactor-go/internal/buffer"
9+
"github.com/stretchr/testify/assert"
10+
)
11+
12+
func TestNewUnbounded(t *testing.T) {
13+
u := buffer.NewUnbounded()
14+
15+
go func() {
16+
assert.True(t, u.Put("foo"))
17+
assert.True(t, u.Put("bar"))
18+
assert.True(t, u.Put("qux"))
19+
20+
u.Dispose()
21+
assert.False(t, u.Put("must failed"))
22+
}()
23+
24+
done := make(chan struct{})
25+
var read []string
26+
27+
go func() {
28+
defer close(done)
29+
for next := range u.Get() {
30+
read = append(read, next.(string))
31+
u.Load()
32+
}
33+
}()
34+
35+
<-done
36+
37+
assert.Equal(t, "foo,bar,qux", strings.Join(read, ","), "result doesn't match")
38+
}
39+
40+
func TestEmptyUnbounded(t *testing.T) {
41+
u := buffer.NewUnbounded()
42+
43+
done := make(chan struct{})
44+
cnt := new(int32)
45+
46+
go func() {
47+
defer close(done)
48+
for range u.Get() {
49+
atomic.AddInt32(cnt, 1)
50+
u.Load()
51+
}
52+
}()
53+
54+
go func() {
55+
u.Dispose()
56+
}()
57+
58+
<-done
59+
60+
assert.Zero(t, atomic.LoadInt32(cnt))
61+
}

internal/stopwatch/Stopwatch.go

-21
This file was deleted.

internal/subscribers/block_first_subscriber.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (b *blockFirstSubscriber) OnNext(any reactor.Any) {
5454
func (b *blockFirstSubscriber) OnSubscribe(ctx context.Context, subscription reactor.Subscription) {
5555
select {
5656
case <-ctx.Done():
57-
b.OnError(reactor.ErrSubscribeCancelled)
57+
b.OnError(reactor.NewContextError(ctx.Err()))
5858
default:
5959
b.su = subscription
6060
b.su.Request(1)

internal/subscribers/block_last_subscriber.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (b *BlockLastSubscriber) OnNext(value reactor.Any) {
7070
func (b *BlockLastSubscriber) OnSubscribe(ctx context.Context, subscription reactor.Subscription) {
7171
select {
7272
case <-ctx.Done():
73-
b.OnError(reactor.ErrSubscribeCancelled)
73+
b.OnError(reactor.NewContextError(ctx.Err()))
7474
default:
7575
subscription.Request(reactor.RequestInfinite)
7676
}

internal/subscribers/block_subscriber.go

+13-6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55

66
"github.com/jjeffcaii/reactor-go"
7+
"github.com/jjeffcaii/reactor-go/hooks"
78
)
89

910
type blockSubscriber struct {
@@ -29,20 +30,26 @@ func (b blockSubscriber) OnComplete() {
2930
func (b blockSubscriber) OnError(err error) {
3031
select {
3132
case <-b.done:
33+
hooks.Global().OnErrorDrop(err)
3234
default:
33-
close(b.done)
34-
b.c <- reactor.Item{
35-
E: err,
35+
select {
36+
case b.c <- reactor.Item{E: err}:
37+
default:
38+
hooks.Global().OnErrorDrop(err)
3639
}
40+
close(b.done)
3741
}
3842
}
3943

4044
func (b blockSubscriber) OnNext(any reactor.Any) {
4145
select {
4246
case <-b.done:
47+
hooks.Global().OnNextDrop(any)
4348
default:
44-
b.c <- reactor.Item{
45-
V: any,
49+
select {
50+
case b.c <- reactor.Item{V: any}:
51+
default:
52+
hooks.Global().OnNextDrop(any)
4653
}
4754
}
4855
}
@@ -53,7 +60,7 @@ func (b blockSubscriber) OnSubscribe(ctx context.Context, subscription reactor.S
5360
go func() {
5461
select {
5562
case <-ctx.Done():
56-
b.OnError(reactor.ErrSubscribeCancelled)
63+
b.OnError(reactor.NewContextError(ctx.Err()))
5764
case <-b.done:
5865
}
5966
}()

internal/subscribers/do_finally.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (d *DoFinallySubscriber) OnNext(v reactor.Any) {
6868
func (d *DoFinallySubscriber) OnSubscribe(ctx context.Context, s reactor.Subscription) {
6969
select {
7070
case <-ctx.Done():
71-
d.OnError(reactor.ErrSubscribeCancelled)
71+
d.OnError(reactor.NewContextError(ctx.Err()))
7272
default:
7373
d.s = s
7474
d.actual.OnSubscribe(ctx, d)

justfile

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
default:
2-
echo 'Hello, world!'
1+
alias t := test
2+
33
test:
4-
go test -cover -race -count=1 ./...
4+
go test -race -count=1 ./...
55
lint:
66
golangci-lint run ./...
77
fmt:
8-
go fmt ./...
8+
go fmt ./...

mono/mono.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@ import (
88
"github.com/jjeffcaii/reactor-go/scheduler"
99
)
1010

11-
type Any = reactor.Any
11+
// Alias
12+
type (
13+
Any = reactor.Any
14+
Disposable = reactor.Disposable
15+
)
16+
1217
type FlatMapper = func(reactor.Any) Mono
1318

1419
type Mono interface {

mono/mono_create.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func newMonoCreate(gen func(context.Context, Sink)) monoCreate {
5656

5757
select {
5858
case <-ctx.Done():
59-
sink.Error(reactor.ErrSubscribeCancelled)
59+
sink.Error(reactor.NewContextError(ctx.Err()))
6060
default:
6161
gen(ctx, sink)
6262
}

mono/mono_just.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (j *justSubscription) Cancel() {
8383
func (m *monoJust) SubscribeWith(ctx context.Context, sub reactor.Subscriber) {
8484
select {
8585
case <-ctx.Done():
86-
sub.OnError(reactor.ErrSubscribeCancelled)
86+
sub.OnError(reactor.NewContextError(ctx.Err()))
8787
default:
8888
su := borrowJustSubscription()
8989
su.parent = m

mono/mono_zip.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (z *zipInner) OnNext(any reactor.Any) {
143143
func (z *zipInner) OnSubscribe(ctx context.Context, su reactor.Subscription) {
144144
select {
145145
case <-ctx.Done():
146-
z.OnError(reactor.ErrSubscribeCancelled)
146+
z.OnError(reactor.NewContextError(ctx.Err()))
147147
default:
148148
var exist bool
149149
z.Lock()
@@ -169,7 +169,7 @@ type monoZip struct {
169169
func (m *monoZip) SubscribeWith(ctx context.Context, sub reactor.Subscriber) {
170170
select {
171171
case <-ctx.Done():
172-
sub.OnError(reactor.ErrSubscribeCancelled)
172+
sub.OnError(reactor.NewContextError(ctx.Err()))
173173
default:
174174
c := newZipCoordinator(sub, len(m.sources))
175175
sub.OnSubscribe(ctx, c)

0 commit comments

Comments
 (0)