From 55a9586e7018763b3053e8130bbf1cfe41da4887 Mon Sep 17 00:00:00 2001 From: letsfire <249008728@qq.com> Date: Sat, 26 Jan 2019 15:13:35 +0800 Subject: [PATCH 1/5] add support for ConnWithTimeout --- conn.go | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/conn.go b/conn.go index 2951d1a..d1b9208 100644 --- a/conn.go +++ b/conn.go @@ -6,6 +6,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/gomodule/redigo/redis" ) @@ -232,19 +233,30 @@ func (c *Conn) ReadOnly() error { // If the connection is not yet bound to a cluster node, it will be // after this call, based on the rules documented in the Conn type. func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { + return c.DoWithTimeout(-1, cmd, args...) +} + +// Do sends a command to the server and returns the received reply. +// The timeout overrides the write timeout set when dialing the +// connection. +func (c *Conn) DoWithTimeout(timeout time.Duration, cmd string, args ...interface{}) (v interface{}, err error) { rc, _, err := c.bind(cmdSlot(cmd, args)) if err != nil { return nil, err } - v, err := rc.Do(cmd, args...) - + if timeout < 0 { + v, err = rc.Do(cmd, args...) + } else if rcwt, ok := rc.(redis.ConnWithTimeout); ok { + v, err = rcwt.DoWithTimeout(timeout, cmd, args...) + } else { + return nil, errors.New("redis: connection does not support ConnWithTimeout") + } // handle redirections, if any if re := ParseRedir(err); re != nil { if re.Type == "MOVED" { c.cluster.needsRefresh(re) } } - return v, err } @@ -263,19 +275,29 @@ func (c *Conn) Send(cmd string, args ...interface{}) error { // is not yet bound to a cluster node, it will be after this call, // based on the rules documented in the Conn type. func (c *Conn) Receive() (interface{}, error) { + return c.ReceiveWithTimeout(-1) +} + +// Receive receives a single reply from the Redis server. The timeout +// overrides the read timeout set when dialing the connection. +func (c *Conn) ReceiveWithTimeout(timeout time.Duration) (v interface{}, err error) { rc, _, err := c.bind(-1) if err != nil { return nil, err } - v, err := rc.Receive() - + if timeout < 0 { + v, err = rc.Receive() + } else if rcwt, ok := rc.(redis.ConnWithTimeout); ok { + v, err = rcwt.ReceiveWithTimeout(timeout) + } else { + return nil, errors.New("redis: connection does not support ConnWithTimeout") + } // handle redirections, if any if re := ParseRedir(err); re != nil { if re.Type == "MOVED" { c.cluster.needsRefresh(re) } } - return v, err } From dc928636ad1609c48028f10656248d7f3dab3ef3 Mon Sep 17 00:00:00 2001 From: letsfire <249008728@qq.com> Date: Sat, 26 Jan 2019 15:22:12 +0800 Subject: [PATCH 2/5] add support for ConnWithTimeout --- conn.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/conn.go b/conn.go index d1b9208..0a07e5c 100644 --- a/conn.go +++ b/conn.go @@ -249,7 +249,7 @@ func (c *Conn) DoWithTimeout(timeout time.Duration, cmd string, args ...interfac } else if rcwt, ok := rc.(redis.ConnWithTimeout); ok { v, err = rcwt.DoWithTimeout(timeout, cmd, args...) } else { - return nil, errors.New("redis: connection does not support ConnWithTimeout") + return nil, errors.New("redisc: connection does not support ConnWithTimeout") } // handle redirections, if any if re := ParseRedir(err); re != nil { @@ -290,7 +290,7 @@ func (c *Conn) ReceiveWithTimeout(timeout time.Duration) (v interface{}, err err } else if rcwt, ok := rc.(redis.ConnWithTimeout); ok { v, err = rcwt.ReceiveWithTimeout(timeout) } else { - return nil, errors.New("redis: connection does not support ConnWithTimeout") + return nil, errors.New("redisc: connection does not support ConnWithTimeout") } // handle redirections, if any if re := ParseRedir(err); re != nil { From feb22289a0a8c50dad67c31ac37791272be4f53c Mon Sep 17 00:00:00 2001 From: letsfire <249008728@qq.com> Date: Sun, 27 Jan 2019 11:31:16 +0800 Subject: [PATCH 3/5] add test for ConnWithTimeout --- conn_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/conn_test.go b/conn_test.go index 3d7e74c..7394a75 100644 --- a/conn_test.go +++ b/conn_test.go @@ -253,6 +253,49 @@ func TestConnBind(t *testing.T) { assert.NoError(t, BindConn(conn2), "Bind without key") } +func TestConnWithTimeout(t *testing.T) { + fn, ports := redistest.StartCluster(t, nil) + defer fn() + + c := &Cluster{ + StartupNodes: []string{":" + ports[0]}, + DialOptions: []redis.DialOption{ + redis.DialReadTimeout(time.Second), + }, + } + require.NoError(t, c.Refresh(), "Refresh") + + conn1 := c.Get().(*Conn) + defer conn1.Close() + + _, err1 := conn1.Do("BLPOP", "x", 2) + assert.Error(t, err1, "Do") + + conn2 := c.Get().(*Conn) + defer conn2.Close() + + v2, err2 := conn2.DoWithTimeout(time.Second*3, "BLPOP", "x", 2) + assert.NoError(t, err2, "DoWithTimeout") + assert.Equal(t, nil, v2, "expected result") + + conn3 := c.Get().(*Conn) + defer conn3.Close() + + conn3.Send("BLPOP", "x", 2) + conn3.Flush() + _, err3 := conn3.Receive() + assert.Error(t, err3, "Receive") + + conn4 := c.Get().(*Conn) + defer conn4.Close() + + conn4.Send("BLPOP", "x", 2) + conn4.Flush() + v4, err4 := conn4.ReceiveWithTimeout(time.Second * 3) + assert.NoError(t, err4, "ReceiveWithTimeout") + assert.Equal(t, nil, v4, "expected result") +} + func TestConnClose(t *testing.T) { c := &Cluster{ StartupNodes: []string{":6379"}, From c50b3671c329df7bde541348c46441001364aed0 Mon Sep 17 00:00:00 2001 From: Martin Angers Date: Sun, 27 Jan 2019 11:06:48 -0500 Subject: [PATCH 4/5] split up do and receive tests --- conn_test.go | 52 ++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 14 deletions(-) diff --git a/conn_test.go b/conn_test.go index 7394a75..f35b1fe 100644 --- a/conn_test.go +++ b/conn_test.go @@ -2,6 +2,7 @@ package redisc import ( "io" + "net" "strings" "testing" "time" @@ -260,40 +261,63 @@ func TestConnWithTimeout(t *testing.T) { c := &Cluster{ StartupNodes: []string{":" + ports[0]}, DialOptions: []redis.DialOption{ + redis.DialConnectTimeout(2 * time.Second), redis.DialReadTimeout(time.Second), }, } require.NoError(t, c.Refresh(), "Refresh") + testConnDoWithTimeout(t, c) + testConnReceiveWithTimeout(t, c) +} + +func testConnDoWithTimeout(t *testing.T, c *Cluster) { conn1 := c.Get().(*Conn) defer conn1.Close() + // Do fails because the default timeout is 1s, but command blocks for 2s _, err1 := conn1.Do("BLPOP", "x", 2) - assert.Error(t, err1, "Do") + if assert.Error(t, err1, "Do") { + if assert.IsType(t, &net.OpError{}, err1) { + oe := err1.(*net.OpError) + assert.True(t, oe.Timeout(), "is timeout") + } + } conn2 := c.Get().(*Conn) defer conn2.Close() + // DoWithTimeout succeeds because overrides timeout with 3s. v2, err2 := conn2.DoWithTimeout(time.Second*3, "BLPOP", "x", 2) assert.NoError(t, err2, "DoWithTimeout") assert.Equal(t, nil, v2, "expected result") +} - conn3 := c.Get().(*Conn) - defer conn3.Close() +func testConnReceiveWithTimeout(t *testing.T, c *Cluster) { + conn1 := c.Get().(*Conn) + defer conn1.Close() - conn3.Send("BLPOP", "x", 2) - conn3.Flush() - _, err3 := conn3.Receive() - assert.Error(t, err3, "Receive") + assert.NoError(t, conn1.Send("BLPOP", "x", 2), "Send") + assert.NoError(t, conn1.Flush(), "Flush") - conn4 := c.Get().(*Conn) - defer conn4.Close() + // Receive fails with its default timeout of 1s vs Block command for 2s + _, err1 := conn1.Receive() + if assert.Error(t, err1, "Receive") { + if assert.IsType(t, &net.OpError{}, err1) { + oe := err1.(*net.OpError) + assert.True(t, oe.Timeout(), "is timeout") + } + } + + conn2 := c.Get().(*Conn) + defer conn2.Close() - conn4.Send("BLPOP", "x", 2) - conn4.Flush() - v4, err4 := conn4.ReceiveWithTimeout(time.Second * 3) - assert.NoError(t, err4, "ReceiveWithTimeout") - assert.Equal(t, nil, v4, "expected result") + // ReceiveWithTimeout succeeds with timeout of 3s vs Block command for 2s + assert.NoError(t, conn2.Send("BLPOP", "x", 2), "Send") + assert.NoError(t, conn2.Flush(), "Flush") + v2, err2 := conn2.ReceiveWithTimeout(time.Second * 3) + assert.NoError(t, err2, "ReceiveWithTimeout") + assert.Equal(t, nil, v2, "expected result") } func TestConnClose(t *testing.T) { From 037b19b73a92fc357757f8892d4880e0f6d6fe82 Mon Sep 17 00:00:00 2001 From: Martin Angers Date: Sun, 27 Jan 2019 11:19:24 -0500 Subject: [PATCH 5/5] document new ConnWithTimeout support --- README.md | 4 +++- conn.go | 19 +++++++++++++------ doc.go | 6 ++++-- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 60149d9..e1c2ce9 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,8 @@ Package redisc implements a redis cluster client built on top of the [redigo pac ## Releases +* **v1.1.4** : Add `Conn.DoWithTimeout` and `Conn.ReceiveWithTimeout` to match redigo's `ConnWithTimeout` interface (thanks to [@letsfire][letsfire]). + * **v1.1.3** : Fix handling of `ASK` replies in `RetryConn`. * **v1.1.2** : Remove mention that `StartupNodes` in `Cluster` struct needs to be master nodes (it can be replicas). Add supporting test. @@ -69,4 +71,4 @@ The [BSD 3-Clause license][bsd]. [rgc]: https://github.com/chasex/redis-go-cluster [radix1]: https://github.com/fzzy/radix [radix2]: https://github.com/mediocregopher/radix.v2 - +[letsfire]: https://github.com/letsfire diff --git a/conn.go b/conn.go index 0a07e5c..19cf3c1 100644 --- a/conn.go +++ b/conn.go @@ -11,7 +11,7 @@ import ( "github.com/gomodule/redigo/redis" ) -var _ redis.Conn = (*Conn)(nil) +var _ redis.ConnWithTimeout = (*Conn)(nil) // Conn is a redis cluster connection. When returned by Get // or Dial, it is not yet bound to any node in the cluster. @@ -236,9 +236,12 @@ func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { return c.DoWithTimeout(-1, cmd, args...) } -// Do sends a command to the server and returns the received reply. -// The timeout overrides the write timeout set when dialing the -// connection. +// DoWithTimeout sends a command to the server and returns the received reply. +// If the connection is not yet bound to a cluster node, it will be +// after this call, based on the rules documented in the Conn type. +// +// The timeout overrides the read timeout set when dialing the +// connection (in the DialOptions of the Cluster). func (c *Conn) DoWithTimeout(timeout time.Duration, cmd string, args ...interface{}) (v interface{}, err error) { rc, _, err := c.bind(cmdSlot(cmd, args)) if err != nil { @@ -278,8 +281,12 @@ func (c *Conn) Receive() (interface{}, error) { return c.ReceiveWithTimeout(-1) } -// Receive receives a single reply from the Redis server. The timeout -// overrides the read timeout set when dialing the connection. +// ReceiveWithTimeout receives a single reply from the Redis server. +// If the connection is not yet bound to a cluster node, it will be +// after this call, based on the rules documented in the Conn type. +// +// The timeout overrides the read timeout set when dialing the +// connection (in the DialOptions of the Cluster). func (c *Conn) ReceiveWithTimeout(timeout time.Duration) (v interface{}, err error) { rc, _, err := c.bind(-1) if err != nil { diff --git a/doc.go b/doc.go index d62922d..e9075b0 100644 --- a/doc.go +++ b/doc.go @@ -18,7 +18,8 @@ // for a redis.Pool from the redigo package. // // Similarly, the Conn type implements redigo's redis.Conn -// interface, so the API to execute commands is the same - +// interface (and the augmented redis.ConnWithTimeout one too), +// so the API to execute commands is the same - // in fact the redisc package uses the redigo package as its // only third-party dependency. // @@ -83,7 +84,8 @@ // Connection // // The connection returned from Get or Dial is a redigo redis.Conn -// interface, with a concrete type of *Conn. In addition to the +// interface (that also implements redis.ConnWithTimeout), +// with a concrete type of *Conn. In addition to the // interface's required methods, *Conn adds the following methods: // // Bind(...string) error