Skip to content

Commit 6e96a85

Browse files
taiki-eNemo157
authored andcommitted
impl Sink for try_stream combinators
1 parent 64eb779 commit 6e96a85

File tree

4 files changed

+42
-0
lines changed

4 files changed

+42
-0
lines changed

futures-util/src/try_stream/err_into.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use core::marker::PhantomData;
22
use core::pin::Pin;
33
use futures_core::stream::{FusedStream, Stream, TryStream};
44
use futures_core::task::{Waker, Poll};
5+
use futures_sink::Sink;
56
use pin_utils::unsafe_pinned;
67

78
/// Stream for the [`err_into`](super::TryStreamExt::err_into) method.
@@ -43,3 +44,14 @@ where
4344
.map(|res| res.map(|some| some.map_err(Into::into)))
4445
}
4546
}
47+
48+
// Forwarding impl of Sink from the underlying stream
49+
impl<S, E, Item> Sink<Item> for ErrInto<S, E>
50+
where
51+
S: TryStream + Sink<Item>,
52+
S::Error: Into<E>,
53+
{
54+
type SinkError = S::SinkError;
55+
56+
delegate_sink!(stream, Item);
57+
}

futures-util/src/try_stream/into_stream.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use core::pin::Pin;
22
use futures_core::stream::{FusedStream, Stream, TryStream};
33
use futures_core::task::{Waker, Poll};
4+
use futures_sink::Sink;
45
use pin_utils::unsafe_pinned;
56

67
/// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method.
@@ -53,3 +54,10 @@ impl<St: TryStream> Stream for IntoStream<St> {
5354
self.stream().try_poll_next(waker)
5455
}
5556
}
57+
58+
// Forwarding impl of Sink from the underlying stream
59+
impl<S: TryStream + Sink<Item>, Item> Sink<Item> for IntoStream<S> {
60+
type SinkError = S::SinkError;
61+
62+
delegate_sink!(stream, Item);
63+
}

futures-util/src/try_stream/try_buffer_unordered.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::try_stream::IntoStream;
44
use futures_core::future::TryFuture;
55
use futures_core::stream::{Stream, TryStream};
66
use futures_core::task::{Waker, Poll};
7+
use futures_sink::Sink;
78
use pin_utils::{unsafe_pinned, unsafe_unpinned};
89
use core::pin::Pin;
910

@@ -96,3 +97,13 @@ impl<St> Stream for TryBufferUnordered<St>
9697
}
9798
}
9899
}
100+
101+
// Forwarding impl of Sink from the underlying stream
102+
impl<S, Item> Sink<Item> for TryBufferUnordered<S>
103+
where S: TryStream + Sink<Item>,
104+
S::Ok: TryFuture<Error = S::Error>,
105+
{
106+
type SinkError = S::SinkError;
107+
108+
delegate_sink!(stream, Item);
109+
}

futures-util/src/try_stream/try_filter_map.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use core::pin::Pin;
22
use futures_core::future::{TryFuture};
33
use futures_core::stream::{Stream, TryStream};
44
use futures_core::task::{Waker, Poll};
5+
use futures_sink::Sink;
56
use pin_utils::{unsafe_pinned, unsafe_unpinned};
67

78
/// Stream for the [`try_filter_map`](super::TryStreamExt::try_filter_map)
@@ -84,3 +85,13 @@ impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F>
8485
}
8586
}
8687

88+
// Forwarding impl of Sink from the underlying stream
89+
impl<S, Fut, F, T, Item> Sink<Item> for TryFilterMap<S, Fut, F>
90+
where S: TryStream + Sink<Item>,
91+
Fut: TryFuture<Ok = Option<T>, Error = S::Error>,
92+
F: FnMut(S::Ok) -> Fut,
93+
{
94+
type SinkError = S::SinkError;
95+
96+
delegate_sink!(stream, Item);
97+
}

0 commit comments

Comments
 (0)