@@ -22,6 +22,7 @@ import (
22
22
"crypto/rand"
23
23
"crypto/x509"
24
24
"encoding/pem"
25
+ "errors"
25
26
"fmt"
26
27
"reflect"
27
28
"sort"
@@ -34,7 +35,7 @@ import (
34
35
batchv1 "k8s.io/api/batch/v1"
35
36
corev1 "k8s.io/api/core/v1"
36
37
"k8s.io/apimachinery/pkg/api/equality"
37
- "k8s.io/apimachinery/pkg/api/errors"
38
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
38
39
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39
40
"k8s.io/apimachinery/pkg/labels"
40
41
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -359,7 +360,7 @@ func NewMPIJobControllerWithClock(
359
360
// Pipe to default handler first, which just logs the error
360
361
cache .DefaultWatchErrorHandler (r , err )
361
362
362
- if errors .IsUnauthorized (err ) || errors .IsForbidden (err ) {
363
+ if apierrors .IsUnauthorized (err ) || apierrors .IsForbidden (err ) {
363
364
klog .Fatalf ("Unable to sync cache for informer %s: %s. Requesting controller to exit." , name , err )
364
365
}
365
366
})
@@ -564,7 +565,7 @@ func (c *MPIJobController) syncHandler(key string) error {
564
565
sharedJob , err := c .mpiJobLister .MPIJobs (namespace ).Get (name )
565
566
if err != nil {
566
567
// The MPIJob may no longer exist, in which case we stop processing.
567
- if errors .IsNotFound (err ) {
568
+ if apierrors .IsNotFound (err ) {
568
569
klog .V (4 ).Infof ("MPIJob has been deleted: %v" , key )
569
570
return nil
570
571
}
@@ -714,7 +715,7 @@ func cleanUpWorkerPods(mpiJob *kubeflow.MPIJob, c *MPIJobController) error {
714
715
// getLauncherJob gets the launcher Job controlled by this MPIJob.
715
716
func (c * MPIJobController ) getLauncherJob (mpiJob * kubeflow.MPIJob ) (* batchv1.Job , error ) {
716
717
launcher , err := c .jobLister .Jobs (mpiJob .Namespace ).Get (mpiJob .Name + launcherSuffix )
717
- if errors .IsNotFound (err ) {
718
+ if apierrors .IsNotFound (err ) {
718
719
return nil , nil
719
720
}
720
721
if err != nil {
@@ -729,7 +730,7 @@ func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job
729
730
if ! metav1 .IsControlledBy (launcher , mpiJob ) {
730
731
msg := fmt .Sprintf (MessageResourceExists , launcher .Name , launcher .Kind )
731
732
c .recorder .Event (mpiJob , corev1 .EventTypeWarning , ErrResourceExists , msg )
732
- return launcher , fmt . Errorf (msg )
733
+ return launcher , errors . New (msg )
733
734
}
734
735
735
736
return launcher , nil
@@ -740,7 +741,7 @@ func (c *MPIJobController) getOrCreatePodGroups(mpiJob *kubeflow.MPIJob) (metav1
740
741
newPodGroup := c .PodGroupCtrl .newPodGroup (mpiJob )
741
742
podGroup , err := c .PodGroupCtrl .getPodGroup (newPodGroup .GetNamespace (), newPodGroup .GetName ())
742
743
// If the PodGroup doesn't exist, we'll create it.
743
- if errors .IsNotFound (err ) {
744
+ if apierrors .IsNotFound (err ) {
744
745
return c .PodGroupCtrl .createPodGroup (context .TODO (), newPodGroup )
745
746
}
746
747
// If an error occurs during Get/Create, we'll requeue the item so we
@@ -754,7 +755,7 @@ func (c *MPIJobController) getOrCreatePodGroups(mpiJob *kubeflow.MPIJob) (metav1
754
755
if ! metav1 .IsControlledBy (podGroup , mpiJob ) {
755
756
msg := fmt .Sprintf (MessageResourceExists , podGroup .GetName (), "PodGroup" )
756
757
c .recorder .Event (mpiJob , corev1 .EventTypeWarning , ErrResourceExists , msg )
757
- return nil , fmt . Errorf (msg )
758
+ return nil , errors . New (msg )
758
759
}
759
760
760
761
if ! c .PodGroupCtrl .pgSpecsAreEqual (podGroup , newPodGroup ) {
@@ -767,7 +768,7 @@ func (c *MPIJobController) getOrCreatePodGroups(mpiJob *kubeflow.MPIJob) (metav1
767
768
func (c * MPIJobController ) deletePodGroups (mpiJob * kubeflow.MPIJob ) error {
768
769
podGroup , err := c .PodGroupCtrl .getPodGroup (mpiJob .Namespace , mpiJob .Name )
769
770
if err != nil {
770
- if errors .IsNotFound (err ) {
771
+ if apierrors .IsNotFound (err ) {
771
772
return nil
772
773
}
773
774
return err
@@ -778,7 +779,7 @@ func (c *MPIJobController) deletePodGroups(mpiJob *kubeflow.MPIJob) error {
778
779
if ! metav1 .IsControlledBy (podGroup , mpiJob ) {
779
780
msg := fmt .Sprintf (MessageResourceExists , podGroup .GetName (), "PodGroup" )
780
781
c .recorder .Event (mpiJob , corev1 .EventTypeWarning , ErrResourceExists , msg )
781
- return fmt . Errorf (msg )
782
+ return errors . New (msg )
782
783
}
783
784
784
785
// If the PodGroup exist, we'll delete it.
@@ -839,7 +840,7 @@ func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob) (*corev
839
840
840
841
cm , err := c .configMapLister .ConfigMaps (mpiJob .Namespace ).Get (mpiJob .Name + configSuffix )
841
842
// If the ConfigMap doesn't exist, we'll create it.
842
- if errors .IsNotFound (err ) {
843
+ if apierrors .IsNotFound (err ) {
843
844
return c .kubeClient .CoreV1 ().ConfigMaps (mpiJob .Namespace ).Create (context .TODO (), newCM , metav1.CreateOptions {})
844
845
}
845
846
if err != nil {
@@ -851,7 +852,7 @@ func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob) (*corev
851
852
if ! metav1 .IsControlledBy (cm , mpiJob ) {
852
853
msg := fmt .Sprintf (MessageResourceExists , cm .Name , cm .Kind )
853
854
c .recorder .Event (mpiJob , corev1 .EventTypeWarning , ErrResourceExists , msg )
854
- return nil , fmt . Errorf (msg )
855
+ return nil , errors . New (msg )
855
856
}
856
857
857
858
// If the ConfigMap is changed, update it
@@ -869,7 +870,7 @@ func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob) (*corev
869
870
870
871
func (c * MPIJobController ) getOrCreateService (job * kubeflow.MPIJob , newSvc * corev1.Service ) (* corev1.Service , error ) {
871
872
svc , err := c .serviceLister .Services (job .Namespace ).Get (newSvc .Name )
872
- if errors .IsNotFound (err ) {
873
+ if apierrors .IsNotFound (err ) {
873
874
return c .kubeClient .CoreV1 ().Services (job .Namespace ).Create (context .TODO (), newSvc , metav1.CreateOptions {})
874
875
}
875
876
if err != nil {
@@ -878,7 +879,7 @@ func (c *MPIJobController) getOrCreateService(job *kubeflow.MPIJob, newSvc *core
878
879
if ! metav1 .IsControlledBy (svc , job ) {
879
880
msg := fmt .Sprintf (MessageResourceExists , svc .Name , svc .Kind )
880
881
c .recorder .Event (job , corev1 .EventTypeWarning , ErrResourceExists , msg )
881
- return nil , fmt . Errorf (msg )
882
+ return nil , errors . New (msg )
882
883
}
883
884
884
885
// If the Service selector is changed, update it.
@@ -895,7 +896,7 @@ func (c *MPIJobController) getOrCreateService(job *kubeflow.MPIJob, newSvc *core
895
896
// or create one if it doesn't exist.
896
897
func (c * MPIJobController ) getOrCreateSSHAuthSecret (job * kubeflow.MPIJob ) (* corev1.Secret , error ) {
897
898
secret , err := c .secretLister .Secrets (job .Namespace ).Get (job .Name + sshAuthSecretSuffix )
898
- if errors .IsNotFound (err ) {
899
+ if apierrors .IsNotFound (err ) {
899
900
secret , err := newSSHAuthSecret (job )
900
901
if err != nil {
901
902
return nil , err
@@ -908,7 +909,7 @@ func (c *MPIJobController) getOrCreateSSHAuthSecret(job *kubeflow.MPIJob) (*core
908
909
if ! metav1 .IsControlledBy (secret , job ) {
909
910
msg := fmt .Sprintf (MessageResourceExists , secret .Name , secret .Kind )
910
911
c .recorder .Event (job , corev1 .EventTypeWarning , ErrResourceExists , msg )
911
- return nil , fmt . Errorf (msg )
912
+ return nil , errors . New (msg )
912
913
}
913
914
newSecret , err := newSSHAuthSecret (job )
914
915
if err != nil {
@@ -973,7 +974,7 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1
973
974
pod , err := c .podLister .Pods (mpiJob .Namespace ).Get (workerName (mpiJob , i ))
974
975
975
976
// If the worker Pod doesn't exist, we'll create it.
976
- if errors .IsNotFound (err ) {
977
+ if apierrors .IsNotFound (err ) {
977
978
worker := c .newWorker (mpiJob , i )
978
979
pod , err = c .kubeClient .CoreV1 ().Pods (mpiJob .Namespace ).Create (context .TODO (), worker , metav1.CreateOptions {})
979
980
}
@@ -989,7 +990,7 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1
989
990
if pod != nil && ! metav1 .IsControlledBy (pod , mpiJob ) {
990
991
msg := fmt .Sprintf (MessageResourceExists , pod .Name , pod .Kind )
991
992
c .recorder .Event (mpiJob , corev1 .EventTypeWarning , ErrResourceExists , msg )
992
- return nil , fmt . Errorf (msg )
993
+ return nil , errors . New (msg )
993
994
}
994
995
workerPods = append (workerPods , pod )
995
996
}
@@ -1020,15 +1021,15 @@ func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {
1020
1021
pod , err := c .podLister .Pods (mpiJob .Namespace ).Get (name )
1021
1022
1022
1023
// If the worker Pod doesn't exist, no need to remove it.
1023
- if errors .IsNotFound (err ) {
1024
+ if apierrors .IsNotFound (err ) {
1024
1025
continue
1025
1026
}
1026
1027
// If the worker is not controlled by this MPIJob resource, we should log
1027
1028
// a warning to the event recorder and return.
1028
1029
if pod != nil && ! metav1 .IsControlledBy (pod , mpiJob ) {
1029
1030
msg := fmt .Sprintf (MessageResourceExists , pod .Name , pod .Kind )
1030
1031
c .recorder .Event (mpiJob , corev1 .EventTypeWarning , ErrResourceExists , msg )
1031
- return fmt . Errorf (msg )
1032
+ return errors . New (msg )
1032
1033
}
1033
1034
// If the worker pod is not running and cleanupPolicy is
1034
1035
// set to CleanPodPolicyRunning, keep the pod.
@@ -1039,7 +1040,7 @@ func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {
1039
1040
continue
1040
1041
}
1041
1042
err = c .kubeClient .CoreV1 ().Pods (mpiJob .Namespace ).Delete (context .TODO (), name , metav1.DeleteOptions {})
1042
- if err != nil && ! errors .IsNotFound (err ) {
1043
+ if err != nil && ! apierrors .IsNotFound (err ) {
1043
1044
klog .Errorf ("Failed to delete pod[%s/%s]: %v" , mpiJob .Namespace , name , err )
1044
1045
return err
1045
1046
}
0 commit comments