diff --git a/Cargo.lock b/Cargo.lock index c9d4da59..b75512cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2845,7 +2845,6 @@ dependencies = [ "abomonation", "abomonation_derive", "crossbeam-channel", - "futures-util", "getopts", "serde", "serde_derive", diff --git a/external/differential-dataflow/dogsdogsdogs/src/operators/half_join.rs b/external/differential-dataflow/dogsdogsdogs/src/operators/half_join.rs index 338a994a..a2920c16 100644 --- a/external/differential-dataflow/dogsdogsdogs/src/operators/half_join.rs +++ b/external/differential-dataflow/dogsdogsdogs/src/operators/half_join.rs @@ -186,6 +186,9 @@ where let timer = std::time::Instant::now(); let mut work = 0; + // New entries to introduce to the stash after processing. + let mut stash_additions = HashMap::new(); + if let Some(ref mut trace) = arrangement_trace { for (capability, proposals) in stash.iter_mut() { @@ -231,6 +234,35 @@ where } proposals.retain(|ptd| !ptd.2.is_zero()); + + // Determine the lower bound of remaining update times. + let mut antichain = Antichain::new(); + for (_, initial, _) in proposals.iter() { + antichain.insert(initial.clone()); + } + // Fast path: there is only one element in the antichain. + // All times in `proposals` must be greater or equal to it. + if antichain.len() == 1 && !antichain.less_equal(capability.time()) { + stash_additions + .entry(capability.delayed(&antichain[0])) + .or_insert(Vec::new()) + .extend(proposals.drain(..)); + } + else if antichain.len() > 1 { + // Any remaining times should peel off elements from `proposals`. + let mut additions = vec![Vec::new(); antichain.len()]; + for (data, initial, diff) in proposals.drain(..) { + use timely::PartialOrder; + let position = antichain.iter().position(|t| t.less_equal(&initial)).unwrap(); + additions[position].push((data, initial, diff)); + } + for (time, addition) in antichain.into_iter().zip(additions) { + stash_additions + .entry(capability.delayed(&time)) + .or_insert(Vec::new()) + .extend(addition); + } + } } } } @@ -242,6 +274,10 @@ where // drop fully processed capabilities. stash.retain(|_,proposals| !proposals.is_empty()); + + for (capability, proposals) in stash_additions.into_iter() { + stash.entry(capability).or_insert(Vec::new()).extend(proposals); + } // The logical merging frontier depends on both input1 and stash. 
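The hunk above routes each retained proposal to the antichain element at or below its time, with a fast path when the antichain has a single element. A standalone sketch of that routing, assuming `u64` times and the `timely` crate; the function and names here are illustrative, not part of the patch:

```rust
use timely::PartialOrder;
use timely::progress::Antichain;

// Route each update to the first antichain element less-equal to its time,
// mirroring the stash redistribution above (times here are plain u64s).
fn partition(updates: Vec<(&'static str, u64)>) -> Vec<(u64, Vec<(&'static str, u64)>)> {
    // The lower bound of all update times, as an antichain.
    let mut antichain = Antichain::new();
    for (_, time) in updates.iter() {
        antichain.insert(*time);
    }
    let mut routed: Vec<(u64, Vec<(&'static str, u64)>)> =
        antichain.elements().iter().map(|t| (*t, Vec::new())).collect();
    for (data, time) in updates {
        // Every update time is greater-or-equal to some antichain element.
        let position = routed.iter().position(|(t, _)| t.less_equal(&time)).unwrap();
        routed[position].1.push((data, time));
    }
    routed
}

fn main() {
    let routed = partition(vec![("a", 3), ("b", 5), ("c", 3)]);
    // u64 is totally ordered, so the antichain is the single minimum time.
    assert_eq!(routed.len(), 1);
    assert_eq!((routed[0].0, routed[0].1.len()), (3, 3));
}
```

In the operator itself the same partition keys the stash by `capability.delayed(&time)`, so each group holds exactly the capability it still needs.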
let mut frontier = timely::progress::frontier::Antichain::new(); diff --git a/external/differential-dataflow/examples/accumulate.rs b/external/differential-dataflow/examples/accumulate.rs index cfb37078..efd1dfc8 100644 --- a/external/differential-dataflow/examples/accumulate.rs +++ b/external/differential-dataflow/examples/accumulate.rs @@ -5,7 +5,6 @@ extern crate differential_dataflow; use rand::{Rng, SeedableRng, StdRng}; use differential_dataflow::input::Input; -use differential_dataflow::operators::Consolidate; fn main() { diff --git a/external/differential-dataflow/examples/arrange.rs b/external/differential-dataflow/examples/arrange.rs index 8fbda1cb..f6924094 100644 --- a/external/differential-dataflow/examples/arrange.rs +++ b/external/differential-dataflow/examples/arrange.rs @@ -14,7 +14,6 @@ use differential_dataflow::operators::arrange::ArrangeByKey; use differential_dataflow::operators::reduce::Reduce; use differential_dataflow::operators::join::JoinCore; use differential_dataflow::operators::Iterate; -use differential_dataflow::operators::Consolidate; fn main() { diff --git a/external/differential-dataflow/examples/dynamic.rs b/external/differential-dataflow/examples/dynamic.rs new file mode 100644 index 00000000..8a9dcdc8 --- /dev/null +++ b/external/differential-dataflow/examples/dynamic.rs @@ -0,0 +1,153 @@ +extern crate rand; +extern crate timely; +extern crate differential_dataflow; + +use rand::{Rng, SeedableRng, StdRng}; + +use timely::dataflow::*; +use timely::dataflow::operators::probe::Handle; + +use differential_dataflow::input::Input; +use differential_dataflow::Collection; +use differential_dataflow::operators::*; +use differential_dataflow::lattice::Lattice; +use differential_dataflow::logging::DifferentialEvent; + +type Node = u32; +type Edge = (Node, Node); + +fn main() { + + let nodes: u32 = std::env::args().nth(1).unwrap().parse().unwrap(); + let edges: u32 = std::env::args().nth(2).unwrap().parse().unwrap(); + let batch: u32 = std::env::args().nth(3).unwrap().parse().unwrap(); + let rounds: u32 = std::env::args().nth(4).unwrap().parse().unwrap(); + let inspect: bool = std::env::args().nth(5).unwrap() == "inspect"; + + // define a new computational scope, in which to run BFS + timely::execute_from_args(std::env::args(), move |worker| { + + if let Ok(addr) = ::std::env::var("DIFFERENTIAL_LOG_ADDR") { + + eprintln!("enabled DIFFERENTIAL logging to {}", addr); + + if let Ok(stream) = ::std::net::TcpStream::connect(&addr) { + let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream); + let mut logger = ::timely::logging::BatchLogger::new(writer); + worker.log_register().insert::<DifferentialEvent,_>("differential/arrange", move |time, data| + logger.publish_batch(time, data) + ); + } + else { + panic!("Could not connect to differential log address: {:?}", addr); + } + } + + let timer = ::std::time::Instant::now(); + + // define BFS dataflow; return handles to roots and edges inputs + let mut probe = Handle::new(); + let (mut roots, mut graph) = worker.dataflow(|scope| { + + let (root_input, roots) = scope.new_collection(); + let (edge_input, graph) = scope.new_collection(); + + let mut result = bfs(&graph, &roots); + + if !inspect { + result = result.filter(|_| false); + } + + result.map(|(_,l)| l) + .consolidate() + .inspect(|x| println!("\t{:?}", x)) + .probe_with(&mut probe); + + (root_input, edge_input) + }); + + let seed: &[_] = &[1, 2, 3, 4]; + let mut rng1: StdRng = SeedableRng::from_seed(seed); // rng for edge additions + let mut rng2: StdRng = 
SeedableRng::from_seed(seed); // rng for edge deletions + + roots.insert(0); + roots.close(); + + println!("performing BFS on {} nodes, {} edges:", nodes, edges); + + if worker.index() == 0 { + for _ in 0 .. edges { + graph.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes))); + } + } + + println!("{:?}\tloaded", timer.elapsed()); + + graph.advance_to(1); + graph.flush(); + worker.step_or_park_while(None, || probe.less_than(graph.time())); + + println!("{:?}\tstable", timer.elapsed()); + + for round in 0 .. rounds { + for element in 0 .. batch { + if worker.index() == 0 { + graph.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes))); + graph.remove((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes))); + } + graph.advance_to(2 + round * batch + element); + } + graph.flush(); + + let timer2 = ::std::time::Instant::now(); + worker.step_or_park_while(None, || probe.less_than(&graph.time())); + + if worker.index() == 0 { + let elapsed = timer2.elapsed(); + println!("{:?}\t{:?}:\t{}", timer.elapsed(), round, elapsed.as_secs() * 1000000000 + (elapsed.subsec_nanos() as u64)); + } + } + println!("finished; elapsed: {:?}", timer.elapsed()); + }).unwrap(); +} + +// returns pairs (n, s) indicating node n can be reached from a root in s steps. +fn bfs<G: Scope>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)> +where G::Timestamp: Lattice+Ord { + + use timely::order::Product; + use iterate::Variable; + use differential_dataflow::dynamic::{feedback_summary, pointstamp::PointStamp}; + + // initialize roots as reaching themselves at distance 0 + let nodes = roots.map(|x| (x, 0)); + + // repeatedly update minimal distances each node can be reached from each root + nodes.scope().iterative::<PointStamp<u64>, _, _>(|inner| { + + // These enter the statically bound scope, rather than any iterative scopes. + // We do not *need* to enter them into the dynamic scope, as they are static + // within that scope. + let edges = edges.enter(inner); + let nodes = nodes.enter(inner); + + // Create a variable for label iteration. + let inner = feedback_summary::<u64>(1, 1); + let label = Variable::new_from(nodes.clone(), Product { outer: Default::default(), inner }); + + let next = + label + .join_map(&edges, |_k,l,d| (*d, l+1)) + .concat(&nodes) + .reduce(|_, s, t| t.push((*s[0].0, 1))) + ; + + label.set(&next); + // Leave the dynamic iteration, stripping off the last timestamp coordinate. 
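The main loop above uses the standard timely driving idiom: insert updates, `advance_to` the next round, `flush`, and then step the worker until the probe has caught up. A minimal self-contained sketch of that idiom in plain timely (the toy dataflow is illustrative, not part of the patch):

```rust
extern crate timely;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Inspect, Probe};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();
        worker.dataflow::<u64,_,_>(|scope| {
            scope.input_from(&mut input)
                 .inspect(|x| println!("saw: {:?}", x))
                 .probe_with(&mut probe);
        });
        for round in 0 .. 3u64 {
            input.send(round);
            input.advance_to(round + 1);
            // Park rather than spin until all work for `round` is retired.
            worker.step_or_park_while(None, || probe.less_than(input.time()));
        }
    }).unwrap();
}
```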
+ next + .leave_dynamic(1) + .inspect(|x| println!("{:?}", x)) + .leave() + }) + +} \ No newline at end of file diff --git a/external/differential-dataflow/examples/graspan.rs b/external/differential-dataflow/examples/graspan.rs index c0316955..d06f2237 100644 --- a/external/differential-dataflow/examples/graspan.rs +++ b/external/differential-dataflow/examples/graspan.rs @@ -17,7 +17,7 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::input::{Input, InputSession}; use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf}; use differential_dataflow::operators::iterate::Variable; -use differential_dataflow::operators::{Threshold, JoinCore, Consolidate}; +use differential_dataflow::operators::{Threshold, JoinCore}; type Node = usize; type Edge = (Node, Node); diff --git a/external/differential-dataflow/examples/progress.rs b/external/differential-dataflow/examples/progress.rs index f909dce4..30c11b38 100644 --- a/external/differential-dataflow/examples/progress.rs +++ b/external/differential-dataflow/examples/progress.rs @@ -204,7 +204,7 @@ where { // Retain node connections along "default" timestamp summaries. let nodes = nodes.flat_map(|(target, source, summary)| { - if summary != Default::default() { + if summary == Default::default() { Some((Location::from(target), Location::from(source))) } else { diff --git a/external/differential-dataflow/src/collection.rs b/external/differential-dataflow/src/collection.rs index 23c5f0da..eb82c5d0 100644 --- a/external/differential-dataflow/src/collection.rs +++ b/external/differential-dataflow/src/collection.rs @@ -531,7 +531,6 @@ impl Collection where G::Timestamp: Da R: ::ExchangeData+Hashable, G::Timestamp: Lattice+Ord, { - use operators::consolidate::Consolidate; self.consolidate() .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x)); } @@ -545,6 +544,7 @@ impl Collection where G::Timestamp: Da use timely::dataflow::scopes::ScopeParent; use timely::progress::timestamp::Refines; +/// Methods requiring a nested scope. impl<'a, G: Scope, T: Timestamp, D: Data, R: Semigroup> Collection, D, R> where T: Refines<::Timestamp>, @@ -582,6 +582,7 @@ where } } +/// Methods requiring a region as the scope. impl<'a, G: Scope, D: Data, R: Semigroup> Collection, D, R> { /// Returns the value of a Collection from a nested region to its containing scope. @@ -595,6 +596,7 @@ impl<'a, G: Scope, D: Data, R: Semigroup> Collection, } } +/// Methods requiring an Abelian difference, to support negation. impl Collection where G::Timestamp: Data { /// Creates a new collection whose counts are the negation of those in the input. /// diff --git a/external/differential-dataflow/src/consolidation.rs b/external/differential-dataflow/src/consolidation.rs index 69b5f457..b4087bcc 100644 --- a/external/differential-dataflow/src/consolidation.rs +++ b/external/differential-dataflow/src/consolidation.rs @@ -38,6 +38,8 @@ pub fn consolidate_slice(slice: &mut [(T, R)]) -> usize { // In a world where there are not many results, we may never even need to call in to merge sort. slice.sort_by(|x,y| x.0.cmp(&y.0)); + let slice_ptr = slice.as_mut_ptr(); + // Counts the number of distinct known-non-zero accumulations. Indexes the write location. let mut offset = 0; for index in 1 .. 
slice.len() { @@ -55,8 +57,8 @@ pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize { assert!(offset < index); // LOOP INVARIANT: offset < index - let ptr1 = slice.as_mut_ptr().offset(offset as isize); - let ptr2 = slice.as_mut_ptr().offset(index as isize); + let ptr1 = slice_ptr.add(offset); + let ptr2 = slice_ptr.add(index); if (*ptr1).0 == (*ptr2).0 { (*ptr1).1.plus_equals(&(*ptr2).1); @@ -65,7 +67,7 @@ pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize { if !(*ptr1).1.is_zero() { offset += 1; } - let ptr1 = slice.as_mut_ptr().offset(offset as isize); + let ptr1 = slice_ptr.add(offset); std::ptr::swap(ptr1, ptr2); } } @@ -103,6 +105,8 @@ pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, // In a world where there are not many results, we may never even need to call in to merge sort. slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1))); + let slice_ptr = slice.as_mut_ptr(); + // Counts the number of distinct known-non-zero accumulations. Indexes the write location. let mut offset = 0; for index in 1 .. slice.len() { @@ -118,8 +122,8 @@ pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, unsafe { // LOOP INVARIANT: offset < index - let ptr1 = slice.as_mut_ptr().offset(offset as isize); - let ptr2 = slice.as_mut_ptr().offset(index as isize); + let ptr1 = slice_ptr.add(offset); + let ptr2 = slice_ptr.add(index); if (*ptr1).0 == (*ptr2).0 && (*ptr1).1 == (*ptr2).1 { (*ptr1).2.plus_equals(&(*ptr2).2); @@ -128,7 +132,7 @@ pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, if !(*ptr1).2.is_zero() { offset += 1; } - let ptr1 = slice.as_mut_ptr().offset(offset as isize); + let ptr1 = slice_ptr.add(offset); std::ptr::swap(ptr1, ptr2); } diff --git a/external/differential-dataflow/src/dynamic/mod.rs b/external/differential-dataflow/src/dynamic/mod.rs new file mode 100644 index 00000000..b637666c --- /dev/null +++ b/external/differential-dataflow/src/dynamic/mod.rs @@ -0,0 +1,76 @@ +//! Types and operators for dynamically scoped iterative dataflows. +//! +//! Scopes in timely dataflow are expressed statically, as part of the type system. +//! This affords many efficiencies, as well as type-driven reassurance of correctness. +//! However, there are times you need scopes whose organization is discovered only at runtime. +//! Naiad and Materialize are examples: the latter taking arbitrary SQL into iterative dataflows. +//! +//! This module provides a timestamp type `PointStamp` that can represent an update with an +//! unboundedly long sequence of some `T: Timestamp`, ordered by the product order by which times +//! in iterative dataflows are ordered. The module also provides methods for manipulating these +//! timestamps to emulate the movement of update streams into, within, and out of iterative scopes. +//! + +pub mod pointstamp; + +use timely::dataflow::Scope; +use timely::order::Product; +use timely::progress::Timestamp; +use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; +use timely::dataflow::channels::pact::Pipeline; +use timely::progress::Antichain; + +use difference::Semigroup; +use {Collection, Data}; +use collection::AsCollection; +use dynamic::pointstamp::PointStamp; +use dynamic::pointstamp::PointStampSummary; + +impl<G, D, R, T, TOuter> Collection<G, D, R> +where + G: Scope<Timestamp = Product<TOuter, PointStamp<T>>>, + D: Data, + R: Semigroup, + T: Timestamp+Default, + TOuter: Timestamp, +{ + /// Enters a dynamically created scope which has `level` timestamp coordinates. 
+ pub fn enter_dynamic(&self, _level: usize) -> Self { + (*self).clone() + } + /// Leaves a dynamically created scope which has `level` timestamp coordinates. + pub fn leave_dynamic(&self, level: usize) -> Self { + // Create a unary operator that will strip all but `level-1` timestamp coordinates. + let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), self.scope()); + let (mut output, stream) = builder.new_output(); + let mut input = builder.new_input_connection(&self.inner, Pipeline, vec![Antichain::from_elem(Product { outer: Default::default(), inner: PointStampSummary { retain: Some(level - 1), actions: Vec::new() } })]); + + let mut vector = Default::default(); + builder.build(move |_capability| move |_frontier| { + let mut output = output.activate(); + input.for_each(|cap, data| { + data.swap(&mut vector); + let mut new_time = cap.time().clone(); + new_time.inner.vector.truncate(level - 1); + let new_cap = cap.delayed(&new_time); + for (_data, time, _diff) in vector.iter_mut() { + time.inner.vector.truncate(level - 1); + } + output.session(&new_cap).give_vec(&mut vector); + }); + }); + + stream.as_collection() + } +} + +/// Produces the summary for a feedback operator at `level`, applying `summary` to that coordinate. +pub fn feedback_summary<T>(level: usize, summary: T::Summary) -> PointStampSummary<T::Summary> +where + T: Timestamp+Default, +{ + PointStampSummary { + retain: None, + actions: std::iter::repeat(Default::default()).take(level-1).chain(std::iter::once(summary)).collect(), + } +} diff --git a/external/differential-dataflow/src/dynamic/pointstamp.rs b/external/differential-dataflow/src/dynamic/pointstamp.rs new file mode 100644 index 00000000..1c2eb247 --- /dev/null +++ b/external/differential-dataflow/src/dynamic/pointstamp.rs @@ -0,0 +1,204 @@ +//! A timestamp type as in Naiad, where vectors of timestamps of different lengths are comparable. +//! +//! This type compares using "standard" tuple logic as if each timestamp were extended indefinitely with minimal elements. +//! +//! The path summary for this type allows *run-time* rather than *type-driven* iterative scopes. +//! Each summary represents some journey within and out of some number of scopes, followed by entry +//! into and iteration within some other number of scopes. +//! +//! As a result, summaries describe some number of trailing coordinates to truncate, and some increments +//! to the resulting vector. Structurally, the increments can only be to one non-truncated coordinate +//! (as iteration within a scope requires leaving contained scopes), and then to any number of appended +//! default coordinates (which is effectively just *setting* the coordinate). + +use serde::{Deserialize, Serialize}; + +/// A sequence of timestamps, partially ordered by the product order. +/// +/// Sequences of different lengths are compared as if extended indefinitely by `T::minimum()`. +/// Sequences are not guaranteed to be "minimal", and may end with `T::minimum()` entries. +#[derive( + Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation, +)] +pub struct PointStamp<T> { + /// A sequence of timestamps corresponding to timestamps in a sequence of nested scopes. + pub vector: Vec<T>, +} + +impl<T> PointStamp<T> { + /// Create a new sequence. + pub fn new(vector: Vec<T>) -> Self { + PointStamp { vector } + } +} + +// Implement timely dataflow's `PartialOrder` trait. 
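Before the `PartialOrder` implementation that follows, it may help to see the comparison rule on plain vectors: a shorter sequence behaves as if padded with the minimum timestamp. A pure-Rust sketch assuming `u64` coordinates (minimum `0`); the helper is illustrative only:

```rust
// Compare variable-length times as if padded with the minimum timestamp (0).
fn less_equal(lhs: &[u64], rhs: &[u64]) -> bool {
    (0 .. lhs.len().max(rhs.len())).all(|i| {
        lhs.get(i).copied().unwrap_or(0) <= rhs.get(i).copied().unwrap_or(0)
    })
}

fn main() {
    assert!(less_equal(&[1, 2], &[1, 2, 0]));    // equal once padded
    assert!(!less_equal(&[1, 2, 1], &[1, 2]));   // trailing 1 exceeds the padding
    // The product order leaves some pairs incomparable.
    assert!(!less_equal(&[1, 1], &[2, 0]) && !less_equal(&[2, 0], &[1, 1]));
}
```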
+use timely::order::PartialOrder; +impl<T: Timestamp> PartialOrder for PointStamp<T> { + fn less_equal(&self, other: &Self) -> bool { + // Every present coordinate must be less-equal the corresponding coordinate, + // where absent corresponding coordinates are `T::minimum()`. Coordinates + // absent from `self.vector` are themselves `T::minimum()` and are less-equal + // any corresponding coordinate in `other.vector`. + self.vector + .iter() + .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum()))) + .all(|(t1, t2)| t1.less_equal(t2)) + } +} + +use timely::progress::timestamp::Refines; +impl<T: Timestamp> Refines<()> for PointStamp<T> { + fn to_inner(_outer: ()) -> Self { + Self { vector: Vec::new() } + } + fn to_outer(self) -> () { + () + } + fn summarize(_summary: <Self as Timestamp>::Summary) -> () { + () + } +} + +// Implement timely dataflow's `PathSummary` trait. +// This is preparation for the `Timestamp` implementation below. +use timely::progress::PathSummary; + +/// Describes an action on a `PointStamp`: truncation to `retain` coordinates followed by `actions`. +#[derive( + Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation +)] +pub struct PointStampSummary<TS> { + /// Number of leading coordinates to retain. + /// + /// A `None` value indicates that all coordinates should be retained. + pub retain: Option<usize>, + /// Summary actions to apply to all coordinates. + /// + /// If `actions.len()` is greater than `retain`, a timestamp should be extended by + /// `T::minimum()` in order to be subjected to `actions`. + pub actions: Vec<TS>, +} + +impl<T: Timestamp> PathSummary<PointStamp<T>> for PointStampSummary<T::Summary> { + fn results_in(&self, timestamp: &PointStamp<T>) -> Option<PointStamp<T>> { + // Get a slice of timestamp coordinates appropriate for consideration. + let timestamps = if let Some(retain) = self.retain { + if retain < timestamp.vector.len() { + &timestamp.vector[..retain] + } else { + &timestamp.vector[..] + } + } else { + &timestamp.vector[..] + }; + + let mut vector = Vec::with_capacity(std::cmp::max(timestamps.len(), self.actions.len())); + // Introduce elements where both timestamp and action exist. + let min_len = std::cmp::min(timestamps.len(), self.actions.len()); + for (action, timestamp) in self.actions.iter().zip(timestamps.iter()) { + vector.push(action.results_in(timestamp)?); + } + // Any remaining timestamps should be copied in. + for timestamp in timestamps.iter().skip(min_len) { + vector.push(timestamp.clone()); + } + // Any remaining actions should be applied to the empty timestamp. + for action in self.actions.iter().skip(min_len) { + vector.push(action.results_in(&T::minimum())?); + } + + Some(PointStamp { vector }) + } + fn followed_by(&self, other: &Self) -> Option<Self> { + // The output `retain` will be the minimum of the two inputs. + let retain = match (self.retain, other.retain) { + (Some(x), Some(y)) => Some(std::cmp::min(x, y)), + (Some(x), None) => Some(x), + (None, Some(y)) => Some(y), + (None, None) => None, + }; + + // The output `actions` will depend on the relative sizes of the input `retain`s. + let self_actions = if let Some(retain) = other.retain { + if retain < self.actions.len() { + &self.actions[..retain] + } else { + &self.actions[..] + } + } else { + &self.actions[..] + }; + + let mut actions = Vec::with_capacity(std::cmp::max(self_actions.len(), other.actions.len())); + // Introduce actions where both input actions apply. 
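A `PointStampSummary` first truncates a time to its `retain` leading coordinates and then applies `actions` coordinate-wise, extending with minimum timestamps as needed. A pure-Rust sketch of the `results_in` rule that models each action as a `u64` increment (types and names are illustrative; the real implementation returns `Option` to signal overflow):

```rust
struct Summary { retain: Option<usize>, actions: Vec<u64> }

// Truncate to `retain` coordinates, then add `actions`, padding with 0 (the
// minimum timestamp) whenever one side is shorter than the other.
fn results_in(s: &Summary, time: &[u64]) -> Vec<u64> {
    let keep = s.retain.unwrap_or(time.len()).min(time.len());
    let ts = &time[..keep];
    (0 .. ts.len().max(s.actions.len()))
        .map(|i| ts.get(i).copied().unwrap_or(0) + s.actions.get(i).copied().unwrap_or(0))
        .collect()
}

fn main() {
    // Leave one loop (drop the last coordinate) and step the new innermost loop.
    let leave_and_step = Summary { retain: Some(2), actions: vec![0, 1] };
    assert_eq!(results_in(&leave_and_step, &[3, 4, 7]), vec![3, 5]);
    // Enter a loop: retain everything and extend with a fresh coordinate.
    let enter = Summary { retain: None, actions: vec![0, 0, 0] };
    assert_eq!(results_in(&enter, &[3, 4]), vec![3, 4, 0]);
}
```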
+ let min_len = std::cmp::min(self_actions.len(), other.actions.len()); + for (action1, action2) in self_actions.iter().zip(other.actions.iter()) { + actions.push(action1.followed_by(action2)?); + } + // Append any remaining self actions. + actions.extend(self_actions.iter().skip(min_len).cloned()); + // Append any remaining other actions. + actions.extend(other.actions.iter().skip(min_len).cloned()); + + Some(Self { retain, actions }) + } +} + +impl<TS: PartialOrder> PartialOrder for PointStampSummary<TS> { + fn less_equal(&self, other: &Self) -> bool { + // If the `retain`s are not the same, there is some coordinate which + // could either be bigger or smaller as the timestamp or the replacement. + // In principle, a `T::minimum()` extension could break this rule, and + // we could tighten this logic if needed; I think it is fine not to though. + self.retain == other.retain + && self.actions.len() <= other.actions.len() + && self + .actions + .iter() + .zip(other.actions.iter()) + .all(|(t1, t2)| t1.less_equal(t2)) + } +} + +// Implement timely dataflow's `Timestamp` trait. +use timely::progress::Timestamp; +impl<T: Timestamp> Timestamp for PointStamp<T> { + fn minimum() -> Self { + Self { vector: Vec::new() } + } + type Summary = PointStampSummary<T::Summary>; +} + +// Implement differential dataflow's `Lattice` trait. +// This extends the `PartialOrder` implementation with additional structure. +use lattice::Lattice; +impl<T: Lattice+Timestamp> Lattice for PointStamp<T> { + fn join(&self, other: &Self) -> Self { + let min_len = ::std::cmp::min(self.vector.len(), other.vector.len()); + let max_len = ::std::cmp::max(self.vector.len(), other.vector.len()); + let mut vector = Vec::with_capacity(max_len); + // For coordinates in both inputs, apply `join` to the pair. + for index in 0..min_len { + vector.push(self.vector[index].join(&other.vector[index])); + } + // Only one of the two vectors will have remaining elements; copy them. + for time in &self.vector[min_len..] { + vector.push(time.clone()); + } + for time in &other.vector[min_len..] { + vector.push(time.clone()); + } + Self { vector } + } + fn meet(&self, other: &Self) -> Self { + let min_len = ::std::cmp::min(self.vector.len(), other.vector.len()); + let mut vector = Vec::with_capacity(min_len); + // For coordinates in both inputs, apply `meet` to the pair. + for index in 0..min_len { + vector.push(self.vector[index].meet(&other.vector[index])); + } + // Remaining coordinates are `T::minimum()` in one input, and so in the output. + Self { vector } + } +} diff --git a/external/differential-dataflow/src/lib.rs b/external/differential-dataflow/src/lib.rs index c4d7693e..36de7a60 100644 --- a/external/differential-dataflow/src/lib.rs +++ b/external/differential-dataflow/src/lib.rs @@ -1,7 +1,7 @@ //! Differential dataflow is a high-throughput, low-latency data-parallel programming framework. //! //! Differential dataflow programs are written in a collection-oriented style, where you transform -//! collections of records using traditional operations like `map`, `filter`, `join`, and `group_by`. +//! collections of records using traditional operations like `map`, `filter`, `join`, and `reduce`. //! Differential dataflow also includes the less traditional operation `iterate`, which allows you //! to repeatedly apply differential dataflow transformations to collections. //! 
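The `Lattice` implementation above reduces to a simple rule once absent coordinates are read as the minimum timestamp: `join` keeps the longer tail and `meet` truncates to the shorter vector. A pure-Rust sketch with `u64` coordinates, where join is `max` and meet is `min`:

```rust
fn join(a: &[u64], b: &[u64]) -> Vec<u64> {
    // Absent coordinates act as 0, so the longer tail survives unchanged.
    (0 .. a.len().max(b.len()))
        .map(|i| a.get(i).copied().unwrap_or(0).max(b.get(i).copied().unwrap_or(0)))
        .collect()
}

fn meet(a: &[u64], b: &[u64]) -> Vec<u64> {
    // Absent coordinates act as 0, so the meet truncates to the shorter vector.
    (0 .. a.len().min(b.len())).map(|i| a[i].min(b[i])).collect()
}

fn main() {
    assert_eq!(join(&[1, 5], &[2]), vec![2, 5]);
    assert_eq!(meet(&[1, 5], &[2]), vec![1]);
}
```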
@@ -109,6 +109,7 @@ pub mod lattice; pub mod trace; pub mod input; pub mod difference; +pub mod dynamic; pub mod collection; pub mod logging; pub mod consolidation; diff --git a/external/differential-dataflow/src/operators/arrange/agent.rs b/external/differential-dataflow/src/operators/arrange/agent.rs index d126938b..5a7f3120 100644 --- a/external/differential-dataflow/src/operators/arrange/agent.rs +++ b/external/differential-dataflow/src/operators/arrange/agent.rs @@ -5,7 +5,7 @@ use std::cell::RefCell; use std::collections::VecDeque; use timely::dataflow::Scope; -use timely::dataflow::operators::generic::source; +use timely::dataflow::operators::generic::{OperatorInfo, source}; use timely::progress::Timestamp; use timely::progress::{Antichain, frontier::AntichainRef}; use timely::dataflow::operators::CapabilitySet; @@ -38,7 +38,7 @@ where physical_compaction: Antichain, temp_antichain: Antichain, - operator: ::timely::dataflow::operators::generic::OperatorInfo, + operator: OperatorInfo, logging: Option<::logging::Logger>, } @@ -89,7 +89,7 @@ where Tr::Time: Timestamp+Lattice, { /// Creates a new agent from a trace reader. - pub fn new(trace: Tr, operator: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option<::logging::Logger>) -> (Self, TraceWriter) + pub fn new(trace: Tr, operator: OperatorInfo, logging: Option<::logging::Logger>) -> (Self, TraceWriter) where Tr: Trace, Tr::Batch: Batch, @@ -155,6 +155,11 @@ where reference.0.activate(); reference } + + /// The [OperatorInfo] of the underlying Timely operator + pub fn operator(&self) -> &OperatorInfo { + &self.operator + } } impl TraceAgent diff --git a/external/differential-dataflow/src/operators/arrange/arrangement.rs b/external/differential-dataflow/src/operators/arrange/arrangement.rs index 3934e80f..37a710a0 100644 --- a/external/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/external/differential-dataflow/src/operators/arrange/arrangement.rs @@ -4,7 +4,7 @@ //! structure, provides access to both an indexed form of accepted updates as well as a stream of //! batches of newly arranged updates. //! -//! Several operators (`join`, `group`, and `cogroup`, among others) are implemented against `Arranged`, +//! Several operators (`join`, `reduce`, and `count`, among others) are implemented against `Arranged`, //! and can be applied directly to arranged data instead of the collection. Internally, the operators //! will borrow the shared state, and listen on the timely stream for shared batches of data. The //! resources to index the collection---communication, computation, and memory---are spent only once, diff --git a/external/differential-dataflow/src/operators/consolidate.rs b/external/differential-dataflow/src/operators/consolidate.rs index d36f0490..32b770c2 100644 --- a/external/differential-dataflow/src/operators/consolidate.rs +++ b/external/differential-dataflow/src/operators/consolidate.rs @@ -10,10 +10,18 @@ use timely::dataflow::Scope; use ::{Collection, ExchangeData, Hashable}; use ::difference::Semigroup; -use operators::arrange::arrangement::Arrange; -/// An extension method for consolidating weighted streams. -pub trait Consolidate : Sized { +use Data; +use lattice::Lattice; + +/// Methods which require data be arrangeable. +impl Collection +where + G: Scope, + G::Timestamp: Data+Lattice, + D: ExchangeData+Hashable, + R: Semigroup+ExchangeData, +{ /// Aggregates the weights of equal records into at most one record. 
/// /// This method uses the type `D`'s `hashed()` method to partition the data. The data are @@ -26,7 +34,6 @@ pub trait Consolidate : Sized { /// extern crate differential_dataflow; /// /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::Consolidate; /// /// fn main() { /// ::timely::example(|scope| { @@ -40,30 +47,23 @@ pub trait Consolidate : Sized { /// }); /// } /// ``` - fn consolidate(&self) -> Self { - self.consolidate_named("Consolidate") + pub fn consolidate(&self) -> Self { + use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace; + self.consolidate_named::>("Consolidate") } - /// As `consolidate` but with the ability to name the operator. - fn consolidate_named(&self, name: &str) -> Self; -} - -impl Consolidate for Collection -where - D: ExchangeData+Hashable, - R: ExchangeData+Semigroup, - G::Timestamp: ::lattice::Lattice+Ord, - { - fn consolidate_named(&self, name: &str) -> Self { - use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace; + /// As `consolidate` but with the ability to name the operator and specify the trace type. + pub fn consolidate_named(&self, name: &str) -> Self + where + Tr: crate::trace::Trace+crate::trace::TraceReader+'static, + Tr::Batch: crate::trace::Batch, + { + use operators::arrange::arrangement::Arrange; self.map(|k| (k, ())) - .arrange_named::>(name) + .arrange_named::(name) .as_collection(|d: &D, _| d.clone()) } -} -/// An extension method for consolidating weighted streams. -pub trait ConsolidateStream { /// Aggregates the weights of equal records. /// /// Unlike `consolidate`, this method does not exchange data and does not @@ -79,7 +79,6 @@ pub trait ConsolidateStream { /// extern crate differential_dataflow; /// /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::consolidate::ConsolidateStream; /// /// fn main() { /// ::timely::example(|scope| { @@ -93,16 +92,7 @@ pub trait ConsolidateStream { /// }); /// } /// ``` - fn consolidate_stream(&self) -> Self; -} - -impl ConsolidateStream for Collection -where - D: ExchangeData+Hashable, - R: ExchangeData+Semigroup, - G::Timestamp: ::lattice::Lattice+Ord, - { - fn consolidate_stream(&self) -> Self { + pub fn consolidate_stream(&self) -> Self { use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Operator; diff --git a/external/differential-dataflow/src/operators/count.rs b/external/differential-dataflow/src/operators/count.rs index c9d19234..69f8e16a 100644 --- a/external/differential-dataflow/src/operators/count.rs +++ b/external/differential-dataflow/src/operators/count.rs @@ -1,17 +1,4 @@ -//! Group records by a key, and apply a reduction function. -//! -//! The `group` operators act on data that can be viewed as pairs `(key, val)`. They group records -//! with the same key, and apply user supplied functions to the key and a list of values, which are -//! expected to populate a list of output values. -//! -//! Several variants of `group` exist which allow more precise control over how grouping is done. -//! For example, the `_by` suffixed variants take arbitrary data, but require a key-value selector -//! to be applied to each record. The `_u` suffixed variants use unsigned integers as keys, and -//! will use a dense array rather than a `HashMap` to store their keys. -//! -//! The list of values are presented as an iterator which internally merges sorted lists of values. -//! This ordering can be exploited in several cases to avoid computation when only the first few -//! 
elements are required. +//! Count the number of occurrences of each element. use timely::order::TotalOrder; use timely::dataflow::*; diff --git a/external/differential-dataflow/src/operators/iterate.rs b/external/differential-dataflow/src/operators/iterate.rs index 062d0225..a61efcfc 100644 --- a/external/differential-dataflow/src/operators/iterate.rs +++ b/external/differential-dataflow/src/operators/iterate.rs @@ -52,7 +52,7 @@ pub trait Iterate { /// Importantly, this method does not automatically consolidate results. /// It may be important to conclude with `consolidate()` to ensure that /// logically empty collections that contain cancelling records do not - /// result in non-termination. Operators like `group`, `distinct`, and + /// result in non-termination. Operators like `reduce`, `distinct`, and /// `count` also perform consolidation, and are safe to conclude with. /// /// # Examples @@ -63,7 +63,6 @@ pub trait Iterate { /// /// use differential_dataflow::input::Input; /// use differential_dataflow::operators::Iterate; - /// use differential_dataflow::operators::Consolidate; /// /// fn main() { /// ::timely::example(|scope| { @@ -145,7 +144,6 @@ impl Iterate for G { /// /// use differential_dataflow::input::Input; /// use differential_dataflow::operators::iterate::Variable; -/// use differential_dataflow::operators::Consolidate; /// /// fn main() { /// ::timely::example(|scope| { diff --git a/external/differential-dataflow/src/operators/join.rs b/external/differential-dataflow/src/operators/join.rs index 2db2560e..8710ac07 100644 --- a/external/differential-dataflow/src/operators/join.rs +++ b/external/differential-dataflow/src/operators/join.rs @@ -214,7 +214,7 @@ where /// /// This method is used by the various `join` implementations, but it can also be used /// directly in the event that one has a handle to an `Arranged`, perhaps because -/// the arrangement is available for re-use, or from the output of a `group` operator. +/// the arrangement is available for re-use, or from the output of a `reduce` operator. pub trait JoinCore where G::Timestamp: Lattice+Ord { /// Joins two arranged collections with the same key type. diff --git a/external/differential-dataflow/src/operators/mod.rs b/external/differential-dataflow/src/operators/mod.rs index 96f0acc9..9f048cd5 100644 --- a/external/differential-dataflow/src/operators/mod.rs +++ b/external/differential-dataflow/src/operators/mod.rs @@ -5,7 +5,6 @@ //! to several operations defined directly on the `Collection` type (e.g. `map` and `filter`). pub use self::reduce::{Reduce, Threshold, Count}; -pub use self::consolidate::Consolidate; pub use self::iterate::Iterate; pub use self::join::{Join, JoinCore}; pub use self::count::CountTotal; diff --git a/external/differential-dataflow/src/operators/reduce.rs b/external/differential-dataflow/src/operators/reduce.rs index 2b5ea9aa..45ad9246 100644 --- a/external/differential-dataflow/src/operators/reduce.rs +++ b/external/differential-dataflow/src/operators/reduce.rs @@ -239,11 +239,11 @@ where } } -/// Extension trait for the `group_arranged` differential dataflow method. +/// Extension trait for the `reduce_core` differential dataflow method. pub trait ReduceCore where G::Timestamp: Lattice+Ord { - /// Applies `group` to arranged data, and returns an arrangement of output data. + /// Applies `reduce` to arranged data, and returns an arrangement of output data. 
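For reference, the `reduce` idiom the renamed documentation describes: the user closure sees each key with its sorted list of `(value, count)` pairs and pushes output values. A small sketch that keeps each key's minimum value, assuming differential-dataflow's `Reduce` trait and timely's `example` harness (the data are illustrative):

```rust
extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::input::Input;
use differential_dataflow::operators::Reduce;

fn main() {
    timely::example(|scope| {
        scope.new_collection_from(vec![(0u32, 3u32), (0, 1), (1, 2)]).1
             .reduce(|_key, input, output| {
                 // `input` is sorted by value, so the first entry is the minimum.
                 output.push((*input[0].0, 1));
             })
             .inspect(|x| println!("{:?}", x));
    });
}
```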
/// - /// This method is used by the more ergonomic `group`, `distinct`, and `count` methods, although + /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although /// it can be very useful if one needs to manually attach and re-use existing arranged collections. /// /// # Examples diff --git a/external/differential-dataflow/src/trace/implementations/merge_batcher.rs b/external/differential-dataflow/src/trace/implementations/merge_batcher.rs index 83f4f63f..6815e807 100644 --- a/external/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/external/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -1,5 +1,7 @@ //! A general purpose `Batcher` implementation based on radix sort. +use std::collections::VecDeque; + use timely::communication::message::RefOrMut; use timely::progress::frontier::Antichain; @@ -120,66 +122,12 @@ where } -use std::slice::{from_raw_parts}; - -pub struct VecQueue { - list: Vec, - head: usize, - tail: usize, -} - -impl VecQueue { - #[inline] - pub fn new() -> Self { VecQueue::from(Vec::new()) } - #[inline] - pub fn pop(&mut self) -> T { - debug_assert!(self.head < self.tail); - self.head += 1; - unsafe { ::std::ptr::read(self.list.as_mut_ptr().offset((self.head as isize) - 1)) } - } - #[inline] - pub fn peek(&self) -> &T { - debug_assert!(self.head < self.tail); - unsafe { self.list.get_unchecked(self.head) } - } - #[inline] - pub fn _peek_tail(&self) -> &T { - debug_assert!(self.head < self.tail); - unsafe { self.list.get_unchecked(self.tail-1) } - } - #[inline] - pub fn _slice(&self) -> &[T] { - debug_assert!(self.head < self.tail); - unsafe { from_raw_parts(self.list.get_unchecked(self.head), self.tail - self.head) } - } - #[inline] - pub fn from(mut list: Vec) -> Self { - let tail = list.len(); - unsafe { list.set_len(0); } - VecQueue { - list: list, - head: 0, - tail: tail, - } - } - // could leak, if self.head != self.tail. - #[inline] - pub fn done(self) -> Vec { - debug_assert!(self.head == self.tail); - self.list - } - #[inline] - pub fn len(&self) -> usize { self.tail - self.head } - #[inline] - pub fn is_empty(&self) -> bool { self.head == self.tail } -} - #[inline] unsafe fn push_unchecked(vec: &mut Vec, element: T) { debug_assert!(vec.len() < vec.capacity()); - let len = vec.len(); - ::std::ptr::write(vec.get_unchecked_mut(len), element); - vec.set_len(len + 1); + let idx = vec.len(); + vec.set_len(idx + 1); + ::std::ptr::write(vec.get_unchecked_mut(idx), element); } pub struct MergeSorter { @@ -277,11 +225,11 @@ impl MergeSorter { let mut output = Vec::with_capacity(list1.len() + list2.len()); let mut result = self.empty(); - let mut list1 = VecQueue::from(list1); - let mut list2 = VecQueue::from(list2); + let mut list1 = list1.into_iter(); + let mut list2 = list2.into_iter(); - let mut head1 = if !list1.is_empty() { VecQueue::from(list1.pop()) } else { VecQueue::new() }; - let mut head2 = if !list2.is_empty() { VecQueue::from(list2.pop()) } else { VecQueue::new() }; + let mut head1 = VecDeque::from(list1.next().unwrap_or_default()); + let mut head2 = VecDeque::from(list2.next().unwrap_or_default()); // while we have valid data in each input, merge. 
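The loop that follows merges two sorted runs through `VecDeque` fronts, consolidating equal keys by adding weights. A simplified sketch of the same merge with `String` data and `i64` weights (the real code merges `(data, time, diff)` triples and recycles buffers):

```rust
use std::cmp::Ordering;
use std::collections::VecDeque;

fn merge(a: Vec<(String, i64)>, b: Vec<(String, i64)>) -> Vec<(String, i64)> {
    let mut a = VecDeque::from(a);
    let mut b = VecDeque::from(b);
    let mut out = Vec::with_capacity(a.len() + b.len());
    while let (Some(x), Some(y)) = (a.front(), b.front()) {
        match x.0.cmp(&y.0) {
            Ordering::Less => out.push(a.pop_front().unwrap()),
            Ordering::Greater => out.push(b.pop_front().unwrap()),
            Ordering::Equal => {
                // Equal keys consolidate; zero accumulations are dropped.
                let (data, diff1) = a.pop_front().unwrap();
                let (_, diff2) = b.pop_front().unwrap();
                if diff1 + diff2 != 0 { out.push((data, diff1 + diff2)); }
            }
        }
    }
    // At most one of the two inputs still has elements.
    out.extend(a);
    out.extend(b);
    out
}

fn main() {
    let merged = merge(
        vec![("a".into(), 1), ("b".into(), -1)],
        vec![("b".into(), 1), ("c".into(), 2)],
    );
    assert_eq!(merged, vec![("a".into(), 1), ("c".into(), 2)]);
}
```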
while !head1.is_empty() && !head2.is_empty() { @@ -289,16 +237,16 @@ impl MergeSorter { while (result.capacity() - result.len()) > 0 && head1.len() > 0 && head2.len() > 0 { let cmp = { - let x = head1.peek(); - let y = head2.peek(); + let x = head1.front().unwrap(); + let y = head2.front().unwrap(); (&x.0, &x.1).cmp(&(&y.0, &y.1)) }; match cmp { - Ordering::Less => { unsafe { push_unchecked(&mut result, head1.pop()); } } - Ordering::Greater => { unsafe { push_unchecked(&mut result, head2.pop()); } } + Ordering::Less => { unsafe { push_unchecked(&mut result, head1.pop_front().unwrap()); } } + Ordering::Greater => { unsafe { push_unchecked(&mut result, head2.pop_front().unwrap()); } } Ordering::Equal => { - let (data1, time1, mut diff1) = head1.pop(); - let (_data2, _time2, diff2) = head2.pop(); + let (data1, time1, mut diff1) = head1.pop_front().unwrap(); + let (_data2, _time2, diff2) = head2.pop_front().unwrap(); diff1.plus_equals(&diff2); if !diff1.is_zero() { unsafe { push_unchecked(&mut result, (data1, time1, diff1)); } @@ -313,14 +261,14 @@ impl MergeSorter { } if head1.is_empty() { - let done1 = head1.done(); + let done1 = Vec::from(head1); if done1.capacity() == Self::buffer_size() { self.stash.push(done1); } - head1 = if !list1.is_empty() { VecQueue::from(list1.pop()) } else { VecQueue::new() }; + head1 = VecDeque::from(list1.next().unwrap_or_default()); } if head2.is_empty() { - let done2 = head2.done(); + let done2 = Vec::from(head2); if done2.capacity() == Self::buffer_size() { self.stash.push(done2); } - head2 = if !list2.is_empty() { VecQueue::from(list2.pop()) } else { VecQueue::new() }; + head2 = VecDeque::from(list2.next().unwrap_or_default()); } } @@ -329,21 +277,17 @@ impl MergeSorter { if !head1.is_empty() { let mut result = self.empty(); - for _ in 0 .. head1.len() { result.push(head1.pop()); } + for item1 in head1 { result.push(item1); } output.push(result); } - while !list1.is_empty() { - output.push(list1.pop()); - } + output.extend(list1); if !head2.is_empty() { let mut result = self.empty(); - for _ in 0 .. head2.len() { result.push(head2.pop()); } + for item2 in head2 { result.push(item2); } output.push(result); } - while !list2.is_empty() { - output.push(list2.pop()); - } + output.extend(list2); output } diff --git a/external/differential-dataflow/src/trace/implementations/spine_fueled.rs b/external/differential-dataflow/src/trace/implementations/spine_fueled.rs index 45e6b58b..d279b8a0 100644 --- a/external/differential-dataflow/src/trace/implementations/spine_fueled.rs +++ b/external/differential-dataflow/src/trace/implementations/spine_fueled.rs @@ -210,11 +210,14 @@ where Some((CursorList::new(cursors, &storage), storage)) } + #[inline] fn set_logical_compaction(&mut self, frontier: AntichainRef) { self.logical_frontier.clear(); self.logical_frontier.extend(frontier.iter().cloned()); } + #[inline] fn get_logical_compaction(&mut self) -> AntichainRef { self.logical_frontier.borrow() } + #[inline] fn set_physical_compaction(&mut self, frontier: AntichainRef) { // We should never request to rewind the frontier. 
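The debug assertion that follows enforces that compaction frontiers only ever advance. On antichains, `less_equal` captures exactly this "not rewound" relation; a small sketch with timely's `Antichain` and single-element `u64` frontiers:

```rust
use timely::PartialOrder;
use timely::progress::Antichain;

fn main() {
    let old = Antichain::from_elem(3u64);
    let new = Antichain::from_elem(5u64);
    // Advancing from {3} to {5} is fine; rewinding from {5} to {3} is not.
    assert!(PartialOrder::less_equal(&old.borrow(), &new.borrow()));
    assert!(!PartialOrder::less_equal(&new.borrow(), &old.borrow()));
}
```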
debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier); @@ -222,8 +225,10 @@ where self.physical_frontier.extend(frontier.iter().cloned()); self.consider_merges(); } + #[inline] fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_frontier.borrow() } + #[inline] fn map_batches(&self, mut f: F) { for batch in self.merging.iter().rev() { match batch { diff --git a/external/differential-dataflow/src/trace/mod.rs b/external/differential-dataflow/src/trace/mod.rs index fc340e43..6e411475 100644 --- a/external/differential-dataflow/src/trace/mod.rs +++ b/external/differential-dataflow/src/trace/mod.rs @@ -160,6 +160,7 @@ pub trait TraceReader { /// Reads the upper frontier of committed times. /// /// + #[inline] fn read_upper(&mut self, target: &mut Antichain) where Self::Time: Timestamp, diff --git a/external/differential-dataflow/src/trace/wrappers/rc.rs b/external/differential-dataflow/src/trace/wrappers/rc.rs index b31f8779..508521b8 100644 --- a/external/differential-dataflow/src/trace/wrappers/rc.rs +++ b/external/differential-dataflow/src/trace/wrappers/rc.rs @@ -61,12 +61,14 @@ where } } /// Replaces elements of `lower` with those of `upper`. + #[inline] pub fn adjust_logical_compaction(&mut self, lower: AntichainRef, upper: AntichainRef) { self.logical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1))); self.logical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1))); self.trace.set_logical_compaction(self.logical_compaction.frontier()); } /// Replaces elements of `lower` with those of `upper`. + #[inline] pub fn adjust_physical_compaction(&mut self, lower: AntichainRef, upper: AntichainRef) { self.physical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1))); self.physical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1))); @@ -177,4 +179,4 @@ where self.logical_compaction = Antichain::new(); self.physical_compaction = Antichain::new(); } -} \ No newline at end of file +} diff --git a/external/differential-dataflow/tests/join.rs b/external/differential-dataflow/tests/join.rs index a2d92853..2c956a4b 100644 --- a/external/differential-dataflow/tests/join.rs +++ b/external/differential-dataflow/tests/join.rs @@ -4,7 +4,7 @@ extern crate differential_dataflow; use timely::dataflow::operators::{ToStream, Capture, Map}; use timely::dataflow::operators::capture::Extract; use differential_dataflow::AsCollection; -use differential_dataflow::operators::{Consolidate, Join, Count}; +use differential_dataflow::operators::{Join, Count}; #[test] fn join() { diff --git a/external/timely-dataflow/.github/workflows/deploy.yml b/external/timely-dataflow/.github/workflows/deploy.yml index 3a50095b..7a6b87b1 100644 --- a/external/timely-dataflow/.github/workflows/deploy.yml +++ b/external/timely-dataflow/.github/workflows/deploy.yml @@ -10,7 +10,7 @@ jobs: runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v3 - - run: cargo install mdbook --version 0.4.20 + - run: cargo install mdbook --version 0.4.27 - run: cd mdbook && mdbook build - uses: JamesIves/github-pages-deploy-action@v4 with: diff --git a/external/timely-dataflow/.github/workflows/test.yml b/external/timely-dataflow/.github/workflows/test.yml index 766c2a25..9a1a6be9 100644 --- a/external/timely-dataflow/.github/workflows/test.yml +++ b/external/timely-dataflow/.github/workflows/test.yml @@ -7,7 +7,7 @@ jobs: runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v3 - - 
run: rustup update 1.60 --no-self-update && rustup default 1.60 + - run: rustup update 1.64 --no-self-update && rustup default 1.64 - run: cargo build - name: test mdBook # rustdoc doesn't build dependencies, so it needs to run after `cargo build`, diff --git a/external/timely-dataflow/Cargo.toml b/external/timely-dataflow/Cargo.toml index e7bacb48..755167b7 100644 --- a/external/timely-dataflow/Cargo.toml +++ b/external/timely-dataflow/Cargo.toml @@ -3,10 +3,8 @@ members = [ "bytes", "communication", "container", - "kafkaesque", "logging", "timely", - "experiments" ] [profile.release] diff --git a/external/timely-dataflow/communication/src/allocator/counters.rs b/external/timely-dataflow/communication/src/allocator/counters.rs index 68d1fa4a..4bee0611 100644 --- a/external/timely-dataflow/communication/src/allocator/counters.rs +++ b/external/timely-dataflow/communication/src/allocator/counters.rs @@ -2,23 +2,21 @@ use std::rc::Rc; use std::cell::RefCell; -use std::collections::VecDeque; use crate::{Push, Pull}; -use crate::allocator::Event; /// The push half of an intra-thread channel. pub struct Pusher> { index: usize, // count: usize, - events: Rc>>, + events: Rc>>, pusher: P, phantom: ::std::marker::PhantomData, } impl> Pusher { /// Wraps a pusher with a message counter. - pub fn new(pusher: P, index: usize, events: Rc>>) -> Self { + pub fn new(pusher: P, index: usize, events: Rc>>) -> Self { Pusher { index, // count: 0, @@ -36,7 +34,7 @@ impl> Push for Pusher { // if self.count != 0 { // self.events // .borrow_mut() - // .push_back((self.index, Event::Pushed(self.count))); + // .push_back(self.index); // self.count = 0; // } // } @@ -47,7 +45,7 @@ impl> Push for Pusher { // moving information along. Better, but needs cooperation. self.events .borrow_mut() - .push_back((self.index, Event::Pushed(1))); + .push(self.index); self.pusher.push(element) } @@ -59,7 +57,7 @@ use crossbeam_channel::Sender; pub struct ArcPusher> { index: usize, // count: usize, - events: Sender<(usize, Event)>, + events: Sender, pusher: P, phantom: ::std::marker::PhantomData, buzzer: crate::buzzer::Buzzer, @@ -67,7 +65,7 @@ pub struct ArcPusher> { impl> ArcPusher { /// Wraps a pusher with a message counter. - pub fn new(pusher: P, index: usize, events: Sender<(usize, Event)>, buzzer: crate::buzzer::Buzzer) -> Self { + pub fn new(pusher: P, index: usize, events: Sender, buzzer: crate::buzzer::Buzzer) -> Self { ArcPusher { index, // count: 0, @@ -99,7 +97,7 @@ impl> Push for ArcPusher { // and finally awaken the thread. Other orders are defective when // multiple threads are involved. self.pusher.push(element); - let _ = self.events.send((self.index, Event::Pushed(1))); + let _ = self.events.send(self.index); // TODO : Perhaps this shouldn't be a fatal error (e.g. in shutdown). // .expect("Failed to send message count"); self.buzzer.buzz(); @@ -110,14 +108,14 @@ impl> Push for ArcPusher { pub struct Puller> { index: usize, count: usize, - events: Rc>>, + events: Rc>>, puller: P, phantom: ::std::marker::PhantomData, } impl> Puller { /// Wraps a puller with a message counter. 
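The counter wrappers above now record only *which* channel saw traffic, pushing the channel index into a shared `Rc<RefCell<Vec<usize>>>` instead of enqueueing `(index, Event)` pairs. A minimal sketch of that pattern (the struct is illustrative, not the crate's type):

```rust
use std::cell::RefCell;
use std::rc::Rc;

struct Pusher { index: usize, events: Rc<RefCell<Vec<usize>>> }

impl Pusher {
    fn push(&self, _message: &str) {
        // Record only "channel `index` has pending work", not a message count.
        self.events.borrow_mut().push(self.index);
    }
}

fn main() {
    let events = Rc::new(RefCell::new(Vec::new()));
    let pusher = Pusher { index: 3, events: events.clone() };
    pusher.push("hello");
    // The worker drains this list to find channels that need attention.
    assert_eq!(events.borrow().as_slice(), &[3]);
}
```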
- pub fn new(puller: P, index: usize, events: Rc>>) -> Self { + pub fn new(puller: P, index: usize, events: Rc>>) -> Self { Puller { index, count: 0, @@ -135,7 +133,7 @@ impl> Pull for Puller { if self.count != 0 { self.events .borrow_mut() - .push_back((self.index, Event::Pulled(self.count))); + .push(self.index); self.count = 0; } } diff --git a/external/timely-dataflow/communication/src/allocator/generic.rs b/external/timely-dataflow/communication/src/allocator/generic.rs index 625de6cd..49f01407 100644 --- a/external/timely-dataflow/communication/src/allocator/generic.rs +++ b/external/timely-dataflow/communication/src/allocator/generic.rs @@ -5,11 +5,10 @@ use std::rc::Rc; use std::cell::RefCell; -use std::collections::VecDeque; use crate::allocator::thread::ThreadBuilder; use crate::allocator::process::ProcessBuilder as TypedProcessBuilder; -use crate::allocator::{Allocate, AllocateBuilder, Event, Thread, Process}; +use crate::allocator::{Allocate, AllocateBuilder, Thread, Process}; use crate::allocator::zero_copy::allocator_process::{ProcessBuilder, ProcessAllocator}; use crate::allocator::zero_copy::allocator::{TcpBuilder, TcpAllocator}; @@ -74,7 +73,7 @@ impl Generic { Generic::ZeroCopy(z) => z.release(), } } - fn events(&self) -> &Rc>> { + fn events(&self) -> &Rc>> { match self { Generic::Thread(ref t) => t.events(), Generic::Process(ref p) => p.events(), @@ -93,7 +92,7 @@ impl Allocate for Generic { fn receive(&mut self) { self.receive(); } fn release(&mut self) { self.release(); } - fn events(&self) -> &Rc>> { self.events() } + fn events(&self) -> &Rc>> { self.events() } fn await_events(&self, _duration: Option) { match self { Generic::Thread(t) => t.await_events(_duration), diff --git a/external/timely-dataflow/communication/src/allocator/mod.rs b/external/timely-dataflow/communication/src/allocator/mod.rs index 4c29b85e..e5b858f6 100644 --- a/external/timely-dataflow/communication/src/allocator/mod.rs +++ b/external/timely-dataflow/communication/src/allocator/mod.rs @@ -3,7 +3,6 @@ use std::rc::Rc; use std::cell::RefCell; use std::time::Duration; -use std::collections::VecDeque; pub use self::thread::Thread; pub use self::process::Process; @@ -50,7 +49,7 @@ pub trait Allocate { /// drain these events in order to drive their computation. If they /// fail to do so the event queue may become quite large, and turn /// into a performance problem. - fn events(&self) -> &Rc>>; + fn events(&self) -> &Rc>>; /// Awaits communication events. /// @@ -92,11 +91,3 @@ pub trait Allocate { thread::Thread::new_from(identifier, self.events().clone()) } } - -/// A communication channel event. -pub enum Event { - /// A number of messages pushed into the channel. - Pushed(usize), - /// A number of messages pulled from the channel. 
- Pulled(usize), -} diff --git a/external/timely-dataflow/communication/src/allocator/process.rs b/external/timely-dataflow/communication/src/allocator/process.rs index 132bc6de..07d79368 100644 --- a/external/timely-dataflow/communication/src/allocator/process.rs +++ b/external/timely-dataflow/communication/src/allocator/process.rs @@ -5,11 +5,11 @@ use std::cell::RefCell; use std::sync::{Arc, Mutex}; use std::any::Any; use std::time::Duration; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap}; use crossbeam_channel::{Sender, Receiver}; use crate::allocator::thread::{ThreadBuilder}; -use crate::allocator::{Allocate, AllocateBuilder, Event, Thread}; +use crate::allocator::{Allocate, AllocateBuilder, Thread}; use crate::{Push, Pull, Message}; use crate::buzzer::Buzzer; @@ -25,8 +25,8 @@ pub struct ProcessBuilder { buzzers_send: Vec>, buzzers_recv: Vec>, - counters_send: Vec>, - counters_recv: Receiver<(usize, Event)>, + counters_send: Vec>, + counters_recv: Receiver, } impl AllocateBuilder for ProcessBuilder { @@ -63,8 +63,8 @@ pub struct Process { // below: `Box` is a `Box>, Receiver)>>>` channels: Arc>>>, buzzers: Vec, - counters_send: Vec>, - counters_recv: Receiver<(usize, Event)>, + counters_send: Vec>, + counters_recv: Receiver, } impl Process { @@ -174,7 +174,7 @@ impl Allocate for Process { (sends, recv) } - fn events(&self) -> &Rc>> { + fn events(&self) -> &Rc>> { self.inner.events() } @@ -184,8 +184,8 @@ impl Allocate for Process { fn receive(&mut self) { let mut events = self.inner.events().borrow_mut(); - while let Ok((index, event)) = self.counters_recv.try_recv() { - events.push_back((index, event)); + while let Ok(index) = self.counters_recv.try_recv() { + events.push(index); } } } diff --git a/external/timely-dataflow/communication/src/allocator/thread.rs b/external/timely-dataflow/communication/src/allocator/thread.rs index ba5407e4..f46e3532 100644 --- a/external/timely-dataflow/communication/src/allocator/thread.rs +++ b/external/timely-dataflow/communication/src/allocator/thread.rs @@ -5,7 +5,7 @@ use std::cell::RefCell; use std::time::Duration; use std::collections::VecDeque; -use crate::allocator::{Allocate, AllocateBuilder, Event}; +use crate::allocator::{Allocate, AllocateBuilder}; use crate::allocator::counters::Pusher as CountPusher; use crate::allocator::counters::Puller as CountPuller; use crate::{Push, Pull, Message}; @@ -22,7 +22,7 @@ impl AllocateBuilder for ThreadBuilder { /// An allocator for intra-thread communication. pub struct Thread { /// Shared counts of messages in channels. - events: Rc>>, + events: Rc>>, } impl Allocate for Thread { @@ -32,7 +32,7 @@ impl Allocate for Thread { let (pusher, puller) = Thread::new_from(identifier, self.events.clone()); (vec![Box::new(pusher)], Box::new(puller)) } - fn events(&self) -> &Rc>> { + fn events(&self) -> &Rc>> { &self.events } fn await_events(&self, duration: Option) { @@ -56,12 +56,12 @@ impl Thread { /// Allocates a new thread-local channel allocator. pub fn new() -> Self { Thread { - events: Rc::new(RefCell::new(VecDeque::new())), + events: Rc::new(RefCell::new(Default::default())), } } /// Creates a new thread-local channel from an identifier and shared counts. 
- pub fn new_from(identifier: usize, events: Rc>>) + pub fn new_from(identifier: usize, events: Rc>>) -> (ThreadPusher>, ThreadPuller>) { let shared = Rc::new(RefCell::new((VecDeque::>::new(), VecDeque::>::new()))); diff --git a/external/timely-dataflow/communication/src/allocator/zero_copy/allocator.rs b/external/timely-dataflow/communication/src/allocator/zero_copy/allocator.rs index 64225ff2..6ef9ef64 100644 --- a/external/timely-dataflow/communication/src/allocator/zero_copy/allocator.rs +++ b/external/timely-dataflow/communication/src/allocator/zero_copy/allocator.rs @@ -10,7 +10,6 @@ use crate::networking::MessageHeader; use crate::{Allocate, Message, Data, Push, Pull}; use crate::allocator::AllocateBuilder; -use crate::allocator::Event; use crate::allocator::canary::Canary; use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue}; @@ -229,7 +228,7 @@ impl Allocate for TcpAllocator { // Increment message count for channel. // Safe to do this even if the channel has been dropped. - events.push_back((header.channel, Event::Pushed(1))); + events.push(header.channel); // Ensure that a queue exists. match self.to_local.entry(header.channel) { @@ -269,7 +268,7 @@ impl Allocate for TcpAllocator { // } // } } - fn events(&self) -> &Rc>> { + fn events(&self) -> &Rc>> { self.inner.events() } fn await_events(&self, duration: Option) { diff --git a/external/timely-dataflow/communication/src/allocator/zero_copy/allocator_process.rs b/external/timely-dataflow/communication/src/allocator/zero_copy/allocator_process.rs index dd2815a5..74056ac2 100644 --- a/external/timely-dataflow/communication/src/allocator/zero_copy/allocator_process.rs +++ b/external/timely-dataflow/communication/src/allocator/zero_copy/allocator_process.rs @@ -10,7 +10,7 @@ use bytes::arc::Bytes; use crate::networking::MessageHeader; use crate::{Allocate, Message, Data, Push, Pull}; -use crate::allocator::{AllocateBuilder, Event}; +use crate::allocator::{AllocateBuilder}; use crate::allocator::canary::Canary; use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue}; @@ -77,7 +77,7 @@ impl ProcessBuilder { ProcessAllocator { index: self.index, peers: self.peers, - events: Rc::new(RefCell::new(VecDeque::new())), + events: Rc::new(RefCell::new(Default::default())), canaries: Rc::new(RefCell::new(Vec::new())), channel_id_bound: None, staged: Vec::new(), @@ -103,7 +103,7 @@ pub struct ProcessAllocator { index: usize, // number out of peers peers: usize, // number of peer allocators (for typed channel allocation). - events: Rc>>, + events: Rc>>, canaries: Rc>>, @@ -196,7 +196,7 @@ impl Allocate for ProcessAllocator { // Increment message count for channel. // Safe to do this even if the channel has been dropped. - events.push_back((header.channel, Event::Pushed(1))); + events.push(header.channel); // Ensure that a queue exists. match self.to_local.entry(header.channel) { @@ -237,7 +237,7 @@ impl Allocate for ProcessAllocator { // } } - fn events(&self) -> &Rc>> { + fn events(&self) -> &Rc>> { &self.events } fn await_events(&self, duration: Option) { diff --git a/external/timely-dataflow/container/src/columnation.rs b/external/timely-dataflow/container/src/columnation.rs index 97d3ecdd..54a00ca4 100644 --- a/external/timely-dataflow/container/src/columnation.rs +++ b/external/timely-dataflow/container/src/columnation.rs @@ -105,6 +105,25 @@ impl TimelyStack { pub unsafe fn local(&mut self) -> &mut [T] { &mut self.local[..] } + + /// Estimate the memory capacity in bytes. 
+ #[inline] + pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) { + let size_of = std::mem::size_of::(); + callback(self.local.len() * size_of, self.local.capacity() * size_of); + self.inner.heap_size(callback); + } + + /// Estimate the consumed memory capacity in bytes, summing both used and total capacity. + #[inline] + pub fn summed_heap_size(&self) -> (usize, usize) { + let (mut length, mut capacity) = (0, 0); + self.heap_size(|len, cap| { + length += len; + capacity += cap + }); + (length, capacity) + } } impl TimelyStack<(A, B)> { diff --git a/external/timely-dataflow/container/src/lib.rs b/external/timely-dataflow/container/src/lib.rs index c7666783..018e3942 100644 --- a/external/timely-dataflow/container/src/lib.rs +++ b/external/timely-dataflow/container/src/lib.rs @@ -79,7 +79,14 @@ mod rc { std::ops::Deref::deref(self).capacity() } - fn clear(&mut self) { } + fn clear(&mut self) { + // Try to reuse the allocation if possible + if let Some(inner) = Rc::get_mut(self) { + inner.clear(); + } else { + *self = Self::default(); + } + } } } @@ -103,7 +110,14 @@ mod arc { std::ops::Deref::deref(self).capacity() } - fn clear(&mut self) { } + fn clear(&mut self) { + // Try to reuse the allocation if possible + if let Some(inner) = Arc::get_mut(self) { + inner.clear(); + } else { + *self = Self::default(); + } + } } } diff --git a/external/timely-dataflow/experiments/Cargo.toml b/external/timely-dataflow/experiments/Cargo.toml deleted file mode 100644 index 6a0453f1..00000000 --- a/external/timely-dataflow/experiments/Cargo.toml +++ /dev/null @@ -1,12 +0,0 @@ -[package] -name = "experiments" -version = "0.1.0" -edition = "2018" - -[dependencies] -criterion = "0.3.5" -timely = { path = "../timely" } - -[[bench]] -name = "exchange_bench" -harness = false diff --git a/external/timely-dataflow/experiments/benches/exchange_bench.rs b/external/timely-dataflow/experiments/benches/exchange_bench.rs deleted file mode 100644 index cf9337ca..00000000 --- a/external/timely-dataflow/experiments/benches/exchange_bench.rs +++ /dev/null @@ -1,104 +0,0 @@ -extern crate timely; - -use std::fmt::{Display, Formatter}; -use std::time::{Duration, Instant}; - -use criterion::black_box; -use criterion::*; - -use timely::dataflow::operators::{Exchange, Input, Probe}; -use timely::dataflow::InputHandle; -use timely::{CommunicationConfig, Config, WorkerConfig}; - -#[derive(Clone)] -struct ExperimentConfig { - threads: usize, - batch: u64, -} - -impl Display for ExperimentConfig { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "threads={:2},batch={:5}", self.threads, self.batch) - } -} - -fn bench(c: &mut Criterion) { - let mut group = c.benchmark_group("exchange"); - for threads in [1, 2, 4, 8, 16] { - for shift in [0, 4, 8, 14] { - let params = ExperimentConfig { - threads, - batch: 1u64 << shift, - }; - group.bench_with_input( - BenchmarkId::new("Default", params.clone()), - ¶ms, - move |b, params| { - b.iter_custom(|iters| { - let config = Config::process(params.threads); - black_box(experiment_exchange( - config, - params.batch, - iters, - )) - }) - }, - ); - group.bench_with_input( - BenchmarkId::new("DefaultZero", params.clone()), - ¶ms, - move |b, params| { - b.iter_custom(|iters| { - let config = Config { - communication: CommunicationConfig::ProcessBinary(params.threads), - worker: WorkerConfig::default(), - }; - black_box(experiment_exchange( - config, - params.batch, - iters, - )) - }) - }, - ); - } - } -} - -fn experiment_exchange( - config: Config, - batch: u64, - 
rounds: u64, -) -> Duration { - timely::execute(config, move |worker| { - let mut input = InputHandle::new(); - let probe = worker.dataflow(|scope| scope.input_from(&mut input).exchange(|x| *x).probe()); - - let mut time = 0; - let timer = Instant::now(); - - let buffer = (0..batch).collect(); - let mut copy = Vec::new(); - - for _round in 0..rounds { - copy.clone_from(&buffer); - input.send_batch(&mut copy); - copy.clear(); - time += 1; - input.advance_to(time); - while probe.less_than(input.time()) { - worker.step(); - } - } - timer.elapsed() - }) - .unwrap() - .join() - .into_iter() - .next() - .unwrap() - .unwrap() -} - -criterion_group!(benches, bench); -criterion_main!(benches); diff --git a/external/timely-dataflow/kafkaesque/Cargo.toml b/external/timely-dataflow/kafkaesque/Cargo.toml deleted file mode 100644 index 4ba07f66..00000000 --- a/external/timely-dataflow/kafkaesque/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "kafkaesque" -version = "0.12.0" -authors = ["Frank McSherry "] -edition = "2018" - -[dependencies] -clap="*" -abomonation="0.7" -timely = { path = "../timely" } - -[dependencies.rdkafka] -version = "0.23.0" diff --git a/external/timely-dataflow/kafkaesque/src/bin/capture_recv.rs b/external/timely-dataflow/kafkaesque/src/bin/capture_recv.rs deleted file mode 100644 index 862d44cb..00000000 --- a/external/timely-dataflow/kafkaesque/src/bin/capture_recv.rs +++ /dev/null @@ -1,46 +0,0 @@ -use timely::dataflow::operators::Inspect; -use timely::dataflow::operators::capture::Replay; -use timely::dataflow::operators::Accumulate; - -use rdkafka::config::ClientConfig; - -use kafkaesque::EventConsumer; - -fn main() { - timely::execute_from_args(std::env::args(), |worker| { - - let topic = std::env::args().nth(1).unwrap(); - let source_peers = std::env::args().nth(2).unwrap().parse::().unwrap(); - let brokers = "localhost:9092"; - - // Create Kafka stuff. - let mut consumer_config = ClientConfig::new(); - consumer_config - .set("produce.offset.report", "true") - .set("auto.offset.reset", "smallest") - .set("group.id", "example") - .set("enable.auto.commit", "false") - .set("enable.partition.eof", "false") - .set("auto.offset.reset", "earliest") - .set("session.timeout.ms", "6000") - .set("bootstrap.servers", &brokers); - - // create replayers from disjoint partition of source worker identifiers. - let replayers = - (0 .. source_peers) - .filter(|i| i % worker.peers() == worker.index()) - .map(|i| { - let topic = format!("{}-{:?}", topic, i); - EventConsumer::<_,u64>::new(consumer_config.clone(), topic) - }) - .collect::>(); - - worker.dataflow::(|scope| { - replayers - .replay_into(scope) - .count() - .inspect(|x| println!("replayed: {:?}", x)) - ; - }) - }).unwrap(); // asserts error-free execution -} diff --git a/external/timely-dataflow/kafkaesque/src/bin/capture_send.rs b/external/timely-dataflow/kafkaesque/src/bin/capture_send.rs deleted file mode 100644 index ea250250..00000000 --- a/external/timely-dataflow/kafkaesque/src/bin/capture_send.rs +++ /dev/null @@ -1,31 +0,0 @@ -use timely::dataflow::operators::ToStream; -use timely::dataflow::operators::capture::Capture; - -use rdkafka::config::ClientConfig; - -use kafkaesque::EventProducer; - -fn main() { - timely::execute_from_args(std::env::args(), |worker| { - - // target topic name. - let topic = std::env::args().nth(1).unwrap(); - let count = std::env::args().nth(2).unwrap().parse::().unwrap(); - let brokers = "localhost:9092"; - - // Create Kafka stuff. 
- let mut producer_config = ClientConfig::new(); - producer_config - .set("produce.offset.report", "true") - .set("bootstrap.servers", brokers); - - let topic = format!("{}-{:?}", topic, worker.index()); - let producer = EventProducer::new(producer_config, topic); - - worker.dataflow::(|scope| - (0 .. count) - .to_stream(scope) - .capture_into(producer) - ); - }).unwrap(); -} diff --git a/external/timely-dataflow/kafkaesque/src/bin/kafka_source.rs b/external/timely-dataflow/kafkaesque/src/bin/kafka_source.rs deleted file mode 100644 index f71f8df7..00000000 --- a/external/timely-dataflow/kafkaesque/src/bin/kafka_source.rs +++ /dev/null @@ -1,63 +0,0 @@ -use timely::dataflow::operators::Inspect; - -use rdkafka::config::ClientConfig; -use rdkafka::consumer::{Consumer, BaseConsumer, DefaultConsumerContext}; - -fn main() { - - let mut args = ::std::env::args(); - args.next(); - - // Extract Kafka topic. - let topic = args.next().expect("Must specify a Kafka topic"); - let brokers = "localhost:9092"; - - // Create Kafka consumer configuration. - // Feel free to change parameters here. - let mut consumer_config = ClientConfig::new(); - consumer_config - .set("produce.offset.report", "true") - .set("auto.offset.reset", "smallest") - .set("group.id", "example") - .set("enable.auto.commit", "false") - .set("enable.partition.eof", "false") - .set("auto.offset.reset", "earliest") - .set("session.timeout.ms", "6000") - .set("bootstrap.servers", &brokers); - - timely::execute_from_args(args, move |worker| { - - // A dataflow for producing spans. - worker.dataflow::(|scope| { - - // Create a Kafka consumer. - let consumer : BaseConsumer = consumer_config.create().expect("Couldn't create consumer"); - consumer.subscribe(&[&topic]).expect("Failed to subscribe to topic"); - - let strings = - kafkaesque::source(scope, "KafkaStringSource", consumer, |bytes, capability, output| { - - // If the bytes are utf8, convert to string and send. - if let Ok(text) = std::str::from_utf8(bytes) { - output - .session(capability) - .give(text.to_string()); - } - - // We need some rule to advance timestamps ... - let time = *capability.time(); - capability.downgrade(&(time + 1)); - - // Indicate that we are not yet done. - false - }); - - strings.inspect(|x| println!("Observed: {:?}", x)); - - }); - - }).expect("Timely computation failed somehow"); - - println!("Hello, world!"); -} - diff --git a/external/timely-dataflow/kafkaesque/src/kafka_source.rs b/external/timely-dataflow/kafkaesque/src/kafka_source.rs deleted file mode 100644 index 0167913d..00000000 --- a/external/timely-dataflow/kafkaesque/src/kafka_source.rs +++ /dev/null @@ -1,138 +0,0 @@ -use timely::Data; -use timely::dataflow::{Scope, Stream}; -use timely::dataflow::operators::Capability; -use timely::dataflow::operators::generic::OutputHandle; -use timely::dataflow::channels::pushers::Tee; - -use rdkafka::Message; -use rdkafka::consumer::{ConsumerContext, BaseConsumer}; - -/// Constructs a stream of data from a Kafka consumer. -/// -/// This method assembles a stream of data from a Kafka consumer and supplied -/// user logic for determining how to interpret the binary data Kafka supplies. -/// -/// The user logic is provided binary data as `&[u8]`, and mutable references to -/// a capability and an output handle, which the logic should use to produce data -/// if it is so inclined. The logic must return a bool indicating whether the stream -/// is complete (true indicates that the operator should cease data production and -/// shut down). 
-/// -/// # Examples -/// ```rust,no_run -/// use timely::dataflow::operators::Inspect; -/// -/// use rdkafka::Message; -/// use rdkafka::config::ClientConfig; -/// use rdkafka::consumer::{Consumer, BaseConsumer, DefaultConsumerContext}; -/// -/// fn main() { -/// -/// let mut args = ::std::env::args(); -/// args.next(); -/// -/// // Extract Kafka topic. -/// let topic = args.next().expect("Must specify a Kafka topic"); -/// let brokers = "localhost:9092"; -/// -/// // Create Kafka consumer configuration. -/// // Feel free to change parameters here. -/// let mut consumer_config = ClientConfig::new(); -/// consumer_config -/// .set("produce.offset.report", "true") -/// .set("auto.offset.reset", "smallest") -/// .set("group.id", "example") -/// .set("enable.auto.commit", "false") -/// .set("enable.partition.eof", "false") -/// .set("auto.offset.reset", "earliest") -/// .set("session.timeout.ms", "6000") -/// .set("bootstrap.servers", &brokers); -/// -/// timely::execute_from_args(args, move |worker| { -/// -/// // A dataflow for producing spans. -/// worker.dataflow::(|scope| { -/// -/// // Create a Kafka consumer. -/// let consumer : BaseConsumer = consumer_config.create().expect("Couldn't create consumer"); -/// consumer.subscribe(&[&topic]).expect("Failed to subscribe to topic"); -/// -/// let strings = -/// kafkaesque::source(scope, "KafkaStringSource", consumer, |bytes, capability, output| { -/// -/// // If the bytes are utf8, convert to string and send. -/// if let Ok(text) = std::str::from_utf8(bytes) { -/// output -/// .session(capability) -/// .give(text.to_string()); -/// } -/// -/// // We need some rule to advance timestamps ... -/// let time = *capability.time(); -/// capability.downgrade(&(time + 1)); -/// -/// // Indicate that we are not yet done. -/// false -/// }); -/// -/// strings.inspect(|x| println!("Observed: {:?}", x)); -/// -/// }); -/// -/// }).expect("Timely computation failed somehow"); -/// -/// println!("Hello, world!"); -/// } -/// ``` -pub fn kafka_source( - scope: &G, - name: &str, - consumer: BaseConsumer, - logic: L -) -> Stream -where - C: ConsumerContext+'static, - G: Scope, - D: Data, - L: Fn(&[u8], - &mut Capability, - &mut OutputHandle>) -> bool+'static, -{ - use timely::dataflow::operators::generic::source; - source(scope, name, move |capability, info| { - - let activator = scope.activator_for(&info.address[..]); - let mut cap = Some(capability); - - // define a closure to call repeatedly. - move |output| { - - // Act only if we retain the capability to send data. - let mut complete = false; - if let Some(mut capability) = cap.as_mut() { - - // Indicate that we should run again. - activator.activate(); - - // Repeatedly interrogate Kafka for [u8] messages. - // Cease only when Kafka stops returning new data. - // Could cease earlier, if we had a better policy. - while let Some(result) = consumer.poll(std::time::Duration::from_millis(0)) { - // If valid data back from Kafka - if let Ok(message) = result { - // Attempt to interpret bytes as utf8 ... 
- if let Some(payload) = message.payload() { - complete = logic(payload, &mut capability, output) || complete; - } - } - else { - println!("Kafka error"); - } - } - } - - if complete { cap = None; } - } - - }) -} \ No newline at end of file diff --git a/external/timely-dataflow/kafkaesque/src/lib.rs b/external/timely-dataflow/kafkaesque/src/lib.rs deleted file mode 100644 index ba91061e..00000000 --- a/external/timely-dataflow/kafkaesque/src/lib.rs +++ /dev/null @@ -1,128 +0,0 @@ -use std::sync::Arc; -use std::sync::atomic::{AtomicIsize, Ordering}; - -use abomonation::Abomonation; -use timely::dataflow::operators::capture::event::{EventCore, EventPusherCore, EventIteratorCore}; - -use rdkafka::Message; -use rdkafka::client::ClientContext; -use rdkafka::config::ClientConfig; -use rdkafka::producer::{BaseProducer, BaseRecord, ProducerContext, DeliveryResult}; -use rdkafka::consumer::{Consumer, BaseConsumer, DefaultConsumerContext}; - -use rdkafka::config::FromClientConfigAndContext; - -pub mod kafka_source; -pub use kafka_source::kafka_source as source; - -struct OutstandingCounterContext { - outstanding: Arc, -} - -impl ClientContext for OutstandingCounterContext { } - -impl ProducerContext for OutstandingCounterContext { - type DeliveryOpaque = (); - fn delivery(&self, _report: &DeliveryResult, _: Self::DeliveryOpaque) { - self.outstanding.fetch_sub(1, Ordering::SeqCst); - } -} - -impl OutstandingCounterContext { - pub fn new(counter: &Arc) -> Self { - OutstandingCounterContext { - outstanding: counter.clone() - } - } -} - -/// A wrapper for `W: Write` implementing `EventPusher`. -pub struct EventProducerCore { - topic: String, - buffer: Vec, - producer: BaseProducer, - counter: Arc, - phant: ::std::marker::PhantomData<(T,D)>, -} - -/// [EventProducerCore] specialized to vector-based containers. -pub type EventProducer = EventProducerCore>; - -impl EventProducerCore { - /// Allocates a new `EventWriter` wrapping a supplied writer. - pub fn new(config: ClientConfig, topic: String) -> Self { - let counter = Arc::new(AtomicIsize::new(0)); - let context = OutstandingCounterContext::new(&counter); - let producer = BaseProducer::::from_config_and_context(&config, context).expect("Couldn't create producer"); - println!("allocating producer for topic {:?}", topic); - Self { - topic: topic, - buffer: vec![], - producer: producer, - counter: counter, - phant: ::std::marker::PhantomData, - } - } -} - -impl EventPusherCore for EventProducerCore { - fn push(&mut self, event: EventCore) { - unsafe { ::abomonation::encode(&event, &mut self.buffer).expect("Encode failure"); } - // println!("sending {:?} bytes", self.buffer.len()); - self.producer.send::<(),[u8]>(BaseRecord::to(self.topic.as_str()).payload(&self.buffer[..])).unwrap(); - self.counter.fetch_add(1, Ordering::SeqCst); - self.producer.poll(std::time::Duration::from_millis(0)); - self.buffer.clear(); - } -} - -impl Drop for EventProducerCore { - fn drop(&mut self) { - while self.counter.load(Ordering::SeqCst) > 0 { - self.producer.poll(std::time::Duration::from_millis(10)); - } - } -} - -/// A Wrapper for `R: Read` implementing `EventIterator`. -pub struct EventConsumerCore { - consumer: BaseConsumer, - buffer: Vec, - phant: ::std::marker::PhantomData<(T,D)>, -} - -/// [EventConsumerCore] specialized to vector-based containers. -pub type EventConsumer = EventConsumerCore>; - -impl EventConsumerCore { - /// Allocates a new `EventReader` wrapping a supplied reader. 
- pub fn new(config: ClientConfig, topic: String) -> Self { - println!("allocating consumer for topic {:?}", topic); - let consumer : BaseConsumer = config.create().expect("Couldn't create consumer"); - consumer.subscribe(&[&topic]).expect("Failed to subscribe to topic"); - Self { - consumer: consumer, - buffer: Vec::new(), - phant: ::std::marker::PhantomData, - } - } -} - -impl EventIteratorCore for EventConsumerCore { - fn next(&mut self) -> Option<&EventCore> { - if let Some(result) = self.consumer.poll(std::time::Duration::from_millis(0)) { - match result { - Ok(message) => { - self.buffer.clear(); - self.buffer.extend_from_slice(message.payload().unwrap()); - Some(unsafe { ::abomonation::decode::>(&mut self.buffer[..]).unwrap().0 }) - }, - Err(err) => { - println!("KafkaConsumer error: {:?}", err); - None - }, - } - } - else { None } - } -} diff --git a/external/timely-dataflow/mdbook/src/chapter_2/chapter_2_4.md b/external/timely-dataflow/mdbook/src/chapter_2/chapter_2_4.md index ecbfe09c..00ab0042 100644 --- a/external/timely-dataflow/mdbook/src/chapter_2/chapter_2_4.md +++ b/external/timely-dataflow/mdbook/src/chapter_2/chapter_2_4.md @@ -118,7 +118,7 @@ The system is smart enough to notice when you downgrade and discard capabilities It may seem that we have only considered stateless operators, those that are only able to read from their inputs and immediately write to their outputs. But, you can have whatever state that you like, using the magic of Rust's closures. When we write a closure, it can capture ("close over") any state that is currently in scope, taking ownership of it. This is actually what we did up above with the capability. If that sounds too abstract, let's look at an example. -Our `unary` example from way back just incremented the value and passed it along. What if we wanted to only pass values larger than any value we have seen so far? We just define a variable `max` which we check and update as we would normally. Importantly, we should define it *outside* the closure we return, so that it persists across calls, and we need to use the `move` keyword so that the closure knows it is supposed to take ownership of the variable. +Our `unary` example from way back just incremented the value and passed it along. What if we wanted to only pass values larger than any value we have seen so far? We just define a variable `maximum` which we check and update as we would normally. Importantly, we should define it *outside* the closure we return, so that it persists across calls, and we need to use the `move` keyword so that the closure knows it is supposed to take ownership of the variable. ```rust extern crate timely; @@ -155,7 +155,7 @@ fn main() { This example just captures an integer, but you could just as easily define and capture ownership of a `HashMap`, or whatever complicated state you would like repeated access to. -Bear in mind that this example is probably a bit wrong, in that we update `max` without paying any attention to the times of the data that come past, and so we may report a sequence of values that doesn't seem to correspond with the sequence when sorted by time. Writing sane operators in the presence of batches of data at shuffled times requires more thought. Specifically, for an operator to put its input back in order it needs to understand which times it might see in the future, which was the reason we were so careful about those capabilities and is the subject of the next subsection. 
+Bear in mind that this example is probably a bit wrong, in that we update `maximum` without paying any attention to the times of the data that come past, and so we may report a sequence of values that doesn't seem to correspond with the sequence when sorted by time. Writing sane operators in the presence of batches of data at shuffled times requires more thought. Specifically, for an operator to put its input back in order it needs to understand which times it might see in the future, which was the reason we were so careful about those capabilities and is the subject of the next subsection. ### Frontiered operators @@ -163,7 +163,7 @@ Timely dataflow is constantly tracking the capabilities of operators throughout Specifically, each input has a `frontier` method which returns a `&[Timestamp]`, indicating a list of times such that any future time must be greater or equal to some element of the list. Often this list will just have a single element, indicating the "current" time, but as we get to more complicated forms of time ("partially ordered" time, if that means anything to you yet) we may need to report multiple incomparable timestamps. -This frontier information is invaluable for operators that must be sure that their output is correct and final before they send it as output. For our `max` example, we will want to wait to apply the new maximum until we are sure that we will not see any more elements at earlier times. That isn't to say we can't do anything with data we receive "early"; in the case of the maximum, each batch at a given time can be reduced down to just its maximum value, as all received values would be applied simultaneously. +This frontier information is invaluable for operators that must be sure that their output is correct and final before they send it as output. For our `maximum` example, we will want to wait to apply the new maximum until we are sure that we will not see any more elements at earlier times. That isn't to say we can't do anything with data we receive "early"; in the case of the maximum, each batch at a given time can be reduced down to just its maximum value, as all received values would be applied simultaneously. To make life easier for you, we've written a helper type called `Notificator` whose job in life is to help you keep track of times that you would like to send outputs, and to tell you when (according to your input frontiers) it is now safe to send the data. In fact, notificators do more by holding on to the *capabilities* for you, so that you can be sure that, even if you *don't* receive any more messages but just an indication that there will be none, you will still retain the ability to send your messages. 
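As a concrete version of the frontier-aware `maximum` operator the chapter describes in prose, a minimal sketch (not part of the chapter's code) might read as follows; it assumes timely's `unary_notify` helper, and the `time.retain()` call is exactly the input-to-owned capability conversion that this change reworks in `capability.rs` below.

```rust
extern crate timely;

use std::collections::HashMap;

use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::{ToStream, Inspect};
use timely::dataflow::operators::generic::operator::Operator;

fn main() {
    timely::example(|scope| {
        // State lives outside the operator closure, which takes ownership via `move`.
        let mut maximum = 0u64;
        let mut stash = HashMap::new();
        (0u64..10)
            .to_stream(scope)
            .unary_notify(Pipeline, "maximum", None, move |input, output, notificator| {
                // Reduce each batch to its own maximum, stashed by timestamp, and
                // ask to be notified once that timestamp is complete.
                input.for_each(|time, data| {
                    if let Some(&batch_max) = data.iter().max() {
                        let entry = stash.entry(time.time().clone()).or_insert(batch_max);
                        if *entry < batch_max { *entry = batch_max; }
                        notificator.notify_at(time.retain());
                    }
                });
                // Apply stashed maxima only once the frontier has passed their time.
                notificator.for_each(|time, _count, _notificator| {
                    if let Some(batch_max) = stash.remove(time.time()) {
                        if batch_max > maximum {
                            maximum = batch_max;
                            output.session(&time).give(maximum);
                        }
                    }
                });
            })
            .inspect(|x| println!("new maximum: {:?}", x));
    });
}
```

Note that after this change `retain()` panics if the input-to-output summary does not hold, per `retain_for_output` in the capability changes that follow.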
diff --git a/external/timely-dataflow/timely/Cargo.toml b/external/timely-dataflow/timely/Cargo.toml index bb987ade..b0786a5a 100644 --- a/external/timely-dataflow/timely/Cargo.toml +++ b/external/timely-dataflow/timely/Cargo.toml @@ -32,7 +32,6 @@ timely_logging = { path = "../logging", version = "0.12" } timely_communication = { path = "../communication", version = "0.12", default-features = false } timely_container = { path = "../container", version = "0.12" } crossbeam-channel = "0.5.0" -futures-util = "0.3" [dev-dependencies] # timely_sort="0.1.6" diff --git a/external/timely-dataflow/timely/examples/rc.rs b/external/timely-dataflow/timely/examples/rc.rs index 18f6fe97..f79ef747 100644 --- a/external/timely-dataflow/timely/examples/rc.rs +++ b/external/timely-dataflow/timely/examples/rc.rs @@ -8,7 +8,7 @@ use abomonation::Abomonation; #[derive(Debug, Clone)] pub struct Test { - field: Rc, + _field: Rc, } impl Abomonation for Test { @@ -32,7 +32,7 @@ fn main() { // introduce data and watch! for round in 0..10 { - input.send(Test { field: Rc::new(round) } ); + input.send(Test { _field: Rc::new(round) } ); input.advance_to(round + 1); worker.step_while(|| probe.less_than(input.time())); } diff --git a/external/timely-dataflow/timely/src/dataflow/channels/pullers/counter.rs b/external/timely-dataflow/timely/src/dataflow/channels/pullers/counter.rs index 8bc8606e..8f9bbbf0 100644 --- a/external/timely-dataflow/timely/src/dataflow/channels/pullers/counter.rs +++ b/external/timely-dataflow/timely/src/dataflow/channels/pullers/counter.rs @@ -16,15 +16,23 @@ pub struct Counter>> { } /// A guard type that updates the change batch counts on drop -pub struct ConsumedGuard<'a, T: Ord + Clone + 'static> { - consumed: &'a Rc>>, +pub struct ConsumedGuard { + consumed: Rc>>, time: Option, len: usize, } -impl<'a, T:Ord+Clone+'static> Drop for ConsumedGuard<'a, T> { +impl ConsumedGuard { + pub(crate) fn time(&self) -> &T { + &self.time.as_ref().unwrap() + } +} + +impl Drop for ConsumedGuard { fn drop(&mut self) { - self.consumed.borrow_mut().update(self.time.take().unwrap(), self.len as i64); + // SAFETY: we're in a Drop impl, so this runs at most once + let time = self.time.take().unwrap(); + self.consumed.borrow_mut().update(time, self.len as i64); } } @@ -36,17 +44,14 @@ impl>> Counter Option<(ConsumedGuard<'_, T>, &mut BundleCore)> { + pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard, &mut BundleCore)> { if let Some(message) = self.pullable.pull() { - if message.data.len() > 0 { - let guard = ConsumedGuard { - consumed: &self.consumed, - time: Some(message.time.clone()), - len: message.data.len(), - }; - Some((guard, message)) - } - else { None } + let guard = ConsumedGuard { + consumed: Rc::clone(&self.consumed), + time: Some(message.time.clone()), + len: message.data.len(), + }; + Some((guard, message)) } else { None } } diff --git a/external/timely-dataflow/timely/src/dataflow/channels/pushers/buffer.rs b/external/timely-dataflow/timely/src/dataflow/channels/pushers/buffer.rs index 6f92dabb..d18a0f84 100644 --- a/external/timely-dataflow/timely/src/dataflow/channels/pushers/buffer.rs +++ b/external/timely-dataflow/timely/src/dataflow/channels/pushers/buffer.rs @@ -71,16 +71,19 @@ impl>> BufferCore where T: Eq // Gives an entire container at a specific time. 
    fn give_container(&mut self, vector: &mut C) {
-        // flush to ensure fifo-ness
-        self.flush();
+        if !vector.is_empty() {
+            // flush to ensure fifo-ness
+            self.flush();

-        let time = self.time.as_ref().expect("Buffer::give_container(): time is None.").clone();
-        Message::push_at(vector, time, &mut self.pusher);
+            let time = self.time.as_ref().expect("Buffer::give_container(): time is None.").clone();
+            Message::push_at(vector, time, &mut self.pusher);
+        }
     }
 }

 impl<T, D: Data, P: Push<Bundle<T, D>>> Buffer<T, D, P> where T: Eq+Clone {
     // internal method for use by `Session`.
+    #[inline]
     fn give(&mut self, data: D) {
         if self.buffer.capacity() < crate::container::buffer::default_capacity::<D>() {
             let to_reserve = crate::container::buffer::default_capacity::<D>() - self.buffer.capacity();
diff --git a/external/timely-dataflow/timely/src/dataflow/operators/capability.rs b/external/timely-dataflow/timely/src/dataflow/operators/capability.rs
index a63d8337..5caa9bce 100644
--- a/external/timely-dataflow/timely/src/dataflow/operators/capability.rs
+++ b/external/timely-dataflow/timely/src/dataflow/operators/capability.rs
@@ -27,6 +27,7 @@ use std::cell::RefCell;
 use std::fmt::{self, Debug};

 use crate::order::PartialOrder;
+use crate::progress::Antichain;
 use crate::progress::Timestamp;
 use crate::progress::ChangeBatch;
 use crate::scheduling::Activations;
@@ -223,41 +224,49 @@ impl Display for DowngradeError {

 impl Error for DowngradeError {}

+/// A shared list of shared output capability buffers.
 type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;

-/// An unowned capability, which can be used but not retained.
+/// A capability of an input port. Holding onto this capability implicitly holds onto a
+/// capability for all the output ports this input is connected to, after the connection summaries
+/// have been applied.
 ///
-/// The capability reference supplies a `retain(self)` method which consumes the reference
-/// and turns it into an owned capability
-pub struct CapabilityRef<'cap, T: Timestamp+'cap> {
-    time: &'cap T,
+/// This input capability supplies a `retain_for_output(self)` method which consumes the input
+/// capability and turns it into a [Capability] for a specific output port.
+pub struct InputCapability<T: Timestamp> {
+    /// Output capability buffers, for use in minting capabilities.
     internal: CapabilityUpdates<T>,
-    /// A drop guard that updates the consumed capability this CapabilityRef refers to on drop
-    _consumed_guard: ConsumedGuard<'cap, T>,
+    /// Timestamp summaries for each output.
+    summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
+    /// A drop guard that updates the consumed capability this InputCapability refers to on drop
+    consumed_guard: ConsumedGuard<T>,
 }

-impl<'cap, T: Timestamp+'cap> CapabilityTrait<T> for CapabilityRef<'cap, T> {
-    fn time(&self) -> &T { self.time }
+impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
+    fn time(&self) -> &T { self.time() }
     fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
-        // let borrow = ;
-        self.internal.borrow().iter().any(|rc| Rc::ptr_eq(rc, query_buffer))
+        let borrow = self.summaries.borrow();
+        self.internal.borrow().iter().enumerate().any(|(index, rc)| {
+            // To be valid, the output buffer must match and the timestamp summary needs to be the default.
+            Rc::ptr_eq(rc, query_buffer) && borrow[index].len() == 1 && borrow[index][0] == Default::default()
+        })
     }
 }

-impl<'cap, T: Timestamp + 'cap> CapabilityRef<'cap, T> {
+impl<T: Timestamp> InputCapability<T> {
     /// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
     /// the provided [`ChangeBatch`].
- pub(crate) fn new(time: &'cap T, internal: CapabilityUpdates, guard: ConsumedGuard<'cap, T>) -> Self { - CapabilityRef { - time, + pub(crate) fn new(internal: CapabilityUpdates, summaries: Rc>>>, guard: ConsumedGuard) -> Self { + InputCapability { internal, - _consumed_guard: guard, + summaries, + consumed_guard: guard, } } /// The timestamp associated with this capability. pub fn time(&self) -> &T { - self.time + self.consumed_guard.time() } /// Makes a new capability for a timestamp `new_time` greater or equal to the timestamp of @@ -270,15 +279,11 @@ impl<'cap, T: Timestamp + 'cap> CapabilityRef<'cap, T> { /// Delays capability for a specific output port. pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability { - // TODO : Test operator summary? - if !self.time.less_equal(new_time) { - panic!("Attempted to delay {:?} to {:?}, which is not beyond the capability's time.", self, new_time); - } - if output_port < self.internal.borrow().len() { + use crate::progress::timestamp::PathSummary; + if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) { Capability::new(new_time.clone(), self.internal.borrow()[output_port].clone()) - } - else { - panic!("Attempted to acquire a capability for a non-existent output port."); + } else { + panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, self.summaries.borrow()[output_port], self.time()); } } @@ -287,34 +292,39 @@ impl<'cap, T: Timestamp + 'cap> CapabilityRef<'cap, T> { /// This method produces an owned capability which must be dropped to release the /// capability. Users should take care that these capabilities are only stored for /// as long as they are required, as failing to drop them may result in livelock. + /// + /// This method panics if the timestamp summary to output zero strictly advances the time. pub fn retain(self) -> Capability { - // mint(self.time.clone(), self.internal) self.retain_for_output(0) } /// Transforms to an owned capability for a specific output port. + /// + /// This method panics if the timestamp summary to `output_port` strictly advances the time. 
pub fn retain_for_output(self, output_port: usize) -> Capability { - if output_port < self.internal.borrow().len() { - Capability::new(self.time.clone(), self.internal.borrow()[output_port].clone()) + use crate::progress::timestamp::PathSummary; + let self_time = self.time().clone(); + if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) { + Capability::new(self_time, self.internal.borrow()[output_port].clone()) } else { - panic!("Attempted to acquire a capability for a non-existent output port."); + panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, self.summaries.borrow()[output_port], self_time); } } } -impl<'cap, T: Timestamp> Deref for CapabilityRef<'cap, T> { +impl Deref for InputCapability { type Target = T; fn deref(&self) -> &T { - self.time + self.time() } } -impl<'cap, T: Timestamp> Debug for CapabilityRef<'cap, T> { +impl Debug for InputCapability { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("CapabilityRef") - .field("time", &self.time) + f.debug_struct("InputCapability") + .field("time", self.time()) .field("internal", &"...") .finish() } diff --git a/external/timely-dataflow/timely/src/dataflow/operators/enterleave.rs b/external/timely-dataflow/timely/src/dataflow/operators/enterleave.rs index bd9b1f86..24c8fa12 100644 --- a/external/timely-dataflow/timely/src/dataflow/operators/enterleave.rs +++ b/external/timely-dataflow/timely/src/dataflow/operators/enterleave.rs @@ -21,6 +21,7 @@ use std::marker::PhantomData; +use crate::logging::{TimelyLogger, MessagesEvent}; use crate::progress::Timestamp; use crate::progress::timestamp::Refines; use crate::progress::{Source, Target}; @@ -90,17 +91,26 @@ impl, C: Data+Container> Enter::new(); let ingress = IngressNub { targets: CounterCore::new(targets), - phantom: ::std::marker::PhantomData, + phantom: PhantomData, activator: scope.activator_for(&scope.addr()), active: false, }; let produced = ingress.targets.produced().clone(); - let input = scope.subgraph.borrow_mut().new_input(produced); - let channel_id = scope.clone().new_identifier(); - self.connect_to(input, ingress, channel_id); - StreamCore::new(Source::new(0, input.port), registrar, scope.clone()) + + if let Some(logger) = scope.logging() { + let pusher = LogPusher::new(ingress, channel_id, scope.index(), logger); + self.connect_to(input, pusher, channel_id); + } else { + self.connect_to(input, ingress, channel_id); + } + + StreamCore::new( + Source::new(0, input.port), + registrar, + scope.clone(), + ) } } @@ -129,9 +139,17 @@ impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines> Leave let scope = self.scope(); let output = scope.subgraph.borrow_mut().new_output(); + let target = Target::new(0, output.port); let (targets, registrar) = TeeCore::::new(); + let egress = EgressNub { targets, phantom: PhantomData }; let channel_id = scope.clone().new_identifier(); - self.connect_to(Target::new(0, output.port), EgressNub { targets, phantom: PhantomData }, channel_id); + + if let Some(logger) = scope.logging() { + let pusher = LogPusher::new(egress, channel_id, scope.index(), logger); + self.connect_to(target, pusher, channel_id); + } else { + self.connect_to(target, egress, channel_id); + } StreamCore::new( output, @@ -197,6 +215,61 @@ where TOuter: Timestamp, TInner: Timestamp+Refines, TData: Data { } } +/// A pusher that logs messages passing through 
it.
+///
+/// This type performs the same function as the `LogPusher` and `LogPuller` types in
+/// `timely::dataflow::channels::pact`. We need a special implementation for `enter`/`leave`
+/// channels because those don't have a puller connected. Thus, this pusher needs to log both the
+/// send and the receive `MessagesEvent`.
+struct LogPusher<P> {
+    pusher: P,
+    channel: usize,
+    counter: usize,
+    index: usize,
+    logger: TimelyLogger,
+}
+
+impl<P> LogPusher<P> {
+    fn new(pusher: P, channel: usize, index: usize, logger: TimelyLogger) -> Self {
+        Self {
+            pusher,
+            channel,
+            counter: 0,
+            index,
+            logger,
+        }
+    }
+}
+
+impl<T, D, P> Push<BundleCore<T, D>> for LogPusher<P>
+where
+    D: Container,
+    P: Push<BundleCore<T, D>>,
+{
+    fn push(&mut self, element: &mut Option<BundleCore<T, D>>) {
+        if let Some(bundle) = element {
+            let send_event = MessagesEvent {
+                is_send: true,
+                channel: self.channel,
+                source: self.index,
+                target: self.index,
+                seq_no: self.counter,
+                length: bundle.data.len(),
+            };
+            let recv_event = MessagesEvent {
+                is_send: false,
+                ..send_event
+            };
+
+            self.logger.log(send_event);
+            self.logger.log(recv_event);
+            self.counter += 1;
+        }
+
+        self.pusher.push(element);
+    }
+}
+
 #[cfg(test)]
 mod test {
     /// Test that nested scopes with pass-through edges (no operators) correctly communicate progress.
diff --git a/external/timely-dataflow/timely/src/dataflow/operators/generic/builder_rc.rs b/external/timely-dataflow/timely/src/dataflow/operators/generic/builder_rc.rs
index cc505e7e..ca80e318 100644
--- a/external/timely-dataflow/timely/src/dataflow/operators/generic/builder_rc.rs
+++ b/external/timely-dataflow/timely/src/dataflow/operators/generic/builder_rc.rs
@@ -31,6 +31,8 @@ pub struct OperatorBuilder<G: Scope> {
     frontier: Vec<MutableAntichain<G::Timestamp>>,
     consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
     internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
+    /// For each input, a shared list of summaries to each output.
+    summaries: Vec<Rc<RefCell<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>>>,
     produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
     logging: Option<Logger>,
 }
@@ -45,6 +47,7 @@ impl<G: Scope> OperatorBuilder<G> {
             frontier: Vec::new(),
             consumed: Vec::new(),
             internal: Rc::new(RefCell::new(Vec::new())),
+            summaries: Vec::new(),
             produced: Vec::new(),
             logging,
         }
@@ -76,13 +79,16 @@ impl<G: Scope> OperatorBuilder<G> {
         where
             P: ParallelizationContractCore<G::Timestamp, D> {

-        let puller = self.builder.new_input_connection(stream, pact, connection);
+        let puller = self.builder.new_input_connection(stream, pact, connection.clone());

         let input = PullCounter::new(puller);
         self.frontier.push(MutableAntichain::new());
         self.consumed.push(input.consumed().clone());

-        new_input_handle(input, self.internal.clone(), self.logging.clone())
+        let shared_summary = Rc::new(RefCell::new(connection));
+        self.summaries.push(shared_summary.clone());
+
+        new_input_handle(input, self.internal.clone(), shared_summary, self.logging.clone())
     }

     /// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
@@ -101,7 +107,7 @@ impl<G: Scope> OperatorBuilder<G> {
     /// antichain indicating that there is no connection from the input to the output.
pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (OutputWrapper>, StreamCore) { - let (tee, stream) = self.builder.new_output_connection(connection); + let (tee, stream) = self.builder.new_output_connection(connection.clone()); let internal = Rc::new(RefCell::new(ChangeBatch::new())); self.internal.borrow_mut().push(internal.clone()); @@ -109,6 +115,10 @@ impl OperatorBuilder { let mut buffer = PushBuffer::new(PushCounter::new(tee)); self.produced.push(buffer.inner().produced().clone()); + for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) { + summary.borrow_mut().push(connection.clone()); + } + (OutputWrapper::new(buffer, internal), stream) } diff --git a/external/timely-dataflow/timely/src/dataflow/operators/generic/handles.rs b/external/timely-dataflow/timely/src/dataflow/operators/generic/handles.rs index 3055354b..eeddf529 100644 --- a/external/timely-dataflow/timely/src/dataflow/operators/generic/handles.rs +++ b/external/timely-dataflow/timely/src/dataflow/operators/generic/handles.rs @@ -6,6 +6,7 @@ use std::rc::Rc; use std::cell::RefCell; +use crate::progress::Antichain; use crate::progress::Timestamp; use crate::progress::ChangeBatch; use crate::progress::frontier::MutableAntichain; @@ -17,13 +18,18 @@ use crate::communication::{Push, Pull, message::RefOrMut}; use crate::Container; use crate::logging::TimelyLogger as Logger; -use crate::dataflow::operators::CapabilityRef; +use crate::dataflow::operators::InputCapability; use crate::dataflow::operators::capability::CapabilityTrait; /// Handle to an operator's input stream. pub struct InputHandleCore>> { pull_counter: PullCounter, internal: Rc>>>>>, + /// Timestamp summaries from this input to each output. + /// + /// Each timestamp received through this input may only produce output timestamps + /// greater or equal to the input timestamp subjected to at least one of these summaries. + summaries: Rc>>>, logging: Option, } @@ -47,15 +53,16 @@ impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore< /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability. /// Returns `None` when there's no more data available. #[inline] - pub fn next(&mut self) -> Option<(CapabilityRef, RefOrMut)> { + pub fn next(&mut self) -> Option<(InputCapability, RefOrMut)> { let internal = &self.internal; + let summaries = &self.summaries; self.pull_counter.next_guarded().map(|(guard, bundle)| { match bundle.as_ref_or_mut() { RefOrMut::Ref(bundle) => { - (CapabilityRef::new(&bundle.time, internal.clone(), guard), RefOrMut::Ref(&bundle.data)) + (InputCapability::new(internal.clone(), summaries.clone(), guard), RefOrMut::Ref(&bundle.data)) }, RefOrMut::Mut(bundle) => { - (CapabilityRef::new(&bundle.time, internal.clone(), guard), RefOrMut::Mut(&mut bundle.data)) + (InputCapability::new(internal.clone(), summaries.clone(), guard), RefOrMut::Mut(&mut bundle.data)) }, } }) @@ -80,7 +87,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore< /// }); /// ``` #[inline] - pub fn for_each, RefOrMut)>(&mut self, mut logic: F) { + pub fn for_each, RefOrMut)>(&mut self, mut logic: F) { let mut logging = self.logging.take(); while let Some((cap, data)) = self.next() { logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true })); @@ -105,7 +112,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInp /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability. 
/// Returns `None` when there's no more data available. #[inline] - pub fn next(&mut self) -> Option<(CapabilityRef, RefOrMut)> { + pub fn next(&mut self) -> Option<(InputCapability, RefOrMut)> { self.handle.next() } @@ -128,7 +135,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInp /// }); /// ``` #[inline] - pub fn for_each, RefOrMut)>(&mut self, logic: F) { + pub fn for_each, RefOrMut)>(&mut self, logic: F) { self.handle.for_each(logic) } @@ -145,10 +152,16 @@ pub fn _access_pull_counter /// Constructs an input handle. /// Declared separately so that it can be kept private when `InputHandle` is re-exported. -pub fn new_input_handle>>(pull_counter: PullCounter, internal: Rc>>>>>, logging: Option) -> InputHandleCore { +pub fn new_input_handle>>( + pull_counter: PullCounter, + internal: Rc>>>>>, + summaries: Rc>>>, + logging: Option +) -> InputHandleCore { InputHandleCore { pull_counter, internal, + summaries, logging, } } @@ -221,6 +234,11 @@ impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore assert!(cap.valid_for_output(&self.internal_buffer), "Attempted to open output session with invalid capability"); self.push_buffer.session(cap.time()) } + + /// Flushes all pending data and indicate that no more data immediately follows. + pub fn cease(&mut self) { + self.push_buffer.cease(); + } } impl<'a, T: Timestamp, C: Container, P: Push>> Drop for OutputHandleCore<'a, T, C, P> { diff --git a/external/timely-dataflow/timely/src/dataflow/operators/mod.rs b/external/timely-dataflow/timely/src/dataflow/operators/mod.rs index 6f63f099..1eec15cb 100644 --- a/external/timely-dataflow/timely/src/dataflow/operators/mod.rs +++ b/external/timely-dataflow/timely/src/dataflow/operators/mod.rs @@ -21,7 +21,7 @@ pub use self::delay::Delay; pub use self::exchange::Exchange; pub use self::broadcast::Broadcast; pub use self::probe::Probe; -pub use self::to_stream::{ToStream, ToStreamCore, ToStreamAsync, Event}; +pub use self::to_stream::{ToStream, ToStreamCore}; pub use self::capture::Capture; pub use self::branch::{Branch, BranchWhen}; pub use self::ok_err::OkErr; @@ -62,4 +62,4 @@ pub mod count; // keep "mint" module-private mod capability; -pub use self::capability::{ActivateCapability, Capability, CapabilityRef, CapabilitySet, DowngradeError}; +pub use self::capability::{ActivateCapability, Capability, InputCapability, CapabilitySet, DowngradeError}; diff --git a/external/timely-dataflow/timely/src/dataflow/operators/probe.rs b/external/timely-dataflow/timely/src/dataflow/operators/probe.rs index 7c5a8567..ad990cb7 100644 --- a/external/timely-dataflow/timely/src/dataflow/operators/probe.rs +++ b/external/timely-dataflow/timely/src/dataflow/operators/probe.rs @@ -76,7 +76,7 @@ pub trait Probe { /// } /// }).unwrap(); /// ``` - fn probe_with(&self, handle: &mut Handle) -> StreamCore; + fn probe_with(&self, handle: &Handle) -> StreamCore; } impl Probe for StreamCore { @@ -87,14 +87,14 @@ impl Probe for StreamCore { self.probe_with(&mut handle); handle } - fn probe_with(&self, handle: &mut Handle) -> StreamCore { + fn probe_with(&self, handle: &Handle) -> StreamCore { let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); let (tee, stream) = builder.new_output(); let mut output = PushBuffer::new(PushCounter::new(tee)); - let shared_frontier = handle.frontier.clone(); + let shared_frontier = Rc::downgrade(&handle.frontier); let mut started = false; let mut vector = Default::default(); @@ -103,8 
+103,10 @@ impl Probe for StreamCore { move |progress| { // surface all frontier changes to the shared frontier. - let mut borrow = shared_frontier.borrow_mut(); - borrow.update_iter(progress.frontiers[0].drain()); + if let Some(shared_frontier) = shared_frontier.upgrade() { + let mut borrow = shared_frontier.borrow_mut(); + borrow.update_iter(progress.frontiers[0].drain()); + } if !started { // discard initial capability. diff --git a/external/timely-dataflow/timely/src/dataflow/operators/to_stream.rs b/external/timely-dataflow/timely/src/dataflow/operators/to_stream.rs index 597ec916..555a540b 100644 --- a/external/timely-dataflow/timely/src/dataflow/operators/to_stream.rs +++ b/external/timely-dataflow/timely/src/dataflow/operators/to_stream.rs @@ -1,15 +1,10 @@ //! Conversion to the `Stream` type from iterators. -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; use crate::Container; - -use crate::dataflow::operators::generic::operator::source; -use crate::dataflow::operators::CapabilitySet; -use crate::dataflow::{StreamCore, Scope, Stream}; use crate::progress::Timestamp; use crate::Data; +use crate::dataflow::operators::generic::operator::source; +use crate::dataflow::{StreamCore, Stream, Scope}; /// Converts to a timely `Stream`. pub trait ToStream { @@ -112,80 +107,3 @@ impl ToStreamCore for I where }) } } - -/// Data and progress events of the native stream. -pub enum Event { - /// Indicates that timestamps have advanced to frontier F - Progress(F), - /// Indicates that event D happened at time T - Message(F::Item, D), -} - -/// Converts to a timely `Stream`. -pub trait ToStreamAsync { - /// Converts a [native `Stream`](futures_util::stream::Stream) of [`Event`s](Event) into a [timely - /// `Stream`](crate::dataflow::Stream). 
- /// - /// # Examples - /// - /// ``` - /// use futures_util::stream; - /// - /// use timely::dataflow::operators::{Capture, Event, ToStream, ToStreamAsync}; - /// use timely::dataflow::operators::capture::Extract; - /// - /// let native_stream = stream::iter(vec![ - /// Event::Message(0, 0), - /// Event::Message(0, 1), - /// Event::Message(0, 2), - /// Event::Progress(Some(0)), - /// ]); - /// - /// let (data1, data2) = timely::example(|scope| { - /// let data1 = native_stream.to_stream(scope).capture(); - /// let data2 = vec![0,1,2].to_stream(scope).capture(); - /// - /// (data1, data2) - /// }); - /// - /// assert_eq!(data1.extract(), data2.extract()); - /// ``` - fn to_stream>(self, scope: &S) -> Stream; -} - -impl ToStreamAsync for I -where - D: Data, - T: Timestamp, - F: IntoIterator, - I: futures_util::stream::Stream> + Unpin + 'static, -{ - fn to_stream>(mut self, scope: &S) -> Stream { - source(scope, "ToStreamAsync", move |capability, info| { - let activator = Arc::new(scope.sync_activator_for(&info.address[..])); - - let mut cap_set = CapabilitySet::from_elem(capability); - - move |output| { - let waker = futures_util::task::waker_ref(&activator); - let mut context = Context::from_waker(&waker); - - // Consume all the ready items of the source_stream and issue them to the operator - while let Poll::Ready(item) = Pin::new(&mut self).poll_next(&mut context) { - match item { - Some(Event::Progress(time)) => { - cap_set.downgrade(time); - } - Some(Event::Message(time, data)) => { - output.session(&cap_set.delayed(&time)).give(data); - } - None => { - cap_set.downgrade(&[]); - break; - } - } - } - } - }) - } -} diff --git a/external/timely-dataflow/timely/src/order.rs b/external/timely-dataflow/timely/src/order.rs index 6132c5e8..21d876be 100644 --- a/external/timely-dataflow/timely/src/order.rs +++ b/external/timely-dataflow/timely/src/order.rs @@ -59,6 +59,15 @@ implement_total!(u8, u16, u32, u64, u128, usize, i8, i16, i32, i64, i128, isize, pub use product::Product; /// A pair of timestamps, partially ordered by the product order. mod product { + use std::fmt::{Formatter, Error, Debug}; + + #[cfg(feature = "columnation")] + use crate::container::columnation::{Columnation, Region}; + use crate::order::{Empty, TotalOrder}; + use crate::progress::Timestamp; + use crate::progress::timestamp::PathSummary; + use crate::progress::timestamp::Refines; + /// A nested pair of timestamps, one outer and one inner. /// /// We use `Product` rather than `(TOuter, TInner)` so that we can derive our own `PartialOrder`, @@ -82,7 +91,6 @@ mod product { } // Debug implementation to avoid seeing fully qualified path names. 
- use std::fmt::{Formatter, Error, Debug}; impl Debug for Product { fn fmt(&self, f: &mut Formatter) -> Result<(), Error> { f.write_str(&format!("({:?}, {:?})", self.outer, self.inner)) @@ -97,13 +105,11 @@ mod product { } } - use crate::progress::Timestamp; impl Timestamp for Product { type Summary = Product; fn minimum() -> Self { Self { outer: TOuter::minimum(), inner: TInner::minimum() }} } - use crate::progress::timestamp::PathSummary; impl PathSummary> for Product { #[inline] fn results_in(&self, product: &Product) -> Option> { @@ -123,7 +129,6 @@ mod product { } } - use crate::progress::timestamp::Refines; impl Refines for Product { fn to_inner(other: TOuter) -> Self { Product::new(other, TInner::minimum()) @@ -136,9 +141,52 @@ mod product { } } - use super::{Empty, TotalOrder}; impl Empty for Product { } impl TotalOrder for Product where T1: Empty, T2: TotalOrder { } + + #[cfg(feature = "columnation")] + impl Columnation for Product { + type InnerRegion = ProductRegion; + } + + #[cfg(feature = "columnation")] + #[derive(Default)] + pub struct ProductRegion { + outer_region: T1, + inner_region: T2, + } + + #[cfg(feature = "columnation")] + impl Region for ProductRegion { + type Item = Product; + + #[inline] + unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item { + Product { outer: self.outer_region.copy(&item.outer), inner: self.inner_region.copy(&item.inner) } + } + + fn clear(&mut self) { + self.outer_region.clear(); + self.inner_region.clear(); + } + + fn reserve_items<'a, I>(&mut self, items1: I) where Self: 'a, I: Iterator + Clone { + let items2 = items1.clone(); + self.outer_region.reserve_items(items1.map(|x| &x.outer)); + self.inner_region.reserve_items(items2.map(|x| &x.inner)) + } + + fn reserve_regions<'a, I>(&mut self, regions1: I) where Self: 'a, I: Iterator + Clone { + let regions2 = regions1.clone(); + self.outer_region.reserve_regions(regions1.map(|r| &r.outer_region)); + self.inner_region.reserve_regions(regions2.map(|r| &r.inner_region)); + } + + fn heap_size(&self, mut callback: impl FnMut(usize, usize)) { + self.outer_region.heap_size(&mut callback); + self.inner_region.heap_size(callback); + } + } } /// Rust tuple ordered by the lexicographic order. @@ -182,6 +230,19 @@ mod tuple { } } + use crate::progress::timestamp::Refines; + impl Refines for (TOuter, TInner) { + fn to_inner(other: TOuter) -> Self { + (other, TInner::minimum()) + } + fn to_outer(self: (TOuter, TInner)) -> TOuter { + self.0 + } + fn summarize(path: ::Summary) -> ::Summary { + path.0 + } + } + use super::Empty; impl Empty for (T1, T2) { } } diff --git a/external/timely-dataflow/timely/src/progress/frontier.rs b/external/timely-dataflow/timely/src/progress/frontier.rs index ea52ba63..ba44de48 100644 --- a/external/timely-dataflow/timely/src/progress/frontier.rs +++ b/external/timely-dataflow/timely/src/progress/frontier.rs @@ -13,7 +13,7 @@ use crate::order::{PartialOrder, TotalOrder}; /// Two antichains are equal if the contain the same set of elements, even if in different orders. /// This can make equality testing quadratic, though linear in the common case that the sequences /// are identical. -#[derive(Debug, Default, Abomonation, Serialize, Deserialize)] +#[derive(Debug, Abomonation, Serialize, Deserialize)] pub struct Antichain { elements: Vec } @@ -43,6 +43,32 @@ impl Antichain { } } + /// Updates the `Antichain` if the element is not greater than or equal to some present element. 
+ /// + /// Returns true if element is added to the set + /// + /// Accepts a reference to an element, which is cloned when inserting. + /// + /// # Examples + /// + ///``` + /// use timely::progress::frontier::Antichain; + /// + /// let mut frontier = Antichain::new(); + /// assert!(frontier.insert_ref(&2)); + /// assert!(!frontier.insert(3)); + ///``` + pub fn insert_ref(&mut self, element: &T) -> bool where T: Clone { + if !self.elements.iter().any(|x| x.less_equal(element)) { + self.elements.retain(|x| !element.less_equal(x)); + self.elements.push(element.clone()); + true + } + else { + false + } + } + /// Reserves capacity for at least additional more elements to be inserted in the given `Antichain` pub fn reserve(&mut self, additional: usize) { self.elements.reserve(additional); @@ -238,6 +264,12 @@ impl Clone for Antichain { } } +impl Default for Antichain { + fn default() -> Self { + Self::new() + } +} + impl TotalOrder for Antichain { } impl Antichain { @@ -308,14 +340,9 @@ impl ::std::iter::IntoIterator for Antichain { /// The `MutableAntichain` implementation is done with the intent that updates to it are done in batches, /// and it is acceptable to rebuild the frontier from scratch when a batch of updates change it. This means /// that it can be expensive to maintain a large number of counts and change few elements near the frontier. -/// -/// There is an `update_dirty` method for single updates that leave the `MutableAntichain` in a dirty state, -/// but I strongly recommend against using them unless you must (on part of timely progress tracking seems -/// to be greatly simplified by access to this) #[derive(Clone, Debug, Abomonation, Serialize, Deserialize)] pub struct MutableAntichain { - dirty: usize, - updates: Vec<(T, i64)>, + updates: ChangeBatch, frontier: Vec, changes: ChangeBatch, } @@ -334,8 +361,7 @@ impl MutableAntichain { #[inline] pub fn new() -> MutableAntichain { MutableAntichain { - dirty: 0, - updates: Vec::new(), + updates: ChangeBatch::new(), frontier: Vec::new(), changes: ChangeBatch::new(), } @@ -354,21 +380,11 @@ impl MutableAntichain { ///``` #[inline] pub fn clear(&mut self) { - self.dirty = 0; self.updates.clear(); self.frontier.clear(); self.changes.clear(); } - /// This method deletes the contents. Unlike `clear` it records doing so. - pub fn empty(&mut self) { - for (_, diff) in self.updates.iter_mut() { - *diff = 0; - } - - self.dirty = self.updates.len(); - } - /// Reveals the minimal elements with positive count. 
/// /// # Examples @@ -381,7 +397,6 @@ impl MutableAntichain { ///``` #[inline] pub fn frontier(&self) -> AntichainRef<'_, T> { - debug_assert_eq!(self.dirty, 0); AntichainRef::new(&self.frontier) } @@ -396,13 +411,12 @@ impl MutableAntichain { /// assert!(frontier.frontier() == AntichainRef::new(&[0u64])); ///``` #[inline] - pub fn new_bottom(bottom: T) -> MutableAntichain + pub fn new_bottom(bottom: T) -> MutableAntichain where - T: Clone, + T: Ord+Clone, { MutableAntichain { - dirty: 0, - updates: vec![(bottom.clone(), 1)], + updates: ChangeBatch::new_from(bottom.clone(), 1), frontier: vec![bottom], changes: ChangeBatch::new(), } @@ -420,7 +434,6 @@ impl MutableAntichain { ///``` #[inline] pub fn is_empty(&self) -> bool { - debug_assert_eq!(self.dirty, 0); self.frontier.is_empty() } @@ -441,7 +454,6 @@ impl MutableAntichain { where T: PartialOrder, { - debug_assert_eq!(self.dirty, 0); self.frontier().less_than(time) } @@ -462,22 +474,9 @@ impl MutableAntichain { where T: PartialOrder, { - debug_assert_eq!(self.dirty, 0); self.frontier().less_equal(time) } - /// Allows a single-element push, but dirties the antichain and prevents inspection until cleaned. - /// - /// At the moment inspection is prevented via panic, so best be careful (this should probably be fixed). - /// It is *very* important if you want to use this method that very soon afterwards you call something - /// akin to `update_iter`, perhaps with a `None` argument if you have no more data, as this method will - /// tidy up the internal representation. - #[inline] - pub fn update_dirty(&mut self, time: T, delta: i64) { - self.updates.push((time, delta)); - self.dirty += 1; - } - /// Applies updates to the antichain and enumerates any changes. /// /// # Examples @@ -502,32 +501,20 @@ impl MutableAntichain { { let updates = updates.into_iter(); - // Attempt to pre-allocate for the new updates - let (min, max) = updates.size_hint(); - self.updates.reserve(max.unwrap_or(min)); - - for (time, delta) in updates { - self.updates.push((time, delta)); - self.dirty += 1; - } - // track whether a rebuild is needed. let mut rebuild_required = false; + for (time, delta) in updates { - // determine if recently pushed data requires rebuilding the frontier. - // note: this may be required even with an empty iterator, due to dirty data in self.updates. - while self.dirty > 0 && !rebuild_required { - - let time = &self.updates[self.updates.len() - self.dirty].0; - let delta = self.updates[self.updates.len() - self.dirty].1; - - let beyond_frontier = self.frontier.iter().any(|f| f.less_than(time)); - let before_frontier = !self.frontier.iter().any(|f| f.less_equal(time)); - rebuild_required = rebuild_required || !(beyond_frontier || (delta < 0 && before_frontier)); + // If we do not yet require a rebuild, test whether we might require one + // and set the flag in that case. + if !rebuild_required { + let beyond_frontier = self.frontier.iter().any(|f| f.less_than(&time)); + let before_frontier = !self.frontier.iter().any(|f| f.less_equal(&time)); + rebuild_required = !(beyond_frontier || (delta < 0 && before_frontier)); + } - self.dirty -= 1; + self.updates.update(time, delta); } - self.dirty = 0; if rebuild_required { self.rebuild() @@ -535,7 +522,7 @@ impl MutableAntichain { self.changes.drain() } - /// Sorts and consolidates `self.updates` and applies `action` to any frontier changes. + /// Rebuilds `self.frontier` from `self.updates`. 
     ///
     /// This method is meant to be used for bulk updates to the frontier, and does more work than one might do
     /// for single updates, but is meant to be an efficient way to process multiple updates together. This is
@@ -544,19 +531,6 @@ impl<T> MutableAntichain<T> {
     where
         T: Clone + PartialOrder + Ord,
     {
-
-        // sort and consolidate updates; retain non-zero accumulations.
-        if !self.updates.is_empty() {
-            self.updates.sort_by(|x,y| x.0.cmp(&y.0));
-            for i in 0 .. self.updates.len() - 1 {
-                if self.updates[i].0 == self.updates[i+1].0 {
-                    self.updates[i+1].1 += self.updates[i].1;
-                    self.updates[i].1 = 0;
-                }
-            }
-            self.updates.retain(|x| x.1 != 0);
-        }
-
         for time in self.frontier.drain(..) {
             self.changes.update(time, -1);
         }
@@ -580,11 +554,23 @@ impl<T> MutableAntichain<T> {
         T: Ord,
     {
         self.updates
+            .unstable_internal_updates()
             .iter()
             .filter(|td| td.0.eq(query_time))
            .map(|td| td.1)
            .sum()
     }
+
+    /// Reports the updates that form the frontier, as an iterator of timestamps and their frequencies.
+    ///
+    /// Rebuilds the internal representation before revealing times and frequencies.
+    pub fn updates(&mut self) -> impl Iterator<Item = &(T, i64)>
+    where
+        T: Clone + PartialOrder + Ord,
+    {
+        self.rebuild();
+        self.updates.iter()
+    }
 }
 
 impl<T: PartialOrder+Ord+Clone> Default for MutableAntichain<T> {
@@ -804,4 +790,25 @@ mod tests {
         assert!(!hashed.contains(&Antichain::from(vec![Elem('c', 3)])));
         assert!(!hashed.contains(&Antichain::from(vec![])));
     }
+
+    #[test]
+    fn mutable_compaction() {
+        let mut mutable = MutableAntichain::new();
+        mutable.update_iter(Some((7, 1)));
+        mutable.update_iter(Some((7, 1)));
+        mutable.update_iter(Some((7, 1)));
+        mutable.update_iter(Some((7, 1)));
+        mutable.update_iter(Some((7, 1)));
+        mutable.update_iter(Some((7, 1)));
+        mutable.update_iter(Some((8, 1)));
+        mutable.update_iter(Some((8, 1)));
+        mutable.update_iter(Some((8, 1)));
+        mutable.update_iter(Some((8, 1)));
+        mutable.update_iter(Some((8, 1)));
+        for _ in 0 .. 1000 {
+            mutable.update_iter(Some((9, 1)));
+            mutable.update_iter(Some((9, -1)));
+        }
+        assert!(mutable.updates.unstable_internal_updates().len() <= 32);
+    }
 }
diff --git a/external/timely-dataflow/timely/src/progress/reachability.rs b/external/timely-dataflow/timely/src/progress/reachability.rs
index 8e51c839..5a1f1f6f 100644
--- a/external/timely-dataflow/timely/src/progress/reachability.rs
+++ b/external/timely-dataflow/timely/src/progress/reachability.rs
@@ -897,4 +897,48 @@ pub mod logging {
     impl From<TargetUpdate> for TrackerEvent {
         fn from(v: TargetUpdate) -> TrackerEvent { TrackerEvent::TargetUpdate(v) }
     }
-}
\ No newline at end of file
+}
+
+// The Drop implementation for `Tracker` makes sure that reachability logging is correct for
+// prematurely dropped dataflows. At the moment, this is only possible through `drop_dataflow`,
+// because in all other cases the tracker stays alive while it has outstanding work, leaving no
+// remaining work for this Drop implementation.
+impl<T: Timestamp> Drop for Tracker<T> {
+    fn drop(&mut self) {
+        let logger = if let Some(logger) = &mut self.logger {
+            logger
+        } else {
+            // No cleanup necessary when there is no logger.
+            return;
+        };
+
+        // Retract pending data that `propagate_all` would normally log.
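
Editor's note (not part of the patch): both the reworked `count_for` and the `mutable_compaction` test above depend on `ChangeBatch` consolidating its buffer as it grows, so offsetting updates cannot accumulate. A standalone sketch of that behavior, assuming `ChangeBatch`'s compaction threshold stays near 32 entries:

```rust
use timely::progress::ChangeBatch;

fn main() {
    let mut batch = ChangeBatch::new();
    // One thousand offsetting update pairs at the same time.
    for _ in 0..1000 {
        batch.update(9u64, 1);
        batch.update(9u64, -1);
    }
    // Consolidation keeps the internal buffer small, and the net effect is empty.
    assert!(batch.unstable_internal_updates().len() <= 32);
    assert!(batch.is_empty());
}
```
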
+        for (index, per_operator) in self.per_operator.iter_mut().enumerate() {
+            let target_changes = per_operator.targets
+                .iter_mut()
+                .enumerate()
+                .flat_map(|(port, target)| {
+                    target.pointstamps
+                        .updates()
+                        .map(move |(time, diff)| (index, port, time.clone(), -diff))
+                })
+                .collect::<Vec<_>>();
+            if !target_changes.is_empty() {
+                logger.log_target_updates(Box::new(target_changes));
+            }
+
+            let source_changes = per_operator.sources
+                .iter_mut()
+                .enumerate()
+                .flat_map(|(port, source)| {
+                    source.pointstamps
+                        .updates()
+                        .map(move |(time, diff)| (index, port, time.clone(), -diff))
+                })
+                .collect::<Vec<_>>();
+            if !source_changes.is_empty() {
+                logger.log_source_updates(Box::new(source_changes));
+            }
+        }
+    }
+}
diff --git a/external/timely-dataflow/timely/src/progress/subgraph.rs b/external/timely-dataflow/timely/src/progress/subgraph.rs
index 309d9077..f41fee19 100644
--- a/external/timely-dataflow/timely/src/progress/subgraph.rs
+++ b/external/timely-dataflow/timely/src/progress/subgraph.rs
@@ -200,6 +200,7 @@ where
             incomplete_count,
             activations,
             temp_active: BinaryHeap::new(),
+            maybe_shutdown: Vec::new(),
             children: self.children,
             input_messages: self.input_messages,
             output_capabilities: self.output_capabilities,
@@ -242,6 +243,7 @@ where
     // shared activations (including children).
     activations: Rc<RefCell<Activations>>,
     temp_active: BinaryHeap<Reverse<usize>>,
+    maybe_shutdown: Vec<usize>,
 
     // shared state written to by the datapath, counting records entering this subgraph instance.
    input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,
@@ -461,6 +463,7 @@ where
         // Drain propagated information into shared progress structure.
         for ((location, time), diff) in self.pointstamp_tracker.pushed().drain() {
+            self.maybe_shutdown.push(location.node);
             // Targets are actionable, sources are not.
             if let crate::progress::Port::Target(port) = location.port {
                 if self.children[location.node].notify {
@@ -477,6 +480,18 @@ where
             }
         }
 
+        // Consider scheduling each recipient of progress information to shut down.
+        self.maybe_shutdown.sort();
+        self.maybe_shutdown.dedup();
+        for child_index in self.maybe_shutdown.drain(..) {
+            let child_state = self.pointstamp_tracker.node_state(child_index);
+            let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
+            let no_capabilities = child_state.sources.iter().all(|x| x.pointstamps.is_empty());
+            if frontiers_empty && no_capabilities {
+                self.temp_active.push(Reverse(child_index));
+            }
+        }
+
         // Extract child zero frontier changes and report as internal capability changes.
         for (output, internal) in self.shared_progress.borrow_mut().internals.iter_mut().enumerate() {
             self.pointstamp_tracker
diff --git a/external/timely-dataflow/timely/src/scheduling/activate.rs b/external/timely-dataflow/timely/src/scheduling/activate.rs
index e91c87a9..e86f4435 100644
--- a/external/timely-dataflow/timely/src/scheduling/activate.rs
+++ b/external/timely-dataflow/timely/src/scheduling/activate.rs
@@ -1,14 +1,12 @@
 //! Parking and unparking timely fibers.
 
 use std::rc::Rc;
-use std::sync::Arc;
 use std::cell::RefCell;
 use std::thread::Thread;
 use std::collections::BinaryHeap;
 use std::time::{Duration, Instant};
 use std::cmp::Reverse;
 use crossbeam_channel::{Sender, Receiver};
-use futures_util::task::ArcWake;
 
 /// Methods required to act as a timely scheduler.
 ///
@@ -274,12 +272,6 @@ impl SyncActivator {
     }
 }
 
-impl ArcWake for SyncActivator {
-    fn wake_by_ref(arc_self: &Arc<Self>) {
-        arc_self.activate().unwrap();
-    }
-}
-
 /// The error returned when activation fails across thread boundaries because
 /// the receiving end has hung up.
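
Editor's note (not part of the patch): the `maybe_shutdown` pass sorts and deduplicates candidate children so each is examined once per round, and wraps indices in `Reverse` so the `BinaryHeap` pops the smallest first. A self-contained sketch of the pattern, with the pointstamp-tracker consultation elided:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn main() {
    // Progress updates may name the same child many times in one round.
    let mut maybe_shutdown = vec![3usize, 1, 3, 2, 1, 3];
    let mut temp_active: BinaryHeap<Reverse<usize>> = BinaryHeap::new();

    // Sort and deduplicate so each candidate is considered exactly once.
    maybe_shutdown.sort();
    maybe_shutdown.dedup();
    for child_index in maybe_shutdown.drain(..) {
        // The subgraph would consult the pointstamp tracker here; this sketch
        // simply schedules every deduplicated candidate.
        temp_active.push(Reverse(child_index));
    }

    // `Reverse` turns the max-heap into a min-heap: child 1 is scheduled first.
    assert_eq!(temp_active.len(), 3);
    assert_eq!(temp_active.pop(), Some(Reverse(1)));
}
```
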
 #[derive(Clone, Copy, Debug)]
diff --git a/external/timely-dataflow/timely/src/worker.rs b/external/timely-dataflow/timely/src/worker.rs
index 153f5c77..3db8d973 100644
--- a/external/timely-dataflow/timely/src/worker.rs
+++ b/external/timely-dataflow/timely/src/worker.rs
@@ -338,8 +338,9 @@ impl<A: Allocate> Worker<A> {
             let events = allocator.events().clone();
             let mut borrow = events.borrow_mut();
             let paths = self.paths.borrow();
-            for (channel, _event) in borrow.drain(..) {
-                // TODO: Pay more attent to `_event`.
+            borrow.sort_unstable();
+            borrow.dedup();
+            for channel in borrow.drain(..) {
                 // Consider tracking whether a channel
                 // in non-empty, and only activating
                 // on the basis of non-empty channels.
diff --git a/external/timely-dataflow/timely/tests/gh_523.rs b/external/timely-dataflow/timely/tests/gh_523.rs
new file mode 100644
index 00000000..9003f7af
--- /dev/null
+++ b/external/timely-dataflow/timely/tests/gh_523.rs
@@ -0,0 +1,41 @@
+use timely::dataflow::channels::pact::Pipeline;
+use timely::dataflow::operators::{Exchange, Input, Operator, Probe};
+use timely::dataflow::InputHandle;
+use timely::Config;
+
+#[test]
+fn gh_523() {
+    timely::execute(Config::thread(), |worker| {
+        let mut input = InputHandle::new();
+        let mut buf = Vec::new();
+        let probe = worker.dataflow::<u64, _, _>(|scope| {
+            scope
+                .input_from(&mut input)
+                .unary(Pipeline, "Test", move |_, _| {
+                    move |input, output| {
+                        input.for_each(|cap, data| {
+                            data.swap(&mut buf);
+                            let mut session = output.session(&cap);
+                            session.give_container(&mut Vec::new());
+                            session.give_container(&mut buf);
+                        });
+                    }
+                })
+                .exchange(|x| *x)
+                .probe()
+        });
+
+        for round in 0..2 {
+            input.send(round);
+            input.advance_to(round + 1);
+        }
+        input.close();
+
+        while !probe.done() {
+            worker.step();
+        }
+
+        println!("worker {} complete", worker.index());
+    })
+    .unwrap();
+}
diff --git a/src/engine/dataflow.rs b/src/engine/dataflow.rs
index 6e2f8252..13dd4483 100644
--- a/src/engine/dataflow.rs
+++ b/src/engine/dataflow.rs
@@ -56,7 +56,7 @@ use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
 use differential_dataflow::operators::iterate::Variable;
 use differential_dataflow::operators::reduce::{Reduce, ReduceCore};
-use differential_dataflow::operators::{Consolidate, JoinCore};
+use differential_dataflow::operators::JoinCore;
 use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpine};
 use differential_dataflow::Collection;
 use differential_dataflow::{AsCollection as _, Data};
@@ -3024,7 +3024,7 @@ impl<S: MaybeTotalScope> DataflowGraphInner<S> {
                 };
             }
         })
-        .probe_with(&mut self.output_probe);
+        .probe_with(&self.output_probe);
 
         Ok(())
     }
@@ -3104,7 +3104,7 @@ impl<S: MaybeTotalScope> DataflowGraphInner<S> {
             }
         }
     })
-    .probe_with(&mut self.output_probe);
+    .probe_with(&self.output_probe);
 
     Ok(())
    }
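
Editor's note (not part of the patch): the two `probe_with` call-site changes in `src/engine/dataflow.rs` track the vendored timely API, which now takes probe handles by shared reference. A minimal sketch, assuming the post-change `probe_with(&Handle<T>)` signature:

```rust
use timely::dataflow::operators::probe::Handle;
use timely::dataflow::operators::{Input, Probe};
use timely::dataflow::InputHandle;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        let mut input = InputHandle::new();
        let probe = Handle::new(); // no `mut` needed any more
        worker.dataflow::<u64, _, _>(|scope| {
            scope.input_from(&mut input)
                 .probe_with(&probe); // shared borrow suffices
        });
        input.send(0u64);
        input.advance_to(1);
        // Step the worker until the probe confirms the epoch is complete.
        while probe.less_than(input.time()) {
            worker.step();
        }
    })
    .unwrap();
}
```
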