Skip to content

Commit

Permalink
Record GVK in APIResourceMeta (Azure#284)
Browse files Browse the repository at this point in the history
Co-authored-by: guofei <[email protected]>
Co-authored-by: Ryan Zhang <[email protected]>
  • Loading branch information
3 people authored Sep 13, 2022
1 parent dcae038 commit c4f8c87
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 31 deletions.
20 changes: 6 additions & 14 deletions apis/v1alpha1/clusterresourceplacement_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"fmt"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -21,10 +20,8 @@ import (
)

var ResourceInformer informer.Manager
var restMapper meta.RESTMapper

func (c *ClusterResourcePlacement) SetupWebhookWithManager(mgr ctrl.Manager) error {
restMapper = mgr.GetRESTMapper()
return ctrl.NewWebhookManagedBy(mgr).
For(c).
Complete()
Expand Down Expand Up @@ -75,19 +72,14 @@ func ValidateClusterResourcePlacement(clusterResourcePlacement *ClusterResourceP
allErr = append(allErr, fmt.Errorf("cannot perform resource scope check for now, please retry"))
} else {
for _, selector := range clusterResourcePlacement.Spec.ResourceSelectors {
gk := schema.GroupKind{
Group: selector.Group,
Kind: selector.Kind,
gvk := schema.GroupVersionKind{
Group: selector.Group,
Version: selector.Version,
Kind: selector.Kind,
}

restMapping, err := restMapper.RESTMapping(gk, selector.Version)
if err != nil {
allErr = append(allErr, errors.Wrap(err, fmt.Sprintf("failed to get GVR of GVK (%s/%s/%s), please retry if the GVK is valid", selector.Group, selector.Version, selector.Kind)))
continue
}

if !ResourceInformer.IsClusterScopedResources(restMapping.Resource) {
allErr = append(allErr, fmt.Errorf("the resource is not found in schema (please retry) or it is not a cluster scoped resource: %v", restMapping.Resource))
if !ResourceInformer.IsClusterScopedResources(gvk) {
allErr = append(allErr, fmt.Errorf("the resource is not found in schema (please retry) or it is not a cluster scoped resource: %v", gvk))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,12 @@ func (r *Reconciler) fetchClusterScopedResources(ctx context.Context, selector f
return nil, errors.Wrap(err, "Failed to get GVR of the selector")
}
gvr := restMapping.Resource
if !r.InformerManager.IsClusterScopedResources(gvr) {
gvk := schema.GroupVersionKind{
Group: selector.Group,
Version: selector.Version,
Kind: selector.Kind,
}
if !r.InformerManager.IsClusterScopedResources(gvk) {
return nil, errors.New(fmt.Sprintf("%+v is not a cluster scoped resource", restMapping.Resource))
}
if !r.InformerManager.IsInformerSynced(gvr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (r *Reconciler) getUnstructuredObject(objectKey keys.ClusterWideKey) (runti
return nil, false, errors.Wrap(err, "Failed to get GVR of object")
}
gvr := restMapping.Resource
isClusterScoped := r.InformerManager.IsClusterScopedResources(gvr)
isClusterScoped := r.InformerManager.IsClusterScopedResources(objectKey.GroupVersionKind())
if !r.InformerManager.IsInformerSynced(gvr) {
return nil, isClusterScoped, fmt.Errorf("informer cache for %+v is not synced yet", restMapping.Resource)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/resourcewatcher/change_dector.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (d *ChangeDetector) Start(ctx context.Context) error {
d.onClusterResourcePlacementUpdated, d.onClusterResourcePlacementDeleted)
d.InformerManager.AddStaticResource(
informer.APIResourceMeta{
GroupVersionKind: utils.ClusterResourcePlacementGVK,
GroupVersionResource: utils.ClusterResourcePlacementGVR,
IsClusterScoped: true,
}, clusterPlacementEventHandler)
Expand All @@ -90,6 +91,7 @@ func (d *ChangeDetector) Start(ctx context.Context) error {
workEventHandler := newHandlerOnEvents(nil, d.onWorkUpdated, d.onWorkDeleted)
d.InformerManager.AddStaticResource(
informer.APIResourceMeta{
GroupVersionKind: utils.WorkGVK,
GroupVersionResource: utils.WorkGVR,
IsClusterScoped: false,
}, workEventHandler)
Expand All @@ -99,6 +101,7 @@ func (d *ChangeDetector) Start(ctx context.Context) error {
memberClusterEventHandler := newHandlerOnEvents(nil, d.onMemberClusterUpdated, nil)
d.InformerManager.AddStaticResource(
informer.APIResourceMeta{
GroupVersionKind: utils.MemberClusterGVK,
GroupVersionResource: utils.MemberClusterGVR,
IsClusterScoped: true,
}, memberClusterEventHandler)
Expand Down
2 changes: 2 additions & 0 deletions pkg/resourcewatcher/resource_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ func (d *ChangeDetector) getWatchableResources() ([]informer.APIResourceMeta, er
}
for i := range rl.APIResources {
gvr := schema.GroupVersionResource{Group: gv.Group, Version: gv.Version, Resource: rl.APIResources[i].Name}
gvk := schema.GroupVersionKind{Group: gv.Group, Version: gv.Version, Kind: rl.APIResources[i].Kind}
watchableGroupVersionResources = append(watchableGroupVersionResources, informer.APIResourceMeta{
GroupVersionKind: gvk,
GroupVersionResource: gvr,
IsClusterScoped: !rl.APIResources[i].Namespaced,
})
Expand Down
12 changes: 12 additions & 0 deletions pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ var (
Resource: fleetv1alpha1.ClusterResourcePlacementResource,
}

ClusterResourcePlacementGVK = schema.GroupVersionKind{
Group: fleetv1alpha1.GroupVersion.Group,
Version: fleetv1alpha1.GroupVersion.Version,
Kind: "ClusterResourcePlacement",
}

NamespaceGVK = schema.GroupVersionKind{
Group: corev1.GroupName,
Version: corev1.SchemeGroupVersion.Version,
Expand All @@ -110,6 +116,12 @@ var (
Resource: fleetv1alpha1.MemberClusterResource,
}

MemberClusterGVK = schema.GroupVersionKind{
Group: fleetv1alpha1.GroupVersion.Group,
Version: fleetv1alpha1.GroupVersion.Version,
Kind: fleetv1alpha1.MemberClusterKind,
}

WorkGVK = schema.GroupVersionKind{
Group: workv1alpha1.GroupVersion.Group,
Version: workv1alpha1.GroupVersion.Version,
Expand Down
33 changes: 18 additions & 15 deletions pkg/utils/informer/informermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Manager interface {
GetNameSpaceScopedResources() []schema.GroupVersionResource

// IsClusterScopedResources returns if a resource is cluster scoped.
IsClusterScopedResources(resource schema.GroupVersionResource) bool
IsClusterScopedResources(resource schema.GroupVersionKind) bool

// WaitForCacheSync waits for the informer cache to populate.
WaitForCacheSync()
Expand All @@ -69,12 +69,15 @@ func NewInformerManager(client dynamic.Interface, defaultResync time.Duration, p
ctx: ctx,
cancel: cancel,
informerFactory: dynamicinformer.NewDynamicSharedInformerFactory(client, defaultResync),
apiResources: make(map[schema.GroupVersionResource]*APIResourceMeta),
apiResources: make(map[schema.GroupVersionKind]*APIResourceMeta),
}
}

// APIResourceMeta contains the gvk and associated metadata about an api resource
type APIResourceMeta struct {
// GroupVersionKind is the gvk of the resource.
GroupVersionKind schema.GroupVersionKind

// GroupVersionResource is the gvr of the resource.
GroupVersionResource schema.GroupVersionResource

Expand Down Expand Up @@ -102,18 +105,18 @@ type informerManagerImpl struct {
informerFactory dynamicinformer.DynamicSharedInformerFactory

// the apiResources map collects all the api resources we watch
apiResources map[schema.GroupVersionResource]*APIResourceMeta
apiResources map[schema.GroupVersionKind]*APIResourceMeta
resourcesLock sync.RWMutex
}

func (s *informerManagerImpl) AddDynamicResources(dynResources []APIResourceMeta, handler cache.ResourceEventHandler, listComplete bool) {
newGVRs := make(map[schema.GroupVersionResource]bool, len(dynResources))
newGVKs := make(map[schema.GroupVersionKind]bool, len(dynResources))

addInformerFunc := func(newRes APIResourceMeta) {
dynRes, exist := s.apiResources[newRes.GroupVersionResource]
dynRes, exist := s.apiResources[newRes.GroupVersionKind]
if !exist {
newRes.isPresent = true
s.apiResources[newRes.GroupVersionResource] = &newRes
s.apiResources[newRes.GroupVersionKind] = &newRes
s.informerFactory.ForResource(newRes.GroupVersionResource).Informer().AddEventHandler(handler)
klog.InfoS("Added an informer for a new resource", "res", newRes)
} else if !dynRes.isPresent {
Expand All @@ -130,7 +133,7 @@ func (s *informerManagerImpl) AddDynamicResources(dynResources []APIResourceMeta

// Add the new dynResources that do not exist yet while build a map to speed up lookup
for _, newRes := range dynResources {
newGVRs[newRes.GroupVersionResource] = true
newGVKs[newRes.GroupVersionKind] = true
addInformerFunc(newRes)
}

Expand All @@ -140,8 +143,8 @@ func (s *informerManagerImpl) AddDynamicResources(dynResources []APIResourceMeta
}

// mark the disappeared dynResources from the handler map
for gvr, dynRes := range s.apiResources {
if !newGVRs[gvr] && !dynRes.isStaticResource && dynRes.isPresent {
for gvk, dynRes := range s.apiResources {
if !newGVKs[gvk] && !dynRes.isStaticResource && dynRes.isPresent {
// TODO: Disable the informer associated with the resource
dynRes.isPresent = false
klog.InfoS("Disabled an informer for a disappeared resource", "res", dynRes)
Expand All @@ -153,13 +156,13 @@ func (s *informerManagerImpl) AddStaticResource(resource APIResourceMeta, handle
s.resourcesLock.Lock()
defer s.resourcesLock.Unlock()

staticRes, exist := s.apiResources[resource.GroupVersionResource]
staticRes, exist := s.apiResources[resource.GroupVersionKind]
if exist {
klog.ErrorS(fmt.Errorf("a static resource is added already"), "existing res", staticRes)
}

resource.isStaticResource = true
s.apiResources[resource.GroupVersionResource] = &resource
s.apiResources[resource.GroupVersionKind] = &resource
s.informerFactory.ForResource(resource.GroupVersionResource).Informer().AddEventHandler(handler)
}

Expand Down Expand Up @@ -189,19 +192,19 @@ func (s *informerManagerImpl) GetNameSpaceScopedResources() []schema.GroupVersio
defer s.resourcesLock.RUnlock()

res := make([]schema.GroupVersionResource, 0, len(s.apiResources))
for gvr, resource := range s.apiResources {
for _, resource := range s.apiResources {
if resource.isPresent && !resource.isStaticResource && !resource.IsClusterScoped {
res = append(res, gvr)
res = append(res, resource.GroupVersionResource)
}
}
return res
}

func (s *informerManagerImpl) IsClusterScopedResources(resource schema.GroupVersionResource) bool {
func (s *informerManagerImpl) IsClusterScopedResources(gvk schema.GroupVersionKind) bool {
s.resourcesLock.RLock()
defer s.resourcesLock.RUnlock()

resMeta, exist := s.apiResources[resource]
resMeta, exist := s.apiResources[gvk]
if !exist {
return false
}
Expand Down

0 comments on commit c4f8c87

Please sign in to comment.