File tree 2 files changed +49
-0
lines changed
2 files changed +49
-0
lines changed Original file line number Diff line number Diff line change @@ -36,6 +36,13 @@ func newSendQueue(conn sendConn) sender {
36
36
func (h * sendQueue ) Send (p * packetBuffer ) {
37
37
select {
38
38
case h .queue <- p :
39
+ // clear available channel if we've reached capacity
40
+ if len (h .queue ) == sendQueueCapacity {
41
+ select {
42
+ case <- h .available :
43
+ default :
44
+ }
45
+ }
39
46
case <- h .runStopped :
40
47
default :
41
48
panic ("sendQueue.Send would have blocked" )
Original file line number Diff line number Diff line change @@ -73,6 +73,48 @@ var _ = Describe("Send Queue", func() {
73
73
Eventually (done ).Should (BeClosed ())
74
74
})
75
75
76
+ It ("signals when sending is possible again, when the first write succeeded" , func () {
77
+ write := make (chan struct {}, 1 )
78
+ written := make (chan struct {}, 100 )
79
+ // now start sending out packets. This should free up queue space.
80
+ c .EXPECT ().Write (gomock .Any ()).DoAndReturn (func (b []byte ) error {
81
+ <- write
82
+ written <- struct {}{}
83
+ return nil
84
+ }).AnyTimes ()
85
+ // allow the first packet to be sent immediately
86
+ write <- struct {}{}
87
+
88
+ done := make (chan struct {})
89
+ go func () {
90
+ defer GinkgoRecover ()
91
+ q .Run ()
92
+ close (done )
93
+ }()
94
+
95
+ q .Send (getPacket ([]byte ("foobar" )))
96
+ <- written
97
+
98
+ // now fill up the send queue
99
+ for i := 0 ; i < sendQueueCapacity + 1 ; i ++ {
100
+ Expect (q .WouldBlock ()).To (BeFalse ())
101
+ q .Send (getPacket ([]byte ("foobar" )))
102
+ }
103
+
104
+ Expect (q .WouldBlock ()).To (BeTrue ())
105
+ Consistently (q .Available ()).ShouldNot (Receive ())
106
+ write <- struct {}{}
107
+ Eventually (q .Available ()).Should (Receive ())
108
+
109
+ // test shutdown
110
+ for i := 0 ; i < sendQueueCapacity ; i ++ {
111
+ write <- struct {}{}
112
+ }
113
+
114
+ q .Close ()
115
+ Eventually (done ).Should (BeClosed ())
116
+ })
117
+
76
118
It ("does not block pending send after the queue has stopped running" , func () {
77
119
done := make (chan struct {})
78
120
go func () {
You can’t perform that action at this time.
0 commit comments