This repository has been archived by the owner on Oct 25, 2024. It is now read-only.
forked from schwartzmx/gremtune
-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathpool.go
353 lines (286 loc) · 9.5 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
package gremcos
import (
"fmt"
"sync"
"time"
"github.com/rs/zerolog"
"github.com/supplyon/gremcos/interfaces"
)
// QueryExecutorFactoryFunc is a function that returns a new
// interfaces.QueryExecutor or an error. The pool stores one such function and
// calls it in Get whenever a new connection has to be created.
type QueryExecutorFactoryFunc func() (interfaces.QueryExecutor, error)
// pool maintains a pool of connections to the cosmos db.
//
// The unexported helpers put, purge, release and first do not lock on their
// own; they expect the caller to hold mu.
type pool struct {
	logger zerolog.Logger

	// createQueryExecutor function that returns new connected QueryExecutors
	createQueryExecutor QueryExecutorFactoryFunc

	// maxActive is the maximum number of allowed active connections
	maxActive int

	// idleTimeout is the maximum time an idle connection will be kept in the pool.
	// If the timeout has expired, the connection will be closed and removed
	// from the pool.
	// If this timeout is set to 0, the timeout is unlimited -> no expiration of connections.
	idleTimeout time.Duration

	// idleConnections is the list of idle connections. put adds returned
	// connections at the front, Get takes connections from the front.
	idleConnections []*idleConnection

	// active is the number of currently active connections
	active int

	// closed is set to true by Close. Once set, put closes returned
	// connections instead of keeping them as idle connections.
	closed bool

	// cond is created lazily by Get (bound to mu). Get waits on it while no
	// connection is available, release signals it.
	cond *sync.Cond

	// mu guards the mutable state of the pool.
	mu sync.RWMutex
}
// pooledConnection represents a shared and reusable connection.
type pooledConnection struct {
	// pool is the pool this connection was obtained from; Close hands the
	// connection back to it.
	pool *pool

	// client is the underlying QueryExecutor that is used to run the queries.
	client interfaces.QueryExecutor
}
// NewPool creates a new pool which is a QueryExecutor.
//
// createQueryExecutor is called by the pool whenever a new connection has to
// be created and must not be nil. maxActiveConnections is the maximum number
// of simultaneously active connections and has to be at least 1. idleTimeout
// is the maximum time an idle connection is kept in the pool; 0 disables the
// expiration of idle connections, negative values are rejected.
//
// An error is returned if one of the arguments is invalid.
func NewPool(createQueryExecutor QueryExecutorFactoryFunc, maxActiveConnections int, idleTimeout time.Duration, logger zerolog.Logger) (*pool, error) {
	if createQueryExecutor == nil {
		return nil, fmt.Errorf("Given createQueryExecutor is nil")
	}

	if maxActiveConnections < 1 {
		return nil, fmt.Errorf("maxActiveConnections has to be >=1")
	}

	// the message has to name the parameter that is actually validated here
	if idleTimeout < 0 {
		return nil, fmt.Errorf("idleTimeout has to be >=0")
	}

	return &pool{
		createQueryExecutor: createQueryExecutor,
		maxActive:           maxActiveConnections,
		active:              0,
		closed:              false,
		idleTimeout:         idleTimeout,
		idleConnections:     make([]*idleConnection, 0),
		logger:              logger,
	}, nil
}
// idleConnection is an entry of the pool's idle list: a pooled connection
// together with the point in time it became idle.
type idleConnection struct {
	// pc is the connection that is currently not in use.
	pc *pooledConnection

	// idleSince is the time the connection was idled. It is set by put and
	// used by purge to compute the expiration deadline.
	idleSince time.Time
}
// IsConnected reports whether the pool manages at least one usable
// connection. Any active connection counts as connected without further
// checks; otherwise the idle connections are asked one by one.
func (p *pool) IsConnected() bool {
	// Take a snapshot under the read lock so that the (potentially slow)
	// connection checks below run without holding the lock.
	p.mu.RLock()
	hasActive := p.active > 0
	var snapshot []*idleConnection
	if !hasActive {
		snapshot = make([]*idleConnection, len(p.idleConnections))
		copy(snapshot, p.idleConnections)
	}
	p.mu.RUnlock()

	if hasActive {
		return true
	}

	// The first idle connection that reports to be connected is enough to
	// consider the pool as connected.
	for _, idle := range snapshot {
		if idle.pc.client.IsConnected() {
			return true
		}
	}

	// none of the idle connections is connected
	return false
}
// LastError always returns nil at the moment; the pool does not yet report
// errors of the connections it manages.
func (p *pool) LastError() error {
	// TODO: Implement
	return nil
}
// Get hands out a pooled connection.
//
// The idle connections are purged once at the beginning. After that the first
// idle connection is reused if there is one. Otherwise a new connection is
// created as long as the number of active connections is below maxActive
// (a maxActive of 0 is treated as "no limit"). If neither is possible the
// call blocks on the pool's condition variable and retries after wake-up.
//
// An error is returned only if the creation of a new connection fails.
func (p *pool) Get() (*pooledConnection, error) {
	// The lock is held until one of the branches below unlocks it.
	p.mu.Lock()

	// Get rid of unusable idle connections first.
	p.purge()

	for {
		p.logger.Debug().Int("active", p.active).Int("maxActive", p.maxActive).Int("idle", len(p.idleConnections)).Msg("Pool-Get")

		// TODO: Ensure to return only clients that are connected
		idle := p.first()
		switch {
		case idle != nil:
			// Reuse the first idle connection and drop it from the idle slice.
			p.idleConnections = append(p.idleConnections[:0], p.idleConnections[1:]...)
			p.active++
			p.mu.Unlock()
			return &pooledConnection{pool: p, client: idle.pc.client}, nil

		case p.maxActive == 0 || p.active < p.maxActive:
			// Nothing idle but capacity left: reserve the slot, then unlock
			// before dialing so that other callers don't have to wait.
			p.active++
			factory := p.createQueryExecutor
			p.mu.Unlock()

			executor, err := factory()
			if err != nil {
				// give the reserved slot back
				p.mu.Lock()
				p.release()
				p.mu.Unlock()
				return nil, err
			}
			return &pooledConnection{pool: p, client: executor}, nil
		}

		// Neither an idle connection nor free capacity, wait for a release.
		if p.cond == nil {
			p.cond = sync.NewCond(&p.mu)
		}
		p.logger.Info().Int("active", p.active).Int("maxActive", p.maxActive).Int("idle", len(p.idleConnections)).Msg("Wait for new connections")
		p.cond.Wait()
	}
}
// put hands the given connection back to the pool. On an open pool the
// connection becomes the first entry of the idle slice, stamped with the
// current time. On a closed pool the connection is closed instead.
// It is not threadsafe. The caller should manage locking the pool.
func (p *pool) put(pc *pooledConnection) {
	if !p.closed {
		entry := &idleConnection{pc: pc, idleSince: time.Now()}

		// Build a fresh slice with the returned connection in front.
		refreshed := make([]*idleConnection, 0, len(p.idleConnections)+1)
		refreshed = append(refreshed, entry)
		refreshed = append(refreshed, p.idleConnections...)
		p.idleConnections = refreshed
		return
	}

	// closed pool: nothing is kept for reuse any more
	pc.client.Close()
}
// purge rebuilds the idle list and keeps only connections that report no
// error, are still connected and have not exceeded the idle timeout.
// Nothing is purged if no idle timeout (<= 0) is configured.
// It is not threadsafe. The caller should manage locking the pool.
func (p *pool) purge() {
	// without a timeout there is no clean up at all
	if p.idleTimeout <= 0 {
		p.logger.Info().Msg("Don't purge connections, no timeout specified")
		return
	}

	var kept []*idleConnection
	now := time.Now()

	for _, candidate := range p.idleConnections {
		client := candidate.pc.client

		// Connections with an error are closed and dropped.
		if err := client.LastError(); err != nil {
			// the socket closed event is expected and therefore not logged
			_, closedByServer := err.(socketClosedByServerError)
			if !closedByServer {
				p.logger.Info().Err(err).Msgf("(during purge) Remove connection from pool due to an error [%s]", err.Error())
			}
			client.Close()
			continue
		}

		// Connections that are not connected any more are dropped.
		// NOTE(review): unlike the other two removal cases the client is not
		// closed here - confirm that nothing is leaked.
		if !client.IsConnected() {
			p.logger.Info().Msg("(during purge) Remove connection from pool which is not connected")
			continue
		}

		expiresAt := candidate.idleSince.Add(p.idleTimeout)
		if !expiresAt.After(now) {
			// expired -> close it and don't keep it
			p.logger.Info().Time("deadline", expiresAt).Msg("(during purge) Remove connection from pool which is expired")
			client.Close()
			continue
		}

		// healthy and not expired -> stays in the idle list
		p.logger.Debug().Time("deadline", expiresAt).Msg("(during purge) Keep connection which is not expired")
		kept = append(kept, candidate)
	}

	p.idleConnections = kept
}
// release gives one active slot back and wakes up one caller that waits in
// Get. It does nothing if the pool is closed or if there is no active
// connection to release.
// It is not threadsafe. The caller should manage locking the pool.
func (p *pool) release() {
	if p.closed || p.active == 0 {
		return
	}

	p.active--

	// the condition variable only exists if somebody had to wait before
	if p.cond == nil {
		return
	}
	p.cond.Signal()
}
// first returns the first idle connection without removing it from the idle
// slice, or nil if there is no idle connection.
// It is not threadsafe. The caller should manage locking the pool.
func (p *pool) first() *idleConnection {
	if len(p.idleConnections) > 0 {
		return p.idleConnections[0]
	}
	return nil
}
// Close closes all idle connections and marks the pool as closed.
//
// Connections that are currently handed out are not touched here; once the
// pool is marked as closed, put closes them when they are returned.
// The idle list is emptied so that a later Get cannot hand out an already
// closed connection and a repeated Close does not close the same connections
// a second time.
//
// The returned error is always nil.
func (p *pool) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	for _, c := range p.idleConnections {
		c.pc.client.Close()
	}

	// drop the references to the connections that were just closed
	p.idleConnections = nil
	p.closed = true

	return nil
}
// ExecuteWithBindings obtains a connection from the pool, runs the given
// query with the given bindings and rebindings on it and returns the result.
// The connection is handed back to the pool when the call returns.
// An error is returned if no connection could be obtained or if the query
// execution itself fails.
func (p *pool) ExecuteWithBindings(query string, bindings, rebindings map[string]interface{}) (resp []interfaces.Response, err error) {
	conn, getErr := p.Get()
	if getErr != nil {
		return nil, getErr
	}
	// put the connection back into the idle pool
	defer conn.Close()

	resp, err = conn.client.ExecuteWithBindings(query, bindings, rebindings)
	return resp, err
}
// Execute obtains a connection from the pool, runs the given query on it and
// returns the result. The connection is handed back to the pool when the call
// returns. An error is returned if no connection could be obtained or if the
// query execution itself fails.
func (p *pool) Execute(query string) (resp []interfaces.Response, err error) {
	conn, getErr := p.Get()
	if getErr != nil {
		return nil, getErr
	}
	// put the connection back into the idle pool
	defer conn.Close()

	resp, err = conn.client.Execute(query)
	return resp, err
}
// ExecuteAsync obtains a connection from the pool and forwards the query and
// the response channel to the ExecuteAsync of that connection.
// An error is returned if no connection could be obtained or if the
// underlying ExecuteAsync fails.
//
// NOTE(review): the connection goes back into the idle pool as soon as the
// underlying call returns - confirm that this is fine while responses may
// still be delivered to responseChannel.
func (p *pool) ExecuteAsync(query string, responseChannel chan interfaces.AsyncResponse) (err error) {
	conn, getErr := p.Get()
	if getErr != nil {
		return getErr
	}
	// put the connection back into the idle pool
	defer conn.Close()

	err = conn.client.ExecuteAsync(query, responseChannel)
	return err
}
// ExecuteFile obtains a connection from the pool and passes the given path to
// the ExecuteFile of that connection. The connection is handed back to the
// pool when the call returns. An error is returned if no connection could be
// obtained or if the underlying ExecuteFile fails.
func (p *pool) ExecuteFile(path string) (resp []interfaces.Response, err error) {
	conn, getErr := p.Get()
	if getErr != nil {
		return nil, getErr
	}
	// put the connection back into the idle pool
	defer conn.Close()

	resp, err = conn.client.ExecuteFile(path)
	return resp, err
}
// ExecuteFileWithBindings obtains a connection from the pool and passes the
// given path, bindings and rebindings to the ExecuteFileWithBindings of that
// connection. The connection is handed back to the pool when the call
// returns. An error is returned if no connection could be obtained or if the
// underlying ExecuteFileWithBindings fails.
func (p *pool) ExecuteFileWithBindings(path string, bindings, rebindings map[string]interface{}) (resp []interfaces.Response, err error) {
	conn, getErr := p.Get()
	if getErr != nil {
		return nil, getErr
	}
	// put the connection back into the idle pool
	defer conn.Close()

	resp, err = conn.client.ExecuteFileWithBindings(path, bindings, rebindings)
	return resp, err
}
// Close hands the connection back to its pool: under the pool's lock the
// connection is put back (or closed, if the pool is closed) and the active
// slot it occupied is released.
func (pc *pooledConnection) Close() {
	owner := pc.pool

	owner.mu.Lock()
	defer owner.mu.Unlock()

	owner.put(pc)
	owner.release()
}
// Ping obtains or creates a connection from the pool and calls Ping on it.
// The connection is handed back to the pool when the call returns.
// An error is returned if no connection could be obtained or if the ping
// itself fails.
func (p *pool) Ping() error {
	conn, getErr := p.Get()
	if getErr != nil {
		return getErr
	}
	// put the connection back into the idle pool
	defer conn.Close()

	pingErr := conn.client.Ping()
	return pingErr
}