Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] Move function ValidateRayJobSpec to validation.go and its unit test #2812

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 1 addition & 54 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

if err := validateRayJobSpec(rayJobInstance); err != nil {
if err := utils.ValidateRayJobSpec(rayJobInstance); err != nil {
logger.Error(err, "The RayJob spec is invalid")
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.InvalidRayJobSpec),
"The RayJob spec is invalid %s/%s: %v", rayJobInstance.Namespace, rayJobInstance.Name, err)
Expand Down Expand Up @@ -875,59 +875,6 @@ func checkActiveDeadlineAndUpdateStatusIfNeeded(ctx context.Context, rayJob *ray
return true
}

func validateRayJobSpec(rayJob *rayv1.RayJob) error {
// KubeRay has some limitations for the suspend operation. The limitations are a subset of the limitations of
// Kueue (https://kueue.sigs.k8s.io/docs/tasks/run_rayjobs/#c-limitations). For example, KubeRay allows users
// to suspend a RayJob with autoscaling enabled, but Kueue doesn't.
if rayJob.Spec.Suspend && !rayJob.Spec.ShutdownAfterJobFinishes {
return fmt.Errorf("a RayJob with shutdownAfterJobFinishes set to false is not allowed to be suspended")
}

isClusterSelectorMode := len(rayJob.Spec.ClusterSelector) != 0
if rayJob.Spec.Suspend && isClusterSelectorMode {
return fmt.Errorf("the ClusterSelector mode doesn't support the suspend operation")
}
if rayJob.Spec.RayClusterSpec == nil && !isClusterSelectorMode {
return fmt.Errorf("one of RayClusterSpec or ClusterSelector must be set")
}
// Validate whether RuntimeEnvYAML is a valid YAML string. Note that this only checks its validity
// as a YAML string, not its adherence to the runtime environment schema.
if _, err := utils.UnmarshalRuntimeEnvYAML(rayJob.Spec.RuntimeEnvYAML); err != nil {
return err
}
if rayJob.Spec.ActiveDeadlineSeconds != nil && *rayJob.Spec.ActiveDeadlineSeconds <= 0 {
return fmt.Errorf("activeDeadlineSeconds must be a positive integer")
}
if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit < 0 {
return fmt.Errorf("backoffLimit must be a positive integer")
}
if !features.Enabled(features.RayJobDeletionPolicy) && rayJob.Spec.DeletionPolicy != nil {
return fmt.Errorf("RayJobDeletionPolicy feature gate must be enabled to use the DeletionPolicy feature")
}

if rayJob.Spec.DeletionPolicy != nil {
policy := *rayJob.Spec.DeletionPolicy
if isClusterSelectorMode {
switch policy {
case rayv1.DeleteClusterDeletionPolicy:
return fmt.Errorf("the ClusterSelector mode doesn't support DeletionPolicy=DeleteCluster")
case rayv1.DeleteWorkersDeletionPolicy:
return fmt.Errorf("the ClusterSelector mode doesn't support DeletionPolicy=DeleteWorkers")
}
}

if policy == rayv1.DeleteWorkersDeletionPolicy && utils.IsAutoscalingEnabled(rayJob) {
// TODO (rueian): This can be supported in a future Ray version. We should check the RayVersion once we know it.
return fmt.Errorf("DeletionPolicy=DeleteWorkers currently does not support RayCluster with autoscaling enabled")
}

if rayJob.Spec.ShutdownAfterJobFinishes && policy == rayv1.DeleteNoneDeletionPolicy {
return fmt.Errorf("shutdownAfterJobFinshes is set to 'true' while deletion policy is 'DeleteNone'")
}
}
return nil
}

func validateRayJobStatus(rayJob *rayv1.RayJob) error {
if rayJob.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusWaiting && rayJob.Spec.SubmissionMode != rayv1.InteractiveMode {
return fmt.Errorf("invalid RayJob State: JobDeploymentStatus cannot be `Waiting` when SubmissionMode is not InteractiveMode")
Expand Down
115 changes: 0 additions & 115 deletions ray-operator/controllers/ray/rayjob_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
utils "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme"
"github.com/ray-project/kuberay/ray-operator/pkg/features"
)

func TestCreateRayJobSubmitterIfNeed(t *testing.T) {
Expand Down Expand Up @@ -320,119 +318,6 @@ func TestUpdateRayJobStatus(t *testing.T) {
}
}

func TestValidateRayJobSpec(t *testing.T) {
err := validateRayJobSpec(&rayv1.RayJob{})
assert.ErrorContains(t, err, "one of RayClusterSpec or ClusterSelector must be set")

err = validateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
Suspend: true,
ShutdownAfterJobFinishes: false,
},
})
assert.ErrorContains(t, err, "a RayJob with shutdownAfterJobFinishes set to false is not allowed to be suspended")

err = validateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
Suspend: true,
ShutdownAfterJobFinishes: true,
RayClusterSpec: &rayv1.RayClusterSpec{},
},
})
assert.NoError(t, err)

err = validateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
Suspend: true,
ShutdownAfterJobFinishes: true,
ClusterSelector: map[string]string{
"key": "value",
},
},
})
assert.ErrorContains(t, err, "the ClusterSelector mode doesn't support the suspend operation")

err = validateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
RuntimeEnvYAML: "invalid_yaml_str",
RayClusterSpec: &rayv1.RayClusterSpec{},
},
})
assert.ErrorContains(t, err, "failed to unmarshal RuntimeEnvYAML")

err = validateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
BackoffLimit: ptr.To[int32](-1),
RayClusterSpec: &rayv1.RayClusterSpec{},
},
})
assert.ErrorContains(t, err, "backoffLimit must be a positive integer")

err = validateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
DeletionPolicy: ptr.To(rayv1.DeleteClusterDeletionPolicy),
ShutdownAfterJobFinishes: true,
RayClusterSpec: &rayv1.RayClusterSpec{},
},
})
assert.ErrorContains(t, err, "RayJobDeletionPolicy feature gate must be enabled to use the DeletionPolicy feature")

defer features.SetFeatureGateDuringTest(t, features.RayJobDeletionPolicy, true)()

err = validateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
DeletionPolicy: ptr.To(rayv1.DeleteClusterDeletionPolicy),
ClusterSelector: map[string]string{"key": "value"},
},
})
assert.ErrorContains(t, err, "the ClusterSelector mode doesn't support DeletionPolicy=DeleteCluster")

err = validateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
DeletionPolicy: ptr.To(rayv1.DeleteWorkersDeletionPolicy),
ClusterSelector: map[string]string{"key": "value"},
},
})
assert.ErrorContains(t, err, "the ClusterSelector mode doesn't support DeletionPolicy=DeleteWorkers")

err = validateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
DeletionPolicy: ptr.To(rayv1.DeleteWorkersDeletionPolicy),
RayClusterSpec: &rayv1.RayClusterSpec{
EnableInTreeAutoscaling: ptr.To[bool](true),
},
},
})
assert.ErrorContains(t, err, "DeletionPolicy=DeleteWorkers currently does not support RayCluster with autoscaling enabled")

err = validateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
DeletionPolicy: ptr.To(rayv1.DeleteClusterDeletionPolicy),
ShutdownAfterJobFinishes: true,
RayClusterSpec: &rayv1.RayClusterSpec{},
},
})
assert.NoError(t, err)

err = validateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
DeletionPolicy: nil,
ShutdownAfterJobFinishes: true,
RayClusterSpec: &rayv1.RayClusterSpec{},
},
})
assert.NoError(t, err)

err = validateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
DeletionPolicy: ptr.To(rayv1.DeleteNoneDeletionPolicy),
ShutdownAfterJobFinishes: true,
RayClusterSpec: &rayv1.RayClusterSpec{},
},
})
assert.ErrorContains(t, err, "shutdownAfterJobFinshes is set to 'true' while deletion policy is 'DeleteNone'")
}

func TestFailedToCreateRayJobSubmitterEvent(t *testing.T) {
rayJob := &rayv1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Expand Down
53 changes: 53 additions & 0 deletions ray-operator/controllers/ray/utils/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,56 @@ func ValidateRayClusterSpec(instance *rayv1.RayCluster) error {
}
return nil
}

func ValidateRayJobSpec(rayJob *rayv1.RayJob) error {
// KubeRay has some limitations for the suspend operation. The limitations are a subset of the limitations of
// Kueue (https://kueue.sigs.k8s.io/docs/tasks/run_rayjobs/#c-limitations). For example, KubeRay allows users
// to suspend a RayJob with autoscaling enabled, but Kueue doesn't.
if rayJob.Spec.Suspend && !rayJob.Spec.ShutdownAfterJobFinishes {
return fmt.Errorf("a RayJob with shutdownAfterJobFinishes set to false is not allowed to be suspended")
}

isClusterSelectorMode := len(rayJob.Spec.ClusterSelector) != 0
if rayJob.Spec.Suspend && isClusterSelectorMode {
return fmt.Errorf("the ClusterSelector mode doesn't support the suspend operation")
}
if rayJob.Spec.RayClusterSpec == nil && !isClusterSelectorMode {
return fmt.Errorf("one of RayClusterSpec or ClusterSelector must be set")
}
// Validate whether RuntimeEnvYAML is a valid YAML string. Note that this only checks its validity
// as a YAML string, not its adherence to the runtime environment schema.
if _, err := UnmarshalRuntimeEnvYAML(rayJob.Spec.RuntimeEnvYAML); err != nil {
return err
}
if rayJob.Spec.ActiveDeadlineSeconds != nil && *rayJob.Spec.ActiveDeadlineSeconds <= 0 {
return fmt.Errorf("activeDeadlineSeconds must be a positive integer")
}
if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit < 0 {
return fmt.Errorf("backoffLimit must be a positive integer")
}
if !features.Enabled(features.RayJobDeletionPolicy) && rayJob.Spec.DeletionPolicy != nil {
return fmt.Errorf("RayJobDeletionPolicy feature gate must be enabled to use the DeletionPolicy feature")
}

if rayJob.Spec.DeletionPolicy != nil {
policy := *rayJob.Spec.DeletionPolicy
if isClusterSelectorMode {
switch policy {
case rayv1.DeleteClusterDeletionPolicy:
return fmt.Errorf("the ClusterSelector mode doesn't support DeletionPolicy=DeleteCluster")
case rayv1.DeleteWorkersDeletionPolicy:
return fmt.Errorf("the ClusterSelector mode doesn't support DeletionPolicy=DeleteWorkers")
}
}

if policy == rayv1.DeleteWorkersDeletionPolicy && IsAutoscalingEnabled(rayJob) {
// TODO (rueian): This can be supported in a future Ray version. We should check the RayVersion once we know it.
return fmt.Errorf("DeletionPolicy=DeleteWorkers currently does not support RayCluster with autoscaling enabled")
}

if rayJob.Spec.ShutdownAfterJobFinishes && policy == rayv1.DeleteNoneDeletionPolicy {
return fmt.Errorf("shutdownAfterJobFinshes is set to 'true' while deletion policy is 'DeleteNone'")
}
}
return nil
}
113 changes: 113 additions & 0 deletions ray-operator/controllers/ray/utils/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,3 +520,116 @@ func TestValidateRayClusterSpecSuspendingWorkerGroup(t *testing.T) {
})
}
}

func TestValidateRayJobSpec(t *testing.T) {
err := ValidateRayJobSpec(&rayv1.RayJob{})
assert.ErrorContains(t, err, "one of RayClusterSpec or ClusterSelector must be set")

err = ValidateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
Suspend: true,
ShutdownAfterJobFinishes: false,
},
})
assert.ErrorContains(t, err, "a RayJob with shutdownAfterJobFinishes set to false is not allowed to be suspended")

err = ValidateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
Suspend: true,
ShutdownAfterJobFinishes: true,
RayClusterSpec: &rayv1.RayClusterSpec{},
},
})
assert.NoError(t, err)

err = ValidateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
Suspend: true,
ShutdownAfterJobFinishes: true,
ClusterSelector: map[string]string{
"key": "value",
},
},
})
assert.ErrorContains(t, err, "the ClusterSelector mode doesn't support the suspend operation")

err = ValidateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
RuntimeEnvYAML: "invalid_yaml_str",
RayClusterSpec: &rayv1.RayClusterSpec{},
},
})
assert.ErrorContains(t, err, "failed to unmarshal RuntimeEnvYAML")

err = ValidateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
BackoffLimit: ptr.To[int32](-1),
RayClusterSpec: &rayv1.RayClusterSpec{},
},
})
assert.ErrorContains(t, err, "backoffLimit must be a positive integer")

err = ValidateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
DeletionPolicy: ptr.To(rayv1.DeleteClusterDeletionPolicy),
ShutdownAfterJobFinishes: true,
RayClusterSpec: &rayv1.RayClusterSpec{},
},
})
assert.ErrorContains(t, err, "RayJobDeletionPolicy feature gate must be enabled to use the DeletionPolicy feature")

defer features.SetFeatureGateDuringTest(t, features.RayJobDeletionPolicy, true)()

err = ValidateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
DeletionPolicy: ptr.To(rayv1.DeleteClusterDeletionPolicy),
ClusterSelector: map[string]string{"key": "value"},
},
})
assert.ErrorContains(t, err, "the ClusterSelector mode doesn't support DeletionPolicy=DeleteCluster")

err = ValidateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
DeletionPolicy: ptr.To(rayv1.DeleteWorkersDeletionPolicy),
ClusterSelector: map[string]string{"key": "value"},
},
})
assert.ErrorContains(t, err, "the ClusterSelector mode doesn't support DeletionPolicy=DeleteWorkers")

err = ValidateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
DeletionPolicy: ptr.To(rayv1.DeleteWorkersDeletionPolicy),
RayClusterSpec: &rayv1.RayClusterSpec{
EnableInTreeAutoscaling: ptr.To[bool](true),
},
},
})
assert.ErrorContains(t, err, "DeletionPolicy=DeleteWorkers currently does not support RayCluster with autoscaling enabled")

err = ValidateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
DeletionPolicy: ptr.To(rayv1.DeleteClusterDeletionPolicy),
ShutdownAfterJobFinishes: true,
RayClusterSpec: &rayv1.RayClusterSpec{},
},
})
assert.NoError(t, err)

err = ValidateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
DeletionPolicy: nil,
ShutdownAfterJobFinishes: true,
RayClusterSpec: &rayv1.RayClusterSpec{},
},
})
assert.NoError(t, err)

err = ValidateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
DeletionPolicy: ptr.To(rayv1.DeleteNoneDeletionPolicy),
ShutdownAfterJobFinishes: true,
RayClusterSpec: &rayv1.RayClusterSpec{},
},
})
assert.ErrorContains(t, err, "shutdownAfterJobFinshes is set to 'true' while deletion policy is 'DeleteNone'")
}
Loading