diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fa8d85a --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +Cargo.lock +target diff --git a/Cargo.toml b/Cargo.toml index 6b8cbc3..fe3eeb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,20 +5,20 @@ authors = ["Ricardo Pieper "] edition = "2021" [dependencies] -rand = "0.6.5" -lazy_static = "1.3.0" -raster = "0.2.0" -clap = "2.33.0" -num_cpus = "1.0" -rayon = "1.6.0" -time = "0.1.42" -futures = "0.3" -tokio = { version = "1", features = ["full"] } -tokio-stream = "0.1" parking_lot = "0.12" [dev-dependencies] criterion = "0.3" +rayon = "1.6" +clap = "2.34" +raster = "0.2" +tokio = { version = "1.24", features = ["full"] } +tokio-stream = "0.1" +rand = "0.6" +lazy_static = "1.4" +num_cpus = "1.15" +time = "0.3" +futures = "0.3" [[bench]] name = "mandelbrot_rustspp" diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..d9a115b --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,2 @@ +[toolchain] +channel = "nightly" # this is necessary for cargo-expand to work diff --git a/src/blocks/in_block.rs b/src/blocks/in_block.rs index 88cd532..88b5837 100644 --- a/src/blocks/in_block.rs +++ b/src/blocks/in_block.rs @@ -3,8 +3,6 @@ use crate::*; use parking_lot::Mutex; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::thread; -use std::thread::JoinHandle; use work_storage::{BlockingOrderedSet, BlockingQueue}; use work_storage::{TimestampedWorkItem, WorkItem}; @@ -106,7 +104,7 @@ impl< let collected = handler.process(val, order); (*collected_list).push(collected); } - TimestampedWorkItem(WorkItem::Dropped, order) => (), + TimestampedWorkItem(WorkItem::Dropped, _order) => (), TimestampedWorkItem(WorkItem::Stop, _) => { break; } @@ -134,7 +132,7 @@ impl< let collected: TCollected = handler.process(val, order); (*collected_list).push(collected); } - TimestampedWorkItem(WorkItem::Dropped, order) => { + TimestampedWorkItem(WorkItem::Dropped, _order) => { next_item += 1; } TimestampedWorkItem(WorkItem::Stop, _) => { diff --git a/src/blocks/inout_block.rs b/src/blocks/inout_block.rs index 04ec43a..404f0ba 100644 --- a/src/blocks/inout_block.rs +++ b/src/blocks/inout_block.rs @@ -1,8 +1,6 @@ use crate::blocks::*; use crate::work_storage::*; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::thread; -use std::thread::JoinHandle; use std::{marker::PhantomData, sync::Arc}; // Public API: A Input-Output node; transforms some value into another @@ -80,12 +78,12 @@ impl< } impl< - TInput: 'static + Send, - TOutput: 'static, - TCollected: 'static, - TStage: InOut + Send + 'static, - TFactory: FnMut() -> TStage, - TNextStep: PipelineBlock + Send + Sync + 'static, + TInput: 'static + Send, + TOutput: 'static, + TCollected: 'static, + TStage: InOut + Send + 'static, + TFactory: FnMut() -> TStage, + TNextStep: PipelineBlock + Send + Sync + 'static, > InOutBlock { pub fn new( @@ -110,7 +108,7 @@ impl< work_queue: BlockingQueue::new(), next_step: Arc::new(next_step), transformer_factory: transformer, - replicas: replicas, + replicas, _params: PhantomData, } } @@ -123,7 +121,7 @@ impl< let queue = self.work_queue.clone(); let alive_threads = alive_threads.clone(); - let mut next_step = self.next_step.clone(); + let next_step = self.next_step.clone(); let mut transformer = (self.transformer_factory)(); let monitor_loop = MonitorLoop::new(move || {