Skip to content

Commit

Permalink
Extend HuggingFace Trainer test to multinode multiGPU
Browse files Browse the repository at this point in the history
  • Loading branch information
sutaakar authored and openshift-merge-bot[bot] committed Jan 16, 2025
1 parent ccd10c1 commit 37a59e4
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 20 deletions.
246 changes: 227 additions & 19 deletions tests/kfto/kfto_training_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kfto
import (
"fmt"
"testing"
"time"

kftov1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
. "github.com/onsi/gomega"
Expand All @@ -29,15 +30,39 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPyTorchJobWithCuda(t *testing.T) {
runKFTOPyTorchJob(t, GetCudaTrainingImage(), "nvidia.com/gpu", 1)
func TestPyTorchJobSingleNodeSingleGpuWithCuda(t *testing.T) {
runKFTOPyTorchJob(t, GetCudaTrainingImage(), NVIDIA, 1, 0)
}

func TestPyTorchJobWithROCm(t *testing.T) {
runKFTOPyTorchJob(t, GetROCmTrainingImage(), "amd.com/gpu", 1)
func TestPyTorchJobSingleNodeMultiGpuWithCuda(t *testing.T) {
runKFTOPyTorchJob(t, GetCudaTrainingImage(), NVIDIA, 2, 0)
}

func runKFTOPyTorchJob(t *testing.T, image string, gpuLabel string, numGpus int) {
func TestPyTorchJobMultiNodeSingleGpuWithCuda(t *testing.T) {
runKFTOPyTorchJob(t, GetCudaTrainingImage(), NVIDIA, 1, 1)
}

func TestPyTorchJobMultiNodeMultiGpuWithCuda(t *testing.T) {
runKFTOPyTorchJob(t, GetCudaTrainingImage(), NVIDIA, 2, 1)
}

func TestPyTorchJobSingleNodeSingleGpuWithROCm(t *testing.T) {
runKFTOPyTorchJob(t, GetROCmTrainingImage(), AMD, 1, 0)
}

func TestPyTorchJobSingleNodeMultiGpuWithROCm(t *testing.T) {
runKFTOPyTorchJob(t, GetROCmTrainingImage(), AMD, 2, 0)
}

func TestPyTorchJobMultiNodeSingleGpuWithROCm(t *testing.T) {
runKFTOPyTorchJob(t, GetROCmTrainingImage(), AMD, 1, 1)
}

func TestPyTorchJobMultiNodeMultiGpuWithROCm(t *testing.T) {
runKFTOPyTorchJob(t, GetROCmTrainingImage(), AMD, 2, 1)
}

func runKFTOPyTorchJob(t *testing.T, image string, gpu Gpu, numGpus, numberOfWorkerNodes int) {
test := With(t)

// Create a namespace
Expand All @@ -54,20 +79,40 @@ func runKFTOPyTorchJob(t *testing.T, image string, gpuLabel string, numGpus int)
defer test.Client().Core().CoreV1().PersistentVolumeClaims(namespace).Delete(test.Ctx(), outputPvc.Name, metav1.DeleteOptions{})

// Create training PyTorch job
tuningJob := createKFTOPyTorchJob(test, namespace, *config, gpuLabel, numGpus, outputPvc.Name, image)
tuningJob := createKFTOPyTorchJob(test, namespace, *config, gpu, numGpus, numberOfWorkerNodes, outputPvc.Name, image)
defer test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Delete(test.Ctx(), tuningJob.Name, *metav1.NewDeleteOptions(0))

// Make sure the PyTorch job is running
test.Eventually(PyTorchJob(test, namespace, tuningJob.Name), TestTimeoutDouble).
Should(WithTransform(PyTorchJobConditionRunning, Equal(corev1.ConditionTrue)))

// Verify GPU utilization
if IsOpenShift(test) && gpu == NVIDIA {
trainingPods := GetPods(test, namespace, metav1.ListOptions{LabelSelector: "training.kubeflow.org/job-name=" + tuningJob.GetName()})
test.Expect(trainingPods).To(HaveLen(numberOfWorkerNodes + 1)) // +1 is a master node

for _, trainingPod := range trainingPods {
// Check that GPUs for training pods were utilized recently
test.Eventually(OpenShiftPrometheusGpuUtil(test, trainingPod, gpu), 15*time.Minute).
Should(
And(
HaveLen(numGpus),
ContainElement(
// Check that at lest some GPU was utilized on more than 50%
HaveField("Value", BeNumerically(">", 50)),
),
),
)
}
test.T().Log("All GPUs were successfully utilized")
}

// Make sure the PyTorch job succeeded
test.Eventually(PyTorchJob(test, namespace, tuningJob.Name), TestTimeoutDouble).Should(WithTransform(PyTorchJobConditionSucceeded, Equal(corev1.ConditionTrue)))
test.T().Logf("PytorchJob %s/%s ran successfully", tuningJob.Namespace, tuningJob.Name)

}

func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, numGpus int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, gpu Gpu, numGpus, numberOfWorkerNodes int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
tuningJob := &kftov1.PyTorchJob{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
Expand All @@ -78,14 +123,33 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap,
},
Spec: kftov1.PyTorchJobSpec{
PyTorchReplicaSpecs: map[kftov1.ReplicaType]*kftov1.ReplicaSpec{
"Master": {
kftov1.PyTorchJobReplicaTypeMaster: {
Replicas: Ptr(int32(1)),
RestartPolicy: "OnFailure",
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "kfto-llm",
},
},
Spec: corev1.PodSpec{
Affinity: &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "kfto-llm",
},
},
TopologyKey: "kubernetes.io/hostname",
},
},
},
},
Tolerations: []corev1.Toleration{
{
Key: gpuLabel,
Key: gpu.ResourceLabel,
Operator: corev1.TolerationOpExists,
},
},
Expand Down Expand Up @@ -124,12 +188,12 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap,
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{
"/bin/bash", "-c",
`python /etc/config/hf_llm_training.py \
`torchrun /etc/config/hf_llm_training.py \
--model_uri /tmp/model/bloom-560m \
--model_dir /tmp/model/bloom-560m \
--dataset_file /tmp/all_datasets/alpaca_data_hundredth.json \
--dataset_file /tmp/all_datasets/alpaca_data_tenth.json \
--transformer_type AutoModelForCausalLM \
--training_parameters '{"output_dir": "/mnt/output", "per_device_train_batch_size": 8, "num_train_epochs": 3, "logging_dir": "/logs", "eval_strategy": "epoch"}' \
--training_parameters '{"output_dir": "/mnt/output", "per_device_train_batch_size": 8, "num_train_epochs": 3, "logging_dir": "/logs", "eval_strategy": "epoch", "save_strategy": "no"}' \
--lora_config '{"r": 4, "lora_alpha": 16, "lora_dropout": 0.1, "bias": "none"}'`,
},
Env: []corev1.EnvVar{
Expand All @@ -145,6 +209,10 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap,
Name: "TOKENIZERS_PARALLELISM",
Value: "false",
},
{
Name: "NCCL_DEBUG",
Value: "INFO",
},
},
VolumeMounts: []corev1.VolumeMount{
{
Expand All @@ -162,14 +230,14 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap,
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("8Gi"),
corev1.ResourceName(gpuLabel): resource.MustParse(fmt.Sprint(numGpus)),
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("8Gi"),
corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("8Gi"),
corev1.ResourceName(gpuLabel): resource.MustParse(fmt.Sprint(numGpus)),
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("8Gi"),
corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)),
},
},
SecurityContext: &corev1.SecurityContext{
Expand Down Expand Up @@ -207,6 +275,146 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap,
},
},
},
kftov1.PyTorchJobReplicaTypeWorker: {
Replicas: Ptr(int32(numberOfWorkerNodes)),
RestartPolicy: "OnFailure",
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "kfto-llm",
},
},
Spec: corev1.PodSpec{
Affinity: &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "kfto-llm",
},
},
TopologyKey: "kubernetes.io/hostname",
},
},
},
},
Tolerations: []corev1.Toleration{
{
Key: gpu.ResourceLabel,
Operator: corev1.TolerationOpExists,
},
},
InitContainers: []corev1.Container{
{
Name: "copy-model",
Image: GetBloomModelImage(),
ImagePullPolicy: corev1.PullIfNotPresent,
VolumeMounts: []corev1.VolumeMount{
{
Name: "tmp-volume",
MountPath: "/tmp",
},
},
Command: []string{"/bin/sh", "-c"},
Args: []string{"mkdir /tmp/model; cp -r /models/bloom-560m /tmp/model"},
},
{
Name: "copy-dataset",
Image: GetAlpacaDatasetImage(),
ImagePullPolicy: corev1.PullIfNotPresent,
VolumeMounts: []corev1.VolumeMount{
{
Name: "tmp-volume",
MountPath: "/tmp",
},
},
Command: []string{"/bin/sh", "-c"},
Args: []string{"mkdir /tmp/all_datasets; cp -r /dataset/* /tmp/all_datasets;ls /tmp/all_datasets"},
},
},
Containers: []corev1.Container{
{
Name: "pytorch",
Image: baseImage,
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{
"/bin/bash", "-c",
`torchrun /etc/config/hf_llm_training.py \
--model_uri /tmp/model/bloom-560m \
--model_dir /tmp/model/bloom-560m \
--dataset_file /tmp/all_datasets/alpaca_data_tenth.json \
--transformer_type AutoModelForCausalLM \
--training_parameters '{"output_dir": "/mnt/output", "per_device_train_batch_size": 8, "num_train_epochs": 3, "logging_dir": "/logs", "eval_strategy": "epoch", "save_strategy": "no"}' \
--lora_config '{"r": 4, "lora_alpha": 16, "lora_dropout": 0.1, "bias": "none"}'`,
},
Env: []corev1.EnvVar{
{
Name: "HF_HOME",
Value: "/tmp/.cache",
},
{
Name: "TRITON_CACHE_DIR",
Value: "/tmp/.triton",
},
{
Name: "TOKENIZERS_PARALLELISM",
Value: "false",
},
{
Name: "NCCL_DEBUG",
Value: "INFO",
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "config-volume",
MountPath: "/etc/config",
},
{
Name: "tmp-volume",
MountPath: "/tmp",
},
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("8Gi"),
corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("8Gi"),
corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)),
},
},
SecurityContext: &corev1.SecurityContext{
RunAsNonRoot: Ptr(true),
ReadOnlyRootFilesystem: Ptr(true),
},
},
},
Volumes: []corev1.Volume{
{
Name: "config-volume",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: config.Name,
},
},
},
},
{
Name: "tmp-volume",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
},
},
},
},
},
},
}
Expand Down
6 changes: 5 additions & 1 deletion tests/kfto/resources/hf_llm_training.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,12 @@ def train_model(model, transformer_type, train_data, eval_data, tokenizer, train
mlm=False,
)

# Train the model.
# Train and save the model.
trainer.train()
trainer.save_model()
logger.info("parallel_mode: '{0}'".format(trainer.args.parallel_mode))
logger.info("is_model_parallel: '{0}'".format(trainer.is_model_parallel))
logger.info("model_wrapped: '{0}'".format(trainer.model_wrapped))


def parse_arguments():
Expand Down
33 changes: 33 additions & 0 deletions tests/kfto/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,24 @@ package kfto

import (
"embed"
"time"

. "github.com/onsi/gomega"
. "github.com/project-codeflare/codeflare-common/support"
prometheusapiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
prometheusmodel "github.com/prometheus/common/model"

corev1 "k8s.io/api/core/v1"
)

type Gpu struct {
ResourceLabel string
PrometheusGpuUtilizationLabel string
}

var (
NVIDIA = Gpu{ResourceLabel: "nvidia.com/gpu", PrometheusGpuUtilizationLabel: "DCGM_FI_DEV_GPU_UTIL"}
AMD = Gpu{ResourceLabel: "amd.com/gpu"}
)

//go:embed resources/*
Expand All @@ -32,3 +47,21 @@ func ReadFile(t Test, fileName string) []byte {
t.Expect(err).NotTo(HaveOccurred())
return file
}

func OpenShiftPrometheusGpuUtil(test Test, pod corev1.Pod, gpu Gpu) func(g Gomega) prometheusmodel.Vector {
return func(g Gomega) prometheusmodel.Vector {
prometheusApiClient := GetOpenShiftPrometheusApiClient(test)
result, warnings, err := prometheusApiClient.Query(test.Ctx(), gpu.PrometheusGpuUtilizationLabel, time.Now(), prometheusapiv1.WithTimeout(5*time.Second))
g.Expect(err).NotTo(HaveOccurred())
g.Expect(warnings).Should(HaveLen(0))

var util prometheusmodel.Vector
for _, sample := range result.(prometheusmodel.Vector) {
if string(sample.Metric["exported_namespace"]) == pod.GetNamespace() && string(sample.Metric["exported_pod"]) == pod.GetName() {
util = append(util, sample)
}
}

return util
}
}

0 comments on commit 37a59e4

Please sign in to comment.