Skip to content

Commit

Permalink
Merge branch 'letsfire-master', closes #17
Browse files Browse the repository at this point in the history
  • Loading branch information
mna committed Jan 27, 2019
2 parents fe71609 + 037b19b commit 703f85d
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 10 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ Package redisc implements a redis cluster client built on top of the [redigo pac

## Releases

* **v1.1.4** : Add `Conn.DoWithTimeout` and `Conn.ReceiveWithTimeout` to match redigo's `ConnWithTimeout` interface (thanks to [@letsfire][letsfire]).

* **v1.1.3** : Fix handling of `ASK` replies in `RetryConn`.

* **v1.1.2** : Remove mention that `StartupNodes` in `Cluster` struct needs to be master nodes (it can be replicas). Add supporting test.
Expand Down Expand Up @@ -69,4 +71,4 @@ The [BSD 3-Clause license][bsd].
[rgc]: https://github.com/chasex/redis-go-cluster
[radix1]: https://github.com/fzzy/radix
[radix2]: https://github.com/mediocregopher/radix.v2

[letsfire]: https://github.com/letsfire
43 changes: 36 additions & 7 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/gomodule/redigo/redis"
)

var _ redis.Conn = (*Conn)(nil)
var _ redis.ConnWithTimeout = (*Conn)(nil)

// Conn is a redis cluster connection. When returned by Get
// or Dial, it is not yet bound to any node in the cluster.
Expand Down Expand Up @@ -232,19 +233,33 @@ func (c *Conn) ReadOnly() error {
// If the connection is not yet bound to a cluster node, it will be
// after this call, based on the rules documented in the Conn type.
func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
return c.DoWithTimeout(-1, cmd, args...)
}

// DoWithTimeout sends a command to the server and returns the received reply.
// If the connection is not yet bound to a cluster node, it will be
// after this call, based on the rules documented in the Conn type.
//
// The timeout overrides the read timeout set when dialing the
// connection (in the DialOptions of the Cluster).
func (c *Conn) DoWithTimeout(timeout time.Duration, cmd string, args ...interface{}) (v interface{}, err error) {
rc, _, err := c.bind(cmdSlot(cmd, args))
if err != nil {
return nil, err
}
v, err := rc.Do(cmd, args...)

if timeout < 0 {
v, err = rc.Do(cmd, args...)
} else if rcwt, ok := rc.(redis.ConnWithTimeout); ok {
v, err = rcwt.DoWithTimeout(timeout, cmd, args...)
} else {
return nil, errors.New("redisc: connection does not support ConnWithTimeout")
}
// handle redirections, if any
if re := ParseRedir(err); re != nil {
if re.Type == "MOVED" {
c.cluster.needsRefresh(re)
}
}

return v, err
}

Expand All @@ -263,19 +278,33 @@ func (c *Conn) Send(cmd string, args ...interface{}) error {
// is not yet bound to a cluster node, it will be after this call,
// based on the rules documented in the Conn type.
func (c *Conn) Receive() (interface{}, error) {
return c.ReceiveWithTimeout(-1)
}

// ReceiveWithTimeout receives a single reply from the Redis server.
// If the connection is not yet bound to a cluster node, it will be
// after this call, based on the rules documented in the Conn type.
//
// The timeout overrides the read timeout set when dialing the
// connection (in the DialOptions of the Cluster).
func (c *Conn) ReceiveWithTimeout(timeout time.Duration) (v interface{}, err error) {
rc, _, err := c.bind(-1)
if err != nil {
return nil, err
}
v, err := rc.Receive()

if timeout < 0 {
v, err = rc.Receive()
} else if rcwt, ok := rc.(redis.ConnWithTimeout); ok {
v, err = rcwt.ReceiveWithTimeout(timeout)
} else {
return nil, errors.New("redisc: connection does not support ConnWithTimeout")
}
// handle redirections, if any
if re := ParseRedir(err); re != nil {
if re.Type == "MOVED" {
c.cluster.needsRefresh(re)
}
}

return v, err
}

Expand Down
67 changes: 67 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package redisc

import (
"io"
"net"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -253,6 +254,72 @@ func TestConnBind(t *testing.T) {
assert.NoError(t, BindConn(conn2), "Bind without key")
}

func TestConnWithTimeout(t *testing.T) {
fn, ports := redistest.StartCluster(t, nil)
defer fn()

c := &Cluster{
StartupNodes: []string{":" + ports[0]},
DialOptions: []redis.DialOption{
redis.DialConnectTimeout(2 * time.Second),
redis.DialReadTimeout(time.Second),
},
}
require.NoError(t, c.Refresh(), "Refresh")

testConnDoWithTimeout(t, c)
testConnReceiveWithTimeout(t, c)
}

func testConnDoWithTimeout(t *testing.T, c *Cluster) {
conn1 := c.Get().(*Conn)
defer conn1.Close()

// Do fails because the default timeout is 1s, but command blocks for 2s
_, err1 := conn1.Do("BLPOP", "x", 2)
if assert.Error(t, err1, "Do") {
if assert.IsType(t, &net.OpError{}, err1) {
oe := err1.(*net.OpError)
assert.True(t, oe.Timeout(), "is timeout")
}
}

conn2 := c.Get().(*Conn)
defer conn2.Close()

// DoWithTimeout succeeds because overrides timeout with 3s.
v2, err2 := conn2.DoWithTimeout(time.Second*3, "BLPOP", "x", 2)
assert.NoError(t, err2, "DoWithTimeout")
assert.Equal(t, nil, v2, "expected result")
}

func testConnReceiveWithTimeout(t *testing.T, c *Cluster) {
conn1 := c.Get().(*Conn)
defer conn1.Close()

assert.NoError(t, conn1.Send("BLPOP", "x", 2), "Send")
assert.NoError(t, conn1.Flush(), "Flush")

// Receive fails with its default timeout of 1s vs Block command for 2s
_, err1 := conn1.Receive()
if assert.Error(t, err1, "Receive") {
if assert.IsType(t, &net.OpError{}, err1) {
oe := err1.(*net.OpError)
assert.True(t, oe.Timeout(), "is timeout")
}
}

conn2 := c.Get().(*Conn)
defer conn2.Close()

// ReceiveWithTimeout succeeds with timeout of 3s vs Block command for 2s
assert.NoError(t, conn2.Send("BLPOP", "x", 2), "Send")
assert.NoError(t, conn2.Flush(), "Flush")
v2, err2 := conn2.ReceiveWithTimeout(time.Second * 3)
assert.NoError(t, err2, "ReceiveWithTimeout")
assert.Equal(t, nil, v2, "expected result")
}

func TestConnClose(t *testing.T) {
c := &Cluster{
StartupNodes: []string{":6379"},
Expand Down
6 changes: 4 additions & 2 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
// for a redis.Pool from the redigo package.
//
// Similarly, the Conn type implements redigo's redis.Conn
// interface, so the API to execute commands is the same -
// interface (and the augmented redis.ConnWithTimeout one too),
// so the API to execute commands is the same -
// in fact the redisc package uses the redigo package as its
// only third-party dependency.
//
Expand Down Expand Up @@ -83,7 +84,8 @@
// Connection
//
// The connection returned from Get or Dial is a redigo redis.Conn
// interface, with a concrete type of *Conn. In addition to the
// interface (that also implements redis.ConnWithTimeout),
// with a concrete type of *Conn. In addition to the
// interface's required methods, *Conn adds the following methods:
//
// Bind(...string) error
Expand Down

0 comments on commit 703f85d

Please sign in to comment.