Skip to content

Commit

Permalink
[Refactor][RayService] Add conditions to RayService
Browse files Browse the repository at this point in the history
Closes: #2751
Signed-off-by: Chi-Sheng Liu <[email protected]>
  • Loading branch information
MortalHappiness committed Jan 23, 2025
1 parent 7e72627 commit 599ac4f
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 0 deletions.
4 changes: 4 additions & 0 deletions docs/reference/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ RayService is the Schema for the rayservices API
| `spec` _[RayServiceSpec](#rayservicespec)_ | | | |






#### RayServiceSpec


Expand Down
39 changes: 39 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions ray-operator/apis/ray/v1/rayservice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ type RayServiceSpec struct {

// RayServiceStatuses defines the observed state of RayService
type RayServiceStatuses struct {
// Represents the latest available observations of a RayService's current state.
// +patchMergeKey=type
// +patchStrategy=merge
// +listType=map
// +listMapKey=type
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
// LastUpdateTime represents the timestamp when the RayService status was last updated.
LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"`
// ServiceStatus indicates the current RayService status.
Expand Down Expand Up @@ -122,6 +128,26 @@ type ServeDeploymentStatus struct {
Message string `json:"message,omitempty"`
}

type (
RayServiceConditionType string
RayServiceConditionReason string
)

const (
// RayServiceReady means users can send requests to the underlying cluster and the number of serve endpoints is greater than 0.
RayServiceReady RayServiceConditionType = "Ready"
// UpgradeInProgress means the RayService is currently performing a zero-downtime upgrade.
UpgradeInProgress RayServiceConditionType = "UpgradeInProgress"
)

const (
RayServiceInitializing RayServiceConditionReason = "Initializing"
ZeroServeEndpoints RayServiceConditionReason = "ZeroServeEndpoints"
NonZeroServeEndpoints RayServiceConditionReason = "NonZeroServeEndpoints"
ActivePendingClustersBothExist RayServiceConditionReason = "ActivePendingClustersBothExist"
NoPendingCluster RayServiceConditionReason = "NoPendingCluster"
)

// +kubebuilder:object:root=true
// +kubebuilder:resource:categories=all
// +kubebuilder:subresource:status
Expand Down
7 changes: 7 additions & 0 deletions ray-operator/apis/ray/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayservices.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 52 additions & 0 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math"
"os"
"reflect"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -128,6 +129,13 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
rayServiceInstance.Status.ObservedGeneration = rayServiceInstance.ObjectMeta.Generation

// Initialize conditions for RayService.
initConditions(rayServiceInstance)
if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil {
logger.Error(errStatus, "Fail to update initial conditions for RayService")
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil
}

// Find active and pending ray cluster objects given current service name.
var activeRayClusterInstance *rayv1.RayCluster
var pendingRayClusterInstance *rayv1.RayCluster
Expand Down Expand Up @@ -272,9 +280,46 @@ func (r *RayServiceReconciler) calculateStatus(ctx context.Context, rayServiceIn
return errstd.New("numServeEndpoints exceeds math.MaxInt32")
}
rayServiceInstance.Status.NumServeEndpoints = int32(numServeEndpoints) //nolint:gosec // This is a false positive from gosec. See https://github.com/securego/gosec/issues/1212 for more details.
calculateConditions(rayServiceInstance)
return nil
}

func initConditions(rayServiceInstance *rayv1.RayService) {
if rayServiceInstance.Status.Conditions == nil {
rayServiceInstance.Status.Conditions = []metav1.Condition{}
}
if len(rayServiceInstance.Status.Conditions) == 0 {
message := "RayService is initializing"
setCondition(rayServiceInstance, rayv1.RayServiceReady, metav1.ConditionFalse, rayv1.RayServiceInitializing, message)
setCondition(rayServiceInstance, rayv1.UpgradeInProgress, metav1.ConditionFalse, rayv1.RayServiceInitializing, message)
}
}

func calculateConditions(rayServiceInstance *rayv1.RayService) {
if rayServiceInstance.Status.NumServeEndpoints > 0 {
setCondition(rayServiceInstance, rayv1.RayServiceReady, metav1.ConditionTrue, rayv1.NonZeroServeEndpoints, "Number of serve endpoints is greater than 0")
} else if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RayServiceReady)) {
setCondition(rayServiceInstance, rayv1.RayServiceReady, metav1.ConditionFalse, rayv1.ZeroServeEndpoints, "Number of serve endpoints dropped to 0")
}

if rayServiceInstance.Status.ActiveServiceStatus.RayClusterName != "" && rayServiceInstance.Status.PendingServiceStatus.RayClusterName != "" {
setCondition(rayServiceInstance, rayv1.UpgradeInProgress, metav1.ConditionTrue, rayv1.ActivePendingClustersBothExist, "Both active and pending Ray clusters exist")
} else if rayServiceInstance.Status.PendingServiceStatus.RayClusterName == "" {
setCondition(rayServiceInstance, rayv1.UpgradeInProgress, metav1.ConditionFalse, rayv1.NoPendingCluster, "No pending Ray cluster exists")
}
}

func setCondition(rayServiceInstance *rayv1.RayService, conditionType rayv1.RayServiceConditionType, status metav1.ConditionStatus, reason rayv1.RayServiceConditionReason, message string) {
condition := metav1.Condition{
Type: string(conditionType),
Status: status,
Reason: string(reason),
Message: message,
ObservedGeneration: rayServiceInstance.Status.ObservedGeneration,
}
meta.SetStatusCondition(&rayServiceInstance.Status.Conditions, condition)
}

// Checks whether the old and new RayServiceStatus are inconsistent by comparing different fields.
// If the only difference between the old and new status is the HealthLastUpdateTime field,
// the status update will not be triggered.
Expand Down Expand Up @@ -343,6 +388,11 @@ func inconsistentRayServiceStatuses(ctx context.Context, oldStatus rayv1.RayServ
return true
}

if !reflect.DeepEqual(oldStatus.Conditions, newStatus.Conditions) {
logger.Info("inconsistentRayServiceStatus RayService Conditions changed")
return true
}

if inconsistentRayServiceStatus(ctx, oldStatus.ActiveServiceStatus, newStatus.ActiveServiceStatus) {
logger.Info("inconsistentRayServiceStatus RayService ActiveServiceStatus changed")
return true
Expand Down Expand Up @@ -988,6 +1038,8 @@ func markPreparingNewCluster(rayServiceInstance *rayv1.RayService) {
rayServiceInstance.Status.PendingServiceStatus = rayv1.RayServiceStatus{
RayClusterName: utils.GenerateRayClusterName(rayServiceInstance.Name),
}
// TODO(MortalHappiness): Don't calculate conditions here. Try to refactor this. Status calculation should be done in single place.
calculateConditions(rayServiceInstance)
}

func promotePendingClusterToActiveCluster(ctx context.Context, rayServiceInstance *rayv1.RayService) {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 599ac4f

Please sign in to comment.