diff --git a/go.mod b/go.mod index e1bb9062b2..d1412300c7 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/stretchr/testify v1.7.0 go.opentelemetry.io/proto/otlp v0.7.0 golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect + golang.org/x/sync v0.0.0-20190423024810-112230192c58 golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 google.golang.org/grpc v1.36.0 diff --git a/go.sum b/go.sum index 7bebfae838..a2f6cae649 100644 --- a/go.sum +++ b/go.sum @@ -73,6 +73,7 @@ golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4Iltr golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index d3b167379a..da60204366 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -18,10 +18,11 @@ package sotw import ( "context" "errors" - "reflect" "strconv" + "sync" "sync/atomic" + "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -72,6 +73,24 @@ type lastDiscoveryResponse struct { resources map[string]struct{} } +type lastDiscoveryResponses struct { + mu sync.RWMutex + responses map[string]lastDiscoveryResponse +} + +func (l *lastDiscoveryResponses) Set(key string, value lastDiscoveryResponse) { + l.mu.Lock() + l.responses[key] = value + l.mu.Unlock() +} + +func (l *lastDiscoveryResponses) Get(key string) (value lastDiscoveryResponse, ok bool) { + l.mu.RLock() + value, ok = l.responses[key] + l.mu.RUnlock() + return +} + // process handles a bi-di stream request func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error { // increment stream count @@ -82,13 +101,12 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq var streamNonce int64 streamState := stream.NewStreamState(false, map[string]string{}) - lastDiscoveryResponses := map[string]lastDiscoveryResponse{} + lastDiscoveryResponses := lastDiscoveryResponses{responses: make(map[string]lastDiscoveryResponse)} // a collection of stack allocated watches per request type watches := newWatches() defer func() { - watches.close() if s.callbacks != nil { s.callbacks.OnStreamClosed(streamID) } @@ -116,7 +134,7 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq for _, r := range resp.GetRequest().ResourceNames { lastResponse.resources[r] = struct{}{} } - lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse + lastDiscoveryResponses.Set(resp.GetRequest().TypeUrl, lastResponse) if s.callbacks != nil { s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out) @@ -133,103 +151,100 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq // node may only be set on the first discovery request var node = &core.Node{} - // recompute dynamic channels for this stream - watches.recompute(s.ctx, reqCh) - - for { - // The list of select cases looks like this: - // 0: <- ctx.Done - // 1: <- reqCh - // 2...: per type watches - index, value, ok := reflect.Select(watches.cases) - switch index { - // ctx.Done() -> if we receive a value here we return as no further computation is needed - case 0: - return nil - // Case 1 handles any request inbound on the stream and handles all initialization as needed - case 1: - // input stream ended or errored out - if !ok { - return nil - } - - req := value.Interface().(*discovery.DiscoveryRequest) - if req == nil { - return status.Errorf(codes.Unavailable, "empty request") - } + var resCh = make(chan cache.Response, 1) - // node field in discovery request is delta-compressed - if req.Node != nil { - node = req.Node - } else { - req.Node = node - } + ctx, cancel := context.WithCancel(s.ctx) + eg, ctx := errgroup.WithContext(ctx) - // nonces can be reused across streams; we verify nonce only if nonce is not initialized - nonce := req.GetResponseNonce() + eg.Go(func() error { + defer func() { + watches.close() // this should remove all watches from the cache + close(resCh) // close resCh and let the second eg.Go drain it + }() - // type URL is required for ADS but is implicit for xDS - if defaultTypeURL == resource.AnyType { - if req.TypeUrl == "" { - return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") + for { + select { + case <-ctx.Done(): + return nil + case req, more := <-reqCh: + if !more { + return nil + } + if req == nil { + return status.Errorf(codes.Unavailable, "empty request") + } + // node field in discovery request is delta-compressed + if req.Node != nil { + node = req.Node + } else { + req.Node = node } - } else if req.TypeUrl == "" { - req.TypeUrl = defaultTypeURL - } - if s.callbacks != nil { - if err := s.callbacks.OnStreamRequest(streamID, req); err != nil { - return err + // nonces can be reused across streams; we verify nonce only if nonce is not initialized + nonce := req.GetResponseNonce() + + // type URL is required for ADS but is implicit for xDS + if defaultTypeURL == resource.AnyType { + if req.TypeUrl == "" { + return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") + } + } else if req.TypeUrl == "" { + req.TypeUrl = defaultTypeURL } - } - if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) + if s.callbacks != nil { + if err := s.callbacks.OnStreamRequest(streamID, req); err != nil { + return err + } } - } - typeURL := req.GetTypeUrl() - responder := make(chan cache.Response, 1) - if w, ok := watches.responders[typeURL]; ok { - // We've found a pre-existing watch, lets check and update if needed. - // If these requirements aren't satisfied, leave an open watch. - if w.nonce == "" || w.nonce == nonce { - w.close() + if lastResponse, ok := lastDiscoveryResponses.Get(req.TypeUrl); ok { + if lastResponse.nonce == "" || lastResponse.nonce == nonce { + // Let's record Resource names that a client has received. + streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) + } + } + typeURL := req.GetTypeUrl() + if w := watches.getWatch(typeURL); w != nil { + // We've found a pre-existing watch, lets check and update if needed. + // If these requirements aren't satisfied, leave an open watch. + if n := w.getNonce(); n == "" || n == nonce { + w.close() + + watches.addWatch(typeURL, &watch{ + cancel: s.cache.CreateWatch(req, streamState, resCh), + }) + } + } else { + // No pre-existing watch exists, let's create one. + // We need to precompute the watches first then open a watch in the cache. watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), - response: responder, + cancel: s.cache.CreateWatch(req, streamState, resCh), }) } - } else { - // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. - watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), - response: responder, - }) } + } + }) - // Recompute the dynamic select cases for this stream. - watches.recompute(s.ctx, reqCh) - default: - // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL - if !ok { - // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something? - return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index) + eg.Go(func() (err error) { + var nonce string + for res := range resCh { + if res == nil || err != nil { + continue // this loop should not exit until resCh closed } - - res := value.Interface().(cache.Response) - nonce, err := send(res) - if err != nil { - return err + if nonce, err = send(res); err == nil { + if w := watches.getWatch(res.GetRequest().TypeUrl); w != nil { + w.setNonce(nonce) + } + } else { + cancel() } - - watches.responders[res.GetRequest().TypeUrl].nonce = nonce } - } + return err + }) + + return eg.Wait() } // StreamHandler converts a blocking read call to channels and initiates stream processing diff --git a/pkg/server/sotw/v3/watches.go b/pkg/server/sotw/v3/watches.go index 45670d6a91..68fd8cc745 100644 --- a/pkg/server/sotw/v3/watches.go +++ b/pkg/server/sotw/v3/watches.go @@ -1,34 +1,37 @@ package sotw import ( - "context" - "reflect" + "sync" - discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/cache/v3" ) // watches for all xDS resource types type watches struct { + mu sync.RWMutex responders map[string]*watch - - // cases is a dynamic select case for the watched channels. - cases []reflect.SelectCase } // newWatches creates and initializes watches. func newWatches() watches { return watches{ responders: make(map[string]*watch, int(types.UnknownType)), - cases: make([]reflect.SelectCase, 0), } } // addWatch creates a new watch entry in the watches map. // Watches are sorted by typeURL. func (w *watches) addWatch(typeURL string, watch *watch) { + w.mu.Lock() w.responders[typeURL] = watch + w.mu.Unlock() +} + +func (w *watches) getWatch(typeURL string) (watch *watch) { + w.mu.RLock() + watch = w.responders[typeURL] + w.mu.RUnlock() + return } // close all open watches @@ -38,33 +41,24 @@ func (w *watches) close() { } } -// recomputeWatches rebuilds the known list of dynamic channels if needed -func (w *watches) recompute(ctx context.Context, req <-chan *discovery.DiscoveryRequest) { - w.cases = w.cases[:0] // Clear the existing cases while retaining capacity. - - w.cases = append(w.cases, - reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(ctx.Done()), - }, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(req), - }, - ) +// watch contains the necessary modifiables for receiving resource responses +type watch struct { + mu sync.RWMutex + cancel func() + nonce string +} - for _, watch := range w.responders { - w.cases = append(w.cases, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(watch.response), - }) - } +func (w *watch) getNonce() (n string) { + w.mu.RLock() + n = w.nonce + w.mu.RUnlock() + return n } -// watch contains the necessary modifiables for receiving resource responses -type watch struct { - cancel func() - nonce string - response chan cache.Response +func (w *watch) setNonce(n string) { + w.mu.Lock() + w.nonce = n + w.mu.Unlock() } // close cancels an open watch