Skip to content

Commit d7a49c4

Browse files
authored
fix: panics when calling BlockSlice (#128)
1 parent 7d6e9e0 commit d7a49c4

File tree

2 files changed

+21
-8
lines changed

2 files changed

+21
-8
lines changed

rx/flux/flux.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/jjeffcaii/reactor-go/flux"
77
"github.com/jjeffcaii/reactor-go/scheduler"
8+
89
"github.com/rsocket/rsocket-go/payload"
910
"github.com/rsocket/rsocket-go/rx"
1011
)
@@ -22,10 +23,10 @@ type Sink interface {
2223
Error(e error)
2324
}
2425

25-
// Flux represents represents a reactive sequence of 0..N items.
26+
// Flux represents a reactive sequence of 0..N items.
2627
type Flux interface {
2728
rx.Publisher
28-
// Take take only the first N values from this Flux, if available.
29+
// Take takes only the first N values from this Flux, if available.
2930
Take(n int) Flux
3031
// Filter evaluate each source value against the given Predicate.
3132
// If the predicate test succeeds, the value is emitted.

rx/flux/proxy.go

+18-6
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/jjeffcaii/reactor-go"
77
"github.com/jjeffcaii/reactor-go/flux"
88
"github.com/jjeffcaii/reactor-go/scheduler"
9+
910
"github.com/rsocket/rsocket-go/payload"
1011
"github.com/rsocket/rsocket-go/rx"
1112
)
@@ -76,12 +77,19 @@ func (p proxy) ToChan(ctx context.Context, cap int) (<-chan payload.Payload, <-c
7677
err <- reactor.ErrSubscribeCancelled
7778
}
7879
}).
80+
Map(func(any reactor.Any) (reactor.Any, error) {
81+
return payload.Clone(any.(payload.Payload)), nil
82+
}).
7983
SubscribeWithChan(ctx, ch, err)
8084
return ch, err
8185
}
8286

8387
func (p proxy) BlockFirst(ctx context.Context) (first payload.Payload, err error) {
84-
v, err := p.Flux.BlockFirst(ctx)
88+
v, err := p.Flux.
89+
Map(func(any reactor.Any) (reactor.Any, error) {
90+
return payload.Clone(any.(payload.Payload)), nil
91+
}).
92+
BlockFirst(ctx)
8593
if err != nil {
8694
return
8795
}
@@ -92,14 +100,18 @@ func (p proxy) BlockFirst(ctx context.Context) (first payload.Payload, err error
92100
}
93101

94102
func (p proxy) BlockLast(ctx context.Context) (last payload.Payload, err error) {
95-
v, err := p.Flux.BlockLast(ctx)
103+
v, err := p.Flux.
104+
Map(func(any reactor.Any) (reactor.Any, error) {
105+
return payload.Clone(any.(payload.Payload)), nil
106+
}).
107+
BlockLast(ctx)
96108
if err != nil {
97109
return
98110
}
99-
if v == nil {
100-
return
111+
if v != nil {
112+
last = v.(payload.Payload)
101113
}
102-
last = v.(payload.Payload)
114+
103115
return
104116
}
105117

@@ -119,7 +131,7 @@ func (p proxy) BlockSlice(ctx context.Context) (results []payload.Payload, err e
119131
Subscribe(
120132
ctx,
121133
reactor.OnNext(func(v reactor.Any) error {
122-
results = append(results, v.(payload.Payload))
134+
results = append(results, payload.Clone(v.(payload.Payload)))
123135
return nil
124136
}),
125137
reactor.OnError(func(e error) {

0 commit comments

Comments
 (0)