Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions timely/src/dataflow/operators/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

use std::{borrow, error::Error, fmt::Display, ops::Deref};
use std::rc::Rc;
use std::cell::RefCell;
use std::cell::{OnceCell, RefCell};
use std::fmt::{self, Debug};

use crate::order::PartialOrder;
Expand Down Expand Up @@ -239,15 +239,15 @@ pub struct InputCapability<T: Timestamp> {
/// Output capability buffers, for use in minting capabilities.
internal: CapabilityUpdates<T>,
/// Timestamp summaries for each output.
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
summaries: Rc<OnceCell<PortConnectivity<T::Summary>>>,
/// A drop guard that updates the consumed capability this InputCapability refers to on drop
consumed_guard: ConsumedGuard<T>,
}

impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
fn time(&self) -> &T { self.time() }
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
let summaries_borrow = self.summaries.borrow();
let summaries_borrow = self.summaries.get().expect("connectivity frozen at operator build");
let internal_borrow = self.internal.borrow();
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
Rc::ptr_eq(&internal_borrow[port], query_buffer) &&
Expand All @@ -258,7 +258,7 @@ impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
impl<T: Timestamp> InputCapability<T> {
/// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
/// the provided [`ChangeBatch`].
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<PortConnectivity<T::Summary>>>, guard: ConsumedGuard<T>) -> Self {
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<OnceCell<PortConnectivity<T::Summary>>>, guard: ConsumedGuard<T>) -> Self {
InputCapability {
internal,
summaries,
Expand All @@ -280,7 +280,7 @@ impl<T: Timestamp> InputCapability<T> {
/// This method panics if `self.time` is not less or equal to `new_time`.
pub fn delayed(&self, new_time: &T, output_port: usize) -> Capability<T> {
use crate::progress::timestamp::PathSummary;
if let Some(path) = self.summaries.borrow().get(output_port) {
if let Some(path) = self.summaries.get().expect("connectivity frozen at operator build").get(output_port) {
if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
} else {
Expand Down
11 changes: 6 additions & 5 deletions timely/src/dataflow/operators/generic/builder_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::scheduling::{Schedule, Activations};

use crate::progress::{Source, Target};
use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity};
use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivityBuilder};
use crate::Container;
use crate::dataflow::{Stream, Scope, OperatorSlot};
use crate::dataflow::channels::pushers::Tee;
Expand Down Expand Up @@ -55,7 +55,7 @@ pub struct OperatorBuilder<'scope, T: Timestamp> {
slot: OperatorSlot<'scope, T>,
address: Rc<[usize]>, // path to the operator (ending with index).
shape: OperatorShape,
summary: Connectivity<<T as Timestamp>::Summary>,
summary: Vec<PortConnectivityBuilder<<T as Timestamp>::Summary>>,
}

impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
Expand Down Expand Up @@ -113,8 +113,9 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {

self.shape.inputs += 1;
self.shape.notify.push(FrontierInterest::Always);
let connectivity: PortConnectivity<_> = connection.into_iter().collect();
assert!(connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs));
let connectivity: PortConnectivityBuilder<_> = connection.into_iter()
.inspect(|(o,_)| assert!(*o < self.shape.outputs))
.collect();
self.summary.push(connectivity);

receiver
Expand Down Expand Up @@ -182,7 +183,7 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
activations: self.scope.activations(),
logic,
shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
summary: self.summary,
summary: self.summary.into_iter().map(|b| b.freeze()).collect(),
};

self.slot.install(Box::new(operator));
Expand Down
24 changes: 16 additions & 8 deletions timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Types to build operators with general shapes.

use std::rc::Rc;
use std::cell::RefCell;
use std::cell::{OnceCell, RefCell};
use std::default::Default;

use crate::progress::{ChangeBatch, Timestamp};
Expand All @@ -18,7 +18,7 @@ use crate::dataflow::operators::capability::Capability;
use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle};
use crate::dataflow::operators::generic::operator_info::OperatorInfo;
use crate::dataflow::operators::generic::builder_raw::OperatorShape;
use crate::progress::operate::{FrontierInterest, PortConnectivity};
use crate::progress::operate::{FrontierInterest, PortConnectivity, PortConnectivityBuilder};

use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;

Expand All @@ -29,8 +29,11 @@ pub struct OperatorBuilder<'scope, T: Timestamp> {
frontier: Vec<MutableAntichain<T>>,
consumed: Vec<Rc<RefCell<ChangeBatch<T>>>>,
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
/// For each input, a shared list of summaries to each output.
summaries: Vec<Rc<RefCell<PortConnectivity<<T as Timestamp>::Summary>>>>,
/// For each input, a shared cell from which input handles and capabilities
/// read the summaries to each output at runtime, and the builder in which
/// the summaries accumulate during construction. The cell is set once, from
/// the builder, when the operator is built.
summaries: Vec<(Rc<OnceCell<PortConnectivity<<T as Timestamp>::Summary>>>, PortConnectivityBuilder<<T as Timestamp>::Summary>)>,
produced: Vec<Rc<RefCell<ChangeBatch<T>>>>,
}

Expand Down Expand Up @@ -81,8 +84,8 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
self.frontier.push(MutableAntichain::new());
self.consumed.push(Rc::clone(input.consumed()));

let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect()));
self.summaries.push(Rc::clone(&shared_summary));
let shared_summary = Rc::new(OnceCell::new());
self.summaries.push((Rc::clone(&shared_summary), connection.into_iter().collect()));

new_input_handle(input, Rc::clone(&self.internal), shared_summary)
}
Expand Down Expand Up @@ -115,7 +118,7 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
self.produced.push(Rc::clone(counter.produced()));

for (input, entry) in connection {
self.summaries[input].borrow_mut().add_port(new_output, entry);
self.summaries[input].1.add_port(new_output, entry);
}

(pushers::Output::new(counter, internal, new_output), stream)
Expand Down Expand Up @@ -168,11 +171,16 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
///
/// This method calls `build_typed` directly using a new closure, mirroring
/// the variation in `L`, rather than forcing it to be reboxed via `build`.
pub fn build_reschedule_typed<B, L>(self, constructor: B)
pub fn build_reschedule_typed<B, L>(mut self, constructor: B)
where
B: FnOnce(Vec<Capability<T>>) -> L,
L: FnMut(&[MutableAntichain<T>])->bool+'static
{
// Freeze the per-input connectivity, now complete, for runtime readers.
for (cell, builder) in std::mem::take(&mut self.summaries) {
cell.set(builder.freeze()).expect("connectivity already frozen");
}

let mut logic = constructor(self.mint_capabilities());

let mut bookkeeping = ProgressBookkeeping {
Expand Down
6 changes: 3 additions & 3 deletions timely/src/dataflow/operators/generic/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! the operator would with its input and output streams.

use std::rc::Rc;
use std::cell::RefCell;
use std::cell::{OnceCell, RefCell};
use std::collections::VecDeque;

use crate::progress::Timestamp;
Expand All @@ -27,7 +27,7 @@ pub struct InputHandleCore<T: Timestamp, C, P: Pull<Message<T, C>>> {
///
/// Each timestamp received through this input may only produce output timestamps
/// greater or equal to the input timestamp subjected to at least one of these summaries.
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
summaries: Rc<OnceCell<PortConnectivity<T::Summary>>>,
/// Staged capabilities and containers.
staging: VecDeque<(InputCapability<T>, C)>,
staged: Vec<C>,
Expand Down Expand Up @@ -76,7 +76,7 @@ impl<T: Timestamp, C: Accountable, P: Pull<Message<T, C>>> InputHandleCore<T, C,
pub fn new_input_handle<T: Timestamp, C: Accountable, P: Pull<Message<T, C>>>(
pull_counter: PullCounter<T, C, P>,
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
summaries: Rc<OnceCell<PortConnectivity<T::Summary>>>,
) -> InputHandleCore<T, C, P> {
InputHandleCore {
pull_counter,
Expand Down
3 changes: 3 additions & 0 deletions timely/src/progress/frontier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ impl<T> Antichain<T> {
///```
pub fn clear(&mut self) { self.elements.clear() }

/// Drains the elements, leaving the allocation for reuse.
///
/// The returned iterator drains the full range (`..`) of the underlying element storage.
pub fn drain(&mut self) -> smallvec::Drain<'_, [T; 1]> { self.elements.drain(..) }

/// Sorts the elements so that comparisons between antichains can be made.
pub fn sort(&mut self) where T: Ord { self.elements.sort() }

Expand Down
96 changes: 73 additions & 23 deletions timely/src/progress/operate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,50 +76,100 @@ pub enum FrontierInterest {

/// Operator internal connectivity, from inputs to outputs.
pub type Connectivity<TS> = Vec<PortConnectivity<TS>>;
/// Internal connectivity from one port to any number of opposing ports.
#[derive(serde::Serialize, serde::Deserialize, columnar::Columnar, Debug, Clone, Eq, PartialEq)]
pub struct PortConnectivity<TS> {
tree: std::collections::BTreeMap<usize, Antichain<TS>>,

/// Append-only accumulation of port summaries, prior to canonicalization.
///
/// Summaries may be introduced in any order, and repeatedly for the same port.
/// The `freeze` method canonicalizes the accumulation into a `PortConnectivity`,
/// which is the only way to read the contents back out.
///
/// Empty antichains are never stored: both `add_port` and the `FromIterator`
/// implementation discard them on the way in.
#[derive(Debug, Clone)]
pub struct PortConnectivityBuilder<TS> {
/// Pairs of port and path summary antichain, in insertion order.
///
/// Unsorted, and possibly naming the same port more than once; `freeze` sorts
/// the pairs by port and merges the repeats.
entries: Vec<(usize, Antichain<TS>)>,
}

impl<TS> Default for PortConnectivity<TS> {
impl<TS> Default for PortConnectivityBuilder<TS> {
fn default() -> Self {
Self { tree: std::collections::BTreeMap::new() }
Self { entries: Vec::new() }
}
}

impl<TS> PortConnectivity<TS> {
/// Inserts an element by reference, ensuring that the index exists.
pub fn insert(&mut self, index: usize, element: TS) -> bool where TS : crate::PartialOrder {
self.tree.entry(index).or_default().insert(element)
}
/// Inserts an element by reference, ensuring that the index exists.
pub fn insert_ref(&mut self, index: usize, element: &TS) -> bool where TS : crate::PartialOrder + Clone {
self.tree.entry(index).or_default().insert_ref(element)
impl<TS> PortConnectivityBuilder<TS> {
/// Records a single summary `element` for the port `index`.
///
/// The element is wrapped in a one-element antichain and handed to `add_port`,
/// so it is merged with any other summaries for the same port by `freeze`.
pub fn insert(&mut self, index: usize, element: TS) {
    let singleton = Antichain::from_elem(element);
    self.add_port(index, singleton);
}
/// Introduces a summary for `port`. Panics if a summary already exists.
/// Introduces a summary for `port`, which `freeze` will merge with any other
/// summaries for the same port.
///
/// Summaries for the same port are merged by antichain insertion, and describe the
/// union of the claimed paths. Empty summaries are discarded.
pub fn add_port(&mut self, port: usize, summary: Antichain<TS>) {
if !summary.is_empty() {
let prior = self.tree.insert(port, summary);
assert!(prior.is_none());
self.entries.push((port, summary));
}
else {
assert!(self.tree.remove(&port).is_none());
}
/// Consumes the builder and produces the canonical, read-only `PortConnectivity`.
///
/// The accumulated pairs are sorted by port, after which each run of pairs naming the
/// same port is folded into the first pair of the run by inserting every element of the
/// later antichains into it. The sort is unstable, so the relative order in which
/// same-port summaries are merged is unspecified; the merge goes through
/// `Antichain::insert`.
pub fn freeze(self) -> PortConnectivity<TS> where TS : crate::PartialOrder {
    let mut pending = self.entries;
    pending.sort_unstable_by_key(|pair| pair.0);

    // At most one output pair per input pair, so this capacity always suffices.
    let mut merged: Vec<(usize, Antichain<TS>)> = Vec::with_capacity(pending.len());
    for (port, summary) in pending {
        // Ports arrive in sorted order: a repeat can only match the most recent output pair.
        if let Some(current) = merged.last_mut().filter(|pair| pair.0 == port) {
            for element in summary { current.1.insert(element); }
        } else {
            merged.push((port, summary));
        }
    }

    PortConnectivity { entries: merged }
}
}

impl<TS> FromIterator<(usize, Antichain<TS>)> for PortConnectivityBuilder<TS> {
    /// Collects `(port, summary)` pairs in iteration order, dropping pairs whose
    /// summary antichain is empty.
    fn from_iter<T>(iter: T) -> Self where T: IntoIterator<Item = (usize, Antichain<TS>)> {
        let entries = iter
            .into_iter()
            .filter(|pair| !pair.1.is_empty())
            .collect::<Vec<_>>();
        Self { entries }
    }
}

/// Internal connectivity from one port to any number of opposing ports.
///
/// Always in canonical form: ports sorted and distinct, antichains non-empty.
/// Values are constructed by `PortConnectivityBuilder::freeze` (or collected from
/// an iterator), and offer no mutation.
///
/// The sorted, distinct port order is what lets `get` locate a port by binary search.
///
/// NOTE(review): the derived `Deserialize` populates `entries` as-is and does not
/// re-canonicalize; this presumably relies on serialized values always originating
/// from a canonical `PortConnectivity` — confirm.
#[derive(serde::Serialize, serde::Deserialize, columnar::Columnar, Debug, Clone, Eq, PartialEq)]
pub struct PortConnectivity<TS> {
/// Pairs of port and path summary antichain, sorted by distinct port.
entries: Vec<(usize, Antichain<TS>)>,
}

impl<TS> Default for PortConnectivity<TS> {
    /// Produces a connectivity that names no ports.
    fn default() -> Self {
        let entries = Vec::new();
        Self { entries }
    }
}

impl<TS> PortConnectivity<TS> {
/// Borrowing iterator of port identifiers and antichains.
pub fn iter_ports(&self) -> impl Iterator<Item = (usize, &Antichain<TS>)> {
self.tree.iter().map(|(o,p)| (*o, p))
self.entries.iter().map(|(o,p)| (*o, p))
}
/// Returns the associated path summary, if it exists.
pub fn get(&self, index: usize) -> Option<&Antichain<TS>> {
self.tree.get(&index)
self.entries
.binary_search_by_key(&index, |(port, _)| *port)
.ok()
.map(|position| &self.entries[position].1)
}
}

impl<TS> FromIterator<(usize, Antichain<TS>)> for PortConnectivity<TS> {
impl<TS: crate::PartialOrder> FromIterator<(usize, Antichain<TS>)> for PortConnectivity<TS> {
fn from_iter<T>(iter: T) -> Self where T: IntoIterator<Item = (usize, Antichain<TS>)> {
Self { tree: iter.into_iter().filter(|(_,p)| !p.is_empty()).collect() }
iter.into_iter().collect::<PortConnectivityBuilder<TS>>().freeze()
}
}

Expand Down
Loading
Loading