Skip to content

Commit 1bc4ed5

Browse files
committed
Change SinkExt::send_all to take stream by value
1 parent 74ff5a1 commit 1bc4ed5

File tree

3 files changed

+38
-43
lines changed

3 files changed

+38
-43
lines changed

futures-util/src/sink/mod.rs

+4-7
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
use crate::future::{assert_future, Either};
1010
use core::pin::Pin;
1111
use futures_core::future::Future;
12-
use futures_core::stream::{Stream, TryStream};
12+
use futures_core::stream::Stream;
1313
use futures_core::task::{Context, Poll};
1414

1515
#[cfg(feature = "compat")]
@@ -248,15 +248,12 @@ pub trait SinkExt<Item>: Sink<Item> {
248248
/// Doing `sink.send_all(stream)` is roughly equivalent to
249249
/// `stream.forward(sink)`. The returned future will exhaust all items from
250250
/// `stream` and send them to `self`.
251-
fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
251+
fn send_all<St>(&mut self, stream: St) -> SendAll<'_, Self, St>
252252
where
253-
St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
254-
// St: Stream<Item = Result<Item, Self::Error>> + Unpin + ?Sized,
253+
St: Stream<Item = Result<Item, Self::Error>>,
255254
Self: Unpin,
256255
{
257-
// TODO: type mismatch resolving `<St as Stream>::Item == std::result::Result<Item, <Self as futures_sink::Sink<Item>>::Error>`
258-
// assert_future::<Result<(), Self::Error>, _>(SendAll::new(self, stream))
259-
SendAll::new(self, stream)
256+
assert_future::<Result<(), Self::Error>, _>(SendAll::new(self, stream))
260257
}
261258

262259
/// Wrap this sink in an `Either` sink, making it the left-hand variant

futures-util/src/sink/send_all.rs

+33-35
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,33 @@
1-
use crate::stream::{StreamExt, TryStreamExt, Fuse};
1+
use crate::stream::{StreamExt, Fuse};
22
use core::fmt;
33
use core::pin::Pin;
44
use futures_core::future::Future;
55
use futures_core::ready;
66
use futures_core::stream::{TryStream, Stream};
77
use futures_core::task::{Context, Poll};
88
use futures_sink::Sink;
9+
use pin_project_lite::pin_project;
910

10-
/// Future for the [`send_all`](super::SinkExt::send_all) method.
11-
#[allow(explicit_outlives_requirements)] // https://github.com/rust-lang/rust/issues/60993
12-
#[must_use = "futures do nothing unless you `.await` or poll them"]
13-
pub struct SendAll<'a, Si, St>
14-
where
15-
Si: ?Sized,
16-
St: ?Sized + TryStream,
17-
{
18-
sink: &'a mut Si,
19-
stream: Fuse<&'a mut St>,
20-
buffered: Option<St::Ok>,
11+
pin_project! {
12+
/// Future for the [`send_all`](super::SinkExt::send_all) method.
13+
#[allow(explicit_outlives_requirements)] // https://github.com/rust-lang/rust/issues/60993
14+
#[must_use = "futures do nothing unless you `.await` or poll them"]
15+
pub struct SendAll<'a, Si, St>
16+
where
17+
Si: ?Sized,
18+
St: TryStream,
19+
{
20+
sink: &'a mut Si,
21+
#[pin]
22+
stream: Fuse<St>,
23+
buffered: Option<St::Ok>,
24+
}
2125
}
2226

2327
impl<Si, St> fmt::Debug for SendAll<'_, Si, St>
2428
where
2529
Si: fmt::Debug + ?Sized,
26-
St: fmt::Debug + ?Sized + TryStream,
30+
St: fmt::Debug + TryStream,
2731
St::Ok: fmt::Debug,
2832
{
2933
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -35,21 +39,14 @@ where
3539
}
3640
}
3741

38-
// Pinning is never projected to any fields
39-
impl<Si, St> Unpin for SendAll<'_, Si, St>
40-
where
41-
Si: Unpin + ?Sized,
42-
St: TryStream + Unpin + ?Sized,
43-
{}
44-
4542
impl<'a, Si, St, Ok, Error> SendAll<'a, Si, St>
4643
where
4744
Si: Sink<Ok, Error = Error> + Unpin + ?Sized,
48-
St: TryStream<Ok = Ok, Error = Error> + Stream + Unpin + ?Sized,
45+
St: TryStream<Ok = Ok, Error = Error> + Stream,
4946
{
5047
pub(super) fn new(
5148
sink: &'a mut Si,
52-
stream: &'a mut St,
49+
stream: St,
5350
) -> Self {
5451
Self {
5552
sink,
@@ -59,17 +56,18 @@ where
5956
}
6057

6158
fn try_start_send(
62-
&mut self,
59+
self: Pin<&mut Self>,
6360
cx: &mut Context<'_>,
6461
item: St::Ok,
6562
) -> Poll<Result<(), Si::Error>> {
66-
debug_assert!(self.buffered.is_none());
67-
match Pin::new(&mut self.sink).poll_ready(cx)? {
63+
let this = self.project();
64+
debug_assert!(this.buffered.is_none());
65+
match Pin::new(&mut *this.sink).poll_ready(cx)? {
6866
Poll::Ready(()) => {
69-
Poll::Ready(Pin::new(&mut self.sink).start_send(item))
67+
Poll::Ready(Pin::new(&mut *this.sink).start_send(item))
7068
}
7169
Poll::Pending => {
72-
self.buffered = Some(item);
70+
*this.buffered = Some(item);
7371
Poll::Pending
7472
}
7573
}
@@ -79,32 +77,32 @@ where
7977
impl<Si, St, Ok, Error> Future for SendAll<'_, Si, St>
8078
where
8179
Si: Sink<Ok, Error = Error> + Unpin + ?Sized,
82-
St: Stream<Item = Result<Ok, Error>> + Unpin + ?Sized,
80+
St: Stream<Item = Result<Ok, Error>>,
8381
{
8482
type Output = Result<(), Error>;
8583

8684
fn poll(
8785
mut self: Pin<&mut Self>,
8886
cx: &mut Context<'_>,
8987
) -> Poll<Self::Output> {
90-
let this = &mut *self;
9188
// If we've got an item buffered already, we need to write it to the
9289
// sink before we can do anything else
93-
if let Some(item) = this.buffered.take() {
94-
ready!(this.try_start_send(cx, item))?
90+
if let Some(item) = self.as_mut().project().buffered.take() {
91+
ready!(self.as_mut().try_start_send(cx, item))?
9592
}
9693

9794
loop {
98-
match this.stream.try_poll_next_unpin(cx)? {
95+
let this = self.as_mut().project();
96+
match this.stream.try_poll_next(cx)? {
9997
Poll::Ready(Some(item)) => {
100-
ready!(this.try_start_send(cx, item))?
98+
ready!(self.as_mut().try_start_send(cx, item))?
10199
}
102100
Poll::Ready(None) => {
103-
ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
101+
ready!(Pin::new(this.sink).poll_flush(cx))?;
104102
return Poll::Ready(Ok(()))
105103
}
106104
Poll::Pending => {
107-
ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
105+
ready!(Pin::new(this.sink).poll_flush(cx))?;
108106
return Poll::Pending
109107
}
110108
}

futures/tests/auto_traits.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1018,7 +1018,7 @@ pub mod sink {
10181018
assert_not_impl!(SendAll<'_, (), LocalTryStream>: Sync);
10191019
assert_not_impl!(SendAll<'_, *const (), SyncTryStream<()>>: Sync);
10201020
assert_impl!(SendAll<'_, (), UnpinTryStream>: Unpin);
1021-
assert_not_impl!(SendAll<'_, PhantomPinned, UnpinTryStream>: Unpin);
1021+
assert_impl!(SendAll<'_, PhantomPinned, UnpinTryStream>: Unpin);
10221022
assert_not_impl!(SendAll<'_, (), PinnedTryStream>: Unpin);
10231023

10241024
assert_impl!(SinkErrInto<SendSink, *const (), *const ()>: Send);

0 commit comments

Comments
 (0)