Skip to content

Commit 0726c04

Browse files
authored
use PartialOrder implementation where appropriate (#328)
These two places had a copy pasted implementation of the PartialOrder logic so this replaces it with calling the appropriate trait implementations Signed-off-by: Petros Angelatos <[email protected]>
1 parent 96673a3 commit 0726c04

File tree

2 files changed

+22
-22
lines changed

2 files changed

+22
-22
lines changed

src/operators/arrange/arrangement.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,7 @@ where
569569
*reader = Some(reader_local);
570570

571571
// Initialize to the minimal input frontier.
572-
let mut input_frontier = vec![<G::Timestamp as Timestamp>::minimum()];
572+
let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
573573

574574
move |input, output| {
575575

@@ -589,14 +589,12 @@ where
589589
// and sending smaller bites than we might have otherwise done.
590590

591591
// Assert that the frontier never regresses.
592-
assert!(input.frontier().frontier().iter().all(|t1| input_frontier.iter().any(|t2: &G::Timestamp| t2.less_equal(t1))));
593-
594-
// Test to see if strict progress has occurred (any of the old frontier less equal
595-
// to the new frontier).
596-
let progress = input_frontier.iter().any(|t2| !input.frontier().less_equal(t2));
597-
598-
if progress {
592+
assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));
599593

594+
// Test to see if strict progress has occurred, which happens whenever the new
595+
// frontier isn't equal to the previous. It is only in this case that we have any
596+
// data processing to do.
597+
if prev_frontier.borrow() != input.frontier().frontier() {
600598
// There are two cases to handle with some care:
601599
//
602600
// 1. If any held capabilities are not in advance of the new input frontier,
@@ -663,8 +661,8 @@ where
663661
writer.seal(input.frontier().frontier().to_owned());
664662
}
665663

666-
input_frontier.clear();
667-
input_frontier.extend(input.frontier().frontier().iter().cloned());
664+
prev_frontier.clear();
665+
prev_frontier.extend(input.frontier().frontier().iter().cloned());
668666
}
669667

670668
if let Some(mut fuel) = effort.clone() {

src/operators/arrange/upsert.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ where
185185
*reader = Some(reader_local.clone());
186186

187187
// Tracks the input frontier, used to populate the lower bound of new batches.
188-
let mut input_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
188+
let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
189189

190190
// For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap).
191191
let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, Tr::Key, Option<Tr::Val>)>>::new();
@@ -202,11 +202,13 @@ where
202202
}
203203
});
204204

205-
// Test to see if strict progress has occurred, which happens whenever any element of
206-
// the old frontier is not greater or equal to the new frontier. It is only in this
207-
// case that we have any data processing to do.
208-
let progress = input_frontier.elements().iter().any(|t2| !input.frontier().less_equal(t2));
209-
if progress {
205+
// Assert that the frontier never regresses.
206+
assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));
207+
208+
// Test to see if strict progress has occurred, which happens whenever the new
209+
// frontier isn't equal to the previous. It is only in this case that we have any
210+
// data processing to do.
211+
if prev_frontier.borrow() != input.frontier().frontier() {
210212

211213
// If there is at least one capability not in advance of the input frontier ...
212214
if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
@@ -293,8 +295,8 @@ where
293295
builder.push(update);
294296
}
295297
}
296-
let batch = builder.done(input_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
297-
input_frontier.clone_from(&upper);
298+
let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
299+
prev_frontier.clone_from(&upper);
298300

299301
// Communicate `batch` to the arrangement and the stream.
300302
writer.insert(batch.clone(), Some(capability.time().clone()));
@@ -325,12 +327,12 @@ where
325327
}
326328

327329
// Update our view of the input frontier.
328-
input_frontier.clear();
329-
input_frontier.extend(input.frontier().frontier().iter().cloned());
330+
prev_frontier.clear();
331+
prev_frontier.extend(input.frontier().frontier().iter().cloned());
330332

331333
// Downgrade capabilities for `reader_local`.
332-
reader_local.set_logical_compaction(input_frontier.borrow());
333-
reader_local.set_physical_compaction(input_frontier.borrow());
334+
reader_local.set_logical_compaction(prev_frontier.borrow());
335+
reader_local.set_physical_compaction(prev_frontier.borrow());
334336
}
335337

336338
if let Some(mut fuel) = effort.clone() {

0 commit comments

Comments
 (0)