Skip to content

Commit

Permalink
feat: block the reserved namespaces for placement (Azure#266)
Browse files Browse the repository at this point in the history
Co-authored-by: Ryan Zhang <[email protected]>
  • Loading branch information
ryanzhang-oss and Ryan Zhang authored Sep 7, 2022
1 parent b7abcfb commit d0ae29f
Show file tree
Hide file tree
Showing 12 changed files with 165 additions and 75 deletions.
2 changes: 1 addition & 1 deletion cmd/hubagent/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewOptions() *Options {
LeaderElect: true,
ResourceLock: resourcelock.LeasesResourceLock,
ResourceNamespace: utils.FleetSystemNamespace,
ResourceName: "13622se4848560.hub.fleet.azure.com",
ResourceName: "136224848560.hub.fleet.azure.com",
},
ConcurrentClusterPlacementSyncs: 1,
ConcurrentResourceChangeSyncs: 1,
Expand Down
24 changes: 10 additions & 14 deletions cmd/hubagent/workload/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package workload

import (
"context"

"strings"

"k8s.io/client-go/discovery"
Expand All @@ -28,9 +27,9 @@ import (
)

const (
clusterResourcePlacementName = "cluster-resource-placement-controller"
resourceChangeName = "resource-change-controller"
memberClusterPlacementName = "memberCluster-placement-controller"
crpControllerName = "cluster-resource-placement-controller"
resourceChangeControllerName = "resource-change-controller"
mcPlacementControllerName = "memberCluster-placement-controller"
)

// SetupControllers set up the customized controllers we developed
Expand Down Expand Up @@ -65,11 +64,8 @@ func SetupControllers(ctx context.Context, mgr ctrl.Manager, config *rest.Config
return err
}

// setup namespaces we skip propagation
skippedNamespaces := make(map[string]bool)
skippedNamespaces["fleet-system"] = true
skippedNamespaces["kube-system"] = true
skippedNamespaces["kube-public"] = true
skippedNamespaces["kube-node-lease"] = true
skippedNamespaces["default"] = true
optionalSkipNS := strings.Split(opts.SkippedPropagatingNamespaces, ";")
for _, ns := range optionalSkipNS {
Expand All @@ -87,15 +83,15 @@ func SetupControllers(ctx context.Context, mgr ctrl.Manager, config *rest.Config
klog.Info("Setting up clusterResourcePlacement controller")
crpc := &clusterresourceplacement.Reconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor(clusterResourcePlacementName),
Recorder: mgr.GetEventRecorderFor(crpControllerName),
RestMapper: mgr.GetRESTMapper(),
InformerManager: dynamicInformerManager,
DisabledResourceConfig: disabledResourceConfig,
WorkPendingGracePeriod: opts.WorkPendingGracePeriod,
SkippedNamespaces: skippedNamespaces,
}

ratelimiter := options.DefaultControllerRateLimiter(opts.RateLimiterOpts)
clusterResourcePlacementController := controller.NewController(clusterResourcePlacementName, controller.NamespaceKeyFunc, crpc.Reconcile, ratelimiter)
clusterResourcePlacementController := controller.NewController(crpControllerName, controller.NamespaceKeyFunc, crpc.Reconcile, ratelimiter)
if err != nil {
klog.ErrorS(err, "unable to set up clusterResourcePlacement controller")
return err
Expand All @@ -105,13 +101,13 @@ func SetupControllers(ctx context.Context, mgr ctrl.Manager, config *rest.Config
klog.Info("Setting up resource change controller")
rcr := &resourcechange.Reconciler{
DynamicClient: dynamicClient,
Recorder: mgr.GetEventRecorderFor(resourceChangeName),
Recorder: mgr.GetEventRecorderFor(resourceChangeControllerName),
RestMapper: mgr.GetRESTMapper(),
InformerManager: dynamicInformerManager,
PlacementController: clusterResourcePlacementController,
}

resourceChangeController := controller.NewController(resourceChangeName, controller.ClusterWideKeyFunc, rcr.Reconcile, ratelimiter)
resourceChangeController := controller.NewController(resourceChangeControllerName, controller.ClusterWideKeyFunc, rcr.Reconcile, ratelimiter)
if err != nil {
klog.ErrorS(err, "unable to set up resource change controller")
return err
Expand All @@ -124,7 +120,7 @@ func SetupControllers(ctx context.Context, mgr ctrl.Manager, config *rest.Config
PlacementController: clusterResourcePlacementController,
}

memberClusterPlacementController := controller.NewController(memberClusterPlacementName, controller.NamespaceKeyFunc, mcp.Reconcile, ratelimiter)
memberClusterPlacementController := controller.NewController(mcPlacementControllerName, controller.NamespaceKeyFunc, mcp.Reconcile, ratelimiter)
if err != nil {
klog.ErrorS(err, "unable to set up resource change controller")
return err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,5 @@ require (

replace (
k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.24.2 // weird bug that the goland won't compile without this
sigs.k8s.io/work-api => github.com/Azure/k8s-work-api v0.4.1
sigs.k8s.io/work-api => github.com/Azure/k8s-work-api v0.4.2
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSY
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/Azure/k8s-work-api v0.4.1 h1:q2KEi1yCCoS+1hm9rZ8IHaKCjiNhtd0/SXHfuNl6Ruk=
github.com/Azure/k8s-work-api v0.4.1/go.mod h1:VtsAdhZMoEP9WOEW+LmLm6NRHNyIjJ5xGOzJA64O7ew=
github.com/Azure/k8s-work-api v0.4.2 h1:Kwl8pmBfiykgWws12ud80TpU9gQNveyR7zlwMutGwGc=
github.com/Azure/k8s-work-api v0.4.2/go.mod h1:FOGJkJ+uxjWlvUgmqUlRcmr4Q2ijocrUO/aLJv827y8=
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c=
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down
24 changes: 9 additions & 15 deletions pkg/controllers/clusterresourceplacement/placement_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/controller"
"go.goms.io/fleet/pkg/utils/informer"
"go.goms.io/fleet/pkg/utils/validator"
)

const (
Expand All @@ -41,19 +40,22 @@ var (

// Reconciler reconciles a cluster resource placement object
type Reconciler struct {
// the informer contains the cache for all the resources we need
// the informer contains the cache for all the resources we need.
InformerManager informer.Manager

// RestMapper is used to convert between gvk and gvr on known resources.
RestMapper meta.RESTMapper

// Client is used to update objects which goes to the api server directly
// Client is used to update objects which goes to the api server directly.
Client client.Client

// DisabledResourceConfig contains all the api resources that we won't select
// DisabledResourceConfig contains all the api resources that we won't select.
DisabledResourceConfig *utils.DisabledResourceConfig

WorkPendingGracePeriod metav1.Duration
Recorder record.EventRecorder
// SkippedNamespaces contains the namespaces that we should not propagate.
SkippedNamespaces map[string]bool

Recorder record.EventRecorder
}

func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) {
Expand All @@ -74,14 +76,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ct

// TODO: add finalizer logic if we need it in the future

// TODO: move this to webhook
if err := validator.ValidateClusterResourcePlacement(placementOld); err != nil {
invalidSpec := "the spec is invalid"
klog.ErrorS(err, invalidSpec, "placement", placeRef)
r.Recorder.Event(placementOld, corev1.EventTypeWarning, invalidSpec, err.Error())
return ctrl.Result{}, nil
}

klog.V(2).InfoS("Start to reconcile a ClusterResourcePlacement", "placement", placeRef)
// select the new clusters and record that in the placementNew status
selectedClusters, scheduleErr := r.selectClusters(placementNew)
Expand All @@ -103,7 +97,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ct
// select the new resources and record the result in the placementNew status
manifests, scheduleErr := r.selectResources(ctx, placementNew)
if scheduleErr != nil {
klog.ErrorS(scheduleErr, "failed to generate the work resource for this placementOld", "placement", placeRef)
klog.ErrorS(scheduleErr, "failed to select the resources for this placement", "placement", placeRef)
r.updatePlacementScheduledCondition(placementOld, scheduleErr)
_ = r.Client.Status().Update(ctx, placementOld, client.FieldOwner(utils.PlacementFieldManagerName))
return ctrl.Result{}, scheduleErr
Expand Down
24 changes: 16 additions & 8 deletions pkg/controllers/clusterresourceplacement/resource_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
func (r *Reconciler) selectResources(ctx context.Context, placement *fleetv1alpha1.ClusterResourcePlacement) ([]workv1alpha1.Manifest, error) {
selectedObjects, err := r.gatherSelectedResource(ctx, placement)
if err != nil {
return nil, errors.Wrap(err, "Failed to gather all the selected resource")
return nil, err
}
placement.Status.SelectedResources = make([]fleetv1alpha1.ResourceIdentifier, 0)
manifests := make([]workv1alpha1.Manifest, len(selectedObjects))
Expand Down Expand Up @@ -177,11 +177,15 @@ func (r *Reconciler) fetchNamespaceResources(ctx context.Context, selector fleet

if len(selector.Name) != 0 {
// just a single namespace
return r.fetchAllResourcesInOneNamespace(ctx, selector.Name, placeName)
objs, err := r.fetchAllResourcesInOneNamespace(ctx, selector.Name, placeName)
if err != nil {
klog.ErrorS(err, "failed to fetch all the selected resource in a namespace", "namespace", selector.Name)
return nil, err
}
return objs, err
}
// go through each namespace
lister := r.InformerManager.Lister(utils.NamespaceGVR)

// go through each namespace
var labelSelector labels.Selector
var err error
if selector.LabelSelector == nil {
Expand All @@ -192,9 +196,9 @@ func (r *Reconciler) fetchNamespaceResources(ctx context.Context, selector fleet
return nil, errors.Wrap(err, "cannot convert the label selector to a selector")
}
}
namespaces, err := lister.List(labelSelector)
namespaces, err := r.InformerManager.Lister(utils.NamespaceGVR).List(labelSelector)
if err != nil {
return nil, errors.Wrap(err, "cannot list all the namespaces")
return nil, errors.Wrap(err, "cannot list all the namespaces given the label selector")
}

for _, namespace := range namespaces {
Expand All @@ -204,19 +208,23 @@ func (r *Reconciler) fetchNamespaceResources(ctx context.Context, selector fleet
}
objs, err := r.fetchAllResourcesInOneNamespace(ctx, ns.GetName(), placeName)
if err != nil {
klog.ErrorS(err, "failed to fetch all the selected resource in a namespace", "namespace", ns.GetName())
return nil, err
}
resources = append(resources, objs...)
}

return resources, nil
}

// fetchAllResourcesInOneNamespace retrieve all the objects inside a single namespace which includes the namespace itself.
func (r *Reconciler) fetchAllResourcesInOneNamespace(ctx context.Context, namespaceName string, placeName string) ([]runtime.Object, error) {
klog.V(4).InfoS("start to fetch all the resources inside a namespace", "namespace", namespaceName)
var resources []runtime.Object

if !utils.ShouldPropagateNamespace(namespaceName, r.SkippedNamespaces) {
return nil, errors.New(fmt.Sprintf("namespace %s is not allowed to propagate", namespaceName))
}

klog.V(4).InfoS("start to fetch all the resources inside a namespace", "namespace", namespaceName)
// select the namespace object itself
obj, err := r.InformerManager.Lister(utils.NamespaceGVR).Get(namespaceName)
if err != nil {
Expand Down
11 changes: 1 addition & 10 deletions pkg/resourcewatcher/change_dector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package resourcewatcher

import (
"context"
"strings"
"time"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -189,15 +188,7 @@ func (d *ChangeDetector) dynamicResourceFilter(obj interface{}) bool {
}

cwKey, _ := key.(keys.ClusterWideKey)
// special case for cluster namespace
if strings.HasPrefix(cwKey.Namespace, utils.ClusterNamespacePrefix) {
klog.V(5).InfoS("Skip watching resource in namespace", "namespace", cwKey.Namespace,
"group", cwKey.Group, "version", cwKey.Version, "kind", cwKey.Kind, "object", cwKey.Name)
return false
}

// if SkippedNamespaces is set, skip any events related to the object in these namespaces.
if _, ok := d.SkippedNamespaces[cwKey.Namespace]; ok {
if !utils.ShouldPropagateNamespace(cwKey.Namespace, d.SkippedNamespaces) {
klog.V(5).InfoS("Skip watching resource in namespace", "namespace", cwKey.Namespace,
"group", cwKey.Group, "version", cwKey.Version, "kind", cwKey.Kind, "object", cwKey.Name)
return false
Expand Down
40 changes: 27 additions & 13 deletions pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"log"
"math/big"
"strings"
"time"

"github.com/pkg/errors"
Expand All @@ -30,17 +31,21 @@ import (
)

const (
FleetSystemNamespace = "fleet-system"

ClusterNamespacePrefix = "fleet-member-"
NamespaceNameFormat = ClusterNamespacePrefix + "%s"

RoleNameFormat = "fleet-role-%s"

RoleBindingNameFormat = "fleet-rolebinding-%s"
kubePrefix = "kube-"
fleetPrefix = "fleet-"
FleetSystemNamespace = fleetPrefix + "system"
NamespaceNameFormat = fleetPrefix + "member-%s"
RoleNameFormat = fleetPrefix + "role-%s"
RoleBindingNameFormat = fleetPrefix + "rolebinding-%s"
)

PlacementFieldManagerName = "cluster-placement-controller"
const (
// NetworkingGroupName is the group name of the fleet networking.
NetworkingGroupName = "networking.fleet.azure.com"
)

const (
PlacementFieldManagerName = "cluster-placement-controller"
MCControllerFieldManagerName = "member-cluster-controller"
)

Expand All @@ -56,10 +61,6 @@ const (
// PlacementFinalizer is used to make sure that we handle gc of placement resources.
PlacementFinalizer = "work.fleet.azure.com/placement-protection"
)
const (
// NetworkingGroupName is the group name of the fleet networking.
NetworkingGroupName = "networking.fleet.azure.com"
)

var (
FleetRule = rbacv1.PolicyRule{
Expand Down Expand Up @@ -222,3 +223,16 @@ func ShouldPropagateObj(informerManager informer.Manager, uObj *unstructured.Uns
}
return true, nil
}

// ShouldPropagateNamespace decides if we should propagate the resources in the namespace
func ShouldPropagateNamespace(namespace string, skippedNamespaces map[string]bool) bool {
// special case for namespace have the reserved prefix
if strings.HasPrefix(namespace, fleetPrefix) || strings.HasPrefix(namespace, kubePrefix) {
return false
}

if skippedNamespaces[namespace] {
return false
}
return true
}
24 changes: 24 additions & 0 deletions test/e2e/utils/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"github.com/onsi/gomega/format"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -246,3 +247,26 @@ func DeleteServiceAccount(cluster framework.Cluster, sa *corev1.ServiceAccount)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})
}

// AlreadyExistMatcher matches the error to be already exist
type AlreadyExistMatcher struct {
}

// Match matches error.
func (matcher AlreadyExistMatcher) Match(actual interface{}) (success bool, err error) {
if actual == nil {
return false, nil
}
actualError := actual.(error)
return apierrors.IsAlreadyExists(actualError), nil
}

// FailureMessage builds an error message.
func (matcher AlreadyExistMatcher) FailureMessage(actual interface{}) (message string) {
return format.Message(actual, "to be already exist")
}

// NegatedFailureMessage builds an error message.
func (matcher AlreadyExistMatcher) NegatedFailureMessage(actual interface{}) (message string) {
return format.Message(actual, "not to be already exist")
}
2 changes: 1 addition & 1 deletion test/e2e/work_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
workapi "sigs.k8s.io/work-api/pkg/apis/v1alpha1"
"sigs.k8s.io/work-api/pkg/utils"

fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
fleetutil "go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/test/e2e/utils"
)

const (
Expand Down
Loading

0 comments on commit d0ae29f

Please sign in to comment.