Skip to content

Commit

Permalink
feat(opencurve#24): supports a common way for deployment waiting
Browse files Browse the repository at this point in the history
Signed-off-by: Anur Ijuokarukas <[email protected]>
  • Loading branch information
anurnomeru committed Apr 21, 2023
1 parent ebde98e commit 5259280
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 19 deletions.
5 changes: 1 addition & 4 deletions pkg/chunkserver/chunkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,12 @@ func (c *Cluster) Start(nodeNameIP map[string]string) error {
logger.Info("create physical pool successed")

// 3. startChunkServers start all chunkservers for each device of every node
// 4. wait all chunkservers online before create logical pool
err = c.startChunkServers()
if err != nil {
return errors.Wrap(err, "failed to start chunkserver")
}

// 4. wait all chunkservers online before create logical pool
logger.Info("starting all chunkserver")
time.Sleep(30 * time.Second)

// 5. create logical pool
_, err = c.runCreatePoolJob(nodeNameIP, "logical_pool")
if err != nil {
Expand Down
44 changes: 29 additions & 15 deletions pkg/chunkserver/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package chunkserver
import (
"path"
"strconv"
"time"

"github.com/opencurve/curve-operator/pkg/k8sutil"
"github.com/pkg/errors"
apps "k8s.io/api/apps/v1"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -37,6 +39,7 @@ func (c *Cluster) startChunkServers() error {

_ = c.CreateS3ConfigMap()

deploymentsToWaitFor := make([]*appsv1.Deployment, 0)
for _, csConfig := range chunkserverConfigs {

err := c.createConfigMap(csConfig)
Expand All @@ -62,19 +65,24 @@ func (c *Cluster) startChunkServers() error {
// }
} else {
logger.Infof("Deployment %s has been created , waiting for startup", newDeployment.GetName())
// TODO:wait for the new deployment
// deploymentsToWaitFor = append(deploymentsToWaitFor, newDeployment)
deploymentsToWaitFor = append(deploymentsToWaitFor, newDeployment)
}
// update condition type and phase etc.
}

logger.Info("starting all chunkserver")
if errSlice := k8sutil.WaitForDeploymentsToStart(c.context.Clientset, 3*time.Second, 30*time.Second,
deploymentsToWaitFor); len(errSlice) > 0 {
errors.
}
return nil
}

// createCSClientConfigMap create cs_client configmap
func (c *Cluster) createCSClientConfigMap() error {
// 1. get mds-conf-template from cluster
csClientCMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.CsClientConfigMapTemp, metav1.GetOptions{})
csClientCMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.CsClientConfigMapTemp,
metav1.GetOptions{})
if err != nil {
logger.Errorf("failed to get configmap %s from cluster", config.CsClientConfigMapTemp)
if kerrors.IsNotFound(err) {
Expand All @@ -88,7 +96,8 @@ func (c *Cluster) createCSClientConfigMap() error {
// 3. replace ${} to specific parameters
replacedCsClientData, err := config.ReplaceConfigVars(csClientCMData, &chunkserverConfigs[0])
if err != nil {
return errors.Wrap(err, "failed to Replace cs_client config template to generate a new cs_client configmap to start server.")
return errors.Wrap(err,
"failed to Replace cs_client config template to generate a new cs_client configmap to start server.")
}

csClientConfigMap := map[string]string{
Expand All @@ -105,7 +114,8 @@ func (c *Cluster) createCSClientConfigMap() error {

err = c.ownerInfo.SetControllerReference(cm)
if err != nil {
return errors.Wrapf(err, "failed to set owner reference to cs_client.conf configmap %q", config.CSClientConfigMapName)
return errors.Wrapf(err, "failed to set owner reference to cs_client.conf configmap %q",
config.CSClientConfigMapName)
}

// Create cs_client configmap in cluster
Expand All @@ -119,7 +129,8 @@ func (c *Cluster) createCSClientConfigMap() error {

// CreateS3ConfigMap creates s3 configmap
func (c *Cluster) CreateS3ConfigMap() error {
s3CMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.S3ConfigMapTemp, metav1.GetOptions{})
s3CMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.S3ConfigMapTemp,
metav1.GetOptions{})
if err != nil {
logger.Errorf("failed to get configmap %s from cluster", config.S3ConfigMapTemp)
if kerrors.IsNotFound(err) {
Expand Down Expand Up @@ -199,7 +210,8 @@ func (c *Cluster) createStartCSConfigMap() error {
// createConfigMap create chunkserver configmap for chunkserver server
func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
// 1. get mds-conf-template from cluster
chunkserverCMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.ChunkServerConfigMapTemp, metav1.GetOptions{})
chunkserverCMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.ChunkServerConfigMapTemp,
metav1.GetOptions{})
if err != nil {
logger.Errorf("failed to get configmap %s from cluster", config.ChunkServerConfigMapTemp)
if kerrors.IsNotFound(err) {
Expand All @@ -217,7 +229,8 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
// 3. replace ${} to specific parameters
replacedChunkServerData, err := config.ReplaceConfigVars(chunkserverData, &csConfig)
if err != nil {
return errors.Wrap(err, "failed to Replace chunkserver config template to generate a new chunkserver configmap to start server.")
return errors.Wrap(err,
"failed to Replace chunkserver config template to generate a new chunkserver configmap to start server.")
}

// for debug
Expand All @@ -237,7 +250,8 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {

err = c.ownerInfo.SetControllerReference(cm)
if err != nil {
return errors.Wrapf(err, "failed to set owner reference to chunkserverconfig configmap %q", config.ChunkserverConfigMapName)
return errors.Wrapf(err, "failed to set owner reference to chunkserverconfig configmap %q",
config.ChunkserverConfigMapName)
}

// Create chunkserver config in cluster
Expand All @@ -249,7 +263,7 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
return nil
}

func (c *Cluster) makeDeployment(csConfig *chunkserverConfig) (*apps.Deployment, error) {
func (c *Cluster) makeDeployment(csConfig *chunkserverConfig) (*appsv1.Deployment, error) {
volumes := CSDaemonVolumes(csConfig)
vols, _ := c.createTopoAndToolVolumeAndMount()
volumes = append(volumes, vols...)
Expand All @@ -273,20 +287,20 @@ func (c *Cluster) makeDeployment(csConfig *chunkserverConfig) (*apps.Deployment,

replicas := int32(1)

d := &apps.Deployment{
d := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: csConfig.ResourceName,
Namespace: c.namespacedName.Namespace,
Labels: c.getChunkServerPodLabels(csConfig),
},
Spec: apps.DeploymentSpec{
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: c.getChunkServerPodLabels(csConfig),
},
Template: podSpec,
Replicas: &replicas,
Strategy: apps.DeploymentStrategy{
Type: apps.RecreateDeploymentStrategyType,
Strategy: appsv1.DeploymentStrategy{
Type: appsv1.RecreateDeploymentStrategyType,
},
},
}
Expand Down
97 changes: 97 additions & 0 deletions pkg/k8sutil/deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package k8sutil

import (
"time"

"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
)

// WaitForDeploymentsToStart waits for all of the given deployments to start,
// polling each one every interval for up to timeout.
//
// interval is how often each deployment's status is checked
// timeout is the maximum time to wait for any single deployment
// deployments are the deployments to wait for
//
// Each deployment is waited on in its own goroutine and the per-deployment
// results are collected on a buffered channel, so one slow or failing
// deployment does not prevent the others from being observed. The returned
// error aggregates every per-deployment failure and is nil when all
// deployments started. This lets WaitForDeploymentToStart and
// WaitForDeploymentsToStart be used in the same way.
func WaitForDeploymentsToStart(clientSet kubernetes.Interface, interval time.Duration,
	timeout time.Duration, deployments []*appsv1.Deployment) error {
	length := len(deployments)
	// Buffered to length so every goroutine can send without blocking even if
	// the collector below has already stopped (it never does, but this keeps
	// the goroutines leak-free by construction).
	hub := make(chan error, length)
	defer close(hub)
	for i := range deployments {
		// Take a per-iteration copy so the goroutine does not capture the
		// shared loop variable.
		deployment := deployments[i]
		go func() {
			hub <- WaitForDeploymentToStart(clientSet, interval, timeout, deployment)
		}()
	}

	// Collect exactly one result per deployment, keeping only the failures.
	var errorSlice []error
	for i := 0; i < length; i++ {
		if err := <-hub; err != nil {
			errorSlice = append(errorSlice, err)
		}
	}
	return utilerrors.NewAggregate(errorSlice)
}

// WaitForDeploymentToStart waits for the given deployment to start, polling
// its status every interval for up to timeout. It returns nil once the
// deployment is considered started, or an error describing the last observed
// failure when the timeout expires.
//
// interval is how often the deployment status is checked
// d identifies the deployment to wait for (name and namespace are read from it)
//
// Caution: it actually waits for at least one updated and ready replica on the
// current generation, not for the deployment to be completely ready.
func WaitForDeploymentToStart(clientSet kubernetes.Interface, interval time.Duration,
	timeout time.Duration, d *appsv1.Deployment) error {

	// lastErr remembers the most recent per-poll failure so the timeout error
	// can explain why the deployment never became ready.
	var lastErr error
	err := wait.PollImmediate(interval, timeout, func() (bool, error) {
		deployment, err := clientSet.AppsV1().Deployments(d.GetNamespace()).
			Get(d.GetName(), metav1.GetOptions{})
		if err != nil {
			logger.Errorf("failed to get deployment %s in cluster: %s", d.GetName(), err.Error())
			// Record the failure but keep polling: the apiserver error may be
			// transient. Returning a non-nil error here would abort the poll
			// immediately.
			lastErr = err
			return false, nil
		}
		if deployment.GetGeneration() == deployment.Status.ObservedGeneration &&
			deployment.Status.UpdatedReplicas > 0 && deployment.Status.ReadyReplicas > 0 {
			logger.Infof("deployment %s has been started", deployment.Name)
			// Clear lastErr: this poll succeeded.
			lastErr = nil
			return true, nil
		}

		// Collect the conditions that are not yet True to make the progress
		// log useful for debugging why the deployment is still starting.
		var unready []appsv1.DeploymentCondition
		for i := range deployment.Status.Conditions {
			condition := deployment.Status.Conditions[i]
			if condition.Status != v1.ConditionTrue {
				unready = append(unready, condition)
			}
		}
		logger.Infof("deployment %s is starting, Generation: %d, ObservedGeneration: %d, UpdatedReplicas: %d,"+
			" ReadyReplicas: %d, UnReadyConditions: %v", deployment.Name, deployment.GetGeneration(),
			deployment.Status.ObservedGeneration, deployment.Status.UpdatedReplicas,
			deployment.Status.ReadyReplicas, unready)
		return false, nil
	})
	if err == nil {
		// The poll condition returned true before the timeout: success.
		return nil
	}
	if lastErr != nil {
		return errors.Wrapf(lastErr, "failed to wait for deployment %s to start after %vs",
			d.GetName(), timeout.Seconds())
	}
	// No per-poll error was recorded; report the poll's own timeout error.
	return errors.Wrapf(err, "failed to wait for deployment %s to start after %vs",
		d.GetName(), timeout.Seconds())
}

0 comments on commit 5259280

Please sign in to comment.