Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Commit 066c74f

Browse files
committed
Consolidate Tokio Runtime use, remove CpuPool.
* Rename and move the `tokio_reactor` crate (`util/reactor`) to `tokio_runtime` (`util/runtime`). * Rename `EventLoop` to `Runtime`. - Rename `EventLoop::spawn` to `Runtime::with_default_thread_count`. - Add the `Runtime::with_thread_count` method. - Rename `Remote` to `Executor`. * Remove uses of `CpuPool` and spawn all tasks via the `Runtime` executor instead. * Other changes related to `CpuPool` removal: - Remove `Reservations::with_pool`. `::new` now takes an `Executor` as an argument. - Remove `SenderReservations::with_pool`. `::new` now takes an `Executor` as an argument.
1 parent 4d17d65 commit 066c74f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+440
-466
lines changed

Cargo.lock

Lines changed: 147 additions & 151 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,9 @@ serde = "1.0"
2828
serde_json = "1.0"
2929
serde_derive = "1.0"
3030
futures = "0.1"
31-
futures-cpupool = "0.1"
3231
fdlimit = "0.1"
3332
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
34-
jsonrpc-core = { git = "https://github.com/c0gent/jsonrpc.git", branch = "c0gent-hyper" }
33+
jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", rev = "7a59776" }
3534
ethcore = { path = "ethcore", features = ["parity"] }
3635
parity-bytes = "0.1"
3736
ethcore-io = { path = "util/io" }
@@ -51,7 +50,7 @@ rpc-cli = { path = "rpc_cli" }
5150
parity-hash-fetch = { path = "hash-fetch" }
5251
parity-ipfs-api = { path = "ipfs" }
5352
parity-local-store = { path = "local-store" }
54-
parity-reactor = { path = "util/reactor" }
53+
parity-runtime = { path = "util/runtime" }
5554
parity-rpc = { path = "rpc" }
5655
parity-rpc-client = { path = "rpc_client" }
5756
parity-updater = { path = "updater" }

ethcore/stratum/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ authors = ["Parity Technologies <[email protected]>"]
88
[dependencies]
99
ethereum-types = "0.4"
1010
keccak-hash = "0.1"
11-
jsonrpc-core = { git = "https://github.com/c0gent/jsonrpc.git", branch = "c0gent-hyper" }
12-
jsonrpc-macros = { git = "https://github.com/c0gent/jsonrpc.git", branch = "c0gent-hyper" }
13-
jsonrpc-tcp-server = { git = "https://github.com/c0gent/jsonrpc.git", branch = "c0gent-hyper" }
11+
jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", rev = "7a59776" }
12+
jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc.git", rev = "7a59776" }
13+
jsonrpc-tcp-server = { git = "https://github.com/paritytech/jsonrpc.git", rev = "7a59776" }
1414
log = "0.4"
1515
parking_lot = "0.6"
1616

hash-fetch/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ authors = ["Parity Technologies <[email protected]>"]
88

99
[dependencies]
1010
futures = "0.1"
11-
futures-cpupool = "0.1"
1211
log = "0.4"
1312
mime = "0.3"
1413
mime_guess = "2.0.0-alpha.2"
@@ -17,7 +16,7 @@ rustc-hex = "1.0"
1716
fetch = { path = "../util/fetch" }
1817
parity-bytes = "0.1"
1918
ethereum-types = "0.4"
20-
parity-reactor = { path = "../util/reactor" }
19+
parity-runtime = { path = "../util/runtime" }
2120
keccak-hash = "0.1"
2221
registrar = { path = "../registrar" }
2322

hash-fetch/src/client.rs

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@ use std::path::PathBuf;
2323

2424
use hash::keccak_buffer;
2525
use fetch::{self, Fetch};
26-
use futures_cpupool::CpuPool;
2726
use futures::{Future, IntoFuture};
28-
use parity_reactor::Remote;
27+
use parity_runtime::Executor;
2928
use urlhint::{URLHintContract, URLHint, URLHintResult};
3029
use registrar::{RegistrarClient, Asynchronous};
3130
use ethereum_types::H256;
@@ -109,21 +108,19 @@ fn validate_hash(path: PathBuf, hash: H256, body: fetch::BodyReader) -> Result<P
109108

110109
/// Default Hash-fetching client using on-chain contract to resolve hashes to URLs.
111110
pub struct Client<F: Fetch + 'static = fetch::Client> {
112-
pool: CpuPool,
113111
contract: URLHintContract,
114112
fetch: F,
115-
remote: Remote,
113+
executor: Executor,
116114
random_path: Arc<Fn() -> PathBuf + Sync + Send>,
117115
}
118116

119117
impl<F: Fetch + 'static> Client<F> {
120118
/// Creates new instance of the `Client` given on-chain contract client, fetch service and task runner.
121-
pub fn with_fetch(contract: Arc<RegistrarClient<Call=Asynchronous>>, pool: CpuPool, fetch: F, remote: Remote) -> Self {
119+
pub fn with_fetch(contract: Arc<RegistrarClient<Call=Asynchronous>>, fetch: F, executor: Executor) -> Self {
122120
Client {
123-
pool,
124121
contract: URLHintContract::new(contract),
125122
fetch: fetch,
126-
remote: remote,
123+
executor: executor,
127124
random_path: Arc::new(random_temp_path),
128125
}
129126
}
@@ -135,7 +132,6 @@ impl<F: Fetch + 'static> HashFetch for Client<F> {
135132

136133
let random_path = self.random_path.clone();
137134
let remote_fetch = self.fetch.clone();
138-
let pool = self.pool.clone();
139135
let future = self.contract.resolve(hash)
140136
.map_err(|e| { warn!("Error resolving URL: {}", e); Error::NoResolution })
141137
.and_then(|maybe_url| maybe_url.ok_or(Error::NoResolution))
@@ -162,7 +158,7 @@ impl<F: Fetch + 'static> HashFetch for Client<F> {
162158
Ok(response)
163159
}
164160
})
165-
.and_then(move |response| pool.spawn_fn(move || {
161+
.and_then(move |response| {
166162
debug!(target: "fetch", "Content fetched, validating hash ({:?})", hash);
167163
let path = random_path();
168164
let res = validate_hash(path.clone(), hash, fetch::BodyReader::new(response));
@@ -172,10 +168,10 @@ impl<F: Fetch + 'static> HashFetch for Client<F> {
172168
let _ = fs::remove_file(&path);
173169
}
174170
res
175-
}))
171+
})
176172
.then(move |res| { on_done(res); Ok(()) as Result<(), ()> });
177173

178-
self.remote.spawn(future);
174+
self.executor.spawn(future);
179175
}
180176
}
181177

@@ -197,8 +193,7 @@ mod tests {
197193
use rustc_hex::FromHex;
198194
use std::sync::{Arc, mpsc};
199195
use parking_lot::Mutex;
200-
use futures_cpupool::CpuPool;
201-
use parity_reactor::Remote;
196+
use parity_runtime::Executor;
202197
use urlhint::tests::{FakeRegistrar, URLHINT};
203198
use super::{Error, Client, HashFetch, random_temp_path};
204199

@@ -216,7 +211,7 @@ mod tests {
216211
// given
217212
let contract = Arc::new(FakeRegistrar::new());
218213
let fetch = FakeFetch::new(None::<usize>);
219-
let client = Client::with_fetch(contract.clone(), CpuPool::new(1), fetch, Remote::new_sync());
214+
let client = Client::with_fetch(contract.clone(), fetch, Executor::new_sync());
220215

221216
// when
222217
let (tx, rx) = mpsc::channel();
@@ -234,7 +229,7 @@ mod tests {
234229
// given
235230
let registrar = Arc::new(registrar());
236231
let fetch = FakeFetch::new(None::<usize>);
237-
let client = Client::with_fetch(registrar.clone(), CpuPool::new(1), fetch, Remote::new_sync());
232+
let client = Client::with_fetch(registrar.clone(), fetch, Executor::new_sync());
238233

239234
// when
240235
let (tx, rx) = mpsc::channel();
@@ -252,7 +247,7 @@ mod tests {
252247
// given
253248
let registrar = Arc::new(registrar());
254249
let fetch = FakeFetch::new(Some(1));
255-
let mut client = Client::with_fetch(registrar.clone(), CpuPool::new(1), fetch, Remote::new_sync());
250+
let mut client = Client::with_fetch(registrar.clone(), fetch, Executor::new_sync());
256251
let path = random_temp_path();
257252
let path2 = path.clone();
258253
client.random_path = Arc::new(move || path2.clone());
@@ -275,7 +270,7 @@ mod tests {
275270
// given
276271
let registrar = Arc::new(registrar());
277272
let fetch = FakeFetch::new(Some(1));
278-
let client = Client::with_fetch(registrar.clone(), CpuPool::new(1), fetch, Remote::new_sync());
273+
let client = Client::with_fetch(registrar.clone(), fetch, Executor::new_sync());
279274

280275
// when
281276
let (tx, rx) = mpsc::channel();

hash-fetch/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,10 @@ extern crate ethabi;
2525
extern crate parity_bytes as bytes;
2626
extern crate ethereum_types;
2727
extern crate futures;
28-
extern crate futures_cpupool;
2928
extern crate keccak_hash as hash;
3029
extern crate mime;
3130
extern crate mime_guess;
32-
extern crate parity_reactor;
31+
extern crate parity_runtime;
3332
extern crate rand;
3433
extern crate rustc_hex;
3534
extern crate registrar;

ipfs/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ authors = ["Parity Technologies <[email protected]>"]
99
ethcore = { path = "../ethcore" }
1010
parity-bytes = "0.1"
1111
ethereum-types = "0.4"
12-
jsonrpc-core = { git = "https://github.com/c0gent/jsonrpc.git", branch = "c0gent-hyper" }
13-
jsonrpc-http-server = { git = "https://github.com/c0gent/jsonrpc.git", branch = "c0gent-hyper" }
12+
jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", rev = "7a59776" }
13+
jsonrpc-http-server = { git = "https://github.com/paritytech/jsonrpc.git", rev = "7a59776" }
1414
rlp = { version = "0.3.0", features = ["ethereum"] }
1515
cid = "0.3"
1616
multihash = "0.8"

miner/Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ authors = ["Parity Technologies <[email protected]>"]
1111
ethash = { path = "../ethash", optional = true }
1212
fetch = { path = "../util/fetch", optional = true }
1313
hyper = { version = "0.12", optional = true }
14-
parity-reactor = { path = "../util/reactor", optional = true }
1514
url = { version = "1", optional = true }
1615

1716
# Miner
@@ -20,7 +19,7 @@ error-chain = "0.12"
2019
ethcore-transaction = { path = "../ethcore/transaction" }
2120
ethereum-types = "0.4"
2221
futures = "0.1"
23-
futures-cpupool = "0.1"
22+
parity-runtime = { path = "../util/runtime" }
2423
heapsize = "0.4"
2524
keccak-hash = "0.1"
2625
linked-hash-map = "0.5"
@@ -37,4 +36,4 @@ ethkey = { path = "../ethkey" }
3736
rustc-hex = "1.0"
3837

3938
[features]
40-
work-notify = ["ethash", "fetch", "hyper", "parity-reactor", "url"]
39+
work-notify = ["ethash", "fetch", "hyper", "url"]

miner/src/gas_price_calibrator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::time::{Instant, Duration};
2020

2121
use ansi_term::Colour;
2222
use ethereum_types::U256;
23-
use futures_cpupool::CpuPool;
23+
use parity_runtime::Executor;
2424
use price_info::{Client as PriceInfoClient, PriceInfo};
2525
use price_info::fetch::Client as FetchClient;
2626

@@ -43,7 +43,7 @@ pub struct GasPriceCalibrator {
4343

4444
impl GasPriceCalibrator {
4545
/// Create a new gas price calibrator.
46-
pub fn new(options: GasPriceCalibratorOptions, fetch: FetchClient, p: CpuPool) -> GasPriceCalibrator {
46+
pub fn new(options: GasPriceCalibratorOptions, fetch: FetchClient, p: Executor) -> GasPriceCalibrator {
4747
GasPriceCalibrator {
4848
options: options,
4949
next_calibration: Instant::now(),

miner/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ extern crate ansi_term;
2323
extern crate ethcore_transaction as transaction;
2424
extern crate ethereum_types;
2525
extern crate futures;
26-
extern crate futures_cpupool;
26+
extern crate parity_runtime;
2727
extern crate heapsize;
2828
extern crate keccak_hash as hash;
2929
extern crate linked_hash_map;

miner/src/work_notify.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@
1818
1919
extern crate ethash;
2020
extern crate fetch;
21-
extern crate parity_reactor;
21+
extern crate parity_runtime;
2222
extern crate url;
2323
extern crate hyper;
2424

2525
use self::fetch::{Fetch, Request, Client as FetchClient, Method};
26-
use self::parity_reactor::Remote;
26+
use self::parity_runtime::Executor;
2727
use self::ethash::SeedHashCompute;
2828
use self::url::Url;
2929
use self::hyper::header::{self, HeaderValue};
@@ -43,13 +43,13 @@ pub trait NotifyWork : Send + Sync {
4343
pub struct WorkPoster {
4444
urls: Vec<Url>,
4545
client: FetchClient,
46-
remote: Remote,
46+
executor: Executor,
4747
seed_compute: Mutex<SeedHashCompute>,
4848
}
4949

5050
impl WorkPoster {
5151
/// Create new `WorkPoster`.
52-
pub fn new(urls: &[String], fetch: FetchClient, remote: Remote) -> Self {
52+
pub fn new(urls: &[String], fetch: FetchClient, executor: Executor) -> Self {
5353
let urls = urls.into_iter().filter_map(|u| {
5454
match Url::parse(u) {
5555
Ok(url) => Some(url),
@@ -61,7 +61,7 @@ impl WorkPoster {
6161
}).collect();
6262
WorkPoster {
6363
client: fetch,
64-
remote: remote,
64+
executor: executor,
6565
urls: urls,
6666
seed_compute: Mutex::new(SeedHashCompute::default()),
6767
}
@@ -81,7 +81,7 @@ impl NotifyWork for WorkPoster {
8181

8282
for u in &self.urls {
8383
let u = u.clone();
84-
self.remote.spawn(self.client.fetch(
84+
self.executor.spawn(self.client.fetch(
8585
Request::new(u.clone(), Method::POST)
8686
.with_header(header::CONTENT_TYPE, HeaderValue::from_static("application/json"))
8787
.with_body(body.clone()), Default::default()

parity/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ extern crate clap;
2525
extern crate dir;
2626
extern crate env_logger;
2727
extern crate futures;
28-
extern crate futures_cpupool;
2928
extern crate atty;
3029
extern crate jsonrpc_core;
3130
extern crate num_cpus;
@@ -60,7 +59,7 @@ extern crate kvdb;
6059
extern crate parity_hash_fetch as hash_fetch;
6160
extern crate parity_ipfs_api;
6261
extern crate parity_local_store as local_store;
63-
extern crate parity_reactor;
62+
extern crate parity_runtime;
6463
extern crate parity_rpc;
6564
extern crate parity_updater as updater;
6665
extern crate parity_version;

parity/light_helpers/queue_cull.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use light::TransactionQueue;
2929

3030
use futures::{future, Future};
3131

32-
use parity_reactor::Remote;
32+
use parity_runtime::Executor;
3333

3434
use parking_lot::RwLock;
3535

@@ -50,8 +50,8 @@ pub struct QueueCull<T> {
5050
pub on_demand: Arc<OnDemand>,
5151
/// The transaction queue.
5252
pub txq: Arc<RwLock<TransactionQueue>>,
53-
/// Event loop remote.
54-
pub remote: Remote,
53+
/// Event loop executor.
54+
pub executor: Executor,
5555
}
5656

5757
impl<T: LightChainClient + 'static> IoHandler<ClientIoMessage> for QueueCull<T> {
@@ -70,7 +70,7 @@ impl<T: LightChainClient + 'static> IoHandler<ClientIoMessage> for QueueCull<T>
7070
let start_nonce = self.client.engine().account_start_nonce(best_header.number());
7171

7272
info!(target: "cull", "Attempting to cull queued transactions from {} senders.", senders.len());
73-
self.remote.spawn_with_timeout(move || {
73+
self.executor.spawn_with_timeout(move || {
7474
let maybe_fetching = sync.with_context(move |ctx| {
7575
// fetch the nonce of each sender in the queue.
7676
let nonce_reqs = senders.iter()

parity/params.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use ethcore::client::Mode;
2121
use ethcore::ethereum;
2222
use ethcore::spec::{Spec, SpecParams};
2323
use ethereum_types::{U256, Address};
24-
use futures_cpupool::CpuPool;
24+
use parity_runtime::Executor;
2525
use hash_fetch::fetch::Client as FetchClient;
2626
use journaldb::Algorithm;
2727
use miner::gas_pricer::GasPricer;
@@ -256,7 +256,7 @@ impl Default for GasPricerConfig {
256256
}
257257

258258
impl GasPricerConfig {
259-
pub fn to_gas_pricer(&self, fetch: FetchClient, p: CpuPool) -> GasPricer {
259+
pub fn to_gas_pricer(&self, fetch: FetchClient, p: Executor) -> GasPricer {
260260
match *self {
261261
GasPricerConfig::Fixed(u) => GasPricer::Fixed(u),
262262
GasPricerConfig::Calibrated { usd_per_tx, recalibration_period, .. } => {

parity/rpc.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,13 @@ use dir::default_data_path;
2323
use dir::helpers::replace_home;
2424
use helpers::parity_ipc_path;
2525
use jsonrpc_core::MetaIoHandler;
26-
use parity_reactor::Remote;
26+
use parity_runtime::Executor;
2727
use parity_rpc::informant::{RpcStats, Middleware};
2828
use parity_rpc::{self as rpc, Metadata, DomainsValidation};
2929
use rpc_apis::{self, ApiSet};
3030

3131
pub use parity_rpc::{IpcServer, HttpServer, RequestMiddleware};
3232
pub use parity_rpc::ws::Server as WsServer;
33-
pub use parity_rpc::informant::CpuPool;
3433

3534
pub const DAPPS_DOMAIN: &'static str = "web3.site";
3635

@@ -134,9 +133,8 @@ fn address(enabled: bool, bind_iface: &str, bind_port: u16, hosts: &Option<Vec<S
134133

135134
pub struct Dependencies<D: rpc_apis::Dependencies> {
136135
pub apis: Arc<D>,
137-
pub remote: Remote,
136+
pub executor: Executor,
138137
pub stats: Arc<RpcStats>,
139-
pub pool: Option<CpuPool>,
140138
}
141139

142140
pub fn new_ws<D: rpc_apis::Dependencies>(
@@ -155,7 +153,7 @@ pub fn new_ws<D: rpc_apis::Dependencies>(
155153
let handler = {
156154
let mut handler = MetaIoHandler::with_middleware((
157155
rpc::WsDispatcher::new(full_handler),
158-
Middleware::new(deps.stats.clone(), deps.apis.activity_notifier(), deps.pool.clone())
156+
Middleware::new(deps.stats.clone(), deps.apis.activity_notifier())
159157
));
160158
let apis = conf.apis.list_apis();
161159
deps.apis.extend_with_set(&mut handler, &apis);
@@ -289,7 +287,7 @@ pub fn setup_apis<D>(apis: ApiSet, deps: &Dependencies<D>) -> MetaIoHandler<Meta
289287
where D: rpc_apis::Dependencies
290288
{
291289
let mut handler = MetaIoHandler::with_middleware(
292-
Middleware::new(deps.stats.clone(), deps.apis.activity_notifier(), deps.pool.clone())
290+
Middleware::new(deps.stats.clone(), deps.apis.activity_notifier())
293291
);
294292
let apis = apis.list_apis();
295293
deps.apis.extend_with_set(&mut handler, &apis);

0 commit comments

Comments
 (0)