Commit df6d4ee

Merge pull request sorintlab#531 from sgotti/namespace_additional_master_replication_slots
keeper: prefix additional repl slots with `stolon_`
2 parents: 07bdac8 + 54f20a5

5 files changed: 97 additions, 67 deletions
CHANGELOG.md

Lines changed: 6 additions & 0 deletions
```diff
@@ -1,3 +1,9 @@
+### v0.12.0
+
+#### Upgrade notes
+
+* Replication slots declared in the clusterspec `additionalMasterReplicationSlots` option will now be prefixed with the `stolon_` string so that users can manually create/drop custom replication slots (which must not start with `stolon_`). Users of this feature should update all references to these replication slots, adding the `stolon_` prefix.
+
 ### v0.11.0
 
 #### New features
```
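
In practice the rename is mechanical: every name listed in `additionalMasterReplicationSlots` maps to a physical slot called `stolon_<name>` on the master. The helpers referenced in the diffs below (`common.StolonName`, `common.IsStolonName`, `common.NameFromStolonName`) are assumed to behave roughly like this minimal sketch; the real implementation lives in stolon's `common` package and may differ in detail:

```go
package common

import "strings"

// stolonPrefix is the assumed literal behind the "stolon_" namespacing
// described in the changelog entry above.
const stolonPrefix = "stolon_"

// StolonName namespaces a user-provided name, e.g. "replslot01" -> "stolon_replslot01".
func StolonName(name string) string { return stolonPrefix + name }

// IsStolonName reports whether a replication slot name is managed by stolon.
func IsStolonName(name string) bool { return strings.HasPrefix(name, stolonPrefix) }

// NameFromStolonName recovers the user-provided name from a managed slot name.
func NameFromStolonName(name string) string { return strings.TrimPrefix(name, stolonPrefix) }
```

So a consumer that previously attached to `replslot01` must attach to `stolon_replslot01` after upgrading.
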

cmd/keeper/cmd/keeper.go

Lines changed: 27 additions & 55 deletions
```diff
@@ -886,68 +886,46 @@ func (p *PostgresKeeper) isDifferentTimelineBranch(followedDB *cluster.DB, pgSta
     return false
 }
 
-func (p *PostgresKeeper) updateReplSlots(curReplSlots []string, uid string, followersUIDs []string) error {
-    // Drop internal replication slots
-    for _, slot := range curReplSlots {
-        if !common.IsStolonName(slot) {
-            continue
-        }
-        if !util.StringInSlice(followersUIDs, common.NameFromStolonName(slot)) {
-            log.Infow("dropping replication slot since db not marked as follower", "slot", slot, "db", common.NameFromStolonName(slot))
-            if err := p.pgm.DropReplicationSlot(slot); err != nil {
-                log.Errorw("failed to drop replication slot", "slot", slot, "err", err)
-                // don't return the error but continue also if drop failed (standby still connected)
-            }
-        }
-    }
-    // Create internal replication slots
+func (p *PostgresKeeper) updateReplSlots(curReplSlots []string, uid string, followersUIDs, additionalReplSlots []string) error {
+    internalReplSlots := map[string]struct{}{}
+
+    // Create a list of the wanted internal replication slots
     for _, followerUID := range followersUIDs {
         if followerUID == uid {
             continue
         }
-        replSlot := common.StolonName(followerUID)
-        if !util.StringInSlice(curReplSlots, replSlot) {
-            log.Infow("creating replication slot", "slot", replSlot, "db", followerUID)
-            if err := p.pgm.CreateReplicationSlot(replSlot); err != nil {
-                log.Errorw("failed to create replication slot", "slot", replSlot, zap.Error(err))
-                return err
-            }
-        }
+        internalReplSlots[common.StolonName(followerUID)] = struct{}{}
     }
-    return nil
-}
 
-func (p *PostgresKeeper) updateAdditionalReplSlots(curReplSlots []string, additionalReplSlots []string) error {
-    // detect not stolon replication slots
-    notStolonSlots := []string{}
-    for _, curReplSlot := range curReplSlots {
-        if !common.IsStolonName(curReplSlot) {
-            notStolonSlots = append(notStolonSlots, curReplSlot)
-        }
+    // Add AdditionalReplicationSlots
+    for _, slot := range additionalReplSlots {
+        internalReplSlots[common.StolonName(slot)] = struct{}{}
     }
 
-    // drop unnecessary slots
-    for _, slot := range notStolonSlots {
-        if !util.StringInSlice(additionalReplSlots, slot) {
+    // Drop internal replication slots
+    for _, slot := range curReplSlots {
+        if !common.IsStolonName(slot) {
+            continue
+        }
+        if _, ok := internalReplSlots[slot]; !ok {
             log.Infow("dropping replication slot", "slot", slot)
             if err := p.pgm.DropReplicationSlot(slot); err != nil {
-                log.Errorw("failed to drop replication slot", "slot", slot, zap.Error(err))
-                return err
+                log.Errorw("failed to drop replication slot", "slot", slot, "err", err)
+                // don't return the error but continue also if drop failed (standby still connected)
             }
         }
     }
 
-    // create required slots
-    for _, slot := range additionalReplSlots {
-        if !util.StringInSlice(notStolonSlots, slot) {
+    // Create internal replication slots
+    for slot := range internalReplSlots {
+        if !util.StringInSlice(curReplSlots, slot) {
             log.Infow("creating replication slot", "slot", slot)
             if err := p.pgm.CreateReplicationSlot(slot); err != nil {
                 log.Errorw("failed to create replication slot", "slot", slot, zap.Error(err))
                 return err
             }
         }
     }
-
     return nil
 }
 
@@ -961,14 +939,10 @@ func (p *PostgresKeeper) refreshReplicationSlots(cd *cluster.ClusterData, db *cl
 
     followersUIDs := db.Spec.Followers
 
-    if err = p.updateReplSlots(currentReplicationSlots, db.UID, followersUIDs); err != nil {
+    if err = p.updateReplSlots(currentReplicationSlots, db.UID, followersUIDs, db.Spec.AdditionalReplicationSlots); err != nil {
         log.Errorw("error updating replication slots", zap.Error(err))
         return err
     }
-    if err = p.updateAdditionalReplSlots(currentReplicationSlots, db.Spec.AdditionalReplicationSlots); err != nil {
-        log.Errorw("error updating additional replication slots", zap.Error(err))
-        return err
-    }
 
     return nil
 }
@@ -1449,7 +1423,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
             log.Infow("already master")
         }
 
-        if err = p.refreshReplicationSlots(cd, db); err != nil {
+        if err := p.refreshReplicationSlots(cd, db); err != nil {
             log.Errorw("error updating replication slots", zap.Error(err))
             return
         }
@@ -1499,7 +1473,8 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
         // TODO(sgotti) Check that the followed instance has all the needed WAL segments
 
         // Update our primary_conninfo if replConnString changed
-        if db.Spec.FollowConfig.Type == cluster.FollowTypeInternal {
+        switch db.Spec.FollowConfig.Type {
+        case cluster.FollowTypeInternal:
             var curReplConnParams postgresql.ConnParams
 
             curReplConnParams, err = pgm.GetPrimaryConninfo()
@@ -1531,13 +1506,11 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
                 }
             }
 
-            // TODO(sgotti) currently we ignore DBSpec.AdditionalReplicationSlots on standbys
-            // So we don't touch replication slots and manually created
-            // slots are kept. If the instance becomes master then they'll
-            // be dropped.
-        }
+            if err = p.refreshReplicationSlots(cd, db); err != nil {
+                log.Errorw("error updating replication slots", zap.Error(err))
+            }
 
-        if db.Spec.FollowConfig.Type == cluster.FollowTypeExternal {
+        case cluster.FollowTypeExternal:
             // Update recovery conf if our FollowConfig has changed
             curReplConnParams, err := pgm.GetPrimaryConninfo()
             if err != nil {
@@ -1573,7 +1546,6 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
 
         if err = p.refreshReplicationSlots(cd, db); err != nil {
             log.Errorw("error updating replication slots", zap.Error(err))
-            return
         }
     }
 
```
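To make the intent of the refactor explicit: the two previous passes (follower slots, then additional slots) are folded into a single reconciliation of one desired set. Below is a standalone sketch of that logic, detached from `PostgresKeeper`; the `createSlot`/`dropSlot` callbacks stand in for `p.pgm.CreateReplicationSlot`/`p.pgm.DropReplicationSlot`, and the hard-coded `stolon_` prefix stands in for `common.StolonName`. It is an illustration of the approach, not the keeper's actual code:

```go
package main

import (
    "fmt"
    "strings"
)

// reconcileReplSlots mirrors the shape of the new updateReplSlots: build the set of
// wanted stolon-managed slots (one per follower plus the additional slots, all
// prefixed), drop stolon_ slots that are no longer wanted and create the missing
// ones. Slots without the stolon_ prefix are never touched.
func reconcileReplSlots(cur, followersUIDs []string, selfUID string, additional []string,
    createSlot, dropSlot func(name string) error) error {
    const prefix = "stolon_" // assumed prefix, see common.StolonName

    wanted := map[string]struct{}{}
    for _, uid := range followersUIDs {
        if uid == selfUID {
            continue
        }
        wanted[prefix+uid] = struct{}{}
    }
    for _, slot := range additional {
        wanted[prefix+slot] = struct{}{}
    }

    curSet := map[string]struct{}{}
    for _, slot := range cur {
        curSet[slot] = struct{}{}
        if !strings.HasPrefix(slot, prefix) {
            continue // manually created, user-owned slot: keep it
        }
        if _, ok := wanted[slot]; !ok {
            // like the keeper, log and carry on if the drop fails
            if err := dropSlot(slot); err != nil {
                fmt.Println("failed to drop replication slot:", slot, err)
            }
        }
    }
    for slot := range wanted {
        if _, ok := curSet[slot]; !ok {
            if err := createSlot(slot); err != nil {
                return err
            }
        }
    }
    return nil
}

func main() {
    create := func(name string) error { fmt.Println("create", name); return nil }
    drop := func(name string) error { fmt.Println("drop", name); return nil }
    // current slots: one stale stolon slot, one manual slot, one follower slot
    _ = reconcileReplSlots(
        []string{"stolon_oldslot", "manualreplslot", "stolon_follower1"},
        []string{"follower1", "follower2"}, "self",
        []string{"replslot01"},
        create, drop,
    )
}
```

With a single wanted set, a slot is either wanted (created if missing) or stolon-managed but unwanted (dropped); anything without the prefix sits outside stolon's namespace and is left untouched.
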
doc/cluster_spec.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -25,7 +25,7 @@ Some options in a running cluster specification can be changed to update the des
 | minSynchronousStandbys | minimum number of required synchronous standbys when synchronous replication is enabled (only set this to a value > 1 when using PostgreSQL >= 9.6) | no | uint16 | 1 |
 | maxSynchronousStandbys | maximum number of required synchronous standbys when synchronous replication is enabled (only set this to a value > 1 when using PostgreSQL >= 9.6) | no | uint16 | 1 |
 | additionalWalSenders | number of additional wal_senders in addition to the ones internally defined by stolon, useful to provide enough wal senders for external standbys (changing this value requires an instance restart) | no | uint16 | 5 |
-| additionalMasterReplicationSlots | a list of additional replication slots to be created on the master postgres instance. Replication slots not defined here will be dropped from the master instance (i.e. manually created replication slots will be removed). | no | []string | null |
+| additionalMasterReplicationSlots | a list of additional physical replication slots to be created on the master postgres instance. They will be prefixed with `stolon_` (like internal replication slots used for standby replication) to make them "namespaced" from other replication slots. Replication slots starting with `stolon_` and not defined here (and not used for standby replication) will be dropped from the master instance. | no | []string | null |
 | usePgrewind | try to use pg_rewind for faster instance resyncronization. | no | bool | false |
 | initMode | The cluster initialization mode. Can be *new* or *existing*. *new* means that a new db cluster will be created on a random keeper and the other keepers will sync with it. *existing* means that a keeper (that needs to have an already created db cluster) will be choosed as the initial master and the other keepers will sync with it. In this case the `existingConfig` object needs to be populated. | yes | string | |
 | existingConfig | configuration for initMode of type "existing" | if initMode is "existing" | ExistingConfig | |
```
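
For operators, the practical effect of this documentation change can be checked directly on the master by listing physical slots and splitting them on the `stolon_` prefix. A hedged sketch follows: the connection string, the `lib/pq` driver choice and the example slot names are assumptions for illustration, while the query against `pg_replication_slots` is standard PostgreSQL:

```go
package main

import (
    "database/sql"
    "fmt"
    "log"
    "strings"

    _ "github.com/lib/pq" // assumed driver; any PostgreSQL driver works
)

func main() {
    // Placeholder connection string for the master instance.
    db, err := sql.Open("postgres", "host=master-host port=5432 user=postgres sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    rows, err := db.Query("select slot_name from pg_replication_slots where slot_type = 'physical'")
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()

    for rows.Next() {
        var name string
        if err := rows.Scan(&name); err != nil {
            log.Fatal(err)
        }
        if strings.HasPrefix(name, "stolon_") {
            fmt.Println("stolon-managed:", name) // e.g. stolon_replslot01, stolon_<standby db uid>
        } else {
            fmt.Println("user-owned (left alone):", name) // e.g. manualreplslot
        }
    }
    if err := rows.Err(); err != nil {
        log.Fatal(err)
    }
}
```

With `additionalMasterReplicationSlots: ["replslot01", "replslot02"]` you would expect to see `stolon_replslot01`, `stolon_replslot02` and one `stolon_<db uid>` slot per standby, while a manually created slot such as `manualreplslot` is left alone.
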

tests/integration/config_test.go

Lines changed: 32 additions & 11 deletions
```diff
@@ -15,6 +15,7 @@
 package integration
 
 import (
+    "context"
     "fmt"
     "io/ioutil"
     "os"
@@ -364,16 +365,28 @@ func TestAdditionalReplicationSlots(t *testing.T) {
         t.Fatalf("unexpected err: %v", err)
     }
 
+    cd, _, err := sm.GetClusterData(context.TODO())
+    if err != nil {
+        t.Fatalf("unexpected err: %v", err)
+    }
+
+    var standbyDBUID string
+    for _, db := range cd.DBs {
+        if db.Spec.KeeperUID == standby.uid {
+            standbyDBUID = db.UID
+        }
+    }
+
     // create additional replslots on master
     err = StolonCtl(clusterName, tstore.storeBackend, storeEndpoints, "update", "--patch", `{ "additionalMasterReplicationSlots" : [ "replslot01", "replslot02" ] }`)
     if err != nil {
         t.Fatalf("unexpected err: %v", err)
     }
-    if err := waitNotStolonReplicationSlots(master, []string{"replslot01", "replslot02"}, 30*time.Second); err != nil {
+    if err := waitStolonReplicationSlots(master, []string{standbyDBUID, "replslot01", "replslot02"}, 30*time.Second); err != nil {
         t.Fatalf("unexpected err: %v", err)
     }
     // no repl slot on standby
-    if err := waitNotStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
+    if err := waitStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
         t.Fatalf("unexpected err: %v", err)
     }
 
@@ -382,11 +395,11 @@ func TestAdditionalReplicationSlots(t *testing.T) {
     if err != nil {
         t.Fatalf("unexpected err: %v", err)
     }
-    if err := waitNotStolonReplicationSlots(master, []string{"replslot01"}, 30*time.Second); err != nil {
+    if err := waitStolonReplicationSlots(master, []string{standbyDBUID, "replslot01"}, 30*time.Second); err != nil {
         t.Fatalf("unexpected err: %v", err)
     }
     // no repl slot on standby
-    if err := waitNotStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
+    if err := waitStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
         t.Fatalf("unexpected err: %v", err)
     }
 
@@ -395,11 +408,11 @@ func TestAdditionalReplicationSlots(t *testing.T) {
     if err != nil {
         t.Fatalf("unexpected err: %v", err)
     }
-    if err := waitNotStolonReplicationSlots(master, []string{}, 30*time.Second); err != nil {
+    if err := waitStolonReplicationSlots(master, []string{standbyDBUID}, 30*time.Second); err != nil {
         t.Fatalf("unexpected err: %v", err)
     }
     // no repl slot on standby
-    if err := waitNotStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
+    if err := waitStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
         t.Fatalf("unexpected err: %v", err)
     }
 
@@ -408,19 +421,27 @@ func TestAdditionalReplicationSlots(t *testing.T) {
     if err != nil {
         t.Fatalf("unexpected err: %v", err)
     }
-    if err := waitNotStolonReplicationSlots(master, []string{"replslot01", "replslot02"}, 30*time.Second); err != nil {
+    if err := waitStolonReplicationSlots(master, []string{standbyDBUID, "replslot01", "replslot02"}, 30*time.Second); err != nil {
         t.Fatalf("unexpected err: %v", err)
     }
     // no repl slot on standby
-    if err := waitNotStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
+    if err := waitStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
         t.Fatalf("unexpected err: %v", err)
     }
 
-    // Manually create a replication slot. It should be dropped.
+    // Manually create a replication slot. It should not be dropped.
     if _, err := master.Exec("select pg_create_physical_replication_slot('manualreplslot')"); err != nil {
         t.Fatalf("unexpected err: %v", err)
     }
-    if err := waitNotStolonReplicationSlots(master, []string{"replslot01", "replslot02"}, 30*time.Second); err != nil {
+    // Manually create a replication slot starting with stolon_ . It should be dropped.
+    if _, err := master.Exec("select pg_create_physical_replication_slot('stolon_manualreplslot')"); err != nil {
+        t.Fatalf("unexpected err: %v", err)
+    }
+    if err := waitStolonReplicationSlots(master, []string{standbyDBUID, "replslot01", "replslot02"}, 30*time.Second); err != nil {
+        t.Fatalf("unexpected err: %v", err)
+    }
+    // check it here so we are sure the refresh slots function has already been called
+    if err := waitNotStolonReplicationSlots(master, []string{"manualreplslot"}, 30*time.Second); err != nil {
         t.Fatalf("unexpected err: %v", err)
     }
 
@@ -437,7 +458,7 @@ func TestAdditionalReplicationSlots(t *testing.T) {
     }
 
     // repl slot on standby which is the new master
-    if err := waitNotStolonReplicationSlots(standby, []string{"replslot01", "replslot02"}, 30*time.Second); err != nil {
+    if err := waitStolonReplicationSlots(standby, []string{"replslot01", "replslot02"}, 30*time.Second); err != nil {
         t.Fatalf("unexpected err: %v", err)
     }
 }
```

tests/integration/utils.go

Lines changed: 31 additions & 0 deletions
```diff
@@ -167,6 +167,37 @@ func waitReplicationSlots(q Querier, replSlots []string, timeout time.Duration)
     return fmt.Errorf("timeout waiting for replSlots %v, got: %v, last err: %v", replSlots, curReplSlots, err)
 }
 
+func waitStolonReplicationSlots(q Querier, replSlots []string, timeout time.Duration) error {
+    // prefix with stolon_
+    for i, slot := range replSlots {
+        replSlots[i] = common.StolonName(slot)
+    }
+    sort.Sort(sort.StringSlice(replSlots))
+
+    start := time.Now()
+    var curReplSlots []string
+    var err error
+    for time.Now().Add(-timeout).Before(start) {
+        allReplSlots, err := getReplicationSlots(q)
+        if err != nil {
+            goto end
+        }
+        curReplSlots = []string{}
+        for _, s := range allReplSlots {
+            if common.IsStolonName(s) {
+                curReplSlots = append(curReplSlots, s)
+            }
+        }
+        sort.Sort(sort.StringSlice(curReplSlots))
+        if reflect.DeepEqual(replSlots, curReplSlots) {
+            return nil
+        }
+    end:
+        time.Sleep(2 * time.Second)
+    }
+    return fmt.Errorf("timeout waiting for replSlots %v, got: %v, last err: %v", replSlots, curReplSlots, err)
+}
+
 func waitNotStolonReplicationSlots(q Querier, replSlots []string, timeout time.Duration) error {
     sort.Sort(sort.StringSlice(replSlots))
 
```
0 commit comments

Comments
 (0)