Skip to content

Commit

Permalink
fix: unify nodeclass status and termination controllers to prevent ra… (
Browse files Browse the repository at this point in the history
  • Loading branch information
saurav-agarwalla authored and edibble21 committed Jan 22, 2025
1 parent 5499554 commit 1de3a8a
Show file tree
Hide file tree
Showing 23 changed files with 280 additions and 470 deletions.
8 changes: 4 additions & 4 deletions pkg/cloudprovider/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
"github.com/aws/karpenter-provider-aws/pkg/apis"
v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
"github.com/aws/karpenter-provider-aws/pkg/cloudprovider"
"github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/status"
"github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass"
"github.com/aws/karpenter-provider-aws/pkg/fake"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
"github.com/aws/karpenter-provider-aws/pkg/test"
Expand Down Expand Up @@ -1156,7 +1156,7 @@ var _ = Describe("CloudProvider", func() {
{SubnetId: aws.String("test-subnet-2"), AvailabilityZone: aws.String("test-zone-1a"), AvailabilityZoneId: aws.String("tstz1-1a"), AvailableIpAddressCount: aws.Int32(100),
Tags: []ec2types.Tag{{Key: aws.String("Name"), Value: aws.String("test-subnet-2")}}},
}})
controller := status.NewController(env.Client, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider, *cloudProvider, awsEnv.InstanceProvider)
controller := nodeclass.NewController(env.Client, recorder, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider)
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
ExpectObjectReconciled(ctx, env.Client, controller, nodeClass)
pod := coretest.UnschedulablePod(coretest.PodOptions{NodeSelector: map[string]string{corev1.LabelTopologyZone: "test-zone-1a"}})
Expand All @@ -1173,7 +1173,7 @@ var _ = Describe("CloudProvider", func() {
{SubnetId: aws.String("test-subnet-2"), AvailabilityZone: aws.String("test-zone-1a"), AvailabilityZoneId: aws.String("tstz1-1a"), AvailableIpAddressCount: aws.Int32(11),
Tags: []ec2types.Tag{{Key: aws.String("Name"), Value: aws.String("test-subnet-2")}}},
}})
controller := status.NewController(env.Client, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider, *cloudProvider, awsEnv.InstanceProvider)
controller := nodeclass.NewController(env.Client, recorder, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider)
nodeClass.Spec.Kubelet = &v1.KubeletConfiguration{
MaxPods: aws.Int32(1),
}
Expand Down Expand Up @@ -1214,7 +1214,7 @@ var _ = Describe("CloudProvider", func() {
}})
nodeClass.Spec.SubnetSelectorTerms = []v1.SubnetSelectorTerm{{Tags: map[string]string{"Name": "test-subnet-1"}}}
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
controller := status.NewController(env.Client, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider, *cloudProvider, awsEnv.InstanceProvider)
controller := nodeclass.NewController(env.Client, recorder, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider)
ExpectObjectReconciled(ctx, env.Client, controller, nodeClass)
podSubnet1 := coretest.UnschedulablePod()
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, podSubnet1)
Expand Down
9 changes: 2 additions & 7 deletions pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"

v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
nodeclass "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass"
nodeclasshash "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/hash"
nodeclassstatus "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/status"
nodeclasstermination "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/termination"
controllersinstancetype "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype"
controllersinstancetypecapacity "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype/capacity"
controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing"
Expand All @@ -41,14 +40,12 @@ import (
servicesqs "github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/karpenter/pkg/events"

awscache "github.com/aws/karpenter-provider-aws/pkg/cache"
provcloudprovider "github.com/aws/karpenter-provider-aws/pkg/cloudprovider"
"github.com/aws/karpenter-provider-aws/pkg/controllers/interruption"
nodeclaimgarbagecollection "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclaim/garbagecollection"
nodeclaimtagging "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclaim/tagging"
Expand Down Expand Up @@ -84,9 +81,7 @@ func NewControllers(
instanceTypeProvider *instancetype.DefaultProvider) []controller.Controller {
controllers := []controller.Controller{
nodeclasshash.NewController(kubeClient),
nodeclassstatus.NewController(kubeClient, subnetProvider, securityGroupProvider, amiProvider, instanceProfileProvider, launchTemplateProvider, *provcloudprovider.New(instanceTypeProvider, instanceProvider, events.NewRecorder(&record.FakeRecorder{}),
kubeClient, amiProvider, securityGroupProvider), instanceProvider),
nodeclasstermination.NewController(kubeClient, recorder, instanceProfileProvider, launchTemplateProvider),
nodeclass.NewController(kubeClient, recorder, subnetProvider, securityGroupProvider, amiProvider, instanceProfileProvider, launchTemplateProvider),
nodeclaimgarbagecollection.NewController(kubeClient, cloudProvider),
nodeclaimtagging.NewController(kubeClient, cloudProvider, instanceProvider),
controllerspricing.NewController(pricingProvider),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package status
package nodeclass

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package status_test
package nodeclass_test

import (
"fmt"
Expand Down Expand Up @@ -132,7 +132,7 @@ var _ = Describe("NodeClass AMI Status Controller", func() {
}
nodeClass.Spec.AMISelectorTerms = []v1.AMISelectorTerm{{Alias: "al2023@latest"}}
ExpectApplied(ctx, env.Client, nodeClass)
ExpectObjectReconciled(ctx, env.Client, statusController, nodeClass)
ExpectObjectReconciled(ctx, env.Client, controller, nodeClass)
nodeClass = ExpectExists(ctx, env.Client, nodeClass)

Expect(len(nodeClass.Status.AMIs)).To(Equal(4))
Expand Down Expand Up @@ -216,7 +216,7 @@ var _ = Describe("NodeClass AMI Status Controller", func() {
}
nodeClass.Spec.AMISelectorTerms = []v1.AMISelectorTerm{{Alias: "al2@latest"}}
ExpectApplied(ctx, env.Client, nodeClass)
ExpectObjectReconciled(ctx, env.Client, statusController, nodeClass)
ExpectObjectReconciled(ctx, env.Client, controller, nodeClass)
nodeClass = ExpectExists(ctx, env.Client, nodeClass)

Expect(len(nodeClass.Status.AMIs)).To(Equal(4))
Expand Down Expand Up @@ -302,7 +302,7 @@ var _ = Describe("NodeClass AMI Status Controller", func() {
}
nodeClass.Spec.AMISelectorTerms = []v1.AMISelectorTerm{{Alias: "bottlerocket@latest"}}
ExpectApplied(ctx, env.Client, nodeClass)
ExpectObjectReconciled(ctx, env.Client, statusController, nodeClass)
ExpectObjectReconciled(ctx, env.Client, controller, nodeClass)
nodeClass = ExpectExists(ctx, env.Client, nodeClass)

Expect(len(nodeClass.Status.AMIs)).To(Equal(4))
Expand Down Expand Up @@ -384,7 +384,7 @@ var _ = Describe("NodeClass AMI Status Controller", func() {
}
nodeClass.Spec.AMISelectorTerms = []v1.AMISelectorTerm{{Alias: "windows2019@latest"}}
ExpectApplied(ctx, env.Client, nodeClass)
ExpectObjectReconciled(ctx, env.Client, statusController, nodeClass)
ExpectObjectReconciled(ctx, env.Client, controller, nodeClass)
nodeClass = ExpectExists(ctx, env.Client, nodeClass)

Expect(len(nodeClass.Status.AMIs)).To(Equal(1))
Expand Down Expand Up @@ -419,7 +419,7 @@ var _ = Describe("NodeClass AMI Status Controller", func() {
}
nodeClass.Spec.AMISelectorTerms = []v1.AMISelectorTerm{{Alias: "windows2022@latest"}}
ExpectApplied(ctx, env.Client, nodeClass)
ExpectObjectReconciled(ctx, env.Client, statusController, nodeClass)
ExpectObjectReconciled(ctx, env.Client, controller, nodeClass)
nodeClass = ExpectExists(ctx, env.Client, nodeClass)

Expect(len(nodeClass.Status.AMIs)).To(Equal(1))
Expand Down Expand Up @@ -459,7 +459,7 @@ var _ = Describe("NodeClass AMI Status Controller", func() {
Alias: "bottlerocket@latest",
}}
ExpectApplied(ctx, env.Client, nodeClass)
ExpectObjectReconciled(ctx, env.Client, statusController, nodeClass)
ExpectObjectReconciled(ctx, env.Client, controller, nodeClass)
nodeClass = ExpectExists(ctx, env.Client, nodeClass)

Expect(len(nodeClass.Status.AMIs)).To(Equal(2))
Expand Down Expand Up @@ -510,7 +510,7 @@ var _ = Describe("NodeClass AMI Status Controller", func() {
Tags: map[string]string{"Name": "amd64-standard"},
}}
ExpectApplied(ctx, env.Client, nodeClass)
ExpectObjectReconciled(ctx, env.Client, statusController, nodeClass)
ExpectObjectReconciled(ctx, env.Client, controller, nodeClass)
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.AMIs).To(Equal(
[]v1.AMI{
Expand All @@ -530,7 +530,7 @@ var _ = Describe("NodeClass AMI Status Controller", func() {
It("should get error when resolving AMIs and have status condition set to false", func() {
awsEnv.EC2API.NextError.Set(fmt.Errorf("unable to resolve AMI"))
ExpectApplied(ctx, env.Client, nodeClass)
_ = ExpectObjectReconcileFailed(ctx, env.Client, statusController, nodeClass)
_ = ExpectObjectReconcileFailed(ctx, env.Client, controller, nodeClass)
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.StatusConditions().IsTrue(v1.ConditionTypeAMIsReady)).To(BeFalse())
})
Expand Down Expand Up @@ -571,7 +571,7 @@ var _ = Describe("NodeClass AMI Status Controller", func() {
})
It("should update nodeclass AMI status with correct deprecation value and conditions", func() {
ExpectApplied(ctx, env.Client, nodeClass)
ExpectObjectReconciled(ctx, env.Client, statusController, nodeClass)
ExpectObjectReconciled(ctx, env.Client, controller, nodeClass)
nodeClass = ExpectExists(ctx, env.Client, nodeClass)

Expect(len(nodeClass.Status.AMIs)).To(Equal(2))
Expand Down Expand Up @@ -609,7 +609,7 @@ var _ = Describe("NodeClass AMI Status Controller", func() {
// Flush Cache
awsEnv.EC2Cache.Flush()

ExpectObjectReconciled(ctx, env.Client, statusController, nodeClass)
ExpectObjectReconciled(ctx, env.Client, controller, nodeClass)
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(len(nodeClass.Status.AMIs)).To(Equal(2))
Expect(nodeClass.Status.AMIs).To(Equal(
Expand Down Expand Up @@ -646,7 +646,7 @@ var _ = Describe("NodeClass AMI Status Controller", func() {

// Initial reconcile discovers AMIs which are deprecated
ExpectApplied(ctx, env.Client, nodeClass)
ExpectObjectReconciled(ctx, env.Client, statusController, nodeClass)
ExpectObjectReconciled(ctx, env.Client, controller, nodeClass)
nodeClass = ExpectExists(ctx, env.Client, nodeClass)

Expect(len(nodeClass.Status.AMIs)).To(Equal(2))
Expand Down Expand Up @@ -709,7 +709,7 @@ var _ = Describe("NodeClass AMI Status Controller", func() {
awsEnv.EC2Cache.Flush()

ExpectApplied(ctx, env.Client, nodeClass)
ExpectObjectReconciled(ctx, env.Client, statusController, nodeClass)
ExpectObjectReconciled(ctx, env.Client, controller, nodeClass)
nodeClass = ExpectExists(ctx, env.Client, nodeClass)

Expect(len(nodeClass.Status.AMIs)).To(Equal(2))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package termination
package nodeclass

import (
"context"
"fmt"
"time"

"go.uber.org/multierr"
"k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/karpenter/pkg/operator/injection"
nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"

"github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate"
"sigs.k8s.io/karpenter/pkg/utils/result"

"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/equality"
Expand All @@ -44,34 +44,103 @@ import (
"sigs.k8s.io/karpenter/pkg/events"

v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
"github.com/aws/karpenter-provider-aws/pkg/providers/amifamily"
"github.com/aws/karpenter-provider-aws/pkg/providers/instanceprofile"
"github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate"
"github.com/aws/karpenter-provider-aws/pkg/providers/securitygroup"
"github.com/aws/karpenter-provider-aws/pkg/providers/subnet"
)

type nodeClassReconciler interface {
Reconcile(context.Context, *v1.EC2NodeClass) (reconcile.Result, error)
}

type Controller struct {
kubeClient client.Client
recorder events.Recorder
instanceProfileProvider instanceprofile.Provider
launchTemplateProvider launchtemplate.Provider
kubeClient client.Client
recorder events.Recorder
launchTemplateProvider launchtemplate.Provider

ami *AMI
instanceProfile *InstanceProfile
subnet *Subnet
securityGroup *SecurityGroup
validation *Validation
readiness *Readiness //TODO : Remove this when we have sub status conditions
}

func NewController(kubeClient client.Client, recorder events.Recorder,
instanceProfileProvider instanceprofile.Provider, launchTemplateProvider launchtemplate.Provider) *Controller {
func NewController(kubeClient client.Client, recorder events.Recorder, subnetProvider subnet.Provider, securityGroupProvider securitygroup.Provider,
amiProvider amifamily.Provider, instanceProfileProvider instanceprofile.Provider, launchTemplateProvider launchtemplate.Provider) *Controller {

return &Controller{
kubeClient: kubeClient,
recorder: recorder,
instanceProfileProvider: instanceProfileProvider,
launchTemplateProvider: launchTemplateProvider,
kubeClient: kubeClient,
recorder: recorder,
launchTemplateProvider: launchTemplateProvider,
ami: &AMI{amiProvider: amiProvider},
subnet: &Subnet{subnetProvider: subnetProvider},
securityGroup: &SecurityGroup{securityGroupProvider: securityGroupProvider},
instanceProfile: &InstanceProfile{instanceProfileProvider: instanceProfileProvider},
validation: &Validation{},
readiness: &Readiness{launchTemplateProvider: launchTemplateProvider},
}
}

func (c *Controller) Name() string {
return "nodeclass"
}

func (c *Controller) Reconcile(ctx context.Context, nodeClass *v1.EC2NodeClass) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "nodeclass.termination")
ctx = injection.WithControllerName(ctx, c.Name())

if !nodeClass.GetDeletionTimestamp().IsZero() {
return c.finalize(ctx, nodeClass)
}
return reconcile.Result{}, nil

if !controllerutil.ContainsFinalizer(nodeClass, v1.TerminationFinalizer) {
stored := nodeClass.DeepCopy()
controllerutil.AddFinalizer(nodeClass, v1.TerminationFinalizer)

// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the finalizer list
if err := c.kubeClient.Patch(ctx, nodeClass, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
}
stored := nodeClass.DeepCopy()

var results []reconcile.Result
var errs error
for _, reconciler := range []nodeClassReconciler{
c.ami,
c.subnet,
c.securityGroup,
c.instanceProfile,
c.validation,
c.readiness,
} {
res, err := reconciler.Reconcile(ctx, nodeClass)
errs = multierr.Append(errs, err)
results = append(results, res)
}

if !equality.Semantic.DeepEqual(stored, nodeClass) {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err := c.kubeClient.Status().Patch(ctx, nodeClass, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
errs = multierr.Append(errs, client.IgnoreNotFound(err))
}
}
if errs != nil {
return reconcile.Result{}, errs
}
return result.Min(results...), nil
}

func (c *Controller) finalize(ctx context.Context, nodeClass *v1.EC2NodeClass) (reconcile.Result, error) {
Expand All @@ -88,8 +157,8 @@ func (c *Controller) finalize(ctx context.Context, nodeClass *v1.EC2NodeClass) (
return reconcile.Result{RequeueAfter: time.Minute * 10}, nil // periodically fire the event
}
if nodeClass.Spec.Role != "" {
if err := c.instanceProfileProvider.Delete(ctx, nodeClass); err != nil {
return reconcile.Result{}, fmt.Errorf("deleting instance profile, %w", err)
if _, err := c.instanceProfile.Finalize(ctx, nodeClass); err != nil {
return reconcile.Result{}, err
}
}
if err := c.launchTemplateProvider.DeleteAll(ctx, nodeClass); err != nil {
Expand All @@ -113,7 +182,7 @@ func (c *Controller) finalize(ctx context.Context, nodeClass *v1.EC2NodeClass) (

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("nodeclass.termination").
Named(c.Name()).
For(&v1.EC2NodeClass{}).
Watches(
&karpv1.NodeClaim{},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package termination
package nodeclass

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package status
package nodeclass

import (
"context"
Expand Down Expand Up @@ -42,3 +42,10 @@ func (ip *InstanceProfile) Reconcile(ctx context.Context, nodeClass *v1.EC2NodeC
nodeClass.StatusConditions().SetTrue(v1.ConditionTypeInstanceProfileReady)
return reconcile.Result{}, nil
}

func (ip *InstanceProfile) Finalize(ctx context.Context, nodeClass *v1.EC2NodeClass) (reconcile.Result, error) {
if err := ip.instanceProfileProvider.Delete(ctx, nodeClass); err != nil {
return reconcile.Result{}, fmt.Errorf("deleting instance profile, %w", err)
}
return reconcile.Result{}, nil
}
Loading

0 comments on commit 1de3a8a

Please sign in to comment.