Skip to content

Commit

Permalink
Move opinions about reachability logging into TrackerLogger (#629)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Jan 23, 2025
1 parent 90bd8aa commit bd4c8a7
Showing 1 changed file with 38 additions and 36 deletions.
74 changes: 38 additions & 36 deletions timely/src/progress/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,22 +573,16 @@ impl<T:Timestamp> Tracker<T> {
let target_changes =
self.target_changes
.iter()
.map(|((target, time), diff)| (target.node, target.port, time.clone(), *diff))
.collect::<Vec<_>>();
.map(|((target, time), diff)| (target.node, target.port, time, *diff));

if !target_changes.is_empty() {
logger.log_target_updates(target_changes);
}
logger.log_target_updates(target_changes);

let source_changes =
self.source_changes
.iter()
.map(|((source, time), diff)| (source.node, source.port, time.clone(), *diff))
.collect::<Vec<_>>();
.map(|((source, time), diff)| (source.node, source.port, time, *diff));

if !source_changes.is_empty() {
logger.log_source_updates(source_changes);
}
logger.log_source_updates(source_changes);
}

// Step 1: Drain `self.input_changes` and determine actual frontier changes.
Expand Down Expand Up @@ -853,22 +847,34 @@ pub mod logging {
}

/// Log source update events with additional identifying information.
pub fn log_source_updates(&mut self, updates: Vec<(usize, usize, T, i64)>) {
self.logger.log({
SourceUpdate {
tracker_id: self.identifier,
updates,
}
})
pub fn log_source_updates<'a, I>(&mut self, updates: I)
where
I: IntoIterator<Item = (usize, usize, &'a T, i64)>
{
let updates: Vec<_> = updates.into_iter().map(|(a,b,c,d)| (a,b,c.clone(),d)).collect();
if !updates.is_empty() {
self.logger.log({
SourceUpdate {
tracker_id: self.identifier,
updates
}
});
}
}
/// Log target update events with additional identifying information.
pub fn log_target_updates(&mut self, updates: Vec<(usize, usize, T, i64)>) {
self.logger.log({
TargetUpdate {
tracker_id: self.identifier,
updates,
}
})
pub fn log_target_updates<'a, I>(&mut self, updates: I)
where
I: IntoIterator<Item = (usize, usize, &'a T, i64)>
{
let updates: Vec<_> = updates.into_iter().map(|(a,b,c,d)| (a,b,c.clone(),d)).collect();
if !updates.is_empty() {
self.logger.log({
TargetUpdate {
tracker_id: self.identifier,
updates
}
});
}
}
}

Expand Down Expand Up @@ -929,25 +935,21 @@ impl<T: Timestamp> Drop for Tracker<T> {
.flat_map(|(port, target)| {
target.pointstamps
.updates()
.map(move |(time, diff)| (index, port, time.clone(), -diff))
})
.collect::<Vec<_>>();
if !target_changes.is_empty() {
logger.log_target_updates(target_changes);
}
.map(move |(time, diff)| (index, port, time, -diff))
});

logger.log_target_updates(target_changes);

let source_changes = per_operator.sources
.iter_mut()
.enumerate()
.flat_map(|(port, source)| {
source.pointstamps
.updates()
.map(move |(time, diff)| (index, port, time.clone(), -diff))
})
.collect::<Vec<_>>();
if !source_changes.is_empty() {
logger.log_source_updates(source_changes);
}
.map(move |(time, diff)| (index, port, time, -diff))
});

logger.log_source_updates(source_changes);
}
}
}

0 comments on commit bd4c8a7

Please sign in to comment.