From 37a59e4adb546451f29205c6b49d57ee76180e8b Mon Sep 17 00:00:00 2001
From: Karel Suta
Date: Mon, 13 Jan 2025 13:40:38 +0100
Subject: [PATCH] Extend HuggingFace Trainer test to multinode multiGPU

---
 tests/kfto/kfto_training_test.go        | 246 ++++++++++++++++++++++--
 tests/kfto/resources/hf_llm_training.py |   6 +-
 tests/kfto/support.go                   |  33 ++++
 3 files changed, 265 insertions(+), 20 deletions(-)

diff --git a/tests/kfto/kfto_training_test.go b/tests/kfto/kfto_training_test.go
index 4903c1d7..1edc50de 100644
--- a/tests/kfto/kfto_training_test.go
+++ b/tests/kfto/kfto_training_test.go
@@ -19,6 +19,7 @@ package kfto
 import (
 	"fmt"
 	"testing"
+	"time"
 
 	kftov1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 	. "github.com/onsi/gomega"
@@ -29,15 +30,39 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
-func TestPyTorchJobWithCuda(t *testing.T) {
-	runKFTOPyTorchJob(t, GetCudaTrainingImage(), "nvidia.com/gpu", 1)
+func TestPyTorchJobSingleNodeSingleGpuWithCuda(t *testing.T) {
+	runKFTOPyTorchJob(t, GetCudaTrainingImage(), NVIDIA, 1, 0)
 }
 
-func TestPyTorchJobWithROCm(t *testing.T) {
-	runKFTOPyTorchJob(t, GetROCmTrainingImage(), "amd.com/gpu", 1)
+func TestPyTorchJobSingleNodeMultiGpuWithCuda(t *testing.T) {
+	runKFTOPyTorchJob(t, GetCudaTrainingImage(), NVIDIA, 2, 0)
 }
 
-func runKFTOPyTorchJob(t *testing.T, image string, gpuLabel string, numGpus int) {
+func TestPyTorchJobMultiNodeSingleGpuWithCuda(t *testing.T) {
+	runKFTOPyTorchJob(t, GetCudaTrainingImage(), NVIDIA, 1, 1)
+}
+
+func TestPyTorchJobMultiNodeMultiGpuWithCuda(t *testing.T) {
+	runKFTOPyTorchJob(t, GetCudaTrainingImage(), NVIDIA, 2, 1)
+}
+
+func TestPyTorchJobSingleNodeSingleGpuWithROCm(t *testing.T) {
+	runKFTOPyTorchJob(t, GetROCmTrainingImage(), AMD, 1, 0)
+}
+
+func TestPyTorchJobSingleNodeMultiGpuWithROCm(t *testing.T) {
+	runKFTOPyTorchJob(t, GetROCmTrainingImage(), AMD, 2, 0)
+}
+
+func TestPyTorchJobMultiNodeSingleGpuWithROCm(t *testing.T) {
+	runKFTOPyTorchJob(t, GetROCmTrainingImage(), AMD, 1, 1)
+}
+
+func TestPyTorchJobMultiNodeMultiGpuWithROCm(t *testing.T) {
+	runKFTOPyTorchJob(t, GetROCmTrainingImage(), AMD, 2, 1)
+}
+
+func runKFTOPyTorchJob(t *testing.T, image string, gpu Gpu, numGpus, numberOfWorkerNodes int) {
 	test := With(t)
 
 	// Create a namespace
@@ -54,20 +79,40 @@ func runKFTOPyTorchJob(t *testing.T, image string, gpuLabel string, numGpus int
 	defer test.Client().Core().CoreV1().PersistentVolumeClaims(namespace).Delete(test.Ctx(), outputPvc.Name, metav1.DeleteOptions{})
 
 	// Create training PyTorch job
-	tuningJob := createKFTOPyTorchJob(test, namespace, *config, gpuLabel, numGpus, outputPvc.Name, image)
+	tuningJob := createKFTOPyTorchJob(test, namespace, *config, gpu, numGpus, numberOfWorkerNodes, outputPvc.Name, image)
 	defer test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Delete(test.Ctx(), tuningJob.Name, *metav1.NewDeleteOptions(0))
 
 	// Make sure the PyTorch job is running
 	test.Eventually(PyTorchJob(test, namespace, tuningJob.Name), TestTimeoutDouble).
 		Should(WithTransform(PyTorchJobConditionRunning, Equal(corev1.ConditionTrue)))
 
+	// Verify GPU utilization
+	if IsOpenShift(test) && gpu == NVIDIA {
+		trainingPods := GetPods(test, namespace, metav1.ListOptions{LabelSelector: "training.kubeflow.org/job-name=" + tuningJob.GetName()})
+		test.Expect(trainingPods).To(HaveLen(numberOfWorkerNodes + 1)) // +1 is the master pod
+
+		for _, trainingPod := range trainingPods {
+			// Check that GPUs for training pods were utilized recently
+			test.Eventually(OpenShiftPrometheusGpuUtil(test, trainingPod, gpu), 15*time.Minute).
+				Should(
+					And(
+						HaveLen(numGpus),
+						ContainElement(
+							// Check that at least one GPU was utilized at more than 50%
+							HaveField("Value", BeNumerically(">", 50)),
+						),
+					),
+				)
+		}
+		test.T().Log("All GPUs were successfully utilized")
+	}
+
 	// Make sure the PyTorch job succeeded
 	test.Eventually(PyTorchJob(test, namespace, tuningJob.Name), TestTimeoutDouble).Should(WithTransform(PyTorchJobConditionSucceeded, Equal(corev1.ConditionTrue)))
 	test.T().Logf("PytorchJob %s/%s ran successfully", tuningJob.Namespace, tuningJob.Name)
-
 }
 
-func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, numGpus int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
+func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, gpu Gpu, numGpus, numberOfWorkerNodes int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
 	tuningJob := &kftov1.PyTorchJob{
 		TypeMeta: metav1.TypeMeta{
 			APIVersion: corev1.SchemeGroupVersion.String(),
@@ -78,14 +123,33 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap,
 		},
 		Spec: kftov1.PyTorchJobSpec{
 			PyTorchReplicaSpecs: map[kftov1.ReplicaType]*kftov1.ReplicaSpec{
-				"Master": {
+				kftov1.PyTorchJobReplicaTypeMaster: {
 					Replicas:      Ptr(int32(1)),
 					RestartPolicy: "OnFailure",
 					Template: corev1.PodTemplateSpec{
+						ObjectMeta: metav1.ObjectMeta{
+							Labels: map[string]string{
+								"app": "kfto-llm",
+							},
+						},
 						Spec: corev1.PodSpec{
+							Affinity: &corev1.Affinity{
+								PodAntiAffinity: &corev1.PodAntiAffinity{
+									RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
+										{
+											LabelSelector: &metav1.LabelSelector{
+												MatchLabels: map[string]string{
+													"app": "kfto-llm",
+												},
+											},
+											TopologyKey: "kubernetes.io/hostname",
+										},
+									},
+								},
+							},
 							Tolerations: []corev1.Toleration{
 								{
-									Key:      gpuLabel,
+									Key:      gpu.ResourceLabel,
 									Operator: corev1.TolerationOpExists,
 								},
 							},
@@ -124,12 +188,12 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap,
 							ImagePullPolicy: corev1.PullIfNotPresent,
 							Command: []string{
 								"/bin/bash", "-c",
-								`python /etc/config/hf_llm_training.py \
+								`torchrun /etc/config/hf_llm_training.py \
 									--model_uri /tmp/model/bloom-560m \
 									--model_dir /tmp/model/bloom-560m \
-									--dataset_file /tmp/all_datasets/alpaca_data_hundredth.json \
+									--dataset_file /tmp/all_datasets/alpaca_data_tenth.json \
 									--transformer_type AutoModelForCausalLM \
-									--training_parameters '{"output_dir": "/mnt/output", "per_device_train_batch_size": 8, "num_train_epochs": 3, "logging_dir": "/logs", "eval_strategy": "epoch"}' \
+									--training_parameters '{"output_dir": "/mnt/output", "per_device_train_batch_size": 8, "num_train_epochs": 3, "logging_dir": "/logs", "eval_strategy": "epoch", "save_strategy": "no"}' \
 									--lora_config '{"r": 4, "lora_alpha": 16, "lora_dropout": 0.1, "bias": "none"}'`,
 							},
 							Env: []corev1.EnvVar{
@@ -145,6 +209,10 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap,
 									Name:  "TOKENIZERS_PARALLELISM",
"TOKENIZERS_PARALLELISM", Value: "false", }, + { + Name: "NCCL_DEBUG", + Value: "INFO", + }, }, VolumeMounts: []corev1.VolumeMount{ { @@ -162,14 +230,14 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, }, Resources: corev1.ResourceRequirements{ Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("2"), - corev1.ResourceMemory: resource.MustParse("8Gi"), - corev1.ResourceName(gpuLabel): resource.MustParse(fmt.Sprint(numGpus)), + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("8Gi"), + corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)), }, Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("2"), - corev1.ResourceMemory: resource.MustParse("8Gi"), - corev1.ResourceName(gpuLabel): resource.MustParse(fmt.Sprint(numGpus)), + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("8Gi"), + corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)), }, }, SecurityContext: &corev1.SecurityContext{ @@ -207,6 +275,146 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, }, }, }, + kftov1.PyTorchJobReplicaTypeWorker: { + Replicas: Ptr(int32(numberOfWorkerNodes)), + RestartPolicy: "OnFailure", + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "kfto-llm", + }, + }, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAntiAffinity: &corev1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "kfto-llm", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + Tolerations: []corev1.Toleration{ + { + Key: gpu.ResourceLabel, + Operator: corev1.TolerationOpExists, + }, + }, + InitContainers: []corev1.Container{ + { + Name: "copy-model", + Image: GetBloomModelImage(), + ImagePullPolicy: corev1.PullIfNotPresent, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "tmp-volume", + MountPath: "/tmp", + }, + }, + Command: []string{"/bin/sh", "-c"}, + Args: []string{"mkdir /tmp/model; cp -r /models/bloom-560m /tmp/model"}, + }, + { + Name: "copy-dataset", + Image: GetAlpacaDatasetImage(), + ImagePullPolicy: corev1.PullIfNotPresent, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "tmp-volume", + MountPath: "/tmp", + }, + }, + Command: []string{"/bin/sh", "-c"}, + Args: []string{"mkdir /tmp/all_datasets; cp -r /dataset/* /tmp/all_datasets;ls /tmp/all_datasets"}, + }, + }, + Containers: []corev1.Container{ + { + Name: "pytorch", + Image: baseImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{ + "/bin/bash", "-c", + `torchrun /etc/config/hf_llm_training.py \ + --model_uri /tmp/model/bloom-560m \ + --model_dir /tmp/model/bloom-560m \ + --dataset_file /tmp/all_datasets/alpaca_data_tenth.json \ + --transformer_type AutoModelForCausalLM \ + --training_parameters '{"output_dir": "/mnt/output", "per_device_train_batch_size": 8, "num_train_epochs": 3, "logging_dir": "/logs", "eval_strategy": "epoch", "save_strategy": "no"}' \ + --lora_config '{"r": 4, "lora_alpha": 16, "lora_dropout": 0.1, "bias": "none"}'`, + }, + Env: []corev1.EnvVar{ + { + Name: "HF_HOME", + Value: "/tmp/.cache", + }, + { + Name: "TRITON_CACHE_DIR", + Value: "/tmp/.triton", + }, + { + Name: "TOKENIZERS_PARALLELISM", + Value: "false", + }, + { + Name: "NCCL_DEBUG", + Value: "INFO", + }, + }, + VolumeMounts: 
+										{
+											Name:      "config-volume",
+											MountPath: "/etc/config",
+										},
+										{
+											Name:      "tmp-volume",
+											MountPath: "/tmp",
+										},
+									},
+									Resources: corev1.ResourceRequirements{
+										Requests: corev1.ResourceList{
+											corev1.ResourceCPU:                     resource.MustParse("2"),
+											corev1.ResourceMemory:                  resource.MustParse("8Gi"),
+											corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)),
+										},
+										Limits: corev1.ResourceList{
+											corev1.ResourceCPU:                     resource.MustParse("2"),
+											corev1.ResourceMemory:                  resource.MustParse("8Gi"),
+											corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)),
+										},
+									},
+									SecurityContext: &corev1.SecurityContext{
+										RunAsNonRoot:           Ptr(true),
+										ReadOnlyRootFilesystem: Ptr(true),
+									},
+								},
+							},
+							Volumes: []corev1.Volume{
+								{
+									Name: "config-volume",
+									VolumeSource: corev1.VolumeSource{
+										ConfigMap: &corev1.ConfigMapVolumeSource{
+											LocalObjectReference: corev1.LocalObjectReference{
+												Name: config.Name,
+											},
+										},
+									},
+								},
+								{
+									Name: "tmp-volume",
+									VolumeSource: corev1.VolumeSource{
+										EmptyDir: &corev1.EmptyDirVolumeSource{},
+									},
+								},
+							},
+						},
+					},
+				},
 		},
 	},
 }
diff --git a/tests/kfto/resources/hf_llm_training.py b/tests/kfto/resources/hf_llm_training.py
index 7a28137a..9ac1d07c 100644
--- a/tests/kfto/resources/hf_llm_training.py
+++ b/tests/kfto/resources/hf_llm_training.py
@@ -160,8 +160,12 @@ def train_model(model, transformer_type, train_data, eval_data, tokenizer, train
         mlm=False,
     )
 
-    # Train the model.
+    # Train and save the model.
     trainer.train()
+    trainer.save_model()
+    logger.info("parallel_mode: '{0}'".format(trainer.args.parallel_mode))
+    logger.info("is_model_parallel: '{0}'".format(trainer.is_model_parallel))
+    logger.info("model_wrapped: '{0}'".format(trainer.model_wrapped))
 
 
 def parse_arguments():
diff --git a/tests/kfto/support.go b/tests/kfto/support.go
index ac7ea5a2..982419ad 100644
--- a/tests/kfto/support.go
+++ b/tests/kfto/support.go
@@ -18,9 +18,24 @@ package kfto
 
 import (
 	"embed"
+	"time"
 
 	. "github.com/onsi/gomega"
 	. "github.com/project-codeflare/codeflare-common/support"
+	prometheusapiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
+	prometheusmodel "github.com/prometheus/common/model"
+
+	corev1 "k8s.io/api/core/v1"
+)
+
+type Gpu struct {
+	ResourceLabel                 string
+	PrometheusGpuUtilizationLabel string
+}
+
+var (
+	NVIDIA = Gpu{ResourceLabel: "nvidia.com/gpu", PrometheusGpuUtilizationLabel: "DCGM_FI_DEV_GPU_UTIL"}
+	AMD    = Gpu{ResourceLabel: "amd.com/gpu"}
 )
 
 //go:embed resources/*
@@ -32,3 +47,21 @@ func ReadFile(t Test, fileName string) []byte {
 	t.Expect(err).NotTo(HaveOccurred())
 	return file
 }
+
+func OpenShiftPrometheusGpuUtil(test Test, pod corev1.Pod, gpu Gpu) func(g Gomega) prometheusmodel.Vector {
+	return func(g Gomega) prometheusmodel.Vector {
+		prometheusApiClient := GetOpenShiftPrometheusApiClient(test)
+		result, warnings, err := prometheusApiClient.Query(test.Ctx(), gpu.PrometheusGpuUtilizationLabel, time.Now(), prometheusapiv1.WithTimeout(5*time.Second))
+		g.Expect(err).NotTo(HaveOccurred())
+		g.Expect(warnings).Should(HaveLen(0))
+
+		var util prometheusmodel.Vector
+		for _, sample := range result.(prometheusmodel.Vector) {
+			if string(sample.Metric["exported_namespace"]) == pod.GetNamespace() && string(sample.Metric["exported_pod"]) == pod.GetName() {
+				util = append(util, sample)
+			}
+		}
+
+		return util
+	}
+}