Skip to content

Commit

Permalink
review comment: don't upsync status while local job is still suspended
Browse files Browse the repository at this point in the history
  • Loading branch information
dgrove-oss committed Jan 17, 2025
1 parent 7e80941 commit df72363
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
Expand All @@ -41,6 +42,8 @@ type multikueueAdapter struct{}
var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil)

func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error {
log := ctrl.LoggerFrom(ctx)

localAppWrapper := awv1beta2.AppWrapper{}
err := localClient.Get(ctx, key, &localAppWrapper)
if err != nil {
Expand All @@ -55,6 +58,11 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie

// if the remote exists, just copy the status
if err == nil {
if localAppWrapper.Spec.Suspend {
// Ensure the appwrapper is unsuspended before updating its status; otherwise, it will fail when patching the spec.
log.V(2).Info("Skipping the sync since the local appwrapper is still suspended")
return nil
}
return clientutil.PatchStatus(ctx, localClient, &localAppWrapper, func() (bool, error) {
localAppWrapper.Status = remoteAppWrapper.Status
return true, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestMultikueueAdapter(t *testing.T) {
},
"sync status from remote appwrapper": {
managersAppWrappers: []awv1beta2.AppWrapper{
*baseAppWrapperManagedByKueueBuilder.DeepCopy().Obj(),
*baseAppWrapperManagedByKueueBuilder.DeepCopy().Suspend(false).Obj(),
},
workerAppWrappers: []awv1beta2.AppWrapper{
*baseAppWrapperBuilder.DeepCopy().
Expand All @@ -98,9 +98,39 @@ func TestMultikueueAdapter(t *testing.T) {

wantManagersAppWrappers: []awv1beta2.AppWrapper{
*baseAppWrapperManagedByKueueBuilder.DeepCopy().
Suspend(false).
SetPhase(awv1beta2.AppWrapperSuspended).
Obj(),
},
wantWorkerAppWrappers: []awv1beta2.AppWrapper{
*baseAppWrapperBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(kueue.MultiKueueOriginLabel, "origin1").
SetPhase(awv1beta2.AppWrapperSuspended).
Obj(),
},
},
"status is not synced if local appwrapper is still suspended": {
managersAppWrappers: []awv1beta2.AppWrapper{
*baseAppWrapperManagedByKueueBuilder.DeepCopy().Suspend(true).Obj(),
},
workerAppWrappers: []awv1beta2.AppWrapper{
*baseAppWrapperBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(kueue.MultiKueueOriginLabel, "origin1").
SetPhase(awv1beta2.AppWrapperSuspended).
Obj(),
},
operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error {
return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "aw1", Namespace: TestNamespace}, "wl1", "origin1")
},

wantManagersAppWrappers: []awv1beta2.AppWrapper{
*baseAppWrapperManagedByKueueBuilder.DeepCopy().
Suspend(true).
SetPhase(awv1beta2.AppWrapperEmpty).
Obj(),
},
wantWorkerAppWrappers: []awv1beta2.AppWrapper{
*baseAppWrapperBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Expand Down

0 comments on commit df72363

Please sign in to comment.