
Commit

Only schedule switchover for pod migration, consider mainWindow for PGVERSION env change
hughcapet committed Jan 22, 2025
1 parent e04b91d commit 604b760
Showing 6 changed files with 49 additions and 37 deletions.
3 changes: 0 additions & 3 deletions docs/administrator.md
@@ -208,9 +208,6 @@ Note that, changes in `SPILO_CONFIGURATION` env variable under `bootstrap.dcs`
path are ignored for the diff. They will be applied through Patroni's rest api
interface, following a restart of all instances.

Rolling update is postponed until the next maintenance window if any is defined
under the `maintenanceWindows` cluster manifest parameter.

The operator also supports lazy updates of the Spilo image. In this case the
StatefulSet is only updated, but no rolling update follows. This feature saves
you a switchover - and hence downtime - when you know pods are re-started later
2 changes: 1 addition & 1 deletion docs/reference/cluster_manifest.md
@@ -116,7 +116,7 @@ These parameters are grouped directly under the `spec` key in the manifest.

* **maintenanceWindows**
a list which defines specific time frames when certain maintenance operations
such as automatic major upgrades or rolling updates are allowed. Accepted formats
such as automatic major upgrades or master pod migration. Accepted formats
are "01:00-06:00" for daily maintenance windows or "Sat:00:00-04:00" for specific
days, with all times in UTC.

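The accepted `maintenanceWindows` formats described in the bullet above ("01:00-06:00" daily, "Sat:00:00-04:00" for a specific weekday, all times UTC) feed the operator's `isInMaintenanceWindow` check that the Go changes further down rely on. As a rough orientation only, here is a minimal sketch of how a single window in these formats could be evaluated; the function name, parsing details and error handling are assumptions and do not reproduce the operator's actual implementation.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// inWindow reports whether t (converted to UTC) falls inside a window given
// as "01:00-06:00" (daily) or "Sat:00:00-04:00" (specific weekday).
// Illustrative sketch only; not the operator's implementation.
func inWindow(window string, t time.Time) (bool, error) {
	t = t.UTC()
	bounds := strings.Split(window, "-")
	if len(bounds) != 2 {
		return false, fmt.Errorf("expected 'from-to', got %q", window)
	}
	from, to := bounds[0], bounds[1]

	// optional three-letter weekday prefix, e.g. "Sat:00:00"
	if parts := strings.SplitN(from, ":", 3); len(parts) == 3 {
		if !strings.EqualFold(parts[0], t.Weekday().String()[:3]) {
			return false, nil
		}
		from = parts[1] + ":" + parts[2]
	}

	start, err := time.Parse("15:04", from)
	if err != nil {
		return false, fmt.Errorf("invalid start time: %v", err)
	}
	end, err := time.Parse("15:04", to)
	if err != nil {
		return false, fmt.Errorf("invalid end time: %v", err)
	}
	now, _ := time.Parse("15:04", t.Format("15:04")) // keep only HH:MM
	return !now.Before(start) && !now.After(end), nil
}

func main() {
	ok, _ := inWindow("01:00-06:00", time.Date(2025, 1, 22, 3, 30, 0, 0, time.UTC))
	fmt.Println(ok) // true
}
```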
21 changes: 11 additions & 10 deletions e2e/tests/test_e2e.py
@@ -1187,7 +1187,7 @@ def test_major_version_upgrade(self):
Test major version upgrade: with full upgrade, maintenance window, and annotation
"""
def check_version():
p = k8s.patroni_rest("acid-upgrade-test-0", "")
p = k8s.patroni_rest("acid-upgrade-test-0", "") or {}
version = p.get("server_version", 0) // 10000
return version

@@ -1237,7 +1237,7 @@ def get_annotations():
# should not upgrade because current time is not in maintenanceWindow
current_time = datetime.now()
maintenance_window_future = f"{(current_time+timedelta(minutes=60)).strftime('%H:%M')}-{(current_time+timedelta(minutes=120)).strftime('%H:%M')}"
pg_patch_version_15 = {
pg_patch_version_15_outside_mw = {
"spec": {
"postgresql": {
"version": "15"
@@ -1248,7 +1248,7 @@ def get_annotations():
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15)
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15_outside_mw)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

# no pod replacement outside of the maintenance window
@@ -1259,12 +1259,12 @@ def get_annotations():
second_annotations = get_annotations()
self.assertIsNone(second_annotations.get("last-major-upgrade-failure"), "Annotation for last upgrade's failure should not be set")

# change the version again to trigger operator sync
# change maintenanceWindows to current
maintenance_window_current = f"{(current_time-timedelta(minutes=30)).strftime('%H:%M')}-{(current_time+timedelta(minutes=30)).strftime('%H:%M')}"
pg_patch_version_16 = {
pg_patch_version_15_in_mw = {
"spec": {
"postgresql": {
"version": "16"
"version": "15"
},
"maintenanceWindows": [
maintenance_window_current
@@ -1273,13 +1273,13 @@ def get_annotations():
}

k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_16)
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15_in_mw)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
self.eventuallyEqual(check_version, 16, "Version should be upgraded from 14 to 16")
self.eventuallyEqual(check_version, 15, "Version should be upgraded from 14 to 15")

# check if annotation for last upgrade's success is updated after second upgrade
third_annotations = get_annotations()
@@ -1306,16 +1306,17 @@ def get_annotations():
k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
self.eventuallyEqual(check_version, 16, "Version should not be upgraded because annotation for last upgrade's failure is set")
self.eventuallyEqual(check_version, 15, "Version should not be upgraded because annotation for last upgrade's failure is set")

# change the version back to 15 and should remove failure annotation
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15)
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15_in_mw)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)

self.eventuallyEqual(check_version, 15, "Version should not be upgraded from 15")
fourth_annotations = get_annotations()
self.assertIsNone(fourth_annotations.get("last-major-upgrade-failure"), "Annotation for last upgrade's failure is not removed")

39 changes: 24 additions & 15 deletions pkg/cluster/cluster.go
@@ -957,6 +957,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
defer c.mu.Unlock()

c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating)

if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) {
// do not apply any major version related changes yet
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
}
c.setSpec(newSpec)

defer func() {
@@ -1761,35 +1766,39 @@ func (c *Cluster) GetSwitchoverSchedule() string {
}

// Switchover does a switchover (via Patroni) to a candidate pod
func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) error {
func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName, scheduleSwitchover bool) error {
var err error
c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)

if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) {
c.logger.Infof("postponing switchover, not in maintenance window")
schedule := c.GetSwitchoverSchedule()

if err := c.patroni.Switchover(curMaster, candidate.Name, schedule); err != nil {
return fmt.Errorf("could not schedule switchover: %v", err)
}
c.logger.Infof("switchover is scheduled at %s", schedule)
return nil
}

c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)
stopCh := make(chan struct{})
ch := c.registerPodSubscriber(candidate)
defer c.unregisterPodSubscriber(candidate)
defer close(stopCh)

if err = c.patroni.Switchover(curMaster, candidate.Name, ""); err == nil {
var scheduled_at string
if scheduleSwitchover {
scheduled_at = c.GetSwitchoverSchedule()
} else {
c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)
scheduled_at = ""
}

if err = c.patroni.Switchover(curMaster, candidate.Name, scheduled_at); err == nil {
if scheduleSwitchover {
c.logger.Infof("switchover is scheduled at %s", scheduled_at)
return nil
}
c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate)
_, err = c.waitForPodLabel(ch, stopCh, nil)
if err != nil {
err = fmt.Errorf("could not get master pod label: %v", err)
}
} else {
if scheduleSwitchover {
return fmt.Errorf("could not schedule switchover: %v", err)
}
err = fmt.Errorf("could not switch over from %q to %q: %v", curMaster.Name, candidate, err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err)
}
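For context on the `c.patroni.Switchover(curMaster, candidate.Name, scheduled_at)` call above: passing a non-empty `scheduled_at` turns the request into a scheduled switchover on Patroni's side, which is what lets the operator register the switchover now and have it run at the next maintenance window. Below is a self-contained sketch of roughly what such a request looks like against Patroni's `/switchover` REST endpoint; the helper, struct and URL are written purely for illustration and are not the operator's patroni client, while the JSON fields follow Patroni's documented switchover parameters (leader, candidate, scheduled_at).

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// switchoverRequest approximates the JSON body Patroni's /switchover
// endpoint accepts; an empty ScheduledAt means "switch over now".
type switchoverRequest struct {
	Leader      string `json:"leader"`
	Candidate   string `json:"candidate,omitempty"`
	ScheduledAt string `json:"scheduled_at,omitempty"`
}

// scheduleSwitchover posts a (possibly scheduled) switchover to a Patroni
// member's REST API. Illustrative only; the operator uses its own client.
func scheduleSwitchover(apiURL, leader, candidate string, at time.Time) error {
	req := switchoverRequest{Leader: leader, Candidate: candidate}
	if !at.IsZero() {
		req.ScheduledAt = at.UTC().Format(time.RFC3339)
	}
	body, err := json.Marshal(req)
	if err != nil {
		return err
	}
	resp, err := http.Post(apiURL+"/switchover", "application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("switchover request rejected: %s", resp.Status)
	}
	return nil
}

func main() {
	// schedule a switchover for the start of the next maintenance window
	// (host and pod names here are hypothetical)
	at := time.Date(2025, 1, 23, 1, 0, 0, 0, time.UTC)
	if err := scheduleSwitchover("http://acid-upgrade-test-0:8008", "acid-upgrade-test-0", "acid-upgrade-test-1", at); err != nil {
		fmt.Println("error:", err)
	}
}
```

If Patroni accepts a scheduled request it only registers it rather than performing the switchover right away, which is why the scheduled branch above can log the schedule and return early.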
11 changes: 8 additions & 3 deletions pkg/cluster/pod.go
@@ -280,11 +280,16 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
return fmt.Errorf("could not move pod: %v", err)
}

scheduleSwitchover := false
if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) {
c.logger.Infof("postponing switchover, not in maintenance window")
scheduleSwitchover = true
}
err = retryutil.Retry(1*time.Minute, 5*time.Minute,
func() (bool, error) {
err := c.Switchover(oldMaster, masterCandidateName)
err := c.Switchover(oldMaster, masterCandidateName, scheduleSwitchover)
if err != nil {
c.logger.Errorf("could not failover to pod %q: %v", masterCandidateName, err)
c.logger.Errorf("could not switchover to pod %q: %v", masterCandidateName, err)
return false, nil
}
return true, nil
@@ -445,7 +450,7 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp
// do not recreate master now so it will keep the update flag and switchover will be retried on next sync
return fmt.Errorf("skipping switchover: %v", err)
}
if err := c.Switchover(masterPod, masterCandidate); err != nil {
if err := c.Switchover(masterPod, masterCandidate, false); err != nil {
return fmt.Errorf("could not perform switch over: %v", err)
}
} else if newMasterPod == nil && len(replicas) == 0 {
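The `scheduleSwitchover` flag computed in `MigrateMasterPod` feeds the scheduled path of `Switchover` shown earlier, where the actual timestamp comes from `GetSwitchoverSchedule` (its body is not part of this diff). Purely to illustrate the idea, one way such a timestamp could be derived from the configured windows is sketched here; the function name, the omission of weekday-prefixed windows and the output format are assumptions.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// nextWindowStart returns the next upcoming start of any window in the
// "HH:MM-HH:MM" form, relative to now (UTC). Weekday-prefixed windows and
// validation are omitted to keep the sketch short.
func nextWindowStart(windows []string, now time.Time) (time.Time, error) {
	now = now.UTC()
	best := time.Time{}
	for _, w := range windows {
		start, err := time.Parse("15:04", strings.Split(w, "-")[0])
		if err != nil {
			return time.Time{}, fmt.Errorf("invalid window %q: %v", w, err)
		}
		candidate := time.Date(now.Year(), now.Month(), now.Day(),
			start.Hour(), start.Minute(), 0, 0, time.UTC)
		if !candidate.After(now) {
			candidate = candidate.Add(24 * time.Hour) // today's start already passed
		}
		if best.IsZero() || candidate.Before(best) {
			best = candidate
		}
	}
	if best.IsZero() {
		return time.Time{}, fmt.Errorf("no maintenance windows configured")
	}
	return best, nil
}

func main() {
	at, _ := nextWindowStart([]string{"01:00-06:00"}, time.Now())
	fmt.Println("would schedule switchover at", at.Format(time.RFC3339))
}
```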
10 changes: 5 additions & 5 deletions pkg/cluster/sync.go
@@ -97,6 +97,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
}
}

if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) {
// do not apply any major version related changes yet
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
}

if err = c.syncStatefulSet(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) {
err = fmt.Errorf("could not sync statefulsets: %v", err)
@@ -658,11 +663,6 @@ func (c *Cluster) syncStatefulSet() error {
isSafeToRecreatePods = false
}

if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) {
postponeReasons = append(postponeReasons, "not in maintenance window")
isSafeToRecreatePods = false
}

// if we get here we also need to re-create the pods (either leftovers from the old
// statefulset or those that got their configuration from the outdated statefulset)
if len(podsToRecreate) > 0 {