From 95a6f5cec082e22d6ad7e783bfc97131ff911a7c Mon Sep 17 00:00:00 2001
From: abhijeet-dhumal
Date: Mon, 20 Jan 2025 13:54:24 +0530
Subject: [PATCH] Update Gpu struct to Accelerator and add isGpu method to be used for KFTO tests

---
 tests/kfto/kfto_mnist_training_test.go | 42 +++++++++++---------------
 tests/kfto/kfto_training_test.go       |  4 +--
 tests/kfto/support.go                  | 15 ++++++---
 3 files changed, 30 insertions(+), 31 deletions(-)

diff --git a/tests/kfto/kfto_mnist_training_test.go b/tests/kfto/kfto_mnist_training_test.go
index 7cfce26d..dd6c367f 100644
--- a/tests/kfto/kfto_mnist_training_test.go
+++ b/tests/kfto/kfto_mnist_training_test.go
@@ -54,7 +54,7 @@ func TestPyTorchJobMnistMultiNodeMultiGpuWithROCm(t *testing.T) {
 	runKFTOPyTorchMnistJob(t, AMD, GetROCmTrainingImage(), "resources/requirements-rocm.txt", 1, 2)
 }
 
-func runKFTOPyTorchMnistJob(t *testing.T, gpu Gpu, image string, requirementsFile string, workerReplicas, numProcPerNode int) {
+func runKFTOPyTorchMnistJob(t *testing.T, accelerator Accelerator, image string, requirementsFile string, workerReplicas, numProcPerNode int) {
 	test := With(t)
 
 	// Create a namespace
@@ -63,7 +63,7 @@ func runKFTOPyTorchMnistJob(t *testing.T, gpu Gpu, image string, requirementsFil
 	mnist := ReadFile(test, "resources/mnist.py")
 	requirementsFileName := ReadFile(test, requirementsFile)
 
-	if workerReplicas*numProcPerNode > 0 && gpu.ResourceLabel != "cpu" {
+	if accelerator.isGpu() {
 		mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"gpu\""), 1)
 	} else {
 		mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"cpu\""), 1)
@@ -74,11 +74,8 @@
 		"requirements.txt": requirementsFileName,
 	})
 
-	outputPvc := CreatePersistentVolumeClaim(test, namespace.Name, "50Gi", corev1.ReadWriteOnce)
-	defer test.Client().Core().CoreV1().PersistentVolumeClaims(namespace.Name).Delete(test.Ctx(), outputPvc.Name, metav1.DeleteOptions{})
-
 	// Create training PyTorch job
-	tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpu, workerReplicas, numProcPerNode, outputPvc.Name, image)
+	tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, accelerator, workerReplicas, numProcPerNode, image)
 	defer test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace.Name).Delete(test.Ctx(), tuningJob.Name, *metav1.NewDeleteOptions(0))
 
 	// Make sure the PyTorch job is running
@@ -86,19 +83,19 @@ func runKFTOPyTorchMnistJob(t *testing.T, gpu Gpu, image string, requirementsFil
 		Should(WithTransform(PyTorchJobConditionRunning, Equal(corev1.ConditionTrue)))
 
 	// Verify GPU utilization
-	if IsOpenShift(test) && gpu == NVIDIA {
+	if IsOpenShift(test) && accelerator == NVIDIA {
 		trainingPods := GetPods(test, namespace.Name, metav1.ListOptions{LabelSelector: "training.kubeflow.org/job-name=" + tuningJob.GetName()})
 		test.Expect(trainingPods).To(HaveLen(workerReplicas + 1)) // +1 is a master node
 
 		for _, trainingPod := range trainingPods {
 			// Check that GPUs for training pods were utilized recently
-			test.Eventually(OpenShiftPrometheusGpuUtil(test, trainingPod, gpu), 15*time.Minute).
+			test.Eventually(OpenShiftPrometheusGpuUtil(test, trainingPod, accelerator), 15*time.Minute).
 				Should(
 					And(
 						HaveLen(numProcPerNode),
 						ContainElement(
-							// Check that at least some GPU was utilized on more than 30%
-							HaveField("Value", BeNumerically(">", 30)),
+							// Check that at least some GPU was utilized on more than 20%
+							HaveField("Value", BeNumerically(">", 20)),
 						),
 					),
 				)
@@ -112,12 +109,9 @@ func runKFTOPyTorchMnistJob(t *testing.T, gpu Gpu, image string, requirementsFil
 
 }
 
-func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpu Gpu, workerReplicas int, numProcPerNode int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
-	var useGPU = false
+func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, accelerator Accelerator, workerReplicas int, numProcPerNode int, baseImage string) *kftov1.PyTorchJob {
 	var backend string
-
-	if gpu.ResourceLabel != "cpu" {
-		useGPU = true
+	if accelerator.isGpu() {
 		backend = "nccl"
 	} else {
 		backend = "gloo"
@@ -172,7 +166,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
 						MNIST('/tmp/datasets/mnist', train=False, download=True, transform=Compose([ToTensor()]))" && \
 						echo -e "\n\n Dataset downloaded to /tmp/datasets/mnist" && ls -R /tmp/datasets/mnist && \
 						echo -e "\n\n Starting training..." && \
-						torchrun --nproc_per_node=%d /mnt/files/mnist.py --dataset_path "/tmp/datasets/mnist" --epochs 7 --save_every 2 --batch_size 64 --lr 0.0005 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
+						torchrun --nproc_per_node=%d /mnt/files/mnist.py --dataset_path "/tmp/datasets/mnist" --epochs 7 --save_every 2 --batch_size 128 --lr 0.001 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
 					},
 					VolumeMounts: []corev1.VolumeMount{
 						{
@@ -257,7 +251,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
 						MNIST('/tmp/datasets/mnist', train=False, download=True, transform=Compose([ToTensor()]))" && \
 						echo -e "\n\n Dataset downloaded to /tmp/datasets/mnist" && ls -R /tmp/datasets/mnist && \
 						echo -e "\n\n Starting training..." && \
-						torchrun --nproc_per_node=%d /mnt/files/mnist.py --dataset_path "/tmp/datasets/mnist" --epochs 7 --save_every 2 --batch_size 64 --lr 0.0005 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
+						torchrun --nproc_per_node=%d /mnt/files/mnist.py --dataset_path "/tmp/datasets/mnist" --epochs 7 --save_every 2 --batch_size 128 --lr 0.001 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
 					},
 					VolumeMounts: []corev1.VolumeMount{
 						{
@@ -307,12 +301,12 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
 		},
 	}
 
-	if useGPU {
+	if accelerator.isGpu() {
 		// Update resource lists for GPU (NVIDIA/ROCm) usecase
-		tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(gpu.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
-		tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpu.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
-		tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(gpu.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
-		tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpu.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
+		tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
+		tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
+		tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
+		tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
 
 		tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Env = []corev1.EnvVar{
 			{
@@ -338,13 +332,13 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
 		// Update tolerations
 		tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Tolerations = []corev1.Toleration{
 			{
-				Key:      gpu.ResourceLabel,
+				Key:      accelerator.ResourceLabel,
 				Operator: corev1.TolerationOpExists,
 			},
 		}
 		tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Tolerations = []corev1.Toleration{
 			{
-				Key:      gpu.ResourceLabel,
+				Key:      accelerator.ResourceLabel,
 				Operator: corev1.TolerationOpExists,
 			},
 		}
diff --git a/tests/kfto/kfto_training_test.go b/tests/kfto/kfto_training_test.go
index f4d78e6f..5ea0e565 100644
--- a/tests/kfto/kfto_training_test.go
+++ b/tests/kfto/kfto_training_test.go
@@ -62,7 +62,7 @@ func TestPyTorchJobMultiNodeMultiGpuWithROCm(t *testing.T) {
 	runKFTOPyTorchJob(t, GetROCmTrainingImage(), AMD, 2, 1)
 }
 
-func runKFTOPyTorchJob(t *testing.T, image string, gpu Gpu, numGpus, numberOfWorkerNodes int) {
+func runKFTOPyTorchJob(t *testing.T, image string, gpu Accelerator, numGpus, numberOfWorkerNodes int) {
 	test := With(t)
 
 	// Create a namespace
@@ -112,7 +112,7 @@ func runKFTOPyTorchJob(t *testing.T, image string, gpu Gpu, numGpus, numberOfWor
 	test.T().Logf("PytorchJob %s/%s ran successfully", tuningJob.Namespace, tuningJob.Name)
 }
 
-func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, gpu Gpu, numGpus, numberOfWorkerNodes int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
+func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, gpu Accelerator, numGpus, numberOfWorkerNodes int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
 	tuningJob := &kftov1.PyTorchJob{
 		TypeMeta: metav1.TypeMeta{
 			APIVersion: corev1.SchemeGroupVersion.String(),
diff --git a/tests/kfto/support.go b/tests/kfto/support.go
index 52ad4af7..df078b69 100644
--- a/tests/kfto/support.go
+++ b/tests/kfto/support.go
@@ -28,17 +28,22 @@ import (
 	corev1 "k8s.io/api/core/v1"
 )
 
-type Gpu struct {
+type Accelerator struct {
 	ResourceLabel                 string
 	PrometheusGpuUtilizationLabel string
 }
 
 var (
-	NVIDIA = Gpu{ResourceLabel: "nvidia.com/gpu", PrometheusGpuUtilizationLabel: "DCGM_FI_DEV_GPU_UTIL"}
-	AMD    = Gpu{ResourceLabel: "amd.com/gpu"}
-	CPU    = Gpu{ResourceLabel: "cpu"}
+	NVIDIA = Accelerator{ResourceLabel: "nvidia.com/gpu", PrometheusGpuUtilizationLabel: "DCGM_FI_DEV_GPU_UTIL"}
+	AMD    = Accelerator{ResourceLabel: "amd.com/gpu"}
+	CPU    = Accelerator{}
 )
 
+// Method to check if the accelerator is a GPU
+func (a Accelerator) isGpu() bool {
+	return a != CPU
+}
+
 //go:embed resources/*
 var files embed.FS
 
@@ -49,7 +54,7 @@ func ReadFile(t Test, fileName string) []byte {
 	return file
 }
 
-func OpenShiftPrometheusGpuUtil(test Test, pod corev1.Pod, gpu Gpu) func(g Gomega) prometheusmodel.Vector {
+func OpenShiftPrometheusGpuUtil(test Test, pod corev1.Pod, gpu Accelerator) func(g Gomega) prometheusmodel.Vector {
 	return func(g Gomega) prometheusmodel.Vector {
 		prometheusApiClient := GetOpenShiftPrometheusApiClient(test)
 		result, warnings, err := prometheusApiClient.Query(test.Ctx(), gpu.PrometheusGpuUtilizationLabel, time.Now(), prometheusapiv1.WithTimeout(5*time.Second))