diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 688e441c2..64d197ab6 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -573,22 +573,16 @@ impl Tracker { let target_changes = self.target_changes .iter() - .map(|((target, time), diff)| (target.node, target.port, time.clone(), *diff)) - .collect::>(); + .map(|((target, time), diff)| (target.node, target.port, time, *diff)); - if !target_changes.is_empty() { - logger.log_target_updates(target_changes); - } + logger.log_target_updates(target_changes); let source_changes = self.source_changes .iter() - .map(|((source, time), diff)| (source.node, source.port, time.clone(), *diff)) - .collect::>(); + .map(|((source, time), diff)| (source.node, source.port, time, *diff)); - if !source_changes.is_empty() { - logger.log_source_updates(source_changes); - } + logger.log_source_updates(source_changes); } // Step 1: Drain `self.input_changes` and determine actual frontier changes. @@ -853,22 +847,34 @@ pub mod logging { } /// Log source update events with additional identifying information. - pub fn log_source_updates(&mut self, updates: Vec<(usize, usize, T, i64)>) { - self.logger.log({ - SourceUpdate { - tracker_id: self.identifier, - updates, - } - }) + pub fn log_source_updates<'a, I>(&mut self, updates: I) + where + I: IntoIterator + { + let updates: Vec<_> = updates.into_iter().map(|(a,b,c,d)| (a,b,c.clone(),d)).collect(); + if !updates.is_empty() { + self.logger.log({ + SourceUpdate { + tracker_id: self.identifier, + updates + } + }); + } } /// Log target update events with additional identifying information. - pub fn log_target_updates(&mut self, updates: Vec<(usize, usize, T, i64)>) { - self.logger.log({ - TargetUpdate { - tracker_id: self.identifier, - updates, - } - }) + pub fn log_target_updates<'a, I>(&mut self, updates: I) + where + I: IntoIterator + { + let updates: Vec<_> = updates.into_iter().map(|(a,b,c,d)| (a,b,c.clone(),d)).collect(); + if !updates.is_empty() { + self.logger.log({ + TargetUpdate { + tracker_id: self.identifier, + updates + } + }); + } } } @@ -929,12 +935,10 @@ impl Drop for Tracker { .flat_map(|(port, target)| { target.pointstamps .updates() - .map(move |(time, diff)| (index, port, time.clone(), -diff)) - }) - .collect::>(); - if !target_changes.is_empty() { - logger.log_target_updates(target_changes); - } + .map(move |(time, diff)| (index, port, time, -diff)) + }); + + logger.log_target_updates(target_changes); let source_changes = per_operator.sources .iter_mut() @@ -942,12 +946,10 @@ impl Drop for Tracker { .flat_map(|(port, source)| { source.pointstamps .updates() - .map(move |(time, diff)| (index, port, time.clone(), -diff)) - }) - .collect::>(); - if !source_changes.is_empty() { - logger.log_source_updates(source_changes); - } + .map(move |(time, diff)| (index, port, time, -diff)) + }); + + logger.log_source_updates(source_changes); } } }