Skip to content

Commit 98d1c12

Browse files
committed
fix: do not assign capacity for pending streams
`Prioritize::send_data` has a check to prevent assigning capacity to streams that are not yet open. Assigning flow control window to pending streams could starve already open streams. This change adds a similar check to `Prioritize::reserve_capacity`. Test `capacity_not_assigned_to_unopened_streams` in `flow_control.rs` demonstrates the fix. A number of other tests must be changed because they were assuming that pending streams immediately received connection capacity. This may be related to #853.
1 parent b9d5397 commit 98d1c12

File tree

4 files changed

+82
-19
lines changed

4 files changed

+82
-19
lines changed

src/proto/streams/prioritize.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,9 @@ impl Prioritize {
284284
// Try to assign additional capacity to the stream. If none is
285285
// currently available, the stream will be queued to receive some
286286
// when more becomes available.
287-
self.try_assign_capacity(stream);
287+
if !stream.is_pending_open {
288+
self.try_assign_capacity(stream);
289+
}
288290
}
289291
}
290292
}

tests/h2-tests/tests/flow_control.rs

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ async fn stream_close_by_data_frame_releases_capacity() {
391391

392392
// The capacity should be immediately available as nothing else is
393393
// happening on the stream.
394-
assert_eq!(s1.capacity(), window_size);
394+
let mut s1 = h2.drive(util::wait_for_capacity(s1, window_size)).await;
395395

396396
let request = Request::builder()
397397
.method(Method::POST)
@@ -414,7 +414,7 @@ async fn stream_close_by_data_frame_releases_capacity() {
414414
s1.send_data("".into(), true).unwrap();
415415

416416
// The capacity should be available
417-
assert_eq!(s2.capacity(), 5);
417+
let mut s2 = h2.drive(util::wait_for_capacity(s2, 5)).await;
418418

419419
// Send the frame
420420
s2.send_data("hello".into(), true).unwrap();
@@ -461,9 +461,7 @@ async fn stream_close_by_trailers_frame_releases_capacity() {
461461
// This effectively reserves the entire connection window
462462
s1.reserve_capacity(window_size);
463463

464-
// The capacity should be immediately available as nothing else is
465-
// happening on the stream.
466-
assert_eq!(s1.capacity(), window_size);
464+
let mut s1 = h2.drive(util::wait_for_capacity(s1, window_size)).await;
467465

468466
let request = Request::builder()
469467
.method(Method::POST)
@@ -486,7 +484,7 @@ async fn stream_close_by_trailers_frame_releases_capacity() {
486484
s1.send_trailers(Default::default()).unwrap();
487485

488486
// The capacity should be available
489-
assert_eq!(s2.capacity(), 5);
487+
let mut s2 = h2.drive(util::wait_for_capacity(s2, 5)).await;
490488

491489
// Send the frame
492490
s2.send_data("hello".into(), true).unwrap();
@@ -919,10 +917,10 @@ async fn recv_no_init_window_then_receive_some_init_window() {
919917

920918
let (response, mut stream) = client.send_request(request, false).unwrap();
921919

922-
stream.reserve_capacity(11);
920+
stream.reserve_capacity(10);
923921

924-
let mut stream = h2.drive(util::wait_for_capacity(stream, 11)).await;
925-
assert_eq!(stream.capacity(), 11);
922+
let mut stream = h2.drive(util::wait_for_capacity(stream, 10)).await;
923+
assert_eq!(stream.capacity(), 10);
926924

927925
stream.send_data("hello world".into(), true).unwrap();
928926

@@ -1990,6 +1988,61 @@ async fn reclaim_reserved_capacity() {
19901988
join(mock, h2).await;
19911989
}
19921990

1991+
#[tokio::test]
1992+
async fn capacity_not_assigned_to_unopened_streams() {
1993+
h2_support::trace_init!();
1994+
1995+
let (io, mut srv) = mock::new();
1996+
1997+
let mock = async move {
1998+
let mut settings = frame::Settings::default();
1999+
settings.set_max_concurrent_streams(Some(1));
2000+
let settings = srv.assert_client_handshake_with_settings(settings).await;
2001+
assert_default_settings!(settings);
2002+
2003+
srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
2004+
.await;
2005+
srv.recv_frame(frames::data(1, "hello")).await;
2006+
srv.recv_frame(frames::data(1, "world").eos()).await;
2007+
srv.send_frame(frames::headers(1).response(200).eos()).await;
2008+
2009+
srv.recv_frame(frames::headers(3).request("POST", "https://www.example.com/"))
2010+
.await;
2011+
srv.send_frame(frames::window_update(
2012+
0,
2013+
frame::DEFAULT_INITIAL_WINDOW_SIZE + 10,
2014+
))
2015+
.await;
2016+
srv.recv_frame(frames::reset(3).cancel()).await;
2017+
};
2018+
2019+
let h2 = async move {
2020+
let (mut client, mut h2) = client::handshake(io).await.unwrap();
2021+
let request = Request::builder()
2022+
.method(Method::POST)
2023+
.uri("https://www.example.com/")
2024+
.body(())
2025+
.unwrap();
2026+
2027+
let (response1, mut stream1) = client.send_request(request.clone(), false).unwrap();
2028+
stream1.send_data("hello".into(), false).unwrap();
2029+
let (_, mut stream2) = client.send_request(request.clone(), false).unwrap();
2030+
stream2.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize);
2031+
stream1.send_data("world".into(), true).unwrap();
2032+
h2.drive(response1).await.unwrap();
2033+
let stream2 = h2
2034+
.drive(util::wait_for_capacity(
2035+
stream2,
2036+
frame::DEFAULT_INITIAL_WINDOW_SIZE as usize,
2037+
))
2038+
.await;
2039+
drop(stream2);
2040+
h2.await.unwrap();
2041+
};
2042+
2043+
join(mock, h2).await;
2044+
}
2045+
19932046
// ==== abusive window updates ====
19942047

19952048
#[tokio::test]

tests/h2-tests/tests/prioritization.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ async fn single_stream_send_large_body() {
5252
stream.reserve_capacity(payload.len());
5353

5454
// The capacity should be immediately allocated
55-
assert_eq!(stream.capacity(), payload.len());
55+
let mut stream = h2
56+
.drive(util::wait_for_capacity(stream, payload.len()))
57+
.await;
5658

5759
// Send the data
5860
stream.send_data(payload.into(), true).unwrap();
@@ -108,7 +110,9 @@ async fn multiple_streams_with_payload_greater_than_default_window() {
108110
// The capacity should be immediately
109111
// allocated to default window size (smaller than payload)
110112
stream1.reserve_capacity(payload_clone.len());
111-
assert_eq!(stream1.capacity(), DEFAULT_WINDOW_SIZE);
113+
let mut stream1 = conn
114+
.drive(util::wait_for_capacity(stream1, DEFAULT_WINDOW_SIZE))
115+
.await;
112116

113117
stream2.reserve_capacity(payload_clone.len());
114118
assert_eq!(stream2.capacity(), 0);
@@ -179,7 +183,9 @@ async fn single_stream_send_extra_large_body_multi_frames_one_buffer() {
179183
stream.reserve_capacity(payload.len());
180184

181185
// The capacity should be immediately allocated
182-
assert_eq!(stream.capacity(), payload.len());
186+
let mut stream = h2
187+
.drive(util::wait_for_capacity(stream, payload.len()))
188+
.await;
183189

184190
// Send the data
185191
stream.send_data(payload.into(), true).unwrap();
@@ -296,13 +302,13 @@ async fn single_stream_send_extra_large_body_multi_frames_multi_buffer() {
296302
0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, 172, 75, 143, 168, 233, 25, 151,
297303
33, 233, 132,
298304
])
305+
.write(frames::SETTINGS_ACK)
306+
.read(frames::SETTINGS_ACK)
299307
.write(&[
300308
// DATA
301309
0, 64, 0, 0, 0, 0, 0, 0, 1,
302310
])
303311
.write(&payload[0..16_384])
304-
.write(frames::SETTINGS_ACK)
305-
.read(frames::SETTINGS_ACK)
306312
.wait(Duration::from_millis(10))
307313
.write(&[
308314
// DATA
@@ -326,7 +332,9 @@ async fn single_stream_send_extra_large_body_multi_frames_multi_buffer() {
326332
stream.reserve_capacity(payload.len());
327333

328334
// The capacity should be immediately allocated
329-
assert_eq!(stream.capacity(), payload.len());
335+
let mut stream = h2
336+
.drive(util::wait_for_capacity(stream, payload.len()))
337+
.await;
330338

331339
// Send the data
332340
stream.send_data(payload.into(), true).unwrap();

tests/h2-tests/tests/stream_states.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ async fn send_recv_data() {
5151
0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, 172, 75, 143, 168, 233, 25, 151,
5252
33, 233, 132,
5353
])
54+
.write(frames::SETTINGS_ACK)
5455
.write(&[
5556
// DATA
5657
0, 0, 5, 0, 1, 0, 0, 0, 1, 104, 101, 108, 108, 111,
5758
])
58-
.write(frames::SETTINGS_ACK)
5959
// Read response
6060
.read(&[
6161
// HEADERS
@@ -78,10 +78,10 @@ async fn send_recv_data() {
7878
// Reserve send capacity
7979
stream.reserve_capacity(5);
8080

81-
assert_eq!(stream.capacity(), 5);
81+
let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await;
8282

8383
// Send the data
84-
stream.send_data("hello".as_bytes(), true).unwrap();
84+
stream.send_data("hello".into(), true).unwrap();
8585

8686
// Get the response
8787
let resp = h2.run(response).await.unwrap();

0 commit comments

Comments
 (0)