Skip to content

Commit 6a82fb1

Browse files
authored
Fixed requestResponseSubscriber race condition (#137)
1 parent 473989b commit 6a82fb1

File tree

1 file changed

+4
-4
lines changed

1 file changed

+4
-4
lines changed

internal/socket/subscriber_request_response.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type requestResponseSubscriber struct {
3737
dc *DuplexConnection
3838
sid uint32
3939
receiving fragmentation.HeaderAndPayload
40-
sndCnt int32
40+
sndCnt atomic.Int32
4141
}
4242

4343
func borrowRequestResponseSubscriber(dc *DuplexConnection, sid uint32, receiving fragmentation.HeaderAndPayload) rx.Subscriber {
@@ -55,13 +55,13 @@ func returnRequestResponseSubscriber(s rx.Subscriber) {
5555
}
5656
actual.dc = nil
5757
actual.receiving = nil
58-
actual.sndCnt = 0
58+
actual.sndCnt.Store(0)
5959
globalRequestResponseSubscriberPool.put(actual)
6060
}
6161

6262
func (r *requestResponseSubscriber) OnNext(next payload.Payload) {
6363
r.dc.sendPayload(r.sid, next, core.FlagNext|core.FlagComplete)
64-
atomic.AddInt32(&r.sndCnt, 1)
64+
r.sndCnt.Add(1)
6565
}
6666

6767
func (r *requestResponseSubscriber) OnError(err error) {
@@ -73,7 +73,7 @@ func (r *requestResponseSubscriber) OnError(err error) {
7373
}
7474

7575
func (r *requestResponseSubscriber) OnComplete() {
76-
if atomic.AddInt32(&r.sndCnt, 1) == 1 {
76+
if r.sndCnt.Add(1) == 1 {
7777
r.dc.sendPayload(r.sid, payload.Empty(), core.FlagComplete)
7878
}
7979
r.dc.unregister(r.sid)

0 commit comments

Comments
 (0)