forked from patrickxb/fgosqlite
-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathpool.go
180 lines (163 loc) · 4.4 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build all
package sqlite
import (
	"errors"
	"sync"
	"time"
)
// Pool is a bounded pool of *Conn values created on demand by a factory.
// Adapted from https://code.google.com/p/vitess/source/browse/go/pools/roundrobin.go
type Pool struct {
// mu is held by the pool methods while they read or modify size, conns and idleTimeout.
mu sync.Mutex
// available is the condition variable (built on mu) that blocked callers wait on;
// it is signalled when a connection is returned or discarded.
available *sync.Cond
// conns buffers the idle connections; its capacity is the capacity of the pool.
conns chan *Conn
// size counts the connections accounted to the pool (idle or handed out),
// plus any creation currently in progress.
size int
// factory opens new connections. Close sets it to nil.
factory ConnOpen
// idleTimeout is how long an idle connection may sit in conns before it is
// discarded; values <= 0 disable the check.
idleTimeout time.Duration
}
// ConnOpen is the signature of a connection factory:
// it returns a newly opened connection, or an error.
type ConnOpen func() (*Conn, error)
// NewPool builds an empty connection pool.
// factory is the function invoked whenever a new connection has to be opened.
// capacity is the maximum number of connections the pool will create.
// A pooled connection left unused for longer than idleTimeout is discarded;
// an idleTimeout <= 0 disables that check.
func NewPool(factory ConnOpen, capacity int, idleTimeout time.Duration) *Pool {
	pool := new(Pool)
	pool.conns = make(chan *Conn, capacity)
	pool.factory = factory
	pool.idleTimeout = idleTimeout
	// The condition variable shares the pool mutex.
	pool.available = sync.NewCond(&pool.mu)
	return pool
}
// Get returns the next available connection. When no idle connection exists
// and the pool is below capacity, a new one is opened with the factory.
// When the pool is at capacity, Get blocks until a connection becomes available.
func (p *Pool) Get() (*Conn, error) {
	const wait = true
	return p.get(wait)
}
// TryGet returns the next available connection. When no idle connection exists
// and the pool is below capacity, a new one is opened with the factory.
// When the pool is at capacity, TryGet does not block: it returns a nil
// connection and a nil error.
func (p *Pool) TryGet() (*Conn, error) {
	const wait = false
	return p.get(wait)
}
// get hands out a connection, reusing an idle one when possible and opening
// a new one through the factory while the pool is below capacity.
//
// When the pool is at capacity it blocks until it is signalled if wait is
// true, and returns (nil, nil) immediately otherwise.
//
// It returns an error when the factory fails, or when the pool has no
// factory (which is the state Close leaves it in) instead of panicking on a
// nil function call.
func (p *Pool) get(wait bool) (*Conn, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	// Any waits in this loop will release the lock, and it will be
	// reacquired before the waits return.
	for {
		select {
		case conn := <-p.conns:
			// Found a free connection in the channel.
			if p.idleTimeout > 0 && time.Since(conn.timeUsed) > p.idleTimeout {
				// Connection has been idle for too long. Discard & go for next.
				go conn.Close()
				p.size--
				// Nobody else should be waiting, but signal anyway.
				p.available.Signal()
				continue
			}
			return conn, nil
		default:
			// Connection channel is empty.
			if p.factory == nil {
				// The pool is closed: there is nothing to wait for and no
				// way to create a connection.
				return nil, errors.New("sqlite: connection pool is closed")
			}
			if p.size >= cap(p.conns) {
				// The pool is full.
				if wait {
					p.available.Wait()
					continue
				}
				return nil, nil
			}
			// Pool is not full. Create a connection; the lock is released
			// while the factory runs and held again afterwards.
			conn, err := p.waitForCreate()
			if err != nil {
				// size was decremented, and somebody could be waiting.
				p.available.Signal()
				return nil, err
			}
			// Creation successful. Account for this by incrementing size.
			p.size++
			return conn, nil
		}
	}
}
// waitForCreate opens a new connection by calling p.factory while p.mu is released.
// It must be called with p.mu held, and p.mu is held again when it returns.
// p.size is incremented only for the duration of the call, so the caller has to
// account for a successfully created connection itself.
func (p *Pool) waitForCreate() (*Conn, error) {
// Prevent thundering herd: increment size before creating resource, and decrement after.
p.size++
p.mu.Unlock()
defer func() {
// Runs after the factory returns (or panics): reacquire the lock and
// undo the temporary reservation.
p.mu.Lock()
p.size--
}()
return p.factory()
}
// Release hands a connection back to the pool. Every connection obtained from
// the pool MUST be released, even a closed one: closed connections are simply
// dropped from the accounting. One waiter is signalled once the lock has been
// released.
func (p *Pool) Release(c *Conn) {
	p.mu.Lock()
	defer p.available.Signal()
	defer p.mu.Unlock()

	switch {
	case p.size > cap(p.conns):
		// The pool holds more connections than its capacity allows
		// (e.g. after shrinking): close this one instead of keeping it.
		go c.Close()
		p.size--
	case c.IsClosed():
		// Nothing to keep; just forget about it.
		p.size--
	default:
		if len(p.conns) == cap(p.conns) {
			panic("unexpected")
		}
		// Stamp the connection so the idle timeout can be enforced later.
		c.timeUsed = time.Now()
		p.conns <- c
	}
}
// Close drains the pool and closes every connection it owns.
// It blocks until all outstanding connections have been handed back
// through Release, then clears the factory.
func (p *Pool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()

	for p.size > 0 {
		var idle *Conn
		select {
		case idle = <-p.conns:
		default:
			// Nothing idle right now: wait until the pool is signalled.
			p.available.Wait()
			continue
		}
		go idle.Close()
		p.size--
	}
	p.factory = nil
}
// IsClosed reports whether the pool has been closed, i.e. whether it no
// longer has a factory.
// The factory is read under the pool mutex because Close writes it while
// holding that mutex; an unsynchronized read would be a data race.
func (p *Pool) IsClosed() bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.factory == nil
}
// SetCapacity resizes the pool; it can be used to grow or to shrink it.
// Idle connections are moved into a channel of the new capacity, and any
// that do not fit are closed. All waiters are woken up once the lock has
// been released.
func (p *Pool) SetCapacity(capacity int) {
	p.mu.Lock()
	defer p.available.Broadcast()
	defer p.mu.Unlock()

	resized := make(chan *Conn, capacity)
	// Move idle connections over until the old channel is empty,
	// closing the ones the new channel has no room for.
drain:
	for {
		select {
		case conn := <-p.conns:
			if len(resized) == cap(resized) {
				go conn.Close()
				p.size--
				continue
			}
			resized <- conn
		default:
			break drain
		}
	}
	p.conns = resized
}
// SetIdleTimeout changes how long a connection may sit unused in the pool
// before it is discarded. The new value is stored under the pool mutex.
func (p *Pool) SetIdleTimeout(idleTimeout time.Duration) {
	p.mu.Lock()
	p.idleTimeout = idleTimeout
	p.mu.Unlock()
}