Skip to content

Split io into multiple files #151

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 14 additions & 185 deletions src/io/read.rs → src/io/read/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
use std::io::IoSliceMut;
mod read;
mod read_exact;
mod read_to_end;
mod read_to_string;
mod read_vectored;

use read::ReadFuture;
use read_exact::ReadExactFuture;
use read_to_end::{read_to_end_internal, ReadToEndFuture};
use read_to_string::ReadToStringFuture;
use read_vectored::ReadVectoredFuture;

use std::io;
use std::mem;
use std::pin::Pin;
use std::str;

use cfg_if::cfg_if;
use futures_io::AsyncRead;

use crate::future::Future;
use crate::io;
use crate::task::{Context, Poll};

cfg_if! {
if #[cfg(feature = "docs")] {
#[doc(hidden)]
Expand Down Expand Up @@ -80,7 +86,7 @@ pub trait Read {
/// [`read`]: #tymethod.read
fn read_vectored<'a>(
&'a mut self,
bufs: &'a mut [IoSliceMut<'a>],
bufs: &'a mut [io::IoSliceMut<'a>],
) -> ret!('a, ReadVectoredFuture, io::Result<usize>)
where
Self: Unpin,
Expand Down Expand Up @@ -215,180 +221,3 @@ impl<T: AsyncRead + Unpin + ?Sized> Read for T {
ReadFuture { reader: self, buf }
}
}

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadFuture<'a, T: Unpin + ?Sized> {
reader: &'a mut T,
buf: &'a mut [u8],
}

impl<T: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, T> {
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, buf } = &mut *self;
Pin::new(reader).poll_read(cx, buf)
}
}

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadVectoredFuture<'a, T: Unpin + ?Sized> {
reader: &'a mut T,
bufs: &'a mut [IoSliceMut<'a>],
}

impl<T: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, T> {
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, bufs } = &mut *self;
Pin::new(reader).poll_read_vectored(cx, bufs)
}
}

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadToEndFuture<'a, T: Unpin + ?Sized> {
reader: &'a mut T,
buf: &'a mut Vec<u8>,
start_len: usize,
}

impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, T> {
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
buf,
start_len,
} = &mut *self;
read_to_end_internal(Pin::new(reader), cx, buf, *start_len)
}
}

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadToStringFuture<'a, T: Unpin + ?Sized> {
reader: &'a mut T,
buf: &'a mut String,
bytes: Vec<u8>,
start_len: usize,
}

impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, T> {
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
buf,
bytes,
start_len,
} = &mut *self;
let reader = Pin::new(reader);

let ret = futures_core::ready!(read_to_end_internal(reader, cx, bytes, *start_len));
if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
}))
} else {
debug_assert!(buf.is_empty());
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
Poll::Ready(ret)
}
}
}

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadExactFuture<'a, T: Unpin + ?Sized> {
reader: &'a mut T,
buf: &'a mut [u8],
}

impl<T: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, T> {
type Output = io::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, buf } = &mut *self;

while !buf.is_empty() {
let n = futures_core::ready!(Pin::new(&mut *reader).poll_read(cx, buf))?;
let (_, rest) = mem::replace(buf, &mut []).split_at_mut(n);
*buf = rest;

if n == 0 {
return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
}
}

Poll::Ready(Ok(()))
}
}

// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
//
// Because we're extending the buffer with uninitialized data for trusted
// readers, we need to make sure to truncate that if any of this panics.
pub fn read_to_end_internal<R: AsyncRead + ?Sized>(
mut rd: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut Vec<u8>,
start_len: usize,
) -> Poll<io::Result<usize>> {
struct Guard<'a> {
buf: &'a mut Vec<u8>,
len: usize,
}

impl Drop for Guard<'_> {
fn drop(&mut self) {
unsafe {
self.buf.set_len(self.len);
}
}
}

let mut g = Guard {
len: buf.len(),
buf,
};
let ret;
loop {
if g.len == g.buf.len() {
unsafe {
g.buf.reserve(32);
let capacity = g.buf.capacity();
g.buf.set_len(capacity);
rd.initializer().initialize(&mut g.buf[g.len..]);
}
}

match futures_core::ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
Ok(0) => {
ret = Poll::Ready(Ok(g.len - start_len));
break;
}
Ok(n) => g.len += n,
Err(e) => {
ret = Poll::Ready(Err(e));
break;
}
}
}

ret
}
23 changes: 23 additions & 0 deletions src/io/read/read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use crate::future::Future;
use crate::task::{Context, Poll};

use std::io;
use std::pin::Pin;

use futures_io::AsyncRead;

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadFuture<'a, T: Unpin + ?Sized> {
pub(crate) reader: &'a mut T,
pub(crate) buf: &'a mut [u8],
}

impl<T: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, T> {
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, buf } = &mut *self;
Pin::new(reader).poll_read(cx, buf)
}
}
35 changes: 35 additions & 0 deletions src/io/read/read_exact.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::future::Future;
use crate::task::{Context, Poll};

use std::io;
use std::mem;
use std::pin::Pin;

use futures_io::AsyncRead;

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadExactFuture<'a, T: Unpin + ?Sized> {
pub(crate) reader: &'a mut T,
pub(crate) buf: &'a mut [u8],
}

impl<T: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, T> {
type Output = io::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, buf } = &mut *self;

while !buf.is_empty() {
let n = futures_core::ready!(Pin::new(&mut *reader).poll_read(cx, buf))?;
let (_, rest) = mem::replace(buf, &mut []).split_at_mut(n);
*buf = rest;

if n == 0 {
return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
}
}

Poll::Ready(Ok(()))
}
}
87 changes: 87 additions & 0 deletions src/io/read/read_to_end.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use crate::future::Future;
use crate::task::{Context, Poll};

use std::io;
use std::pin::Pin;

use futures_io::AsyncRead;

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadToEndFuture<'a, T: Unpin + ?Sized> {
pub(crate) reader: &'a mut T,
pub(crate) buf: &'a mut Vec<u8>,
pub(crate) start_len: usize,
}

impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, T> {
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
buf,
start_len,
} = &mut *self;
read_to_end_internal(Pin::new(reader), cx, buf, *start_len)
}
}

// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
//
// Because we're extending the buffer with uninitialized data for trusted
// readers, we need to make sure to truncate that if any of this panics.
pub fn read_to_end_internal<R: AsyncRead + ?Sized>(
mut rd: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut Vec<u8>,
start_len: usize,
) -> Poll<io::Result<usize>> {
struct Guard<'a> {
buf: &'a mut Vec<u8>,
len: usize,
}

impl Drop for Guard<'_> {
fn drop(&mut self) {
unsafe {
self.buf.set_len(self.len);
}
}
}

let mut g = Guard {
len: buf.len(),
buf,
};
let ret;
loop {
if g.len == g.buf.len() {
unsafe {
g.buf.reserve(32);
let capacity = g.buf.capacity();
g.buf.set_len(capacity);
rd.initializer().initialize(&mut g.buf[g.len..]);
}
}

match futures_core::ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
Ok(0) => {
ret = Poll::Ready(Ok(g.len - start_len));
break;
}
Ok(n) => g.len += n,
Err(e) => {
ret = Poll::Ready(Err(e));
break;
}
}
}

ret
}
Loading