Skip to content

Commit d7c44c7

Browse files
committed
Better rate limited message.
1 parent f6d6826 commit d7c44c7

File tree

7 files changed

+69
-37
lines changed

7 files changed

+69
-37
lines changed

.travis.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ services:
55
- redis-server
66

77
go:
8-
- 1.3
98
- 1.4
109
- 1.5
1110
- tip

multi.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,15 @@ func (c *Client) Multi() *Multi {
4646
return multi
4747
}
4848

49-
func (c *Multi) putConn(cn *conn, ei error) {
50-
var err error
51-
if isBadConn(cn, ei) {
49+
func (c *Multi) putConn(cn *conn, err error) {
50+
if isBadConn(cn, err) {
5251
// Close current connection.
53-
c.base.connPool.(*stickyConnPool).Reset()
52+
c.base.connPool.(*stickyConnPool).Reset(err)
5453
} else {
55-
err = c.base.connPool.Put(cn)
56-
}
57-
if err != nil {
58-
log.Printf("redis: putConn failed: %s", err)
54+
err := c.base.connPool.Put(cn)
55+
if err != nil {
56+
log.Printf("redis: putConn failed: %s", err)
57+
}
5958
}
6059
}
6160

pool.go

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type pool interface {
2020
First() *conn
2121
Get() (*conn, bool, error)
2222
Put(*conn) error
23-
Remove(*conn) error
23+
Remove(*conn, error) error
2424
Len() int
2525
FreeLen() int
2626
Close() error
@@ -130,7 +130,7 @@ type connPool struct {
130130

131131
_closed int32
132132

133-
lastDialErr error
133+
lastErr atomic.Value
134134
}
135135

136136
func newConnPool(opt *Options) *connPool {
@@ -204,15 +204,15 @@ func (p *connPool) wait() *conn {
204204
func (p *connPool) new() (*conn, error) {
205205
if p.rl.Limit() {
206206
err := fmt.Errorf(
207-
"redis: you open connections too fast (last error: %v)",
208-
p.lastDialErr,
207+
"redis: you open connections too fast (last_error=%q)",
208+
p.loadLastErr(),
209209
)
210210
return nil, err
211211
}
212212

213213
cn, err := p.dialer()
214214
if err != nil {
215-
p.lastDialErr = err
215+
p.storeLastErr(err.Error())
216216
return nil, err
217217
}
218218

@@ -255,8 +255,9 @@ func (p *connPool) Get() (cn *conn, isNew bool, err error) {
255255
func (p *connPool) Put(cn *conn) error {
256256
if cn.rd.Buffered() != 0 {
257257
b, _ := cn.rd.Peek(cn.rd.Buffered())
258-
log.Printf("redis: connection has unread data: %q", b)
259-
return p.Remove(cn)
258+
err := fmt.Errorf("redis: connection has unread data: %q", b)
259+
log.Print(err)
260+
return p.Remove(cn, err)
260261
}
261262
if p.opt.getIdleTimeout() > 0 {
262263
cn.usedAt = time.Now()
@@ -275,7 +276,9 @@ func (p *connPool) replace(cn *conn) (*conn, error) {
275276
return newcn, nil
276277
}
277278

278-
func (p *connPool) Remove(cn *conn) error {
279+
func (p *connPool) Remove(cn *conn, reason error) error {
280+
p.storeLastErr(reason.Error())
281+
279282
// Replace existing connection with new one and unblock waiter.
280283
newcn, err := p.replace(cn)
281284
if err != nil {
@@ -330,6 +333,17 @@ func (p *connPool) reaper() {
330333
}
331334
}
332335

336+
func (p *connPool) storeLastErr(err string) {
337+
p.lastErr.Store(err)
338+
}
339+
340+
func (p *connPool) loadLastErr() string {
341+
if v := p.lastErr.Load(); v != nil {
342+
return v.(string)
343+
}
344+
return ""
345+
}
346+
333347
//------------------------------------------------------------------------------
334348

335349
type singleConnPool struct {
@@ -357,7 +371,7 @@ func (p *singleConnPool) Put(cn *conn) error {
357371
return nil
358372
}
359373

360-
func (p *singleConnPool) Remove(cn *conn) error {
374+
func (p *singleConnPool) Remove(cn *conn, _ error) error {
361375
if p.cn != cn {
362376
panic("p.cn != cn")
363377
}
@@ -440,13 +454,13 @@ func (p *stickyConnPool) Put(cn *conn) error {
440454
return nil
441455
}
442456

443-
func (p *stickyConnPool) remove() (err error) {
444-
err = p.pool.Remove(p.cn)
457+
func (p *stickyConnPool) remove(reason error) (err error) {
458+
err = p.pool.Remove(p.cn, reason)
445459
p.cn = nil
446460
return err
447461
}
448462

449-
func (p *stickyConnPool) Remove(cn *conn) error {
463+
func (p *stickyConnPool) Remove(cn *conn, _ error) error {
450464
defer p.mx.Unlock()
451465
p.mx.Lock()
452466
if p.closed {
@@ -479,10 +493,10 @@ func (p *stickyConnPool) FreeLen() int {
479493
return 0
480494
}
481495

482-
func (p *stickyConnPool) Reset() (err error) {
496+
func (p *stickyConnPool) Reset(reason error) (err error) {
483497
p.mx.Lock()
484498
if p.cn != nil {
485-
err = p.remove()
499+
err = p.remove(reason)
486500
}
487501
p.mx.Unlock()
488502
return err
@@ -500,7 +514,8 @@ func (p *stickyConnPool) Close() error {
500514
if p.reusable {
501515
err = p.put()
502516
} else {
503-
err = p.remove()
517+
reason := errors.New("redis: sticky not reusable connection")
518+
err = p.remove(reason)
504519
}
505520
}
506521
return err

pool_test.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package redis_test
22

33
import (
4+
"errors"
45
"sync"
56
"testing"
67
"time"
@@ -36,7 +37,6 @@ var _ = Describe("pool", func() {
3637
})
3738

3839
AfterEach(func() {
39-
Expect(client.FlushDb().Err()).NotTo(HaveOccurred())
4040
Expect(client.Close()).NotTo(HaveOccurred())
4141
})
4242

@@ -141,12 +141,12 @@ var _ = Describe("pool", func() {
141141
pool := client.Pool()
142142

143143
// Reserve one connection.
144-
cn, _, err := client.Pool().Get()
144+
cn, _, err := pool.Get()
145145
Expect(err).NotTo(HaveOccurred())
146146

147147
// Reserve the rest of connections.
148148
for i := 0; i < 9; i++ {
149-
_, _, err := client.Pool().Get()
149+
_, _, err := pool.Get()
150150
Expect(err).NotTo(HaveOccurred())
151151
}
152152

@@ -168,7 +168,8 @@ var _ = Describe("pool", func() {
168168
// ok
169169
}
170170

171-
Expect(pool.Remove(cn)).NotTo(HaveOccurred())
171+
err = pool.Remove(cn, errors.New("test"))
172+
Expect(err).NotTo(HaveOccurred())
172173

173174
// Check that Ping is unblocked.
174175
select {
@@ -179,6 +180,23 @@ var _ = Describe("pool", func() {
179180
}
180181
Expect(ping.Err()).NotTo(HaveOccurred())
181182
})
183+
184+
It("should rate limit dial", func() {
185+
pool := client.Pool()
186+
187+
var rateErr error
188+
for i := 0; i < 1000; i++ {
189+
cn, _, err := pool.Get()
190+
if err != nil {
191+
rateErr = err
192+
break
193+
}
194+
195+
_ = pool.Remove(cn, errors.New("test"))
196+
}
197+
198+
Expect(rateErr).To(MatchError(`redis: you open connections too fast (last_error="test")`))
199+
})
182200
})
183201

184202
func BenchmarkPool(b *testing.B) {

pubsub.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -233,9 +233,9 @@ func (c *PubSub) Receive() (interface{}, error) {
233233
return c.ReceiveTimeout(0)
234234
}
235235

236-
func (c *PubSub) reconnect() {
236+
func (c *PubSub) reconnect(reason error) {
237237
// Close current connection.
238-
c.connPool.(*stickyConnPool).Reset()
238+
c.connPool.(*stickyConnPool).Reset(reason)
239239

240240
if len(c.channels) > 0 {
241241
if err := c.Subscribe(c.channels...); err != nil {
@@ -276,7 +276,7 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
276276
if errNum > 2 {
277277
time.Sleep(time.Second)
278278
}
279-
c.reconnect()
279+
c.reconnect(err)
280280
continue
281281
}
282282

redis.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,9 @@ func (c *baseClient) conn() (*conn, bool, error) {
2020
return c.connPool.Get()
2121
}
2222

23-
func (c *baseClient) putConn(cn *conn, ei error) {
24-
var err error
25-
if isBadConn(cn, ei) {
26-
err = c.connPool.Remove(cn)
23+
func (c *baseClient) putConn(cn *conn, err error) {
24+
if isBadConn(cn, err) {
25+
err = c.connPool.Remove(cn, err)
2726
} else {
2827
err = c.connPool.Put(cn)
2928
}

sentinel.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package redis
22

33
import (
44
"errors"
5+
"fmt"
56
"log"
67
"net"
78
"strings"
@@ -227,11 +228,12 @@ func (d *sentinelFailover) closeOldConns(newMaster string) {
227228
break
228229
}
229230
if cn.RemoteAddr().String() != newMaster {
230-
log.Printf(
231+
err := fmt.Errorf(
231232
"redis-sentinel: closing connection to the old master %s",
232233
cn.RemoteAddr(),
233234
)
234-
d.pool.Remove(cn)
235+
log.Print(err)
236+
d.pool.Remove(cn, err)
235237
} else {
236238
cnsToPut = append(cnsToPut, cn)
237239
}

0 commit comments

Comments
 (0)