From 96f9892f1c8550f72b0f3720543c016f45a4ecaf Mon Sep 17 00:00:00 2001 From: Cheyu Wu Date: Thu, 23 Jan 2025 21:15:51 +0800 Subject: [PATCH] refactor: mv ValidateRayJobSpec to validation.go and its unit test to validation_test.go --- .../controllers/ray/rayjob_controller.go | 55 +-------- .../ray/rayjob_controller_unit_test.go | 115 ------------------ .../controllers/ray/utils/validation.go | 53 ++++++++ .../controllers/ray/utils/validation_test.go | 113 +++++++++++++++++ 4 files changed, 167 insertions(+), 169 deletions(-) diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index 7a2ac41f71..642c0b92e2 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -127,7 +127,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err } - if err := validateRayJobSpec(rayJobInstance); err != nil { + if err := utils.ValidateRayJobSpec(rayJobInstance); err != nil { logger.Error(err, "The RayJob spec is invalid") r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.InvalidRayJobSpec), "The RayJob spec is invalid %s/%s: %v", rayJobInstance.Namespace, rayJobInstance.Name, err) @@ -875,59 +875,6 @@ func checkActiveDeadlineAndUpdateStatusIfNeeded(ctx context.Context, rayJob *ray return true } -func validateRayJobSpec(rayJob *rayv1.RayJob) error { - // KubeRay has some limitations for the suspend operation. The limitations are a subset of the limitations of - // Kueue (https://kueue.sigs.k8s.io/docs/tasks/run_rayjobs/#c-limitations). For example, KubeRay allows users - // to suspend a RayJob with autoscaling enabled, but Kueue doesn't. - if rayJob.Spec.Suspend && !rayJob.Spec.ShutdownAfterJobFinishes { - return fmt.Errorf("a RayJob with shutdownAfterJobFinishes set to false is not allowed to be suspended") - } - - isClusterSelectorMode := len(rayJob.Spec.ClusterSelector) != 0 - if rayJob.Spec.Suspend && isClusterSelectorMode { - return fmt.Errorf("the ClusterSelector mode doesn't support the suspend operation") - } - if rayJob.Spec.RayClusterSpec == nil && !isClusterSelectorMode { - return fmt.Errorf("one of RayClusterSpec or ClusterSelector must be set") - } - // Validate whether RuntimeEnvYAML is a valid YAML string. Note that this only checks its validity - // as a YAML string, not its adherence to the runtime environment schema. - if _, err := utils.UnmarshalRuntimeEnvYAML(rayJob.Spec.RuntimeEnvYAML); err != nil { - return err - } - if rayJob.Spec.ActiveDeadlineSeconds != nil && *rayJob.Spec.ActiveDeadlineSeconds <= 0 { - return fmt.Errorf("activeDeadlineSeconds must be a positive integer") - } - if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit < 0 { - return fmt.Errorf("backoffLimit must be a positive integer") - } - if !features.Enabled(features.RayJobDeletionPolicy) && rayJob.Spec.DeletionPolicy != nil { - return fmt.Errorf("RayJobDeletionPolicy feature gate must be enabled to use the DeletionPolicy feature") - } - - if rayJob.Spec.DeletionPolicy != nil { - policy := *rayJob.Spec.DeletionPolicy - if isClusterSelectorMode { - switch policy { - case rayv1.DeleteClusterDeletionPolicy: - return fmt.Errorf("the ClusterSelector mode doesn't support DeletionPolicy=DeleteCluster") - case rayv1.DeleteWorkersDeletionPolicy: - return fmt.Errorf("the ClusterSelector mode doesn't support DeletionPolicy=DeleteWorkers") - } - } - - if policy == rayv1.DeleteWorkersDeletionPolicy && utils.IsAutoscalingEnabled(rayJob) { - // TODO (rueian): This can be supported in a future Ray version. We should check the RayVersion once we know it. - return fmt.Errorf("DeletionPolicy=DeleteWorkers currently does not support RayCluster with autoscaling enabled") - } - - if rayJob.Spec.ShutdownAfterJobFinishes && policy == rayv1.DeleteNoneDeletionPolicy { - return fmt.Errorf("shutdownAfterJobFinshes is set to 'true' while deletion policy is 'DeleteNone'") - } - } - return nil -} - func validateRayJobStatus(rayJob *rayv1.RayJob) error { if rayJob.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusWaiting && rayJob.Spec.SubmissionMode != rayv1.InteractiveMode { return fmt.Errorf("invalid RayJob State: JobDeploymentStatus cannot be `Waiting` when SubmissionMode is not InteractiveMode") diff --git a/ray-operator/controllers/ray/rayjob_controller_unit_test.go b/ray-operator/controllers/ray/rayjob_controller_unit_test.go index bb8c05f042..6610293bdd 100644 --- a/ray-operator/controllers/ray/rayjob_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayjob_controller_unit_test.go @@ -13,7 +13,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" - "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/interceptor" @@ -21,7 +20,6 @@ import ( rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" utils "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme" - "github.com/ray-project/kuberay/ray-operator/pkg/features" ) func TestCreateRayJobSubmitterIfNeed(t *testing.T) { @@ -320,119 +318,6 @@ func TestUpdateRayJobStatus(t *testing.T) { } } -func TestValidateRayJobSpec(t *testing.T) { - err := validateRayJobSpec(&rayv1.RayJob{}) - assert.ErrorContains(t, err, "one of RayClusterSpec or ClusterSelector must be set") - - err = validateRayJobSpec(&rayv1.RayJob{ - Spec: rayv1.RayJobSpec{ - Suspend: true, - ShutdownAfterJobFinishes: false, - }, - }) - assert.ErrorContains(t, err, "a RayJob with shutdownAfterJobFinishes set to false is not allowed to be suspended") - - err = validateRayJobSpec(&rayv1.RayJob{ - Spec: rayv1.RayJobSpec{ - Suspend: true, - ShutdownAfterJobFinishes: true, - RayClusterSpec: &rayv1.RayClusterSpec{}, - }, - }) - assert.NoError(t, err) - - err = validateRayJobSpec(&rayv1.RayJob{ - Spec: rayv1.RayJobSpec{ - Suspend: true, - ShutdownAfterJobFinishes: true, - ClusterSelector: map[string]string{ - "key": "value", - }, - }, - }) - assert.ErrorContains(t, err, "the ClusterSelector mode doesn't support the suspend operation") - - err = validateRayJobSpec(&rayv1.RayJob{ - Spec: rayv1.RayJobSpec{ - RuntimeEnvYAML: "invalid_yaml_str", - RayClusterSpec: &rayv1.RayClusterSpec{}, - }, - }) - assert.ErrorContains(t, err, "failed to unmarshal RuntimeEnvYAML") - - err = validateRayJobSpec(&rayv1.RayJob{ - Spec: rayv1.RayJobSpec{ - BackoffLimit: ptr.To[int32](-1), - RayClusterSpec: &rayv1.RayClusterSpec{}, - }, - }) - assert.ErrorContains(t, err, "backoffLimit must be a positive integer") - - err = validateRayJobSpec(&rayv1.RayJob{ - Spec: rayv1.RayJobSpec{ - DeletionPolicy: ptr.To(rayv1.DeleteClusterDeletionPolicy), - ShutdownAfterJobFinishes: true, - RayClusterSpec: &rayv1.RayClusterSpec{}, - }, - }) - assert.ErrorContains(t, err, "RayJobDeletionPolicy feature gate must be enabled to use the DeletionPolicy feature") - - defer features.SetFeatureGateDuringTest(t, features.RayJobDeletionPolicy, true)() - - err = validateRayJobSpec(&rayv1.RayJob{ - Spec: rayv1.RayJobSpec{ - DeletionPolicy: ptr.To(rayv1.DeleteClusterDeletionPolicy), - ClusterSelector: map[string]string{"key": "value"}, - }, - }) - assert.ErrorContains(t, err, "the ClusterSelector mode doesn't support DeletionPolicy=DeleteCluster") - - err = validateRayJobSpec(&rayv1.RayJob{ - Spec: rayv1.RayJobSpec{ - DeletionPolicy: ptr.To(rayv1.DeleteWorkersDeletionPolicy), - ClusterSelector: map[string]string{"key": "value"}, - }, - }) - assert.ErrorContains(t, err, "the ClusterSelector mode doesn't support DeletionPolicy=DeleteWorkers") - - err = validateRayJobSpec(&rayv1.RayJob{ - Spec: rayv1.RayJobSpec{ - DeletionPolicy: ptr.To(rayv1.DeleteWorkersDeletionPolicy), - RayClusterSpec: &rayv1.RayClusterSpec{ - EnableInTreeAutoscaling: ptr.To[bool](true), - }, - }, - }) - assert.ErrorContains(t, err, "DeletionPolicy=DeleteWorkers currently does not support RayCluster with autoscaling enabled") - - err = validateRayJobSpec(&rayv1.RayJob{ - Spec: rayv1.RayJobSpec{ - DeletionPolicy: ptr.To(rayv1.DeleteClusterDeletionPolicy), - ShutdownAfterJobFinishes: true, - RayClusterSpec: &rayv1.RayClusterSpec{}, - }, - }) - assert.NoError(t, err) - - err = validateRayJobSpec(&rayv1.RayJob{ - Spec: rayv1.RayJobSpec{ - DeletionPolicy: nil, - ShutdownAfterJobFinishes: true, - RayClusterSpec: &rayv1.RayClusterSpec{}, - }, - }) - assert.NoError(t, err) - - err = validateRayJobSpec(&rayv1.RayJob{ - Spec: rayv1.RayJobSpec{ - DeletionPolicy: ptr.To(rayv1.DeleteNoneDeletionPolicy), - ShutdownAfterJobFinishes: true, - RayClusterSpec: &rayv1.RayClusterSpec{}, - }, - }) - assert.ErrorContains(t, err, "shutdownAfterJobFinshes is set to 'true' while deletion policy is 'DeleteNone'") -} - func TestFailedToCreateRayJobSubmitterEvent(t *testing.T) { rayJob := &rayv1.RayJob{ ObjectMeta: metav1.ObjectMeta{ diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index 9883a894f7..68766e4af1 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -89,3 +89,56 @@ func ValidateRayClusterSpec(instance *rayv1.RayCluster) error { } return nil } + +func ValidateRayJobSpec(rayJob *rayv1.RayJob) error { + // KubeRay has some limitations for the suspend operation. The limitations are a subset of the limitations of + // Kueue (https://kueue.sigs.k8s.io/docs/tasks/run_rayjobs/#c-limitations). For example, KubeRay allows users + // to suspend a RayJob with autoscaling enabled, but Kueue doesn't. + if rayJob.Spec.Suspend && !rayJob.Spec.ShutdownAfterJobFinishes { + return fmt.Errorf("a RayJob with shutdownAfterJobFinishes set to false is not allowed to be suspended") + } + + isClusterSelectorMode := len(rayJob.Spec.ClusterSelector) != 0 + if rayJob.Spec.Suspend && isClusterSelectorMode { + return fmt.Errorf("the ClusterSelector mode doesn't support the suspend operation") + } + if rayJob.Spec.RayClusterSpec == nil && !isClusterSelectorMode { + return fmt.Errorf("one of RayClusterSpec or ClusterSelector must be set") + } + // Validate whether RuntimeEnvYAML is a valid YAML string. Note that this only checks its validity + // as a YAML string, not its adherence to the runtime environment schema. + if _, err := UnmarshalRuntimeEnvYAML(rayJob.Spec.RuntimeEnvYAML); err != nil { + return err + } + if rayJob.Spec.ActiveDeadlineSeconds != nil && *rayJob.Spec.ActiveDeadlineSeconds <= 0 { + return fmt.Errorf("activeDeadlineSeconds must be a positive integer") + } + if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit < 0 { + return fmt.Errorf("backoffLimit must be a positive integer") + } + if !features.Enabled(features.RayJobDeletionPolicy) && rayJob.Spec.DeletionPolicy != nil { + return fmt.Errorf("RayJobDeletionPolicy feature gate must be enabled to use the DeletionPolicy feature") + } + + if rayJob.Spec.DeletionPolicy != nil { + policy := *rayJob.Spec.DeletionPolicy + if isClusterSelectorMode { + switch policy { + case rayv1.DeleteClusterDeletionPolicy: + return fmt.Errorf("the ClusterSelector mode doesn't support DeletionPolicy=DeleteCluster") + case rayv1.DeleteWorkersDeletionPolicy: + return fmt.Errorf("the ClusterSelector mode doesn't support DeletionPolicy=DeleteWorkers") + } + } + + if policy == rayv1.DeleteWorkersDeletionPolicy && IsAutoscalingEnabled(rayJob) { + // TODO (rueian): This can be supported in a future Ray version. We should check the RayVersion once we know it. + return fmt.Errorf("DeletionPolicy=DeleteWorkers currently does not support RayCluster with autoscaling enabled") + } + + if rayJob.Spec.ShutdownAfterJobFinishes && policy == rayv1.DeleteNoneDeletionPolicy { + return fmt.Errorf("shutdownAfterJobFinshes is set to 'true' while deletion policy is 'DeleteNone'") + } + } + return nil +} diff --git a/ray-operator/controllers/ray/utils/validation_test.go b/ray-operator/controllers/ray/utils/validation_test.go index 307035d6f3..4b69c38e01 100644 --- a/ray-operator/controllers/ray/utils/validation_test.go +++ b/ray-operator/controllers/ray/utils/validation_test.go @@ -520,3 +520,116 @@ func TestValidateRayClusterSpecSuspendingWorkerGroup(t *testing.T) { }) } } + +func TestValidateRayJobSpec(t *testing.T) { + err := ValidateRayJobSpec(&rayv1.RayJob{}) + assert.ErrorContains(t, err, "one of RayClusterSpec or ClusterSelector must be set") + + err = ValidateRayJobSpec(&rayv1.RayJob{ + Spec: rayv1.RayJobSpec{ + Suspend: true, + ShutdownAfterJobFinishes: false, + }, + }) + assert.ErrorContains(t, err, "a RayJob with shutdownAfterJobFinishes set to false is not allowed to be suspended") + + err = ValidateRayJobSpec(&rayv1.RayJob{ + Spec: rayv1.RayJobSpec{ + Suspend: true, + ShutdownAfterJobFinishes: true, + RayClusterSpec: &rayv1.RayClusterSpec{}, + }, + }) + assert.NoError(t, err) + + err = ValidateRayJobSpec(&rayv1.RayJob{ + Spec: rayv1.RayJobSpec{ + Suspend: true, + ShutdownAfterJobFinishes: true, + ClusterSelector: map[string]string{ + "key": "value", + }, + }, + }) + assert.ErrorContains(t, err, "the ClusterSelector mode doesn't support the suspend operation") + + err = ValidateRayJobSpec(&rayv1.RayJob{ + Spec: rayv1.RayJobSpec{ + RuntimeEnvYAML: "invalid_yaml_str", + RayClusterSpec: &rayv1.RayClusterSpec{}, + }, + }) + assert.ErrorContains(t, err, "failed to unmarshal RuntimeEnvYAML") + + err = ValidateRayJobSpec(&rayv1.RayJob{ + Spec: rayv1.RayJobSpec{ + BackoffLimit: ptr.To[int32](-1), + RayClusterSpec: &rayv1.RayClusterSpec{}, + }, + }) + assert.ErrorContains(t, err, "backoffLimit must be a positive integer") + + err = ValidateRayJobSpec(&rayv1.RayJob{ + Spec: rayv1.RayJobSpec{ + DeletionPolicy: ptr.To(rayv1.DeleteClusterDeletionPolicy), + ShutdownAfterJobFinishes: true, + RayClusterSpec: &rayv1.RayClusterSpec{}, + }, + }) + assert.ErrorContains(t, err, "RayJobDeletionPolicy feature gate must be enabled to use the DeletionPolicy feature") + + defer features.SetFeatureGateDuringTest(t, features.RayJobDeletionPolicy, true)() + + err = ValidateRayJobSpec(&rayv1.RayJob{ + Spec: rayv1.RayJobSpec{ + DeletionPolicy: ptr.To(rayv1.DeleteClusterDeletionPolicy), + ClusterSelector: map[string]string{"key": "value"}, + }, + }) + assert.ErrorContains(t, err, "the ClusterSelector mode doesn't support DeletionPolicy=DeleteCluster") + + err = ValidateRayJobSpec(&rayv1.RayJob{ + Spec: rayv1.RayJobSpec{ + DeletionPolicy: ptr.To(rayv1.DeleteWorkersDeletionPolicy), + ClusterSelector: map[string]string{"key": "value"}, + }, + }) + assert.ErrorContains(t, err, "the ClusterSelector mode doesn't support DeletionPolicy=DeleteWorkers") + + err = ValidateRayJobSpec(&rayv1.RayJob{ + Spec: rayv1.RayJobSpec{ + DeletionPolicy: ptr.To(rayv1.DeleteWorkersDeletionPolicy), + RayClusterSpec: &rayv1.RayClusterSpec{ + EnableInTreeAutoscaling: ptr.To[bool](true), + }, + }, + }) + assert.ErrorContains(t, err, "DeletionPolicy=DeleteWorkers currently does not support RayCluster with autoscaling enabled") + + err = ValidateRayJobSpec(&rayv1.RayJob{ + Spec: rayv1.RayJobSpec{ + DeletionPolicy: ptr.To(rayv1.DeleteClusterDeletionPolicy), + ShutdownAfterJobFinishes: true, + RayClusterSpec: &rayv1.RayClusterSpec{}, + }, + }) + assert.NoError(t, err) + + err = ValidateRayJobSpec(&rayv1.RayJob{ + Spec: rayv1.RayJobSpec{ + DeletionPolicy: nil, + ShutdownAfterJobFinishes: true, + RayClusterSpec: &rayv1.RayClusterSpec{}, + }, + }) + assert.NoError(t, err) + + err = ValidateRayJobSpec(&rayv1.RayJob{ + Spec: rayv1.RayJobSpec{ + DeletionPolicy: ptr.To(rayv1.DeleteNoneDeletionPolicy), + ShutdownAfterJobFinishes: true, + RayClusterSpec: &rayv1.RayClusterSpec{}, + }, + }) + assert.ErrorContains(t, err, "shutdownAfterJobFinshes is set to 'true' while deletion policy is 'DeleteNone'") +}