Merge pull request #2888 from asincu/clean_up_watches
Clean up network policies watches
caseydavenport authored Dec 15, 2023
2 parents ebf774d + 28b7d75 commit 1cca8a7
Showing 6 changed files with 123 additions and 30 deletions.
6 changes: 0 additions & 6 deletions pkg/controller/logstorage/elastic/elastic_controller.go
@@ -38,10 +38,8 @@ import (
relasticsearch "github.com/tigera/operator/pkg/render/common/elasticsearch"
"github.com/tigera/operator/pkg/render/common/networkpolicy"
rsecret "github.com/tigera/operator/pkg/render/common/secret"
"github.com/tigera/operator/pkg/render/kubecontrollers"
"github.com/tigera/operator/pkg/render/logstorage/esgateway"
"github.com/tigera/operator/pkg/render/logstorage/esmetrics"
"github.com/tigera/operator/pkg/render/logstorage/linseed"
"github.com/tigera/operator/pkg/render/monitor"
"github.com/tigera/operator/pkg/tls/certificatemanagement"
apps "k8s.io/api/apps/v1"
@@ -151,10 +149,6 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
{Name: render.ElasticsearchInternalPolicyName, Namespace: render.ElasticsearchNamespace},
{Name: networkpolicy.TigeraComponentDefaultDenyPolicyName, Namespace: render.ElasticsearchNamespace},
{Name: networkpolicy.TigeraComponentDefaultDenyPolicyName, Namespace: render.KibanaNamespace},
- {Name: esgateway.PolicyName, Namespace: render.ElasticsearchNamespace},
- {Name: esmetrics.ElasticsearchMetricsPolicyName, Namespace: render.ElasticsearchNamespace},
- {Name: kubecontrollers.EsKubeControllerNetworkPolicyName, Namespace: common.CalicoNamespace},
- {Name: linseed.PolicyName, Namespace: render.ElasticsearchNamespace},
})

// Watch for changes in storage classes, as new storage classes may be made available for LogStorage.
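
The four policy watches removed above are not dropped: each is re-registered by the sub-controller that renders the policy (see the esmetrics, kube-controllers, and linseed files below), gated by a utils.ReadyFlag. As a minimal sketch of what such a latch amounts to, assuming only the IsReady/MarkAsReady calls visible in this diff (the real utils.ReadyFlag implementation may differ):

```go
package utils

import "sync"

// ReadyFlag is a tiny thread-safe latch: watch-registration goroutines
// call MarkAsReady once their watch is established, and Reconcile polls
// IsReady before trusting the informer cache.
type ReadyFlag struct {
	mu    sync.Mutex
	ready bool
}

func (f *ReadyFlag) IsReady() bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.ready
}

func (f *ReadyFlag) MarkAsReady() {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.ready = true
}
```
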
59 changes: 47 additions & 12 deletions pkg/controller/logstorage/esmetrics/esmetrics_controller.go
@@ -17,6 +17,12 @@ package esmetrics
import (
"context"
"fmt"
"time"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"
"github.com/tigera/operator/pkg/render/common/networkpolicy"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"

operatorv1 "github.com/tigera/operator/api/v1"
"github.com/tigera/operator/pkg/common"
@@ -49,13 +55,14 @@ const (
)

type ESMetricsSubController struct {
- client client.Client
- scheme *runtime.Scheme
- status status.StatusManager
- provider operatorv1.Provider
- clusterDomain string
- usePSP bool
- multiTenant bool
+ client client.Client
+ scheme *runtime.Scheme
+ status status.StatusManager
+ provider operatorv1.Provider
+ clusterDomain string
+ usePSP bool
+ multiTenant bool
+ tierWatchReady *utils.ReadyFlag
}

func Add(mgr manager.Manager, opts options.AddOptions) error {
@@ -70,11 +77,12 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
}

r := &ESMetricsSubController{
- client: mgr.GetClient(),
- scheme: mgr.GetScheme(),
- status: status.New(mgr.GetClient(), tigeraStatusName, opts.KubernetesVersion),
- clusterDomain: opts.ClusterDomain,
- provider: opts.DetectedProvider,
+ client: mgr.GetClient(),
+ scheme: mgr.GetScheme(),
+ status: status.New(mgr.GetClient(), tigeraStatusName, opts.KubernetesVersion),
+ clusterDomain: opts.ClusterDomain,
+ provider: opts.DetectedProvider,
+ tierWatchReady: &utils.ReadyFlag{},
}
r.status.Run(opts.ShutdownContext)

@@ -106,6 +114,16 @@
}
}

+ k8sClient, err := kubernetes.NewForConfig(mgr.GetConfig())
+ if err != nil {
+     return fmt.Errorf("log-storage-esmetrics-controller failed to establish a connection to k8s: %w", err)
+ }
+
+ go utils.WaitToAddTierWatch(networkpolicy.TigeraComponentTierName, c, k8sClient, log, r.tierWatchReady)
+ go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{
+     {Name: esmetrics.ElasticsearchMetricsPolicyName, Namespace: render.ElasticsearchNamespace},
+ })
+
return nil
}

@@ -144,6 +162,23 @@ func (r *ESMetricsSubController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{RequeueAfter: utils.StandardRetry}, nil
}

+ // Validate that the tier watch is ready before querying the tier to ensure we utilize the cache.
+ if !r.tierWatchReady.IsReady() {
+     r.status.SetDegraded(operatorv1.ResourceNotReady, "Waiting for Tier watch to be established", nil, reqLogger)
+     return reconcile.Result{RequeueAfter: utils.StandardRetry}, nil
+ }
+
+ // Ensure the allow-tigera tier exists, before rendering any network policies within it.
+ if err := r.client.Get(ctx, client.ObjectKey{Name: networkpolicy.TigeraComponentTierName}, &v3.Tier{}); err != nil {
+     if errors.IsNotFound(err) {
+         r.status.SetDegraded(operatorv1.ResourceNotReady, "Waiting for allow-tigera tier to be created", err, reqLogger)
+         return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
+     } else {
+         r.status.SetDegraded(operatorv1.ResourceReadError, "Error querying allow-tigera tier", err, reqLogger)
+         return reconcile.Result{}, err
+     }
+ }
+
esMetricsSecret, err := utils.GetSecret(context.Background(), r.client, esmetrics.ElasticsearchMetricsSecret, common.OperatorNamespace())
if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, "Failed to retrieve Elasticsearch metrics user secret.", err, reqLogger)
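
The two gates added to Reconcile above reappear almost verbatim in the kube-controllers reconciler below. A hedged sketch of the shared shape, factored into a hypothetical helper (gateOnAllowTigeraTier is not part of this PR; the client calls it makes are the ones visible in the diff):

```go
package example

import (
	"context"
	"time"

	v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"
	"k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// gateOnAllowTigeraTier returns a non-nil Result when the caller should
// stop and requeue: either the tier watch is not established yet, or the
// allow-tigera tier does not exist.
func gateOnAllowTigeraTier(ctx context.Context, cli client.Client, watchReady func() bool) (*reconcile.Result, error) {
	if !watchReady() {
		// Querying before the watch is ready would bypass the cache.
		return &reconcile.Result{RequeueAfter: 10 * time.Second}, nil
	}
	if err := cli.Get(ctx, client.ObjectKey{Name: "allow-tigera"}, &v3.Tier{}); err != nil {
		if errors.IsNotFound(err) {
			// Tier not created yet; retry shortly rather than erroring.
			return &reconcile.Result{RequeueAfter: 10 * time.Second}, nil
		}
		return &reconcile.Result{}, err
	}
	return nil, nil // tier exists and the watch is ready: render policies
}
```
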
24 changes: 18 additions & 6 deletions pkg/controller/logstorage/esmetrics/esmetrics_controller_test.go
@@ -17,6 +17,8 @@ package esmetrics
import (
"context"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/mock"
@@ -50,6 +52,7 @@ func NewESMetricsControllerWithShims(
provider operatorv1.Provider,
clusterDomain string,
multiTenant bool,
+ readyFlag *utils.ReadyFlag,
) (*ESMetricsSubController, error) {

opts := options.AddOptions{
@@ -60,11 +63,12 @@ }
}

r := &ESMetricsSubController{
- client: cli,
- scheme: scheme,
- status: status,
- clusterDomain: opts.ClusterDomain,
- multiTenant: opts.MultiTenant,
+ client: cli,
+ scheme: scheme,
+ status: status,
+ clusterDomain: opts.ClusterDomain,
+ multiTenant: opts.MultiTenant,
+ tierWatchReady: readyFlag,
}
r.status.Run(opts.ShutdownContext)
return r, nil
@@ -77,6 +81,7 @@ var _ = Describe("LogStorage Linseed controller", func() {
scheme *runtime.Scheme
ctx context.Context
r *ESMetricsSubController
+ readyFlag *utils.ReadyFlag
)
BeforeEach(func() {
scheme = runtime.NewScheme()
@@ -98,8 +103,15 @@
mockStatus.On("ReadyToMonitor")
mockStatus.On("ClearDegraded")

+ readyFlag = &utils.ReadyFlag{}
+ readyFlag.MarkAsReady()
+
+ // Create the allow-tigera Tier, since the controller blocks on its existence.
+ tier := &v3.Tier{ObjectMeta: metav1.ObjectMeta{Name: "allow-tigera"}}
+ Expect(cli.Create(ctx, tier)).ShouldNot(HaveOccurred())
+
var err error
- r, err = NewESMetricsControllerWithShims(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain, false)
+ r, err = NewESMetricsControllerWithShims(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain, false, readyFlag)
Expect(err).ShouldNot(HaveOccurred())
})

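
The test changes above reduce to two fixtures that every reconcile test now needs. A plain go-test restatement of the same setup (the repo uses Ginkgo; v3.AddToScheme is assumed to exist on the tigera API package):

```go
package esmetrics_test

import (
	"context"
	"testing"

	v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	"github.com/tigera/operator/pkg/controller/utils"
)

func TestTierGateFixtures(t *testing.T) {
	scheme := runtime.NewScheme()
	if err := v3.AddToScheme(scheme); err != nil { // assumed scheme helper
		t.Fatal(err)
	}
	cli := fake.NewClientBuilder().WithScheme(scheme).Build()

	// Fixture 1: mark the tier watch ready, otherwise Reconcile degrades
	// with "Waiting for Tier watch to be established".
	readyFlag := &utils.ReadyFlag{}
	readyFlag.MarkAsReady()

	// Fixture 2: create the allow-tigera tier, otherwise Reconcile
	// requeues with "Waiting for allow-tigera tier to be created".
	tier := &v3.Tier{ObjectMeta: metav1.ObjectMeta{Name: "allow-tigera"}}
	if err := cli.Create(context.Background(), tier); err != nil {
		t.Fatal(err)
	}
}
```
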
41 changes: 41 additions & 0 deletions pkg/controller/logstorage/kubecontrollers/es_kube_controllers.go
@@ -17,6 +17,13 @@ package kubecontrollers
import (
"context"
"fmt"
"time"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"
"github.com/tigera/operator/pkg/render/common/networkpolicy"
"github.com/tigera/operator/pkg/render/logstorage/esgateway"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"

operatorv1 "github.com/tigera/operator/api/v1"

@@ -54,6 +61,7 @@ type ESKubeControllersController struct {
clusterDomain string
usePSP bool
elasticExternal bool
+ tierWatchReady *utils.ReadyFlag
}

func Add(mgr manager.Manager, opts options.AddOptions) error {
@@ -72,6 +80,7 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
clusterDomain: opts.ClusterDomain,
status: status.New(mgr.GetClient(), "log-storage-kubecontrollers", opts.KubernetesVersion),
elasticExternal: opts.ElasticExternal,
+ tierWatchReady: &utils.ReadyFlag{},
}
r.status.Run(opts.ShutdownContext)

@@ -118,6 +127,9 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
if err := utils.AddServiceWatch(c, render.ElasticsearchServiceName, render.ElasticsearchNamespace); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
+ if err := utils.AddServiceWatch(c, esgateway.ServiceName, render.ElasticsearchNamespace); err != nil {
+     return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
+ }
if err := utils.AddConfigMapWatch(c, certificatemanagement.TrustedCertConfigMapName, common.CalicoNamespace, &handler.EnqueueRequestForObject{}); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
@@ -129,6 +141,18 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
return fmt.Errorf("log-storage-kubecontrollers failed to create periodic reconcile watch: %w", err)
}

+ k8sClient, err := kubernetes.NewForConfig(mgr.GetConfig())
+ if err != nil {
+     return fmt.Errorf("log-storage-kubecontrollers failed to establish a connection to k8s: %w", err)
+ }
+
+ // Start goroutines to establish watches against projectcalico.org/v3 resources.
+ go utils.WaitToAddTierWatch(networkpolicy.TigeraComponentTierName, c, k8sClient, log, r.tierWatchReady)
+ go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{
+     {Name: esgateway.PolicyName, Namespace: render.ElasticsearchNamespace},
+     {Name: kubecontrollers.EsKubeControllerNetworkPolicyName, Namespace: common.CalicoNamespace},
+ })
+
return nil
}

@@ -170,6 +194,23 @@ func (r *ESKubeControllersController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{}, err
}

+ // Validate that the tier watch is ready before querying the tier to ensure we utilize the cache.
+ if !r.tierWatchReady.IsReady() {
+     r.status.SetDegraded(operatorv1.ResourceNotReady, "Waiting for Tier watch to be established", nil, reqLogger)
+     return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
+ }
+
+ // Ensure the allow-tigera tier exists, before rendering any network policies within it.
+ if err := r.client.Get(ctx, client.ObjectKey{Name: networkpolicy.TigeraComponentTierName}, &v3.Tier{}); err != nil {
+     if errors.IsNotFound(err) {
+         r.status.SetDegraded(operatorv1.ResourceNotReady, "Waiting for allow-tigera tier to be created", err, reqLogger)
+         return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
+     } else {
+         r.status.SetDegraded(operatorv1.ResourceReadError, "Error querying allow-tigera tier", err, reqLogger)
+         return reconcile.Result{}, err
+     }
+ }
+
managementClusterConnection, err := utils.GetManagementClusterConnection(ctx, r.client)
if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, "Error reading ManagementClusterConnection", err, reqLogger)
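
Both controllers register their projectcalico.org/v3 watches from goroutines rather than inline: the v3 APIs may not be served yet when Add() runs, and blocking there would stall manager startup. A hypothetical sketch of the shape of such a helper, not the actual utils.WaitToAddTierWatch implementation:

```go
package example

import (
	"time"

	"github.com/tigera/operator/pkg/controller/utils"
)

// waitToAddWatch retries a controller-runtime watch registration until it
// succeeds, then flips the ReadyFlag the reconciler gates on. addWatch is
// whatever registration call the caller needs to make.
func waitToAddWatch(addWatch func() error, ready *utils.ReadyFlag) {
	for {
		if err := addWatch(); err == nil {
			ready.MarkAsReady()
			return
		}
		// The v3 API (or the CRD behind it) is not available yet; retry.
		time.Sleep(10 * time.Second)
	}
}
```
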
18 changes: 13 additions & 5 deletions pkg/controller/logstorage/kubecontrollers/es_kube_controllers_test.go
@@ -18,6 +18,8 @@ import (
"context"
"fmt"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"

controllerruntime "sigs.k8s.io/controller-runtime"

. "github.com/onsi/ginkgo"
@@ -66,6 +68,7 @@ func NewControllerWithShims(
provider operatorv1.Provider,
clusterDomain string,
multiTenant bool,
+ tierWatchReady *utils.ReadyFlag,
) (*ESKubeControllersController, error) {
opts := options.AddOptions{
DetectedProvider: provider,
@@ -75,10 +78,11 @@ }
}

r := &ESKubeControllersController{
- client: cli,
- scheme: scheme,
- status: status,
- clusterDomain: opts.ClusterDomain,
+ client: cli,
+ scheme: scheme,
+ status: status,
+ clusterDomain: opts.ClusterDomain,
+ tierWatchReady: tierWatchReady,
}
r.status.Run(opts.ShutdownContext)
return r, nil
@@ -176,8 +180,12 @@ var _ = Describe("LogStorage ES kube-controllers controller", func() {
Expect(cli.Create(ctx, bundle.ConfigMap(common.CalicoNamespace))).ShouldNot(HaveOccurred())

// Create the reconciler for the tests.
- r, err = NewControllerWithShims(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain, false)
+ r, err = NewControllerWithShims(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain, false, readyFlag)
Expect(err).ShouldNot(HaveOccurred())
+
+ // Create the allow-tigera Tier, since the controller blocks on its existence.
+ tier := &v3.Tier{ObjectMeta: metav1.ObjectMeta{Name: "allow-tigera"}}
+ Expect(cli.Create(ctx, tier)).ShouldNot(HaveOccurred())
})

It("should wait for the cluster CA to be provisioned", func() {
5 changes: 4 additions & 1 deletion pkg/controller/logstorage/linseed/linseed_controller.go
@@ -173,10 +173,13 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {

k8sClient, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
return fmt.Errorf("log-storage-elastic-controller failed to establish a connection to k8s: %w", err)
return fmt.Errorf("log-storage-linseed-controller failed to establish a connection to k8s: %w", err)
}

go utils.WaitToAddTierWatch(networkpolicy.TigeraComponentTierName, c, k8sClient, log, r.tierWatchReady)
+ go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{
+     {Name: linseed.PolicyName, Namespace: helper.InstallNamespace()},
+ })
go utils.WaitToAddResourceWatch(c, k8sClient, log, r.dpiAPIReady, []client.Object{&v3.DeepPacketInspection{TypeMeta: metav1.TypeMeta{Kind: v3.KindDeepPacketInspection}}})

// Perform periodic reconciliation. This acts as a backstop to catch reconcile issues,
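
Note the namespace in the new linseed watch: it comes from helper.InstallNamespace() rather than a fixed render.ElasticsearchNamespace, presumably so each tenant's Linseed namespace gets its own watch in multi-tenant mode. A hypothetical stand-in, only to illustrate the distinction (namespaceHelper and the literal namespace value are assumptions, not code from this PR):

```go
package example

// namespaceHelper is a hypothetical stand-in for the operator's tenancy
// helper, sketched only to show why the linseed policy watch is not
// hard-coded to a single namespace.
type namespaceHelper struct {
	multiTenant     bool
	tenantNamespace string
}

// InstallNamespace resolves the namespace Linseed (and its policy) lives in.
func (h namespaceHelper) InstallNamespace() string {
	if h.multiTenant {
		return h.tenantNamespace // one Linseed per tenant namespace
	}
	return "tigera-elasticsearch" // assumed value of render.ElasticsearchNamespace
}
```
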
