Skip to content

Commit

Permalink
Add unit tests for outlier ejection
Browse files Browse the repository at this point in the history
  • Loading branch information
wooyang2018 committed Sep 28, 2024
1 parent cf66b96 commit 67796fb
Show file tree
Hide file tree
Showing 5 changed files with 442 additions and 3 deletions.
3 changes: 2 additions & 1 deletion core/outlier/recycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ func (r *Recycler) scheduleNodes(nodes []string) {
for _, node := range nodes {
if _, ok := r.status[node]; !ok {
r.status[node] = false
nodeCopy := node // Copy values to correctly capture the closure for node.
time.AfterFunc(r.interval, func() {
r.recycle(node)
r.recycle(nodeCopy)
})
}
}
Expand Down
181 changes: 181 additions & 0 deletions core/outlier/recycler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package outlier

import (
"fmt"
"math/rand"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func newRecycler(resource string, interval time.Duration) *Recycler {
return &Recycler{
resource: resource,
status: make(map[string]bool),
interval: interval,
}
}

func addOutlierRuleForRecycler(resource string, seconds uint32) {
updateMux.Lock()
defer updateMux.Unlock()
outlierRules[resource] = &Rule{RecycleIntervalS: seconds}
}

// randomSample selects a random sample of nodes without modifying the original slice
func randomSample(nodes []string, sampleSize int) []string {
if sampleSize > len(nodes) {
sampleSize = len(nodes)
}
startIndex := rand.Intn(len(nodes)) // Select a random starting index
result := make([]string, 0, sampleSize)
for i := 0; i < sampleSize; i++ {
result = append(result, nodes[(startIndex+i)%len(nodes)])
}
return result
}

func generateNodes(n int) []string {
nodes := make([]string, n)
for i := 0; i < n; i++ {
nodes[i] = fmt.Sprintf("node%d", i)
}
return nodes
}

func TestRecycler(t *testing.T) {
nodes := []string{"node0", "node1"}
resource := "testResource"
addNodeBreakers(resource, nodes)

recycler := newRecycler(resource, 4*time.Second)
recycler.scheduleNodes(nodes)
assert.Equal(t, len(nodes), len(recycler.status))

// Restore node0 after 2 seconds of simulation
time.AfterFunc(2*time.Second, func() {
recycler.recover(nodes[0])
})
time.Sleep(5 * time.Second)

m := getNodeBreakersOfResource(resource)
assert.Equal(t, 1, len(m))
assert.Contains(t, m, nodes[0]) // node0 should have been recovered
}

func TestRecyclerConcurrent(t *testing.T) {
nodes := generateNodes(100) // Generate 100 nodes
resource := "testResource"
addNodeBreakers(resource, nodes)

numGoroutines := 10
var wg sync.WaitGroup
wg.Add(numGoroutines)
recycler := newRecycler(resource, 4*time.Second)

// Start multiple goroutines to schedule random nodes
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
numToSchedule := len(nodes) / numGoroutines * 2
selectedNodes := randomSample(nodes, numToSchedule)
recycler.scheduleNodes(selectedNodes)
}()
}
wg.Wait()

// Check the status of nodes
assert.GreaterOrEqual(t, len(nodes), len(recycler.status))
recycler.scheduleNodes(nodes)
assert.Equal(t, len(nodes), len(recycler.status))

// Recover node0 and node1 after 2 seconds
time.AfterFunc(2*time.Second, func() {
recycler.recover(nodes[0])
recycler.recover(nodes[1])
})
time.Sleep(5 * time.Second)

// node0 and node1 should be recovered
m := getNodeBreakersOfResource(resource)
assert.Equal(t, 2, len(m))
assert.Contains(t, m, nodes[0])
assert.Contains(t, m, nodes[1])
}

func TestRecyclerCh(t *testing.T) {
nodes := []string{"node0", "node1"}
resource := "testResource"
addNodeBreakers(resource, nodes)
addOutlierRuleForRecycler(resource, 4)

recyclerCh <- task{nodes, resource}

// Restore node0 after 2 seconds of simulation
time.AfterFunc(2*time.Second, func() {
recycler := getRecyclerOfResource(resource)
recycler.recover(nodes[0])
})
time.Sleep(5 * time.Second)

m := getNodeBreakersOfResource(resource)
assert.Equal(t, 1, len(m))
assert.Contains(t, m, nodes[0]) // node0 should have been recovered
}

func TestRecyclerChConcurrent(t *testing.T) {
nodes := generateNodes(100) // Generate 100 nodes
resource := "testResource"
addNodeBreakers(resource, nodes)
addOutlierRuleForRecycler(resource, 4)

numGoroutines := 10
var wg sync.WaitGroup
wg.Add(numGoroutines)

recycler := getRecyclerOfResource(resource)
// Start multiple goroutines to schedule random nodes
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
numToSchedule := len(nodes) / numGoroutines * 2
selectedNodes := randomSample(nodes, numToSchedule)
recyclerCh <- task{selectedNodes, resource}
}()
}
wg.Wait()

// Check the status of nodes
assert.GreaterOrEqual(t, len(nodes), len(recycler.status))
recycler.scheduleNodes(nodes)
assert.Equal(t, len(nodes), len(recycler.status))

// Recover node0 and node1 after 2 seconds
time.AfterFunc(2*time.Second, func() {
recycler.recover(nodes[0])
recycler.recover(nodes[1])
})
time.Sleep(5 * time.Second)

// node0 and node1 should be recovered
m := getNodeBreakersOfResource(resource)
assert.Equal(t, 2, len(m))
assert.Contains(t, m, nodes[0])
assert.Contains(t, m, nodes[1])
}
3 changes: 2 additions & 1 deletion core/outlier/retryer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ func (r *Retryer) scheduleNodes(nodes []string) {
if _, ok := r.counts[node]; !ok {
r.counts[node] = 1
logging.Info("[Outlier Retryer] Reconnecting...", "node", node)
nodeCopy := node // Copy values to correctly capture the closure for node.
time.AfterFunc(r.interval, func() {
r.connectNode(node)
r.connectNode(nodeCopy)
})
}
}
Expand Down
Loading

0 comments on commit 67796fb

Please sign in to comment.