Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix bugs of outlier unit tests
Browse files Browse the repository at this point in the history
wooyang2018 committed Oct 14, 2024
1 parent dd1b92f commit af130d0
Showing 17 changed files with 447 additions and 211 deletions.
3 changes: 0 additions & 3 deletions api/slot_chain.go
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@ import (
"github.com/alibaba/sentinel-golang/core/hotspot"
"github.com/alibaba/sentinel-golang/core/isolation"
"github.com/alibaba/sentinel-golang/core/log"
"github.com/alibaba/sentinel-golang/core/outlier"
"github.com/alibaba/sentinel-golang/core/stat"
"github.com/alibaba/sentinel-golang/core/system"
)
@@ -41,13 +40,11 @@ func BuildDefaultSlotChain() *base.SlotChain {
sc.AddRuleCheckSlot(isolation.DefaultSlot)
sc.AddRuleCheckSlot(hotspot.DefaultSlot)
sc.AddRuleCheckSlot(circuitbreaker.DefaultSlot)
sc.AddRuleCheckSlot(outlier.DefaultSlot)

sc.AddStatSlot(stat.DefaultSlot)
sc.AddStatSlot(log.DefaultSlot)
sc.AddStatSlot(flow.DefaultStandaloneStatSlot)
sc.AddStatSlot(hotspot.DefaultConcurrencyStatSlot)
sc.AddStatSlot(circuitbreaker.DefaultMetricStatSlot)
sc.AddStatSlot(outlier.DefaultMetricStatSlot)
return sc
}
6 changes: 6 additions & 0 deletions core/outlier/recycler.go
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ package outlier

import (
"errors"
"fmt"
"sync"
"time"

@@ -38,6 +39,11 @@ type task struct {

func init() {
go func() {
defer func() {
if err := recover(); err != nil {
logging.Error(fmt.Errorf("%+v", err), "Unexpected panic when consuming recyclerCh")
}
}()
for task := range recyclerCh {
recycler := getRecyclerOfResource(task.resource)
recycler.scheduleNodes(task.nodes)
15 changes: 11 additions & 4 deletions core/outlier/recycler_test.go
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ func generateNodes(n int) []string {
return nodes
}

func TestRecycler(t *testing.T) {
func testRecycler(t *testing.T) {
nodes := []string{"node0", "node1"}
resource := "testResource"
addNodeBreakers(resource, nodes)
@@ -79,7 +79,7 @@ func TestRecycler(t *testing.T) {
assert.Contains(t, m, nodes[0]) // node0 should have been recovered
}

func TestRecyclerConcurrent(t *testing.T) {
func testRecyclerConcurrent(t *testing.T) {
nodes := generateNodes(100) // Generate 100 nodes
resource := "testResource"
addNodeBreakers(resource, nodes)
@@ -119,7 +119,7 @@ func TestRecyclerConcurrent(t *testing.T) {
assert.Contains(t, m, nodes[1])
}

func TestRecyclerCh(t *testing.T) {
func testRecyclerCh(t *testing.T) {
nodes := []string{"node0", "node1"}
resource := "testResource"
addNodeBreakers(resource, nodes)
@@ -139,7 +139,7 @@ func TestRecyclerCh(t *testing.T) {
assert.Contains(t, m, nodes[0]) // node0 should have been recovered
}

func TestRecyclerChConcurrent(t *testing.T) {
func testRecyclerChConcurrent(t *testing.T) {
nodes := generateNodes(100) // Generate 100 nodes
resource := "testResource"
addNodeBreakers(resource, nodes)
@@ -179,3 +179,10 @@ func TestRecyclerChConcurrent(t *testing.T) {
assert.Contains(t, m, nodes[0])
assert.Contains(t, m, nodes[1])
}

func TestRecyclerAll(t *testing.T) {
t.Run("TestRecycler", testRecycler)
t.Run("TestRecyclerConcurrent", testRecyclerConcurrent)
t.Run("TestRecyclerCh", testRecyclerCh)
t.Run("TestRecyclerChConcurrent", testRecyclerChConcurrent)
}
6 changes: 6 additions & 0 deletions core/outlier/retryer.go
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ package outlier

import (
"errors"
"fmt"
"net"
"sync"
"time"
@@ -32,6 +33,11 @@ var (

func init() {
go func() {
defer func() {
if err := recover(); err != nil {
logging.Error(fmt.Errorf("%+v", err), "Unexpected panic when consuming retryerCh")
}
}()
for task := range retryerCh {
retryer := getRetryerOfResource(task.resource)
retryer.scheduleNodes(task.nodes)
99 changes: 59 additions & 40 deletions core/outlier/retryer_test.go
Original file line number Diff line number Diff line change
@@ -28,47 +28,57 @@ import (
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
)

var callCounts = make(map[string]int)
var recoverCount int
var mu sync.Mutex
type dummyCall struct {
callCounts map[string]int
recoverCount int
mtx sync.Mutex
}

func newDummyCall() *dummyCall {
return &dummyCall{
callCounts: make(map[string]int),
}
}

func registerAddress(address string, n int) {
mu.Lock()
defer mu.Unlock()
callCounts[address] = n
func (d *dummyCall) registerAddress(address string, n int) {
d.mtx.Lock()
defer d.mtx.Unlock()
d.callCounts[address] = n
}

// dummyCall checks whether the node address has returned to normal.
// dummyCall's Check checks whether the node address has returned to normal.
// It returns to normal when the value recorded in callCounts decreases to 0.
func dummyCall(address string) bool {
mu.Lock()
defer mu.Unlock()
if _, ok := callCounts[address]; ok {
callCounts[address]--
func (d *dummyCall) Check(address string) bool {
d.mtx.Lock()
defer d.mtx.Unlock()
if _, ok := d.callCounts[address]; ok {
d.callCounts[address]--
time.Sleep(100 * time.Millisecond) // simulate network latency
if callCounts[address] == 0 {
if d.callCounts[address] == 0 {
fmt.Printf("%s successfully reconnected\n", address)
recoverCount++
d.recoverCount++
return true
}
return false
}
fmt.Println(d.callCounts)
panic("Attempting to call an unregistered node address.")
return false

Check failure on line 66 in core/outlier/retryer_test.go

GitHub Actions / Lint check

unreachable: unreachable code (govet)
}

func getRecoverCount() int {
mu.Lock()
defer mu.Unlock()
return recoverCount
func (d *dummyCall) getRecoverCount() int {
d.mtx.Lock()
defer d.mtx.Unlock()
return d.recoverCount
}

func addOutlierRuleForRetryer(resource string, n, internal uint32) {
func addOutlierRuleForRetryer(resource string, n, internal uint32, f RecoveryCheckFunc) {
updateMux.Lock()
defer updateMux.Unlock()
outlierRules[resource] = &Rule{
MaxRecoveryAttempts: n,
RecoveryIntervalMs: internal,
RecoveryCheckFunc: dummyCall,
RecoveryCheckFunc: f,
}
}

@@ -124,14 +134,15 @@ func setNodeBreaker(resource string, node string, breaker *MockCircuitBreaker) {
// Construct two dummy node addresses: the first one recovers after the third check,
// and the second one recovers after math.MaxInt32 checks. Observe the changes in the
// circuit breaker and callCounts status for the first node before and after recovery.
func TestRetryer(t *testing.T) {
resource := "testResource"
func testRetryer(t *testing.T) {
resource := "testResource0"
nodes := []string{"node0", "node1"}
var internal, n uint32 = 1000, 3
registerAddress(nodes[0], int(n))
registerAddress(nodes[1], math.MaxInt32)
d := newDummyCall()
d.registerAddress(nodes[0], int(n))
d.registerAddress(nodes[1], math.MaxInt32)

addOutlierRuleForRetryer(resource, n, internal)
addOutlierRuleForRetryer(resource, n, internal, d.Check)
retryer := getRetryerOfResource(resource)
retryer.scheduleNodes(nodes)

@@ -140,31 +151,32 @@ func TestRetryer(t *testing.T) {
setNodeBreaker(resource, nodes[0], mockCB)

minDuration := time.Duration(n * (n + 1) / 2 * internal * 1e6)
for getRecoverCount() < 1 {
for d.getRecoverCount() < 1 {
time.Sleep(minDuration)
}
assert.Equal(t, len(nodes)-1, len(retryer.counts))
mockCB.AssertExpectations(t)
}

func TestRetryerConcurrent(t *testing.T) {
resource := "testResource"
func testRetryerConcurrent(t *testing.T) {
resource := "testResource1"
nodes := generateNodes(100) // Generate 100 nodes
var internal, n uint32 = 1000, 3
d := newDummyCall()
mockCBs := make([]*MockCircuitBreaker, 0, len(nodes)/2)
for i, node := range nodes {
if i%2 == 0 {
mockCB := new(MockCircuitBreaker)
mockCB.On("OnRequestComplete", mock.AnythingOfType("uint64"), nil).Return()
setNodeBreaker(resource, node, mockCB)
mockCBs = append(mockCBs, mockCB)
registerAddress(node, int(n))
d.registerAddress(node, int(n))
} else {
registerAddress(node, math.MaxInt32)
d.registerAddress(node, math.MaxInt32)
}
}

addOutlierRuleForRetryer(resource, n, internal)
fmt.Println(d.callCounts)
addOutlierRuleForRetryer(resource, n, internal, d.Check)
retryer := getRetryerOfResource(resource)
numGoroutines := 10
var wg sync.WaitGroup
@@ -187,7 +199,7 @@ func TestRetryerConcurrent(t *testing.T) {
assert.Equal(t, len(nodes), len(retryer.counts))

minDuration := time.Duration(n * (n + 1) / 2 * internal * 1e6)
for getRecoverCount() < len(nodes)/2 {
for d.getRecoverCount() < len(nodes)/2 {
time.Sleep(minDuration)
}
assert.Equal(t, len(nodes)/2, len(retryer.counts))
@@ -196,14 +208,15 @@ func TestRetryerConcurrent(t *testing.T) {
}
}

func TestRetryerCh(t *testing.T) {
func testRetryerCh(t *testing.T) {
nodes := []string{"node0", "node1"}
resource := "testResource"
resource := "testResource2"
var internal, n uint32 = 1000, 3
registerAddress(nodes[0], int(n))
registerAddress(nodes[1], math.MaxInt32)
d := newDummyCall()
d.registerAddress(nodes[0], int(n))
d.registerAddress(nodes[1], math.MaxInt32)

addOutlierRuleForRetryer(resource, n, internal)
addOutlierRuleForRetryer(resource, n, internal, d.Check)
retryer := getRetryerOfResource(resource)

mockCB := new(MockCircuitBreaker)
@@ -213,9 +226,15 @@ func TestRetryerCh(t *testing.T) {
retryerCh <- task{nodes, resource}

minDuration := time.Duration(n * (n + 1) / 2 * internal * 1e6)
for getRecoverCount() < 1 {
for d.getRecoverCount() < 1 {
time.Sleep(minDuration)
}
assert.Equal(t, len(nodes)-1, len(retryer.counts))
mockCB.AssertExpectations(t)
}

func TestRetryerAll(t *testing.T) {
t.Run("TestRetryer", testRetryer)
t.Run("TestRetryerConcurrent", testRetryerConcurrent)
t.Run("TestRetryerCh", testRetryerCh)
}
9 changes: 5 additions & 4 deletions core/outlier/slot.go
Original file line number Diff line number Diff line change
@@ -40,11 +40,14 @@ func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult {
if len(resource) == 0 {
return result
}

filterNodes, outlierNodes, halfOpenNodes := checkAllNodes(ctx)
result.SetFilterNodes(filterNodes)
result.SetHalfOpenNodes(halfOpenNodes)

if len(outlierNodes) != 0 {
if len(retryerCh) < capacity {
rule := getOutlierRuleOfResource(resource)
if rule.EnableActiveRecovery && len(retryerCh) < capacity {
retryerCh <- task{outlierNodes, resource}
}
if len(recyclerCh) < capacity {
@@ -66,9 +69,7 @@ func checkAllNodes(ctx *base.EntryContext) (filters []string, outliers []string,
}
continue
}
if rule.EnableActiveRecovery {
outliers = append(outliers, address)
}
outliers = append(outliers, address)
if len(filters) < int(float64(nodeCount)*rule.MaxEjectionPercent) {
filters = append(filters, address)
}
Loading

0 comments on commit af130d0

Please sign in to comment.