Skip to content

Commit beb6ab2

Browse files
committed
Merge pull request sorintlab#509 from sgotti/keeper_update_in_memory_dbLocalState_only_after_saving_on_disk
keeper: update in memory dbLocalState only after saving on disk
2 parents 3403866 + c1a671a commit beb6ab2

File tree

1 file changed

+123
-124
lines changed

1 file changed

+123
-124
lines changed

cmd/keeper/cmd/keeper.go

Lines changed: 123 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"syscall"
3232
"time"
3333

34+
"github.com/mitchellh/copystructure"
3435
"github.com/sorintlab/stolon/cmd"
3536
"github.com/sorintlab/stolon/internal/cluster"
3637
"github.com/sorintlab/stolon/internal/common"
@@ -71,6 +72,21 @@ type DBLocalState struct {
7172
InitPGParameters common.Parameters
7273
}
7374

75+
func (s *DBLocalState) DeepCopy() *DBLocalState {
76+
if s == nil {
77+
return nil
78+
}
79+
ns, err := copystructure.Copy(s)
80+
if err != nil {
81+
panic(err)
82+
}
83+
// paranoid test
84+
if !reflect.DeepEqual(s, ns) {
85+
panic("not equal")
86+
}
87+
return ns.(*DBLocalState)
88+
}
89+
7490
type config struct {
7591
cmd.CommonConfig
7692

@@ -276,8 +292,9 @@ func (p *PostgresKeeper) createPGParameters(db *cluster.DB) common.Parameters {
276292
parameters := common.Parameters{}
277293

278294
// Include init parameters if include config is required
295+
dbls := p.dbLocalStateCopy()
279296
if db.Spec.IncludeConfig {
280-
for k, v := range p.dbLocalState.InitPGParameters {
297+
for k, v := range dbls.InitPGParameters {
281298
parameters[k] = v
282299
}
283300
}
@@ -496,6 +513,12 @@ func NewPostgresKeeper(cfg *config, end chan error) (*PostgresKeeper, error) {
496513
return p, nil
497514
}
498515

516+
func (p *PostgresKeeper) dbLocalStateCopy() *DBLocalState {
517+
p.localStateMutex.Lock()
518+
defer p.localStateMutex.Unlock()
519+
return p.dbLocalState.DeepCopy()
520+
}
521+
499522
func (p *PostgresKeeper) usePgrewind(db *cluster.DB) bool {
500523
return p.pgSUUsername != "" && p.pgSUPassword != "" && db.Spec.UsePgrewind
501524
}
@@ -590,10 +613,9 @@ func (p *PostgresKeeper) GetPGState(pctx context.Context) (*cluster.PostgresStat
590613
// Just get one pgstate at a time to avoid exausting available connections
591614
pgState := &cluster.PostgresState{}
592615

593-
p.localStateMutex.Lock()
594-
pgState.UID = p.dbLocalState.UID
595-
pgState.Generation = p.dbLocalState.Generation
596-
p.localStateMutex.Unlock()
616+
dbls := p.dbLocalStateCopy()
617+
pgState.UID = dbls.UID
618+
pgState.Generation = dbls.Generation
597619

598620
pgState.ListenAddress = p.pgListenAddress
599621
pgState.Port = p.pgPort
@@ -1002,7 +1024,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
10021024

10031025
var pgParameters common.Parameters
10041026

1005-
dbls := p.dbLocalState
1027+
dbls := p.dbLocalStateCopy()
10061028
if dbls.Initializing {
10071029
// If we are here this means that the db initialization or
10081030
// resync has failed so we have to clean up stale data
@@ -1019,12 +1041,12 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
10191041
return
10201042
}
10211043
// Reset current db local state since it's not valid anymore
1022-
p.localStateMutex.Lock()
1023-
dbls.UID = ""
1024-
dbls.Generation = cluster.NoGeneration
1025-
dbls.Initializing = false
1026-
p.localStateMutex.Unlock()
1027-
if err = p.saveDBLocalState(); err != nil {
1044+
ndbls := &DBLocalState{
1045+
UID: "",
1046+
Generation: cluster.NoGeneration,
1047+
Initializing: false,
1048+
}
1049+
if err = p.saveDBLocalState(ndbls); err != nil {
10281050
log.Errorw("failed to save db local state", zap.Error(err))
10291051
return
10301052
}
@@ -1048,15 +1070,15 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
10481070
log.Debugw("db status", "initialized", false, "started", false)
10491071
}
10501072

1073+
dbls = p.dbLocalStateCopy()
10511074
// if the db is initialized but there isn't a db local state then generate a new one
10521075
if initialized && dbls.UID == "" {
1053-
p.localStateMutex.Lock()
1054-
dbls.UID = common.UID()
1055-
dbls.Generation = cluster.NoGeneration
1056-
dbls.InitPGParameters = nil
1057-
dbls.Initializing = false
1058-
p.localStateMutex.Unlock()
1059-
if err = p.saveDBLocalState(); err != nil {
1076+
ndbls := &DBLocalState{
1077+
UID: common.UID(),
1078+
Generation: cluster.NoGeneration,
1079+
Initializing: false,
1080+
}
1081+
if err = p.saveDBLocalState(ndbls); err != nil {
10601082
log.Errorw("failed to save db local state", zap.Error(err))
10611083
return
10621084
}
@@ -1067,14 +1089,13 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
10671089
switch db.Spec.InitMode {
10681090
case cluster.DBInitModeNew:
10691091
log.Infow("initializing the database cluster")
1070-
p.localStateMutex.Lock()
1071-
dbls.UID = db.UID
1072-
// Set a no generation since we aren't already converged.
1073-
dbls.Generation = cluster.NoGeneration
1074-
dbls.InitPGParameters = nil
1075-
dbls.Initializing = true
1076-
p.localStateMutex.Unlock()
1077-
if err = p.saveDBLocalState(); err != nil {
1092+
ndbls := &DBLocalState{
1093+
UID: db.UID,
1094+
// Set a no generation since we aren't already converged.
1095+
Generation: cluster.NoGeneration,
1096+
Initializing: true,
1097+
}
1098+
if err = p.saveDBLocalState(ndbls); err != nil {
10781099
log.Errorw("failed to save db local state", zap.Error(err))
10791100
return
10801101
}
@@ -1106,30 +1127,23 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
11061127
}
11071128
initialized = true
11081129

1130+
if err = pgm.StartTmpMerged(); err != nil {
1131+
log.Errorw("failed to start instance", zap.Error(err))
1132+
return
1133+
}
1134+
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
1135+
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
1136+
return
1137+
}
11091138
if db.Spec.IncludeConfig {
1110-
if err = pgm.StartTmpMerged(); err != nil {
1111-
log.Errorw("failed to start instance", zap.Error(err))
1112-
return
1113-
}
1114-
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
1115-
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
1116-
return
1117-
}
11181139
pgParameters, err = pgm.GetConfigFilePGParameters()
11191140
if err != nil {
11201141
log.Errorw("failed to retrieve postgres parameters", zap.Error(err))
11211142
return
11221143
}
1123-
p.localStateMutex.Lock()
1124-
dbls.InitPGParameters = pgParameters
1125-
p.localStateMutex.Unlock()
1126-
} else {
1127-
if err = pgm.StartTmpMerged(); err != nil {
1128-
log.Errorw("failed to start instance", zap.Error(err))
1129-
return
1130-
}
1131-
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
1132-
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
1144+
ndbls.InitPGParameters = pgParameters
1145+
if err = p.saveDBLocalState(ndbls); err != nil {
1146+
log.Errorw("failed to save db local state", zap.Error(err))
11331147
return
11341148
}
11351149
}
@@ -1140,24 +1154,19 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
11401154
return
11411155
}
11421156

1143-
if err = p.saveDBLocalState(); err != nil {
1144-
log.Errorw("failed to save db local state", zap.Error(err))
1145-
return
1146-
}
11471157
if err = pgm.StopIfStarted(true); err != nil {
11481158
log.Errorw("failed to stop pg instance", zap.Error(err))
11491159
return
11501160
}
11511161
case cluster.DBInitModePITR:
11521162
log.Infow("restoring the database cluster")
1153-
p.localStateMutex.Lock()
1154-
dbls.UID = db.UID
1155-
// Set a no generation since we aren't already converged.
1156-
dbls.Generation = cluster.NoGeneration
1157-
dbls.InitPGParameters = nil
1158-
dbls.Initializing = true
1159-
p.localStateMutex.Unlock()
1160-
if err = p.saveDBLocalState(); err != nil {
1163+
ndbls := &DBLocalState{
1164+
UID: db.UID,
1165+
// Set a no generation since we aren't already converged.
1166+
Generation: cluster.NoGeneration,
1167+
Initializing: true,
1168+
}
1169+
if err = p.saveDBLocalState(ndbls); err != nil {
11611170
log.Errorw("failed to save db local state", zap.Error(err))
11621171
return
11631172
}
@@ -1211,31 +1220,28 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
12111220
log.Errorw("failed to retrieve postgres parameters", zap.Error(err))
12121221
return
12131222
}
1214-
p.localStateMutex.Lock()
1215-
dbls.InitPGParameters = pgParameters
1216-
p.localStateMutex.Unlock()
1223+
ndbls.InitPGParameters = pgParameters
1224+
if err = p.saveDBLocalState(ndbls); err != nil {
1225+
log.Errorw("failed to save db local state", zap.Error(err))
1226+
return
1227+
}
12171228
}
12181229
initialized = true
12191230

1220-
if err = p.saveDBLocalState(); err != nil {
1221-
log.Errorw("failed to save db local state", zap.Error(err))
1222-
return
1223-
}
12241231
if err = pgm.StopIfStarted(true); err != nil {
12251232
log.Errorw("failed to stop pg instance", zap.Error(err))
12261233
return
12271234
}
12281235
case cluster.DBInitModeResync:
12291236
log.Infow("resyncing the database cluster")
1230-
// replace our current db uid with the required one.
1231-
p.localStateMutex.Lock()
1232-
dbls.UID = db.UID
1233-
// Set a no generation since we aren't already converged.
1234-
dbls.Generation = cluster.NoGeneration
1235-
dbls.InitPGParameters = nil
1236-
dbls.Initializing = true
1237-
p.localStateMutex.Unlock()
1238-
if err = p.saveDBLocalState(); err != nil {
1237+
ndbls := &DBLocalState{
1238+
// replace our current db uid with the required one.
1239+
UID: db.UID,
1240+
// Set a no generation since we aren't already converged.
1241+
Generation: cluster.NoGeneration,
1242+
Initializing: true,
1243+
}
1244+
if err = p.saveDBLocalState(ndbls); err != nil {
12391245
log.Errorw("failed to save db local state", zap.Error(err))
12401246
return
12411247
}
@@ -1332,14 +1338,14 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
13321338
initialized = true
13331339

13341340
case cluster.DBInitModeExisting:
1335-
// replace our current db uid with the required one.
1336-
p.localStateMutex.Lock()
1337-
dbls.UID = db.UID
1338-
// Set a no generation since we aren't already converged.
1339-
dbls.Generation = cluster.NoGeneration
1340-
dbls.InitPGParameters = nil
1341-
p.localStateMutex.Unlock()
1342-
if err = p.saveDBLocalState(); err != nil {
1341+
ndbls := &DBLocalState{
1342+
// replace our current db uid with the required one.
1343+
UID: db.UID,
1344+
// Set a no generation since we aren't already converged.
1345+
Generation: cluster.NoGeneration,
1346+
Initializing: false,
1347+
}
1348+
if err = p.saveDBLocalState(ndbls); err != nil {
13431349
log.Errorw("failed to save db local state", zap.Error(err))
13441350
return
13451351
}
@@ -1353,55 +1359,39 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
13531359
log.Errorw("failed to stop pg instance", zap.Error(err))
13541360
return
13551361
}
1362+
if err = pgm.StartTmpMerged(); err != nil {
1363+
log.Errorw("failed to start instance", zap.Error(err))
1364+
return
1365+
}
1366+
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
1367+
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
1368+
return
1369+
}
13561370
if db.Spec.IncludeConfig {
1357-
if err = pgm.StartTmpMerged(); err != nil {
1358-
log.Errorw("failed to start instance", zap.Error(err))
1359-
return
1360-
}
1361-
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
1362-
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
1363-
return
1364-
}
13651371
pgParameters, err = pgm.GetConfigFilePGParameters()
13661372
if err != nil {
13671373
log.Errorw("failed to retrieve postgres parameters", zap.Error(err))
13681374
return
13691375
}
1370-
p.localStateMutex.Lock()
1371-
dbls.InitPGParameters = pgParameters
1372-
p.localStateMutex.Unlock()
1373-
} else {
1374-
if err = pgm.StartTmpMerged(); err != nil {
1375-
log.Errorw("failed to start instance", zap.Error(err))
1376-
return
1377-
}
1378-
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
1379-
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
1376+
ndbls.InitPGParameters = pgParameters
1377+
if err = p.saveDBLocalState(ndbls); err != nil {
1378+
log.Errorw("failed to save db local state", zap.Error(err))
13801379
return
13811380
}
13821381
}
1383-
log.Infow("updating our db UID with the cluster data provided db UID")
1384-
// replace our current db uid with the required one.
1385-
p.localStateMutex.Lock()
1386-
dbls.InitPGParameters = pgParameters
1387-
p.localStateMutex.Unlock()
1388-
if err = p.saveDBLocalState(); err != nil {
1389-
log.Errorw("failed to save db local state", zap.Error(err))
1390-
return
1391-
}
13921382
if err = pgm.StopIfStarted(true); err != nil {
13931383
log.Errorw("failed to stop pg instance", zap.Error(err))
13941384
return
13951385
}
13961386
case cluster.DBInitModeNone:
1397-
// replace our current db uid with the required one.
1398-
p.localStateMutex.Lock()
1399-
dbls.UID = db.UID
1400-
// Set a no generation since we aren't already converged.
1401-
dbls.Generation = cluster.NoGeneration
1402-
dbls.InitPGParameters = nil
1403-
p.localStateMutex.Unlock()
1404-
if err = p.saveDBLocalState(); err != nil {
1387+
ndbls := &DBLocalState{
1388+
// replace our current db uid with the required one.
1389+
UID: db.UID,
1390+
// Set a no generation since we aren't already converged.
1391+
Generation: cluster.NoGeneration,
1392+
Initializing: false,
1393+
}
1394+
if err = p.saveDBLocalState(ndbls); err != nil {
14051395
log.Errorw("failed to save db local state", zap.Error(err))
14061396
return
14071397
}
@@ -1648,11 +1638,10 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
16481638
}
16491639

16501640
// If we are here, then all went well and we can update the db generation and save it locally
1651-
p.localStateMutex.Lock()
1652-
dbls.Generation = db.Generation
1653-
dbls.Initializing = false
1654-
p.localStateMutex.Unlock()
1655-
if err := p.saveDBLocalState(); err != nil {
1641+
ndbls := p.dbLocalStateCopy()
1642+
ndbls.Generation = db.Generation
1643+
ndbls.Initializing = false
1644+
if err := p.saveDBLocalState(ndbls); err != nil {
16561645
log.Errorw("failed to save db local state", zap.Error(err))
16571646
return
16581647
}
@@ -1700,12 +1689,22 @@ func (p *PostgresKeeper) loadDBLocalState() error {
17001689
return nil
17011690
}
17021691

1703-
func (p *PostgresKeeper) saveDBLocalState() error {
1704-
sj, err := json.Marshal(p.dbLocalState)
1692+
// saveDBLocalState saves on disk the dbLocalState and only if successfull
1693+
// updates the current in memory state
1694+
func (p *PostgresKeeper) saveDBLocalState(dbls *DBLocalState) error {
1695+
sj, err := json.Marshal(dbls)
17051696
if err != nil {
17061697
return err
17071698
}
1708-
return common.WriteFileAtomic(p.dbLocalStateFilePath(), 0600, sj)
1699+
if err = common.WriteFileAtomic(p.dbLocalStateFilePath(), 0600, sj); err != nil {
1700+
return err
1701+
}
1702+
1703+
p.localStateMutex.Lock()
1704+
p.dbLocalState = dbls.DeepCopy()
1705+
p.localStateMutex.Unlock()
1706+
1707+
return nil
17091708
}
17101709

17111710
// IsMaster return if the db is the cluster master db.

0 commit comments

Comments
 (0)