Skip to content

Commit c1a671a

Browse files
committed
keeper: update in memory dbLocalState only after saving on disk
improve the in memory dbLocalState management forcing its update only after being written to the local state file. In such way we'll only report the current generation, dbuid etc... to the sentinel only after we are sure we have saved it locally.
1 parent 3403866 commit c1a671a

File tree

1 file changed

+123
-124
lines changed

1 file changed

+123
-124
lines changed

cmd/keeper/cmd/keeper.go

Lines changed: 123 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"syscall"
3232
"time"
3333

34+
"github.com/mitchellh/copystructure"
3435
"github.com/sorintlab/stolon/cmd"
3536
"github.com/sorintlab/stolon/internal/cluster"
3637
"github.com/sorintlab/stolon/internal/common"
@@ -71,6 +72,21 @@ type DBLocalState struct {
7172
InitPGParameters common.Parameters
7273
}
7374

75+
func (s *DBLocalState) DeepCopy() *DBLocalState {
76+
if s == nil {
77+
return nil
78+
}
79+
ns, err := copystructure.Copy(s)
80+
if err != nil {
81+
panic(err)
82+
}
83+
// paranoid test
84+
if !reflect.DeepEqual(s, ns) {
85+
panic("not equal")
86+
}
87+
return ns.(*DBLocalState)
88+
}
89+
7490
type config struct {
7591
cmd.CommonConfig
7692

@@ -276,8 +292,9 @@ func (p *PostgresKeeper) createPGParameters(db *cluster.DB) common.Parameters {
276292
parameters := common.Parameters{}
277293

278294
// Include init parameters if include config is required
295+
dbls := p.dbLocalStateCopy()
279296
if db.Spec.IncludeConfig {
280-
for k, v := range p.dbLocalState.InitPGParameters {
297+
for k, v := range dbls.InitPGParameters {
281298
parameters[k] = v
282299
}
283300
}
@@ -496,6 +513,12 @@ func NewPostgresKeeper(cfg *config, end chan error) (*PostgresKeeper, error) {
496513
return p, nil
497514
}
498515

516+
func (p *PostgresKeeper) dbLocalStateCopy() *DBLocalState {
517+
p.localStateMutex.Lock()
518+
defer p.localStateMutex.Unlock()
519+
return p.dbLocalState.DeepCopy()
520+
}
521+
499522
func (p *PostgresKeeper) usePgrewind(db *cluster.DB) bool {
500523
return p.pgSUUsername != "" && p.pgSUPassword != "" && db.Spec.UsePgrewind
501524
}
@@ -590,10 +613,9 @@ func (p *PostgresKeeper) GetPGState(pctx context.Context) (*cluster.PostgresStat
590613
// Just get one pgstate at a time to avoid exausting available connections
591614
pgState := &cluster.PostgresState{}
592615

593-
p.localStateMutex.Lock()
594-
pgState.UID = p.dbLocalState.UID
595-
pgState.Generation = p.dbLocalState.Generation
596-
p.localStateMutex.Unlock()
616+
dbls := p.dbLocalStateCopy()
617+
pgState.UID = dbls.UID
618+
pgState.Generation = dbls.Generation
597619

598620
pgState.ListenAddress = p.pgListenAddress
599621
pgState.Port = p.pgPort
@@ -1002,7 +1024,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
10021024

10031025
var pgParameters common.Parameters
10041026

1005-
dbls := p.dbLocalState
1027+
dbls := p.dbLocalStateCopy()
10061028
if dbls.Initializing {
10071029
// If we are here this means that the db initialization or
10081030
// resync has failed so we have to clean up stale data
@@ -1019,12 +1041,12 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
10191041
return
10201042
}
10211043
// Reset current db local state since it's not valid anymore
1022-
p.localStateMutex.Lock()
1023-
dbls.UID = ""
1024-
dbls.Generation = cluster.NoGeneration
1025-
dbls.Initializing = false
1026-
p.localStateMutex.Unlock()
1027-
if err = p.saveDBLocalState(); err != nil {
1044+
ndbls := &DBLocalState{
1045+
UID: "",
1046+
Generation: cluster.NoGeneration,
1047+
Initializing: false,
1048+
}
1049+
if err = p.saveDBLocalState(ndbls); err != nil {
10281050
log.Errorw("failed to save db local state", zap.Error(err))
10291051
return
10301052
}
@@ -1048,15 +1070,15 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
10481070
log.Debugw("db status", "initialized", false, "started", false)
10491071
}
10501072

1073+
dbls = p.dbLocalStateCopy()
10511074
// if the db is initialized but there isn't a db local state then generate a new one
10521075
if initialized && dbls.UID == "" {
1053-
p.localStateMutex.Lock()
1054-
dbls.UID = common.UID()
1055-
dbls.Generation = cluster.NoGeneration
1056-
dbls.InitPGParameters = nil
1057-
dbls.Initializing = false
1058-
p.localStateMutex.Unlock()
1059-
if err = p.saveDBLocalState(); err != nil {
1076+
ndbls := &DBLocalState{
1077+
UID: common.UID(),
1078+
Generation: cluster.NoGeneration,
1079+
Initializing: false,
1080+
}
1081+
if err = p.saveDBLocalState(ndbls); err != nil {
10601082
log.Errorw("failed to save db local state", zap.Error(err))
10611083
return
10621084
}
@@ -1067,14 +1089,13 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
10671089
switch db.Spec.InitMode {
10681090
case cluster.DBInitModeNew:
10691091
log.Infow("initializing the database cluster")
1070-
p.localStateMutex.Lock()
1071-
dbls.UID = db.UID
1072-
// Set a no generation since we aren't already converged.
1073-
dbls.Generation = cluster.NoGeneration
1074-
dbls.InitPGParameters = nil
1075-
dbls.Initializing = true
1076-
p.localStateMutex.Unlock()
1077-
if err = p.saveDBLocalState(); err != nil {
1092+
ndbls := &DBLocalState{
1093+
UID: db.UID,
1094+
// Set a no generation since we aren't already converged.
1095+
Generation: cluster.NoGeneration,
1096+
Initializing: true,
1097+
}
1098+
if err = p.saveDBLocalState(ndbls); err != nil {
10781099
log.Errorw("failed to save db local state", zap.Error(err))
10791100
return
10801101
}
@@ -1106,30 +1127,23 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
11061127
}
11071128
initialized = true
11081129

1130+
if err = pgm.StartTmpMerged(); err != nil {
1131+
log.Errorw("failed to start instance", zap.Error(err))
1132+
return
1133+
}
1134+
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
1135+
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
1136+
return
1137+
}
11091138
if db.Spec.IncludeConfig {
1110-
if err = pgm.StartTmpMerged(); err != nil {
1111-
log.Errorw("failed to start instance", zap.Error(err))
1112-
return
1113-
}
1114-
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
1115-
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
1116-
return
1117-
}
11181139
pgParameters, err = pgm.GetConfigFilePGParameters()
11191140
if err != nil {
11201141
log.Errorw("failed to retrieve postgres parameters", zap.Error(err))
11211142
return
11221143
}
1123-
p.localStateMutex.Lock()
1124-
dbls.InitPGParameters = pgParameters
1125-
p.localStateMutex.Unlock()
1126-
} else {
1127-
if err = pgm.StartTmpMerged(); err != nil {
1128-
log.Errorw("failed to start instance", zap.Error(err))
1129-
return
1130-
}
1131-
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
1132-
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
1144+
ndbls.InitPGParameters = pgParameters
1145+
if err = p.saveDBLocalState(ndbls); err != nil {
1146+
log.Errorw("failed to save db local state", zap.Error(err))
11331147
return
11341148
}
11351149
}
@@ -1140,24 +1154,19 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
11401154
return
11411155
}
11421156

1143-
if err = p.saveDBLocalState(); err != nil {
1144-
log.Errorw("failed to save db local state", zap.Error(err))
1145-
return
1146-
}
11471157
if err = pgm.StopIfStarted(true); err != nil {
11481158
log.Errorw("failed to stop pg instance", zap.Error(err))
11491159
return
11501160
}
11511161
case cluster.DBInitModePITR:
11521162
log.Infow("restoring the database cluster")
1153-
p.localStateMutex.Lock()
1154-
dbls.UID = db.UID
1155-
// Set a no generation since we aren't already converged.
1156-
dbls.Generation = cluster.NoGeneration
1157-
dbls.InitPGParameters = nil
1158-
dbls.Initializing = true
1159-
p.localStateMutex.Unlock()
1160-
if err = p.saveDBLocalState(); err != nil {
1163+
ndbls := &DBLocalState{
1164+
UID: db.UID,
1165+
// Set a no generation since we aren't already converged.
1166+
Generation: cluster.NoGeneration,
1167+
Initializing: true,
1168+
}
1169+
if err = p.saveDBLocalState(ndbls); err != nil {
11611170
log.Errorw("failed to save db local state", zap.Error(err))
11621171
return
11631172
}
@@ -1211,31 +1220,28 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
12111220
log.Errorw("failed to retrieve postgres parameters", zap.Error(err))
12121221
return
12131222
}
1214-
p.localStateMutex.Lock()
1215-
dbls.InitPGParameters = pgParameters
1216-
p.localStateMutex.Unlock()
1223+
ndbls.InitPGParameters = pgParameters
1224+
if err = p.saveDBLocalState(ndbls); err != nil {
1225+
log.Errorw("failed to save db local state", zap.Error(err))
1226+
return
1227+
}
12171228
}
12181229
initialized = true
12191230

1220-
if err = p.saveDBLocalState(); err != nil {
1221-
log.Errorw("failed to save db local state", zap.Error(err))
1222-
return
1223-
}
12241231
if err = pgm.StopIfStarted(true); err != nil {
12251232
log.Errorw("failed to stop pg instance", zap.Error(err))
12261233
return
12271234
}
12281235
case cluster.DBInitModeResync:
12291236
log.Infow("resyncing the database cluster")
1230-
// replace our current db uid with the required one.
1231-
p.localStateMutex.Lock()
1232-
dbls.UID = db.UID
1233-
// Set a no generation since we aren't already converged.
1234-
dbls.Generation = cluster.NoGeneration
1235-
dbls.InitPGParameters = nil
1236-
dbls.Initializing = true
1237-
p.localStateMutex.Unlock()
1238-
if err = p.saveDBLocalState(); err != nil {
1237+
ndbls := &DBLocalState{
1238+
// replace our current db uid with the required one.
1239+
UID: db.UID,
1240+
// Set a no generation since we aren't already converged.
1241+
Generation: cluster.NoGeneration,
1242+
Initializing: true,
1243+
}
1244+
if err = p.saveDBLocalState(ndbls); err != nil {
12391245
log.Errorw("failed to save db local state", zap.Error(err))
12401246
return
12411247
}
@@ -1332,14 +1338,14 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
13321338
initialized = true
13331339

13341340
case cluster.DBInitModeExisting:
1335-
// replace our current db uid with the required one.
1336-
p.localStateMutex.Lock()
1337-
dbls.UID = db.UID
1338-
// Set a no generation since we aren't already converged.
1339-
dbls.Generation = cluster.NoGeneration
1340-
dbls.InitPGParameters = nil
1341-
p.localStateMutex.Unlock()
1342-
if err = p.saveDBLocalState(); err != nil {
1341+
ndbls := &DBLocalState{
1342+
// replace our current db uid with the required one.
1343+
UID: db.UID,
1344+
// Set a no generation since we aren't already converged.
1345+
Generation: cluster.NoGeneration,
1346+
Initializing: false,
1347+
}
1348+
if err = p.saveDBLocalState(ndbls); err != nil {
13431349
log.Errorw("failed to save db local state", zap.Error(err))
13441350
return
13451351
}
@@ -1353,55 +1359,39 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
13531359
log.Errorw("failed to stop pg instance", zap.Error(err))
13541360
return
13551361
}
1362+
if err = pgm.StartTmpMerged(); err != nil {
1363+
log.Errorw("failed to start instance", zap.Error(err))
1364+
return
1365+
}
1366+
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
1367+
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
1368+
return
1369+
}
13561370
if db.Spec.IncludeConfig {
1357-
if err = pgm.StartTmpMerged(); err != nil {
1358-
log.Errorw("failed to start instance", zap.Error(err))
1359-
return
1360-
}
1361-
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
1362-
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
1363-
return
1364-
}
13651371
pgParameters, err = pgm.GetConfigFilePGParameters()
13661372
if err != nil {
13671373
log.Errorw("failed to retrieve postgres parameters", zap.Error(err))
13681374
return
13691375
}
1370-
p.localStateMutex.Lock()
1371-
dbls.InitPGParameters = pgParameters
1372-
p.localStateMutex.Unlock()
1373-
} else {
1374-
if err = pgm.StartTmpMerged(); err != nil {
1375-
log.Errorw("failed to start instance", zap.Error(err))
1376-
return
1377-
}
1378-
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
1379-
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
1376+
ndbls.InitPGParameters = pgParameters
1377+
if err = p.saveDBLocalState(ndbls); err != nil {
1378+
log.Errorw("failed to save db local state", zap.Error(err))
13801379
return
13811380
}
13821381
}
1383-
log.Infow("updating our db UID with the cluster data provided db UID")
1384-
// replace our current db uid with the required one.
1385-
p.localStateMutex.Lock()
1386-
dbls.InitPGParameters = pgParameters
1387-
p.localStateMutex.Unlock()
1388-
if err = p.saveDBLocalState(); err != nil {
1389-
log.Errorw("failed to save db local state", zap.Error(err))
1390-
return
1391-
}
13921382
if err = pgm.StopIfStarted(true); err != nil {
13931383
log.Errorw("failed to stop pg instance", zap.Error(err))
13941384
return
13951385
}
13961386
case cluster.DBInitModeNone:
1397-
// replace our current db uid with the required one.
1398-
p.localStateMutex.Lock()
1399-
dbls.UID = db.UID
1400-
// Set a no generation since we aren't already converged.
1401-
dbls.Generation = cluster.NoGeneration
1402-
dbls.InitPGParameters = nil
1403-
p.localStateMutex.Unlock()
1404-
if err = p.saveDBLocalState(); err != nil {
1387+
ndbls := &DBLocalState{
1388+
// replace our current db uid with the required one.
1389+
UID: db.UID,
1390+
// Set a no generation since we aren't already converged.
1391+
Generation: cluster.NoGeneration,
1392+
Initializing: false,
1393+
}
1394+
if err = p.saveDBLocalState(ndbls); err != nil {
14051395
log.Errorw("failed to save db local state", zap.Error(err))
14061396
return
14071397
}
@@ -1648,11 +1638,10 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
16481638
}
16491639

16501640
// If we are here, then all went well and we can update the db generation and save it locally
1651-
p.localStateMutex.Lock()
1652-
dbls.Generation = db.Generation
1653-
dbls.Initializing = false
1654-
p.localStateMutex.Unlock()
1655-
if err := p.saveDBLocalState(); err != nil {
1641+
ndbls := p.dbLocalStateCopy()
1642+
ndbls.Generation = db.Generation
1643+
ndbls.Initializing = false
1644+
if err := p.saveDBLocalState(ndbls); err != nil {
16561645
log.Errorw("failed to save db local state", zap.Error(err))
16571646
return
16581647
}
@@ -1700,12 +1689,22 @@ func (p *PostgresKeeper) loadDBLocalState() error {
17001689
return nil
17011690
}
17021691

1703-
func (p *PostgresKeeper) saveDBLocalState() error {
1704-
sj, err := json.Marshal(p.dbLocalState)
1692+
// saveDBLocalState saves on disk the dbLocalState and only if successfull
1693+
// updates the current in memory state
1694+
func (p *PostgresKeeper) saveDBLocalState(dbls *DBLocalState) error {
1695+
sj, err := json.Marshal(dbls)
17051696
if err != nil {
17061697
return err
17071698
}
1708-
return common.WriteFileAtomic(p.dbLocalStateFilePath(), 0600, sj)
1699+
if err = common.WriteFileAtomic(p.dbLocalStateFilePath(), 0600, sj); err != nil {
1700+
return err
1701+
}
1702+
1703+
p.localStateMutex.Lock()
1704+
p.dbLocalState = dbls.DeepCopy()
1705+
p.localStateMutex.Unlock()
1706+
1707+
return nil
17091708
}
17101709

17111710
// IsMaster return if the db is the cluster master db.

0 commit comments

Comments
 (0)