From 56ef622a4240ac7965e9d4fb795608c55a77cb00 Mon Sep 17 00:00:00 2001 From: Levi Blackstone Date: Thu, 16 Jan 2020 14:41:36 -0700 Subject: [PATCH] Gracefully handle unreachable k8s cluster (#946) Previously, the provider erroneously expected that the default provider pointed to a functioning Kubernetes cluster. This led to unexpected failures in cases where this wasn't true, such as the user manually setting the kubeconfig value for the stack to an invalid value. This change explicitly checks for a valid configuration, and falls back to a degraded state if this check fails. This still allows invoke logic to run during previews without requiring an active k8s cluster. --- CHANGELOG.md | 1 + pkg/await/await.go | 2 +- pkg/await/service.go | 4 +- pkg/cluster/version.go | 4 +- pkg/provider/provider.go | 139 +++++++++++++++++++++++++++------------ 5 files changed, 104 insertions(+), 46 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 29c04e5754..b18ca7759a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ### Bug fixes +- Gracefully handle unreachable k8s cluster. (https://github.com/pulumi/pulumi-kubernetes/pull/946). - Fix deprecation notice for CSINode. (https://github.com/pulumi/pulumi-kubernetes/pull/944). ## 1.4.3 (January 8, 2020) diff --git a/pkg/await/await.go b/pkg/await/await.go index d7080e3b5e..f728fa77b3 100644 --- a/pkg/await/await.go +++ b/pkg/await/await.go @@ -432,7 +432,7 @@ func Deletion(c DeleteConfig) error { return nilIfGVKDeleted(err) } - err = deleteResource(c.Name, client, cluster.GetServerVersion(c.ClientSet.DiscoveryClientCached)) + err = deleteResource(c.Name, client, cluster.TryGetServerVersion(c.ClientSet.DiscoveryClientCached)) if err != nil { return nilIfGVKDeleted(err) } diff --git a/pkg/await/service.go b/pkg/await/service.go index d21db1993f..603510398e 100644 --- a/pkg/await/service.go +++ b/pkg/await/service.go @@ -141,7 +141,7 @@ func (sia *serviceInitAwaiter) Await() error { } defer endpointWatcher.Stop() - version := cluster.GetServerVersion(sia.config.clientSet.DiscoveryClientCached) + version := cluster.TryGetServerVersion(sia.config.clientSet.DiscoveryClientCached) timeout := metadata.TimeoutDuration(sia.config.timeout, sia.config.currentInputs, DefaultServiceTimeoutMins*60) return sia.await(serviceWatcher, endpointWatcher, time.After(timeout), make(chan struct{}), version) @@ -176,7 +176,7 @@ func (sia *serviceInitAwaiter) Read() error { endpointList = &unstructured.UnstructuredList{Items: []unstructured.Unstructured{}} } - version := cluster.GetServerVersion(sia.config.clientSet.DiscoveryClientCached) + version := cluster.TryGetServerVersion(sia.config.clientSet.DiscoveryClientCached) return sia.read(service, endpointList, version) } diff --git a/pkg/cluster/version.go b/pkg/cluster/version.go index aa59467e00..9e106ec9ba 100644 --- a/pkg/cluster/version.go +++ b/pkg/cluster/version.go @@ -54,9 +54,9 @@ func (v ServerVersion) Compare(version ServerVersion) int { return res } -// GetServerVersion attempts to retrieve the server version from k8s. +// TryGetServerVersion attempts to retrieve the server version from k8s. // Returns the configured default version in case this fails. -func GetServerVersion(cdi discovery.CachedDiscoveryInterface) ServerVersion { +func TryGetServerVersion(cdi discovery.CachedDiscoveryInterface) ServerVersion { defaultSV := ServerVersion{ Major: 1, Minor: 14, diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index d13b797e61..c163b08494 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -106,7 +106,8 @@ type kubeProvider struct { suppressDeprecationWarnings bool enableSecrets bool - config *rest.Config // Cluster config, e.g., through $KUBECONFIG file. + clusterUnreachable bool // Kubernetes cluster is unreachable + config *rest.Config // Cluster config, e.g., through $KUBECONFIG file. clientSet *clients.DynamicClientSet logClient *clients.LogClient @@ -320,14 +321,19 @@ func (k *kubeProvider) Configure(_ context.Context, req *pulumirpc.ConfigureRequ if configJSON, ok := vars["kubernetes:config:kubeconfig"]; ok { config, err := clientcmd.Load([]byte(configJSON)) if err != nil { - return nil, pkgerrors.Wrap(err, "failed to parse kubeconfig data in "+ + // Rather than erroring out here, mark the cluster as unreachable and conditionally bail out on + // operations that require a valid cluster. This will allow us to perform invoke operations + // using the default provider. + k.clusterUnreachable = true + glog.V(3).Infof(fmt.Sprintf("failed to parse kubeconfig data in "+ "`kubernetes:config:kubeconfig`; this must be a YAML literal string and not "+ - "a filename or path") - } - kubeconfig = clientcmd.NewDefaultClientConfig(*config, overrides) - configurationNamespace, _, err := kubeconfig.Namespace() - if err == nil { - k.defaultNamespace = configurationNamespace + "a filename or path - %v", err)) + } else { + kubeconfig = clientcmd.NewDefaultClientConfig(*config, overrides) + configurationNamespace, _, err := kubeconfig.Namespace() + if err == nil { + k.defaultNamespace = configurationNamespace + } } } else { // Use client-go to resolve the final configuration values for the client. Typically these @@ -343,28 +349,31 @@ func (k *kubeProvider) Configure(_ context.Context, req *pulumirpc.ConfigureRequ k.defaultNamespace = defaultNamespace } - config, err := kubeconfig.ClientConfig() - if err != nil { - return nil, fmt.Errorf("unable to load Kubernetes client configuration from kubeconfig file: %v", err) - } - k.config = config + if !k.clusterUnreachable { + config, err := kubeconfig.ClientConfig() + if err != nil { + return nil, fmt.Errorf("unable to load Kubernetes client configuration from kubeconfig file: %v", err) + } + k.config = config - cs, err := clients.NewDynamicClientSet(k.config) - if err != nil { - return nil, err - } - k.clientSet = cs + cs, err := clients.NewDynamicClientSet(k.config) + if err != nil { + return nil, err + } + k.clientSet = cs - lc, err := clients.NewLogClient(k.config) - if err != nil { - return nil, err - } - k.logClient = lc + lc, err := clients.NewLogClient(k.config) + if err != nil { + return nil, err + } + k.logClient = lc - k.k8sVersion = cluster.GetServerVersion(cs.DiscoveryClientCached) + k.k8sVersion = cluster.TryGetServerVersion(cs.DiscoveryClientCached) - if _, err = k.getResources(); err != nil { - return nil, fmt.Errorf("unable to load schema information from the API server: %v", err) + if _, err = k.getResources(); err != nil { + k.clusterUnreachable = true + glog.V(3).Infof("unable to load schema information from the API server: %v", err) + } } return &pulumirpc.ConfigureResponse{ @@ -376,9 +385,13 @@ func (k *kubeProvider) Configure(_ context.Context, req *pulumirpc.ConfigureRequ func (k *kubeProvider) Invoke(ctx context.Context, req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) { + // Important: Some invoke logic is intended to run during preview, and the Kubernetes provider + // inputs may not have resolved yet. Any invoke logic that depends on an active cluster must check + // k.clusterUnreachable and handle that condition appropriately. + // Always fail. tok := req.GetTok() - return nil, fmt.Errorf("Unknown Invoke type '%s'", tok) + return nil, fmt.Errorf("unknown Invoke type '%s'", tok) } // StreamInvoke dynamically executes a built-in function in the provider. The result is streamed @@ -386,6 +399,10 @@ func (k *kubeProvider) Invoke(ctx context.Context, func (k *kubeProvider) StreamInvoke( req *pulumirpc.InvokeRequest, server pulumirpc.ResourceProvider_StreamInvokeServer) error { + // Important: Some invoke logic is intended to run during preview, and the Kubernetes provider + // inputs may not have resolved yet. Any invoke logic that depends on an active cluster must check + // k.clusterUnreachable and handle that condition appropriately. + // Unmarshal arguments. tok := req.GetTok() label := fmt.Sprintf("%s.StreamInvoke(%s)", k.label(), tok) @@ -414,13 +431,17 @@ func (k *kubeProvider) StreamInvoke( // expected to never terminate, and users of the various SDKs need a way to tell the // provider to stop streaming and reclaim the resources associated with the stream. // - // Still, we implement this cancellation also for `list`, primarily for coompleteness. We'd + // Still, we implement this cancellation also for `list`, primarily for completeness. We'd // like to avoid an unpleasant and non-actionable error that would appear on a `Send` on a // client that is no longer accepting requests. This also helps to guard against the // possibility that some dark corner of gRPC signals cancellation by accident, e.g., during // shutdown. // + if k.clusterUnreachable { + return fmt.Errorf("configured Kubernetes cluster is unreachable") + } + namespace := "" if args["namespace"].HasValue() { namespace = args["namespace"].StringValue() @@ -511,6 +532,10 @@ func (k *kubeProvider) StreamInvoke( // Set up resource watcher. // + if k.clusterUnreachable { + return fmt.Errorf("configured Kubernetes cluster is unreachable") + } + namespace := "" if args["namespace"].HasValue() { namespace = args["namespace"].StringValue() @@ -591,6 +616,10 @@ func (k *kubeProvider) StreamInvoke( // Set up log stream for Pod. // + if k.clusterUnreachable { + return fmt.Errorf("configured Kubernetes cluster is unreachable") + } + namespace := "default" if args["namespace"].HasValue() { namespace = args["namespace"].StringValue() @@ -683,7 +712,7 @@ func (k *kubeProvider) StreamInvoke( } } default: - return fmt.Errorf("Unknown Invoke type '%s'", tok) + return fmt.Errorf("unknown Invoke type '%s'", tok) } } @@ -813,11 +842,14 @@ func (k *kubeProvider) Check(ctx context.Context, req *pulumirpc.CheckRequest) ( return nil, err } - if removed, version := kinds.RemovedApiVersion(gvk, k.k8sVersion); removed { - return nil, &kinds.RemovedApiError{GVK: gvk, Version: version} - } - if !k.suppressDeprecationWarnings && kinds.DeprecatedApiVersion(gvk) { - _ = k.host.Log(ctx, diag.Warning, urn, gen.ApiVersionComment(gvk)) + // Skip the API version check if the cluster is unreachable. + if !k.clusterUnreachable { + if removed, version := kinds.RemovedApiVersion(gvk, k.k8sVersion); removed { + return nil, &kinds.RemovedApiError{GVK: gvk, Version: version} + } + if !k.suppressDeprecationWarnings && kinds.DeprecatedApiVersion(gvk) { + _ = k.host.Log(ctx, diag.Warning, urn, gen.ApiVersionComment(gvk)) + } } // If a default namespace is set on the provider for this resource, check if the resource has Namespaced @@ -842,7 +874,7 @@ func (k *kubeProvider) Check(ctx context.Context, req *pulumirpc.CheckRequest) ( // HACK: Do not validate against OpenAPI spec if there is a computed value. The OpenAPI spec // does not know how to deal with the placeholder values for computed values. - if !hasComputedValue(newInputs) { + if !hasComputedValue(newInputs) && !k.clusterUnreachable { resources, err := k.getResources() if err != nil { return nil, pkgerrors.Wrapf(err, "Failed to fetch OpenAPI schema from the API server") @@ -972,11 +1004,14 @@ func (k *kubeProvider) Diff( oldInputs.SetGroupVersionKind(gvk) } - supportsDryRun, err := openapi.SupportsDryRun(k.clientSet.DiscoveryClientCached, gvk) - if err != nil { - return nil, pkgerrors.Wrapf(err, - "Failed to check for changes in resource %s because of an error communicating with the API server", - fqObjName(newInputs)) + supportsDryRun := false + if !k.clusterUnreachable { + supportsDryRun, err = openapi.SupportsDryRun(k.clientSet.DiscoveryClientCached, gvk) + if err != nil { + return nil, pkgerrors.Wrapf(err, + "Failed to check for changes in resource %s because of an error communicating with the API server", + fqObjName(newInputs)) + } } var patch []byte @@ -1108,6 +1143,11 @@ func (k *kubeProvider) Create( label := fmt.Sprintf("%s.Create(%s)", k.label(), urn) glog.V(9).Infof("%s executing", label) + // Create requires a connection to a k8s cluster, so bail out immediately if it is unreachable. + if k.clusterUnreachable { + return nil, fmt.Errorf("configured Kubernetes cluster is unreachable") + } + // Parse inputs newResInputs, err := plugin.UnmarshalProperties(req.GetProperties(), plugin.MarshalOptions{ Label: fmt.Sprintf("%s.properties", label), @@ -1231,6 +1271,12 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p label := fmt.Sprintf("%s.Read(%s)", k.label(), urn) glog.V(9).Infof("%s executing", label) + // If the cluster is unreachable, consider the resource deleted and inform the user. + if k.clusterUnreachable { + _ = k.host.Log(ctx, diag.Warning, urn, fmt.Sprintf("configured Kubernetes cluster is unreachable")) + return deleteResponse, nil + } + // Obtain new properties, create a Kubernetes `unstructured.Unstructured` that we can pass to the // validation routines. oldState, err := plugin.UnmarshalProperties(req.GetProperties(), plugin.MarshalOptions{ @@ -1431,11 +1477,15 @@ func (k *kubeProvider) Update( // discovery client is completely dynamic.) // - [ ] Support server-side apply, when it comes out. // - urn := resource.URN(req.GetUrn()) label := fmt.Sprintf("%s.Update(%s)", k.label(), urn) glog.V(9).Infof("%s executing", label) + // Update requires a connection to a k8s cluster, so bail out immediately if it is unreachable. + if k.clusterUnreachable { + return nil, fmt.Errorf("configured Kubernetes cluster is unreachable") + } + // Obtain old properties, create a Kubernetes `unstructured.Unstructured`. oldState, err := plugin.UnmarshalProperties(req.GetOlds(), plugin.MarshalOptions{ Label: fmt.Sprintf("%s.olds", label), KeepUnknowns: true, SkipNulls: true, KeepSecrets: true, @@ -1546,10 +1596,17 @@ func (k *kubeProvider) Update( func (k *kubeProvider) Delete( ctx context.Context, req *pulumirpc.DeleteRequest, ) (*pbempty.Empty, error) { + urn := resource.URN(req.GetUrn()) label := fmt.Sprintf("%s.Delete(%s)", k.label(), urn) glog.V(9).Infof("%s executing", label) + // If the cluster is unreachable, consider the resource deleted and inform the user. + if k.clusterUnreachable { + _ = k.host.Log(ctx, diag.Warning, urn, fmt.Sprintf("configured Kubernetes cluster is unreachable")) + return &pbempty.Empty{}, nil + } + // TODO(hausdorff): Propagate other options, like grace period through flags. // Obtain new properties, create a Kubernetes `unstructured.Unstructured`.