Skip to content

Commit

Permalink
Revert "feat(#24): supports a common way for deployment waiting"
Browse files Browse the repository at this point in the history
  • Loading branch information
caoxianfei1 authored May 9, 2023
1 parent 322e8c8 commit b0518bc
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 106 deletions.
7 changes: 5 additions & 2 deletions pkg/chunkserver/spec.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chunkserver

import (
"context"
"path"
"strconv"

Expand Down Expand Up @@ -67,8 +68,10 @@ func (c *Cluster) startChunkServers() error {
}

// wait for all Deployments to start
if err := k8sutil.WaitForDeploymentsToStart(&c.Context, deploymentsToWaitFor, k8sutil.WaitForRunningInterval, k8sutil.WaitForRunningTimeout); err != nil {
return err
for _, d := range deploymentsToWaitFor {
if err := k8sutil.WaitForDeploymentToStart(context.TODO(), &c.Context, d); err != nil {
return err
}
}

return nil
Expand Down
6 changes: 4 additions & 2 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,10 @@ func (c *Cluster) Start(nodeNameIP map[string]string) error {
}

// wait for all Deployments to start
if err := k8sutil.WaitForDeploymentsToStart(&c.Context, deploymentsToWaitFor, k8sutil.WaitForRunningInterval, k8sutil.WaitForRunningTimeout); err != nil {
return err
for _, d := range deploymentsToWaitFor {
if err := k8sutil.WaitForDeploymentToStart(context.TODO(), &c.Context, d); err != nil {
return err
}
}

k8sutil.UpdateStatusCondition(c.Kind, context.TODO(), &c.Context, c.NamespacedName, curvev1.ConditionTypeEtcdReady, curvev1.ConditionTrue, curvev1.ConditionEtcdClusterCreatedReason, "Etcd cluster has been created")
Expand Down
131 changes: 35 additions & 96 deletions pkg/k8sutil/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,11 @@ import (
"fmt"
"time"

"github.com/opencurve/curve-operator/pkg/clusterd"
"github.com/opencurve/curve-operator/pkg/k8sutil/patch"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
)

const (
WaitForRunningInterval = 2 * time.Second
WaitForRunningTimeout = 5 * time.Minute
"github.com/opencurve/curve-operator/pkg/clusterd"
"github.com/opencurve/curve-operator/pkg/k8sutil/patch"
)

// UpdateDeploymentAndWait updates a deployment and waits until it is running to return. It will
Expand Down Expand Up @@ -53,6 +45,11 @@ func UpdateDeploymentAndWait(ctx context.Context, clusterContext *clusterd.Conte
// If deployments are different, let's update!
logger.Infof("updating deployment %q after verifying it is safe to stop", modifiedDeployment.Name)

// Let's verify the deployment can be stopped
// if err := verifyCallback("stop"); err != nil {
// return fmt.Errorf("failed to check if deployment %q can be updated. %v", modifiedDeployment.Name, err)
// }

// Set hash annotation to the newly generated deployment
if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(modifiedDeployment); err != nil {
return fmt.Errorf("failed to set hash annotation on deployment %q. %v", modifiedDeployment.Name, err)
Expand All @@ -62,101 +59,43 @@ func UpdateDeploymentAndWait(ctx context.Context, clusterContext *clusterd.Conte
return fmt.Errorf("failed to update deployment %q. %v", modifiedDeployment.Name, err)
}

waitChan := make(chan error)
defer close(waitChan)

go func() {
waitChan <- WaitForDeploymentToStart(clusterContext, currentDeployment, WaitForRunningInterval, WaitForRunningTimeout)
}()

select {
case <-ctx.Done():
return errors.Wrapf(ctx.Err(), "failed to wait for deployment %q to start due to context is done", currentDeployment.Name)
case err := <-waitChan:
if err != nil {
return fmt.Errorf("failed to wait for deployment %q to start due to timeout. %v", currentDeployment.Name, err)
}
if err := WaitForDeploymentToStart(ctx, clusterContext, currentDeployment); err != nil {
return err
}
return nil
}

// WaitForDeploymentsToStart waits for the deployments to start, and returns an error if any of the deployments
// has not started.
//
// interval is the interval at which the deployment status is checked.
// timeout is the maximum time to wait for the deployments to start; if it elapses, an error is returned.
func WaitForDeploymentsToStart(clusterContext *clusterd.Context, objectMetas []*appsv1.Deployment,
interval time.Duration, timeout time.Duration) error {
length := len(objectMetas)
hub := make(chan error, length)
defer close(hub)
for i := range objectMetas {
objectMata := objectMetas[i]
go func() {
hub <- WaitForDeploymentToStart(clusterContext, objectMata, interval, timeout)
}()
}

var errorSlice []error
for i := 0; i < length; i++ {
if err := <-hub; err != nil {
errorSlice = append(errorSlice, err)
}
}
return utilerrors.NewAggregate(errorSlice)
// Now we check if we can go to the next daemon
// if err := verifyCallback("continue"); err != nil {
// return fmt.Errorf("failed to check if deployment %q can continue: %v", modifiedDeployment.Name, err)
// }
return nil
}

// WaitForDeploymentToStart waits for the deployment to start, and returns an error if the deployment has not started.
//
// interval is the interval at which the deployment status is checked.
// timeout is the maximum time to wait for the deployment to start; if it elapses, an error is returned.
func WaitForDeploymentToStart(clusterContext *clusterd.Context, d *appsv1.Deployment, interval time.Duration,
timeout time.Duration) error {

// err is the error of once poll, it may provide the reason why the deployment is not started, and tell
// wait.PollImmediate to continue to poll
// and the lastErr is the error of the last poll
var lastErr error
_ = wait.PollImmediate(interval, timeout, func() (bool, error) {
deployment, err := clusterContext.Clientset.AppsV1().Deployments(d.GetNamespace()).
Get(d.GetName(), metav1.GetOptions{})
func WaitForDeploymentToStart(ctx context.Context, clusterdContext *clusterd.Context, deployment *appsv1.Deployment) error {
// wait up to 300s (5 min) for the deployment to be restarted
sleepTime := 3
attempts := 100
for i := 0; i < attempts; i++ {
// check for the status of the deployment
d, err := clusterdContext.Clientset.AppsV1().Deployments(deployment.Namespace).Get(deployment.Name, metav1.GetOptions{})
if err != nil {
logger.Errorf("failed to get deployment %s in cluster: %s", d.GetName(), err.Error())
return false, err
return fmt.Errorf("failed to get deployment %q. %v", deployment.Name, err)
}
newStatus := deployment.Status
// this code is copied from pkg/controller/deployment/deployment_controller.go
if newStatus.UpdatedReplicas == *(deployment.Spec.Replicas) &&
newStatus.Replicas == *(deployment.Spec.Replicas) &&
newStatus.AvailableReplicas == *(deployment.Spec.Replicas) &&
newStatus.ObservedGeneration >= deployment.Generation {
logger.Infof("deployment %s has been started", deployment.Name)
// Set lastErr to nil because we have a successful poll
lastErr = nil
return true, nil
if d.Status.ObservedGeneration >= deployment.Status.ObservedGeneration && d.Status.UpdatedReplicas > 0 && d.Status.ReadyReplicas > 0 {
logger.Infof("finished waiting for updated deployment %q", d.Name)
return nil
}

var unready []appsv1.DeploymentCondition
// collect the conditions that are not ready, to help with debugging
for i := range deployment.Status.Conditions {
condition := deployment.Status.Conditions[i]
if condition.Status != v1.ConditionTrue {
unready = append(unready, condition)
// If ProgressDeadlineExceeded is reached let's fail earlier
// This can happen if one of the deployment cannot be scheduled on a node and stays in "pending" state
for _, condition := range d.Status.Conditions {
if condition.Type == appsv1.DeploymentProgressing && condition.Reason == "ProgressDeadlineExceeded" {
return fmt.Errorf("gave up waiting for deployment %q to update because %q", deployment.Name, condition.Reason)
}
}
logger.Infof("deployment %s is starting, Generation: %d, ObservedGeneration: %d, UpdatedReplicas: %d,"+
" ReadyReplicas: %d, UnReadyConditions: %v", deployment.Name, deployment.GetGeneration(),
deployment.Status.ObservedGeneration, deployment.Status.UpdatedReplicas,
deployment.Status.ReadyReplicas, unready)
if err != nil {
lastErr = err
}
return false, nil
})
if lastErr != nil {
return errors.Wrapf(lastErr, "failed to waiting deplyoment %s to start after %vs waiting",
d.GetName(), timeout.Seconds())

logger.Debugf("deployment %q status=%+v", d.Name, d.Status)

time.Sleep(time.Duration(sleepTime) * time.Second)
}
return errors.Errorf("failed to waiting deplyoment %s to start after %vs waiting",
d.GetName(), timeout.Seconds())
return fmt.Errorf("gave up waiting for deployment %q to update", deployment.Name)
}
6 changes: 4 additions & 2 deletions pkg/mds/mds.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,10 @@ func (c *Cluster) Start(nodeNameIP map[string]string) error {
}

// wait for all Deployments to start
if err := k8sutil.WaitForDeploymentsToStart(&c.Context, deploymentsToWaitFor, k8sutil.WaitForRunningInterval, k8sutil.WaitForRunningTimeout); err != nil {
return err
for _, d := range deploymentsToWaitFor {
if err := k8sutil.WaitForDeploymentToStart(context.TODO(), &c.Context, d); err != nil {
return err
}
}

k8sutil.UpdateStatusCondition(c.Kind, context.TODO(), &c.Context, c.NamespacedName, curvev1.ConditionTypeMdsReady, curvev1.ConditionTrue, curvev1.ConditionMdsClusterCreatedReason, "MDS cluster has been created")
Expand Down
6 changes: 4 additions & 2 deletions pkg/metaserver/metaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ func (c *Cluster) Start(nodeNameIP map[string]string) error {
}

// wait for all Deployments to start
if err := k8sutil.WaitForDeploymentsToStart(&c.Context, deploymentsToWaitFor, k8sutil.WaitForRunningInterval, k8sutil.WaitForRunningTimeout); err != nil {
return err
for _, d := range deploymentsToWaitFor {
if err := k8sutil.WaitForDeploymentToStart(context.TODO(), &c.Context, d); err != nil {
return err
}
}

// create logic pool
Expand Down
6 changes: 4 additions & 2 deletions pkg/snapshotclone/snapshotclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ func (c *Cluster) Start(nodeNameIP map[string]string) error {
}

// wait for all Deployments to start
if err := k8sutil.WaitForDeploymentsToStart(&c.Context, deploymentsToWaitFor, k8sutil.WaitForRunningInterval, k8sutil.WaitForRunningTimeout); err != nil {
return err
for _, d := range deploymentsToWaitFor {
if err := k8sutil.WaitForDeploymentToStart(context.TODO(), &c.Context, d); err != nil {
return err
}
}

k8sutil.UpdateStatusCondition(c.Kind, context.TODO(), &c.Context, c.NamespacedName, curvev1.ConditionTypeSnapShotCloneReady, curvev1.ConditionTrue, curvev1.ConditionSnapShotCloneClusterCreatedReason, "Snapshotclone cluster has been created")
Expand Down

0 comments on commit b0518bc

Please sign in to comment.