Skip to content

Commit

Permalink
Fix OnCompleted of outlier MetricStatSlot
Browse files Browse the repository at this point in the history
  • Loading branch information
wooyang2018 committed Sep 22, 2024
1 parent 6c81957 commit cf66b96
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 124 deletions.
2 changes: 2 additions & 0 deletions core/base/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,15 @@ func (r *TokenResult) NanosToWait() time.Duration {
func (r *TokenResult) FilterNodes() []string {
return r.filterNodes
}

func (r *TokenResult) HalfOpenNodes() []string {
return r.halfOpenNodes
}

func (r *TokenResult) SetFilterNodes(nodes []string) {
r.filterNodes = nodes
}

func (r *TokenResult) SetHalfOpenNodes(nodes []string) {
r.halfOpenNodes = nodes
}
Expand Down
11 changes: 7 additions & 4 deletions core/outlier/retryer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,11 @@ func getRetryerOfResource(resource string) *Retryer {

func isPortOpen(address string) bool {
conn, err := net.DialTimeout("tcp", address, 5*time.Second)
defer conn.Close()
return err == nil
if err == nil {
conn.Close()
return true
}
return false
}

func (r *Retryer) scheduleNodes(nodes []string) {
Expand All @@ -86,7 +89,7 @@ func (r *Retryer) scheduleNodes(nodes []string) {
for _, node := range nodes {
if _, ok := r.counts[node]; !ok {
r.counts[node] = 1
logging.Debug("[Outlier Retryer] Reconnecting...", "node", node)
logging.Info("[Outlier Retryer] Reconnecting...", "node", node)
time.AfterFunc(r.interval, func() {
r.connectNode(node)
})
Expand Down Expand Up @@ -114,7 +117,7 @@ func (r *Retryer) onConnected(node string, rt uint64) {
if breaker, ok := breakers[node]; ok {
breaker.OnRequestComplete(rt, nil)
} else {
logging.Info("[Outlier Retryer] Failed to update status after reconnection", "node", node)
logging.Warn("[Outlier Retryer] Failed to update status after reconnection", "node", node)
}
}

Expand Down
8 changes: 2 additions & 6 deletions core/outlier/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,8 @@ func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult {
return result
}
filterNodes, outlierNodes, halfOpenNodes := checkAllNodes(ctx)
if len(filterNodes) != 0 {
result.SetFilterNodes(filterNodes)
}
if len(halfOpenNodes) != 0 {
result.SetHalfOpenNodes(halfOpenNodes)
}
result.SetFilterNodes(filterNodes)
result.SetHalfOpenNodes(halfOpenNodes)
if len(outlierNodes) != 0 {
if len(retryerCh) < capacity {
retryerCh <- task{outlierNodes, resource}
Expand Down
25 changes: 11 additions & 14 deletions core/outlier/stat_slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,18 @@ func (c *MetricStatSlot) OnCompleted(ctx *base.EntryContext) {
res := ctx.Resource.Name()
err := ctx.Err()
nodeBreakers := getNodeBreakersOfResource(res)
var address string
if address, ok := ctx.GetPair("address").(string); !ok || address == "" {
logging.Warn("[Outlier] Failed to get valid address", "resourceName", res)
return
}

if _, ok := nodeBreakers[address]; !ok {
addNodeBreakerOfResource(res, address)
nodeBreakers = getNodeBreakersOfResource(res)
}

breaker := nodeBreakers[address]
breaker.OnRequestComplete(ctx.Rt(), err)
if err == nil {
recycler := getRecyclerOfResource(res)
recycler.recover(address)
} else {
if _, ok2 := nodeBreakers[address]; !ok2 {
addNodeBreakerOfResource(res, address)
nodeBreakers = getNodeBreakersOfResource(res)
}
breaker := nodeBreakers[address]
breaker.OnRequestComplete(ctx.Rt(), err)
if err == nil {
recycler := getRecyclerOfResource(res)
recycler.recover(address)
}
}
}
53 changes: 25 additions & 28 deletions example/outlier/hello_kitex/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,40 @@ node_crash=${1:-false}
node_count=${2:-9} # node_count can only be 1 to 9

start_process() {
for ((i=1; i<=node_count; i++))
do
port=900$i
echo "Starting: go run . --server_address=:$port --node_crash=$node_crash"
go run . --server_address=:$port --node_crash=$node_crash &
pids[$i]=$!
done
for ((i = 1; i <= node_count; i++)); do
port=900$i
echo "Starting: go run . --server_address=:$port --node_crash=$node_crash"
go run . --server_address=:$port --node_crash=$node_crash &
pids[$i]=$!
done
}

stop_process() {
for ((i=1; i<=node_count; i++))
do
sleep 5
kill ${pids[$i]}
port=900$i
pgrep -f "hello_kitex --server_address=:$port" | xargs kill
echo "Killed process ${pids[$i]}"
done
for ((i = 1; i <= node_count; i++)); do
sleep 5
kill ${pids[$i]}
port=900$i
pgrep -f "hello_kitex --server_address=:$port" | xargs kill
echo "Killed process ${pids[$i]}"
done
}

restart_process() {
for ((i=1; i<=node_count; i++))
do
sleep 5
port=900$i
echo "Restarting: go run . --server_address=:$port --node_crash=$node_crash"
go run . --server_address=:$port --node_crash=$node_crash &
done
for ((i = 1; i <= node_count; i++)); do
sleep 5
port=900$i
echo "Restarting: go run . --server_address=:$port --node_crash=$node_crash"
go run . --server_address=:$port --node_crash=$node_crash &
done
}

if [ "$node_crash" = "true" ]; then
start_process
sleep 5
stop_process
restart_process
start_process
sleep 5
stop_process
restart_process
else
start_process
start_process
fi

wait
wait
2 changes: 1 addition & 1 deletion example/outlier/hello_kratos/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func main() {
StatIntervalMs: 1000,
Threshold: 1.0,
},
EnableActiveRecovery: true,
EnableActiveRecovery: false,
MaxEjectionPercent: 1.0,
RecoveryIntervalMs: 2000,
MaxRecoveryAttempts: 5,
Expand Down
57 changes: 27 additions & 30 deletions example/outlier/hello_kratos/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,42 @@ node_crash=${1:-false}
node_count=${2:-9} # node_count can only be 1 to 9

start_process() {
for ((i=1; i<=node_count; i++))
do
http_port=800$i
grpc_port=900$i
echo "Starting: go run . --grpc_server_address=:$grpc_port --http_server_address=:$http_port --node_crash=$node_crash"
go run . --grpc_server_address=:$grpc_port --http_server_address=:$http_port --node_crash=$node_crash &
pids[$i]=$!
done
for ((i = 1; i <= node_count; i++)); do
http_port=800$i
grpc_port=900$i
echo "Starting: go run . --grpc_server_address=:$grpc_port --http_server_address=:$http_port --node_crash=$node_crash"
go run . --grpc_server_address=:$grpc_port --http_server_address=:$http_port --node_crash=$node_crash &
pids[$i]=$!
done
}

stop_process() {
for ((i=1; i<=node_count; i++))
do
sleep 5
kill ${pids[$i]}
port=900$i
pgrep -f "hello_kratos --grpc_server_address=:$port" | xargs kill
echo "Killed process ${pids[$i]}"
done
for ((i = 1; i <= node_count; i++)); do
sleep 5
kill ${pids[$i]}
port=900$i
pgrep -f "hello_kratos --grpc_server_address=:$port" | xargs kill
echo "Killed process ${pids[$i]}"
done
}

restart_process() {
for ((i=1; i<=node_count; i++))
do
sleep 5
http_port=800$i
grpc_port=900$i
echo "Restarting: go run . --grpc_server_address=:$grpc_port --http_server_address=:$http_port --node_crash=$node_crash"
go run . --grpc_server_address=:$grpc_port --http_server_address=:$http_port --node_crash=$node_crash &
done
for ((i = 1; i <= node_count; i++)); do
sleep 5
http_port=800$i
grpc_port=900$i
echo "Restarting: go run . --grpc_server_address=:$grpc_port --http_server_address=:$http_port --node_crash=$node_crash"
go run . --grpc_server_address=:$grpc_port --http_server_address=:$http_port --node_crash=$node_crash &
done
}

if [ "$node_crash" = "true" ]; then
start_process
sleep 5
stop_process
restart_process
start_process
sleep 5
stop_process
restart_process
else
start_process
start_process
fi

wait
wait
4 changes: 2 additions & 2 deletions example/outlier/hello_micro/client/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package client
package main

import (
"context"
Expand Down Expand Up @@ -53,7 +53,7 @@ func main() {
StatIntervalMs: 1000,
Threshold: 1.0,
},
EnableActiveRecovery: true,
EnableActiveRecovery: false,
MaxEjectionPercent: 1.0,
RecoveryIntervalMs: 2000,
MaxRecoveryAttempts: 5,
Expand Down
53 changes: 25 additions & 28 deletions example/outlier/hello_micro/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,41 @@ node_crash=${1:-false}
node_count=${2:-9} # node_count can only be 1 to 9

start_process() {
for ((i=1; i<=node_count; i++))
do
port=900$i
echo "Starting: go run . --server_address=:$port"
go run . --server_address=:$port &
pids[$i]=$!
done
for ((i = 1; i <= node_count; i++)); do
port=900$i
echo "Starting: go run . --server_address=:$port"
go run . --server_address=:$port &
pids[$i]=$!
done
}

stop_process() {
for ((i=1; i<=node_count; i++))
do
sleep 5
kill ${pids[$i]}
port=900$i
pgrep -f "hello_micro --server_address=:$port" | xargs kill
echo "Killed process ${pids[$i]}"
done
for ((i = 1; i <= node_count; i++)); do
sleep 5
kill ${pids[$i]}
port=900$i
pgrep -f "hello_micro --server_address=:$port" | xargs kill
echo "Killed process ${pids[$i]}"
done
}

restart_process() {
for ((i=1; i<=node_count; i++))
do
sleep 5
port=900$i
echo "Restarting: go run . --server_address=:$port"
go run . --server_address=:$port &
done
for ((i = 1; i <= node_count; i++)); do
sleep 5
port=900$i
echo "Restarting: go run . --server_address=:$port"
go run . --server_address=:$port &
done
}

sed -E -i 's/nodeCrash = (true|false)/nodeCrash = '$node_crash'/' handler.go
if [ "$node_crash" = "true" ]; then
start_process
sleep 5
stop_process
restart_process
start_process
sleep 5
stop_process
restart_process
else
start_process
start_process
fi

wait
wait
36 changes: 25 additions & 11 deletions pkg/adapters/kitex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

var filterNodes []string
var halfNodes []string

// SentinelClientMiddleware returns new client.Middleware
// Default resource name is {service's name}:{method}
Expand Down Expand Up @@ -53,6 +54,7 @@ func OutlierClientMiddleware(opts ...Option) func(endpoint.Endpoint) endpoint.En
)
defer entry.Exit()
filterNodes = entry.Context().FilterNodes()
halfNodes = entry.Context().HalfOpenNodes()
err := next(ctx, req, resp)
if callee := CalleeAddressExtract(ctx); callee != "" {
sentinel.TraceCallee(entry, callee)
Expand All @@ -67,18 +69,16 @@ func OutlierClientMiddleware(opts ...Option) func(endpoint.Endpoint) endpoint.En

func OutlierClientResolver(resolver discovery.Resolver) discovery.Resolver {
filterFunc := func(ctx context.Context, nodes []discovery.Instance) []discovery.Instance {
nodesMap := make(map[string]struct{})
for _, node := range filterNodes {
nodesMap[node] = struct{}{}
var nodesPost []discovery.Instance
if len(halfNodes) != 0 {
fmt.Println("Half Filter Pre: ", printNodes(nodes))
nodesPost = getRemainingNodes(nodes, halfNodes, true)
fmt.Println("Half Filter Post: ", printNodes(nodesPost))
} else {
fmt.Println("Filter Pre: ", printNodes(nodes))
nodesPost = getRemainingNodes(nodes, filterNodes, false)
fmt.Println("Filter Post: ", printNodes(nodesPost))
}
fmt.Println("Filter Pre: ", printNodes(nodes))
nodesPost := make([]discovery.Instance, 0)
for _, ep := range nodes {
if _, ok := nodesMap[ep.Address().String()]; !ok {
nodesPost = append(nodesPost, ep)
}
}
fmt.Println("Filter Post: ", printNodes(nodesPost))
return nodesPost
}
// Construct the filterRule and build rule based resolver
Expand All @@ -89,6 +89,20 @@ func OutlierClientResolver(resolver discovery.Resolver) discovery.Resolver {
return ruleBasedResolver.NewRuleBasedResolver(resolver, filterRule)
}

func getRemainingNodes(nodes []discovery.Instance, filters []string, flag bool) []discovery.Instance {
nodesMap := make(map[string]struct{})
for _, node := range filters {
nodesMap[node] = struct{}{}
}
nodesPost := make([]discovery.Instance, 0)
for _, ep := range nodes {
if _, ok := nodesMap[ep.Address().String()]; ok == flag {
nodesPost = append(nodesPost, ep)
}
}
return nodesPost
}

// TODO remove this func
func printNodes(nodes []discovery.Instance) (res []string) {
for _, v := range nodes {
Expand Down

0 comments on commit cf66b96

Please sign in to comment.