Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Replace tokio_core with tokio (ring -> 0.13) #9657

Merged
merged 2 commits into from
Oct 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,177 changes: 628 additions & 549 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 3 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
futures = "0.1"
futures-cpupool = "0.1"
fdlimit = "0.1"
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" }
jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-2.2" }
ethcore = { path = "ethcore", features = ["parity"] }
parity-bytes = "0.1"
ethcore-io = { path = "util/io" }
Expand All @@ -51,7 +50,7 @@ rpc-cli = { path = "rpc_cli" }
parity-hash-fetch = { path = "hash-fetch" }
parity-ipfs-api = { path = "ipfs" }
parity-local-store = { path = "local-store" }
parity-reactor = { path = "util/reactor" }
parity-runtime = { path = "util/runtime" }
parity-rpc = { path = "rpc" }
parity-rpc-client = { path = "rpc_client" }
parity-updater = { path = "updater" }
Expand Down Expand Up @@ -137,7 +136,4 @@ members = [
"util/keccak-hasher",
"util/patricia-trie-ethereum",
"util/fastmap",
]

[patch.crates-io]
ring = { git = "https://github.com/paritytech/ring" }
]
2 changes: 1 addition & 1 deletion ethcore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ hashdb = "0.3.0"
memorydb = "0.3.0"
patricia-trie = "0.3.0"
patricia-trie-ethereum = { path = "../util/patricia-trie-ethereum" }
parity-crypto = "0.1"
parity-crypto = "0.2"
error-chain = { version = "0.12", default-features = false }
ethcore-io = { path = "../util/io" }
ethcore-logger = { path = "../logger" }
Expand Down
2 changes: 1 addition & 1 deletion ethcore/private-tx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ ethabi-derive = "6.0"
ethabi-contract = "6.0"
ethcore = { path = ".." }
parity-bytes = "0.1"
parity-crypto = "0.1"
parity-crypto = "0.2"
ethcore-io = { path = "../../util/io" }
ethcore-logger = { path = "../../logger" }
ethcore-miner = { path = "../../miner" }
Expand Down
4 changes: 2 additions & 2 deletions ethcore/private-tx/src/encryptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ impl SecretStoreEncryptor {

// send HTTP request
let method = if use_post {
Method::Post
Method::POST
} else {
Method::Get
Method::GET
};

let url = Url::from_str(&url).map_err(|e| ErrorKind::Encrypt(e.to_string()))?;
Expand Down
8 changes: 4 additions & 4 deletions ethcore/stratum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ authors = ["Parity Technologies <[email protected]>"]
[dependencies]
ethereum-types = "0.4"
keccak-hash = "0.1"
jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" }
jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" }
jsonrpc-tcp-server = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" }
jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-2.2" }
jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-2.2" }
jsonrpc-tcp-server = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-2.2" }
log = "0.4"
parking_lot = "0.6"

[dev-dependencies]
env_logger = "0.5"
tokio-core = "0.1"
tokio = "0.1"
tokio-io = "0.1"
ethcore-logger = { path = "../../logger" }
71 changes: 36 additions & 35 deletions ethcore/stratum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ extern crate parking_lot;

#[macro_use] extern crate log;

#[cfg(test)] extern crate tokio_core;
#[cfg(test)] extern crate tokio;
#[cfg(test)] extern crate tokio_io;
#[cfg(test)] extern crate ethcore_logger;

Expand Down Expand Up @@ -323,12 +323,10 @@ impl MetaExtractor<SocketMetadata> for PeerMetaExtractor {
#[cfg(test)]
mod tests {
use super::*;
use std::net::SocketAddr;
use std::net::{SocketAddr, Shutdown};
use std::sync::Arc;

use tokio_core::reactor::{Core, Timeout};
use tokio_core::net::TcpStream;
use tokio_io::io;
use tokio::{io, runtime::Runtime, timer::timeout::{self, Timeout}, net::TcpStream};
use jsonrpc_core::futures::{Future, future};

use ethcore_logger::init_log;
Expand All @@ -342,23 +340,23 @@ mod tests {
}

fn dummy_request(addr: &SocketAddr, data: &str) -> Vec<u8> {
let mut core = Core::new().expect("Tokio Core should be created with no errors");
let mut buffer = vec![0u8; 2048];
let mut runtime = Runtime::new().expect("Tokio Runtime should be created with no errors");

let mut data_vec = data.as_bytes().to_vec();
data_vec.extend(b"\n");

let stream = TcpStream::connect(addr, &core.handle())
.and_then(|stream| {
io::write_all(stream, &data_vec)
let stream = TcpStream::connect(addr)
.and_then(move |stream| {
io::write_all(stream, data_vec)
})
.and_then(|(stream, _)| {
io::read(stream, &mut buffer)
stream.shutdown(Shutdown::Write).unwrap();
io::read_to_end(stream, Vec::with_capacity(2048))
})
.and_then(|(_, read_buf, len)| {
future::ok(read_buf[0..len].to_vec())
.and_then(|(_stream, read_buf)| {
future::ok(read_buf)
});
let result = core.run(stream).expect("Core should run with no errors");
let result = runtime.block_on(stream).expect("Runtime should run with no errors");

result
}
Expand Down Expand Up @@ -417,7 +415,7 @@ mod tests {
}

#[test]
fn receives_initial_paylaod() {
fn receives_initial_payload() {
let addr = "127.0.0.1:19975".parse().unwrap();
let _stratum = Stratum::start(&addr, DummyManager::new(), None).expect("There should be no error starting stratum");
let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 2}"#;
Expand Down Expand Up @@ -460,40 +458,43 @@ mod tests {
.to_vec();
auth_request.extend(b"\n");

let mut core = Core::new().expect("Tokio Core should be created with no errors");
let timeout1 = Timeout::new(::std::time::Duration::from_millis(100), &core.handle())
.expect("There should be a timeout produced in message test");
let timeout2 = Timeout::new(::std::time::Duration::from_millis(100), &core.handle())
.expect("There should be a timeout produced in message test");
let mut buffer = vec![0u8; 2048];
let mut buffer2 = vec![0u8; 2048];
let stream = TcpStream::connect(&addr, &core.handle())
.and_then(|stream| {
io::write_all(stream, &auth_request)
let auth_response = "{\"jsonrpc\":\"2.0\",\"result\":true,\"id\":1}\n";

let mut runtime = Runtime::new().expect("Tokio Runtime should be created with no errors");
let read_buf0 = vec![0u8; auth_response.len()];
let read_buf1 = Vec::with_capacity(2048);
let stream = TcpStream::connect(&addr)
.and_then(move |stream| {
io::write_all(stream, auth_request)
})
.and_then(|(stream, _)| {
io::read(stream, &mut buffer)
io::read_exact(stream, read_buf0)
})
.and_then(|(stream, _, _)| {
.map_err(|err| panic!("{:?}", err))
.and_then(move |(stream, read_buf0)| {
assert_eq!(String::from_utf8(read_buf0).unwrap(), auth_response);
trace!(target: "stratum", "Received authorization confirmation");
timeout1.join(future::ok(stream))
Timeout::new(future::ok(stream), ::std::time::Duration::from_millis(100))
})
.and_then(|(_, stream)| {
.map_err(|err: timeout::Error<()>| panic!("Timeout: {:?}", err))
.and_then(move |stream| {
trace!(target: "stratum", "Pusing work to peers");
stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned())
.expect("Pushing work should produce no errors");
timeout2.join(future::ok(stream))
Timeout::new(future::ok(stream), ::std::time::Duration::from_millis(100))
})
.and_then(|(_, stream)| {
.map_err(|err: timeout::Error<()>| panic!("Timeout: {:?}", err))
.and_then(|stream| {
trace!(target: "stratum", "Ready to read work from server");
io::read(stream, &mut buffer2)
stream.shutdown(Shutdown::Write).unwrap();
io::read_to_end(stream, read_buf1)
})
.and_then(|(_, read_buf, len)| {
.and_then(|(_, read_buf1)| {
trace!(target: "stratum", "Received work from server");
future::ok(read_buf[0..len].to_vec())
future::ok(read_buf1)
});
let response = String::from_utf8(
core.run(stream).expect("Core should run with no errors")
runtime.block_on(stream).expect("Runtime should run with no errors")
).expect("Response should be utf-8");

assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion ethkey/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = ["Parity Technologies <[email protected]>"]
[dependencies]
byteorder = "1.0"
edit-distance = "2.0"
parity-crypto = "0.1"
parity-crypto = "0.2"
eth-secp256k1 = { git = "https://github.com/paritytech/rust-secp256k1" }
ethereum-types = "0.4"
lazy_static = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion ethstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ tiny-keccak = "1.4"
time = "0.1.34"
itertools = "0.5"
parking_lot = "0.6"
parity-crypto = "0.1"
parity-crypto = "0.2"
ethereum-types = "0.4"
dir = { path = "../util/dir" }
smallvec = "0.6"
Expand Down
4 changes: 1 addition & 3 deletions hash-fetch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ authors = ["Parity Technologies <[email protected]>"]

[dependencies]
futures = "0.1"
futures-cpupool = "0.1"
log = "0.4"
mime = "0.3"
mime_guess = "2.0.0-alpha.2"
Expand All @@ -17,7 +16,7 @@ rustc-hex = "1.0"
fetch = { path = "../util/fetch" }
parity-bytes = "0.1"
ethereum-types = "0.4"
parity-reactor = { path = "../util/reactor" }
parity-runtime = { path = "../util/runtime" }
keccak-hash = "0.1"
registrar = { path = "../registrar" }

Expand All @@ -26,6 +25,5 @@ ethabi-derive = "6.0"
ethabi-contract = "6.0"

[dev-dependencies]
hyper = "0.11"
parking_lot = "0.6"
fake-fetch = { path = "../util/fake-fetch" }
29 changes: 12 additions & 17 deletions hash-fetch/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ use std::path::PathBuf;

use hash::keccak_buffer;
use fetch::{self, Fetch};
use futures_cpupool::CpuPool;
use futures::{Future, IntoFuture};
use parity_reactor::Remote;
use parity_runtime::Executor;
use urlhint::{URLHintContract, URLHint, URLHintResult};
use registrar::{RegistrarClient, Asynchronous};
use ethereum_types::H256;
Expand Down Expand Up @@ -109,21 +108,19 @@ fn validate_hash(path: PathBuf, hash: H256, body: fetch::BodyReader) -> Result<P

/// Default Hash-fetching client using on-chain contract to resolve hashes to URLs.
pub struct Client<F: Fetch + 'static = fetch::Client> {
pool: CpuPool,
contract: URLHintContract,
fetch: F,
remote: Remote,
executor: Executor,
random_path: Arc<Fn() -> PathBuf + Sync + Send>,
}

impl<F: Fetch + 'static> Client<F> {
/// Creates new instance of the `Client` given on-chain contract client, fetch service and task runner.
pub fn with_fetch(contract: Arc<RegistrarClient<Call=Asynchronous>>, pool: CpuPool, fetch: F, remote: Remote) -> Self {
pub fn with_fetch(contract: Arc<RegistrarClient<Call=Asynchronous>>, fetch: F, executor: Executor) -> Self {
Client {
pool,
contract: URLHintContract::new(contract),
fetch: fetch,
remote: remote,
executor: executor,
random_path: Arc::new(random_temp_path),
}
}
Expand All @@ -135,7 +132,6 @@ impl<F: Fetch + 'static> HashFetch for Client<F> {

let random_path = self.random_path.clone();
let remote_fetch = self.fetch.clone();
let pool = self.pool.clone();
let future = self.contract.resolve(hash)
.map_err(|e| { warn!("Error resolving URL: {}", e); Error::NoResolution })
.and_then(|maybe_url| maybe_url.ok_or(Error::NoResolution))
Expand All @@ -162,7 +158,7 @@ impl<F: Fetch + 'static> HashFetch for Client<F> {
Ok(response)
}
})
.and_then(move |response| pool.spawn_fn(move || {
.and_then(move |response| {
debug!(target: "fetch", "Content fetched, validating hash ({:?})", hash);
let path = random_path();
let res = validate_hash(path.clone(), hash, fetch::BodyReader::new(response));
Expand All @@ -172,10 +168,10 @@ impl<F: Fetch + 'static> HashFetch for Client<F> {
let _ = fs::remove_file(&path);
}
res
}))
})
.then(move |res| { on_done(res); Ok(()) as Result<(), ()> });

self.remote.spawn(future);
self.executor.spawn(future);
}
}

Expand All @@ -197,8 +193,7 @@ mod tests {
use rustc_hex::FromHex;
use std::sync::{Arc, mpsc};
use parking_lot::Mutex;
use futures_cpupool::CpuPool;
use parity_reactor::Remote;
use parity_runtime::Executor;
use urlhint::tests::{FakeRegistrar, URLHINT};
use super::{Error, Client, HashFetch, random_temp_path};

Expand All @@ -216,7 +211,7 @@ mod tests {
// given
let contract = Arc::new(FakeRegistrar::new());
let fetch = FakeFetch::new(None::<usize>);
let client = Client::with_fetch(contract.clone(), CpuPool::new(1), fetch, Remote::new_sync());
let client = Client::with_fetch(contract.clone(), fetch, Executor::new_sync());

// when
let (tx, rx) = mpsc::channel();
Expand All @@ -234,7 +229,7 @@ mod tests {
// given
let registrar = Arc::new(registrar());
let fetch = FakeFetch::new(None::<usize>);
let client = Client::with_fetch(registrar.clone(), CpuPool::new(1), fetch, Remote::new_sync());
let client = Client::with_fetch(registrar.clone(), fetch, Executor::new_sync());

// when
let (tx, rx) = mpsc::channel();
Expand All @@ -252,7 +247,7 @@ mod tests {
// given
let registrar = Arc::new(registrar());
let fetch = FakeFetch::new(Some(1));
let mut client = Client::with_fetch(registrar.clone(), CpuPool::new(1), fetch, Remote::new_sync());
let mut client = Client::with_fetch(registrar.clone(), fetch, Executor::new_sync());
let path = random_temp_path();
let path2 = path.clone();
client.random_path = Arc::new(move || path2.clone());
Expand All @@ -275,7 +270,7 @@ mod tests {
// given
let registrar = Arc::new(registrar());
let fetch = FakeFetch::new(Some(1));
let client = Client::with_fetch(registrar.clone(), CpuPool::new(1), fetch, Remote::new_sync());
let client = Client::with_fetch(registrar.clone(), fetch, Executor::new_sync());

// when
let (tx, rx) = mpsc::channel();
Expand Down
5 changes: 1 addition & 4 deletions hash-fetch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ extern crate ethabi;
extern crate parity_bytes as bytes;
extern crate ethereum_types;
extern crate futures;
extern crate futures_cpupool;
extern crate keccak_hash as hash;
extern crate mime;
extern crate mime_guess;
extern crate parity_reactor;
extern crate parity_runtime;
extern crate rand;
extern crate rustc_hex;
extern crate registrar;
Expand All @@ -43,8 +42,6 @@ extern crate ethabi_contract;
#[cfg(test)]
extern crate parking_lot;
#[cfg(test)]
extern crate hyper;
#[cfg(test)]
extern crate fake_fetch;

mod client;
Expand Down
Loading