Skip to content

Commit 48d4c9b

Browse files
committed
begin implementing BufWriter
1 parent 91aeb39 commit 48d4c9b

File tree

2 files changed

+264
-48
lines changed

2 files changed

+264
-48
lines changed

src/io/buf_writer.rs

+263-47
Original file line numberDiff line numberDiff line change
@@ -7,35 +7,138 @@ use crate::io::Write;
77

88
const DEFAULT_CAPACITY: usize = 8 * 1024;
99

10-
11-
pub struct BufWriter<W: AsyncWrite> {
12-
inner: Option<W>,
10+
/// Wraps a writer and buffers its output.
11+
///
12+
/// It can be excessively inefficient to work directly with something that
13+
/// implements [`Write`]. For example, every call to
14+
/// [`write`][`TcpStream::write`] on [`TcpStream`] results in a system call. A
15+
/// `BufWriter` keeps an in-memory buffer of data and writes it to an underlying
16+
/// writer in large, infrequent batches.
17+
///
18+
/// `BufWriter` can improve the speed of programs that make *small* and
19+
/// *repeated* write calls to the same file or network socket. It does not
20+
/// help when writing very large amounts at once, or writing just one or a few
21+
/// times. It also provides no advantage when writing to a destination that is
22+
/// in memory, like a `Vec<u8>`.
23+
///
24+
/// When the `BufWriter` is dropped, the contents of its buffer will be written
25+
/// out. However, any errors that happen in the process of flushing the buffer
26+
/// when the writer is dropped will be ignored. Code that wishes to handle such
27+
/// errors must manually call [`flush`] before the writer is dropped.
28+
///
29+
/// This type is an async version of [`std::io::BufReader`].
30+
///
31+
/// [`std::io::BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html
32+
///
33+
/// # Examples
34+
///
35+
/// Let's write the numbers one through ten to a [`TcpStream`]:
36+
///
37+
/*/ ```no_run
38+
/ use std::io::prelude::*;
39+
/ use std::net::TcpStream;
40+
/
41+
/ let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
42+
/
43+
/ for i in 0..10 {
44+
/ stream.write(&[i+1]).unwrap();
45+
/ }
46+
/ ```*/
47+
///
48+
/// Because we're not buffering, we write each one in turn, incurring the
49+
/// overhead of a system call per byte written. We can fix this with a
50+
/// `BufWriter`:
51+
///
52+
/*/ ```no_run
53+
/ use std::io::prelude::*;
54+
/ use std::io::BufWriter;
55+
/ use std::net::TcpStream;
56+
/
57+
/ let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
58+
/
59+
/ for i in 0..10 {
60+
/ stream.write(&[i+1]).unwrap();
61+
/ }
62+
/ ```*/
63+
///
64+
/// By wrapping the stream with a `BufWriter`, these ten writes are all grouped
65+
/// together by the buffer, and will all be written out in one system call when
66+
/// the `stream` is dropped.
67+
///
68+
/// [`Write`]: ../../std/io/trait.Write.html
69+
/// [`TcpStream::write`]: ../../std/net/struct.TcpStream.html#method.write
70+
/// [`TcpStream`]: ../../std/net/struct.TcpStream.html
71+
/// [`flush`]: #method.flush
72+
pub struct BufWriter<W> {
73+
inner: W,
1374
buf: Vec<u8>,
14-
panicked: bool,
75+
written: usize,
1576
}
1677

1778
impl<W: AsyncWrite + Unpin> BufWriter<W> {
18-
pin_utils::unsafe_pinned!(inner: Option<W>);
19-
pin_utils::unsafe_unpinned!(panicked: bool);
79+
pin_utils::unsafe_pinned!(inner: W);
80+
pin_utils::unsafe_unpinned!(buf: Vec<u8>);
2081

82+
/// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB,
83+
/// but may change in the future.
84+
///
85+
/// # Examples
86+
///
87+
/// ```no_run
88+
/// use async_std::io::BufWriter;
89+
/// use async_std::net::TcpStream;
90+
///
91+
/// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
92+
/// ```
2193
pub fn new(inner: W) -> BufWriter<W> {
2294
BufWriter::with_capacity(DEFAULT_CAPACITY, inner)
2395
}
2496

97+
/// Creates a new `BufWriter` with the specified buffer capacity.
98+
///
99+
/// # Examples
100+
///
101+
/// Creating a buffer with a buffer of a hundred bytes.
102+
///
103+
/// ```no_run
104+
/// use async_std::io::BufWriter;
105+
/// use async_std::net::TcpStream;
106+
///
107+
/// let stream = TcpStream::connect("127.0.0.1:34254").unwrap();
108+
/// let mut buffer = BufWriter::with_capacity(100, stream);
109+
/// ```
25110
pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
26111
BufWriter {
27-
inner: Some(inner),
112+
inner,
28113
buf: Vec::with_capacity(capacity),
29-
panicked: false,
114+
written: 0,
30115
}
31116
}
32117

33118
pub fn get_ref(&self) -> &W {
34-
self.inner.as_ref().unwrap()
119+
&self.inner
35120
}
36121

37122
pub fn get_mut(&mut self) -> &mut W {
38-
self.inner.as_mut().unwrap()
123+
&mut self.inner
124+
}
125+
126+
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
127+
self.inner()
128+
}
129+
130+
/// Consumes BufWriter, returning the underlying writer
131+
///
132+
/// This method will not write leftover data, it will be lost.
133+
/// For method that will attempt to write before returning the writer see [`poll_into_inner`]
134+
///
135+
/// [`poll_into_inner`]: #method.poll_into_inner
136+
pub fn into_inner(self) -> W {
137+
self.inner
138+
}
139+
140+
pub fn poll_into_inner(mut self: Pin<&mut Self>, cx: Context<'_>) -> Poll<io::Result<usize>> {
141+
unimplemented!("poll into inner method")
39142
}
40143

41144
pub fn buffer(&self) -> &[u8] {
@@ -46,25 +149,21 @@ impl<W: AsyncWrite + Unpin> BufWriter<W> {
46149
let Self {
47150
inner,
48151
buf,
49-
panicked
152+
written
50153
} = Pin::get_mut(self);
51-
let mut panicked = Pin::new(panicked);
52-
let mut written = 0;
154+
let mut inner = Pin::new(inner);
53155
let len = buf.len();
54156
let mut ret = Ok(());
55-
while written < len {
56-
*panicked = true;
57-
let r = Pin::new(inner.as_mut().unwrap());
58-
*panicked = false;
59-
match r.poll_write(cx, &buf[written..]) {
157+
while *written < len {
158+
match inner.as_mut().poll_write(cx, &buf[*written..]) {
60159
Poll::Ready(Ok(0)) => {
61160
ret = Err(io::Error::new(
62161
io::ErrorKind::WriteZero,
63162
"Failed to write buffered data",
64163
));
65164
break;
66165
}
67-
Poll::Ready(Ok(n)) => written += n,
166+
Poll::Ready(Ok(n)) => *written += n,
68167
Poll::Ready(Err(ref e)) if e.kind() == io::ErrorKind::Interrupted => {}
69168
Poll::Ready(Err(e)) => {
70169
ret = Err(e);
@@ -73,22 +172,12 @@ impl<W: AsyncWrite + Unpin> BufWriter<W> {
73172
Poll::Pending => return Poll::Pending,
74173
}
75174
}
76-
if written > 0 {
77-
buf.drain(..written);
175+
if *written > 0 {
176+
buf.drain(..*written);
78177
}
178+
*written = 0;
79179
Poll::Ready(ret)
80180
}
81-
82-
pub fn poll_into_inner(
83-
mut self: Pin<&mut Self>,
84-
cx: &mut Context<'_>,
85-
//TODO: Fix 'expected function, found struct `IntoInnerError`' compiler error
86-
) -> Poll<io::Result<W>> {
87-
match ready!(self.as_mut().poll_flush_buf(cx)) {
88-
Ok(()) => Poll::Ready(Ok(self.inner().take().unwrap())),
89-
Err(e) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "")))
90-
}
91-
}
92181
}
93182

94183
impl<W: AsyncWrite + Unpin> AsyncWrite for BufWriter<W> {
@@ -97,33 +186,28 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for BufWriter<W> {
97186
cx: &mut Context,
98187
buf: &[u8],
99188
) -> Poll<io::Result<usize>> {
100-
let panicked = self.as_mut().panicked();
101-
if self.as_ref().buf.len() + buf.len() > self.as_ref().buf.capacity() {
102-
match ready!(self.as_mut().poll_flush_buf(cx)) {
103-
Ok(()) => {},
104-
Err(e) => return Poll::Ready(Err(e))
105-
}
189+
if self.buf.len() + buf.len() > self.buf.capacity() {
190+
ready!(self.as_mut().poll_flush_buf(cx))?;
106191
}
107-
if buf.len() >= self.as_ref().buf.capacity() {
108-
*panicked = true;
109-
let r = ready!(self.as_mut().poll_write(cx, buf));
110-
*panicked = false;
111-
return Poll::Ready(r)
192+
if buf.len() >= self.buf.capacity() {
193+
self.inner().poll_write(cx, buf)
112194
} else {
113-
return Poll::Ready(ready!(self.as_ref().buf.write(buf).poll()))
195+
self.buf().write(buf).poll()
114196
}
115197
}
116198

117199
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
118-
unimplemented!()
200+
ready!(self.as_mut().poll_flush_buf(cx))?;
201+
self.inner().poll_flush(cx)
119202
}
120203

121204
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
122-
unimplemented!()
205+
ready!(self.as_mut().poll_flush_buf(cx))?;
206+
self.inner().poll_close(cx)
123207
}
124208
}
125209

126-
impl<W: AsyncWrite + fmt::Debug> fmt::Debug for BufWriter<W> {
210+
impl<W: AsyncWrite + fmt::Debug + Unpin> fmt::Debug for BufWriter<W> {
127211
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128212
f.debug_struct("BufReader")
129213
.field("writer", &self.inner)
@@ -135,6 +219,138 @@ impl<W: AsyncWrite + fmt::Debug> fmt::Debug for BufWriter<W> {
135219
}
136220
}
137221

222+
pub struct LineWriter<W: AsyncWrite + Unpin> {
223+
inner: BufWriter<W>,
224+
need_flush: bool,
225+
}
226+
227+
impl<W: AsyncWrite + Unpin> LineWriter<W> {
228+
pin_utils::unsafe_pinned!(inner: BufWriter<W>);
229+
pin_utils::unsafe_unpinned!(need_flush: bool);
230+
/// Creates a new `LineWriter`.
231+
///
232+
/// # Examples
233+
///
234+
/// ```no_run
235+
/// use async_std::fs::File;
236+
/// use async_std::io::LineWriter;
237+
///
238+
/// fn main() -> std::io::Result<()> {
239+
/// async_std::task::block_on(async {
240+
/// let file = File::create("poem.txt").await?;
241+
/// let file = LineWriter::new(file);
242+
/// Ok(())
243+
/// })
244+
/// }
245+
/// ```
246+
pub fn new(inner: W) -> LineWriter<W> {
247+
// Lines typically aren't that long, don't use a giant buffer
248+
LineWriter::with_capacity(1024, inner)
249+
}
250+
251+
/// Creates a new `LineWriter` with a specified capacity for the internal
252+
/// buffer.
253+
///
254+
/// # Examples
255+
///
256+
/// ```no_run
257+
/// use async_std::fs::File;
258+
/// use async_std::io::LineWriter;
259+
///
260+
/// fn main() -> std::io::Result<()> {
261+
/// async_std::task::block_on(async {
262+
/// let file = File::create("poem.txt").await?;
263+
/// let file = LineWriter::with_capacity(100, file);
264+
/// Ok(())
265+
/// })
266+
/// }
267+
/// ```
268+
pub fn with_capacity(capacity: usize, inner: W) -> LineWriter<W> {
269+
LineWriter {
270+
inner: BufWriter::with_capacity(capacity, inner),
271+
need_flush: false,
272+
}
273+
}
274+
275+
pub fn get_ref(&self) -> &W {
276+
self.inner.get_ref()
277+
}
278+
279+
pub fn get_mut(&mut self) -> &mut W {
280+
self.inner.get_mut()
281+
}
282+
283+
pub fn into_inner(self) -> W {
284+
self.inner.into_inner()
285+
}
286+
}
287+
288+
impl<W: AsyncWrite + Unpin> AsyncWrite for LineWriter<W> {
289+
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
290+
if self.need_flush {
291+
self.as_mut().poll_flush(cx)?;
292+
}
293+
294+
let i = match memchr::memrchr(b'\n', buf) {
295+
Some(i) => i,
296+
None => return self.as_mut().inner().as_mut().poll_write(cx, buf)
297+
};
298+
299+
let n = ready!(self.as_mut().inner().as_mut().poll_write(cx, &buf[..=i])?);
300+
*self.as_mut().need_flush() = true;
301+
if ready!(self.as_mut().poll_flush(cx)).is_err() || n != 1 + 1 {
302+
return Poll::Ready(Ok(n))
303+
}
304+
match ready!(self.inner().poll_write(cx, &buf[i + 1..])) {
305+
Ok(i) => Poll::Ready(Ok(n + 1)),
306+
Err(_) => Poll::Ready(Ok(n))
307+
}
308+
}
309+
310+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
311+
self.as_mut().inner().poll_flush(cx)?;
312+
*self.need_flush() = false;
313+
Poll::Ready(Ok(()))
314+
}
315+
316+
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
317+
self.as_mut().inner().poll_flush(cx)?;
318+
self.inner().poll_close(cx)
319+
}
320+
}
321+
322+
impl<W: AsyncWrite + Unpin> fmt::Debug for LineWriter<W> where W: fmt::Debug {
323+
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
324+
fmt.debug_struct("LineWriter")
325+
.field("writer", &self.inner.inner)
326+
.field("buffer",
327+
&format_args!("{}/{}", self.inner.buf.len(), self.inner.buf.capacity()))
328+
.finish()
329+
}
330+
}
331+
332+
138333
mod tests {
334+
use crate::prelude::*;
335+
use crate::task;
336+
use super::LineWriter;
139337

338+
#[test]
339+
fn test_line_buffer() {
340+
task::block_on(async {
341+
let mut writer = LineWriter::new(Vec::new());
342+
writer.write(&[0]).await.unwrap();
343+
assert_eq!(*writer.get_ref(), []);
344+
writer.write(&[1]).await.unwrap();
345+
assert_eq!(*writer.get_ref(), []);
346+
writer.flush().await.unwrap();
347+
assert_eq!(*writer.get_ref(), [0, 1]);
348+
writer.write(&[0, b'\n', 1, b'\n', 2]).await.unwrap();
349+
assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n']);
350+
writer.flush().await.unwrap();
351+
assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2]);
352+
writer.write(&[3, b'\n']).await.unwrap();
353+
assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2, 3, b'\n']);
354+
})
355+
}
140356
}

src/io/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub use std::io::{Error, ErrorKind, Result, SeekFrom};
2828

2929
pub use buf_read::{BufRead, Lines};
3030
pub use buf_reader::BufReader;
31-
pub use buf_writer::BufWriter;
31+
pub use buf_writer::{BufWriter, LineWriter};
3232
pub use copy::copy;
3333
pub use empty::{empty, Empty};
3434
pub use read::Read;

0 commit comments

Comments
 (0)