Skip to content

Commit ab793cf

Browse files
authored
extproc: fixes race condition with RNG (#417)
**Commit Message** Previously, router instance shared the rng among multiple goroutines which is not a thread safe. This fixes it by having a goroutine local RNG. **Related Issues/PRs (if applicable)** Contributes to #53 --------- Signed-off-by: Takeshi Yoneda <[email protected]>
1 parent 1239952 commit ab793cf

File tree

2 files changed

+39
-5
lines changed

2 files changed

+39
-5
lines changed

internal/extproc/router/router.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@ import (
1717
// router implements [x.Router].
1818
type router struct {
1919
rules []filterapi.RouteRule
20-
rng *rand.Rand
2120
}
2221

2322
// New creates a new [x.Router] implementation for the given config.
2423
func New(config *filterapi.Config, newCustomFn x.NewCustomRouterFn) (x.Router, error) {
25-
r := &router{rules: config.Rules, rng: rand.New(rand.NewSource(uint64(time.Now().UnixNano())))} //nolint:gosec
24+
r := &router{rules: config.Rules}
2625
if newCustomFn != nil {
2726
customRouter := newCustomFn(r, config)
2827
return customRouter, nil
@@ -44,26 +43,32 @@ func (r *router) Calculate(headers map[string]string) (backend *filterapi.Backen
4443
}
4544
}
4645
}
47-
if rule == nil {
46+
if rule == nil || len(rule.Backends) == 0 {
4847
return nil, x.ErrNoMatchingRule
4948
}
5049
return r.selectBackendFromRule(rule), nil
5150
}
5251

52+
// selectBackendFromRule selects a backend from the given rule. Precondition: len(rule.Backends) > 0.
5353
func (r *router) selectBackendFromRule(rule *filterapi.RouteRule) (backend *filterapi.Backend) {
54+
if len(rule.Backends) == 1 {
55+
return &rule.Backends[0]
56+
}
57+
5458
// Each backend has a weight, so we randomly select depending on the weight.
5559
// This is a pretty naive implementation and can be buggy, so fix it later.
5660
totalWeight := 0
5761
for _, b := range rule.Backends {
5862
totalWeight += b.Weight
5963
}
6064

65+
rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) // nolint:gosec
6166
// Pick a random backend if none of them have a weight.
6267
if totalWeight == 0 {
63-
return &rule.Backends[r.rng.Intn(len(rule.Backends))]
68+
return &rule.Backends[rng.Intn(len(rule.Backends))]
6469
}
6570

66-
selected := r.rng.Intn(totalWeight)
71+
selected := rng.Intn(totalWeight)
6772
for i := range rule.Backends {
6873
b := &rule.Backends[i]
6974
if selected < b.Weight {

internal/extproc/router/router_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
package router
77

88
import (
9+
"sync"
10+
"sync/atomic"
911
"testing"
1012

1113
"github.com/stretchr/testify/require"
@@ -110,6 +112,33 @@ func TestRouter_Calculate(t *testing.T) {
110112
require.Greater(t, chosenNames["bar"], 700)
111113
require.Greater(t, chosenNames["foo"], 200)
112114
})
115+
116+
t.Run("concurrent access", func(t *testing.T) {
117+
var wg sync.WaitGroup
118+
wg.Add(1000)
119+
120+
var foo atomic.Int32
121+
var bar atomic.Int32
122+
for range 1000 {
123+
go func() {
124+
defer wg.Done()
125+
b, err := r.Calculate(map[string]string{"x-model-name": "llama3.3333"})
126+
require.NoError(t, err)
127+
require.NotNil(t, b)
128+
129+
if b.Name == "foo" {
130+
foo.Add(1)
131+
} else {
132+
bar.Add(1)
133+
}
134+
}()
135+
}
136+
wg.Wait()
137+
require.Equal(t, int32(1000), bar.Load()+foo.Load())
138+
require.Greater(t, bar.Load(), foo.Load())
139+
require.Greater(t, bar.Load(), int32(700))
140+
require.Greater(t, foo.Load(), int32(200))
141+
})
113142
}
114143

115144
func TestRouter_selectBackendFromRule(t *testing.T) {

0 commit comments

Comments
 (0)