Skip to content

Commit 2add8bd

Browse files
committed
test: Add tests for cancelled context
1 parent 345ad75 commit 2add8bd

File tree

3 files changed

+76
-0
lines changed

3 files changed

+76
-0
lines changed

commands_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4843,6 +4843,24 @@ var _ = Describe("Commands", func() {
48434843
Expect(err).To(Equal(redis.Nil))
48444844
})
48454845

4846+
Describe("canceled context", func() {
4847+
It("should unblock XRead", func() {
4848+
ctx2, cancel := context.WithCancel(ctx)
4849+
errCh := make(chan error, 1)
4850+
go func() {
4851+
errCh <- client.XRead(ctx2, &redis.XReadArgs{
4852+
Streams: []string{"stream", "$"},
4853+
}).Err()
4854+
}()
4855+
4856+
var gotErr error
4857+
Consistently(errCh).ShouldNot(Receive(&gotErr), "Received %v", gotErr)
4858+
cancel()
4859+
Eventually(errCh).Should(Receive(&gotErr))
4860+
Expect(gotErr).To(HaveOccurred())
4861+
})
4862+
})
4863+
48464864
Describe("group", func() {
48474865
BeforeEach(func() {
48484866
err := client.XGroupCreate(ctx, "stream", "group", "0").Err()
@@ -5023,6 +5041,26 @@ var _ = Describe("Commands", func() {
50235041
Expect(err).NotTo(HaveOccurred())
50245042
Expect(n).To(Equal(int64(2)))
50255043
})
5044+
5045+
Describe("canceled context", func() {
5046+
It("should unblock XReadGroup", func() {
5047+
ctx2, cancel := context.WithCancel(ctx)
5048+
errCh := make(chan error, 1)
5049+
go func() {
5050+
errCh <- client.XReadGroup(ctx2, &redis.XReadGroupArgs{
5051+
Group: "group",
5052+
Consumer: "consumer",
5053+
Streams: []string{"stream", ">"},
5054+
}).Err()
5055+
}()
5056+
5057+
var gotErr error
5058+
Consistently(errCh).ShouldNot(Receive(&gotErr), "Received %v", gotErr)
5059+
cancel()
5060+
Eventually(errCh).Should(Receive(&gotErr))
5061+
Expect(gotErr).To(HaveOccurred())
5062+
})
5063+
})
50265064
})
50275065

50285066
Describe("xinfo", func() {

internal_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,4 +351,21 @@ var _ = Describe("withConn", func() {
351351
Expect(newConn).NotTo(Equal(conn))
352352
Expect(client.connPool.Len()).To(Equal(1))
353353
})
354+
355+
It("should remove the connection from the pool if the context is canceled", func() {
356+
var conn *pool.Conn
357+
358+
ctx2, cancel := context.WithCancel(ctx)
359+
cancel()
360+
361+
client.withConn(ctx2, func(ctx context.Context, c *pool.Conn) error {
362+
conn = c
363+
return nil
364+
})
365+
366+
newConn, err := client.connPool.Get(ctx)
367+
Expect(err).To(BeNil())
368+
Expect(newConn).NotTo(Equal(conn))
369+
Expect(client.connPool.Len()).To(Equal(1))
370+
})
354371
})

pubsub_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package redis_test
22

33
import (
4+
"context"
45
"io"
56
"net"
67
"sync"
@@ -567,4 +568,24 @@ var _ = Describe("PubSub", func() {
567568
Expect(msg.Channel).To(Equal("mychannel"))
568569
Expect(msg.Payload).To(Equal(text))
569570
})
571+
572+
Describe("canceled context", func() {
573+
It("should unblock ReceiveMessage", func() {
574+
pubsub := client.Subscribe(ctx, "mychannel")
575+
defer pubsub.Close()
576+
577+
ctx2, cancel := context.WithCancel(ctx)
578+
errCh := make(chan error, 1)
579+
go func() {
580+
_, err := pubsub.ReceiveMessage(ctx2)
581+
errCh <- err
582+
}()
583+
584+
var gotErr error
585+
Consistently(errCh).ShouldNot(Receive(&gotErr), "Received %v", gotErr)
586+
cancel()
587+
Eventually(errCh).Should(Receive(&gotErr))
588+
Expect(gotErr).To(HaveOccurred())
589+
})
590+
})
570591
})

0 commit comments

Comments
 (0)