Skip to content

Commit

Permalink
Update Gpu struct to Accelerator and add isGpu method to be used for …
Browse files Browse the repository at this point in the history
…KFTO tests
  • Loading branch information
abhijeet-dhumal committed Jan 20, 2025
1 parent c3f99cf commit 95a6f5c
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 31 deletions.
42 changes: 18 additions & 24 deletions tests/kfto/kfto_mnist_training_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestPyTorchJobMnistMultiNodeMultiGpuWithROCm(t *testing.T) {
runKFTOPyTorchMnistJob(t, AMD, GetROCmTrainingImage(), "resources/requirements-rocm.txt", 1, 2)
}

func runKFTOPyTorchMnistJob(t *testing.T, gpu Gpu, image string, requirementsFile string, workerReplicas, numProcPerNode int) {
func runKFTOPyTorchMnistJob(t *testing.T, accelerator Accelerator, image string, requirementsFile string, workerReplicas, numProcPerNode int) {
test := With(t)

// Create a namespace
Expand All @@ -63,7 +63,7 @@ func runKFTOPyTorchMnistJob(t *testing.T, gpu Gpu, image string, requirementsFil
mnist := ReadFile(test, "resources/mnist.py")
requirementsFileName := ReadFile(test, requirementsFile)

if workerReplicas*numProcPerNode > 0 && gpu.ResourceLabel != "cpu" {
if accelerator.isGpu() {
mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"gpu\""), 1)
} else {
mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"cpu\""), 1)
Expand All @@ -74,31 +74,28 @@ func runKFTOPyTorchMnistJob(t *testing.T, gpu Gpu, image string, requirementsFil
"requirements.txt": requirementsFileName,
})

outputPvc := CreatePersistentVolumeClaim(test, namespace.Name, "50Gi", corev1.ReadWriteOnce)
defer test.Client().Core().CoreV1().PersistentVolumeClaims(namespace.Name).Delete(test.Ctx(), outputPvc.Name, metav1.DeleteOptions{})

// Create training PyTorch job
tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpu, workerReplicas, numProcPerNode, outputPvc.Name, image)
tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, accelerator, workerReplicas, numProcPerNode, image)
defer test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace.Name).Delete(test.Ctx(), tuningJob.Name, *metav1.NewDeleteOptions(0))

// Make sure the PyTorch job is running
test.Eventually(PyTorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutDouble).
Should(WithTransform(PyTorchJobConditionRunning, Equal(corev1.ConditionTrue)))

// Verify GPU utilization
if IsOpenShift(test) && gpu == NVIDIA {
if IsOpenShift(test) && accelerator == NVIDIA {
trainingPods := GetPods(test, namespace.Name, metav1.ListOptions{LabelSelector: "training.kubeflow.org/job-name=" + tuningJob.GetName()})
test.Expect(trainingPods).To(HaveLen(workerReplicas + 1)) // +1 is a master node

for _, trainingPod := range trainingPods {
// Check that GPUs for training pods were utilized recently
test.Eventually(OpenShiftPrometheusGpuUtil(test, trainingPod, gpu), 15*time.Minute).
test.Eventually(OpenShiftPrometheusGpuUtil(test, trainingPod, accelerator), 15*time.Minute).
Should(
And(
HaveLen(numProcPerNode),
ContainElement(
// Check that at least some GPU was utilized on more than 30%
HaveField("Value", BeNumerically(">", 30)),
// Check that at least some GPU was utilized on more than 20%
HaveField("Value", BeNumerically(">", 20)),
),
),
)
Expand All @@ -112,12 +109,9 @@ func runKFTOPyTorchMnistJob(t *testing.T, gpu Gpu, image string, requirementsFil

}

func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpu Gpu, workerReplicas int, numProcPerNode int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
var useGPU = false
func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, accelerator Accelerator, workerReplicas int, numProcPerNode int, baseImage string) *kftov1.PyTorchJob {
var backend string

if gpu.ResourceLabel != "cpu" {
useGPU = true
if accelerator.isGpu() {
backend = "nccl"
} else {
backend = "gloo"
Expand Down Expand Up @@ -172,7 +166,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
MNIST('/tmp/datasets/mnist', train=False, download=True, transform=Compose([ToTensor()]))" && \
echo -e "\n\n Dataset downloaded to /tmp/datasets/mnist" && ls -R /tmp/datasets/mnist && \
echo -e "\n\n Starting training..." && \
torchrun --nproc_per_node=%d /mnt/files/mnist.py --dataset_path "/tmp/datasets/mnist" --epochs 7 --save_every 2 --batch_size 64 --lr 0.0005 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
torchrun --nproc_per_node=%d /mnt/files/mnist.py --dataset_path "/tmp/datasets/mnist" --epochs 7 --save_every 2 --batch_size 128 --lr 0.001 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
},
VolumeMounts: []corev1.VolumeMount{
{
Expand Down Expand Up @@ -257,7 +251,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
MNIST('/tmp/datasets/mnist', train=False, download=True, transform=Compose([ToTensor()]))" && \
echo -e "\n\n Dataset downloaded to /tmp/datasets/mnist" && ls -R /tmp/datasets/mnist && \
echo -e "\n\n Starting training..." && \
torchrun --nproc_per_node=%d /mnt/files/mnist.py --dataset_path "/tmp/datasets/mnist" --epochs 7 --save_every 2 --batch_size 64 --lr 0.0005 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
torchrun --nproc_per_node=%d /mnt/files/mnist.py --dataset_path "/tmp/datasets/mnist" --epochs 7 --save_every 2 --batch_size 128 --lr 0.001 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
},
VolumeMounts: []corev1.VolumeMount{
{
Expand Down Expand Up @@ -307,12 +301,12 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
},
}

if useGPU {
if accelerator.isGpu() {
// Update resource lists for GPU (NVIDIA/ROCm) usecase
tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(gpu.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpu.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(gpu.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpu.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))

tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Env = []corev1.EnvVar{
{
Expand All @@ -338,13 +332,13 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
// Update tolerations
tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Tolerations = []corev1.Toleration{
{
Key: gpu.ResourceLabel,
Key: accelerator.ResourceLabel,
Operator: corev1.TolerationOpExists,
},
}
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Tolerations = []corev1.Toleration{
{
Key: gpu.ResourceLabel,
Key: accelerator.ResourceLabel,
Operator: corev1.TolerationOpExists,
},
}
Expand Down
4 changes: 2 additions & 2 deletions tests/kfto/kfto_training_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestPyTorchJobMultiNodeMultiGpuWithROCm(t *testing.T) {
runKFTOPyTorchJob(t, GetROCmTrainingImage(), AMD, 2, 1)
}

func runKFTOPyTorchJob(t *testing.T, image string, gpu Gpu, numGpus, numberOfWorkerNodes int) {
func runKFTOPyTorchJob(t *testing.T, image string, gpu Accelerator, numGpus, numberOfWorkerNodes int) {
test := With(t)

// Create a namespace
Expand Down Expand Up @@ -112,7 +112,7 @@ func runKFTOPyTorchJob(t *testing.T, image string, gpu Gpu, numGpus, numberOfWor
test.T().Logf("PytorchJob %s/%s ran successfully", tuningJob.Namespace, tuningJob.Name)
}

func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, gpu Gpu, numGpus, numberOfWorkerNodes int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, gpu Accelerator, numGpus, numberOfWorkerNodes int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
tuningJob := &kftov1.PyTorchJob{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
Expand Down
15 changes: 10 additions & 5 deletions tests/kfto/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,22 @@ import (
corev1 "k8s.io/api/core/v1"
)

// Accelerator describes a hardware accelerator flavor used by the KFTO tests.
// Its zero value represents plain CPU execution (no GPU resource requested).
type Accelerator struct {
	// ResourceLabel is the Kubernetes extended-resource name requested on
	// training pods (e.g. "nvidia.com/gpu"); empty for the CPU accelerator.
	ResourceLabel string
	// PrometheusGpuUtilizationLabel is the Prometheus metric name queried to
	// verify GPU utilization; set only where such a metric is available.
	PrometheusGpuUtilizationLabel string
}

var (
NVIDIA = Gpu{ResourceLabel: "nvidia.com/gpu", PrometheusGpuUtilizationLabel: "DCGM_FI_DEV_GPU_UTIL"}
AMD = Gpu{ResourceLabel: "amd.com/gpu"}
CPU = Gpu{ResourceLabel: "cpu"}
NVIDIA = Accelerator{ResourceLabel: "nvidia.com/gpu", PrometheusGpuUtilizationLabel: "DCGM_FI_DEV_GPU_UTIL"}
AMD = Accelerator{ResourceLabel: "amd.com/gpu"}
CPU = Accelerator{}
)

// isGpu reports whether a represents a GPU accelerator, i.e. anything other
// than the zero-value CPU accelerator.
func (a Accelerator) isGpu() bool {
	return a != CPU
}

//go:embed resources/*
var files embed.FS

Expand All @@ -49,7 +54,7 @@ func ReadFile(t Test, fileName string) []byte {
return file
}

func OpenShiftPrometheusGpuUtil(test Test, pod corev1.Pod, gpu Gpu) func(g Gomega) prometheusmodel.Vector {
func OpenShiftPrometheusGpuUtil(test Test, pod corev1.Pod, gpu Accelerator) func(g Gomega) prometheusmodel.Vector {
return func(g Gomega) prometheusmodel.Vector {
prometheusApiClient := GetOpenShiftPrometheusApiClient(test)
result, warnings, err := prometheusApiClient.Query(test.Ctx(), gpu.PrometheusGpuUtilizationLabel, time.Now(), prometheusapiv1.WithTimeout(5*time.Second))
Expand Down

0 comments on commit 95a6f5c

Please sign in to comment.