Skip to content

Commit 77a0b50

Browse files
committed
fixed read-parse-print slowdowns in clients
1 parent a305f75 commit 77a0b50

File tree

6 files changed

+97
-32
lines changed

6 files changed

+97
-32
lines changed

Cargo.lock

+37
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,4 @@ features = ["net", "rt", "io-std", "io-util", "rt-multi-thread"]
2626

2727
[dependencies.async-std]
2828
version = "1.10.0"
29+
features = ["unstable"]

README.md

+10-2
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ Threaded server will use an amortized counter by default. To
5252
use an atomic counter, use the `--alt` argument. They are
5353
roughly the same speed.
5454

55+
## Notes
56+
5557
On my modern Linux box I need to run these things as root to
5658
get decent performance. I also occasionally need to
5759

@@ -60,8 +62,6 @@ get decent performance. I also occasionally need to
6062
to get SYN cookies turned off on localhost. (Sigh. Working
6163
on reporting this.) Don't forget to undo this when you're done!
6264

63-
## Notes
64-
6565
Various optimizations that have been tried:
6666

6767
* Make the seq server use an amortized counter construction
@@ -73,6 +73,14 @@ Various optimizations that have been tried:
7373
* Make the async server use amortized counter: roughly 3×
7474
slowdown, ditched.
7575

76+
* Replace the read-parse-print stupidity in the clients with
77+
simply writing the received bytes. Some speedup, kept.
78+
79+
* Use `tokio::io::stdout` in the async client. Sadly, since
80+
this is not locked it's not guaranteed to work reliably
81+
100% of the time. Replaced with `spawn_blocking` writes to
82+
`std::io::stdout` for about a 20% slowdown, but correctness.
83+
7684
## Acknowledgements
7785

7886
Thanks to Josh Triplett for the scoped-threads server and

src/ccasync.rs

+27-18
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,38 @@
1+
use std::io::Write;
2+
13
pub mod rt_async_std {
2-
pub use async_std::io::{self, ReadExt, WriteExt};
3-
pub use async_std::net::TcpStream;
4-
pub use async_std::task;
4+
5+
use super::*;
6+
7+
use async_std::io::ReadExt;
8+
use async_std::net::TcpStream;
9+
use async_std::task;
510

611
async fn drop_handle(h: task::JoinHandle<()>) {
712
h.await;
813
}
914

10-
super::ccasync!();
15+
ccasync!();
1116

1217
pub fn start(n: usize, m: usize) {
1318
task::block_on(send(n, m));
1419
}
1520
}
1621

1722
pub mod rt_tokio {
18-
pub use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
19-
pub use tokio::net::TcpStream;
20-
pub use tokio::runtime::Runtime;
21-
pub use tokio::task;
23+
24+
use super::*;
25+
26+
use tokio::io::AsyncReadExt;
27+
use tokio::net::TcpStream;
28+
use tokio::runtime::Runtime;
29+
use tokio::task;
2230

2331
async fn drop_handle(h: task::JoinHandle<()>) {
2432
h.await.unwrap();
2533
}
2634

27-
super::ccasync!();
35+
ccasync!();
2836

2937
pub fn start(n: usize, m: usize) {
3038
let rt = Runtime::new().unwrap();
@@ -34,23 +42,24 @@ pub mod rt_tokio {
3442

3543
macro_rules! ccasync {
3644
() => {
37-
async fn get_count() -> u64 {
45+
async fn get_count() -> Vec<u8> {
3846
let mut stream = TcpStream::connect("127.0.0.1:10123").await.unwrap();
39-
let mut buf = String::with_capacity(22);
40-
stream.read_to_string(&mut buf).await.unwrap();
47+
let mut buf = Vec::with_capacity(22);
48+
stream.read_to_end(&mut buf).await.unwrap();
4149
drop(stream);
42-
buf.trim_end().parse().unwrap()
50+
buf
4351
}
4452

4553
async fn send(n: usize, m: usize) {
4654
let mut handles = Vec::with_capacity(m);
4755
for _ in 0..n {
4856
let h = task::spawn(async {
49-
let c = get_count().await;
50-
io::stdout()
51-
.write_all(format!("{}\n", c).as_bytes())
52-
.await
53-
.unwrap();
57+
let mut buf = get_count().await;
58+
let nbuf = buf.len();
59+
buf[nbuf - 2] = b'\n';
60+
task::spawn_blocking(move || {
61+
std::io::stdout().write_all(&buf[..nbuf-1]).unwrap();
62+
});
5463
});
5564
handles.push(h);
5665
if handles.len() >= m {

src/ccseq.rs

+8-5
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
1-
use std::io::Read;
1+
use std::io::{Read, Write, stdout};
22
use std::net;
33

44
pub fn send(n: usize) {
5-
let mut buf = String::with_capacity(22);
5+
let stdout = stdout();
6+
let mut stdout = stdout.lock();
7+
let mut buf = vec![0u8; 22];
68
for _ in 0..n {
79
let mut stream = net::TcpStream::connect("127.0.0.1:10123").unwrap();
810
buf.clear();
9-
stream.read_to_string(&mut buf).unwrap();
10-
let count: u64 = buf.trim_end().parse().unwrap();
11-
println!("{}", count);
11+
stream.read_to_end(&mut buf).unwrap();
12+
let nbuf = buf.len();
13+
buf[nbuf - 2] = b'\n';
14+
stdout.write_all(&buf[..nbuf-1]).unwrap();
1215
}
1316
}

src/ccthread.rs

+14-7
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
use std::collections::VecDeque;
2-
use std::io::Read;
2+
use std::io::{Read, Write, stdout};
33
use std::net;
44
use std::thread;
55

6-
fn get_count() -> u64 {
6+
fn get_count() -> Vec<u8> {
77
let mut stream = net::TcpStream::connect("127.0.0.1:10123").unwrap();
8-
let mut buf = String::with_capacity(22);
9-
stream.read_to_string(&mut buf).unwrap();
10-
buf.trim_end().parse().unwrap()
8+
let mut buf = vec![0u8; 22];
9+
stream.read_to_end(&mut buf).unwrap();
10+
buf
11+
}
12+
13+
fn print_count(h: thread::JoinHandle<Vec<u8>>) {
14+
let mut buf = h.join().unwrap();
15+
let nbuf = buf.len();
16+
buf[nbuf - 2] = b'\n';
17+
stdout().write_all(&buf[..nbuf-1]).unwrap();
1118
}
1219

1320
pub fn send(n: usize, m: usize) {
@@ -16,11 +23,11 @@ pub fn send(n: usize, m: usize) {
1623
handles.push_front(thread::spawn(get_count));
1724
if handles.len() > m {
1825
for h in handles.drain(m..) {
19-
println!("{:?}", h.join().unwrap());
26+
print_count(h);
2027
}
2128
}
2229
}
2330
for h in handles.into_iter() {
24-
println!("{:?}", h.join().unwrap());
31+
print_count(h);
2532
}
2633
}

0 commit comments

Comments
 (0)