Skip to content

Commit 114d7e7

Browse files
committed
Expose pool limit setting via Session.SetPoolLimit.
1 parent e15707a commit 114d7e7

File tree

5 files changed

+82
-33
lines changed

5 files changed

+82
-33
lines changed

cluster.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -515,12 +515,10 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) {
515515
cluster.Unlock()
516516
}
517517

518-
var socketsPerServer = 4096
519-
520518
// AcquireSocket returns a socket to a server in the cluster. If slaveOk is
521519
// true, it will attempt to return a socket to a slave server. If it is
522520
// false, the socket will necessarily be to a master server.
523-
func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D) (s *mongoSocket, err error) {
521+
func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) {
524522
var started time.Time
525523
var syncCount uint
526524
warnedLimit := false
@@ -562,12 +560,13 @@ func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Durati
562560
continue
563561
}
564562

565-
s, abended, err := server.AcquireSocket(socketsPerServer, socketTimeout)
566-
if err == errSocketLimit {
563+
s, abended, err := server.AcquireSocket(poolLimit, socketTimeout)
564+
if err == errPoolLimit {
567565
if !warnedLimit {
566+
warnedLimit = true
568567
log("WARNING: Per-server connection limit reached.")
569568
}
570-
time.Sleep(1e8)
569+
time.Sleep(100 * time.Millisecond)
571570
continue
572571
}
573572
if err != nil {
@@ -582,7 +581,7 @@ func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Durati
582581
logf("Cannot confirm server %s as master (%v)", server.Addr, err)
583582
s.Release()
584583
cluster.syncServers()
585-
time.Sleep(1e8)
584+
time.Sleep(100 * time.Millisecond)
586585
continue
587586
}
588587
}

cluster_test.go

+46-9
Original file line numberDiff line numberDiff line change
@@ -1151,13 +1151,48 @@ func (s *S) TestRemovalOfClusterMember(c *C) {
11511151
c.Log("========== Test succeeded. ==========")
11521152
}
11531153

1154-
func (s *S) TestSocketLimit(c *C) {
1154+
func (s *S) TestPoolLimitSimple(c *C) {
1155+
session, err := mgo.Dial("localhost:40001")
1156+
c.Assert(err, IsNil)
1157+
defer session.Close()
1158+
1159+
stats := mgo.GetStats()
1160+
for stats.MasterConns+stats.SlaveConns != 1 {
1161+
stats = mgo.GetStats()
1162+
c.Log("Waiting for connection to be established...")
1163+
time.Sleep(100 * time.Millisecond)
1164+
}
1165+
1166+
c.Assert(stats.SocketsAlive, Equals, 1)
1167+
c.Assert(stats.SocketsInUse, Equals, 0)
1168+
1169+
// Put one socket in use.
1170+
c.Assert(session.Ping(), IsNil)
1171+
1172+
done := make(chan time.Duration)
1173+
1174+
// Now block trying to get another one due to the pool limit.
1175+
go func() {
1176+
copy := session.Copy()
1177+
defer copy.Close()
1178+
copy.SetPoolLimit(1)
1179+
started := time.Now()
1180+
c.Check(copy.Ping(), IsNil)
1181+
done <- time.Now().Sub(started)
1182+
}()
1183+
1184+
time.Sleep(500 * time.Millisecond)
1185+
1186+
// Put the one socket back in the pool, freeing it for the copy.
1187+
session.Refresh()
1188+
delay := <-done
1189+
c.Assert(delay > 500 * time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
1190+
}
1191+
1192+
func (s *S) TestPoolLimitMany(c *C) {
11551193
if *fast {
11561194
c.Skip("-fast")
11571195
}
1158-
const socketLimit = 64
1159-
restore := mgo.HackSocketsPerServer(socketLimit)
1160-
defer restore()
11611196

11621197
session, err := mgo.Dial("localhost:40011")
11631198
c.Assert(err, IsNil)
@@ -1167,17 +1202,19 @@ func (s *S) TestSocketLimit(c *C) {
11671202
for stats.MasterConns+stats.SlaveConns != 3 {
11681203
stats = mgo.GetStats()
11691204
c.Log("Waiting for all connections to be established...")
1170-
time.Sleep(5e8)
1205+
time.Sleep(500 * time.Millisecond)
11711206
}
11721207
c.Assert(stats.SocketsAlive, Equals, 3)
11731208

1209+
const poolLimit = 64
1210+
session.SetPoolLimit(poolLimit)
1211+
11741212
// Consume the whole limit for the master.
11751213
var master []*mgo.Session
1176-
for i := 0; i < socketLimit; i++ {
1214+
for i := 0; i < poolLimit; i++ {
11771215
s := session.Copy()
11781216
defer s.Close()
1179-
err := s.Ping()
1180-
c.Assert(err, IsNil)
1217+
c.Assert(s.Ping(), IsNil)
11811218
master = append(master, s)
11821219
}
11831220

@@ -1187,7 +1224,7 @@ func (s *S) TestSocketLimit(c *C) {
11871224
master[0].Refresh()
11881225
}()
11891226

1190-
// Now a single ping must block, since it would need another
1227+
// Then, a single ping must block, since it would need another
11911228
// connection to the master, over the limit. Once the goroutine
11921229
// above releases its socket, it should move on.
11931230
session.Ping()

export_test.go

-9
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,6 @@ import (
44
"time"
55
)
66

7-
func HackSocketsPerServer(newLimit int) (restore func()) {
8-
oldLimit := newLimit
9-
restore = func() {
10-
socketsPerServer = oldLimit
11-
}
12-
socketsPerServer = newLimit
13-
return
14-
}
15-
167
func HackPingDelay(newDelay time.Duration) (restore func()) {
178
globalMutex.Lock()
189
defer globalMutex.Unlock()

server.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -89,17 +89,18 @@ func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) *
8989
return server
9090
}
9191

92-
var errSocketLimit = errors.New("per-server connection limit reached")
92+
var errPoolLimit = errors.New("per-server connection limit reached")
9393
var errServerClosed = errors.New("server was closed")
9494

9595
// AcquireSocket returns a socket for communicating with the server.
9696
// This will attempt to reuse an old connection, if one is available. Otherwise,
9797
// it will establish a new one. The returned socket is owned by the call site,
9898
// and will return to the cache when the socket has its Release method called
9999
// the same number of times as AcquireSocket + Acquire were called for it.
100-
// If the limit argument is not zero, a socket will only be returned if the
101-
// number of sockets in use for this server is under the provided limit.
102-
func (server *mongoServer) AcquireSocket(limit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
100+
// If the poolLimit argument is greater than zero and the number of sockets in
101+
// use in this server is greater than the provided limit, errPoolLimit is
102+
// returned.
103+
func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
103104
for {
104105
server.Lock()
105106
abended = server.abended
@@ -108,9 +109,9 @@ func (server *mongoServer) AcquireSocket(limit int, timeout time.Duration) (sock
108109
return nil, abended, errServerClosed
109110
}
110111
n := len(server.unusedSockets)
111-
if limit > 0 && len(server.liveSockets)-n >= limit {
112+
if poolLimit > 0 && len(server.liveSockets)-n >= poolLimit {
112113
server.Unlock()
113-
return nil, false, errSocketLimit
114+
return nil, false, errPoolLimit
114115
}
115116
if n > 0 {
116117
socket = server.unusedSockets[n-1]

session.go

+23-2
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ type Session struct {
7070
sourcedb string
7171
dialCred *Credential
7272
creds []Credential
73+
poolLimit int
7374
}
7475

7576
type Database struct {
@@ -431,7 +432,12 @@ func parseURL(s string) (*urlInfo, error) {
431432

432433
func newSession(consistency mode, cluster *mongoCluster, timeout time.Duration) (session *Session) {
433434
cluster.Acquire()
434-
session = &Session{cluster_: cluster, syncTimeout: timeout, sockTimeout: timeout}
435+
session = &Session{
436+
cluster_: cluster,
437+
syncTimeout: timeout,
438+
sockTimeout: timeout,
439+
poolLimit: 4096,
440+
}
435441
debugf("New session %p on cluster %p", session, cluster)
436442
session.SetMode(consistency, true)
437443
session.SetSafe(&Safe{})
@@ -1368,6 +1374,21 @@ func (s *Session) SetCursorTimeout(d time.Duration) {
13681374
s.m.Unlock()
13691375
}
13701376

1377+
// SetPoolLimit sets the maximum number of sockets in use in a single server
1378+
// before this session will block waiting for a socket to be available.
1379+
// The default limit is 4096.
1380+
//
1381+
// This limit must be set to cover more than any expected workload of the
1382+
// application. It is a bad practice and an unsupported use case to use the
1383+
// database driver to define the concurrency limit of an application. Prevent
1384+
// such concurrency "at the door" instead, by properly restricting the amount
1385+
// of used resources and number of goroutines before they are created.
1386+
func (s *Session) SetPoolLimit(limit int) {
1387+
s.m.Lock()
1388+
s.poolLimit = limit
1389+
s.m.Unlock()
1390+
}
1391+
13711392
// SetBatch sets the default batch size used when fetching documents from the
13721393
// database. It's possible to change this setting on a per-query basis as
13731394
// well, using the Query.Batch method.
@@ -3365,7 +3386,7 @@ func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
33653386
}
33663387

33673388
// Still not good. We need a new socket.
3368-
sock, err := s.cluster().AcquireSocket(slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags)
3389+
sock, err := s.cluster().AcquireSocket(slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
33693390
if err != nil {
33703391
return nil, err
33713392
}

0 commit comments

Comments
 (0)