File tree 1 file changed +8
-1
lines changed
1 file changed +8
-1
lines changed Original file line number Diff line number Diff line change @@ -106,7 +106,8 @@ type Consumer struct {
106
106
channel string
107
107
config Config
108
108
109
- rng * rand.Rand
109
+ rngMtx sync.Mutex
110
+ rng * rand.Rand
110
111
111
112
needRDYRedistributed int32
112
113
@@ -369,8 +370,10 @@ func validatedLookupAddr(addr string) error {
369
370
func (r * Consumer ) lookupdLoop () {
370
371
// add some jitter so that multiple consumers discovering the same topic,
371
372
// when restarted at the same time, dont all connect at once.
373
+ r .rngMtx .Lock ()
372
374
jitter := time .Duration (int64 (r .rng .Float64 () *
373
375
r .config .LookupdPollJitter * float64 (r .config .LookupdPollInterval )))
376
+ r .rngMtx .Unlock ()
374
377
var ticker * time.Ticker
375
378
376
379
select {
@@ -830,7 +833,9 @@ func (r *Consumer) resume() {
830
833
r .backoff (time .Second )
831
834
return
832
835
}
836
+ r .rngMtx .Lock ()
833
837
idx := r .rng .Intn (len (conns ))
838
+ r .rngMtx .Unlock ()
834
839
choice := conns [idx ]
835
840
836
841
r .log (LogLevelWarning ,
@@ -1006,7 +1011,9 @@ func (r *Consumer) redistributeRDY() {
1006
1011
1007
1012
for len (possibleConns ) > 0 && availableMaxInFlight > 0 {
1008
1013
availableMaxInFlight --
1014
+ r .rngMtx .Lock ()
1009
1015
i := r .rng .Int () % len (possibleConns )
1016
+ r .rngMtx .Unlock ()
1010
1017
c := possibleConns [i ]
1011
1018
// delete
1012
1019
possibleConns = append (possibleConns [:i ], possibleConns [i + 1 :]... )
You can’t perform that action at this time.
0 commit comments