Skip to content

Commit

Permalink
[ISSUE #139] feat:suppport watch services change event (#138)
Browse files Browse the repository at this point in the history
* feat: 添加就近路由支持文档

* rebase upstream/master

* fix:修复demo中缺失import问题

* fix:修复demo中缺失import问题

* feat:suppport watch services change event
  • Loading branch information
chuntaojun authored Apr 12, 2023
1 parent a0389ad commit 0e788fb
Show file tree
Hide file tree
Showing 17 changed files with 421 additions and 28 deletions.
5 changes: 5 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ type InitCalleeServiceRequest api.InitCalleeServiceRequest
// WatchAllInstancesRequest is the request to watch instances
type WatchAllInstancesRequest api.WatchAllInstancesRequest

// WatchAllServicesRequest is the request to watch services
type WatchAllServicesRequest api.WatchAllServicesRequest

// ConsumerAPI 主调端API方法.
type ConsumerAPI interface {
api.SDKOwner
Expand All @@ -71,6 +74,8 @@ type ConsumerAPI interface {
InitCalleeService(req *InitCalleeServiceRequest) error
// WatchAllInstances 监听服务实例变更事件
WatchAllInstances(req *WatchAllInstancesRequest) (*model.WatchAllInstancesResponse, error)
// WatchAllServices 监听服务列表变更事件
WatchAllServices(req *WatchAllServicesRequest) (*model.WatchAllServicesResponse, error)
// Destroy 销毁API,销毁后无法再进行调用
Destroy()
}
Expand Down
7 changes: 7 additions & 0 deletions api/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ type WatchAllInstancesRequest struct {
model.WatchAllInstancesRequest
}

// WatchAllServicesRequest .
type WatchAllServicesRequest struct {
model.WatchAllServicesRequest
}

// ConsumerAPI 主调端API方法
type ConsumerAPI interface {
SDKOwner
Expand All @@ -147,6 +152,8 @@ type ConsumerAPI interface {
InitCalleeService(req *InitCalleeServiceRequest) error
// WatchAllInstances 监听服务实例变更事件
WatchAllInstances(req *WatchAllInstancesRequest) (*model.WatchAllInstancesResponse, error)
// WatchAllServices 监听服务列表变更事件
WatchAllServices(req *WatchAllServicesRequest) (*model.WatchAllServicesResponse, error)
}

var (
Expand Down
10 changes: 10 additions & 0 deletions api/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ func (c *consumerAPI) WatchAllInstances(req *WatchAllInstancesRequest) (*model.W
return c.context.GetEngine().WatchAllInstances(&req.WatchAllInstancesRequest)
}

func (c *consumerAPI) WatchAllServices(req *WatchAllServicesRequest) (*model.WatchAllServicesResponse, error) {
if err := checkAvailable(c); err != nil {
return nil, err
}
if err := req.Validate(); err != nil {
return nil, err
}
return c.context.GetEngine().WatchAllServices(&req.WatchAllServicesRequest)
}

// SDKContext 获取SDK上下文
func (c *consumerAPI) SDKContext() SDKContext {
return c.context
Expand Down
5 changes: 5 additions & 0 deletions api_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func (c *consumerAPI) WatchAllInstances(req *WatchAllInstancesRequest) (*model.W
return c.rawAPI.WatchAllInstances((*api.WatchAllInstancesRequest)(req))
}

// WatchAllServices 监听服务列表变更事件
func (c *consumerAPI) WatchAllServices(req *WatchAllServicesRequest) (*model.WatchAllServicesResponse, error) {
return c.rawAPI.WatchAllServices((*api.WatchAllServicesRequest)(req))
}

// Destroy 销毁API,销毁后无法再进行调用
func (c *consumerAPI) Destroy() {
c.rawAPI.Destroy()
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
global:
serverConnector:
addresses:
- 9.134.5.52:8091
- 127.0.0.1:8091
statReporter:
enable: false
enable: false
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ var (
)

func initArgs() {
flag.StringVar(&namespace, "namespace", "", "namespace")
flag.StringVar(&service, "service", "", "service")
flag.StringVar(&namespace, "namespace", "default", "namespace")
flag.StringVar(&service, "service", "WatchInstanceNotify", "service")
flag.Uint64Var(&waitIndex, "waitIndex", 0, "waitIndex")
flag.DurationVar(&waitTime, "waitTime", 10*time.Second, "waitTime")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
global:
serverConnector:
addresses:
- 9.134.5.52:8091
- 127.0.0.1:8091
statReporter:
enable: false
enable: false
73 changes: 73 additions & 0 deletions examples/watch/service/longpull/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package main

import (
"flag"
"log"
"time"

"github.com/polarismesh/polaris-go"
"github.com/polarismesh/polaris-go/api"
)

var (
namespace string
waitIndex uint64
waitTime time.Duration
)

func initArgs() {
flag.StringVar(&namespace, "namespace", "", "namespace")
flag.Uint64Var(&waitIndex, "waitIndex", 0, "waitIndex")
flag.DurationVar(&waitTime, "waitTime", 60*time.Second, "waitTime")
}

func main() {
initArgs()
flag.Parse()
consumer, err := polaris.NewConsumerAPI()
if err != nil {
log.Fatalf("fail to create consumerAPI, err is %v", err)
}
defer consumer.Destroy()

var index uint64 = waitIndex

for {
req := &polaris.WatchAllServicesRequest{}
req.Namespace = namespace
req.WaitTime = waitTime
req.WaitIndex = index
req.WatchMode = api.WatchModeLongPull
resp, err := consumer.WatchAllServices(req)
if err != nil {
log.Fatalf("fail to watch all instances, namespace %s, err: %s", namespace, err)
}
servicesResp := resp.ServicesResponse()
index = servicesResp.GetHashValue()
log.Printf("namespace %s, services count is %d, revision : %s, next watch index %d", namespace,
len(servicesResp.GetValue()), resp.ServicesResponse().GetRevision(), index)
// for _, svc := range servicesResp.GetValue() {
// log.Printf("namespace %s, svc %s", svc.Namespace, svc.Service)
// }

log.Printf("namespace %s, watch id is %d\n", namespace, resp.WatchId())
resp.CancelWatch()
}
}
6 changes: 6 additions & 0 deletions examples/watch/service/longpull/polaris.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
global:
serverConnector:
addresses:
- 127.0.0.1:8091
statReporter:
enable: false
84 changes: 84 additions & 0 deletions examples/watch/service/notify/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package main

import (
"flag"
"log"
"sync"

"github.com/polarismesh/polaris-go"
"github.com/polarismesh/polaris-go/api"
"github.com/polarismesh/polaris-go/pkg/model"
)

var (
namespace string
waitIndex uint64
)

func initArgs() {
flag.StringVar(&namespace, "namespace", "Polaris", "namespace")
flag.Uint64Var(&waitIndex, "waitIndex", 0, "waitIndex")
}

const svcCount = 1

var port int32 = 1000

type TestListener struct {
wg *sync.WaitGroup
}

// OnInstancesUpdate notify when service instances changed
func (t *TestListener) OnServicesUpdate(resp *model.ServicesResponse) {
defer t.wg.Done()
log.Printf("receive namespace %s, services change, new count is %d, revision : %s", namespace, len(resp.GetValue()), resp.GetRevision())
// for _, svc := range resp.GetValue() {
// log.Printf("namespace %s, svc %s", svc.Namespace, svc.Service)
// }
}

func main() {
initArgs()
flag.Parse()
consumer, err := polaris.NewConsumerAPI()
if err != nil {
log.Fatalf("fail to create consumerAPI, err is %v", err)
}
defer consumer.Destroy()

wg := &sync.WaitGroup{}
wg.Add(10)
req := &polaris.WatchAllServicesRequest{}
req.Namespace = namespace
req.WatchMode = api.WatchModeNotify
req.ServicesListener = &TestListener{
wg: wg,
}
resp, err := consumer.WatchAllServices(req)
if err != nil {
log.Fatalf("fail to watch all services, namespace %s, err: %s", namespace, err)
}
servicesResp := resp.ServicesResponse()
log.Printf("namespace %s, services count is %d", namespace, len(servicesResp.GetValue()))
// for _, svc := range servicesResp.GetValue() {
// log.Printf("namespace %s, svc %s", svc.Namespace, svc.Service)
// }
wg.Wait()
}
6 changes: 6 additions & 0 deletions examples/watch/service/notify/polaris.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
global:
serverConnector:
addresses:
- 127.0.0.1:8091
statReporter:
enable: false
5 changes: 5 additions & 0 deletions pkg/flow/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,3 +636,8 @@ func (e *Engine) SyncGetConfigFile(namespace, fileGroup, fileName string) (model
func (e *Engine) WatchAllInstances(request *model.WatchAllInstancesRequest) (*model.WatchAllInstancesResponse, error) {
return e.watchEngine.WatchAllInstances(request)
}

// WatchAllServices 监听所有的服务列表
func (e *Engine) WatchAllServices(request *model.WatchAllServicesRequest) (*model.WatchAllServicesResponse, error) {
return e.watchEngine.WatchAllServices(request)
}
Loading

0 comments on commit 0e788fb

Please sign in to comment.