Skip to content

Commit 37f77c8

Browse files
committed
Added notification handler
1 parent 0865144 commit 37f77c8

File tree

4 files changed

+99
-25
lines changed

4 files changed

+99
-25
lines changed

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
55
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
66
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
77
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
8+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
89
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
910
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
1011
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

loadbalancer.go

Lines changed: 67 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,19 @@ type LoadBalancer struct {
1313
primaryGroup ServerGroup
1414
backupGroup ServerGroup
1515
primaryOnlineCount int
16+
eventHandlerMtx sync.RWMutex
17+
eventHandler EventHandler
1618
}
1719

20+
type EventHandler func(eventType int, server *Server)
21+
22+
// -----------------------------------------------------------------------------
23+
24+
const (
25+
ServerUpEvent int = iota + 1
26+
ServerDownEvent
27+
)
28+
1829
const (
1930
InvalidParamsErr = "invalid parameter"
2031
)
@@ -31,18 +42,30 @@ func Create() *LoadBalancer {
3142
backupGroup: ServerGroup{
3243
srvList: make([]Server, 0),
3344
},
45+
eventHandlerMtx: sync.RWMutex{},
3446
}
3547
return &lb
3648
}
3749

50+
// SetEventHandler sets a new notification handler callback
51+
func (lb *LoadBalancer) SetEventHandler(handler EventHandler) {
52+
lb.eventHandlerMtx.Lock()
53+
lb.eventHandler = handler
54+
lb.eventHandlerMtx.Unlock()
55+
}
56+
3857
// Add adds a new server to the list
3958
func (lb *LoadBalancer) Add(opts ServerOptions, userData interface{}) error {
4059
// Check options
4160
if opts.Weight < 0 {
4261
return errors.New(InvalidParamsErr)
4362
}
4463
if !opts.IsBackup {
45-
if opts.MaxFails < 0 || opts.FailTimeout <= time.Duration(0) {
64+
if opts.MaxFails > 0 {
65+
if opts.FailTimeout <= time.Duration(0) {
66+
return errors.New(InvalidParamsErr)
67+
}
68+
} else if opts.MaxFails < 0 {
4669
return errors.New(InvalidParamsErr)
4770
}
4871
}
@@ -56,7 +79,7 @@ func (lb *LoadBalancer) Add(opts ServerOptions, userData interface{}) error {
5679
if srv.opts.Weight == 0 {
5780
srv.opts.Weight = 1
5881
}
59-
if opts.IsBackup {
82+
if opts.IsBackup || srv.opts.MaxFails == 0 {
6083
srv.opts.MaxFails = 0
6184
srv.opts.FailTimeout = time.Duration(0)
6285
}
@@ -89,21 +112,27 @@ func (lb *LoadBalancer) Add(opts ServerOptions, userData interface{}) error {
89112

90113
// Next gets the next available server. It can return nil if no available server
91114
func (lb *LoadBalancer) Next() *Server {
115+
var nextServer *Server
116+
92117
now := time.Now()
93118

119+
notifyUp := make([]*Server, 0) // NOTE: We would use defer, but they are executed LIFO
120+
94121
// Lock access
95122
lb.mtx.Lock()
96-
defer lb.mtx.Unlock()
97123

98124
// If all primary servers are offline, check if we can put someone up
99125
if lb.primaryOnlineCount == 0 {
100126
for idx := range lb.primaryGroup.srvList {
101-
if now.After(lb.primaryGroup.srvList[idx].failTimestamp) {
102-
// Put this server online again
103-
lb.primaryGroup.srvList[idx].isDown = false
104-
lb.primaryGroup.srvList[idx].failCounter = 0
127+
srv := &lb.primaryGroup.srvList[idx]
105128

129+
if now.After(srv.failTimestamp) {
130+
// Put this server online again
131+
srv.isDown = false
132+
srv.failCounter = 0
106133
lb.primaryOnlineCount += 1
134+
135+
notifyUp = append(notifyUp, srv)
107136
}
108137
}
109138
}
@@ -117,14 +146,17 @@ func (lb *LoadBalancer) Next() *Server {
117146
// Set this server online again
118147
srv.isDown = false
119148
srv.lb.primaryOnlineCount += 1
149+
150+
notifyUp = append(notifyUp, srv)
120151
}
121152

122153
if !srv.isDown && lb.primaryGroup.currServerWeight < srv.opts.Weight {
123154
// Got a server!
124155
lb.primaryGroup.currServerWeight += 1
125156

126-
// Done
127-
return srv
157+
// Select this server
158+
nextServer = srv
159+
break
128160
}
129161

130162
// Advance to next server
@@ -137,17 +169,18 @@ func (lb *LoadBalancer) Next() *Server {
137169
}
138170
}
139171

140-
// If we reach here, there is no primary server available
141-
if len(lb.backupGroup.srvList) > 0 {
172+
// Look for backup servers if there is no primary available
173+
if nextServer == nil && len(lb.backupGroup.srvList) > 0 {
142174
for {
143175
srv := &lb.backupGroup.srvList[lb.backupGroup.currServerIdx]
144176

145177
if lb.backupGroup.currServerWeight < srv.opts.Weight {
146178
// Got a server!
147179
lb.backupGroup.currServerWeight += 1
148180

149-
// Done
150-
return srv
181+
// Select this server
182+
nextServer = srv
183+
break
151184
}
152185

153186
// Advance to next server
@@ -160,8 +193,16 @@ func (lb *LoadBalancer) Next() *Server {
160193
}
161194
}
162195

163-
// No available server
164-
return nil
196+
// Unlock access
197+
lb.mtx.Unlock()
198+
199+
// Call event callback
200+
for _, srv := range notifyUp {
201+
lb.raiseEvent(ServerUpEvent, srv)
202+
}
203+
204+
// Done
205+
return nextServer
165206
}
166207

167208
// WaitNext returns a channel that is fulfilled with the next available server
@@ -227,3 +268,14 @@ func (lb *LoadBalancer) WaitNext() (ch chan *Server) {
227268

228269
return
229270
}
271+
272+
// -----------------------------------------------------------------------------
273+
// Private methods
274+
275+
func (lb *LoadBalancer) raiseEvent(eventType int, server *Server) {
276+
lb.eventHandlerMtx.RLock()
277+
if lb.eventHandler != nil {
278+
lb.eventHandler(eventType, server)
279+
}
280+
lb.eventHandlerMtx.RUnlock()
281+
}

loadbalancer_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"testing"
55
"time"
66

7-
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
88
)
99

1010
// -----------------------------------------------------------------------------
@@ -31,9 +31,9 @@ func TestNoFail(t *testing.T) {
3131

3232
srvName, _ := srv.UserData().(string)
3333
if (idx % serverTotalCount) < serverOneCount {
34-
assert.Equal(t, serverOneName, srvName)
34+
require.Equal(t, serverOneName, srvName)
3535
} else {
36-
assert.Equal(t, serverTwoName, srvName)
36+
require.Equal(t, serverTwoName, srvName)
3737
}
3838

3939
srv.SetOnline()
@@ -51,7 +51,7 @@ func TestFailAll(t *testing.T) {
5151

5252
// At this point next server should be none
5353
srv := lb.Next()
54-
assert.Equal(t, (*Server)(nil), srv)
54+
require.Equal(t, (*Server)(nil), srv)
5555
}
5656

5757
func TestBackup(t *testing.T) {
@@ -67,7 +67,7 @@ func TestBackup(t *testing.T) {
6767
srv := lb.Next()
6868

6969
srvName, _ := srv.UserData().(string)
70-
assert.Equal(t, backupServerName, srvName)
70+
require.Equal(t, backupServerName, srvName)
7171

7272
srv.SetOffline() // NOTE: This call will act as a NO-OP
7373
}
@@ -83,15 +83,15 @@ func TestWait(t *testing.T) {
8383

8484
// At this point next server should be none
8585
srv := lb.Next()
86-
assert.Equal(t, (*Server)(nil), srv)
86+
require.Equal(t, (*Server)(nil), srv)
8787

8888
// Wait until a server becomes available (after ~1sec)
8989
ch := lb.WaitNext()
9090
srv = <-ch
9191

9292
// At this point server 2 should be online again
9393
srvName, _ := srv.UserData().(string)
94-
assert.Equal(t, srvName, serverTwoName)
94+
require.Equal(t, srvName, serverTwoName)
9595
}
9696

9797
// -----------------------------------------------------------------------------

server.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,10 @@ func (srv *Server) SetOnline() {
4646
return
4747
}
4848

49+
notifyUp := false
50+
4951
// Lock access
5052
srv.lb.mtx.Lock()
51-
defer srv.lb.mtx.Unlock()
5253

5354
// Reset the failure counter
5455
srv.failCounter = 0
@@ -57,6 +58,16 @@ func (srv *Server) SetOnline() {
5758
if srv.isDown {
5859
srv.isDown = false
5960
srv.lb.primaryOnlineCount += 1
61+
62+
notifyUp = true
63+
}
64+
65+
// Unlock access
66+
srv.lb.mtx.Unlock()
67+
68+
// Call event callback
69+
if notifyUp {
70+
srv.lb.raiseEvent(ServerUpEvent, srv)
6071
}
6172
}
6273

@@ -67,9 +78,10 @@ func (srv *Server) SetOffline() {
6778
return
6879
}
6980

81+
notifyDown := false
82+
7083
// Lock access
7184
srv.lb.mtx.Lock()
72-
defer srv.lb.mtx.Unlock()
7385

7486
// If server is up
7587
if !srv.isDown && srv.failCounter < srv.opts.MaxFails {
@@ -94,8 +106,17 @@ func (srv *Server) SetOffline() {
94106
if srv.failCounter == srv.opts.MaxFails {
95107
srv.isDown = true
96108
srv.failTimestamp = now.Add(srv.opts.FailTimeout)
97-
98109
srv.lb.primaryOnlineCount -= 1
110+
111+
notifyDown = true
99112
}
100113
}
114+
115+
// Unlock access
116+
srv.lb.mtx.Unlock()
117+
118+
// Call event callback
119+
if notifyDown {
120+
srv.lb.raiseEvent(ServerDownEvent, srv)
121+
}
101122
}

0 commit comments

Comments
 (0)