diff --git a/CHANGELOG.md b/CHANGELOG.md index 29c04e5754..b18ca7759a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ### Bug fixes +- Gracefully handle unreachable k8s cluster. (https://github.com/pulumi/pulumi-kubernetes/pull/946). - Fix deprecation notice for CSINode. (https://github.com/pulumi/pulumi-kubernetes/pull/944). ## 1.4.3 (January 8, 2020) diff --git a/pkg/await/await.go b/pkg/await/await.go index d7080e3b5e..f728fa77b3 100644 --- a/pkg/await/await.go +++ b/pkg/await/await.go @@ -432,7 +432,7 @@ func Deletion(c DeleteConfig) error { return nilIfGVKDeleted(err) } - err = deleteResource(c.Name, client, cluster.GetServerVersion(c.ClientSet.DiscoveryClientCached)) + err = deleteResource(c.Name, client, cluster.TryGetServerVersion(c.ClientSet.DiscoveryClientCached)) if err != nil { return nilIfGVKDeleted(err) } diff --git a/pkg/await/service.go b/pkg/await/service.go index d21db1993f..603510398e 100644 --- a/pkg/await/service.go +++ b/pkg/await/service.go @@ -141,7 +141,7 @@ func (sia *serviceInitAwaiter) Await() error { } defer endpointWatcher.Stop() - version := cluster.GetServerVersion(sia.config.clientSet.DiscoveryClientCached) + version := cluster.TryGetServerVersion(sia.config.clientSet.DiscoveryClientCached) timeout := metadata.TimeoutDuration(sia.config.timeout, sia.config.currentInputs, DefaultServiceTimeoutMins*60) return sia.await(serviceWatcher, endpointWatcher, time.After(timeout), make(chan struct{}), version) @@ -176,7 +176,7 @@ func (sia *serviceInitAwaiter) Read() error { endpointList = &unstructured.UnstructuredList{Items: []unstructured.Unstructured{}} } - version := cluster.GetServerVersion(sia.config.clientSet.DiscoveryClientCached) + version := cluster.TryGetServerVersion(sia.config.clientSet.DiscoveryClientCached) return sia.read(service, endpointList, version) } diff --git a/pkg/cluster/version.go b/pkg/cluster/version.go index aa59467e00..9e106ec9ba 100644 --- a/pkg/cluster/version.go +++ b/pkg/cluster/version.go @@ -54,9 +54,9 @@ func (v ServerVersion) Compare(version ServerVersion) int { return res } -// GetServerVersion attempts to retrieve the server version from k8s. +// TryGetServerVersion attempts to retrieve the server version from k8s. // Returns the configured default version in case this fails. -func GetServerVersion(cdi discovery.CachedDiscoveryInterface) ServerVersion { +func TryGetServerVersion(cdi discovery.CachedDiscoveryInterface) ServerVersion { defaultSV := ServerVersion{ Major: 1, Minor: 14, diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index d13b797e61..c163b08494 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -106,7 +106,8 @@ type kubeProvider struct { suppressDeprecationWarnings bool enableSecrets bool - config *rest.Config // Cluster config, e.g., through $KUBECONFIG file. + clusterUnreachable bool // Kubernetes cluster is unreachable + config *rest.Config // Cluster config, e.g., through $KUBECONFIG file. clientSet *clients.DynamicClientSet logClient *clients.LogClient @@ -320,14 +321,19 @@ func (k *kubeProvider) Configure(_ context.Context, req *pulumirpc.ConfigureRequ if configJSON, ok := vars["kubernetes:config:kubeconfig"]; ok { config, err := clientcmd.Load([]byte(configJSON)) if err != nil { - return nil, pkgerrors.Wrap(err, "failed to parse kubeconfig data in "+ + // Rather than erroring out here, mark the cluster as unreachable and conditionally bail out on + // operations that require a valid cluster. This will allow us to perform invoke operations + // using the default provider. + k.clusterUnreachable = true + glog.V(3).Infof(fmt.Sprintf("failed to parse kubeconfig data in "+ "`kubernetes:config:kubeconfig`; this must be a YAML literal string and not "+ - "a filename or path") - } - kubeconfig = clientcmd.NewDefaultClientConfig(*config, overrides) - configurationNamespace, _, err := kubeconfig.Namespace() - if err == nil { - k.defaultNamespace = configurationNamespace + "a filename or path - %v", err)) + } else { + kubeconfig = clientcmd.NewDefaultClientConfig(*config, overrides) + configurationNamespace, _, err := kubeconfig.Namespace() + if err == nil { + k.defaultNamespace = configurationNamespace + } } } else { // Use client-go to resolve the final configuration values for the client. Typically these @@ -343,28 +349,31 @@ func (k *kubeProvider) Configure(_ context.Context, req *pulumirpc.ConfigureRequ k.defaultNamespace = defaultNamespace } - config, err := kubeconfig.ClientConfig() - if err != nil { - return nil, fmt.Errorf("unable to load Kubernetes client configuration from kubeconfig file: %v", err) - } - k.config = config + if !k.clusterUnreachable { + config, err := kubeconfig.ClientConfig() + if err != nil { + return nil, fmt.Errorf("unable to load Kubernetes client configuration from kubeconfig file: %v", err) + } + k.config = config - cs, err := clients.NewDynamicClientSet(k.config) - if err != nil { - return nil, err - } - k.clientSet = cs + cs, err := clients.NewDynamicClientSet(k.config) + if err != nil { + return nil, err + } + k.clientSet = cs - lc, err := clients.NewLogClient(k.config) - if err != nil { - return nil, err - } - k.logClient = lc + lc, err := clients.NewLogClient(k.config) + if err != nil { + return nil, err + } + k.logClient = lc - k.k8sVersion = cluster.GetServerVersion(cs.DiscoveryClientCached) + k.k8sVersion = cluster.TryGetServerVersion(cs.DiscoveryClientCached) - if _, err = k.getResources(); err != nil { - return nil, fmt.Errorf("unable to load schema information from the API server: %v", err) + if _, err = k.getResources(); err != nil { + k.clusterUnreachable = true + glog.V(3).Infof("unable to load schema information from the API server: %v", err) + } } return &pulumirpc.ConfigureResponse{ @@ -376,9 +385,13 @@ func (k *kubeProvider) Configure(_ context.Context, req *pulumirpc.ConfigureRequ func (k *kubeProvider) Invoke(ctx context.Context, req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) { + // Important: Some invoke logic is intended to run during preview, and the Kubernetes provider + // inputs may not have resolved yet. Any invoke logic that depends on an active cluster must check + // k.clusterUnreachable and handle that condition appropriately. + // Always fail. tok := req.GetTok() - return nil, fmt.Errorf("Unknown Invoke type '%s'", tok) + return nil, fmt.Errorf("unknown Invoke type '%s'", tok) } // StreamInvoke dynamically executes a built-in function in the provider. The result is streamed @@ -386,6 +399,10 @@ func (k *kubeProvider) Invoke(ctx context.Context, func (k *kubeProvider) StreamInvoke( req *pulumirpc.InvokeRequest, server pulumirpc.ResourceProvider_StreamInvokeServer) error { + // Important: Some invoke logic is intended to run during preview, and the Kubernetes provider + // inputs may not have resolved yet. Any invoke logic that depends on an active cluster must check + // k.clusterUnreachable and handle that condition appropriately. + // Unmarshal arguments. tok := req.GetTok() label := fmt.Sprintf("%s.StreamInvoke(%s)", k.label(), tok) @@ -414,13 +431,17 @@ func (k *kubeProvider) StreamInvoke( // expected to never terminate, and users of the various SDKs need a way to tell the // provider to stop streaming and reclaim the resources associated with the stream. // - // Still, we implement this cancellation also for `list`, primarily for coompleteness. We'd + // Still, we implement this cancellation also for `list`, primarily for completeness. We'd // like to avoid an unpleasant and non-actionable error that would appear on a `Send` on a // client that is no longer accepting requests. This also helps to guard against the // possibility that some dark corner of gRPC signals cancellation by accident, e.g., during // shutdown. // + if k.clusterUnreachable { + return fmt.Errorf("configured Kubernetes cluster is unreachable") + } + namespace := "" if args["namespace"].HasValue() { namespace = args["namespace"].StringValue() @@ -511,6 +532,10 @@ func (k *kubeProvider) StreamInvoke( // Set up resource watcher. // + if k.clusterUnreachable { + return fmt.Errorf("configured Kubernetes cluster is unreachable") + } + namespace := "" if args["namespace"].HasValue() { namespace = args["namespace"].StringValue() @@ -591,6 +616,10 @@ func (k *kubeProvider) StreamInvoke( // Set up log stream for Pod. // + if k.clusterUnreachable { + return fmt.Errorf("configured Kubernetes cluster is unreachable") + } + namespace := "default" if args["namespace"].HasValue() { namespace = args["namespace"].StringValue() @@ -683,7 +712,7 @@ func (k *kubeProvider) StreamInvoke( } } default: - return fmt.Errorf("Unknown Invoke type '%s'", tok) + return fmt.Errorf("unknown Invoke type '%s'", tok) } } @@ -813,11 +842,14 @@ func (k *kubeProvider) Check(ctx context.Context, req *pulumirpc.CheckRequest) ( return nil, err } - if removed, version := kinds.RemovedApiVersion(gvk, k.k8sVersion); removed { - return nil, &kinds.RemovedApiError{GVK: gvk, Version: version} - } - if !k.suppressDeprecationWarnings && kinds.DeprecatedApiVersion(gvk) { - _ = k.host.Log(ctx, diag.Warning, urn, gen.ApiVersionComment(gvk)) + // Skip the API version check if the cluster is unreachable. + if !k.clusterUnreachable { + if removed, version := kinds.RemovedApiVersion(gvk, k.k8sVersion); removed { + return nil, &kinds.RemovedApiError{GVK: gvk, Version: version} + } + if !k.suppressDeprecationWarnings && kinds.DeprecatedApiVersion(gvk) { + _ = k.host.Log(ctx, diag.Warning, urn, gen.ApiVersionComment(gvk)) + } } // If a default namespace is set on the provider for this resource, check if the resource has Namespaced @@ -842,7 +874,7 @@ func (k *kubeProvider) Check(ctx context.Context, req *pulumirpc.CheckRequest) ( // HACK: Do not validate against OpenAPI spec if there is a computed value. The OpenAPI spec // does not know how to deal with the placeholder values for computed values. - if !hasComputedValue(newInputs) { + if !hasComputedValue(newInputs) && !k.clusterUnreachable { resources, err := k.getResources() if err != nil { return nil, pkgerrors.Wrapf(err, "Failed to fetch OpenAPI schema from the API server") @@ -972,11 +1004,14 @@ func (k *kubeProvider) Diff( oldInputs.SetGroupVersionKind(gvk) } - supportsDryRun, err := openapi.SupportsDryRun(k.clientSet.DiscoveryClientCached, gvk) - if err != nil { - return nil, pkgerrors.Wrapf(err, - "Failed to check for changes in resource %s because of an error communicating with the API server", - fqObjName(newInputs)) + supportsDryRun := false + if !k.clusterUnreachable { + supportsDryRun, err = openapi.SupportsDryRun(k.clientSet.DiscoveryClientCached, gvk) + if err != nil { + return nil, pkgerrors.Wrapf(err, + "Failed to check for changes in resource %s because of an error communicating with the API server", + fqObjName(newInputs)) + } } var patch []byte @@ -1108,6 +1143,11 @@ func (k *kubeProvider) Create( label := fmt.Sprintf("%s.Create(%s)", k.label(), urn) glog.V(9).Infof("%s executing", label) + // Create requires a connection to a k8s cluster, so bail out immediately if it is unreachable. + if k.clusterUnreachable { + return nil, fmt.Errorf("configured Kubernetes cluster is unreachable") + } + // Parse inputs newResInputs, err := plugin.UnmarshalProperties(req.GetProperties(), plugin.MarshalOptions{ Label: fmt.Sprintf("%s.properties", label), @@ -1231,6 +1271,12 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p label := fmt.Sprintf("%s.Read(%s)", k.label(), urn) glog.V(9).Infof("%s executing", label) + // If the cluster is unreachable, consider the resource deleted and inform the user. + if k.clusterUnreachable { + _ = k.host.Log(ctx, diag.Warning, urn, fmt.Sprintf("configured Kubernetes cluster is unreachable")) + return deleteResponse, nil + } + // Obtain new properties, create a Kubernetes `unstructured.Unstructured` that we can pass to the // validation routines. oldState, err := plugin.UnmarshalProperties(req.GetProperties(), plugin.MarshalOptions{ @@ -1431,11 +1477,15 @@ func (k *kubeProvider) Update( // discovery client is completely dynamic.) // - [ ] Support server-side apply, when it comes out. // - urn := resource.URN(req.GetUrn()) label := fmt.Sprintf("%s.Update(%s)", k.label(), urn) glog.V(9).Infof("%s executing", label) + // Update requires a connection to a k8s cluster, so bail out immediately if it is unreachable. + if k.clusterUnreachable { + return nil, fmt.Errorf("configured Kubernetes cluster is unreachable") + } + // Obtain old properties, create a Kubernetes `unstructured.Unstructured`. oldState, err := plugin.UnmarshalProperties(req.GetOlds(), plugin.MarshalOptions{ Label: fmt.Sprintf("%s.olds", label), KeepUnknowns: true, SkipNulls: true, KeepSecrets: true, @@ -1546,10 +1596,17 @@ func (k *kubeProvider) Update( func (k *kubeProvider) Delete( ctx context.Context, req *pulumirpc.DeleteRequest, ) (*pbempty.Empty, error) { + urn := resource.URN(req.GetUrn()) label := fmt.Sprintf("%s.Delete(%s)", k.label(), urn) glog.V(9).Infof("%s executing", label) + // If the cluster is unreachable, consider the resource deleted and inform the user. + if k.clusterUnreachable { + _ = k.host.Log(ctx, diag.Warning, urn, fmt.Sprintf("configured Kubernetes cluster is unreachable")) + return &pbempty.Empty{}, nil + } + // TODO(hausdorff): Propagate other options, like grace period through flags. // Obtain new properties, create a Kubernetes `unstructured.Unstructured`.