Skip to content

Commit 9b36582

Browse files
committed
Merge branch 'master' into fs-stream-enumerate
2 parents cdd4215 + 2ecaf18 commit 9b36582

15 files changed

+266
-70
lines changed

docs/src/tutorial/all_together.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
115115
Event::Message { from, to, msg } => {
116116
for addr in to {
117117
if let Some(peer) = peers.get_mut(&addr) {
118-
peer.send(format!("from {}: {}\n", from, msg)).await?
118+
let msg = format!("from {}: {}\n", from, msg);
119+
peer.send(msg).await?
119120
}
120121
}
121122
}

docs/src/tutorial/clean_shutdown.md

+4-2
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ Let's add waiting to the server:
115115
# Event::Message { from, to, msg } => {
116116
# for addr in to {
117117
# if let Some(peer) = peers.get_mut(&addr) {
118-
# peer.send(format!("from {}: {}\n", from, msg)).await?
118+
# let msg = format!("from {}: {}\n", from, msg);
119+
# peer.send(msg).await?
119120
# }
120121
# }
121122
# }
@@ -217,7 +218,8 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
217218
Event::Message { from, to, msg } => {
218219
for addr in to {
219220
if let Some(peer) = peers.get_mut(&addr) {
220-
peer.send(format!("from {}: {}\n", from, msg)).await?
221+
let msg = format!("from {}: {}\n", from, msg);
222+
peer.send(msg).await?
221223
}
222224
}
223225
}

docs/src/tutorial/connecting_readers_and_writers.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
7373
Event::Message { from, to, msg } => { // 3
7474
for addr in to {
7575
if let Some(peer) = peers.get_mut(&addr) {
76-
peer.send(format!("from {}: {}\n", from, msg)).await?
76+
let msg = format!("from {}: {}\n", from, msg);
77+
peer.send(msg).await?
7778
}
7879
}
7980
}

docs/src/tutorial/handling_disconnection.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,8 @@ async fn broker(events: Receiver<Event>) {
257257
Event::Message { from, to, msg } => {
258258
for addr in to {
259259
if let Some(peer) = peers.get_mut(&addr) {
260-
peer.send(format!("from {}: {}\n", from, msg)).await
260+
let msg = format!("from {}: {}\n", from, msg);
261+
peer.send(msg).await
261262
.unwrap() // 6
262263
}
263264
}

examples/a-chat/server.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,8 @@ async fn broker_loop(mut events: Receiver<Event>) {
139139
Event::Message { from, to, msg } => {
140140
for addr in to {
141141
if let Some(peer) = peers.get_mut(&addr) {
142-
peer.send(format!("from {}: {}\n", from, msg))
143-
.await
144-
.unwrap()
142+
let msg = format!("from {}: {}\n", from, msg);
143+
peer.send(msg).await.unwrap();
145144
}
146145
}
147146
}

examples/socket-timeouts.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ async fn get() -> io::Result<Vec<u8>> {
1010

1111
let mut buf = vec![];
1212

13-
io::timeout(Duration::from_secs(5), async {
13+
io::timeout(Duration::from_secs(5), async move {
1414
stream.read_to_end(&mut buf).await?;
1515
Ok(buf)
1616
})

src/io/buf_read/fill_buf.rs

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use std::pin::Pin;
2+
3+
use futures_io::AsyncBufRead;
4+
5+
use crate::future::Future;
6+
use crate::io;
7+
use crate::task::{Context, Poll};
8+
9+
#[doc(hidden)]
10+
#[allow(missing_debug_implementations)]
11+
pub struct FillBufFuture<'a, R: ?Sized> {
12+
reader: &'a mut R,
13+
}
14+
15+
impl<'a, R: ?Sized> FillBufFuture<'a, R> {
16+
pub(crate) fn new(reader: &'a mut R) -> Self {
17+
Self { reader }
18+
}
19+
}
20+
21+
impl<'a, R: AsyncBufRead + Unpin + ?Sized> Future for FillBufFuture<'a, R> {
22+
type Output = io::Result<&'a [u8]>;
23+
24+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&'a [u8]>> {
25+
let Self { reader } = &mut *self;
26+
let result = Pin::new(reader).poll_fill_buf(cx);
27+
// This is safe because:
28+
// 1. The buffer is valid for the lifetime of the reader.
29+
// 2. Output is unrelated to the wrapper (Self).
30+
result.map_ok(|buf| unsafe { std::mem::transmute::<&'_ [u8], &'a [u8]>(buf) })
31+
}
32+
}

src/io/buf_read/mod.rs

+22
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
mod fill_buf;
12
mod lines;
23
mod read_line;
34
mod read_until;
45

6+
use fill_buf::FillBufFuture;
57
pub use lines::Lines;
68
use read_line::ReadLineFuture;
79
use read_until::ReadUntilFuture;
@@ -41,6 +43,26 @@ cfg_if! {
4143
/// [`futures::io::AsyncBufRead`]:
4244
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncBufRead.html
4345
pub trait BufRead {
46+
/// Returns the contents of the internal buffer, filling it with more data from the inner
47+
/// reader if it is empty.
48+
///
49+
/// This function is a lower-level call. It needs to be paired with the [`consume`] method to
50+
/// function properly. When calling this method, none of the contents will be "read" in the
51+
/// sense that later calling `read` may return the same contents. As such, [`consume`] must be
52+
/// called with the number of bytes that are consumed from this buffer to ensure that the bytes
53+
/// are never returned twice.
54+
///
55+
/// [`consume`]: #tymethod.consume
56+
///
57+
/// An empty buffer returned indicates that the stream has reached EOF.
58+
// TODO: write a proper doctest with `consume`
59+
fn fill_buf<'a>(&'a mut self) -> ret!('a, FillBufFuture, io::Result<&'a [u8]>)
60+
where
61+
Self: Unpin,
62+
{
63+
FillBufFuture::new(self)
64+
}
65+
4466
/// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached.
4567
///
4668
/// This function will read bytes from the underlying stream until the delimiter or EOF is

src/stream/stream/all.rs

+15-22
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,36 @@
1-
use crate::future::Future;
2-
use crate::task::{Context, Poll};
3-
41
use std::marker::PhantomData;
52
use std::pin::Pin;
63

7-
#[derive(Debug)]
8-
pub struct AllFuture<'a, S, F, T>
9-
where
10-
F: FnMut(T) -> bool,
11-
{
4+
use crate::future::Future;
5+
use crate::stream::Stream;
6+
use crate::task::{Context, Poll};
7+
8+
#[doc(hidden)]
9+
#[allow(missing_debug_implementations)]
10+
pub struct AllFuture<'a, S, F, T> {
1211
pub(crate) stream: &'a mut S,
1312
pub(crate) f: F,
1413
pub(crate) result: bool,
15-
pub(crate) __item: PhantomData<T>,
14+
pub(crate) _marker: PhantomData<T>,
1615
}
1716

18-
impl<'a, S, F, T> AllFuture<'a, S, F, T>
19-
where
20-
F: FnMut(T) -> bool,
21-
{
22-
pin_utils::unsafe_pinned!(stream: &'a mut S);
23-
pin_utils::unsafe_unpinned!(result: bool);
24-
pin_utils::unsafe_unpinned!(f: F);
25-
}
17+
impl<S: Unpin, F, T> Unpin for AllFuture<'_, S, F, T> {}
2618

2719
impl<S, F> Future for AllFuture<'_, S, F, S::Item>
2820
where
29-
S: futures_core::stream::Stream + Unpin + Sized,
21+
S: Stream + Unpin + Sized,
3022
F: FnMut(S::Item) -> bool,
3123
{
3224
type Output = bool;
3325

3426
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
35-
use futures_core::stream::Stream;
36-
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
27+
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));
28+
3729
match next {
3830
Some(v) => {
39-
let result = (self.as_mut().f())(v);
40-
*self.as_mut().result() = result;
31+
let result = (&mut self.f)(v);
32+
self.result = result;
33+
4134
if result {
4235
// don't forget to wake this task again to pull the next item from stream
4336
cx.waker().wake_by_ref();

src/stream/stream/any.rs

+15-22
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,36 @@
1-
use crate::future::Future;
2-
use crate::task::{Context, Poll};
3-
41
use std::marker::PhantomData;
52
use std::pin::Pin;
63

7-
#[derive(Debug)]
8-
pub struct AnyFuture<'a, S, F, T>
9-
where
10-
F: FnMut(T) -> bool,
11-
{
4+
use crate::future::Future;
5+
use crate::stream::Stream;
6+
use crate::task::{Context, Poll};
7+
8+
#[doc(hidden)]
9+
#[allow(missing_debug_implementations)]
10+
pub struct AnyFuture<'a, S, F, T> {
1211
pub(crate) stream: &'a mut S,
1312
pub(crate) f: F,
1413
pub(crate) result: bool,
15-
pub(crate) __item: PhantomData<T>,
14+
pub(crate) _marker: PhantomData<T>,
1615
}
1716

18-
impl<'a, S, F, T> AnyFuture<'a, S, F, T>
19-
where
20-
F: FnMut(T) -> bool,
21-
{
22-
pin_utils::unsafe_pinned!(stream: &'a mut S);
23-
pin_utils::unsafe_unpinned!(result: bool);
24-
pin_utils::unsafe_unpinned!(f: F);
25-
}
17+
impl<S: Unpin, F, T> Unpin for AnyFuture<'_, S, F, T> {}
2618

2719
impl<S, F> Future for AnyFuture<'_, S, F, S::Item>
2820
where
29-
S: futures_core::stream::Stream + Unpin + Sized,
21+
S: Stream + Unpin + Sized,
3022
F: FnMut(S::Item) -> bool,
3123
{
3224
type Output = bool;
3325

3426
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
35-
use futures_core::stream::Stream;
36-
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
27+
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));
28+
3729
match next {
3830
Some(v) => {
39-
let result = (self.as_mut().f())(v);
40-
*self.as_mut().result() = result;
31+
let result = (&mut self.f)(v);
32+
self.result = result;
33+
4134
if result {
4235
Poll::Ready(true)
4336
} else {

src/stream/stream/find.rs

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use crate::task::{Context, Poll};
2+
use std::marker::PhantomData;
3+
use std::pin::Pin;
4+
5+
#[doc(hidden)]
6+
#[allow(missing_debug_implementations)]
7+
pub struct FindFuture<'a, S, P, T> {
8+
stream: &'a mut S,
9+
p: P,
10+
__t: PhantomData<T>,
11+
}
12+
13+
impl<'a, S, P, T> FindFuture<'a, S, P, T> {
14+
pin_utils::unsafe_pinned!(stream: &'a mut S);
15+
pin_utils::unsafe_unpinned!(p: P);
16+
17+
pub(super) fn new(stream: &'a mut S, p: P) -> Self {
18+
FindFuture {
19+
stream,
20+
p,
21+
__t: PhantomData,
22+
}
23+
}
24+
}
25+
26+
impl<'a, S, P> futures_core::future::Future for FindFuture<'a, S, P, S::Item>
27+
where
28+
S: futures_core::stream::Stream + Unpin + Sized,
29+
P: FnMut(&S::Item) -> bool,
30+
{
31+
type Output = Option<S::Item>;
32+
33+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
34+
use futures_core::stream::Stream;
35+
36+
let item = futures_core::ready!(self.as_mut().stream().poll_next(cx));
37+
38+
match item {
39+
Some(v) => match (self.as_mut().p())(&v) {
40+
true => Poll::Ready(Some(v)),
41+
false => {
42+
cx.waker().wake_by_ref();
43+
Poll::Pending
44+
}
45+
},
46+
None => Poll::Ready(None),
47+
}
48+
}
49+
}

src/stream/stream/min_by.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ use std::cmp::Ordering;
22
use std::pin::Pin;
33

44
use crate::future::Future;
5+
use crate::stream::Stream;
56
use crate::task::{Context, Poll};
67

8+
#[doc(hidden)]
79
#[allow(missing_debug_implementations)]
810
pub struct MinByFuture<S, F, T> {
911
stream: S,
@@ -27,7 +29,7 @@ impl<S, F, T> MinByFuture<S, F, T> {
2729

2830
impl<S, F> Future for MinByFuture<S, F, S::Item>
2931
where
30-
S: futures_core::stream::Stream + Unpin + Sized,
32+
S: Stream + Unpin + Sized,
3133
S::Item: Copy,
3234
F: FnMut(&S::Item, &S::Item) -> Ordering,
3335
{

0 commit comments

Comments
 (0)