Skip to content

Commit 8c9c562

Browse files
authored
Update/refactor to latest tokio_io (#122)
* failing tests for tcp and ipc * encoder/decoder * fix remaining * also tcp * fix warnings * review fixes * fix warnings
1 parent fd2afe3 commit 8c9c562

File tree

10 files changed

+139
-47
lines changed

10 files changed

+139
-47
lines changed

ipc/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ tokio-service = "0.1"
1414
jsonrpc-core = { version = "7.0", path = "../core" }
1515
jsonrpc-server-utils = { version = "7.0", path = "../server-utils" }
1616
parity-tokio-ipc = { git = "https://github.com/nikvolf/parity-tokio-ipc" }
17+
bytes = "0.4"
1718

1819
[dev-dependencies]
1920
env_logger = "0.4"

ipc/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ extern crate jsonrpc_core as jsonrpc;
66
extern crate jsonrpc_server_utils as server_utils;
77
extern crate parity_tokio_ipc;
88
extern crate tokio_service;
9+
extern crate bytes;
910

1011
#[macro_use] extern crate log;
1112

ipc/src/logger.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#![allow(dead_code)]
2+
13
use std::env;
24
use log::LogLevelFilter;
35
use env_logger::LogBuilder;

ipc/src/server.rs

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ use jsonrpc::futures::{future, Future, Stream, Sink};
66
use jsonrpc::futures::sync::oneshot;
77
use jsonrpc::{Metadata, MetaIoHandler, Middleware, NoopMiddleware};
88
use jsonrpc::futures::BoxFuture;
9-
use server_utils::tokio_core::io::Io;
9+
1010
use server_utils::tokio_core::reactor::Remote;
11+
use server_utils::tokio_io::AsyncRead;
1112
use server_utils::reactor;
1213

1314
use meta::{MetaExtractor, NoopExtractor, RequestContext};
@@ -95,12 +96,12 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
9596
let listener = match Endpoint::new(endpoint_addr, handle) {
9697
Ok(l) => l,
9798
Err(e) => {
98-
start_signal.complete(Err(e));
99+
start_signal.send(Err(e)).expect("Cannot fail since receiver never dropped before receiving");
99100
return future::ok(()).boxed();
100101
}
101102
};
102103

103-
start_signal.complete(Ok(()));
104+
start_signal.send(Ok(())).expect("Cannot fail since receiver never dropped before receiving");
104105
let remote = handle.remote().clone();
105106
let connections = listener.incoming();
106107

@@ -128,7 +129,7 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
128129
.filter_map(|x| x);
129130

130131

131-
let writer = writer.send_all(responses).then(move |_| {
132+
let writer = writer.send_all(responses).then(|_| {
132133
trace!(target: "ipc", "Peer: service finished");
133134
Ok(())
134135
});
@@ -160,7 +161,7 @@ pub struct Server {
160161
impl Server {
161162
/// Closes the server (waits for finish)
162163
pub fn close(mut self) {
163-
self.stop.take().unwrap().complete(());
164+
self.stop.take().map(|stop| stop.send(()));
164165
self.remote.take().unwrap().close();
165166
self.clear_file();
166167
}
@@ -172,13 +173,13 @@ impl Server {
172173

173174
/// Remove socket file
174175
fn clear_file(&self) {
175-
::std::fs::remove_file(&self.path).unwrap_or_else(|_| {}); // ignore error, file could have been gone somewhere
176+
let _ = ::std::fs::remove_file(&self.path); // ignore error, file could have been gone somewhere
176177
}
177178
}
178179

179180
impl Drop for Server {
180181
fn drop(&mut self) {
181-
self.stop.take().map(|stop| stop.complete(()));
182+
let _ = self.stop.take().map(|stop| stop.send(()));
182183
self.remote.take().map(|remote| remote.close());
183184
self.clear_file();
184185
}
@@ -195,7 +196,7 @@ mod tests {
195196
use jsonrpc::futures::{Future, future};
196197
use self::tokio_uds::UnixStream;
197198
use server_utils::tokio_core::reactor::Core;
198-
use server_utils::tokio_core::io;
199+
use server_utils::tokio_io::io;
199200

200201
fn server_builder() -> ServerBuilder {
201202
let mut io = MetaIoHandler::<()>::default();
@@ -275,6 +276,42 @@ mod tests {
275276
);
276277
}
277278

279+
#[test]
280+
fn req_parallel() {
281+
use std::thread;
282+
283+
::logger::init_log();
284+
let path = "/tmp/test-ipc-45000";
285+
let _server = run(path);
286+
287+
let mut handles = Vec::new();
288+
for _ in 0..4 {
289+
let path = path.clone();
290+
handles.push(
291+
thread::spawn(move || {
292+
for _ in 0..100 {
293+
let result = dummy_request_str(
294+
&path,
295+
b"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}\n",
296+
);
297+
298+
assert_eq!(
299+
result,
300+
"{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}\n",
301+
"Response does not exactly much the expected response",
302+
);
303+
304+
::std::thread::sleep(::std::time::Duration::from_millis(10));
305+
}
306+
})
307+
);
308+
}
309+
310+
for handle in handles.drain(..) {
311+
handle.join().unwrap();
312+
}
313+
}
314+
278315
#[test]
279316
fn close() {
280317
::logger::init_log();

ipc/src/stream_codec.rs

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::io;
2-
use server_utils::tokio_core::io::{Codec, EasyBuf};
2+
use server_utils::tokio_io::codec::{Decoder, Encoder};
3+
use bytes::{BytesMut, BufMut};
34

45
pub struct StreamCodec;
56

@@ -10,11 +11,11 @@ fn is_whitespace(byte: u8) -> bool {
1011
}
1112
}
1213

13-
impl Codec for StreamCodec {
14-
type In = String;
15-
type Out = String;
14+
impl Decoder for StreamCodec {
15+
type Item = String;
16+
type Error = io::Error;
1617

17-
fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<Self::In>> {
18+
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
1819
let mut depth = 0;
1920
let mut in_str = false;
2021
let mut is_escaped = false;
@@ -44,7 +45,7 @@ impl Codec for StreamCodec {
4445
}
4546

4647
if depth == 0 && idx != start_idx && idx - start_idx + 1 > whitespaces {
47-
let bts = buf.drain_to(idx + 1);
48+
let bts = buf.split_to(idx + 1);
4849
match String::from_utf8(bts.as_ref().to_vec()) {
4950
Ok(val) => { return Ok(Some(val)) },
5051
Err(_) => { return Ok(None); } // skip non-utf requests (TODO: log error?)
@@ -54,10 +55,15 @@ impl Codec for StreamCodec {
5455

5556
Ok(None)
5657
}
58+
}
5759

58-
fn encode(&mut self, msg: String, buf: &mut Vec<u8>) -> io::Result<()> {
59-
buf.extend_from_slice(msg.as_bytes());
60-
buf.push(b'\n'); // for #4750
60+
impl Encoder for StreamCodec {
61+
type Item = String;
62+
type Error = io::Error;
63+
64+
fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> {
65+
buf.put_slice(msg.as_bytes());
66+
buf.put(b'\n'); // for #4750
6167
Ok(())
6268
}
6369
}
@@ -66,12 +72,13 @@ impl Codec for StreamCodec {
6672
mod tests {
6773

6874
use super::StreamCodec;
69-
use server_utils::tokio_core::io::{Codec, EasyBuf};
75+
use server_utils::tokio_io::codec::Decoder;
76+
use bytes::{BytesMut, BufMut};
7077

7178
#[test]
7279
fn simple_encode() {
73-
let mut buf = EasyBuf::new();
74-
buf.get_mut().extend_from_slice(b"{ test: 1 }{ test: 2 }{ test: 3 }");
80+
let mut buf = BytesMut::with_capacity(2048);
81+
buf.put_slice(b"{ test: 1 }{ test: 2 }{ test: 3 }");
7582

7683
let mut codec = StreamCodec;
7784

@@ -84,8 +91,8 @@ mod tests {
8491

8592
#[test]
8693
fn whitespace() {
87-
let mut buf = EasyBuf::new();
88-
buf.get_mut().extend_from_slice(b"{ test: 1 }\n\n\n\n{ test: 2 }\n\r{\n test: 3 } ");
94+
let mut buf = BytesMut::with_capacity(2048);
95+
buf.put_slice(b"{ test: 1 }\n\n\n\n{ test: 2 }\n\r{\n test: 3 } ");
8996

9097
let mut codec = StreamCodec;
9198

@@ -113,8 +120,8 @@ mod tests {
113120

114121
#[test]
115122
fn fragmented_encode() {
116-
let mut buf = EasyBuf::new();
117-
buf.get_mut().extend_from_slice(b"{ test: 1 }{ test: 2 }{ tes");
123+
let mut buf = BytesMut::with_capacity(2048);
124+
buf.put_slice(b"{ test: 1 }{ test: 2 }{ tes");
118125

119126
let mut codec = StreamCodec;
120127
let request = codec.decode(&mut buf)
@@ -126,7 +133,7 @@ mod tests {
126133
.expect("There should be at least one request in second fragmented test");
127134
assert_eq!(String::from_utf8(buf.as_ref().to_vec()).unwrap(), "{ tes");
128135

129-
buf.get_mut().extend_from_slice(b"t: 3 }");
136+
buf.put_slice(b"t: 3 }");
130137
let request = codec.decode(&mut buf)
131138
.expect("There should be no error in third fragmented test")
132139
.expect("There should be at least one request in third fragmented test");
@@ -150,8 +157,8 @@ mod tests {
150157
]
151158
}"#;
152159

153-
let mut buf = EasyBuf::new();
154-
buf.get_mut().extend_from_slice(request.as_bytes());
160+
let mut buf = BytesMut::with_capacity(65536);
161+
buf.put_slice(request.as_bytes());
155162

156163
let mut codec = StreamCodec;
157164
let parsed_request = codec.decode(&mut buf)

tcp/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ log = "0.3"
1313
parking_lot = "0.4"
1414
serde_json = "0.9"
1515
tokio-service = "0.1"
16-
1716
jsonrpc-core = { version = "7.0", path = "../core" }
1817
jsonrpc-server-utils = { version = "7.0", path = "../server-utils" }
18+
bytes = "0.4"
1919

2020
[dev-dependencies]
2121
lazy_static = "0.2"

tcp/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ extern crate jsonrpc_server_utils as server_utils;
2727
extern crate parking_lot;
2828
extern crate serde_json;
2929
extern crate tokio_service;
30+
extern crate bytes;
3031

3132
#[macro_use] extern crate log;
3233

tcp/src/line_codec.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
use std::{io, str};
2-
use server_utils::tokio_core::io::{Codec, EasyBuf};
2+
use server_utils::tokio_io::codec::{Decoder, Encoder};
3+
use bytes::{BytesMut, BufMut};
34

45
pub struct LineCodec;
56

6-
impl Codec for LineCodec {
7-
type In = String;
8-
type Out = String;
7+
impl Decoder for LineCodec {
8+
type Item = String;
9+
type Error = io::Error;
910

10-
fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<Self::In>> {
11-
if let Some(i) = buf.as_slice().iter().position(|&b| b == b'\n') {
12-
let line = buf.drain_to(i);
13-
buf.drain_to(1);
11+
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
12+
if let Some(i) = buf.as_ref().iter().position(|&b| b == b'\n') {
13+
let line = buf.split_to(i);
14+
buf.split_to(1);
1415

1516
match str::from_utf8(&line.as_ref()) {
1617
Ok(s) => Ok(Some(s.to_string())),
@@ -20,10 +21,15 @@ impl Codec for LineCodec {
2021
Ok(None)
2122
}
2223
}
24+
}
25+
26+
impl Encoder for LineCodec {
27+
type Item = String;
28+
type Error = io::Error;
2329

24-
fn encode(&mut self, msg: String, buf: &mut Vec<u8>) -> io::Result<()> {
25-
buf.extend_from_slice(msg.as_bytes());
26-
buf.push(b'\n');
30+
fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> {
31+
buf.put_slice(msg.as_bytes());
32+
buf.put(b'\n');
2733
Ok(())
2834
}
2935
}
@@ -32,12 +38,14 @@ impl Codec for LineCodec {
3238
mod tests {
3339

3440
use super::LineCodec;
35-
use server_utils::tokio_core::io::{Codec, EasyBuf};
41+
42+
use server_utils::tokio_io::codec::Decoder;
43+
use bytes::{BytesMut, BufMut};
3644

3745
#[test]
3846
fn simple_encode() {
39-
let mut buf = EasyBuf::new();
40-
buf.get_mut().extend_from_slice(b"{ test: 1 }\n{ test: 2 }\n{ test: 3 }");
47+
let mut buf = BytesMut::with_capacity(2048);
48+
buf.put_slice(b"{ test: 1 }\n{ test: 2 }\n{ test: 3 }");
4149

4250
let mut codec = LineCodec;
4351

tcp/src/server.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use jsonrpc::{MetaIoHandler, Metadata, Middleware, NoopMiddleware};
88
use jsonrpc::futures::{future, Future, Stream, Sink};
99
use jsonrpc::futures::sync::{mpsc, oneshot};
1010
use server_utils::{reactor, tokio_core};
11-
use server_utils::tokio_core::io::Io;
11+
use server_utils::tokio_io::AsyncRead;
1212

1313
use dispatch::{Dispatcher, SenderChannels, PeerMessageQueue};
1414
use line_codec::LineCodec;
@@ -164,19 +164,19 @@ pub struct Server {
164164
impl Server {
165165
/// Closes the server (waits for finish)
166166
pub fn close(mut self) {
167-
self.stop.take().unwrap().complete(());
167+
let _ = self.stop.take().map(|sg| sg.send(()));
168168
self.remote.take().unwrap().close();
169169
}
170170

171171
/// Wait for the server to finish
172172
pub fn wait(mut self) {
173-
self.remote.take().unwrap().wait();
173+
self.remote.take().unwrap().close();
174174
}
175175
}
176176

177177
impl Drop for Server {
178178
fn drop(&mut self) {
179-
self.stop.take().map(|stop| stop.complete(()));
179+
let _ = self.stop.take().map(|sg| sg.send(()));
180180
self.remote.take().map(|remote| remote.close());
181181
}
182182
}

0 commit comments

Comments
 (0)