Skip to content

Commit 90872dd

Browse files
authored
Merge pull request #92 from Drevoed/buf-writer
draft Buf writer
2 parents f922e9c + 63154f5 commit 90872dd

File tree

2 files changed

+382
-0
lines changed

2 files changed

+382
-0
lines changed

src/io/buf_writer.rs

+380
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,380 @@
1+
use crate::task::{Context, Poll};
2+
use futures_core::ready;
3+
use futures_io::{AsyncSeek, AsyncWrite, SeekFrom};
4+
use std::fmt;
5+
use std::io;
6+
use std::pin::Pin;
7+
8+
const DEFAULT_CAPACITY: usize = 8 * 1024;
9+
10+
/// Wraps a writer and buffers its output.
11+
///
12+
/// It can be excessively inefficient to work directly with something that
13+
/// implements [`Write`]. For example, every call to
14+
/// [`write`][`TcpStream::write`] on [`TcpStream`] results in a system call. A
15+
/// `BufWriter` keeps an in-memory buffer of data and writes it to an underlying
16+
/// writer in large, infrequent batches.
17+
///
18+
/// `BufWriter` can improve the speed of programs that make *small* and
19+
/// *repeated* write calls to the same file or network socket. It does not
20+
/// help when writing very large amounts at once, or writing just one or a few
21+
/// times. It also provides no advantage when writing to a destination that is
22+
/// in memory, like a `Vec<u8>`.
23+
///
24+
/// When the `BufWriter` is dropped, the contents of its buffer will be written
25+
/// out. However, any errors that happen in the process of flushing the buffer
26+
/// when the writer is dropped will be ignored. Code that wishes to handle such
27+
/// errors must manually call [`flush`] before the writer is dropped.
28+
///
29+
/// This type is an async version of [`std::io::BufReader`].
30+
///
31+
/// [`std::io::BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html
32+
///
33+
/// # Examples
34+
///
35+
/// Let's write the numbers one through ten to a [`TcpStream`]:
36+
///
37+
/// ```no_run
38+
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
39+
/// use async_std::net::TcpStream;
40+
/// use async_std::prelude::*;
41+
///
42+
/// let mut stream = TcpStream::connect("127.0.0.1:34254").await?;
43+
///
44+
/// for i in 0..10 {
45+
/// let arr = [i+1];
46+
/// stream.write(&arr).await?;
47+
/// }
48+
/// #
49+
/// # Ok(()) }) }
50+
/// ```
51+
///
52+
/// Because we're not buffering, we write each one in turn, incurring the
53+
/// overhead of a system call per byte written. We can fix this with a
54+
/// `BufWriter`:
55+
///
56+
/// ```no_run
57+
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
58+
/// use async_std::io::BufWriter;
59+
/// use async_std::net::TcpStream;
60+
/// use async_std::prelude::*;
61+
///
62+
/// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?);
63+
/// for i in 0..10 {
64+
/// let arr = [i+1];
65+
/// stream.write(&arr).await?;
66+
/// };
67+
/// #
68+
/// # Ok(()) }) }
69+
/// ```
70+
///
71+
/// By wrapping the stream with a `BufWriter`, these ten writes are all grouped
72+
/// together by the buffer, and will all be written out in one system call when
73+
/// the `stream` is dropped.
74+
///
75+
/// [`Write`]: trait.Write.html
76+
/// [`TcpStream::write`]: ../net/struct.TcpStream.html#method.write
77+
/// [`TcpStream`]: ../net/struct.TcpStream.html
78+
/// [`flush`]: trait.Write.html#tymethod.flush
79+
pub struct BufWriter<W> {
80+
inner: W,
81+
buf: Vec<u8>,
82+
written: usize,
83+
}
84+
85+
impl<W: AsyncWrite> BufWriter<W> {
86+
pin_utils::unsafe_pinned!(inner: W);
87+
pin_utils::unsafe_unpinned!(buf: Vec<u8>);
88+
89+
/// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB,
90+
/// but may change in the future.
91+
///
92+
/// # Examples
93+
///
94+
/// ```no_run
95+
/// # #![allow(unused_mut)]
96+
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
97+
/// use async_std::io::BufWriter;
98+
/// use async_std::net::TcpStream;
99+
///
100+
/// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?);
101+
/// #
102+
/// # Ok(()) }) }
103+
/// ```
104+
pub fn new(inner: W) -> BufWriter<W> {
105+
BufWriter::with_capacity(DEFAULT_CAPACITY, inner)
106+
}
107+
108+
/// Creates a new `BufWriter` with the specified buffer capacity.
109+
///
110+
/// # Examples
111+
///
112+
/// Creating a buffer with a buffer of a hundred bytes.
113+
///
114+
/// ```no_run
115+
/// # #![allow(unused_mut)]
116+
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
117+
/// use async_std::io::BufWriter;
118+
/// use async_std::net::TcpStream;
119+
///
120+
/// let stream = TcpStream::connect("127.0.0.1:34254").await?;
121+
/// let mut buffer = BufWriter::with_capacity(100, stream);
122+
/// #
123+
/// # Ok(()) }) }
124+
/// ```
125+
pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
126+
BufWriter {
127+
inner,
128+
buf: Vec::with_capacity(capacity),
129+
written: 0,
130+
}
131+
}
132+
133+
/// Gets a reference to the underlying writer.
134+
///
135+
/// # Examples
136+
///
137+
/// ```no_run
138+
/// # #![allow(unused_mut)]
139+
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
140+
/// use async_std::io::BufWriter;
141+
/// use async_std::net::TcpStream;
142+
///
143+
/// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?);
144+
///
145+
/// // We can use reference just like buffer
146+
/// let reference = buffer.get_ref();
147+
/// #
148+
/// # Ok(()) }) }
149+
/// ```
150+
pub fn get_ref(&self) -> &W {
151+
&self.inner
152+
}
153+
154+
/// Gets a mutable reference to the underlying writer.
155+
///
156+
/// It is inadvisable to directly write to the underlying writer.
157+
///
158+
/// # Examples
159+
///
160+
/// ```no_run
161+
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
162+
/// use async_std::io::BufWriter;
163+
/// use async_std::net::TcpStream;
164+
///
165+
/// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?);
166+
///
167+
/// // We can use reference just like buffer
168+
/// let reference = buffer.get_mut();
169+
/// #
170+
/// # Ok(()) }) }
171+
/// ```
172+
pub fn get_mut(&mut self) -> &mut W {
173+
&mut self.inner
174+
}
175+
176+
// pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
177+
// self.inner()
178+
// }
179+
180+
/// Consumes BufWriter, returning the underlying writer
181+
///
182+
/// This method will not write leftover data, it will be lost.
183+
/// For method that will attempt to write before returning the writer see [`poll_into_inner`]
184+
///
185+
/// [`poll_into_inner`]: #method.poll_into_inner
186+
pub fn into_inner(self) -> W {
187+
self.inner
188+
}
189+
190+
// pub fn poll_into_inner(self: Pin<&mut Self>, _cx: Context<'_>) -> Poll<io::Result<usize>> {
191+
// unimplemented!("poll into inner method")
192+
// }
193+
194+
/// Returns a reference to the internally buffered data.
195+
///
196+
/// # Examples
197+
///
198+
/// ```no_run
199+
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
200+
/// use async_std::io::BufWriter;
201+
/// use async_std::net::TcpStream;
202+
///
203+
/// let buf_writer = BufWriter::new(TcpStream::connect("127.0.0.1:34251").await?);
204+
///
205+
/// // See how many bytes are currently buffered
206+
/// let bytes_buffered = buf_writer.buffer().len();
207+
/// #
208+
/// # Ok(()) }) }
209+
/// ```
210+
pub fn buffer(&self) -> &[u8] {
211+
&self.buf
212+
}
213+
214+
/// Poll buffer flushing until completion
215+
///
216+
/// This is used in types that wrap around BufWrite, one such example: [`LineWriter`]
217+
///
218+
/// [`LineWriter`]: struct.LineWriter.html
219+
fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
220+
let Self {
221+
inner,
222+
buf,
223+
written,
224+
} = unsafe { Pin::get_unchecked_mut(self) };
225+
let mut inner = unsafe { Pin::new_unchecked(inner) };
226+
let len = buf.len();
227+
let mut ret = Ok(());
228+
while *written < len {
229+
match inner.as_mut().poll_write(cx, &buf[*written..]) {
230+
Poll::Ready(Ok(0)) => {
231+
ret = Err(io::Error::new(
232+
io::ErrorKind::WriteZero,
233+
"Failed to write buffered data",
234+
));
235+
break;
236+
}
237+
Poll::Ready(Ok(n)) => *written += n,
238+
Poll::Ready(Err(ref e)) if e.kind() == io::ErrorKind::Interrupted => {}
239+
Poll::Ready(Err(e)) => {
240+
ret = Err(e);
241+
break;
242+
}
243+
Poll::Pending => return Poll::Pending,
244+
}
245+
}
246+
if *written > 0 {
247+
buf.drain(..*written);
248+
}
249+
*written = 0;
250+
Poll::Ready(ret)
251+
}
252+
}
253+
254+
impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
255+
fn poll_write(
256+
mut self: Pin<&mut Self>,
257+
cx: &mut Context<'_>,
258+
buf: &[u8],
259+
) -> Poll<io::Result<usize>> {
260+
if self.buf.len() + buf.len() > self.buf.capacity() {
261+
ready!(self.as_mut().poll_flush_buf(cx))?;
262+
}
263+
if buf.len() >= self.buf.capacity() {
264+
self.inner().poll_write(cx, buf)
265+
} else {
266+
Pin::new(&mut *self.buf()).poll_write(cx, buf)
267+
}
268+
}
269+
270+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
271+
ready!(self.as_mut().poll_flush_buf(cx))?;
272+
self.inner().poll_flush(cx)
273+
}
274+
275+
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
276+
ready!(self.as_mut().poll_flush_buf(cx))?;
277+
self.inner().poll_close(cx)
278+
}
279+
}
280+
281+
impl<W: AsyncWrite + fmt::Debug> fmt::Debug for BufWriter<W> {
282+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
283+
f.debug_struct("BufReader")
284+
.field("writer", &self.inner)
285+
.field("buf", &self.buf)
286+
.finish()
287+
}
288+
}
289+
290+
impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
291+
/// Seek to the offset, in bytes, in the underlying writer.
292+
///
293+
/// Seeking always writes out the internal buffer before seeking.
294+
295+
fn poll_seek(
296+
mut self: Pin<&mut Self>,
297+
cx: &mut Context<'_>,
298+
pos: SeekFrom,
299+
) -> Poll<io::Result<u64>> {
300+
ready!(self.as_mut().poll_flush_buf(cx))?;
301+
self.inner().poll_seek(cx, pos)
302+
}
303+
}
304+
305+
mod tests {
306+
#![allow(unused_imports)]
307+
308+
use super::BufWriter;
309+
use crate::io::{self, SeekFrom};
310+
use crate::prelude::*;
311+
use crate::task;
312+
313+
#[test]
314+
fn test_buffered_writer() {
315+
task::block_on(async {
316+
let inner = Vec::new();
317+
let mut writer = BufWriter::with_capacity(2, inner);
318+
319+
writer.write(&[0, 1]).await.unwrap();
320+
assert_eq!(writer.buffer(), []);
321+
assert_eq!(*writer.get_ref(), [0, 1]);
322+
323+
writer.write(&[2]).await.unwrap();
324+
assert_eq!(writer.buffer(), [2]);
325+
assert_eq!(*writer.get_ref(), [0, 1]);
326+
327+
writer.write(&[3]).await.unwrap();
328+
assert_eq!(writer.buffer(), [2, 3]);
329+
assert_eq!(*writer.get_ref(), [0, 1]);
330+
331+
writer.flush().await.unwrap();
332+
assert_eq!(writer.buffer(), []);
333+
assert_eq!(*writer.get_ref(), [0, 1, 2, 3]);
334+
335+
writer.write(&[4]).await.unwrap();
336+
writer.write(&[5]).await.unwrap();
337+
assert_eq!(writer.buffer(), [4, 5]);
338+
assert_eq!(*writer.get_ref(), [0, 1, 2, 3]);
339+
340+
writer.write(&[6]).await.unwrap();
341+
assert_eq!(writer.buffer(), [6]);
342+
assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5]);
343+
344+
writer.write(&[7, 8]).await.unwrap();
345+
assert_eq!(writer.buffer(), []);
346+
assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8]);
347+
348+
writer.write(&[9, 10, 11]).await.unwrap();
349+
assert_eq!(writer.buffer(), []);
350+
assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
351+
352+
writer.flush().await.unwrap();
353+
assert_eq!(writer.buffer(), []);
354+
assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
355+
})
356+
}
357+
358+
#[test]
359+
fn test_buffered_writer_inner_into_inner_does_not_flush() {
360+
task::block_on(async {
361+
let mut w = BufWriter::with_capacity(3, Vec::new());
362+
w.write(&[0, 1]).await.unwrap();
363+
assert_eq!(*w.get_ref(), []);
364+
let w = w.into_inner();
365+
assert_eq!(w, []);
366+
})
367+
}
368+
369+
#[test]
370+
fn test_buffered_writer_seek() {
371+
task::block_on(async {
372+
let mut w = BufWriter::with_capacity(3, io::Cursor::new(Vec::new()));
373+
w.write_all(&[0, 1, 2, 3, 4, 5]).await.unwrap();
374+
w.write_all(&[6, 7]).await.unwrap();
375+
assert_eq!(w.seek(SeekFrom::Current(0)).await.ok(), Some(8));
376+
assert_eq!(&w.get_ref().get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]);
377+
assert_eq!(w.seek(SeekFrom::Start(2)).await.ok(), Some(2));
378+
})
379+
}
380+
}

src/io/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
2525

2626
pub use buf_read::{BufRead, Lines};
2727
pub use buf_reader::BufReader;
28+
pub use buf_writer::BufWriter;
2829
pub use copy::copy;
2930
pub use cursor::Cursor;
3031
pub use empty::{empty, Empty};
@@ -46,6 +47,7 @@ pub(crate) mod seek;
4647
pub(crate) mod write;
4748

4849
mod buf_reader;
50+
mod buf_writer;
4951
mod copy;
5052
mod cursor;
5153
mod empty;

0 commit comments

Comments
 (0)