From 090ab11eb0052304f6285531d94318fd886051c3 Mon Sep 17 00:00:00 2001 From: Martin Dekov Date: Tue, 28 Jul 2020 23:43:36 +0300 Subject: [PATCH] Add multiple namespaces in the operator Adding support for multiple namespaces in the operator as per: https://github.com/openfaas/faas-netes/issues/616 Signed-off-by: Martin Dekov --- chart/openfaas/templates/gateway-dep.yaml | 2 + chart/openfaas/templates/operator-rbac.yaml | 3 + main.go | 9 +- pkg/config/read_config.go | 4 + pkg/handlers/namespaces.go | 7 +- pkg/handlers/secrets.go | 35 ++-- pkg/server/apply.go | 7 +- pkg/server/delete.go | 17 +- pkg/server/list.go | 24 ++- pkg/server/namespace.go | 13 +- pkg/server/proxy.go | 89 ----------- pkg/server/replicas.go | 42 +++-- pkg/server/secret.go | 169 +------------------- pkg/server/server.go | 6 +- 14 files changed, 127 insertions(+), 300 deletions(-) delete mode 100644 pkg/server/proxy.go diff --git a/chart/openfaas/templates/gateway-dep.yaml b/chart/openfaas/templates/gateway-dep.yaml index fedb41591..6e274905e 100644 --- a/chart/openfaas/templates/gateway-dep.yaml +++ b/chart/openfaas/templates/gateway-dep.yaml @@ -194,6 +194,8 @@ spec: value: "{{ .Values.faasnetes.livenessProbe.timeoutSeconds }}" - name: liveness_probe_period_seconds value: "{{ .Values.faasnetes.livenessProbe.periodSeconds }}" + - name: cluster_role + value: "{{ .Values.clusterRole }}" ports: - containerPort: 8081 protocol: TCP diff --git a/chart/openfaas/templates/operator-rbac.yaml b/chart/openfaas/templates/operator-rbac.yaml index 94af0aa66..5ce95bdf7 100644 --- a/chart/openfaas/templates/operator-rbac.yaml +++ b/chart/openfaas/templates/operator-rbac.yaml @@ -98,6 +98,9 @@ rules: - apiGroups: [""] resources: ["pods", "pods/log", "namespaces", "endpoints"] verbs: ["get", "list", "watch"] + - apiGroups: [""] + resources: ["events"] + verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/main.go b/main.go index 0c7fd548d..874e2d9f3 100644 --- a/main.go +++ b/main.go @@ -103,10 +103,15 @@ func main() { // auto-scaling is does via the HTTP API that acts on the deployment Spec.Replicas defaultResync := time.Minute * 5 - kubeInformerOpt := kubeinformers.WithNamespace(config.DefaultFunctionNamespace) + namespaceScope := config.DefaultFunctionNamespace + if operator && config.ClusterRole { + namespaceScope = "" + } + + kubeInformerOpt := kubeinformers.WithNamespace(namespaceScope) kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResync, kubeInformerOpt) - faasInformerOpt := informers.WithNamespace(config.DefaultFunctionNamespace) + faasInformerOpt := informers.WithNamespace(namespaceScope) faasInformerFactory := informers.NewSharedInformerFactoryWithOptions(faasClient, defaultResync, faasInformerOpt) // this is where we need to swap to the faasInformerFactory diff --git a/pkg/config/read_config.go b/pkg/config/read_config.go index 36ccee9b5..962871345 100644 --- a/pkg/config/read_config.go +++ b/pkg/config/read_config.go @@ -49,6 +49,7 @@ func (ReadConfig) Read(hasEnv ftypes.HasEnv) (BootstrapConfig, error) { cfg.DefaultFunctionNamespace = ftypes.ParseString(hasEnv.Getenv("function_namespace"), "default") cfg.ProfilesNamespace = ftypes.ParseString(hasEnv.Getenv("profiles_namespace"), cfg.DefaultFunctionNamespace) + cfg.ClusterRole = ftypes.ParseBoolValue(hasEnv.Getenv("cluster_role"), false) cfg.HTTPProbe = httpProbe cfg.SetNonRootUser = setNonRootUser @@ -105,6 +106,8 @@ type BootstrapConfig struct { ProfilesNamespace string // FaaSConfig contains the configuration for the FaaSProvider FaaSConfig ftypes.FaaSConfig + // ClusterRole determines whether the operator should have cluster wide access + ClusterRole bool } // Fprint pretty-prints the config with the stdlib logger. One line per config value. @@ -128,5 +131,6 @@ func (c BootstrapConfig) Fprint(verbose bool) { log.Printf("LivenessProbeInitialDelaySeconds: %d\n", c.LivenessProbeInitialDelaySeconds) log.Printf("LivenessProbeTimeoutSeconds: %d\n", c.LivenessProbeTimeoutSeconds) log.Printf("LivenessProbePeriodSeconds: %d\n", c.LivenessProbePeriodSeconds) + log.Printf("ClusterRole: %v\n", c.ClusterRole) } } diff --git a/pkg/handlers/namespaces.go b/pkg/handlers/namespaces.go index c8a80ab88..a1bde0955 100644 --- a/pkg/handlers/namespaces.go +++ b/pkg/handlers/namespaces.go @@ -22,7 +22,7 @@ func MakeNamespacesLister(defaultNamespace string, clientset kubernetes.Interfac return func(w http.ResponseWriter, r *http.Request) { log.Println("Query namespaces") - res := list(defaultNamespace, clientset) + res := ListNamespaces(defaultNamespace, clientset) out, _ := json.Marshal(res) w.Header().Set("Content-Type", "application/json") @@ -63,7 +63,7 @@ func NewNamespaceResolver(defaultNamespace string, kube kubernetes.Interface) Na r.Body = ioutil.NopCloser(bytes.NewBuffer(body)) } - allowedNamespaces := list(defaultNamespace, kube) + allowedNamespaces := ListNamespaces(defaultNamespace, kube) ok := findNamespace(req.Namespace, allowedNamespaces) if !ok { return req.Namespace, fmt.Errorf("unable to manage secrets within the %s namespace", req.Namespace) @@ -73,7 +73,8 @@ func NewNamespaceResolver(defaultNamespace string, kube kubernetes.Interface) Na } } -func list(defaultNamespace string, clientset kubernetes.Interface) []string { +// ListNamespaces lists all namespaces annotated with openfaas true +func ListNamespaces(defaultNamespace string, clientset kubernetes.Interface) []string { listOptions := metav1.ListOptions{} namespaces, err := clientset.CoreV1().Namespaces().List(context.TODO(), listOptions) diff --git a/pkg/handlers/secrets.go b/pkg/handlers/secrets.go index 0236427dd..3852cd05d 100644 --- a/pkg/handlers/secrets.go +++ b/pkg/handlers/secrets.go @@ -14,26 +14,27 @@ import ( ) // MakeSecretHandler makes a handler for Create/List/Delete/Update of -//secrets in the Kubernetes API +// secrets in the Kubernetes API func MakeSecretHandler(defaultNamespace string, kube kubernetes.Interface) http.HandlerFunc { - handler := secretsHandler{ - lookupNamespace: NewNamespaceResolver(defaultNamespace, kube), - secrets: k8s.NewSecretsClient(kube), + handler := SecretsHandler{ + LookupNamespace: NewNamespaceResolver(defaultNamespace, kube), + Secrets: k8s.NewSecretsClient(kube), } return handler.ServeHTTP } -type secretsHandler struct { - secrets k8s.SecretsClient - lookupNamespace NamespaceResolver +// SecretsHandler enabling to create openfaas secrets across namespaces +type SecretsHandler struct { + Secrets k8s.SecretsClient + LookupNamespace NamespaceResolver } -func (h secretsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (h SecretsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Body != nil { defer r.Body.Close() } - lookupNamespace, err := h.lookupNamespace(r) + lookupNamespace, err := h.LookupNamespace(r) if err != nil { switch err.Error() { case "unable to unmarshal Secret request": @@ -61,8 +62,8 @@ func (h secretsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func (h secretsHandler) listSecrets(namespace string, w http.ResponseWriter, r *http.Request) { - res, err := h.secrets.List(namespace) +func (h SecretsHandler) listSecrets(namespace string, w http.ResponseWriter, r *http.Request) { + res, err := h.Secrets.List(namespace) if err != nil { status, reason := ProcessErrorReasons(err) log.Printf("Secret list error reason: %s, %v\n", reason, err) @@ -88,7 +89,7 @@ func (h secretsHandler) listSecrets(namespace string, w http.ResponseWriter, r * w.Write(secretsBytes) } -func (h secretsHandler) createSecret(namespace string, w http.ResponseWriter, r *http.Request) { +func (h SecretsHandler) createSecret(namespace string, w http.ResponseWriter, r *http.Request) { secret := types.Secret{} err := json.NewDecoder(r.Body).Decode(&secret) if err != nil { @@ -98,7 +99,7 @@ func (h secretsHandler) createSecret(namespace string, w http.ResponseWriter, r } secret.Namespace = namespace - err = h.secrets.Create(secret) + err = h.Secrets.Create(secret) if err != nil { status, reason := ProcessErrorReasons(err) log.Printf("Secret create error reason: %s, %v\n", reason, err) @@ -109,7 +110,7 @@ func (h secretsHandler) createSecret(namespace string, w http.ResponseWriter, r w.WriteHeader(http.StatusAccepted) } -func (h secretsHandler) replaceSecret(namespace string, w http.ResponseWriter, r *http.Request) { +func (h SecretsHandler) replaceSecret(namespace string, w http.ResponseWriter, r *http.Request) { secret := types.Secret{} err := json.NewDecoder(r.Body).Decode(&secret) if err != nil { @@ -119,7 +120,7 @@ func (h secretsHandler) replaceSecret(namespace string, w http.ResponseWriter, r } secret.Namespace = namespace - err = h.secrets.Replace(secret) + err = h.Secrets.Replace(secret) if err != nil { status, reason := ProcessErrorReasons(err) log.Printf("Secret update error reason: %s, %v\n", reason, err) @@ -130,7 +131,7 @@ func (h secretsHandler) replaceSecret(namespace string, w http.ResponseWriter, r w.WriteHeader(http.StatusAccepted) } -func (h secretsHandler) deleteSecret(namespace string, w http.ResponseWriter, r *http.Request) { +func (h SecretsHandler) deleteSecret(namespace string, w http.ResponseWriter, r *http.Request) { secret := types.Secret{} err := json.NewDecoder(r.Body).Decode(&secret) if err != nil { @@ -139,7 +140,7 @@ func (h secretsHandler) deleteSecret(namespace string, w http.ResponseWriter, r return } - err = h.secrets.Delete(namespace, secret.Name) + err = h.Secrets.Delete(namespace, secret.Name) if err != nil { status, reason := ProcessErrorReasons(err) log.Printf("Secret delete error reason: %s, %v\n", reason, err) diff --git a/pkg/server/apply.go b/pkg/server/apply.go index 1e05d2ea1..be54c3149 100644 --- a/pkg/server/apply.go +++ b/pkg/server/apply.go @@ -16,7 +16,7 @@ import ( "k8s.io/klog" ) -func makeApplyHandler(namespace string, client clientset.Interface) http.HandlerFunc { +func makeApplyHandler(defaultNamespace string, client clientset.Interface) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.Body != nil { @@ -33,6 +33,11 @@ func makeApplyHandler(namespace string, client clientset.Interface) http.Handler } klog.Infof("Deployment request for: %s\n", req.Service) + namespace := defaultNamespace + if len(req.Namespace) > 0 { + namespace = req.Namespace + } + opts := metav1.GetOptions{} got, err := client.OpenfaasV1().Functions(namespace).Get(context.TODO(), req.Service, opts) miss := false diff --git a/pkg/server/delete.go b/pkg/server/delete.go index d568b0b3a..f5168032d 100644 --- a/pkg/server/delete.go +++ b/pkg/server/delete.go @@ -13,9 +13,22 @@ import ( glog "k8s.io/klog" ) -func makeDeleteHandler(namespace string, client clientset.Interface) http.HandlerFunc { +func makeDeleteHandler(defaultNamespace string, client clientset.Interface) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + namespace := q.Get("namespace") + + lookupNamespace := defaultNamespace + if len(namespace) > 0 { + lookupNamespace = namespace + } + + if namespace == "kube-system" { + http.Error(w, "unable to list within the kube-system namespace", http.StatusUnauthorized) + return + } + if r.Body != nil { defer r.Body.Close() } @@ -35,7 +48,7 @@ func makeDeleteHandler(namespace string, client clientset.Interface) http.Handle return } - err = client.OpenfaasV1().Functions(namespace). + err = client.OpenfaasV1().Functions(lookupNamespace). Delete(context.TODO(), request.FunctionName, metav1.DeleteOptions{}) if err != nil { w.WriteHeader(http.StatusInternalServerError) diff --git a/pkg/server/list.go b/pkg/server/list.go index d61c312d5..ef4febb04 100644 --- a/pkg/server/list.go +++ b/pkg/server/list.go @@ -13,19 +13,33 @@ import ( glog "k8s.io/klog" ) -func makeListHandler(namespace string, +func makeListHandler(defaultNamespace string, client clientset.Interface, - deploymentLister appsv1.DeploymentNamespaceLister) http.HandlerFunc { + deploymentLister appsv1.DeploymentLister) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.Body != nil { defer r.Body.Close() } + q := r.URL.Query() + namespace := q.Get("namespace") + + lookupNamespace := defaultNamespace + + if len(namespace) > 0 { + lookupNamespace = namespace + } + + if lookupNamespace == "kube-system" { + http.Error(w, "unable to list within the kube-system namespace", http.StatusUnauthorized) + return + } + functions := []types.FunctionStatus{} opts := metav1.ListOptions{} - res, err := client.OpenfaasV1().Functions(namespace).List(context.TODO(), opts) + res, err := client.OpenfaasV1().Functions(lookupNamespace).List(context.TODO(), opts) if err != nil { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(err.Error())) @@ -35,7 +49,7 @@ func makeListHandler(namespace string, for _, item := range res.Items { - desiredReplicas, availableReplicas, err := getReplicas(item.Spec.Name, namespace, deploymentLister) + desiredReplicas, availableReplicas, err := getReplicas(item.Spec.Name, lookupNamespace, deploymentLister) if err != nil { glog.Warningf("Function listing getReplicas error: %v", err) } @@ -47,7 +61,7 @@ func makeListHandler(namespace string, Image: item.Spec.Image, Labels: item.Spec.Labels, Annotations: item.Spec.Annotations, - Namespace: namespace, + Namespace: lookupNamespace, } functions = append(functions, function) diff --git a/pkg/server/namespace.go b/pkg/server/namespace.go index 11c110895..15dba36e7 100644 --- a/pkg/server/namespace.go +++ b/pkg/server/namespace.go @@ -3,16 +3,19 @@ package server import ( "encoding/json" "net/http" + + "github.com/openfaas/faas-netes/pkg/handlers" + "k8s.io/client-go/kubernetes" ) -func makeListNamespaceHandler(defaultNamespace string) func(http.ResponseWriter, *http.Request) { +func makeListNamespaceHandler(defaultNamespace string, clientset kubernetes.Interface) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { + res := handlers.ListNamespaces(defaultNamespace, clientset) - defer r.Body.Close() - - res, _ := json.Marshal([]string{defaultNamespace}) + out, _ := json.Marshal(res) w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) - w.Write(res) + w.Write(out) } } diff --git a/pkg/server/proxy.go b/pkg/server/proxy.go deleted file mode 100644 index fbbff94bf..000000000 --- a/pkg/server/proxy.go +++ /dev/null @@ -1,89 +0,0 @@ -package server - -import ( - "bytes" - "fmt" - "io" - "net" - "net/http" - "time" - - "github.com/gorilla/mux" - "github.com/openfaas/faas/gateway/requests" - glog "k8s.io/klog" -) - -// makeProxy creates a proxy for HTTP web requests which can be routed to a function. -func makeProxy(functionNamespace string, timeout time.Duration) http.HandlerFunc { - proxyClient := http.Client{ - Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: timeout, - KeepAlive: 1 * time.Second, - }).DialContext, - IdleConnTimeout: 120 * time.Millisecond, - ExpectContinueTimeout: 1500 * time.Millisecond, - }, - } - - return func(w http.ResponseWriter, r *http.Request) { - if r.Body != nil { - defer r.Body.Close() - } - - switch r.Method { - case http.MethodPost, - http.MethodPut, - http.MethodPatch, - http.MethodDelete, - http.MethodGet: - - vars := mux.Vars(r) - service := vars["name"] - - defer func(when time.Time) { - seconds := time.Since(when).Seconds() - glog.V(2).Infof("%s took %f seconds", service, seconds) - }(time.Now()) - - forwardReq := requests.NewForwardRequest(r.Method, *r.URL) - - url := forwardReq.ToURL(fmt.Sprintf("%s.%s", service, functionNamespace), 8080) - - request, _ := http.NewRequest(r.Method, url, r.Body) - - copyHeaders(&request.Header, &r.Header) - - defer request.Body.Close() - - response, err := proxyClient.Do(request) - - if err != nil { - glog.Errorf("%s error: %s", service, err.Error()) - writeHead(service, http.StatusInternalServerError, w) - buf := bytes.NewBufferString("Can't reach service: " + service) - w.Write(buf.Bytes()) - return - } - - clientHeader := w.Header() - copyHeaders(&clientHeader, &response.Header) - - writeHead(service, http.StatusOK, w) - io.Copy(w, response.Body) - } - } -} - -func writeHead(service string, code int, w http.ResponseWriter) { - w.WriteHeader(code) -} - -func copyHeaders(destination *http.Header, source *http.Header) { - for k, v := range *source { - vClone := make([]string, len(v)) - copy(vClone, v) - (*destination)[k] = vClone - } -} diff --git a/pkg/server/replicas.go b/pkg/server/replicas.go index ac0e281e5..d513e1c91 100644 --- a/pkg/server/replicas.go +++ b/pkg/server/replicas.go @@ -15,21 +15,29 @@ import ( glog "k8s.io/klog" ) -func makeReplicaReader(namespace string, client clientset.Interface, lister v1.DeploymentNamespaceLister) http.HandlerFunc { +func makeReplicaReader(defaultNamespace string, client clientset.Interface, lister v1.DeploymentLister) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) functionName := vars["name"] + q := r.URL.Query() + namespace := q.Get("namespace") + + lookupNamespace := defaultNamespace + + if len(namespace) > 0 { + lookupNamespace = namespace + } + opts := metav1.GetOptions{} - k8sfunc, err := client.OpenfaasV1().Functions(namespace). + k8sfunc, err := client.OpenfaasV1().Functions(lookupNamespace). Get(context.TODO(), functionName, opts) if err != nil { w.WriteHeader(http.StatusNotFound) w.Write([]byte(err.Error())) return } - - desiredReplicas, availableReplicas, err := getReplicas(functionName, namespace, lister) + desiredReplicas, availableReplicas, err := getReplicas(functionName, lookupNamespace, lister) if err != nil { glog.Warningf("Function replica reader error: %v", err) } @@ -42,7 +50,7 @@ func makeReplicaReader(namespace string, client clientset.Interface, lister v1.D Name: k8sfunc.Spec.Name, EnvProcess: k8sfunc.Spec.Handler, Image: k8sfunc.Spec.Image, - Namespace: namespace, + Namespace: lookupNamespace, } res, _ := json.Marshal(result) @@ -52,8 +60,8 @@ func makeReplicaReader(namespace string, client clientset.Interface, lister v1.D } } -func getReplicas(functionName string, namespace string, lister v1.DeploymentNamespaceLister) (uint64, uint64, error) { - dep, err := lister.Get(functionName) +func getReplicas(functionName string, namespace string, lister v1.DeploymentLister) (uint64, uint64, error) { + dep, err := lister.Deployments(namespace).Get(functionName) if err != nil { return 0, 0, err } @@ -63,11 +71,25 @@ func getReplicas(functionName string, namespace string, lister v1.DeploymentName return desiredReplicas, availableReplicas, nil } -func makeReplicaHandler(namespace string, kube kubernetes.Interface) http.HandlerFunc { +func makeReplicaHandler(defaultNamespace string, kube kubernetes.Interface) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) functionName := vars["name"] + q := r.URL.Query() + namespace := q.Get("namespace") + + lookupNamespace := defaultNamespace + + if len(namespace) > 0 { + lookupNamespace = namespace + } + + if lookupNamespace == "kube-system" { + http.Error(w, "unable to list within the kube-system namespace", http.StatusUnauthorized) + return + } + req := types.ScaleServiceRequest{} if r.Body != nil { defer r.Body.Close() @@ -81,7 +103,7 @@ func makeReplicaHandler(namespace string, kube kubernetes.Interface) http.Handle } opts := metav1.GetOptions{} - dep, err := kube.AppsV1().Deployments(namespace).Get(context.TODO(), functionName, opts) + dep, err := kube.AppsV1().Deployments(lookupNamespace).Get(context.TODO(), functionName, opts) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) @@ -90,7 +112,7 @@ func makeReplicaHandler(namespace string, kube kubernetes.Interface) http.Handle } dep.Spec.Replicas = int32p(int32(req.Replicas)) - _, err = kube.AppsV1().Deployments(namespace).Update(context.TODO(), dep, metav1.UpdateOptions{}) + _, err = kube.AppsV1().Deployments(lookupNamespace).Update(context.TODO(), dep, metav1.UpdateOptions{}) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) diff --git a/pkg/server/secret.go b/pkg/server/secret.go index 7f5c1e3c3..a5b5e24fc 100644 --- a/pkg/server/secret.go +++ b/pkg/server/secret.go @@ -1,19 +1,12 @@ package server import ( - "context" - "encoding/json" - "fmt" - "io" - "io/ioutil" "net/http" - faastypes "github.com/openfaas/faas-provider/types" + "github.com/openfaas/faas-netes/pkg/k8s" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/openfaas/faas-netes/pkg/handlers" "k8s.io/client-go/kubernetes" - glog "k8s.io/klog" ) const ( @@ -23,159 +16,9 @@ const ( // makeSecretHandler provides the secrets CRUD endpoint func makeSecretHandler(namespace string, kube kubernetes.Interface) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if r.Body != nil { - defer r.Body.Close() - } - - switch r.Method { - case http.MethodGet: - secrets, err := getSecrets(namespace, kube) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) - glog.Errorf("Secrets query error: %v", err) - return - } - - secretsBytes, err := json.Marshal(secrets) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) - glog.Errorf("Secrets json marshal error: %v", err) - return - } - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - w.Write(secretsBytes) - case http.MethodPost: - secret, err := parseSecret(namespace, r.Body) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(err.Error())) - glog.Errorf("Secret unmarshal error: %v", err) - return - } - - if err := createSecret(namespace, kube, secret); err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) - glog.Errorf("Secret create error: %v", err) - return - } - - glog.Infof("Secret %s created", secret.GetName()) - w.WriteHeader(http.StatusAccepted) - case http.MethodPut: - secret, err := parseSecret(namespace, r.Body) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(err.Error())) - glog.Errorf("Secret unmarshal error: %v", err) - return - } - - if err := updateSecret(namespace, kube, secret); err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) - glog.Errorf("Secret update error: %v", err) - return - } - - glog.Infof("Secret %s updated", secret.GetName()) - w.WriteHeader(http.StatusAccepted) - case http.MethodDelete: - secret, err := parseSecret(namespace, r.Body) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(err.Error())) - glog.Errorf("Secret unmarshal error: %v", err) - return - } - - if err := deleteSecret(namespace, kube, secret); err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) - glog.Errorf("Secret %s delete error: %v", secret.GetName(), err) - return - } - - glog.Infof("Secret %s deleted", secret.GetName()) - w.WriteHeader(http.StatusAccepted) - default: - w.WriteHeader(http.StatusBadRequest) - return - } - } -} - -func getSecrets(namespace string, kube kubernetes.Interface) ([]faastypes.Secret, error) { - secrets := []faastypes.Secret{} - selector := fmt.Sprintf("%s=%s", secretLabel, secretLabelValue) - - res, err := kube.CoreV1().Secrets(namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector}) - - if err != nil { - return secrets, err - } - - for _, item := range res.Items { - secret := faastypes.Secret{ - Name: item.Name, - } - secrets = append(secrets, secret) + handler := handlers.SecretsHandler{ + LookupNamespace: handlers.NewNamespaceResolver(namespace, kube), + Secrets: k8s.NewSecretsClient(kube), } - - return secrets, nil -} - -func createSecret(namespace string, kube kubernetes.Interface, secret *corev1.Secret) error { - _, err := kube.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{}) - if err != nil { - return err - } - return nil -} - -func updateSecret(namespace string, kube kubernetes.Interface, secret *corev1.Secret) error { - s, err := kube.CoreV1().Secrets(namespace).Get(context.TODO(), secret.GetName(), metav1.GetOptions{}) - if err != nil { - return err - } - s.StringData = secret.StringData - if _, err = kube.CoreV1().Secrets(namespace).Update(context.TODO(), s, metav1.UpdateOptions{}); err != nil { - return err - } - return nil -} - -func deleteSecret(namespace string, kube kubernetes.Interface, secret *corev1.Secret) error { - if err := kube.CoreV1().Secrets(namespace).Delete(context.TODO(), secret.GetName(), metav1.DeleteOptions{}); err != nil { - return err - } - return nil -} - -func parseSecret(namespace string, r io.Reader) (*corev1.Secret, error) { - body, _ := ioutil.ReadAll(r) - req := faastypes.Secret{} - if err := json.Unmarshal(body, &req); err != nil { - return nil, err - } - secret := &corev1.Secret{ - Type: corev1.SecretTypeOpaque, - ObjectMeta: metav1.ObjectMeta{ - Name: req.Name, - Namespace: namespace, - Labels: map[string]string{ - secretLabel: secretLabelValue, - }, - }, - StringData: map[string]string{ - req.Name: req.Value, - }, - } - - return secret, nil + return handler.ServeHTTP } diff --git a/pkg/server/server.go b/pkg/server/server.go index 179d515ad..f60752ac5 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -35,7 +35,7 @@ func New(client clientset.Interface, endpointsInformer coreinformer.EndpointsInformer, deploymentsInformer appsinformer.DeploymentInformer) *Server { - functionNamespace := "openfaas-fn" + functionNamespace := "" if namespace, exists := os.LookupEnv("function_namespace"); exists { functionNamespace = namespace } @@ -72,7 +72,7 @@ func New(client clientset.Interface, lister := endpointsInformer.Lister() functionLookup := k8s.NewFunctionLookup(functionNamespace, lister) - deploymentLister := deploymentsInformer.Lister().Deployments(functionNamespace) + deploymentLister := deploymentsInformer.Lister() bootstrapConfig := types.FaaSConfig{ ReadTimeout: time.Duration(readTimeout) * time.Second, WriteTimeout: time.Duration(writeTimeout) * time.Second, @@ -91,7 +91,7 @@ func New(client clientset.Interface, HealthHandler: makeHealthHandler(), InfoHandler: makeInfoHandler(), SecretHandler: makeSecretHandler(functionNamespace, kube), - ListNamespaceHandler: makeListNamespaceHandler(functionNamespace), + ListNamespaceHandler: makeListNamespaceHandler(functionNamespace, kube), LogHandler: logs.NewLogHandlerFunc(faasnetesk8s.NewLogRequestor(kube, functionNamespace), bootstrapConfig.WriteTimeout), }