-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Description
Version
List the version(s) of hyper, and any relevant hyper dependency (such as h2 if this is related to HTTP/2).
name = "hyper"
version = "1.6.0"
name = "hyper-rustls"
version = "0.27.5"
name = "hyper-util"
version = "0.1.10"
name = "reqwest"
version = "0.12.14"
name = "http"
version = "1.3.1"
Platform
The output of uname -a (UNIX), or version and 32 or 64-bit (Windows)
Darwin mac.lan 24.6.0 Darwin Kernel Version 24.6.0: Mon Jul 14 11:30:30 PDT 2025; root:xnu-11417.140.69~1/RELEASE_ARM64_T6020 arm64
Description
Hyper returns a partial byte array when its runtime is closed mid-read.
We have had several reports of panics when shutting down SlateDB:
The bytes range panic in the issue above seems to stem from receiving half an object from object storage (using the object_store crate, which uses reqwest and hyper). I was able to induce the error in SlateDB here (using Minio as the object store along with the object_store crate):
https://gist.github.com/criccomini/9a53ace1dc175c4bd157cf518f5d08eb
I was then able to induce the error outside of SlateDB and object_store using this code:
//! Minimal repro for a "partial body without error" scenario using `hyper` only.
//!
//! The test:
//! - Serves an HTTP/1.1 response with a `Content-Length` of `expected_len`, but only writes
//! `first_chunk_len` bytes of the body.
//! - Reads the first body frame successfully.
//! - Drops the Tokio runtime driving the client connection.
//! - Observes that collecting the remainder of the body returns `Ok`, but yields fewer bytes
//! than `Content-Length` advertised.
use bytes::Bytes;
use http_body_util::{BodyExt, Empty};
use hyper::body::Body;
use hyper::client::conn::http1;
use hyper::Request;
use hyper_util::rt::TokioIo;
use std::time::Duration;
use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore]
async fn repro_hyper_partial_body_when_connection_runtime_dropped() {
// The server will claim it will send 1MiB...
let expected_len = 1024 * 1024;
// ...but will only actually send this many bytes before stalling.
let first_chunk_len = 8 * 1024;
// Use an in-memory duplex stream so this repro doesn't depend on real networking.
let (mut server_io, client_io) = duplex(64 * 1024);
let (server_exit_tx, server_exit_rx) = tokio::sync::oneshot::channel::<()>();
let server_task = tokio::spawn(async move {
// Read the incoming HTTP request headers until the blank line.
let mut buf = [0u8; 4096];
let mut request_bytes = Vec::new();
loop {
let n = server_io.read(&mut buf).await.unwrap();
if n == 0 {
return;
}
request_bytes.extend_from_slice(&buf[..n]);
if request_bytes.windows(4).any(|w| w == b"\r\n\r\n") {
break;
}
}
// Write a response with Content-Length=expected_len, but only write part of the body.
let header = format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n",
expected_len
);
server_io.write_all(header.as_bytes()).await.unwrap();
server_io
.write_all(&vec![b'x'; first_chunk_len])
.await
.unwrap();
server_io.flush().await.unwrap();
// Keep the "connection" open (don't send EOF). The client-side truncation we want to
// demonstrate is induced by dropping the client's connection runtime, not by the server
// cleanly closing.
let _ = tokio::time::timeout(Duration::from_secs(99), server_exit_rx).await;
});
// Create a separate Tokio runtime that will exclusively drive hyper's connection task.
//
// This simulates the "runtime dropped the dispatch task" / "runtime dropped the dispatch
// task" failure mode: the caller continues running, but the runtime responsible for I/O
// gets shut down.
let io_rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap();
let mut io_rt = Some(io_rt);
let io_handle = io_rt.as_ref().unwrap().handle().clone();
// Build a hyper HTTP/1 client connection on the I/O runtime, and send the `SendRequest`
// handle back to the test runtime.
let (sender_tx, sender_rx) = tokio::sync::oneshot::channel();
io_handle.spawn(async move {
let io = TokioIo::new(client_io);
let (sender, conn) = http1::handshake::<_, Empty<Bytes>>(io).await.unwrap();
// The connection future must be polled to drive I/O; spawn it onto the I/O runtime.
tokio::spawn(async move {
let _ = conn.await;
});
let _ = sender_tx.send(sender);
});
let mut sender = tokio::time::timeout(Duration::from_secs(5), sender_rx)
.await
.expect("timed out creating hyper client")
.expect("hyper client task dropped");
// Issue a request and start receiving a streamed response body.
let req = Request::builder()
.method("GET")
.uri("/")
.body(Empty::<Bytes>::new())
.unwrap();
let res = sender.send_request(req).await.unwrap();
assert_eq!(res.status(), hyper::StatusCode::OK);
let mut body = res.into_body();
// Hyper exposes the server's declared Content-Length through `size_hint().exact()`.
let hinted_len = body
.size_hint()
.exact()
.expect("missing Content-Length size_hint") as usize;
assert_eq!(hinted_len, expected_len);
// Read a first frame to prove the response body is flowing before we tear down the I/O runtime.
let first_frame = tokio::time::timeout(Duration::from_secs(5), body.frame())
.await
.expect("timed out reading first body frame")
.expect("body frame returned error")
.expect("body unexpectedly empty");
let first_bytes = first_frame.into_data().expect("first frame was not DATA");
assert!(!first_bytes.is_empty());
// Now drop the runtime that is driving hyper's connection task. This will abort the connection
// future and drop the internal sender side of the body stream.
io_rt.take().unwrap().shutdown_background();
// Collecting the remainder of the body *returns Ok*, but yields fewer bytes than expected_len.
// That is the bug being demonstrated (the caller sees EOF with no error despite Content-Length).
let remaining = tokio::time::timeout(Duration::from_secs(5), body.collect())
.await
.expect("timed out collecting remaining body")
.expect("collect returned error")
.to_bytes();
let total_len = first_bytes.len() + remaining.len();
// Shut down the server task so the test doesn't leak work on failure.
let _ = server_exit_tx.send(());
let _ = server_task.await;
assert_eq!(
total_len,
expected_len,
"bug not reproduced: body was not truncated (got {total_len} bytes, expected {expected_len})"
);
}I expected to see this code in the test:
let remaining = tokio::time::timeout(Duration::from_secs(5), body.collect())
.await
.expect("timed out collecting remaining body")
.expect("collect returned error")
.to_bytes();
Raise an expect() panic because the byte stream should have been marked closed after the tokio task disappeared. Instead, the body.collect() call appears to simply return an empty array.