Today's post will combine two concepts that aren't obviously related: Graph processing and abstract algebra. In the context of differential dataflow (your favorite incremental data-parallel compute platform).
We'll start with a few examples of graph processing that may be initially familiar, but which should evolve into slightly weirder problems. We'll move from reachability in graphs, to single-source shortest paths, to single-source widest paths, to graph connectivity.
For each problem we'll describe algorithms which circulate per-node state on a graph. This state ranges from a bit ("reachable?") to more interesting quantities ("smallest connected node identifier").
Using vanilla differential dataflow algorithms, we will propagate this state as raw data among nodes, which is a not unreasonable way to think of doing things. These algorithms allow us to make arbitrary modifications to the input graph, and the arbitrarily changed results are determined and produced as output.
However, the point of this post will be to show that we can do something even more clever if we are willing to restrict how we are able to update the graph. If, for example, we promise to only ever add edges, never to remove edges, we can more easily maintain reachable sets. If we only ever shorten edges, or make edges wider, we can respectively maintain shortest and widest path computations more efficiently.
More generally, differential dataflow can now handle a broader class of updates than previously, where the new updates (when appropriate) provide substantial performance improvements!
Let's first describe four graph problems we'll consider, each of which has a similar iterative flavor, but with slight differences in what work we iteratively perform.
From a collection of directed edges, say pairs (src, dst)
, and a collection of starting nodes, we might like to determine the set of all nodes that can reached from a starting node along an arbitrary sequence of directed edges.
Over in Datalog lang we might write this computation using the two rules:
reach(x) := start(x)
reach(y) := reach(x), edge(x,y)
In differential dataflow, we could write
let reach =
start.iterate(|reach|
edges.enter(reach.scope())
.semijoin(&reach)
.map(|(src,dst)| dst)
.concat(&reach)
.distinct()
);
This perhaps over-complicated fragment starts from start
and repeatedly applies the iterative fragment that describes how to go from an input reach
to some next iterate. In this case, we restrict edges
to those of reachable sources (using semijoin
), pick out resulting destinations, merge back in reachable nodes, and reduce the collection down to the distinct set of reachable nodes.
In principle, the results of this computation go around for another iteration, repeating indefinitely, but in practice differential dataflow cleverly tracks what has changed and only performs work in response to changes; as the reachable set converges differential dataflow accelerates, and once it stops changing differential dataflow immediately stops doing work.
At this point, we are welcome to start changing the input collections, edges
and start
. We can arbitrarily add to and remove from these collections, and differential dataflow will spring back into action and report the resulting changes to the output set, as if we had re-run the computation from scratch on the new inputs.
Let's dress up our example a bit by adding "lengths" to each edge, and rather than just checking whether a graph node is reachable from a start node, we would like to know the length of the shortest path to each reachable node.
This turns out to be just a little bit of modification to our iterative algorithm. Rather than a collection reach
of reachable nodes, we'll maintain a collection of pairs (node, dist)
of shortest distances within which a node can be reached. This is initially zero for the start nodes. As we cross edges we must add the length of the edge, and rather than take the distinct set of results we want to record only the minimal distance for each node.
let dists =
start.map(|node| (node, 0))
.iterate(|dists|
edges.enter(dists.scope())
.join(&dists)
.map(|(src,(dst,len),tot)| (dst, len+tot))
.concat(&dists)
.reduce(|key, input, output| {
// push minimum length (input sorted).
output.push((*input[0].0, 1))
})
);
As above, differential dataflow will iteratively update the dists
collection with progressively shorter and shorter distances, until they cease changing (assuming no negative weight cycles, of course). Either of start
or edges
can be updated, and differential will correctly refresh the distances to reflect the changes.
As a bit of an interesting (to me) variation, let's imagine that the edges have "widths", indicating some sort of capacity (perhaps this is piping). We might like to determine for each other node the maximum capacity of the path from a source to the node, that being the smallest width along the path.
This is also a minor variation, where instead of adding path lengths as we go, we accumulate the edge values with the minimum operator. When aggregating options (in reduce
) rather than the minimum value we return the maximum value.
let widths =
start.map(|node| (node, usize::max_value()))
.iterate(|width|
edges.enter(width.scope())
.join(&width)
.map(|(src,(dst,wid),max)|
(dst, ::std::cmp::min(wid, max))
)
.concat(&width)
.reduce(|key, input, output| {
// push maximum width (input sorted).
output.push((*input.last().unwrap().0, 1))
})
);
This is almost identical structure to the computation, with different actions taken when we join node state with edges(here: min
) and when we reduce the candidate values for node state (here max
). Nonetheless, differential happily incrementally executes and maintains the computation as we change around the inputs.
A last example, we will try and determine the connected components of a symmetric graph (edges
should contain both (src,dst)
and (dst,src)
if it contains either). Two nodes are in the same connected component if each can be reached from the other along a directed path.
One classic algorithm for this task is "label propagation", where each node proposes a distinct label, and the nodes circulate the labels among their neighbors retaining the smallest label the have yet seen. As the labels only ever decrease, there is a limit to the computation, and in that limit if two nodes have the same label they must be connected, and if they do not have the same label they must not be connected.
In this case the edges do not have values and we instead draw values from the nodes themselves (using their own identifiers as their initial labels):
let labels =
start.map(|node| (node, node))
.iterate(|label|
edges.enter(label.scope())
.join(&label)
.map(|(src,dst,lbl)| (dst,lbl))
.concat(&label)
.reduce(|key, input, output| {
// push minimum label (input sorted).
output.push((*input[0].0, 1))
})
);
Again the same pattern: iterative development of a collection of node state as (node, value)
pairs. Through the magic of differential dataflow: automatically updated as we change things.
Differential dataflow works by maintaining the changes each of its collections undergoes. It describes these changes by an ever-growing set of updates, which we write as
(data, time, diff)
This triple describes three aspects of an update:
data
: What changed? As in, which record experienced a change?time
: When did it change? Was it because the input changed, or because of iteration, or maybe both?diff
: How did it change? As in, what change did the record undergo?
Most often, we think of differential dataflow as maintaining multisets, which assign a count to each of their records. In that case, the change that a multiset undergoes is that we added and removed some records. The data
is the record added or removed, the time
is when it happened, and the diff
is a signed integer describing the change (positive for additions, negative for subtractions).
This works pretty ok, In that we can maintain arbitrary changes to arbitrary collections. We can implement each of the graph algorithms above by maintaining the actual collections of node state as iterations unfold, which is enough for differential dataflow to determine in what ways they might change as iterations and rounds of input unfold.
However, integers are not the only way a collection could change.
Let's take a quick detour to describe the full class of changes differential dataflow supported, as of .. maybe a month ago. More general than just a signed integer, differential dataflow could work with any type that could be added to itself (like integers) and that could be negated (like integers) and which had an additive identity (which you can get from adding an element to its negation). Technically, the addition needs to be associative ((a+b)+c = (a+(b+c)
) and commutative (a + b = b + a
) if you want predictable outputs, which we kinda do.
These properties happens to be the very definition of an Abelian group. Differential dataflow could handle updates using arbitrary Abelian groups, which is handy because there are more of them out there than just the integers. For example, pairs of integers (total, count)
add up and have inverses, and differential can use them as the diff
field to associate an average with each element rather than a count.
But! There are other, weaker restrictions we could put on our types. For example, let's remove the requirement that the type be negatable. Maybe we just want to use the positive integers, for example. A type that satisfies associative, commutative addition, with a zero element, is called a Monoid. As of .. maybe a month ago differential dataflow supports monoids as its diff
types, as long as you don't want to use operators that require subtraction.
How many operators require subtraction? Just three, it turns out:
- The
negate
operator. This one was sort of obvious, right? If we want to negate the counts of each element in a collection, we kinda need a type that supports this. - The
reduce
operator. This operator applies arbitrary user logic to arbitrary collections, and it is totally possible that your user logic might produce something that requires subtraction to get from whatever output collection we were at a moment ago. The only general way we have to figure out a change is to take your new output minus the old output. - The
iterate
operator. This one might have been less obvious, but it turns out that to start the crazy and weird spiral of changes thatiterate
requires, we need to subtract out the initial state of the iterative process. Barring new mathematics or understanding, of course.
If we can avoid these operators, we could write computations that maintain monoids rather than counts. We could, for example, re-write our graph computations to use monoids rather than counts! There are two obvious issues with this line of thought:
- We kinda use
reduce
anditerate
in each example, and - Why would we want to use monoids rather than counts?
Let's take a swing at the second one first, talking through a cool thing that could happen if we sort out the math correctly.
Let's imagine that we had access to monoid technology, and could use types that do not support negation. Where would that fit in the story of each of the examples above?
The most important detail, which may be non-obvious, is that differential dataflow takes its triples (data, time, diff)
and adds up all diff
fields for corresponding (data, time)
pairs. Even though there may be many changes at the same data
and time
, differential can safely accumulate all of the differences to at most one update.
-
Reachability: we don't actually need to know the number of times a node is reachable, only that it has become reachable. Rather than use
isize
to count the number of times a node is reachable, and how it changes, we could consider just using the type()
which .. is just there; it indicates that a node is reachable. When you add it to itself it returns itself. -
Shortest paths: For each node, we propose records
(node, dist)
indicating that one can reachnode
indist
total length. We then flip through all of the distances on offer for each node and select the smallest one. For each of these collections, we track the count of each of the proposals.What if instead we proposed
node
with thediff
set to the proposed distance, ausize
say, and our "addition" operator was "minimum"? The "minimum" operator is associative and commutative, and it has a zero (infinity). Rather than track a collection of distinct(node, dist)
pairs, we could have just a singlenode
with all of thedist
terms present in thediff
field, which collapses for eachnode
. We would store a lot less space. -
Widest paths: Like with shortest paths, we propose
(node, width)
and keep the widest proposal. We could instead proposenode
withdiff
set to the width, where the "addition" operator is "maximum". Like "minimum" the maximum operator is associative, commutative, and has a zero element. And like with the previous approach, we could retain only the maximum for each node rather than all of the distinct proposals. -
Connected components: Label propogation is basically the same as the two examples above: we track the minimum label, and rather than encode this as data we could encode it in the
diff
field and allow the labels for each of the nodes to collapse down as we accumulate things.
So what's the catch?
There is absolutely a catch.
The catch is that if we start using these monoids, our inputs need to use the same diff
type.
We cannot simply add and remove edges.
We can only apply monoid updates.
For our four examples, we would have to only 1. add edges, 2. shorten edges, 3. widen edges, or 4. add edges.
Maybe this works for you, and maybe it doesn't.
If you need to be able to add and remove edges, not so great. Keep reading, though, because you'll learn stuff.
We had three operators which require negation.
The negate
operator isn't getting fixed. That shouldn't be much of a surprise. You don't get to use it if you are using a monoid rather than an Abelian group.
The reduce
operator is complicated because to go from one arbitrary output to another, we might need to perform subtraction, and this does seem to be the case. However, we can supply a weakened form of reduce
that accommodates both monoids and Abelian groups!
The traditional reduce
method asks for a closure that, when presented with the group key and input values, populates a destination vector with an arbitrary output collection. Simplified, it looks a bit like this:
fn reduce_abelian<L, V2, R2>(&self, logic: L)
where
L: Fn(&K, &[(&V, R)], &mut Vec<(V2, R2)>)
R2: Abelian;
Notice the requirement that R2
implement Abelian
. That is all well and good, but we decided that this approach was fundamentally problematic for monoids.
Instead, we can ask for a slightly more clever closure that, presented with the group key, input values, and proposed output values, populates a destination vector with any updates to apply. Simplified, it looks a bit like this:
fn reduce_core<L, V2, R2>(&self, logic: L)
where
L: Fn(&K, &[(&V, R)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>),
R2: Monoid;
It's not that different, except you get one more reference to a vector. Also the constraint on R2
is reduced to implementing Monoid
.
It turns out that this method is sufficient to get us on-line with monoids in our graph computations. We will see how this looks in just a moment, but you could imagine that when we collect proposed distances for each node we could look at the proposed output and, knowing that things can only get shorter, re-assure ourselves that we only need to propose a reduction in the output if one occurs, and never need to walk back an over-short proposed output.
The iterate
method is a pretty easy fix. We only need negation if we want to start our iteration from a specific collection. This is because we need to subtract it out pretty quickly (the second iteration) if we want to converge to the correct limit. That sounds problematic, but it turns out that lots of iterative computations are perfectly comfortable to start from the initially empty collection. We don't need to subtract that out, because it is empty.
Clever.
The Iterate
trait, which supplies the iterate()
method, was previously only implemented on collections, and meant "start iteration from this collection". I added an implementation to dataflow scopes too, which means "start iteration from the empty collection". The latter implementation only requires a monoid whereas the first requires an Abelian group.
Let's take a peek at the monoidal version of our graph algorithms!
unimplemented!()
(homework)
Shortest path computations requires that we define a new type, a bit like an isize
, which allows us to add and multiply elements. When we "add" things we'll want to take the minimum. When we "multiply" things we'll want to add them (what happens when we join
with edges).
/// An integer where + is "min" and * is "sum".
pub struct MinSum {
value: u32,
}
impl std::ops::Add<Self> for MinSum {
type Output = Self;
fn add(self, rhs: Self) -> Self {
MinSum { value: std::cmp::min(self.value, rhs.value) }
}
}
impl std::ops::Mul<Self> for MinSum {
type Output = Self;
fn mul(self, rhs: Self) -> Self {
MinSum { value: self.value + rhs.value }
}
}
impl Monoid for MinSum {
fn zero() -> MinSum { MinSum { value: u32::max_value() } }
}
With this type in hand for our diff
type, we break out the new reduce_core
method, and use iterate()
defined on scopes to make something that looks almost the same as our previous implementation. Unfortunately, because we are pulling back the veil on reduce_core
we have a few ergonomic hoops to leap through (we need to tell the operator how to manage its data). The only other difference is that rather than track (node, dist)
pairs we track (node, ())
pairs, to make joining and reducing a bit more ergonomic (they get confused without an explicit key).
roots
.scope()
.iterate(|dists| {
use differential_dataflow::operators::reduce::ReduceCore;
use differential_dataflow::trace::implementations::ord::OrdKeySpine;
edges
.enter(&dists.scope())
.join(&dists)
.map(|(src,(dst,()))| (dst,()))
.concat(&start.enter(&dists.scope()))
.reduce_core::<_,OrdKeySpine<_,_,_>>(|_key, input, output, updates| {
if output.is_empty() || input[0].1 < output[0].1 {
updates.push(((), input[0].1));
}
})
.as_collection(|k,()| (*k, ()))
})
.map(|(node, ())| node)
The most exciting moment is surely
.reduce_core::<_,OrdKeySpine<_,_,_>>(|_key, input, output, updates| {
if output.is_empty() || input[0].1 < output[0].1 {
updates.push(((), input[0].1));
}
})
where we check out the output
to see if we need to produce an update to correct the current input
to the proposed output. We are a bit more careful than just announcing what we want as output, which is what re-assures the reduce_core
implementation that it can actually implement our logic despite the absence of subtraction.
unimplemented!()
(homework)
unimplemented!()
(homework)
Does this actually do anything good at all?
Well, yes kinda but it depends. If you run the computations on random graphs (like I do) with relatively small average degree (like I do) then there isn't that much difference between tracking all of the proffered shortest distances (relatively few) and the least distance (one).
On the other hand, if you crank the average degree up to twenty (not so large, really) and go the additional distance of making edge weights random numbers between zero and 1,000, you get some pretty exciting results.
The coventional code, which I have to say is a pretty crappy way to compute shortest paths (we do breadth-first exploration by hop count, not by distance), gives us the following timings to start up a 1M node 20M edge computation, and then to perform 1,000 edge additions:
Echidnatron% cargo run --release --example sssp -- 1000000 20000000 1000 1 1000 no
Finished release [optimized + debuginfo] target(s) in 0.16s
Running `target/release/examples/sssp 1000000 20000000 1000 1 1000 no`
performing BFS on 1000000 nodes, 20000000 edges:
1.073813026s loaded
210.942974636s stable
210.969052581s 0: 25967913
210.983290373s 1: 13716671
...
225.992076415s 998: 723287
225.999423139s 999: 7336721
finished; elapsed: 225.999442906s
Echidnatron%
Takes a while to get started, and then another 15 seconds to do the 1,000 changes.
The approach where we only get to add edges, by using the MinSum
monoid for our diff
, goes a fair bit faster. This is still doing breadth-first exploration by hop count, but it hurts a bit less because we maintain less data:
Echidnatron% cargo run --release --example monoid-bfs -- 1000000 20000000 1000 1 1000 no
Finished release [optimized + debuginfo] target(s) in 0.11s
Running `target/release/examples/monoid-bfs 1000000 20000000 1000 1 1000 no`
performing BFS on 1000000 nodes, 20000000 edges:
953.631951ms loaded
38.696207054s stable
38.696914646s 0: 673442
38.697606572s 1: 675772
...
43.006604253s 998: 677619
43.009548921s 999: 2931505
finished; elapsed: 43.00957037s
Echidnatron%
Faster to get started, and then under 5 seconds to do the 1,000 changes.
In both cases we are only adding edges, but in the second case we have specialized the code to understand and exploit this.
When you find yourself in a situation where you only need to add, or otherwise improve, your input data, rather than retract options, you may be able to get a leaner implementation by thinking through whether your updates look like a monoid!
It's going to depend on your computation, on your data, and possibly on a bit of thinking, but you have more options now with differential dataflow than ever before!