diff --git a/pkg/controller/jobs/appwrapper/appwrapper_multikueue_adapter.go b/pkg/controller/jobs/appwrapper/appwrapper_multikueue_adapter.go index 2a3a23f101..1911256f28 100644 --- a/pkg/controller/jobs/appwrapper/appwrapper_multikueue_adapter.go +++ b/pkg/controller/jobs/appwrapper/appwrapper_multikueue_adapter.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" @@ -41,6 +42,8 @@ type multikueueAdapter struct{} var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { + log := ctrl.LoggerFrom(ctx) + localAppWrapper := awv1beta2.AppWrapper{} err := localClient.Get(ctx, key, &localAppWrapper) if err != nil { @@ -55,6 +58,11 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie // if the remote exists, just copy the status if err == nil { + if localAppWrapper.Spec.Suspend { + // Ensure the appwrapper is unsuspended before updating its status; otherwise, it will fail when patching the spec. + log.V(2).Info("Skipping the sync since the local appwrapper is still suspended") + return nil + } return clientutil.PatchStatus(ctx, localClient, &localAppWrapper, func() (bool, error) { localAppWrapper.Status = remoteAppWrapper.Status return true, nil diff --git a/pkg/controller/jobs/appwrapper/appwrapper_multikueue_adapter_test.go b/pkg/controller/jobs/appwrapper/appwrapper_multikueue_adapter_test.go index 6441c170cf..b9a2559070 100644 --- a/pkg/controller/jobs/appwrapper/appwrapper_multikueue_adapter_test.go +++ b/pkg/controller/jobs/appwrapper/appwrapper_multikueue_adapter_test.go @@ -83,7 +83,7 @@ func TestMultikueueAdapter(t *testing.T) { }, "sync status from remote appwrapper": { managersAppWrappers: []awv1beta2.AppWrapper{ - *baseAppWrapperManagedByKueueBuilder.DeepCopy().Obj(), + *baseAppWrapperManagedByKueueBuilder.DeepCopy().Suspend(false).Obj(), }, workerAppWrappers: []awv1beta2.AppWrapper{ *baseAppWrapperBuilder.DeepCopy(). @@ -98,9 +98,39 @@ func TestMultikueueAdapter(t *testing.T) { wantManagersAppWrappers: []awv1beta2.AppWrapper{ *baseAppWrapperManagedByKueueBuilder.DeepCopy(). + Suspend(false). + SetPhase(awv1beta2.AppWrapperSuspended). + Obj(), + }, + wantWorkerAppWrappers: []awv1beta2.AppWrapper{ + *baseAppWrapperBuilder.DeepCopy(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(kueue.MultiKueueOriginLabel, "origin1"). + SetPhase(awv1beta2.AppWrapperSuspended). + Obj(), + }, + }, + "status is not synced if local appwrapper is still suspended": { + managersAppWrappers: []awv1beta2.AppWrapper{ + *baseAppWrapperManagedByKueueBuilder.DeepCopy().Suspend(true).Obj(), + }, + workerAppWrappers: []awv1beta2.AppWrapper{ + *baseAppWrapperBuilder.DeepCopy(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(kueue.MultiKueueOriginLabel, "origin1"). SetPhase(awv1beta2.AppWrapperSuspended). Obj(), }, + operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "aw1", Namespace: TestNamespace}, "wl1", "origin1") + }, + + wantManagersAppWrappers: []awv1beta2.AppWrapper{ + *baseAppWrapperManagedByKueueBuilder.DeepCopy(). + Suspend(true). + SetPhase(awv1beta2.AppWrapperEmpty). + Obj(), + }, wantWorkerAppWrappers: []awv1beta2.AppWrapper{ *baseAppWrapperBuilder.DeepCopy(). Label(constants.PrebuiltWorkloadLabel, "wl1").