Skip to content

Commit

Permalink
remove selectCase and remove any signs of 2 defaults
Browse files Browse the repository at this point in the history
Signed-off-by: alecholmez <[email protected]>

oops remove that

Signed-off-by: alecholmez <[email protected]>

stash changes

Signed-off-by: alecholmez <[email protected]>
  • Loading branch information
alecholmez committed Dec 6, 2021
1 parent 8e3e122 commit eba7aed
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 45 deletions.
35 changes: 18 additions & 17 deletions pkg/server/sotw/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
lastDiscoveryResponses := map[string]lastDiscoveryResponse{}

// a collection of stack allocated watches per request type
watches := newWatches(reqCh)
watches := newWatches()

defer func() {
watches.Cancel()
watches.close()
if s.callbacks != nil {
s.callbacks.OnStreamClosed(streamID)
}
Expand Down Expand Up @@ -124,14 +124,6 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
return out.Nonce, str.Send(out)
}

open := func(w *watch, req *discovery.DiscoveryRequest, responder chan cache.Response) {
w.cancel = s.cache.CreateWatch(req, streamState, responder)
watches.cases[w.index] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(responder),
}
}

if s.callbacks != nil {
if err := s.callbacks.OnStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil {
return err
Expand All @@ -142,7 +134,7 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
var node = &core.Node{}

// recompute dynamic channels for this stream
watches.RecomputeWatches(s.ctx, reqCh)
watches.recompute(s.ctx, reqCh)

for {
// The list of select cases looks like this:
Expand Down Expand Up @@ -204,27 +196,36 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
// We've found a pre-existing watch, lets check and update if needed.
// If these requirements aren't satisfied, leave an open watch.
if w.nonce == "" || w.nonce == nonce {
w.Cancel()
w.close()

open(w, req, responder)
watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState, responder),
response: responder,
})
}
} else {
// No pre-existing watch exists, let's create one.
// We need to precompute the watches first then open a watch in the cache.
watches.responders[typeURL] = &watch{}
w = watches.responders[typeURL]
watches.RecomputeWatches(s.ctx, reqCh)
watches.recompute(s.ctx, reqCh)

open(w, req, responder)
watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState, responder),
response: responder,
})
}

// Recompute the dynamic select cases for this stream.
watches.recompute(s.ctx, reqCh)
default:
// Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
if !ok {
return status.Errorf(codes.Unavailable, "resource watch failed")
return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index)
}

res := value.Interface().(cache.Response)
nonce, err := send(value.Interface().(cache.Response))
nonce, err := send(res)
if err != nil {
return err
}
Expand Down
56 changes: 28 additions & 28 deletions pkg/server/sotw/v3/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
)

// watches for all xDS resource types
Expand All @@ -17,57 +18,56 @@ type watches struct {
}

// newWatches creates and initializes watches.
func newWatches(req <-chan *discovery.DiscoveryRequest) watches {
func newWatches() watches {
return watches{
responders: make(map[string]*watch, int(types.UnknownType)),
cases: make([]reflect.SelectCase, 2), // We use 2 for the default computation here: ctx.Done() + reqCh.Recv()
cases: make([]reflect.SelectCase, 0),
}
}

// Cancel all watches
func (w *watches) Cancel() {
// addWatch creates a new watch entry in the watches map.
// Watches are sorted by typeURL.
func (w *watches) addWatch(typeURL string, watch *watch) {
w.responders[typeURL] = watch
}

// close all open watches
func (w *watches) close() {
for _, watch := range w.responders {
watch.Cancel()
watch.close()
}
}

// recomputeWatches rebuilds the known list of dynamic channels if needed
func (w *watches) RecomputeWatches(ctx context.Context, reqCh <-chan *discovery.DiscoveryRequest) {
newCases := []reflect.SelectCase{
{
func (w *watches) recompute(ctx context.Context, req <-chan *discovery.DiscoveryRequest) {
w.cases = w.cases[0:]
w.cases = append(w.cases,
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),
},
{
}, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(reqCh),
Chan: reflect.ValueOf(req),
},
}
)

index := len(newCases)
for _, watch := range w.responders {
newCases = append(newCases, watch.selectCase)
watch.index = index
index++
w.cases = append(w.cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(watch.response),
})
}

w.cases = newCases
}

// watch contains the necessary modifiables for receiving resource responses
type watch struct {
selectCase reflect.SelectCase
cancel func()
nonce string

// Index is used to track the location of this channel in watches. This allows us
// to update the channel used at this slot without recomputing the entire list of select
// statements.
index int
cancel func()
nonce string
response chan cache.Response
}

// Cancel calls terminate and cancel
func (w *watch) Cancel() {
// close cancels an open watch
func (w *watch) close() {
if w.cancel != nil {
w.cancel()
}
Expand Down

0 comments on commit eba7aed

Please sign in to comment.