Skip to content

Commit 9cc800d

Browse files
committed
proxy: make proxyCheckInterval and proxyTimeout configurable
make proxyCheckInterval and proxyTimeout configurable in the cluster spec. The proxy will publish its current proxyTimeout so the sentinel will know when to consider it as active.
1 parent 1305446 commit 9cc800d

File tree

9 files changed

+153
-77
lines changed

9 files changed

+153
-77
lines changed

cmd/proxy/cmd/proxy.go

+57-11
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ type ClusterChecker struct {
8989
endPollonProxyCh chan error
9090

9191
pollonMutex sync.Mutex
92+
93+
proxyCheckInterval time.Duration
94+
proxyTimeout time.Duration
95+
configMutex sync.Mutex
9296
}
9397

9498
func NewClusterChecker(uid string, cfg config) (*ClusterChecker, error) {
@@ -104,6 +108,9 @@ func NewClusterChecker(uid string, cfg config) (*ClusterChecker, error) {
104108
stopListening: cfg.stopListening,
105109
e: e,
106110
endPollonProxyCh: make(chan error),
111+
112+
proxyCheckInterval: cluster.DefaultProxyCheckInterval,
113+
proxyTimeout: cluster.DefaultProxyTimeout,
107114
}, nil
108115
}
109116

@@ -164,15 +171,16 @@ func (c *ClusterChecker) sendPollonConfData(confData pollon.ConfData) {
164171
}
165172
}
166173

167-
func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, ttl time.Duration) error {
174+
func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, proxyTimeout time.Duration) error {
168175
proxyInfo := &cluster.ProxyInfo{
169-
InfoUID: common.UID(),
170-
UID: c.uid,
171-
Generation: generation,
176+
InfoUID: common.UID(),
177+
UID: c.uid,
178+
Generation: generation,
179+
ProxyTimeout: proxyTimeout,
172180
}
173181
log.Debugf("proxyInfo dump: %s", spew.Sdump(proxyInfo))
174182

175-
if err := c.e.SetProxyInfo(context.TODO(), proxyInfo, ttl); err != nil {
183+
if err := c.e.SetProxyInfo(context.TODO(), proxyInfo, 2*proxyTimeout); err != nil {
176184
return err
177185
}
178186
return nil
@@ -205,13 +213,31 @@ func (c *ClusterChecker) Check() error {
205213
return fmt.Errorf("clusterdata validation failed: %v", err)
206214
}
207215

216+
cdProxyCheckInterval := cd.Cluster.DefSpec().ProxyCheckInterval.Duration
217+
cdProxyTimeout := cd.Cluster.DefSpec().ProxyTimeout.Duration
218+
219+
// use the greater between the current proxy timeout and the one defined in the cluster spec if they're different.
220+
// in this way we're updating our proxyInfo using a timeout that is greater or equal the current active timeout timer.
221+
c.configMutex.Lock()
222+
proxyTimeout := c.proxyTimeout
223+
if cdProxyTimeout > proxyTimeout {
224+
proxyTimeout = cdProxyTimeout
225+
}
226+
c.configMutex.Unlock()
227+
208228
proxy := cd.Proxy
209229
if proxy == nil {
210230
log.Infow("no proxy object available, closing connections to master")
211231
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
212232
// ignore errors on setting proxy info
213-
if err = c.SetProxyInfo(c.e, cluster.NoGeneration, 2*cluster.DefaultProxyTimeoutInterval); err != nil {
233+
if err = c.SetProxyInfo(c.e, cluster.NoGeneration, proxyTimeout); err != nil {
214234
log.Errorw("failed to update proxyInfo", zap.Error(err))
235+
} else {
236+
// update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info
237+
c.configMutex.Lock()
238+
c.proxyCheckInterval = cdProxyCheckInterval
239+
c.proxyTimeout = cdProxyTimeout
240+
c.configMutex.Unlock()
215241
}
216242
return nil
217243
}
@@ -221,8 +247,14 @@ func (c *ClusterChecker) Check() error {
221247
log.Infow("no db object available, closing connections to master", "db", proxy.Spec.MasterDBUID)
222248
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
223249
// ignore errors on setting proxy info
224-
if err = c.SetProxyInfo(c.e, proxy.Generation, 2*cluster.DefaultProxyTimeoutInterval); err != nil {
250+
if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil {
225251
log.Errorw("failed to update proxyInfo", zap.Error(err))
252+
} else {
253+
// update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info
254+
c.configMutex.Lock()
255+
c.proxyCheckInterval = cdProxyCheckInterval
256+
c.proxyTimeout = cdProxyTimeout
257+
c.configMutex.Unlock()
226258
}
227259
return nil
228260
}
@@ -234,12 +266,18 @@ func (c *ClusterChecker) Check() error {
234266
return nil
235267
}
236268
log.Infow("master address", "address", addr)
237-
if err = c.SetProxyInfo(c.e, proxy.Generation, 2*cluster.DefaultProxyTimeoutInterval); err != nil {
269+
if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil {
238270
// if we failed to update our proxy info when a master is defined we
239271
// cannot ignore this error since the sentinel won't know that we exist
240272
// and are sending connections to a master so, when electing a new
241273
// master, it'll not wait for us to close connections to the old one.
242274
return fmt.Errorf("failed to update proxyInfo: %v", err)
275+
} else {
276+
// update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info
277+
c.configMutex.Lock()
278+
c.proxyCheckInterval = cdProxyCheckInterval
279+
c.proxyTimeout = cdProxyTimeout
280+
c.configMutex.Unlock()
243281
}
244282

245283
// start proxing only if we are inside enabledProxies, this ensures that the
@@ -256,7 +294,9 @@ func (c *ClusterChecker) Check() error {
256294
}
257295

258296
func (c *ClusterChecker) TimeoutChecker(checkOkCh chan struct{}) {
259-
timeoutTimer := time.NewTimer(cluster.DefaultProxyTimeoutInterval)
297+
c.configMutex.Lock()
298+
timeoutTimer := time.NewTimer(c.proxyTimeout)
299+
c.configMutex.Unlock()
260300

261301
for {
262302
select {
@@ -275,7 +315,10 @@ func (c *ClusterChecker) TimeoutChecker(checkOkCh chan struct{}) {
275315

276316
// ignore if stop succeeded or not due to timer already expired
277317
timeoutTimer.Stop()
278-
timeoutTimer = time.NewTimer(cluster.DefaultProxyTimeoutInterval)
318+
319+
c.configMutex.Lock()
320+
timeoutTimer = time.NewTimer(c.proxyTimeout)
321+
c.configMutex.Unlock()
279322
}
280323
}
281324
}
@@ -305,7 +348,10 @@ func (c *ClusterChecker) Start() error {
305348
// report that check was ok
306349
checkOkCh <- struct{}{}
307350
}
308-
timerCh = time.NewTimer(cluster.DefaultProxyCheckInterval).C
351+
c.configMutex.Lock()
352+
timerCh = time.NewTimer(c.proxyCheckInterval).C
353+
c.configMutex.Unlock()
354+
309355
case err := <-c.endPollonProxyCh:
310356
if err != nil {
311357
return fmt.Errorf("proxy error: %v", err)

cmd/sentinel/cmd/sentinel.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ func (s *Sentinel) activeProxiesInfos(proxiesInfo cluster.ProxiesInfo) cluster.P
342342
for _, pi := range proxiesInfo {
343343
if pih, ok := pihs[pi.UID]; ok {
344344
if pih.ProxyInfo.InfoUID == pi.InfoUID {
345-
if timer.Since(pih.Timer) > 2*cluster.DefaultProxyTimeoutInterval {
345+
if timer.Since(pih.Timer) > 2*pi.ProxyTimeout {
346346
delete(activeProxiesInfo, pi.UID)
347347
}
348348
} else {
@@ -1820,7 +1820,6 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) {
18201820
s.sleepInterval = cd.Cluster.DefSpec().SleepInterval.Duration
18211821
s.requestTimeout = cd.Cluster.DefSpec().RequestTimeout.Duration
18221822
}
1823-
18241823
}
18251824

18261825
log.Debugf("cd dump: %s", spew.Sdump(cd))

cmd/sentinel/cmd/sentinel_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -5000,8 +5000,8 @@ func TestUpdateCluster(t *testing.T) {
50005000
}
50015001

50025002
func TestActiveProxiesInfos(t *testing.T) {
5003-
proxyInfo1 := cluster.ProxyInfo{UID: "proxy1", InfoUID: "infoUID1"}
5004-
proxyInfo2 := cluster.ProxyInfo{UID: "proxy2", InfoUID: "infoUID2"}
5003+
proxyInfo1 := cluster.ProxyInfo{UID: "proxy1", InfoUID: "infoUID1", ProxyTimeout: cluster.DefaultProxyTimeout}
5004+
proxyInfo2 := cluster.ProxyInfo{UID: "proxy2", InfoUID: "infoUID2", ProxyTimeout: cluster.DefaultProxyTimeout}
50055005
proxyInfoWithDifferentInfoUID := cluster.ProxyInfo{UID: "proxy2", InfoUID: "differentInfoUID"}
50065006
var secToNanoSecondMultiplier int64 = 1000000000
50075007
tests := []struct {
@@ -5033,7 +5033,7 @@ func TestActiveProxiesInfos(t *testing.T) {
50335033
expectedProxyInfoHistories: ProxyInfoHistories{"proxy1": &ProxyInfoHistory{ProxyInfo: &proxyInfo1}, "proxy2": &ProxyInfoHistory{ProxyInfo: &proxyInfoWithDifferentInfoUID}},
50345034
},
50355035
{
5036-
name: "should remove from active proxies if is not updated for twice the DefaultProxyTimeoutInterval",
5036+
name: "should remove from active proxies if is not updated for twice the DefaultProxyTimeout",
50375037
proxyInfoHistories: ProxyInfoHistories{"proxy1": &ProxyInfoHistory{ProxyInfo: &proxyInfo1, Timer: timer.Now() - (3 * 15 * secToNanoSecondMultiplier)}, "proxy2": &ProxyInfoHistory{ProxyInfo: &proxyInfo2, Timer: timer.Now() - (1 * 15 * secToNanoSecondMultiplier)}},
50385038
proxiesInfos: cluster.ProxiesInfo{"proxy1": &proxyInfo1, "proxy2": &proxyInfo2},
50395039
expectedActiveProxies: cluster.ProxiesInfo{"proxy2": &proxyInfo2},

cmd/stolonctl/cmd/spec.go

+4
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ type ClusterSpecNoDefaults struct {
5050
DBWaitReadyTimeout *cluster.Duration `json:"dbWaitReadyTimeout,omitempty"`
5151
FailInterval *cluster.Duration `json:"failInterval,omitempty"`
5252
DeadKeeperRemovalInterval *cluster.Duration `json:"deadKeeperRemovalInterval,omitempty"`
53+
ProxyCheckInterval *cluster.Duration `json:"proxyCheckInterval,omitempty"`
54+
ProxyTimeout *cluster.Duration `json:"proxyTimeout,omitempty"`
5355
MaxStandbys *uint16 `json:"maxStandbys,omitempty"`
5456
MaxStandbysPerSender *uint16 `json:"maxStandbysPerSender,omitempty"`
5557
MaxStandbyLag *uint32 `json:"maxStandbyLag,omitempty"`
@@ -81,6 +83,8 @@ type ClusterSpecDefaults struct {
8183
DBWaitReadyTimeout *cluster.Duration `json:"dbWaitReadyTimeout"`
8284
FailInterval *cluster.Duration `json:"failInterval"`
8385
DeadKeeperRemovalInterval *cluster.Duration `json:"deadKeeperRemovalInterval"`
86+
ProxyCheckInterval *cluster.Duration `json:"proxyCheckInterval"`
87+
ProxyTimeout *cluster.Duration `json:"proxyTimeout"`
8488
MaxStandbys *uint16 `json:"maxStandbys"`
8589
MaxStandbysPerSender *uint16 `json:"maxStandbysPerSender"`
8690
MaxStandbyLag *uint32 `json:"maxStandbyLag"`

0 commit comments

Comments
 (0)