From af130d08e65c68f0b329289b14a34ce55119dba2 Mon Sep 17 00:00:00 2001 From: wooyang2018 <2201212838@stu.pku.edu.cn> Date: Mon, 14 Oct 2024 12:53:55 +0800 Subject: [PATCH] Fix bugs of outlier unit tests --- api/slot_chain.go | 3 - core/outlier/recycler.go | 6 + core/outlier/recycler_test.go | 15 ++- core/outlier/retryer.go | 6 + core/outlier/retryer_test.go | 99 +++++++++------ core/outlier/slot.go | 9 +- example/outlier/README.md | 98 ++++++++++++++- example/outlier/hello_kitex/client/client.go | 5 +- example/outlier/hello_kratos/client/client.go | 5 +- example/outlier/hello_micro/client/client.go | 5 +- pkg/adapters/kitex/client.go | 72 +++++------ pkg/adapters/kitex/options.go | 13 ++ pkg/adapters/kratos/client.go | 83 +++++++----- pkg/adapters/kratos/options.go | 64 +++++++++- pkg/adapters/micro/client.go | 118 +++++++++++------- pkg/adapters/micro/options.go | 12 +- pkg/adapters/micro/outlier_client.go | 45 +------ 17 files changed, 447 insertions(+), 211 deletions(-) diff --git a/api/slot_chain.go b/api/slot_chain.go index df9704f2c..35ae23340 100644 --- a/api/slot_chain.go +++ b/api/slot_chain.go @@ -21,7 +21,6 @@ import ( "github.com/alibaba/sentinel-golang/core/hotspot" "github.com/alibaba/sentinel-golang/core/isolation" "github.com/alibaba/sentinel-golang/core/log" - "github.com/alibaba/sentinel-golang/core/outlier" "github.com/alibaba/sentinel-golang/core/stat" "github.com/alibaba/sentinel-golang/core/system" ) @@ -41,13 +40,11 @@ func BuildDefaultSlotChain() *base.SlotChain { sc.AddRuleCheckSlot(isolation.DefaultSlot) sc.AddRuleCheckSlot(hotspot.DefaultSlot) sc.AddRuleCheckSlot(circuitbreaker.DefaultSlot) - sc.AddRuleCheckSlot(outlier.DefaultSlot) sc.AddStatSlot(stat.DefaultSlot) sc.AddStatSlot(log.DefaultSlot) sc.AddStatSlot(flow.DefaultStandaloneStatSlot) sc.AddStatSlot(hotspot.DefaultConcurrencyStatSlot) sc.AddStatSlot(circuitbreaker.DefaultMetricStatSlot) - sc.AddStatSlot(outlier.DefaultMetricStatSlot) return sc } diff --git a/core/outlier/recycler.go b/core/outlier/recycler.go index af462da74..68623256a 100644 --- a/core/outlier/recycler.go +++ b/core/outlier/recycler.go @@ -16,6 +16,7 @@ package outlier import ( "errors" + "fmt" "sync" "time" @@ -38,6 +39,11 @@ type task struct { func init() { go func() { + defer func() { + if err := recover(); err != nil { + logging.Error(fmt.Errorf("%+v", err), "Unexpected panic when consuming recyclerCh") + } + }() for task := range recyclerCh { recycler := getRecyclerOfResource(task.resource) recycler.scheduleNodes(task.nodes) diff --git a/core/outlier/recycler_test.go b/core/outlier/recycler_test.go index 5eb931ee3..ff46d4a9a 100644 --- a/core/outlier/recycler_test.go +++ b/core/outlier/recycler_test.go @@ -59,7 +59,7 @@ func generateNodes(n int) []string { return nodes } -func TestRecycler(t *testing.T) { +func testRecycler(t *testing.T) { nodes := []string{"node0", "node1"} resource := "testResource" addNodeBreakers(resource, nodes) @@ -79,7 +79,7 @@ func TestRecycler(t *testing.T) { assert.Contains(t, m, nodes[0]) // node0 should have been recovered } -func TestRecyclerConcurrent(t *testing.T) { +func testRecyclerConcurrent(t *testing.T) { nodes := generateNodes(100) // Generate 100 nodes resource := "testResource" addNodeBreakers(resource, nodes) @@ -119,7 +119,7 @@ func TestRecyclerConcurrent(t *testing.T) { assert.Contains(t, m, nodes[1]) } -func TestRecyclerCh(t *testing.T) { +func testRecyclerCh(t *testing.T) { nodes := []string{"node0", "node1"} resource := "testResource" addNodeBreakers(resource, nodes) @@ -139,7 +139,7 @@ func TestRecyclerCh(t *testing.T) { assert.Contains(t, m, nodes[0]) // node0 should have been recovered } -func TestRecyclerChConcurrent(t *testing.T) { +func testRecyclerChConcurrent(t *testing.T) { nodes := generateNodes(100) // Generate 100 nodes resource := "testResource" addNodeBreakers(resource, nodes) @@ -179,3 +179,10 @@ func TestRecyclerChConcurrent(t *testing.T) { assert.Contains(t, m, nodes[0]) assert.Contains(t, m, nodes[1]) } + +func TestRecyclerAll(t *testing.T) { + t.Run("TestRecycler", testRecycler) + t.Run("TestRecyclerConcurrent", testRecyclerConcurrent) + t.Run("TestRecyclerCh", testRecyclerCh) + t.Run("TestRecyclerChConcurrent", testRecyclerChConcurrent) +} diff --git a/core/outlier/retryer.go b/core/outlier/retryer.go index bfb495738..fd8ec5346 100644 --- a/core/outlier/retryer.go +++ b/core/outlier/retryer.go @@ -16,6 +16,7 @@ package outlier import ( "errors" + "fmt" "net" "sync" "time" @@ -32,6 +33,11 @@ var ( func init() { go func() { + defer func() { + if err := recover(); err != nil { + logging.Error(fmt.Errorf("%+v", err), "Unexpected panic when consuming retryerCh") + } + }() for task := range retryerCh { retryer := getRetryerOfResource(task.resource) retryer.scheduleNodes(task.nodes) diff --git a/core/outlier/retryer_test.go b/core/outlier/retryer_test.go index caa5ead73..a04ef5e4b 100644 --- a/core/outlier/retryer_test.go +++ b/core/outlier/retryer_test.go @@ -28,47 +28,57 @@ import ( "github.com/alibaba/sentinel-golang/core/circuitbreaker" ) -var callCounts = make(map[string]int) -var recoverCount int -var mu sync.Mutex +type dummyCall struct { + callCounts map[string]int + recoverCount int + mtx sync.Mutex +} + +func newDummyCall() *dummyCall { + return &dummyCall{ + callCounts: make(map[string]int), + } +} -func registerAddress(address string, n int) { - mu.Lock() - defer mu.Unlock() - callCounts[address] = n +func (d *dummyCall) registerAddress(address string, n int) { + d.mtx.Lock() + defer d.mtx.Unlock() + d.callCounts[address] = n } -// dummyCall checks whether the node address has returned to normal. +// dummyCall's Check checks whether the node address has returned to normal. // It returns to normal when the value recorded in callCounts decreases to 0. -func dummyCall(address string) bool { - mu.Lock() - defer mu.Unlock() - if _, ok := callCounts[address]; ok { - callCounts[address]-- +func (d *dummyCall) Check(address string) bool { + d.mtx.Lock() + defer d.mtx.Unlock() + if _, ok := d.callCounts[address]; ok { + d.callCounts[address]-- time.Sleep(100 * time.Millisecond) // simulate network latency - if callCounts[address] == 0 { + if d.callCounts[address] == 0 { fmt.Printf("%s successfully reconnected\n", address) - recoverCount++ + d.recoverCount++ return true } return false } + fmt.Println(d.callCounts) panic("Attempting to call an unregistered node address.") + return false } -func getRecoverCount() int { - mu.Lock() - defer mu.Unlock() - return recoverCount +func (d *dummyCall) getRecoverCount() int { + d.mtx.Lock() + defer d.mtx.Unlock() + return d.recoverCount } -func addOutlierRuleForRetryer(resource string, n, internal uint32) { +func addOutlierRuleForRetryer(resource string, n, internal uint32, f RecoveryCheckFunc) { updateMux.Lock() defer updateMux.Unlock() outlierRules[resource] = &Rule{ MaxRecoveryAttempts: n, RecoveryIntervalMs: internal, - RecoveryCheckFunc: dummyCall, + RecoveryCheckFunc: f, } } @@ -124,14 +134,15 @@ func setNodeBreaker(resource string, node string, breaker *MockCircuitBreaker) { // Construct two dummy node addresses: the first one recovers after the third check, // and the second one recovers after math.MaxInt32 checks. Observe the changes in the // circuit breaker and callCounts status for the first node before and after recovery. -func TestRetryer(t *testing.T) { - resource := "testResource" +func testRetryer(t *testing.T) { + resource := "testResource0" nodes := []string{"node0", "node1"} var internal, n uint32 = 1000, 3 - registerAddress(nodes[0], int(n)) - registerAddress(nodes[1], math.MaxInt32) + d := newDummyCall() + d.registerAddress(nodes[0], int(n)) + d.registerAddress(nodes[1], math.MaxInt32) - addOutlierRuleForRetryer(resource, n, internal) + addOutlierRuleForRetryer(resource, n, internal, d.Check) retryer := getRetryerOfResource(resource) retryer.scheduleNodes(nodes) @@ -140,17 +151,18 @@ func TestRetryer(t *testing.T) { setNodeBreaker(resource, nodes[0], mockCB) minDuration := time.Duration(n * (n + 1) / 2 * internal * 1e6) - for getRecoverCount() < 1 { + for d.getRecoverCount() < 1 { time.Sleep(minDuration) } assert.Equal(t, len(nodes)-1, len(retryer.counts)) mockCB.AssertExpectations(t) } -func TestRetryerConcurrent(t *testing.T) { - resource := "testResource" +func testRetryerConcurrent(t *testing.T) { + resource := "testResource1" nodes := generateNodes(100) // Generate 100 nodes var internal, n uint32 = 1000, 3 + d := newDummyCall() mockCBs := make([]*MockCircuitBreaker, 0, len(nodes)/2) for i, node := range nodes { if i%2 == 0 { @@ -158,13 +170,13 @@ func TestRetryerConcurrent(t *testing.T) { mockCB.On("OnRequestComplete", mock.AnythingOfType("uint64"), nil).Return() setNodeBreaker(resource, node, mockCB) mockCBs = append(mockCBs, mockCB) - registerAddress(node, int(n)) + d.registerAddress(node, int(n)) } else { - registerAddress(node, math.MaxInt32) + d.registerAddress(node, math.MaxInt32) } } - - addOutlierRuleForRetryer(resource, n, internal) + fmt.Println(d.callCounts) + addOutlierRuleForRetryer(resource, n, internal, d.Check) retryer := getRetryerOfResource(resource) numGoroutines := 10 var wg sync.WaitGroup @@ -187,7 +199,7 @@ func TestRetryerConcurrent(t *testing.T) { assert.Equal(t, len(nodes), len(retryer.counts)) minDuration := time.Duration(n * (n + 1) / 2 * internal * 1e6) - for getRecoverCount() < len(nodes)/2 { + for d.getRecoverCount() < len(nodes)/2 { time.Sleep(minDuration) } assert.Equal(t, len(nodes)/2, len(retryer.counts)) @@ -196,14 +208,15 @@ func TestRetryerConcurrent(t *testing.T) { } } -func TestRetryerCh(t *testing.T) { +func testRetryerCh(t *testing.T) { nodes := []string{"node0", "node1"} - resource := "testResource" + resource := "testResource2" var internal, n uint32 = 1000, 3 - registerAddress(nodes[0], int(n)) - registerAddress(nodes[1], math.MaxInt32) + d := newDummyCall() + d.registerAddress(nodes[0], int(n)) + d.registerAddress(nodes[1], math.MaxInt32) - addOutlierRuleForRetryer(resource, n, internal) + addOutlierRuleForRetryer(resource, n, internal, d.Check) retryer := getRetryerOfResource(resource) mockCB := new(MockCircuitBreaker) @@ -213,9 +226,15 @@ func TestRetryerCh(t *testing.T) { retryerCh <- task{nodes, resource} minDuration := time.Duration(n * (n + 1) / 2 * internal * 1e6) - for getRecoverCount() < 1 { + for d.getRecoverCount() < 1 { time.Sleep(minDuration) } assert.Equal(t, len(nodes)-1, len(retryer.counts)) mockCB.AssertExpectations(t) } + +func TestRetryerAll(t *testing.T) { + t.Run("TestRetryer", testRetryer) + t.Run("TestRetryerConcurrent", testRetryerConcurrent) + t.Run("TestRetryerCh", testRetryerCh) +} diff --git a/core/outlier/slot.go b/core/outlier/slot.go index bf407bb60..19eabb649 100644 --- a/core/outlier/slot.go +++ b/core/outlier/slot.go @@ -40,11 +40,14 @@ func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult { if len(resource) == 0 { return result } + filterNodes, outlierNodes, halfOpenNodes := checkAllNodes(ctx) result.SetFilterNodes(filterNodes) result.SetHalfOpenNodes(halfOpenNodes) + if len(outlierNodes) != 0 { - if len(retryerCh) < capacity { + rule := getOutlierRuleOfResource(resource) + if rule.EnableActiveRecovery && len(retryerCh) < capacity { retryerCh <- task{outlierNodes, resource} } if len(recyclerCh) < capacity { @@ -66,9 +69,7 @@ func checkAllNodes(ctx *base.EntryContext) (filters []string, outliers []string, } continue } - if rule.EnableActiveRecovery { - outliers = append(outliers, address) - } + outliers = append(outliers, address) if len(filters) < int(float64(nodeCount)*rule.MaxEjectionPercent) { filters = append(filters, address) } diff --git a/example/outlier/README.md b/example/outlier/README.md index a6c22d3a0..059d16ccf 100644 --- a/example/outlier/README.md +++ b/example/outlier/README.md @@ -1,10 +1,102 @@ -# Quick Start +# 离群摘除端到端示例 ## 概述 目的:构建用于测试离群摘除功能正确性的随机失败环境 -方法:启动由10个节点构成的服务,每个节点用id编号,节点启动后的第10+5*id秒时开始出现故障,故障时间持续10s +方法:启动由10个节点构成的服务,每个节点用id编号,节点启动后的第10+5*id秒时开始出现故障,故障时间持续20s - 为模拟节点的网络错误,故障时间段内对节点的RPC调用将会持续阻塞,直到故障恢复后才会返回; -- 为模拟节点的业务错误,故障时间段内对节点的RPC调用将会返回5XX错误,直到故障恢复后才返回正常。 \ No newline at end of file +- 为模拟节点的业务错误,故障时间段内对节点的RPC调用将会返回5XX错误,直到故障恢复后才返回正常。 + +## How to Run + +在`example/outlier`目录中有三个微服务框架(go-micro、kitex和kratos)的示例代码。这些示例实现了一个简单的业务逻辑——`echo hello`程序,并利用了`etcd`作为注册中心。示例中不仅实现了服务端代码,在`client`目录中还实现了集成离群摘除功能的客户端代码。下文中我们将会用这份代码来随机模拟节点崩溃故障和业务故障,下面首先介绍如何使用这份示例程序。 + +1. 安装注册中心:请按照[etcd安装文档](https://etcd.io/docs/v3.5/install/)的说明安装etcd。 + +2. 启动注册中心:在本地启动etcd,默认情况下,etcd监听端口为2379。 + +3. 启动服务进程:通过运行`setup.sh`脚本来启动指定数量的服务进程并模拟相应的故障情景。脚本的第一个参数`node_crash`用于指示是否模拟节点崩溃场景,默认值为false,第二个参数`node_count`指定启动服务进程的数量,默认值为9。比如对于 go-micro 框架,通过下面命令可以启动4个服务进程并模拟节点崩溃: + + ```shell + cd hello_micro && ./setup.sh true 4 + ``` + + +4. 启动客户端进程:直接进入`client`目录中通过`go run .`启动客户端程序,客户端每隔500ms会发起一次服务调用,总共发起200次调用并记录正确返回的次数,用于评价离群摘除的实现效果。比如对于 go-micro 框架,客户端启动命令如下: + + ```shell + cd hello_micro/client && go run . + ``` + +## 模拟节点崩溃 + +当`node_crash=true`时模拟的便是节点崩溃的场景。首先启动`node_count`个服务节点,然后按照节点编号的顺序每隔5s杀死一个节点(kill掉节点进程),等到杀死所有节点后,再每隔5s按照节点编号的顺序以相同的端口重新启动节点。`setup.sh`脚本中主要实现了模拟节点崩溃相关的如下三个函数。 + +1. 启动进程 (`start_process`函数): + - 在循环中,根据`node_count`启动指定数量的服务进程,每个进程监听不同的端口(9001到9009)。 + - 使用`go run . --server_address=:$port`命令启动服务,并将进程ID存储在数组`pids`中。 +2. 停止进程 (`stop_process`函数): + - 在循环中,等待5秒后,通过`kill`命令依次停止之前启动的服务进程。 + - 使用`pgrep -f "hello_micro --server_address=:$port" | xargs kill`确保杀死相关的进程。 +3. 重启进程(`restart_process`函数): + - 在循环中,等待5秒后,重启之前停止的服务进程,使用相同的端口。 + +流程控制:先调用`start_process`函数启动所有服务进程,然后等待5秒后调用`stop_process`函数停止进程,然后调用`restart_process`函数重启它们。 + +## 模拟业务错误 + +### 模拟方式 + +当`node_crash=false`时模拟的便是业务故障的场景。首先启动`node_count`个服务节点,然后按照节点编号的顺序每隔5s让一个节点进入业务故障期,该时间持续20s,在此期间收到的所有客户端请求都会返回错误响应,等到节点度过业务故障时期后,将会重新返回正确的响应。示例代码目录下的`handler.go`文件代码模拟了业务错误的场景,具体实现逻辑如下。 + +1. TestHandler结构体:定义`TestHandler`结构体,包含节点的ID和启动时间,这两个参数是为了方便计算业务出现故障的时间区间。 + +2. TestHandler的Ping方法:实现`TestHandler`结构体的`Ping`方法以处理传入的请求并返回给客户端响应: + + - 如果nodeCrash为false,则根据节点的ID计算出业务错误的时间范围: + + - `faultStartTime`是当前节点的启动时间`s.startTime`加上5秒,再加上节点ID乘以5秒。 + + ``` + faultStartTime := s.startTime.Add(5 * time.Second).Add(time.Duration(s.id) * 5 * time.Second) + ``` + + - 故障结束时间`faultEndTime`是`faultStartTime`加上20秒。 + + ``` + faultEndTime := faultStartTime.Add(20 * time.Second) + ``` + + - 接着获取当前时间currentTime,并检查其是否在业务错误的时间范围内: + + - 如果当前时间在`faultStartTime`和`faultEndTime`之间,返回一个空结果并抛出`errors.New("internal server error")`。 + - 如果当前时间不在错误范围内,则返回节点的欢迎信息`fmt.Sprintf("Welcome, I am node%d", s.id)`。 + +### 测试结果 + + +当服务进程处于业务故障时期,虽然其IP地址仍存在于客户端负载均衡列表中,对该节点的调用能够得到响应,但是这个响应不是200 OK而是包含服务端内部错误。所以我们希望能够借助离群摘除的能力,从负载均衡的列表中剔除这些业务故障的节点,从而提高服务调用的成功率。另一方面,我们也希望当节点业务恢复正常时,借助主动或者被动的恢复检测机制,能够及时将其加回到负载均衡列表中。 + +如下图所示,由于节点出现业务故障,不断摘除负载均衡列表中的异常节点。注意下图与模拟节点崩溃的场景不同的是,Filter Pre表示的过滤前的负载均衡列表,从始至终包含所有的9个节点的IP地址。所以如果没有Sentinel提供的客户端离群摘除的能力,单凭注册中心的离群摘除功能是不足以应对这种情况的,因为注册中心无法摘除仅仅是业务上出错的节点。 + +``` +Filter Pre: [10.113.43.181:9008 10.113.43.181:9009 10.113.43.181:9007 10.113.43.181:9005 10.113.43.181:9006 10.113.43.181:9003 10.113.43.181:9002 10.113.43.181:9004 10.113.43.181:9001] +Filter Post: [10.113.43.181:9008 10.113.43.181:9009 10.113.43.181:9007 10.113.43.181:9005 10.113.43.181:9006 10.113.43.181:9003 10.113.43.181:9002 10.113.43.181:9004] +2024/10/14 14:07:14 rpc error: code = Unknown desc = internal server error +Filter Pre: [10.113.43.181:9008 10.113.43.181:9009 10.113.43.181:9007 10.113.43.181:9005 10.113.43.181:9006 10.113.43.181:9003 10.113.43.181:9002 10.113.43.181:9004 10.113.43.181:9001] +Filter Post: [10.113.43.181:9008 10.113.43.181:9009 10.113.43.181:9007 10.113.43.181:9005 10.113.43.181:9006 10.113.43.181:9003 10.113.43.181:9004] +``` +如下图所示,触发了两次被动恢复检测,当节点熔断器处于半开状态时,会周期性放行探测请求,如果请求响应成功则认为节点恢复。下图中对node8的检测成功了,对node9的检测仍旧失败了,检测成功的节点将会在下一次服务调用的时候加入到负载均衡列表中。下图中Filter Post确实可以看到node8被加入到了过滤后的负载均衡列表中。 +``` +Half Filter Pre: [10.113.43.181:9004 10.113.43.181:9006 10.113.43.181:9009 10.113.43.181:9001 10.113.43.181:9003 10.113.43.181:9007 10.113.43.181:9005 10.113.43.181:9008 10.113.43.181:9002] +Half Filter Post: [10.113.43.181:9008] +2024/10/14 14:10:09 message:"Welcome Bob,I am node8" +Half Filter Pre: [10.113.43.181:9004 10.113.43.181:9006 10.113.43.181:9009 10.113.43.181:9001 10.113.43.181:9003 10.113.43.181:9007 10.113.43.181:9005 10.113.43.181:9008 10.113.43.181:9002] +Half Filter Post: [10.113.43.181:9009] +2024/10/14 14:10:09 rpc error: code = Unknown desc = internal server error +Filter Pre: [10.113.43.181:9004 10.113.43.181:9006 10.113.43.181:9009 10.113.43.181:9001 10.113.43.181:9003 10.113.43.181:9007 10.113.43.181:9005 10.113.43.181:9008 10.113.43.181:9002] +Filter Post: [10.113.43.181:9004 10.113.43.181:9006 10.113.43.181:9001 10.113.43.181:9003 10.113.43.181:9007 10.113.43.181:9005 10.113.43.181:9008 10.113.43.181:9002] +2024/10/14 14:10:10 message:"Welcome Bob,I am node1" +``` \ No newline at end of file diff --git a/example/outlier/hello_kitex/client/client.go b/example/outlier/hello_kitex/client/client.go index 916f14d1a..d0aac2698 100644 --- a/example/outlier/hello_kitex/client/client.go +++ b/example/outlier/hello_kitex/client/client.go @@ -23,7 +23,10 @@ func initOutlierClient() hello.Client { } c, err := hello.NewClient("example.helloworld", client.WithResolver(kitex.OutlierClientResolver(resolver)), - client.WithMiddleware(kitex.OutlierClientMiddleware()), + client.WithMiddleware(kitex.SentinelClientMiddleware( + kitex.WithEnableOutlier(func(ctx context.Context) bool { + return true + }))), ) if err != nil { log.Fatal(err) diff --git a/example/outlier/hello_kratos/client/client.go b/example/outlier/hello_kratos/client/client.go index ff3a617fd..3b50cf8aa 100644 --- a/example/outlier/hello_kratos/client/client.go +++ b/example/outlier/hello_kratos/client/client.go @@ -42,7 +42,10 @@ func initOutlierClient() pb.GreeterClient { grpc.WithEndpoint(endpoint), grpc.WithDiscovery(etcdReg), grpc.WithNodeFilter(kratos.OutlierClientFilter), - grpc.WithMiddleware(kratos.OutlierClientMiddleware), + grpc.WithMiddleware(kratos.SentinelClientMiddleware( + kratos.WithEnableOutlier(func(ctx context.Context) bool { + return true + }))), ) if err != nil { log.Fatal(err) diff --git a/example/outlier/hello_micro/client/client.go b/example/outlier/hello_micro/client/client.go index 49e312afd..9d0de6032 100644 --- a/example/outlier/hello_micro/client/client.go +++ b/example/outlier/hello_micro/client/client.go @@ -32,7 +32,10 @@ func initOutlierClient() client.Client { micro.Name(serviceName), micro.Version(version), micro.Selector(sel), - micro.WrapClient(microAdapter.NewOutlierClientWrapper()), + micro.WrapClient(microAdapter.NewClientWrapper( + microAdapter.WithEnableOutlier(func(ctx context.Context) bool { + return true + }))), ) return srv.Client() } diff --git a/pkg/adapters/kitex/client.go b/pkg/adapters/kitex/client.go index dc72e7953..53a401c7f 100644 --- a/pkg/adapters/kitex/client.go +++ b/pkg/adapters/kitex/client.go @@ -10,6 +10,7 @@ import ( sentinel "github.com/alibaba/sentinel-golang/api" "github.com/alibaba/sentinel-golang/core/base" + "github.com/alibaba/sentinel-golang/core/outlier" ) var filterNodes []string @@ -23,46 +24,45 @@ func SentinelClientMiddleware(opts ...Option) func(endpoint.Endpoint) endpoint.E options := newOptions(opts) return func(next endpoint.Endpoint) endpoint.Endpoint { return func(ctx context.Context, req, resp interface{}) error { - resourceName := options.ResourceExtract(ctx, req, resp) - entry, blockErr := sentinel.Entry( - resourceName, - sentinel.WithResourceType(base.ResTypeRPC), - sentinel.WithTrafficType(base.Outbound), - ) - if blockErr != nil { - return options.BlockFallback(ctx, req, resp, blockErr) - } - defer entry.Exit() - err := next(ctx, req, resp) - if err != nil { - sentinel.TraceError(entry, err) - } - return err - } - } -} - -// OutlierClientMiddleware returns new client.Middleware specifically for outlier ejection. -func OutlierClientMiddleware(opts ...Option) func(endpoint.Endpoint) endpoint.Endpoint { - return func(next endpoint.Endpoint) endpoint.Endpoint { - return func(ctx context.Context, req, resp interface{}) error { - resourceName := ServiceNameExtract(ctx) - entry, _ := sentinel.Entry( - resourceName, - sentinel.WithResourceType(base.ResTypeRPC), - sentinel.WithTrafficType(base.Outbound), - ) - defer entry.Exit() - filterNodes = entry.Context().FilterNodes() - halfNodes = entry.Context().HalfOpenNodes() - err := next(ctx, req, resp) - if callee := CalleeAddressExtract(ctx); callee != "" { - sentinel.TraceCallee(entry, callee) + if !options.EnableOutlier(ctx) { + resourceName := options.ResourceExtract(ctx, req, resp) + entry, blockErr := sentinel.Entry( + resourceName, + sentinel.WithResourceType(base.ResTypeRPC), + sentinel.WithTrafficType(base.Outbound), + ) + if blockErr != nil { + return options.BlockFallback(ctx, req, resp, blockErr) + } + defer entry.Exit() + err := next(ctx, req, resp) if err != nil { sentinel.TraceError(entry, err) } + return err + } else { // returns new client middleware specifically for outlier ejection. + slotChain := sentinel.BuildDefaultSlotChain() + slotChain.AddRuleCheckSlot(outlier.DefaultSlot) + slotChain.AddStatSlot(outlier.DefaultMetricStatSlot) + resourceName := ServiceNameExtract(ctx) + entry, _ := sentinel.Entry( + resourceName, + sentinel.WithResourceType(base.ResTypeRPC), + sentinel.WithTrafficType(base.Outbound), + sentinel.WithSlotChain(slotChain), + ) + defer entry.Exit() + filterNodes = entry.Context().FilterNodes() + halfNodes = entry.Context().HalfOpenNodes() + err := next(ctx, req, resp) + if callee := CalleeAddressExtract(ctx); callee != "" { + sentinel.TraceCallee(entry, callee) + if err != nil { + sentinel.TraceError(entry, err) + } + } + return err } - return err } } } diff --git a/pkg/adapters/kitex/options.go b/pkg/adapters/kitex/options.go index 83fcf5879..f6975b5ce 100644 --- a/pkg/adapters/kitex/options.go +++ b/pkg/adapters/kitex/options.go @@ -14,6 +14,7 @@ type Option struct { type options struct { ResourceExtract func(ctx context.Context, req, resp interface{}) string BlockFallback func(ctx context.Context, req, resp interface{}, blockErr error) error + EnableOutlier func(ctx context.Context) bool } func DefaultBlockFallback(ctx context.Context, req, resp interface{}, blockErr error) error { @@ -25,10 +26,15 @@ func DefaultResourceExtract(ctx context.Context, req, resp interface{}) string { return ri.To().ServiceName() + ":" + ri.To().Method() } +func DefaultEnableOutlier(ctx context.Context) bool { + return false +} + func newOptions(opts []Option) *options { o := &options{ ResourceExtract: DefaultResourceExtract, BlockFallback: DefaultBlockFallback, + EnableOutlier: DefaultEnableOutlier, } o.Apply(opts) return o @@ -54,6 +60,13 @@ func WithBlockFallback(f func(ctx context.Context, req, resp interface{}, blockE }} } +// WithEnableOutlier sets whether to enable outlier ejection +func WithEnableOutlier(f func(ctx context.Context) bool) Option { + return Option{func(o *options) { + o.EnableOutlier = f + }} +} + func ServiceNameExtract(ctx context.Context) string { rpcInfo := rpcinfo.GetRPCInfo(ctx) return rpcInfo.To().ServiceName() diff --git a/pkg/adapters/kratos/client.go b/pkg/adapters/kratos/client.go index 3976f94ea..298fd36de 100644 --- a/pkg/adapters/kratos/client.go +++ b/pkg/adapters/kratos/client.go @@ -8,8 +8,9 @@ import ( "github.com/go-kratos/kratos/v2/middleware" "github.com/go-kratos/kratos/v2/selector" - sentinelApi "github.com/alibaba/sentinel-golang/api" + sentinel "github.com/alibaba/sentinel-golang/api" "github.com/alibaba/sentinel-golang/core/base" + "github.com/alibaba/sentinel-golang/core/outlier" ) const filterNodesKey = "filterNodes" @@ -34,38 +35,62 @@ func OutlierClientFilter(ctx context.Context, nodes []selector.Node) []selector. return nodesPost } -func OutlierClientMiddleware(src middleware.Handler) middleware.Handler { - return func(ctx context.Context, req interface{}) (interface{}, error) { - resourceName := ServiceNameExtract(ctx) - if resourceName == "" { - return nil, fmt.Errorf("resource name is empty") - } - entry, _ := sentinelApi.Entry( - resourceName, - sentinelApi.WithResourceType(base.ResTypeRPC), - sentinelApi.WithTrafficType(base.Outbound), - ) - defer entry.Exit() +// fix me: Only the outlier ejection of the kratos adapter has been verified, +// and the flow control capabilities still need to be validated. +func SentinelClientMiddleware(opts ...Option) middleware.Middleware { + options := newOptions(opts) + return func(src middleware.Handler) middleware.Handler { + return func(ctx context.Context, req interface{}) (interface{}, error) { + if !options.EnableOutlier(ctx) { + resourceName := options.ResourceExtract(ctx, req) + entry, blockErr := sentinel.Entry( + resourceName, + sentinel.WithResourceType(base.ResTypeRPC), + sentinel.WithTrafficType(base.Outbound), + ) + if blockErr != nil { + return options.BlockFallback(ctx, req, blockErr) + } + defer entry.Exit() + resp, err := src(ctx, req) + if err != nil { + sentinel.TraceError(entry, err) + } + return resp, err + } else { // returns new client middleware specifically for outlier ejection. + resourceName := ServiceNameExtract(ctx) + slotChain := sentinel.BuildDefaultSlotChain() + slotChain.AddRuleCheckSlot(outlier.DefaultSlot) + slotChain.AddStatSlot(outlier.DefaultMetricStatSlot) + entry, _ := sentinel.Entry( + resourceName, + sentinel.WithResourceType(base.ResTypeRPC), + sentinel.WithTrafficType(base.Outbound), + sentinel.WithSlotChain(slotChain), + ) + defer entry.Exit() - if v, ok := metadata.FromClientContext(ctx); ok { - filterNodes := entry.Context().FilterNodes() - for _, node := range filterNodes { - v.Add(filterNodesKey, node) - } - halfNodes := entry.Context().HalfOpenNodes() - for _, node := range halfNodes { - v.Add(halfNodesKey, node) - } - } + if v, ok := metadata.FromClientContext(ctx); ok { + filterNodes := entry.Context().FilterNodes() + for _, node := range filterNodes { + v.Add(filterNodesKey, node) + } + halfNodes := entry.Context().HalfOpenNodes() + for _, node := range halfNodes { + v.Add(halfNodesKey, node) + } + } - res, err := src(ctx, req) - if p, ok := selector.FromPeerContext(ctx); ok && p.Node != nil { - sentinelApi.TraceCallee(entry, p.Node.Address()) - if err != nil { - sentinelApi.TraceError(entry, err) + res, err := src(ctx, req) + if p, ok := selector.FromPeerContext(ctx); ok && p.Node != nil { + sentinel.TraceCallee(entry, p.Node.Address()) + if err != nil { + sentinel.TraceError(entry, err) + } + } + return res, err } } - return res, err } } diff --git a/pkg/adapters/kratos/options.go b/pkg/adapters/kratos/options.go index e0967c0e1..9b6eb2475 100644 --- a/pkg/adapters/kratos/options.go +++ b/pkg/adapters/kratos/options.go @@ -7,6 +7,68 @@ import ( "github.com/go-kratos/kratos/v2/transport" ) +type Option struct { + F func(o *options) +} + +type options struct { + ResourceExtract func(ctx context.Context, req interface{}) string + BlockFallback func(ctx context.Context, req interface{}, blockErr error) (interface{}, error) + EnableOutlier func(ctx context.Context) bool +} + +func DefaultResourceExtract(ctx context.Context, req interface{}) string { + if v, ok := transport.FromClientContext(ctx); ok { + return v.Operation() + } + panic("operation is empty") +} + +func DefaultBlockFallback(ctx context.Context, req interface{}, blockErr error) (interface{}, error) { + return nil, blockErr +} + +func DefaultEnableOutlier(ctx context.Context) bool { + return false +} + +func newOptions(opts []Option) *options { + o := &options{ + ResourceExtract: DefaultResourceExtract, + BlockFallback: DefaultBlockFallback, + EnableOutlier: DefaultEnableOutlier, + } + o.Apply(opts) + return o +} + +func (o *options) Apply(opts []Option) { + for _, op := range opts { + op.F(o) + } +} + +// WithResourceExtract sets the resource extractor +func WithResourceExtract(f func(ctx context.Context, req interface{}) string) Option { + return Option{F: func(o *options) { + o.ResourceExtract = f + }} +} + +// WithBlockFallback sets the fallback handler +func WithBlockFallback(f func(ctx context.Context, req interface{}, blockErr error) (interface{}, error)) Option { + return Option{func(o *options) { + o.BlockFallback = f + }} +} + +// WithEnableOutlier sets whether to enable outlier ejection +func WithEnableOutlier(f func(ctx context.Context) bool) Option { + return Option{func(o *options) { + o.EnableOutlier = f + }} +} + func ServiceNameExtract(ctx context.Context) string { if v, ok := transport.FromClientContext(ctx); ok { res := v.Endpoint() @@ -14,5 +76,5 @@ func ServiceNameExtract(ctx context.Context) string { return strings.TrimPrefix(res, "discovery:///") } } - return "" + panic("resource name is empty") } diff --git a/pkg/adapters/micro/client.go b/pkg/adapters/micro/client.go index 2de525770..6a4df48e0 100644 --- a/pkg/adapters/micro/client.go +++ b/pkg/adapters/micro/client.go @@ -3,9 +3,11 @@ package micro import ( "context" + "github.com/micro/go-micro/v2/client" + sentinel "github.com/alibaba/sentinel-golang/api" "github.com/alibaba/sentinel-golang/core/base" - "github.com/micro/go-micro/v2/client" + "github.com/alibaba/sentinel-golang/core/outlier" ) type clientWrapper struct { @@ -14,63 +16,89 @@ type clientWrapper struct { } func (c *clientWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { - resourceName := req.Method() options := evaluateOptions(c.Opts) + if options.enableOutlier == nil || !options.enableOutlier(ctx) { + resourceName := req.Method() + if options.clientResourceExtract != nil { + resourceName = options.clientResourceExtract(ctx, req) + } - if options.clientResourceExtract != nil { - resourceName = options.clientResourceExtract(ctx, req) - } - - entry, blockErr := sentinel.Entry( - resourceName, - sentinel.WithResourceType(base.ResTypeRPC), - sentinel.WithTrafficType(base.Outbound), - ) - - if blockErr != nil { - if options.clientBlockFallback != nil { - return options.clientBlockFallback(ctx, req, blockErr) + entry, blockErr := sentinel.Entry( + resourceName, + sentinel.WithResourceType(base.ResTypeRPC), + sentinel.WithTrafficType(base.Outbound), + ) + if blockErr != nil { + if options.clientBlockFallback != nil { + return options.clientBlockFallback(ctx, req, blockErr) + } + return blockErr } - return blockErr - } - defer entry.Exit() + defer entry.Exit() - err := c.Client.Call(ctx, req, rsp, opts...) - if err != nil { - sentinel.TraceError(entry, err) + err := c.Client.Call(ctx, req, rsp, opts...) + if err != nil { + sentinel.TraceError(entry, err) + } + return err + } else { // returns new client middleware specifically for outlier ejection. + slotChain := sentinel.BuildDefaultSlotChain() + slotChain.AddRuleCheckSlot(outlier.DefaultSlot) + slotChain.AddStatSlot(outlier.DefaultMetricStatSlot) + entry, _ := sentinel.Entry( + req.Service(), + sentinel.WithResourceType(base.ResTypeRPC), + sentinel.WithTrafficType(base.Outbound), + sentinel.WithSlotChain(slotChain), + ) + defer entry.Exit() + opts = append(opts, WithSelectOption(entry)) + opts = append(opts, WithCallWrapper(entry)) + return c.Client.Call(ctx, req, rsp, opts...) } - - return err } func (c *clientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { options := evaluateOptions(c.Opts) - resourceName := req.Method() - - if options.streamClientResourceExtract != nil { - resourceName = options.streamClientResourceExtract(ctx, req) - } - - entry, blockErr := sentinel.Entry( - resourceName, - sentinel.WithResourceType(base.ResTypeRPC), - sentinel.WithTrafficType(base.Outbound), - ) + if options.enableOutlier == nil || !options.enableOutlier(ctx) { + resourceName := req.Method() + if options.streamClientResourceExtract != nil { + resourceName = options.streamClientResourceExtract(ctx, req) + } - if blockErr != nil { - if options.streamClientBlockFallback != nil { - return options.streamClientBlockFallback(ctx, req, blockErr) + entry, blockErr := sentinel.Entry( + resourceName, + sentinel.WithResourceType(base.ResTypeRPC), + sentinel.WithTrafficType(base.Outbound), + ) + if blockErr != nil { + if options.streamClientBlockFallback != nil { + return options.streamClientBlockFallback(ctx, req, blockErr) + } + return nil, blockErr } - return nil, blockErr - } - defer entry.Exit() + defer entry.Exit() - stream, err := c.Client.Stream(ctx, req, opts...) - if err != nil { - sentinel.TraceError(entry, err) + stream, err := c.Client.Stream(ctx, req, opts...) + if err != nil { + sentinel.TraceError(entry, err) + } + return stream, err + } else { + slotChain := sentinel.GlobalSlotChain() + slotChain.AddRuleCheckSlot(outlier.DefaultSlot) + slotChain.AddStatSlot(outlier.DefaultMetricStatSlot) + entry, _ := sentinel.Entry( + req.Service(), + sentinel.WithResourceType(base.ResTypeRPC), + sentinel.WithTrafficType(base.Outbound), + sentinel.WithSlotChain(slotChain), + ) + defer entry.Exit() + opts = append(opts, WithSelectOption(entry)) + opts = append(opts, WithCallWrapper(entry)) + return c.Client.Stream(ctx, req, opts...) } - - return stream, err } // NewClientWrapper returns a sentinel client Wrapper. diff --git a/pkg/adapters/micro/options.go b/pkg/adapters/micro/options.go index 924b0bdc4..6e3a3dbff 100644 --- a/pkg/adapters/micro/options.go +++ b/pkg/adapters/micro/options.go @@ -3,9 +3,10 @@ package micro import ( "context" - "github.com/alibaba/sentinel-golang/core/base" "github.com/micro/go-micro/v2/client" "github.com/micro/go-micro/v2/server" + + "github.com/alibaba/sentinel-golang/core/base" ) type ( @@ -23,6 +24,8 @@ type ( streamClientBlockFallback func(context.Context, client.Request, *base.BlockError) (client.Stream, error) streamServerBlockFallback func(server.Stream, *base.BlockError) server.Stream + + enableOutlier func(ctx context.Context) bool } ) @@ -84,6 +87,13 @@ func WithStreamServerBlockFallback(fn func(server.Stream, *base.BlockError) serv } } +// WithEnableOutlier sets whether to enable outlier ejection +func WithEnableOutlier(fn func(ctx context.Context) bool) Option { + return func(opts *options) { + opts.enableOutlier = fn + } +} + func evaluateOptions(opts []Option) *options { optCopy := &options{} for _, o := range opts { diff --git a/pkg/adapters/micro/outlier_client.go b/pkg/adapters/micro/outlier_client.go index 643e04982..bd08f1bd2 100644 --- a/pkg/adapters/micro/outlier_client.go +++ b/pkg/adapters/micro/outlier_client.go @@ -9,49 +9,10 @@ import ( "github.com/micro/go-micro/v2/client/selector" "github.com/micro/go-micro/v2/registry" - sentinelApi "github.com/alibaba/sentinel-golang/api" + sentinel "github.com/alibaba/sentinel-golang/api" "github.com/alibaba/sentinel-golang/core/base" ) -type outlierClientWrapper struct { - client.Client -} - -// NewOutlierClientWrapper returns a sentinel outlier client Wrapper. -func NewOutlierClientWrapper(opts ...Option) client.Wrapper { - return func(c client.Client) client.Client { - return &outlierClientWrapper{c} - } -} - -func (c *outlierClientWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { - entry, _ := sentinelApi.Entry( - req.Service(), - sentinelApi.WithResourceType(base.ResTypeRPC), - sentinelApi.WithTrafficType(base.Outbound), - ) - defer entry.Exit() - opts = append(opts, WithSelectOption(entry)) - opts = append(opts, WithCallWrapper(entry)) - return c.Client.Call(ctx, req, rsp, opts...) -} - -func (c *outlierClientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { - entry, _ := sentinelApi.Entry( - req.Service(), - sentinelApi.WithResourceType(base.ResTypeRPC), - sentinelApi.WithTrafficType(base.Outbound), - ) - defer entry.Exit() - opts = append(opts, WithSelectOption(entry)) - opts = append(opts, WithCallWrapper(entry)) - stream, err := c.Client.Stream(ctx, req, opts...) - if err != nil { - sentinelApi.TraceError(entry, err) - } - return stream, err -} - func WithSelectOption(entry *base.SentinelEntry) client.CallOption { return client.WithSelectOption(selector.WithFilter( func(old []*registry.Service) (new []*registry.Service) { @@ -76,9 +37,9 @@ func WithCallWrapper(entry *base.SentinelEntry) client.CallOption { return func(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { err := f1(ctx, node, req, rsp, opts) - sentinelApi.TraceCallee(entry, node.Address) + sentinel.TraceCallee(entry, node.Address) if err != nil { - sentinelApi.TraceError(entry, err) + sentinel.TraceError(entry, err) } return err }