Skip to content

Commit 07fd4ec

Browse files
authored
choose switchover candidate based on lag and role (zalando#1700)
* choose switchover candidate based on lowest lag in MB and role (in synchronous mode)
1 parent 8959618 commit 07fd4ec

File tree

6 files changed

+231
-11
lines changed

6 files changed

+231
-11
lines changed

pkg/cluster/pod.go

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"math/rand"
7+
"sort"
78
"strconv"
89
"time"
910

@@ -459,8 +460,13 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp
459460
// 1. we have not observed a new master pod when re-creating former replicas
460461
// 2. we know possible switchover targets even when no replicas were recreated
461462
if newMasterPod == nil && len(replicas) > 0 {
462-
if err := c.Switchover(masterPod, masterCandidate(replicas)); err != nil {
463-
c.logger.Warningf("could not perform switch over: %v", err)
463+
masterCandidate, err := c.getSwitchoverCandidate(masterPod)
464+
if err != nil {
465+
// do not recreate master now so it will keep the update flag and switchover will be retried on next sync
466+
return fmt.Errorf("skipping switchover: %v", err)
467+
}
468+
if err := c.Switchover(masterPod, masterCandidate); err != nil {
469+
return fmt.Errorf("could not perform switch over: %v", err)
464470
}
465471
} else if newMasterPod == nil && len(replicas) == 0 {
466472
c.logger.Warningf("cannot perform switch over before re-creating the pod: no replicas")
@@ -475,6 +481,50 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp
475481
return nil
476482
}
477483

484+
func (c *Cluster) getSwitchoverCandidate(master *v1.Pod) (spec.NamespacedName, error) {
485+
486+
var members []patroni.ClusterMember
487+
candidates := make([]patroni.ClusterMember, 0)
488+
syncCandidates := make([]patroni.ClusterMember, 0)
489+
490+
err := retryutil.Retry(1*time.Second, 5*time.Second,
491+
func() (bool, error) {
492+
var err error
493+
members, err = c.patroni.GetClusterMembers(master)
494+
495+
if err != nil {
496+
return false, err
497+
}
498+
return true, nil
499+
},
500+
)
501+
if err != nil {
502+
return spec.NamespacedName{}, fmt.Errorf("failed to get Patroni cluster members: %s", err)
503+
}
504+
505+
for _, member := range members {
506+
if PostgresRole(member.Role) != Leader && PostgresRole(member.Role) != StandbyLeader && member.State == "running" {
507+
candidates = append(candidates, member)
508+
if PostgresRole(member.Role) == SyncStandby {
509+
syncCandidates = append(syncCandidates, member)
510+
}
511+
}
512+
}
513+
514+
// pick candidate with lowest lag
515+
// if sync_standby replicas were found assume synchronous_mode is enabled and ignore other candidates list
516+
if len(syncCandidates) > 0 {
517+
sort.Slice(syncCandidates, func(i, j int) bool { return syncCandidates[i].LagInMb < syncCandidates[j].LagInMb })
518+
return spec.NamespacedName{Namespace: master.Namespace, Name: syncCandidates[0].Name}, nil
519+
}
520+
if len(candidates) > 0 {
521+
sort.Slice(candidates, func(i, j int) bool { return candidates[i].LagInMb < candidates[j].LagInMb })
522+
return spec.NamespacedName{Namespace: master.Namespace, Name: candidates[0].Name}, nil
523+
}
524+
525+
return spec.NamespacedName{}, fmt.Errorf("no switchover candidate found")
526+
}
527+
478528
func (c *Cluster) podIsEndOfLife(pod *v1.Pod) (bool, error) {
479529
node, err := c.KubeClient.Nodes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{})
480530
if err != nil {

pkg/cluster/pod_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package cluster
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"io/ioutil"
7+
"net/http"
8+
"testing"
9+
10+
"github.com/golang/mock/gomock"
11+
"github.com/zalando/postgres-operator/mocks"
12+
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
13+
"github.com/zalando/postgres-operator/pkg/spec"
14+
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
15+
"github.com/zalando/postgres-operator/pkg/util/patroni"
16+
)
17+
18+
func TestGetSwitchoverCandidate(t *testing.T) {
19+
testName := "test getting right switchover candidate"
20+
namespace := "default"
21+
22+
ctrl := gomock.NewController(t)
23+
defer ctrl.Finish()
24+
25+
var cluster = New(Config{}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
26+
27+
// simulate different member scenarios
28+
tests := []struct {
29+
subtest string
30+
clusterJson string
31+
expectedCandidate spec.NamespacedName
32+
expectedError error
33+
}{
34+
{
35+
subtest: "choose sync_standby over replica",
36+
clusterJson: `{"members": [{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "api_url": "http://192.168.100.1:8008/patroni", "host": "192.168.100.1", "port": 5432, "timeline": 1}, {"name": "acid-test-cluster-1", "role": "sync_standby", "state": "running", "api_url": "http://192.168.100.2:8008/patroni", "host": "192.168.100.2", "port": 5432, "timeline": 1, "lag": 0}, {"name": "acid-test-cluster-2", "role": "replica", "state": "running", "api_url": "http://192.168.100.3:8008/patroni", "host": "192.168.100.3", "port": 5432, "timeline": 1, "lag": 0}]}`,
37+
expectedCandidate: spec.NamespacedName{Namespace: namespace, Name: "acid-test-cluster-1"},
38+
expectedError: nil,
39+
},
40+
{
41+
subtest: "choose replica with lowest lag",
42+
clusterJson: `{"members": [{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "api_url": "http://192.168.100.1:8008/patroni", "host": "192.168.100.1", "port": 5432, "timeline": 1}, {"name": "acid-test-cluster-1", "role": "replica", "state": "running", "api_url": "http://192.168.100.2:8008/patroni", "host": "192.168.100.2", "port": 5432, "timeline": 1, "lag": 5}, {"name": "acid-test-cluster-2", "role": "replica", "state": "running", "api_url": "http://192.168.100.3:8008/patroni", "host": "192.168.100.3", "port": 5432, "timeline": 1, "lag": 2}]}`,
43+
expectedCandidate: spec.NamespacedName{Namespace: namespace, Name: "acid-test-cluster-2"},
44+
expectedError: nil,
45+
},
46+
{
47+
subtest: "choose first replica when lag is equal evrywhere",
48+
clusterJson: `{"members": [{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "api_url": "http://192.168.100.1:8008/patroni", "host": "192.168.100.1", "port": 5432, "timeline": 1}, {"name": "acid-test-cluster-1", "role": "replica", "state": "running", "api_url": "http://192.168.100.2:8008/patroni", "host": "192.168.100.2", "port": 5432, "timeline": 1, "lag": 5}, {"name": "acid-test-cluster-2", "role": "replica", "state": "running", "api_url": "http://192.168.100.3:8008/patroni", "host": "192.168.100.3", "port": 5432, "timeline": 1, "lag": 5}]}`,
49+
expectedCandidate: spec.NamespacedName{Namespace: namespace, Name: "acid-test-cluster-1"},
50+
expectedError: nil,
51+
},
52+
{
53+
subtest: "no running replica available",
54+
clusterJson: `{"members": [{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "api_url": "http://192.168.100.1:8008/patroni", "host": "192.168.100.1", "port": 5432, "timeline": 2}, {"name": "acid-test-cluster-1", "role": "replica", "state": "starting", "api_url": "http://192.168.100.2:8008/patroni", "host": "192.168.100.2", "port": 5432, "timeline": 2}]}`,
55+
expectedCandidate: spec.NamespacedName{},
56+
expectedError: fmt.Errorf("no switchover candidate found"),
57+
},
58+
}
59+
60+
for _, tt := range tests {
61+
// mocking cluster members
62+
r := ioutil.NopCloser(bytes.NewReader([]byte(tt.clusterJson)))
63+
64+
response := http.Response{
65+
StatusCode: 200,
66+
Body: r,
67+
}
68+
69+
mockClient := mocks.NewMockHTTPClient(ctrl)
70+
mockClient.EXPECT().Get(gomock.Any()).Return(&response, nil).AnyTimes()
71+
72+
p := patroni.New(patroniLogger, mockClient)
73+
cluster.patroni = p
74+
mockMasterPod := newMockPod("192.168.100.1")
75+
mockMasterPod.Namespace = namespace
76+
77+
candidate, err := cluster.getSwitchoverCandidate(mockMasterPod)
78+
if err != nil && err.Error() != tt.expectedError.Error() {
79+
t.Errorf("%s - %s: unexpected error, %v", testName, tt.subtest, err)
80+
}
81+
82+
if candidate != tt.expectedCandidate {
83+
t.Errorf("%s - %s: unexpect switchover candidate, got %s, expected %s", testName, tt.subtest, candidate, tt.expectedCandidate)
84+
}
85+
}
86+
}

pkg/cluster/types.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@ import (
1414
type PostgresRole string
1515

1616
const (
17-
// Master role
18-
Master PostgresRole = "master"
19-
20-
// Replica role
17+
// spilo roles
18+
Master PostgresRole = "master"
2119
Replica PostgresRole = "replica"
20+
21+
// roles returned by Patroni cluster endpoint
22+
Leader PostgresRole = "leader"
23+
StandbyLeader PostgresRole = "standby_leader"
24+
SyncStandby PostgresRole = "sync_standby"
2225
)
2326

2427
// PodEventType represents the type of a pod-related event

pkg/cluster/util.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"encoding/gob"
77
"encoding/json"
88
"fmt"
9-
"math/rand"
109
"reflect"
1110
"sort"
1211
"strings"
@@ -525,10 +524,6 @@ func (c *Cluster) credentialSecretNameForCluster(username string, clusterName st
525524
"tprgroup", acidzalando.GroupName)
526525
}
527526

528-
func masterCandidate(replicas []spec.NamespacedName) spec.NamespacedName {
529-
return replicas[rand.Intn(len(replicas))]
530-
}
531-
532527
func cloneSpec(from *acidv1.Postgresql) (*acidv1.Postgresql, error) {
533528
var (
534529
buf bytes.Buffer

pkg/util/patroni/patroni.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
const (
2222
failoverPath = "/failover"
2323
configPath = "/config"
24+
clusterPath = "/cluster"
2425
statusPath = "/patroni"
2526
restartPath = "/restart"
2627
apiPort = 8008
@@ -29,6 +30,7 @@ const (
2930

3031
// Interface describe patroni methods
3132
type Interface interface {
33+
GetClusterMembers(master *v1.Pod) ([]ClusterMember, error)
3234
Switchover(master *v1.Pod, candidate string) error
3335
SetPostgresParameters(server *v1.Pod, options map[string]string) error
3436
GetMemberData(server *v1.Pod) (MemberData, error)
@@ -175,6 +177,20 @@ func (p *Patroni) SetConfig(server *v1.Pod, config map[string]interface{}) error
175177
return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
176178
}
177179

180+
// ClusterMembers is the list of cluster members as returned by the Patroni
// cluster endpoint.
type ClusterMembers struct {
	Members []ClusterMember `json:"members"`
}

// unknownLagInMb is stored in ClusterMember.LagInMb when Patroni reports the
// lag as a string instead of a number. It is the largest int value, so such
// members sort behind every member with a known lag.
const unknownLagInMb = int(^uint(0) >> 1)

// ClusterMember holds the data of a single cluster member from the Patroni API.
type ClusterMember struct {
	Name     string `json:"name"`
	Role     string `json:"role"`
	State    string `json:"state"`
	Timeline int    `json:"timeline"`
	// LagInMb is the value of the "lag" key. It stays 0 when the key is
	// absent and is set to the largest int when the lag is not numeric.
	LagInMb int `json:"lag"`
}

// UnmarshalJSON decodes a cluster member and tolerates a non-numeric lag.
//
// Patroni may report "lag" as a string such as "unknown" when it cannot
// compute the value. Decoding that straight into an int would fail and make
// the whole member list unusable, so string values are mapped to the largest
// int instead. Any other non-integer lag is still reported as an error.
func (m *ClusterMember) UnmarshalJSON(data []byte) error {
	// seed with the current values so that keys missing from data leave the
	// corresponding fields untouched, as the default decoder would
	raw := struct {
		Name     string          `json:"name"`
		Role     string          `json:"role"`
		State    string          `json:"state"`
		Timeline int             `json:"timeline"`
		Lag      json.RawMessage `json:"lag"`
	}{
		Name:     m.Name,
		Role:     m.Role,
		State:    m.State,
		Timeline: m.Timeline,
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}

	lag := m.LagInMb
	if len(raw.Lag) > 0 {
		if err := json.Unmarshal(raw.Lag, &lag); err != nil {
			var text string
			if json.Unmarshal(raw.Lag, &text) != nil {
				// neither an integer nor a string: report the integer error
				return err
			}
			lag = unknownLagInMb
		}
	}

	m.Name = raw.Name
	m.Role = raw.Role
	m.State = raw.State
	m.Timeline = raw.Timeline
	m.LagInMb = lag
	return nil
}
193+
178194
// MemberDataPatroni child element
179195
type MemberDataPatroni struct {
180196
Version string `json:"version"`
@@ -246,6 +262,27 @@ func (p *Patroni) Restart(server *v1.Pod) error {
246262
return nil
247263
}
248264

265+
// GetClusterMembers read cluster data from patroni API
266+
func (p *Patroni) GetClusterMembers(server *v1.Pod) ([]ClusterMember, error) {
267+
268+
apiURLString, err := apiURL(server)
269+
if err != nil {
270+
return []ClusterMember{}, err
271+
}
272+
body, err := p.httpGet(apiURLString + clusterPath)
273+
if err != nil {
274+
return []ClusterMember{}, err
275+
}
276+
277+
data := ClusterMembers{}
278+
err = json.Unmarshal([]byte(body), &data)
279+
if err != nil {
280+
return []ClusterMember{}, err
281+
}
282+
283+
return data.Members, nil
284+
}
285+
249286
// GetMemberData read member data from patroni API
250287
func (p *Patroni) GetMemberData(server *v1.Pod) (MemberData, error) {
251288

pkg/util/patroni/patroni_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,55 @@ func TestApiURL(t *testing.T) {
8585
}
8686
}
8787

88+
func TestGetClusterMembers(t *testing.T) {
89+
ctrl := gomock.NewController(t)
90+
defer ctrl.Finish()
91+
92+
expectedClusterMemberData := []ClusterMember{
93+
{
94+
Name: "acid-test-cluster-0",
95+
Role: "leader",
96+
State: "running",
97+
Timeline: 1,
98+
LagInMb: 0,
99+
}, {
100+
Name: "acid-test-cluster-1",
101+
Role: "sync_standby",
102+
State: "running",
103+
Timeline: 1,
104+
LagInMb: 0,
105+
}, {
106+
Name: "acid-test-cluster-2",
107+
Role: "replica",
108+
State: "running",
109+
Timeline: 1,
110+
LagInMb: 0,
111+
}}
112+
113+
json := `{"members": [{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "api_url": "http://192.168.100.1:8008/patroni", "host": "192.168.100.1", "port": 5432, "timeline": 1}, {"name": "acid-test-cluster-1", "role": "sync_standby", "state": "running", "api_url": "http://192.168.100.2:8008/patroni", "host": "192.168.100.2", "port": 5432, "timeline": 1, "lag": 0}, {"name": "acid-test-cluster-2", "role": "replica", "state": "running", "api_url": "http://192.168.100.3:8008/patroni", "host": "192.168.100.3", "port": 5432, "timeline": 1, "lag": 0}]}`
114+
r := ioutil.NopCloser(bytes.NewReader([]byte(json)))
115+
116+
response := http.Response{
117+
StatusCode: 200,
118+
Body: r,
119+
}
120+
121+
mockClient := mocks.NewMockHTTPClient(ctrl)
122+
mockClient.EXPECT().Get(gomock.Any()).Return(&response, nil)
123+
124+
p := New(logger, mockClient)
125+
126+
clusterMemberData, err := p.GetClusterMembers(newMockPod("192.168.100.1"))
127+
128+
if !reflect.DeepEqual(expectedClusterMemberData, clusterMemberData) {
129+
t.Errorf("Patroni cluster members differ: expected: %#v, got: %#v", expectedClusterMemberData, clusterMemberData)
130+
}
131+
132+
if err != nil {
133+
t.Errorf("Could not read Patroni data: %v", err)
134+
}
135+
}
136+
88137
func TestGetMemberData(t *testing.T) {
89138
ctrl := gomock.NewController(t)
90139
defer ctrl.Finish()

0 commit comments

Comments
 (0)