-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathloadbalancer.go
279 lines (225 loc) · 6.11 KB
/
loadbalancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
package loadbalancer
import (
"errors"
"sync"
"time"
)
// -----------------------------------------------------------------------------
// LoadBalancer is the main load balancer object manager.
type LoadBalancer struct {
mtx sync.Mutex
primaryGroup ServerGroup
backupGroup ServerGroup
primaryOnlineCount int
eventHandlerMtx sync.RWMutex
eventHandler EventHandler
}
// EventHandler is a handler to call when a server is set offline or online.
type EventHandler func(eventType int, server *Server)
// -----------------------------------------------------------------------------
const (
ServerUpEvent int = iota + 1
ServerDownEvent
)
// -----------------------------------------------------------------------------
// Create creates a new load balancer manager
func Create() *LoadBalancer {
lb := LoadBalancer{
mtx: sync.Mutex{},
primaryGroup: ServerGroup{
srvList: make([]Server, 0),
},
backupGroup: ServerGroup{
srvList: make([]Server, 0),
},
eventHandlerMtx: sync.RWMutex{},
}
return &lb
}
// SetEventHandler sets a new notification handler callback
func (lb *LoadBalancer) SetEventHandler(handler EventHandler) {
lb.eventHandlerMtx.Lock()
lb.eventHandler = handler
lb.eventHandlerMtx.Unlock()
}
// Add adds a new server to the list
func (lb *LoadBalancer) Add(opts ServerOptions, userData interface{}) error {
// Check options
if opts.Weight < 0 {
return errors.New("invalid parameter")
}
if !opts.IsBackup {
if opts.MaxFails > 0 {
if opts.FailTimeout <= time.Duration(0) {
return errors.New("invalid parameter")
}
} else if opts.MaxFails < 0 {
return errors.New("invalid parameter")
}
}
// Create new server
srv := Server{
lb: lb,
opts: opts,
userData: userData,
}
if srv.opts.Weight == 0 {
srv.opts.Weight = 1
}
if opts.IsBackup || srv.opts.MaxFails == 0 {
srv.opts.MaxFails = 0
srv.opts.FailTimeout = time.Duration(0)
}
// Lock access
lb.mtx.Lock()
defer lb.mtx.Unlock()
if !opts.IsBackup {
// Set server index
srv.index = len(lb.primaryGroup.srvList)
// Add to the primary server list
lb.primaryGroup.srvList = append(lb.primaryGroup.srvList, srv)
// Assume the server is initially online
lb.primaryOnlineCount += 1
} else {
// Set server index
srv.index = len(lb.backupGroup.srvList)
// Add to the backup server list
lb.backupGroup.srvList = append(lb.backupGroup.srvList, srv)
}
// Done
return nil
}
// Next gets the next available server. It can return nil if no available server
func (lb *LoadBalancer) Next() *Server {
var nextServer *Server
now := time.Now()
notifyUp := make([]*Server, 0) // NOTE: We would use defer, but they are executed LIFO
// Lock access
lb.mtx.Lock()
// If all primary servers are offline, check if we can put someone up
if lb.primaryOnlineCount == 0 {
for idx := range lb.primaryGroup.srvList {
srv := &lb.primaryGroup.srvList[idx]
if now.After(srv.failTimestamp) {
// Put this server online again
srv.isDown = false
srv.failCounter = 0
lb.primaryOnlineCount += 1
notifyUp = append(notifyUp, srv)
}
}
}
// If there is at least one primary server online, find the next
if lb.primaryOnlineCount > 0 {
for {
srv := &lb.primaryGroup.srvList[lb.primaryGroup.currServerIdx]
if srv.isDown && now.After(srv.failTimestamp) {
// Set this server online again
srv.isDown = false
srv.lb.primaryOnlineCount += 1
notifyUp = append(notifyUp, srv)
}
if !srv.isDown && lb.primaryGroup.currServerWeight < srv.opts.Weight {
// Got a server!
lb.primaryGroup.currServerWeight += 1
// Select this server
nextServer = srv
break
}
// Advance to next server
lb.primaryGroup.currServerIdx += 1
if lb.primaryGroup.currServerIdx >= len(lb.primaryGroup.srvList) {
lb.primaryGroup.currServerIdx = 0
}
lb.primaryGroup.currServerWeight = 0
}
}
// Look for backup servers if there is no primary available
if nextServer == nil && len(lb.backupGroup.srvList) > 0 {
for {
srv := &lb.backupGroup.srvList[lb.backupGroup.currServerIdx]
if lb.backupGroup.currServerWeight < srv.opts.Weight {
// Got a server!
lb.backupGroup.currServerWeight += 1
// Select this server
nextServer = srv
break
}
// Advance to next server
lb.backupGroup.currServerIdx += 1
if lb.backupGroup.currServerIdx >= len(lb.backupGroup.srvList) {
lb.backupGroup.currServerIdx = 0
}
lb.backupGroup.currServerWeight = 0
}
}
// Unlock access
lb.mtx.Unlock()
// Call event callback
for _, srv := range notifyUp {
lb.raiseEvent(ServerUpEvent, srv)
}
// Done
return nextServer
}
// WaitNext returns a channel that is fulfilled with the next available server
func (lb *LoadBalancer) WaitNext() (ch chan *Server) {
ch = make(chan *Server)
// Set up a goroutine that will be fulfilled when a server is available
go func() {
var srv *Server
for {
// Get an available server
srv = lb.Next()
if srv != nil {
// Got one
break
}
now := time.Now()
toWait := time.Duration(-1)
// Lock access
lb.mtx.Lock()
// Exit if we don't have primary servers
if len(lb.primaryGroup.srvList) == 0 {
lb.mtx.Unlock()
break
}
// Get the server that will become online sooner
srvCount := len(lb.primaryGroup.srvList)
for idx := 0; idx < srvCount; idx++ {
srv = &lb.primaryGroup.srvList[idx]
// Only consider offline servers
if srv.isDown {
diff := srv.failTimestamp.Sub(now)
if diff <= 0 {
// This server will immediately become online
break
}
if toWait < 0 || diff < toWait {
toWait = diff
}
}
}
// Unlock access
lb.mtx.Unlock()
// Wait some time until a new server can become available
if toWait > 0 {
time.Sleep(toWait)
}
}
// Once we have a server, send through the channel
ch <- srv
close(ch)
}()
return
}
// OnlineCount gets the total amount of online servers
func (lb *LoadBalancer) OnlineCount(includeBackup bool) int {
lb.mtx.Lock()
count := lb.primaryOnlineCount
lb.mtx.Unlock()
if includeBackup {
count += len(lb.backupGroup.srvList)
}
return count
}