Skip to content

Commit 033a90b

Browse files
taiki-ecramertj
authored andcommitted
Add async BufWriter
1 parent 85162dc commit 033a90b

File tree

4 files changed

+423
-6
lines changed

4 files changed

+423
-6
lines changed

futures-util/src/io/buf_writer.rs

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
use futures_core::task::{Context, Poll};
2+
use futures_io::{AsyncSeek, AsyncWrite, IoSlice, SeekFrom};
3+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
4+
use std::fmt;
5+
use std::io::{self, Write};
6+
use std::pin::Pin;
7+
use super::DEFAULT_BUF_SIZE;
8+
9+
/// Wraps a writer and buffers its output.
10+
///
11+
/// It can be excessively inefficient to work directly with something that
12+
/// implements [`AsyncWrite`]. A `BufWriter` keeps an in-memory buffer of data and
13+
/// writes it to an underlying writer in large, infrequent batches.
14+
///
15+
/// `BufWriter` can improve the speed of programs that make *small* and
16+
/// *repeated* write calls to the same file or network socket. It does not
17+
/// help when writing very large amounts at once, or writing just one or a few
18+
/// times. It also provides no advantage when writing to a destination that is
19+
/// in memory, like a `Vec<u8>`.
20+
///
21+
/// When the `BufWriter` is dropped, the contents of its buffer will be
22+
/// discarded. Creating multiple instances of a `BufWriter` on the same
23+
/// stream can cause data loss. If you need to write out the contents of its
24+
/// buffer, you must manually call flush before the writer is dropped.
25+
///
26+
/// [`AsyncWrite`]: futures_io::AsyncWrite
27+
/// [`flush`]: super::AsyncWriteExt::flush
28+
///
29+
// TODO: Examples
30+
pub struct BufWriter<W> {
31+
inner: W,
32+
buf: Vec<u8>,
33+
written: usize,
34+
}
35+
36+
impl<W: AsyncWrite> BufWriter<W> {
37+
unsafe_pinned!(inner: W);
38+
unsafe_unpinned!(buf: Vec<u8>);
39+
40+
/// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB,
41+
/// but may change in the future.
42+
pub fn new(inner: W) -> Self {
43+
Self::with_capacity(DEFAULT_BUF_SIZE, inner)
44+
}
45+
46+
/// Creates a new `BufWriter` with the specified buffer capacity.
47+
pub fn with_capacity(cap: usize, inner: W) -> Self {
48+
Self {
49+
inner,
50+
buf: Vec::with_capacity(cap),
51+
written: 0,
52+
}
53+
}
54+
55+
fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
56+
let Self { inner, buf, written } = unsafe { Pin::get_unchecked_mut(self) };
57+
let mut inner = unsafe { Pin::new_unchecked(inner) };
58+
59+
let len = buf.len();
60+
let mut ret = Ok(());
61+
while *written < len {
62+
match ready!(inner.as_mut().poll_write(cx, &buf[*written..])) {
63+
Ok(0) => {
64+
ret = Err(io::Error::new(
65+
io::ErrorKind::WriteZero,
66+
"failed to write the buffered data",
67+
));
68+
break;
69+
}
70+
Ok(n) => *written += n,
71+
Err(e) => {
72+
ret = Err(e);
73+
break;
74+
}
75+
}
76+
}
77+
if *written > 0 {
78+
buf.drain(..*written);
79+
}
80+
*written = 0;
81+
Poll::Ready(ret)
82+
}
83+
84+
/// Gets a reference to the underlying writer.
85+
pub fn get_ref(&self) -> &W {
86+
&self.inner
87+
}
88+
89+
/// Gets a mutable reference to the underlying writer.
90+
///
91+
/// It is inadvisable to directly write to the underlying writer.
92+
pub fn get_mut(&mut self) -> &mut W {
93+
&mut self.inner
94+
}
95+
96+
/// Gets a pinned mutable reference to the underlying writer.
97+
///
98+
/// It is inadvisable to directly write to the underlying writer.
99+
pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut W> {
100+
self.inner()
101+
}
102+
103+
/// Consumes this `BufWriter`, returning the underlying writer.
104+
///
105+
/// Note that any leftover data in the internal buffer is lost.
106+
pub fn into_inner(self) -> W {
107+
self.inner
108+
}
109+
110+
/// Returns a reference to the internally buffered data.
111+
pub fn buffer(&self) -> &[u8] {
112+
&self.buf
113+
}
114+
}
115+
116+
impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
117+
fn poll_write(
118+
mut self: Pin<&mut Self>,
119+
cx: &mut Context<'_>,
120+
buf: &[u8],
121+
) -> Poll<io::Result<usize>> {
122+
if self.buf.len() + buf.len() > self.buf.capacity() {
123+
ready!(self.as_mut().flush_buf(cx))?;
124+
}
125+
if buf.len() >= self.buf.capacity() {
126+
self.inner().poll_write(cx, buf)
127+
} else {
128+
Poll::Ready(self.buf().write(buf))
129+
}
130+
}
131+
132+
fn poll_write_vectored(
133+
mut self: Pin<&mut Self>,
134+
cx: &mut Context<'_>,
135+
bufs: &[IoSlice<'_>],
136+
) -> Poll<io::Result<usize>> {
137+
let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
138+
if self.buf.len() + total_len > self.buf.capacity() {
139+
ready!(self.as_mut().flush_buf(cx))?;
140+
}
141+
if total_len >= self.buf.capacity() {
142+
self.inner().poll_write_vectored(cx, bufs)
143+
} else {
144+
Poll::Ready(self.buf().write_vectored(bufs))
145+
}
146+
}
147+
148+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
149+
ready!(self.as_mut().flush_buf(cx))?;
150+
self.inner().poll_flush(cx)
151+
}
152+
153+
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
154+
ready!(self.as_mut().flush_buf(cx))?;
155+
self.inner().poll_close(cx)
156+
}
157+
}
158+
159+
impl<W: AsyncWrite + fmt::Debug> fmt::Debug for BufWriter<W> {
160+
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
161+
fmt.debug_struct("BufWriter")
162+
.field("writer", &self.inner)
163+
.field("buffer", &format_args!("{}/{}", self.buf.len(), self.buf.capacity()))
164+
.field("written", &self.written)
165+
.finish()
166+
}
167+
}
168+
169+
impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
170+
/// Seek to the offset, in bytes, in the underlying writer.
171+
///
172+
/// Seeking always writes out the internal buffer before seeking.
173+
fn poll_seek(
174+
mut self: Pin<&mut Self>,
175+
cx: &mut Context<'_>,
176+
pos: SeekFrom,
177+
) -> Poll<io::Result<u64>> {
178+
ready!(self.as_mut().flush_buf(cx))?;
179+
self.inner().poll_seek(cx, pos)
180+
}
181+
}

futures-util/src/io/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ pub use self::allow_std::AllowStdIo;
2525
mod buf_reader;
2626
pub use self::buf_reader::BufReader;
2727

28-
// mod buf_writer;
29-
// pub use self::buf_writer::BufWriter;
28+
mod buf_writer;
29+
pub use self::buf_writer::BufWriter;
3030

3131
mod copy_into;
3232
pub use self::copy_into::CopyInto;

futures/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,10 @@ pub mod io {
295295
};
296296

297297
pub use futures_util::io::{
298-
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo, BufReader,
299-
Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadLine, ReadToEnd,
300-
ReadUntil, Seek, Window, WriteAll, WriteHalf,
301-
Lines, ReadVectored, Write, WriteVectored,
298+
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo,
299+
BufReader, BufWriter, Close, CopyInto, Flush, Lines, Read, ReadExact,
300+
ReadHalf, ReadLine, ReadToEnd, ReadUntil, ReadVectored, Seek, Window,
301+
Write, WriteAll, WriteHalf, WriteVectored,
302302
};
303303
}
304304

0 commit comments

Comments
 (0)