Clean up network policies watches
asincu committed Dec 15, 2023
1 parent ebf774d commit 1e663f7
Showing 4 changed files with 56 additions and 23 deletions.
pkg/controller/logstorage/elastic/elastic_controller.go (2 additions, 18 deletions)
@@ -17,6 +17,7 @@ package elastic
import (
"context"
"fmt"
"github.com/tigera/operator/pkg/render/monitor"
"net/url"

cmnv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1"
@@ -38,11 +39,6 @@ import (
relasticsearch "github.com/tigera/operator/pkg/render/common/elasticsearch"
"github.com/tigera/operator/pkg/render/common/networkpolicy"
rsecret "github.com/tigera/operator/pkg/render/common/secret"
"github.com/tigera/operator/pkg/render/kubecontrollers"
"github.com/tigera/operator/pkg/render/logstorage/esgateway"
"github.com/tigera/operator/pkg/render/logstorage/esmetrics"
"github.com/tigera/operator/pkg/render/logstorage/linseed"
"github.com/tigera/operator/pkg/render/monitor"
"github.com/tigera/operator/pkg/tls/certificatemanagement"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -151,10 +147,6 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
{Name: render.ElasticsearchInternalPolicyName, Namespace: render.ElasticsearchNamespace},
{Name: networkpolicy.TigeraComponentDefaultDenyPolicyName, Namespace: render.ElasticsearchNamespace},
{Name: networkpolicy.TigeraComponentDefaultDenyPolicyName, Namespace: render.KibanaNamespace},
{Name: esgateway.PolicyName, Namespace: render.ElasticsearchNamespace},
{Name: esmetrics.ElasticsearchMetricsPolicyName, Namespace: render.ElasticsearchNamespace},
{Name: kubecontrollers.EsKubeControllerNetworkPolicyName, Namespace: common.CalicoNamespace},
{Name: linseed.PolicyName, Namespace: render.ElasticsearchNamespace},
})

// Watch for changes in storage classes, as new storage classes may be made available for LogStorage.
@@ -186,14 +178,12 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {

// Establish watches for secrets in the tigera-operator namespace.
for _, secretName := range []string{
render.TigeraElasticsearchGatewaySecret,
render.TigeraKibanaCertSecret,
render.OIDCSecretName,
render.DexObjectName,
esmetrics.ElasticsearchMetricsServerTLSSecret,
render.TigeraLinseedSecret,
certificatemanagement.CASecretName,
monitor.PrometheusClientTLSSecretName,
relasticsearch.PublicCertSecret,
render.ElasticsearchAdminUserSecret,
render.ElasticsearchCuratorUserSecret,
render.TigeraElasticsearchInternalCertSecret,
@@ -242,12 +232,6 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
if err := utils.AddServiceWatch(c, render.ElasticsearchServiceName, render.ElasticsearchNamespace); err != nil {
return fmt.Errorf("log-storage-elastic-controller failed to watch the Service resource: %w", err)
}
if err := utils.AddServiceWatch(c, esgateway.ServiceName, render.ElasticsearchNamespace); err != nil {
return fmt.Errorf("log-storage-elastic-controller failed to watch the Service resource: %w", err)
}
if err := utils.AddServiceWatch(c, render.LinseedServiceName, render.ElasticsearchNamespace); err != nil {
return fmt.Errorf("log-storage-elastic-controller failed to watch the Service resource: %w", err)
}

// Perform periodic reconciliation. This acts as a backstop to catch reconcile issues,
// and also makes sure we spot when things change that might not trigger a reconciliation.
pkg/controller/logstorage/kubecontrollers/es_kube_controllers.go (41 additions, 0 deletions)
@@ -17,6 +17,13 @@ package kubecontrollers
import (
"context"
"fmt"
"time"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"
"github.com/tigera/operator/pkg/render/common/networkpolicy"
"github.com/tigera/operator/pkg/render/logstorage/esgateway"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"

operatorv1 "github.com/tigera/operator/api/v1"

@@ -54,6 +61,7 @@ type ESKubeControllersController struct {
clusterDomain string
usePSP bool
elasticExternal bool
tierWatchReady *utils.ReadyFlag
}

func Add(mgr manager.Manager, opts options.AddOptions) error {
@@ -72,6 +80,7 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
clusterDomain: opts.ClusterDomain,
status: status.New(mgr.GetClient(), "log-storage-kubecontrollers", opts.KubernetesVersion),
elasticExternal: opts.ElasticExternal,
tierWatchReady: &utils.ReadyFlag{},
}
r.status.Run(opts.ShutdownContext)

@@ -118,6 +127,9 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
if err := utils.AddServiceWatch(c, render.ElasticsearchServiceName, render.ElasticsearchNamespace); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
if err := utils.AddServiceWatch(c, esgateway.ServiceName, render.ElasticsearchNamespace); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
if err := utils.AddConfigMapWatch(c, certificatemanagement.TrustedCertConfigMapName, common.CalicoNamespace, &handler.EnqueueRequestForObject{}); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
@@ -129,6 +141,18 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
return fmt.Errorf("log-storage-kubecontrollers failed to create periodic reconcile watch: %w", err)
}

k8sClient, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to establish a connection to k8s: %w", err)
}

// Start goroutines to establish watches against projectcalico.org/v3 resources.
go utils.WaitToAddTierWatch(networkpolicy.TigeraComponentTierName, c, k8sClient, log, r.tierWatchReady)
go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{
{Name: esgateway.PolicyName, Namespace: render.ElasticsearchNamespace},
{Name: kubecontrollers.EsKubeControllerNetworkPolicyName, Namespace: common.CalicoNamespace},
})

return nil
}

@@ -170,6 +194,23 @@ func (r *ESKubeControllersController) Reconcile(ctx context.Context, request rec
return reconcile.Result{}, err
}

// Validate that the tier watch is ready before querying the tier to ensure we utilize the cache.
if !r.tierWatchReady.IsReady() {
r.status.SetDegraded(operatorv1.ResourceNotReady, "Waiting for Tier watch to be established", nil, reqLogger)
return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
}

// Ensure the allow-tigera tier exists, before rendering any network policies within it.
if err := r.client.Get(ctx, client.ObjectKey{Name: networkpolicy.TigeraComponentTierName}, &v3.Tier{}); err != nil {
if errors.IsNotFound(err) {
r.status.SetDegraded(operatorv1.ResourceNotReady, "Waiting for allow-tigera tier to be created", err, reqLogger)
return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
} else {
r.status.SetDegraded(operatorv1.ResourceReadError, "Error querying allow-tigera tier", err, reqLogger)
return reconcile.Result{}, err
}
}

managementClusterConnection, err := utils.GetManagementClusterConnection(ctx, r.client)
if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, "Error reading ManagementClusterConnection", err, reqLogger)
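The Reconcile changes above gate on tierWatchReady before reading the allow-tigera Tier, so the lookup is served from the informer cache rather than hitting the API server before a watch exists. Below is a minimal, stdlib-only sketch of that latch-and-requeue pattern; the readyFlag type, timings, and print statements are illustrative stand-ins, not the operator's actual utils.ReadyFlag implementation.

package main

import (
	"fmt"
	"sync"
	"time"
)

// readyFlag is a minimal, thread-safe latch: it starts unset and is marked
// ready once the corresponding watch has been established.
type readyFlag struct {
	mu    sync.RWMutex
	ready bool
}

// MarkAsReady flips the latch; called by the goroutine that registers the watch.
func (f *readyFlag) MarkAsReady() {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.ready = true
}

// IsReady reports whether the watch has been established.
func (f *readyFlag) IsReady() bool {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.ready
}

func main() {
	tierWatchReady := &readyFlag{}

	// Stand-in for the goroutine that waits for the projectcalico.org/v3 API,
	// registers the tier watch, and then flips the flag.
	go func() {
		time.Sleep(50 * time.Millisecond) // pretend the API took a moment to appear
		tierWatchReady.MarkAsReady()
	}()

	// Stand-in for Reconcile: requeue until the flag is set, after which it is
	// safe to read the Tier from the cache.
	for !tierWatchReady.IsReady() {
		fmt.Println("tier watch not ready, requeueing")
		time.Sleep(20 * time.Millisecond)
	}
	fmt.Println("tier watch established; querying the allow-tigera tier from the cache")
}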
@@ -66,6 +66,7 @@ func NewControllerWithShims(
provider operatorv1.Provider,
clusterDomain string,
multiTenant bool,
tierWatchReady *utils.ReadyFlag,
) (*ESKubeControllersController, error) {
opts := options.AddOptions{
DetectedProvider: provider,
@@ -75,10 +76,11 @@ func NewControllerWithShims(
}

r := &ESKubeControllersController{
client: cli,
scheme: scheme,
status: status,
clusterDomain: opts.ClusterDomain,
client: cli,
scheme: scheme,
status: status,
clusterDomain: opts.ClusterDomain,
tierWatchReady: tierWatchReady,
}
r.status.Run(opts.ShutdownContext)
return r, nil
@@ -176,7 +178,7 @@ var _ = Describe("LogStorage ES kube-controllers controller", func() {
Expect(cli.Create(ctx, bundle.ConfigMap(common.CalicoNamespace))).ShouldNot(HaveOccurred())

// Create the reconciler for the tests.
r, err = NewControllerWithShims(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain, false)
r, err = NewControllerWithShims(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain, false, readyFlag)
Expect(err).ShouldNot(HaveOccurred())
})

pkg/controller/logstorage/linseed/linseed_controller.go (6 additions, 0 deletions)
@@ -19,6 +19,8 @@ import (
"fmt"
"net/url"

"github.com/tigera/operator/pkg/render/logstorage/esmetrics"

esv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/elasticsearch/v1"
v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"
operatorv1 "github.com/tigera/operator/api/v1"
@@ -177,6 +179,10 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
}

go utils.WaitToAddTierWatch(networkpolicy.TigeraComponentTierName, c, k8sClient, log, r.tierWatchReady)
go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{
{Name: esmetrics.ElasticsearchMetricsPolicyName, Namespace: render.ElasticsearchNamespace},
{Name: linseed.PolicyName, Namespace: render.ElasticsearchNamespace},
})
go utils.WaitToAddResourceWatch(c, k8sClient, log, r.dpiAPIReady, []client.Object{&v3.DeepPacketInspection{TypeMeta: metav1.TypeMeta{Kind: v3.KindDeepPacketInspection}}})

// Perform periodic reconciliation. This acts as a backstop to catch reconcile issues,
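The controllers above register their policy watches through utils.WaitToAddNetworkPolicyWatches, which the diff invokes but does not show. The sketch below is a conceptual, stdlib-only illustration of that wait-then-watch flow, assuming a helper that simply polls until the projectcalico.org/v3 API is served and then adds one watch per named policy; the function names, namespaces, and policy names are placeholders, not the tigera/operator implementation.

package main

import (
	"fmt"
	"time"
)

// policyRef plays the role of types.NamespacedName in the diff above.
type policyRef struct {
	Namespace string
	Name      string
}

// waitToAddPolicyWatches is a conceptual stand-in for a WaitToAddNetworkPolicyWatches-style
// helper: block until the v3 API is available, then register a watch for each named policy.
// The real helper takes a controller and a Kubernetes clientset; here both are replaced with
// plain function values so only the control flow remains.
func waitToAddPolicyWatches(apiAvailable func() bool, addWatch func(policyRef) error, policies []policyRef) {
	for !apiAvailable() {
		time.Sleep(100 * time.Millisecond)
	}
	for _, p := range policies {
		if err := addWatch(p); err != nil {
			fmt.Printf("failed to add watch for %s/%s: %v\n", p.Namespace, p.Name, err)
		}
	}
}

func main() {
	start := time.Now()

	// Run the helper the way the controllers above run it: in a goroutine at Add() time,
	// with placeholder namespaces and policy names standing in for the real constants.
	done := make(chan struct{})
	go func() {
		defer close(done)
		waitToAddPolicyWatches(
			func() bool { return time.Since(start) > 200*time.Millisecond }, // pretend the API appears after 200ms
			func(p policyRef) error {
				fmt.Printf("watch added for %s/%s\n", p.Namespace, p.Name)
				return nil
			},
			[]policyRef{
				{Namespace: "example-elasticsearch-ns", Name: "example-gateway-policy"},
				{Namespace: "example-calico-ns", Name: "example-kube-controllers-policy"},
			},
		)
	}()
	<-done
}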
