diff --git a/tests/kfto/core/support.go b/tests/kfto/core/support.go index 46304c93..63636917 100644 --- a/tests/kfto/core/support.go +++ b/tests/kfto/core/support.go @@ -50,6 +50,14 @@ func PyTorchJob(t Test, namespace, name string) func(g Gomega) *kftov1.PyTorchJo } } +func PyTorchJobs(t Test, namespace string) func(g Gomega) []kftov1.PyTorchJob { + return func(g Gomega) []kftov1.PyTorchJob { + jobs, err := t.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).List(t.Ctx(), metav1.ListOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + return jobs.Items + } +} + func PyTorchJobConditionRunning(job *kftov1.PyTorchJob) corev1.ConditionStatus { return PyTorchJobCondition(job, kftov1.JobRunning) } diff --git a/tests/kfto/upgrade/kfto_kueue_sft_upgrade_training_test.go b/tests/kfto/upgrade/kfto_kueue_sft_upgrade_training_test.go index 98642d26..1bdd1177 100644 --- a/tests/kfto/upgrade/kfto_kueue_sft_upgrade_training_test.go +++ b/tests/kfto/upgrade/kfto_kueue_sft_upgrade_training_test.go @@ -26,6 +26,7 @@ import ( kueueacv1beta1 "sigs.k8s.io/kueue/client-go/applyconfiguration/kueue/v1beta1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -36,20 +37,14 @@ var ( namespaceName = "test-kfto-upgrade" resourceFlavorName = "rf-upgrade" clusterQueueName = "cq-upgrade" + localQueueName = "lq-upgrade" pyTorchJobName = "pytorch-upgrade" ) func TestSetupPytorchjob(t *testing.T) { test := With(t) - // Create a namespace - namespace := &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: namespaceName, - }, - } - _, err := test.Client().Core().CoreV1().Namespaces().Create(test.Ctx(), namespace, metav1.CreateOptions{}) - test.Expect(err).NotTo(HaveOccurred()) + createOrGetUpgradeTestNamespace(test, namespaceName) // Create a ConfigMap with training dataset and configuration configData := map[string][]byte{ @@ -59,50 +54,43 @@ func TestSetupPytorchjob(t *testing.T) { config := CreateConfigMap(test, namespaceName, configData) // Create Kueue resources - resourceFlavor := &kueuev1beta1.ResourceFlavor{ - ObjectMeta: metav1.ObjectMeta{ - Name: resourceFlavorName, - }, - } - resourceFlavor, err = test.Client().Kueue().KueueV1beta1().ResourceFlavors().Create(test.Ctx(), resourceFlavor, metav1.CreateOptions{}) + resourceFlavor := kueueacv1beta1.ResourceFlavor(resourceFlavorName) + appliedResourceFlavor, err := test.Client().Kueue().KueueV1beta1().ResourceFlavors().Apply(test.Ctx(), resourceFlavor, metav1.ApplyOptions{FieldManager: "setup-PyTorchJob", Force: true}) test.Expect(err).NotTo(HaveOccurred()) + test.T().Logf("Applied Kueue ResourceFlavor %s successfully", appliedResourceFlavor.Name) - clusterQueue := &kueuev1beta1.ClusterQueue{ - ObjectMeta: metav1.ObjectMeta{ - Name: clusterQueueName, - }, - Spec: kueuev1beta1.ClusterQueueSpec{ - NamespaceSelector: &metav1.LabelSelector{}, - ResourceGroups: []kueuev1beta1.ResourceGroup{ - { - CoveredResources: []corev1.ResourceName{corev1.ResourceName("cpu"), corev1.ResourceName("memory")}, - Flavors: []kueuev1beta1.FlavorQuotas{ - { - Name: kueuev1beta1.ResourceFlavorReference(resourceFlavor.Name), - Resources: []kueuev1beta1.ResourceQuota{ - { - Name: corev1.ResourceCPU, - NominalQuota: resource.MustParse("8"), - }, - { - Name: corev1.ResourceMemory, - NominalQuota: resource.MustParse("12Gi"), - }, - }, - }, - }, - }, - }, - StopPolicy: Ptr(kueuev1beta1.Hold), - }, - } - clusterQueue, err = test.Client().Kueue().KueueV1beta1().ClusterQueues().Create(test.Ctx(), clusterQueue, metav1.CreateOptions{}) + clusterQueue := kueueacv1beta1.ClusterQueue(clusterQueueName).WithSpec( + kueueacv1beta1.ClusterQueueSpec(). + WithNamespaceSelector(metav1.LabelSelector{}). + WithResourceGroups( + kueueacv1beta1.ResourceGroup().WithCoveredResources( + corev1.ResourceName("cpu"), corev1.ResourceName("memory"), + ).WithFlavors( + kueueacv1beta1.FlavorQuotas(). + WithName(kueuev1beta1.ResourceFlavorReference(resourceFlavorName)). + WithResources( + kueueacv1beta1.ResourceQuota().WithName(corev1.ResourceCPU).WithNominalQuota(resource.MustParse("8")), + kueueacv1beta1.ResourceQuota().WithName(corev1.ResourceMemory).WithNominalQuota(resource.MustParse("12Gi")), + ), + ), + ). + WithStopPolicy(kueuev1beta1.Hold), + ) + appliedClusterQueue, err := test.Client().Kueue().KueueV1beta1().ClusterQueues().Apply(test.Ctx(), clusterQueue, metav1.ApplyOptions{FieldManager: "setup-PyTorchJob", Force: true}) test.Expect(err).NotTo(HaveOccurred()) + test.T().Logf("Applied Kueue ClusterQueue %s successfully", appliedClusterQueue.Name) - localQueue := CreateKueueLocalQueue(test, namespaceName, clusterQueue.Name, AsDefaultQueue) + localQueue := kueueacv1beta1.LocalQueue(localQueueName, namespaceName). + WithAnnotations(map[string]string{"kueue.x-k8s.io/default-queue": "true"}). + WithSpec( + kueueacv1beta1.LocalQueueSpec().WithClusterQueue(kueuev1beta1.ClusterQueueReference(clusterQueueName)), + ) + appliedLocalQueue, err := test.Client().Kueue().KueueV1beta1().LocalQueues(namespaceName).Apply(test.Ctx(), localQueue, metav1.ApplyOptions{FieldManager: "setup-PyTorchJob", Force: true}) + test.Expect(err).NotTo(HaveOccurred()) + test.T().Logf("Applied Kueue LocalQueue %s/%s successfully", appliedLocalQueue.Namespace, appliedLocalQueue.Name) // Create training PyTorch job - tuningJob := createPyTorchJob(test, namespaceName, localQueue.Name, *config) + tuningJob := createPyTorchJob(test, namespaceName, appliedLocalQueue.Name, *config) // Make sure the PyTorch job is suspended, waiting for ClusterQueue to be enabled test.Eventually(kftocore.PyTorchJob(test, tuningJob.Namespace, pyTorchJobName), TestTimeoutShort). @@ -133,6 +121,17 @@ func TestRunPytorchjob(t *testing.T) { } func createPyTorchJob(test Test, namespace, localQueueName string, config corev1.ConfigMap) *kftov1.PyTorchJob { + // Does PyTorchJob already exist? + _, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Get(test.Ctx(), pyTorchJobName, metav1.GetOptions{}) + if err == nil { + // If yes then delete it and wait until there are no PyTorchJobs in the namespace + err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Delete(test.Ctx(), pyTorchJobName, metav1.DeleteOptions{}) + test.Expect(err).NotTo(HaveOccurred()) + test.Eventually(kftocore.PyTorchJobs(test, namespace), TestTimeoutShort).Should(BeEmpty()) + } else if !errors.IsNotFound(err) { + test.T().Fatalf("Error retrieving PyTorchJob with name `%s`: %v", pyTorchJobName, err) + } + tuningJob := &kftov1.PyTorchJob{ ObjectMeta: metav1.ObjectMeta{ Name: pyTorchJobName, @@ -244,9 +243,23 @@ func createPyTorchJob(test Test, namespace, localQueueName string, config corev1 }, } - tuningJob, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{}) + tuningJob, err = test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{}) test.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created PytorchJob %s/%s successfully", tuningJob.Namespace, tuningJob.Name) return tuningJob } + +func createOrGetUpgradeTestNamespace(test Test, name string, options ...Option[*corev1.Namespace]) (namespace *corev1.Namespace) { + // Verify that the namespace really exists and return it, create it if doesn't exist yet + namespace, err := test.Client().Core().CoreV1().Namespaces().Get(test.Ctx(), name, metav1.GetOptions{}) + if err == nil { + return + } else if errors.IsNotFound(err) { + test.T().Logf("%s namespace doesn't exists. Creating ...", name) + return CreateTestNamespaceWithName(test, name, options...) + } else { + test.T().Fatalf("Error retrieving namespace with name `%s`: %v", name, err) + } + return +} diff --git a/tests/kfto/upgrade/kfto_sft_upgrade_sleep_test.go b/tests/kfto/upgrade/kfto_sft_upgrade_sleep_test.go index 7f9d6576..39e49496 100644 --- a/tests/kfto/upgrade/kfto_sft_upgrade_sleep_test.go +++ b/tests/kfto/upgrade/kfto_sft_upgrade_sleep_test.go @@ -24,6 +24,7 @@ import ( . "github.com/project-codeflare/codeflare-common/support" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kftov1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" @@ -38,13 +39,7 @@ func TestSetupSleepPytorchjob(t *testing.T) { test := With(t) // Create a namespace - namespace := &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: sleepNamespaceName, - }, - } - _, err := test.Client().Core().CoreV1().Namespaces().Create(test.Ctx(), namespace, metav1.CreateOptions{}) - test.Expect(err).NotTo(HaveOccurred()) + createOrGetUpgradeTestNamespace(test, sleepNamespaceName) // Create training PyTorch job createSleepPyTorchJob(test, sleepNamespaceName) @@ -76,6 +71,17 @@ func TestVerifySleepPytorchjob(t *testing.T) { } func createSleepPyTorchJob(test Test, namespace string) *kftov1.PyTorchJob { + // Does PyTorchJob already exist? + _, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Get(test.Ctx(), sleepPyTorchJobName, metav1.GetOptions{}) + if err == nil { + // If yes then delete it and wait until there are no PyTorchJobs in the namespace + err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Delete(test.Ctx(), sleepPyTorchJobName, metav1.DeleteOptions{}) + test.Expect(err).NotTo(HaveOccurred()) + test.Eventually(kftocore.PyTorchJobs(test, namespace), TestTimeoutShort).Should(BeEmpty()) + } else if !errors.IsNotFound(err) { + test.T().Fatalf("Error retrieving PyTorchJob with name `%s`: %v", sleepPyTorchJobName, err) + } + tuningJob := &kftov1.PyTorchJob{ ObjectMeta: metav1.ObjectMeta{ Name: sleepPyTorchJobName, @@ -102,7 +108,7 @@ func createSleepPyTorchJob(test Test, namespace string) *kftov1.PyTorchJob { }, } - tuningJob, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{}) + tuningJob, err = test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{}) test.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created PytorchJob %s/%s successfully", tuningJob.Namespace, tuningJob.Name)