Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,6 @@ jobs:
- uses: dtolnay/rust-toolchain@stable
with:
components: clippy
- name: Install protoc
uses: taiki-e/install-action@v2
with:
tool: protoc@3.20.3
- run: cargo clippy --workspace --all-features --lib --bins --examples -- -D warnings

deny-check:
Expand Down
8 changes: 7 additions & 1 deletion examples/axum-key-value-store/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use std::{
use tokio::net::TcpListener;
use tower::ServiceBuilder;
use tower_http::{
on_early_drop::{EarlyDropsAsFailures, OnEarlyDropLayer},
timeout::TimeoutLayer,
trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer},
trace::{DefaultMakeSpan, DefaultOnFailure, DefaultOnResponse, TraceLayer},
LatencyUnit, ServiceBuilderExt,
};

Expand Down Expand Up @@ -74,6 +75,11 @@ fn app() -> Router {
.make_span_with(DefaultMakeSpan::new().include_headers(true))
.on_response(DefaultOnResponse::new().include_headers(true).latency_unit(LatencyUnit::Micros)),
)
// Report clients that disconnect before the response completes.
// Fires inside the TraceLayer span so events carry the request context.
.layer(OnEarlyDropLayer::new(EarlyDropsAsFailures::new(
DefaultOnFailure::default(),
)))
.sensitive_response_headers(sensitive_headers)
// Set a timeout
.layer(TimeoutLayer::with_status_code(StatusCode::REQUEST_TIMEOUT, Duration::from_secs(10)))
Expand Down
68 changes: 66 additions & 2 deletions tower-http/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,82 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

# Unreleased

## Fixed

- Restore `tokio` and `async-compression` as no-op features. These will be
removed next breaking release

# 0.6.9

## Added:

- `on-early-drop`: middleware that detects when a response future or response
body is dropped before completion ([#636])

Two events get hooks: the response future being dropped before
the inner service produces a response, and the response body being
dropped before reaching end-of-stream.

Install custom callbacks with `OnEarlyDropLayer::builder()`:

```rust
use http::Request;
use tower_http::on_early_drop::{OnBodyDropFn, OnEarlyDropLayer};

let layer = OnEarlyDropLayer::builder()
.on_future_drop(|req: &Request<()>| {
let uri = req.uri().clone();
move || eprintln!("future dropped for {}", uri)
})
.on_body_drop(OnBodyDropFn::new(|req: &Request<()>| {
let uri = req.uri().clone();
move |parts: &http::response::Parts| {
let status = parts.status;
move || eprintln!("body dropped for {} status {}", uri, status)
}
}));
```

Or route both events through a `trace::OnFailure` hook with
`EarlyDropsAsFailures`. Place this layer inside a `TraceLayer` so the
emitted events inherit the request span:

```rust
use tower::ServiceBuilder;
use tower_http::on_early_drop::{OnEarlyDropLayer, EarlyDropsAsFailures};
use tower_http::trace::{DefaultOnFailure, TraceLayer};

let stack = ServiceBuilder::new()
.layer(TraceLayer::new_for_http())
.layer(OnEarlyDropLayer::new(
EarlyDropsAsFailures::new(DefaultOnFailure::default()),
));
```
- `fs`: make `AsyncReadBody::with_capacity` public ([#415])

## Changed:

- The implicit `async-compression` feature is removed (BREAKING) ([#642])
- The implicit `tokio` feature is removed (BREAKING) ([#628])
- The implicit `async-compression` feature is removed ([#642])
- The implicit `tokio` feature is removed ([#628])
- `fs`: no longer auto-enables the `tracing` crate feature; enable `tracing`
explicitly to restore error logging on `ServeDir` IO failures ([#614])

## Fixed

- `trace`: restore failure classification at end-of-stream ([#483])
- `follow-redirect`: support unicode URLs (swaps `iri-string` dep for
`url`) ([#646])
- `fs`: reject reserved Windows DOS device names (`CON`, `COM1`, etc.) in
`ServeDir` ([#663])

[#415]: https://github.com/tower-rs/tower-http/pull/415
[#483]: https://github.com/tower-rs/tower-http/pull/483
[#614]: https://github.com/tower-rs/tower-http/pull/614
[#628]: https://github.com/tower-rs/tower-http/pull/628
[#636]: https://github.com/tower-rs/tower-http/pull/636
[#642]: https://github.com/tower-rs/tower-http/pull/642
[#646]: https://github.com/tower-rs/tower-http/pull/646
[#663]: https://github.com/tower-rs/tower-http/pull/663

# 0.6.8

Expand Down
9 changes: 8 additions & 1 deletion tower-http/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "tower-http"
description = "Tower middleware and utilities for HTTP clients and servers"
version = "0.6.8"
version = "0.6.9"
authors = ["Tower Maintainers <team@tower-rs.com>"]
edition = "2018"
license = "MIT"
Expand Down Expand Up @@ -72,6 +72,7 @@ full = [
"map-response-body",
"metrics",
"normalize-path",
"on-early-drop",
"propagate-header",
"redirect",
"request-id",
Expand All @@ -95,6 +96,7 @@ map-request-body = []
map-response-body = []
metrics = ["dep:http-body", "dep:tokio", "tokio?/time"]
normalize-path = []
on-early-drop = ["dep:http-body"]
propagate-header = []
redirect = []
request-id = ["uuid"]
Expand All @@ -118,6 +120,11 @@ decompression-full = ["decompression-br", "decompression-deflate", "decompressio
decompression-gzip = ["dep:async-compression", "async-compression?/gzip", "futures-core", "dep:http-body", "dep:http-body-util", "tokio-util", "dep:tokio"]
decompression-zstd = ["dep:async-compression", "async-compression?/zstd", "futures-core", "dep:http-body", "dep:http-body-util", "tokio-util", "dep:tokio"]

# FIXME: rip this out come 0.7.0.
# ref: https://github.com/tower-rs/tower-http/pull/666#issuecomment-4382555061
tokio = []
async-compression = []

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
Expand Down
3 changes: 3 additions & 0 deletions tower-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ pub mod timeout;
#[cfg(feature = "normalize-path")]
pub mod normalize_path;

#[cfg(feature = "on-early-drop")]
pub mod on_early_drop;

pub mod classify;
pub mod services;

Expand Down
80 changes: 80 additions & 0 deletions tower-http/src/on_early_drop/body.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//! Response body wrapper for [`OnEarlyDropService`].
//!
//! [`OnEarlyDropService`]: super::OnEarlyDropService

use crate::on_early_drop::guard::OnEarlyDropGuard;
use crate::on_early_drop::traits::OnDropCallback;
use http_body::{Body, Frame};
use pin_project_lite::pin_project;
use std::{
pin::Pin,
task::{ready, Context, Poll},
};

pin_project! {
/// Response body for [`OnEarlyDropService`]. Fires its callback if
/// dropped before reaching end-of-stream.
///
/// Bodies that already report [`Body::is_end_stream`] at construction
/// (HEAD requests, 204 responses, etc.) never fire.
///
/// [`OnEarlyDropService`]: super::OnEarlyDropService
pub struct OnEarlyDropBody<B, Callback>
where
Callback: OnDropCallback,
{
#[pin]
inner: B,
guard: OnEarlyDropGuard<Callback>,
}
}

impl<B, Callback> OnEarlyDropBody<B, Callback>
where
Callback: OnDropCallback,
{
/// Wrap `body` with a drop callback.
pub(crate) fn new(body: B, callback: Callback) -> Self
where
B: Body,
{
let mut guard = OnEarlyDropGuard::new(callback);
if body.is_end_stream() {
guard.completed();
}
Self { inner: body, guard }
}
}

impl<B, Callback> Body for OnEarlyDropBody<B, Callback>
where
B: Body,
Callback: OnDropCallback,
{
type Data = B::Data;
type Error = B::Error;

fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let this = self.project();
let result = ready!(this.inner.poll_frame(cx));
// End-of-stream (Ready(None)) or body-level error (Ready(Some(Err)))
// both mean the body will not yield more frames. Suppress the guard
// in either case; service-level errors are out of scope for this
// middleware.
if matches!(result, None | Some(Err(_))) {
this.guard.completed();
}
Poll::Ready(result)
}

fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}

fn size_hint(&self) -> http_body::SizeHint {
self.inner.size_hint()
}
}
Loading
Loading