Skip to content

Commit 0424b16

Browse files
authored
Merge pull request sorintlab#756 from sgotti/proxy_configurable_intervals
proxy: make proxyCheckInterval and proxyTimeout configurable
2 parents 1305446 + 9cc800d commit 0424b16

File tree

9 files changed

+153
-77
lines changed

9 files changed

+153
-77
lines changed

cmd/proxy/cmd/proxy.go

+57-11
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ type ClusterChecker struct {
8989
endPollonProxyCh chan error
9090

9191
pollonMutex sync.Mutex
92+
93+
proxyCheckInterval time.Duration
94+
proxyTimeout time.Duration
95+
configMutex sync.Mutex
9296
}
9397

9498
func NewClusterChecker(uid string, cfg config) (*ClusterChecker, error) {
@@ -104,6 +108,9 @@ func NewClusterChecker(uid string, cfg config) (*ClusterChecker, error) {
104108
stopListening: cfg.stopListening,
105109
e: e,
106110
endPollonProxyCh: make(chan error),
111+
112+
proxyCheckInterval: cluster.DefaultProxyCheckInterval,
113+
proxyTimeout: cluster.DefaultProxyTimeout,
107114
}, nil
108115
}
109116

@@ -164,15 +171,16 @@ func (c *ClusterChecker) sendPollonConfData(confData pollon.ConfData) {
164171
}
165172
}
166173

167-
func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, ttl time.Duration) error {
174+
func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, proxyTimeout time.Duration) error {
168175
proxyInfo := &cluster.ProxyInfo{
169-
InfoUID: common.UID(),
170-
UID: c.uid,
171-
Generation: generation,
176+
InfoUID: common.UID(),
177+
UID: c.uid,
178+
Generation: generation,
179+
ProxyTimeout: proxyTimeout,
172180
}
173181
log.Debugf("proxyInfo dump: %s", spew.Sdump(proxyInfo))
174182

175-
if err := c.e.SetProxyInfo(context.TODO(), proxyInfo, ttl); err != nil {
183+
if err := c.e.SetProxyInfo(context.TODO(), proxyInfo, 2*proxyTimeout); err != nil {
176184
return err
177185
}
178186
return nil
@@ -205,13 +213,31 @@ func (c *ClusterChecker) Check() error {
205213
return fmt.Errorf("clusterdata validation failed: %v", err)
206214
}
207215

216+
cdProxyCheckInterval := cd.Cluster.DefSpec().ProxyCheckInterval.Duration
217+
cdProxyTimeout := cd.Cluster.DefSpec().ProxyTimeout.Duration
218+
219+
// use the greater between the current proxy timeout and the one defined in the cluster spec if they're different.
220+
// in this way we're updating our proxyInfo using a timeout that is greater or equal the current active timeout timer.
221+
c.configMutex.Lock()
222+
proxyTimeout := c.proxyTimeout
223+
if cdProxyTimeout > proxyTimeout {
224+
proxyTimeout = cdProxyTimeout
225+
}
226+
c.configMutex.Unlock()
227+
208228
proxy := cd.Proxy
209229
if proxy == nil {
210230
log.Infow("no proxy object available, closing connections to master")
211231
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
212232
// ignore errors on setting proxy info
213-
if err = c.SetProxyInfo(c.e, cluster.NoGeneration, 2*cluster.DefaultProxyTimeoutInterval); err != nil {
233+
if err = c.SetProxyInfo(c.e, cluster.NoGeneration, proxyTimeout); err != nil {
214234
log.Errorw("failed to update proxyInfo", zap.Error(err))
235+
} else {
236+
// update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info
237+
c.configMutex.Lock()
238+
c.proxyCheckInterval = cdProxyCheckInterval
239+
c.proxyTimeout = cdProxyTimeout
240+
c.configMutex.Unlock()
215241
}
216242
return nil
217243
}
@@ -221,8 +247,14 @@ func (c *ClusterChecker) Check() error {
221247
log.Infow("no db object available, closing connections to master", "db", proxy.Spec.MasterDBUID)
222248
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
223249
// ignore errors on setting proxy info
224-
if err = c.SetProxyInfo(c.e, proxy.Generation, 2*cluster.DefaultProxyTimeoutInterval); err != nil {
250+
if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil {
225251
log.Errorw("failed to update proxyInfo", zap.Error(err))
252+
} else {
253+
// update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info
254+
c.configMutex.Lock()
255+
c.proxyCheckInterval = cdProxyCheckInterval
256+
c.proxyTimeout = cdProxyTimeout
257+
c.configMutex.Unlock()
226258
}
227259
return nil
228260
}
@@ -234,12 +266,18 @@ func (c *ClusterChecker) Check() error {
234266
return nil
235267
}
236268
log.Infow("master address", "address", addr)
237-
if err = c.SetProxyInfo(c.e, proxy.Generation, 2*cluster.DefaultProxyTimeoutInterval); err != nil {
269+
if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil {
238270
// if we failed to update our proxy info when a master is defined we
239271
// cannot ignore this error since the sentinel won't know that we exist
240272
// and are sending connections to a master so, when electing a new
241273
// master, it'll not wait for us to close connections to the old one.
242274
return fmt.Errorf("failed to update proxyInfo: %v", err)
275+
} else {
276+
// update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info
277+
c.configMutex.Lock()
278+
c.proxyCheckInterval = cdProxyCheckInterval
279+
c.proxyTimeout = cdProxyTimeout
280+
c.configMutex.Unlock()
243281
}
244282

245283
// start proxing only if we are inside enabledProxies, this ensures that the
@@ -256,7 +294,9 @@ func (c *ClusterChecker) Check() error {
256294
}
257295

258296
func (c *ClusterChecker) TimeoutChecker(checkOkCh chan struct{}) {
259-
timeoutTimer := time.NewTimer(cluster.DefaultProxyTimeoutInterval)
297+
c.configMutex.Lock()
298+
timeoutTimer := time.NewTimer(c.proxyTimeout)
299+
c.configMutex.Unlock()
260300

261301
for {
262302
select {
@@ -275,7 +315,10 @@ func (c *ClusterChecker) TimeoutChecker(checkOkCh chan struct{}) {
275315

276316
// ignore if stop succeeded or not due to timer already expired
277317
timeoutTimer.Stop()
278-
timeoutTimer = time.NewTimer(cluster.DefaultProxyTimeoutInterval)
318+
319+
c.configMutex.Lock()
320+
timeoutTimer = time.NewTimer(c.proxyTimeout)
321+
c.configMutex.Unlock()
279322
}
280323
}
281324
}
@@ -305,7 +348,10 @@ func (c *ClusterChecker) Start() error {
305348
// report that check was ok
306349
checkOkCh <- struct{}{}
307350
}
308-
timerCh = time.NewTimer(cluster.DefaultProxyCheckInterval).C
351+
c.configMutex.Lock()
352+
timerCh = time.NewTimer(c.proxyCheckInterval).C
353+
c.configMutex.Unlock()
354+
309355
case err := <-c.endPollonProxyCh:
310356
if err != nil {
311357
return fmt.Errorf("proxy error: %v", err)

cmd/sentinel/cmd/sentinel.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ func (s *Sentinel) activeProxiesInfos(proxiesInfo cluster.ProxiesInfo) cluster.P
342342
for _, pi := range proxiesInfo {
343343
if pih, ok := pihs[pi.UID]; ok {
344344
if pih.ProxyInfo.InfoUID == pi.InfoUID {
345-
if timer.Since(pih.Timer) > 2*cluster.DefaultProxyTimeoutInterval {
345+
if timer.Since(pih.Timer) > 2*pi.ProxyTimeout {
346346
delete(activeProxiesInfo, pi.UID)
347347
}
348348
} else {
@@ -1820,7 +1820,6 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) {
18201820
s.sleepInterval = cd.Cluster.DefSpec().SleepInterval.Duration
18211821
s.requestTimeout = cd.Cluster.DefSpec().RequestTimeout.Duration
18221822
}
1823-
18241823
}
18251824

18261825
log.Debugf("cd dump: %s", spew.Sdump(cd))

cmd/sentinel/cmd/sentinel_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -5000,8 +5000,8 @@ func TestUpdateCluster(t *testing.T) {
50005000
}
50015001

50025002
func TestActiveProxiesInfos(t *testing.T) {
5003-
proxyInfo1 := cluster.ProxyInfo{UID: "proxy1", InfoUID: "infoUID1"}
5004-
proxyInfo2 := cluster.ProxyInfo{UID: "proxy2", InfoUID: "infoUID2"}
5003+
proxyInfo1 := cluster.ProxyInfo{UID: "proxy1", InfoUID: "infoUID1", ProxyTimeout: cluster.DefaultProxyTimeout}
5004+
proxyInfo2 := cluster.ProxyInfo{UID: "proxy2", InfoUID: "infoUID2", ProxyTimeout: cluster.DefaultProxyTimeout}
50055005
proxyInfoWithDifferentInfoUID := cluster.ProxyInfo{UID: "proxy2", InfoUID: "differentInfoUID"}
50065006
var secToNanoSecondMultiplier int64 = 1000000000
50075007
tests := []struct {
@@ -5033,7 +5033,7 @@ func TestActiveProxiesInfos(t *testing.T) {
50335033
expectedProxyInfoHistories: ProxyInfoHistories{"proxy1": &ProxyInfoHistory{ProxyInfo: &proxyInfo1}, "proxy2": &ProxyInfoHistory{ProxyInfo: &proxyInfoWithDifferentInfoUID}},
50345034
},
50355035
{
5036-
name: "should remove from active proxies if is not updated for twice the DefaultProxyTimeoutInterval",
5036+
name: "should remove from active proxies if is not updated for twice the DefaultProxyTimeout",
50375037
proxyInfoHistories: ProxyInfoHistories{"proxy1": &ProxyInfoHistory{ProxyInfo: &proxyInfo1, Timer: timer.Now() - (3 * 15 * secToNanoSecondMultiplier)}, "proxy2": &ProxyInfoHistory{ProxyInfo: &proxyInfo2, Timer: timer.Now() - (1 * 15 * secToNanoSecondMultiplier)}},
50385038
proxiesInfos: cluster.ProxiesInfo{"proxy1": &proxyInfo1, "proxy2": &proxyInfo2},
50395039
expectedActiveProxies: cluster.ProxiesInfo{"proxy2": &proxyInfo2},

cmd/stolonctl/cmd/spec.go

+4
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ type ClusterSpecNoDefaults struct {
5050
DBWaitReadyTimeout *cluster.Duration `json:"dbWaitReadyTimeout,omitempty"`
5151
FailInterval *cluster.Duration `json:"failInterval,omitempty"`
5252
DeadKeeperRemovalInterval *cluster.Duration `json:"deadKeeperRemovalInterval,omitempty"`
53+
ProxyCheckInterval *cluster.Duration `json:"proxyCheckInterval,omitempty"`
54+
ProxyTimeout *cluster.Duration `json:"proxyTimeout,omitempty"`
5355
MaxStandbys *uint16 `json:"maxStandbys,omitempty"`
5456
MaxStandbysPerSender *uint16 `json:"maxStandbysPerSender,omitempty"`
5557
MaxStandbyLag *uint32 `json:"maxStandbyLag,omitempty"`
@@ -81,6 +83,8 @@ type ClusterSpecDefaults struct {
8183
DBWaitReadyTimeout *cluster.Duration `json:"dbWaitReadyTimeout"`
8284
FailInterval *cluster.Duration `json:"failInterval"`
8385
DeadKeeperRemovalInterval *cluster.Duration `json:"deadKeeperRemovalInterval"`
86+
ProxyCheckInterval *cluster.Duration `json:"proxyCheckInterval"`
87+
ProxyTimeout *cluster.Duration `json:"proxyTimeout"`
8488
MaxStandbys *uint16 `json:"maxStandbys"`
8589
MaxStandbysPerSender *uint16 `json:"maxStandbysPerSender"`
8690
MaxStandbyLag *uint32 `json:"maxStandbyLag"`

0 commit comments

Comments
 (0)