Skip to content

Commit 2b60a61

Browse files
committed
quic: fix several bugs in flow control accounting
Connection-level flow control sets a bound on the total maximum stream offset of all data sent, not the total amount of bytes sent in STREAM frames. For example, if we send the bytes [0,10) for a stream, and then retransmit the same bytes due to packet loss, that consumes 10 bytes of connection-level flow, not 20. We were incorrectly tracking total bytes sent. Fix this. We were blocking retransmission of data in lost STREAM frames on availability of connection-level flow control. We now place a stream with retransmitted data on queueMeta (non-flow-controlled data), since we have already accounted for the flow control window consumption of the data. We were incorrectly marking a stream as being able to send an empty STREAM frame with a FIN bit, when the stream was actually blocked on stream-level flow control. Fix this. For golang/go#58547 Change-Id: Ib2ace94183750078a19d945256507060ea786735 Reviewed-on: https://go-review.googlesource.com/c/net/+/532716 LUCI-TryBot-Result: Go LUCI <[email protected]> Reviewed-by: Jonathan Amsterdam <[email protected]>
1 parent 73d82ef commit 2b60a61

File tree

3 files changed

+90
-5
lines changed

3 files changed

+90
-5
lines changed

internal/quic/conn_flow_test.go

+34
Original file line numberDiff line numberDiff line change
@@ -394,3 +394,37 @@ func TestConnOutflowMetaAndData(t *testing.T) {
394394
data: data,
395395
})
396396
}
397+
398+
func TestConnOutflowResentData(t *testing.T) {
399+
tc, s := newTestConnAndLocalStream(t, clientSide, bidiStream,
400+
permissiveTransportParameters,
401+
func(p *transportParameters) {
402+
p.initialMaxData = 10
403+
})
404+
tc.ignoreFrame(frameTypeAck)
405+
406+
data := makeTestData(15)
407+
s.Write(data[:8])
408+
tc.wantFrame("data is under MAX_DATA limit, all sent",
409+
packetType1RTT, debugFrameStream{
410+
id: s.id,
411+
data: data[:8],
412+
})
413+
414+
// Lose the last STREAM packet.
415+
const pto = false
416+
tc.triggerLossOrPTO(packetType1RTT, false)
417+
tc.wantFrame("lost STREAM data is retransmitted",
418+
packetType1RTT, debugFrameStream{
419+
id: s.id,
420+
data: data[:8],
421+
})
422+
423+
s.Write(data[8:])
424+
tc.wantFrame("new data is sent up to the MAX_DATA limit",
425+
packetType1RTT, debugFrameStream{
426+
id: s.id,
427+
off: 8,
428+
data: data[8:10],
429+
})
430+
}

internal/quic/stream.go

+18-5
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type Stream struct {
3939
outgate gate
4040
out pipe // buffered data to send
4141
outwin int64 // maximum MAX_STREAM_DATA received from the peer
42+
outmaxsent int64 // maximum data offset we've sent to the peer
4243
outmaxbuf int64 // maximum amount of data we will buffer
4344
outunsent rangeset[int64] // ranges buffered but not yet sent
4445
outacked rangeset[int64] // ranges sent and acknowledged
@@ -494,8 +495,12 @@ func (s *Stream) outUnlockNoQueue() streamState {
494495
case s.outblocked.shouldSend(): // STREAM_DATA_BLOCKED
495496
state = streamOutSendMeta
496497
case len(s.outunsent) > 0: // STREAM frame with data
497-
state = streamOutSendData
498-
case s.outclosed.shouldSend(): // STREAM frame with FIN bit, all data already sent
498+
if s.outunsent.min() < s.outmaxsent {
499+
state = streamOutSendMeta // resent data, will not consume flow control
500+
} else {
501+
state = streamOutSendData // new data, requires flow control
502+
}
503+
case s.outclosed.shouldSend() && s.out.end == s.outmaxsent: // empty STREAM frame with FIN bit
499504
state = streamOutSendMeta
500505
case s.outopened.shouldSend(): // STREAM frame with no data
501506
state = streamOutSendMeta
@@ -725,7 +730,11 @@ func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto b
725730
for {
726731
// STREAM
727732
off, size := dataToSend(min(s.out.start, s.outwin), min(s.out.end, s.outwin), s.outunsent, s.outacked, pto)
728-
size = min(size, s.conn.streams.outflow.avail())
733+
if end := off + size; end > s.outmaxsent {
734+
// This will require connection-level flow control to send.
735+
end = min(end, s.outmaxsent+s.conn.streams.outflow.avail())
736+
size = end - off
737+
}
729738
fin := s.outclosed.isSet() && off+size == s.out.end
730739
shouldSend := size > 0 || // have data to send
731740
s.outopened.shouldSendPTO(pto) || // should open the stream
@@ -738,8 +747,12 @@ func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto b
738747
return false
739748
}
740749
s.out.copy(off, b)
741-
s.conn.streams.outflow.consume(int64(len(b)))
742-
s.outunsent.sub(off, off+int64(len(b)))
750+
end := off + int64(len(b))
751+
if end > s.outmaxsent {
752+
s.conn.streams.outflow.consume(end - s.outmaxsent)
753+
s.outmaxsent = end
754+
}
755+
s.outunsent.sub(off, end)
743756
s.frameOpensStream(pnum)
744757
if fin {
745758
s.outclosed.setSent(pnum)

internal/quic/stream_test.go

+38
Original file line numberDiff line numberDiff line change
@@ -1094,6 +1094,44 @@ func TestStreamCloseUnblocked(t *testing.T) {
10941094
}
10951095
}
10961096

1097+
func TestStreamCloseWriteWhenBlockedByStreamFlowControl(t *testing.T) {
1098+
ctx := canceledContext()
1099+
tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters,
1100+
func(p *transportParameters) {
1101+
//p.initialMaxData = 0
1102+
p.initialMaxStreamDataUni = 0
1103+
})
1104+
tc.ignoreFrame(frameTypeStreamDataBlocked)
1105+
if _, err := s.WriteContext(ctx, []byte{0, 1}); err != nil {
1106+
t.Fatalf("s.Write = %v", err)
1107+
}
1108+
s.CloseWrite()
1109+
tc.wantIdle("stream write is blocked by flow control")
1110+
1111+
tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
1112+
id: s.id,
1113+
max: 1,
1114+
})
1115+
tc.wantFrame("send data up to flow control limit",
1116+
packetType1RTT, debugFrameStream{
1117+
id: s.id,
1118+
data: []byte{0},
1119+
})
1120+
tc.wantIdle("stream write is again blocked by flow control")
1121+
1122+
tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
1123+
id: s.id,
1124+
max: 2,
1125+
})
1126+
tc.wantFrame("send remaining data and FIN",
1127+
packetType1RTT, debugFrameStream{
1128+
id: s.id,
1129+
off: 1,
1130+
data: []byte{1},
1131+
fin: true,
1132+
})
1133+
}
1134+
10971135
func TestStreamPeerResetsWithUnreadAndUnsentData(t *testing.T) {
10981136
testStreamTypes(t, "", func(t *testing.T, styp streamType) {
10991137
ctx := canceledContext()

0 commit comments

Comments
 (0)