Skip to content

Commit

Permalink
Update external (#5675)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 880b38fcda3941ad32ae0a2b52198f84cd35f5cc
  • Loading branch information
embe-pw authored and Manul from Pathway committed Feb 13, 2024
1 parent 0c068ab commit 26690e2
Show file tree
Hide file tree
Showing 65 changed files with 1,087 additions and 1,001 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ where
let timer = std::time::Instant::now();
let mut work = 0;

// New entries to introduce to the stash after processing.
let mut stash_additions = HashMap::new();

if let Some(ref mut trace) = arrangement_trace {

for (capability, proposals) in stash.iter_mut() {
Expand Down Expand Up @@ -231,6 +234,35 @@ where
}

proposals.retain(|ptd| !ptd.2.is_zero());

// Determine the lower bound of remaining update times.
let mut antichain = Antichain::new();
for (_, initial, _) in proposals.iter() {
antichain.insert(initial.clone());
}
// Fast path: there is only one element in the antichain.
// All times in `proposals` must be greater or equal to it.
if antichain.len() == 1 && !antichain.less_equal(capability.time()) {
stash_additions
.entry(capability.delayed(&antichain[0]))
.or_insert(Vec::new())
.extend(proposals.drain(..));
}
else if antichain.len() > 1 {
// Any remaining times should peel off elements from `proposals`.
let mut additions = vec![Vec::new(); antichain.len()];
for (data, initial, diff) in proposals.drain(..) {
use timely::PartialOrder;
let position = antichain.iter().position(|t| t.less_equal(&initial)).unwrap();
additions[position].push((data, initial, diff));
}
for (time, addition) in antichain.into_iter().zip(additions) {
stash_additions
.entry(capability.delayed(&time))
.or_insert(Vec::new())
.extend(addition);
}
}
}
}
}
Expand All @@ -242,6 +274,10 @@ where

// drop fully processed capabilities.
stash.retain(|_,proposals| !proposals.is_empty());

for (capability, proposals) in stash_additions.into_iter() {
stash.entry(capability).or_insert(Vec::new()).extend(proposals);
}

// The logical merging frontier depends on both input1 and stash.
let mut frontier = timely::progress::frontier::Antichain::new();
Expand Down
1 change: 0 additions & 1 deletion external/differential-dataflow/examples/accumulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ extern crate differential_dataflow;
use rand::{Rng, SeedableRng, StdRng};

use differential_dataflow::input::Input;
use differential_dataflow::operators::Consolidate;

fn main() {

Expand Down
1 change: 0 additions & 1 deletion external/differential-dataflow/examples/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::reduce::Reduce;
use differential_dataflow::operators::join::JoinCore;
use differential_dataflow::operators::Iterate;
use differential_dataflow::operators::Consolidate;

fn main() {

Expand Down
153 changes: 153 additions & 0 deletions external/differential-dataflow/examples/dynamic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
extern crate rand;
extern crate timely;
extern crate differential_dataflow;

use rand::{Rng, SeedableRng, StdRng};

use timely::dataflow::*;
use timely::dataflow::operators::probe::Handle;

use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::logging::DifferentialEvent;

type Node = u32;
type Edge = (Node, Node);

fn main() {

let nodes: u32 = std::env::args().nth(1).unwrap().parse().unwrap();
let edges: u32 = std::env::args().nth(2).unwrap().parse().unwrap();
let batch: u32 = std::env::args().nth(3).unwrap().parse().unwrap();
let rounds: u32 = std::env::args().nth(4).unwrap().parse().unwrap();
let inspect: bool = std::env::args().nth(5).unwrap() == "inspect";

// define a new computational scope, in which to run BFS
timely::execute_from_args(std::env::args(), move |worker| {

if let Ok(addr) = ::std::env::var("DIFFERENTIAL_LOG_ADDR") {

eprintln!("enabled DIFFERENTIAL logging to {}", addr);

if let Ok(stream) = ::std::net::TcpStream::connect(&addr) {
let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream);
let mut logger = ::timely::logging::BatchLogger::new(writer);
worker.log_register().insert::<DifferentialEvent,_>("differential/arrange", move |time, data|
logger.publish_batch(time, data)
);
}
else {
panic!("Could not connect to differential log address: {:?}", addr);
}
}

let timer = ::std::time::Instant::now();

// define BFS dataflow; return handles to roots and edges inputs
let mut probe = Handle::new();
let (mut roots, mut graph) = worker.dataflow(|scope| {

let (root_input, roots) = scope.new_collection();
let (edge_input, graph) = scope.new_collection();

let mut result = bfs(&graph, &roots);

if !inspect {
result = result.filter(|_| false);
}

result.map(|(_,l)| l)
.consolidate()
.inspect(|x| println!("\t{:?}", x))
.probe_with(&mut probe);

(root_input, edge_input)
});

let seed: &[_] = &[1, 2, 3, 4];
let mut rng1: StdRng = SeedableRng::from_seed(seed); // rng for edge additions
let mut rng2: StdRng = SeedableRng::from_seed(seed); // rng for edge deletions

roots.insert(0);
roots.close();

println!("performing BFS on {} nodes, {} edges:", nodes, edges);

if worker.index() == 0 {
for _ in 0 .. edges {
graph.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
}
}

println!("{:?}\tloaded", timer.elapsed());

graph.advance_to(1);
graph.flush();
worker.step_or_park_while(None, || probe.less_than(graph.time()));

println!("{:?}\tstable", timer.elapsed());

for round in 0 .. rounds {
for element in 0 .. batch {
if worker.index() == 0 {
graph.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
graph.remove((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)));
}
graph.advance_to(2 + round * batch + element);
}
graph.flush();

let timer2 = ::std::time::Instant::now();
worker.step_or_park_while(None, || probe.less_than(&graph.time()));

if worker.index() == 0 {
let elapsed = timer2.elapsed();
println!("{:?}\t{:?}:\t{}", timer.elapsed(), round, elapsed.as_secs() * 1000000000 + (elapsed.subsec_nanos() as u64));
}
}
println!("finished; elapsed: {:?}", timer.elapsed());
}).unwrap();
}

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn bfs<G: Scope>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
where G::Timestamp: Lattice+Ord {

use timely::order::Product;
use iterate::Variable;
use differential_dataflow::dynamic::{feedback_summary, pointstamp::PointStamp};

// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));

// repeatedly update minimal distances each node can be reached from each root
nodes.scope().iterative::<PointStamp<usize>, _, _>(|inner| {

// These enter the statically bound scope, rather than any iterative scopes.
// We do not *need* to enter them into the dynamic scope, as they are static
// within that scope.
let edges = edges.enter(inner);
let nodes = nodes.enter(inner);

// Create a variable for label iteration.
let inner = feedback_summary::<usize>(1, 1);
let label = Variable::new_from(nodes.clone(), Product { outer: Default::default(), inner });

let next =
label
.join_map(&edges, |_k,l,d| (*d, l+1))
.concat(&nodes)
.reduce(|_, s, t| t.push((*s[0].0, 1)))
;

label.set(&next);
// Leave the dynamic iteration, stripping off the last timestamp coordinate.
next
.leave_dynamic(1)
.inspect(|x| println!("{:?}", x))
.leave()
})

}
2 changes: 1 addition & 1 deletion external/differential-dataflow/examples/graspan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use differential_dataflow::lattice::Lattice;
use differential_dataflow::input::{Input, InputSession};
use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf};
use differential_dataflow::operators::iterate::Variable;
use differential_dataflow::operators::{Threshold, JoinCore, Consolidate};
use differential_dataflow::operators::{Threshold, JoinCore};

type Node = usize;
type Edge = (Node, Node);
Expand Down
2 changes: 1 addition & 1 deletion external/differential-dataflow/examples/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ where
{
// Retain node connections along "default" timestamp summaries.
let nodes = nodes.flat_map(|(target, source, summary)| {
if summary != Default::default() {
if summary == Default::default() {
Some((Location::from(target), Location::from(source)))
}
else {
Expand Down
4 changes: 3 additions & 1 deletion external/differential-dataflow/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,6 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
R: ::ExchangeData+Hashable,
G::Timestamp: Lattice+Ord,
{
use operators::consolidate::Consolidate;
self.consolidate()
.inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
}
Expand All @@ -545,6 +544,7 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
use timely::dataflow::scopes::ScopeParent;
use timely::progress::timestamp::Refines;

/// Methods requiring a nested scope.
impl<'a, G: Scope, T: Timestamp, D: Data, R: Semigroup> Collection<Child<'a, G, T>, D, R>
where
T: Refines<<G as ScopeParent>::Timestamp>,
Expand Down Expand Up @@ -582,6 +582,7 @@ where
}
}

/// Methods requiring a region as the scope.
impl<'a, G: Scope, D: Data, R: Semigroup> Collection<Child<'a, G, G::Timestamp>, D, R>
{
/// Returns the value of a Collection from a nested region to its containing scope.
Expand All @@ -595,6 +596,7 @@ impl<'a, G: Scope, D: Data, R: Semigroup> Collection<Child<'a, G, G::Timestamp>,
}
}

/// Methods requiring an Abelian difference, to support negation.
impl<G: Scope, D: Data, R: Abelian> Collection<G, D, R> where G::Timestamp: Data {
/// Creates a new collection whose counts are the negation of those in the input.
///
Expand Down
16 changes: 10 additions & 6 deletions external/differential-dataflow/src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
// In a world where there are not many results, we may never even need to call in to merge sort.
slice.sort_by(|x,y| x.0.cmp(&y.0));

let slice_ptr = slice.as_mut_ptr();

// Counts the number of distinct known-non-zero accumulations. Indexes the write location.
let mut offset = 0;
for index in 1 .. slice.len() {
Expand All @@ -55,8 +57,8 @@ pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
assert!(offset < index);

// LOOP INVARIANT: offset < index
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
let ptr2 = slice.as_mut_ptr().offset(index as isize);
let ptr1 = slice_ptr.add(offset);
let ptr2 = slice_ptr.add(index);

if (*ptr1).0 == (*ptr2).0 {
(*ptr1).1.plus_equals(&(*ptr2).1);
Expand All @@ -65,7 +67,7 @@ pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
if !(*ptr1).1.is_zero() {
offset += 1;
}
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
let ptr1 = slice_ptr.add(offset);
std::ptr::swap(ptr1, ptr2);
}
}
Expand Down Expand Up @@ -103,6 +105,8 @@ pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D,
// In a world where there are not many results, we may never even need to call in to merge sort.
slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));

let slice_ptr = slice.as_mut_ptr();

// Counts the number of distinct known-non-zero accumulations. Indexes the write location.
let mut offset = 0;
for index in 1 .. slice.len() {
Expand All @@ -118,8 +122,8 @@ pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D,
unsafe {

// LOOP INVARIANT: offset < index
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
let ptr2 = slice.as_mut_ptr().offset(index as isize);
let ptr1 = slice_ptr.add(offset);
let ptr2 = slice_ptr.add(index);

if (*ptr1).0 == (*ptr2).0 && (*ptr1).1 == (*ptr2).1 {
(*ptr1).2.plus_equals(&(*ptr2).2);
Expand All @@ -128,7 +132,7 @@ pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D,
if !(*ptr1).2.is_zero() {
offset += 1;
}
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
let ptr1 = slice_ptr.add(offset);
std::ptr::swap(ptr1, ptr2);
}

Expand Down
Loading

0 comments on commit 26690e2

Please sign in to comment.