It's been a while since we've done something interesting in differential dataflow. So, today I thought we would try to do something pretty interesting. We're going to splat down just a little bit of code, and it will increase the capabilities of differential dataflow multiple-fold.
tl;dr we are going to introduce the ability to programmatically differentiate and integrate differential collections, and insert differential computation between these two actions to do some very cool things. If you are familiar with the concepts, we'll do as of joins and low-memory delta queries using just a few lines of code.
For those of you new here, or those returning but who need to be warmed up on the subject matter, let's review.
Differential dataflow is a collection-oriented programming framework, which allows you to use high-level operators on collections like map
, filter
, join
, and reduce
, but also weird things like iterate
. As you change your collections, the outputs of the computation change as well, which is pretty neat. It means you get to express your program once, and the underlying system handles both the initial execution as well as the continual maintenance of your computation.
Differential dataflow does this by translating your collection-oriented computation to a dataflow computation. Each collection is re-cast as a stream, and each of the high-level operators become dataflow operators. Each stream contains records that are "updates": they describe how your collection is changing. Operators are implemented to correctly respond to input updates with the output updates that correspond to the correct re-execution of the operator, as if from scratch on your updated input. Of course, we don't actually re-execute anything.
Differential dataflow collections are streams of updates that all have a common form, as a triple:
(data, time, diff)
Each data
is a record in your collection, or that could be in your collection. Perhaps it is a String
containing words, perhaps it is a pair (String, isize)
representing the counts of the number of times we've seen a particular word (the result of applying the count()
operator to the a collection of the first type).
Each time
is ... well let's say it is an integer. Really, it can be any well-founded lattice, but that doesn't usually help folks understand. But, well we are going to have to understand a bit by the end of the post. Sorry!
Each diff
is ... well let's say it is a signed integer.
What the stream describes are the changes that your collection undergoes. Each time your collection changes, because some data
has been added to or removed from your collection, it gets recorded by a triple
(data, time, diff)
where data
is the record in question, time
is the moment at which the change happened, and diff
is the nature of the change (for example, +1 or -1 to describe addition or removal, respectively).
We can always recover the contents of a collection at any time t
by accumulating up all of the updates whose time
is less or equal to t
. Similarly, if you have a collection in mind for each time
, we can produce the corresponding updates by playing time
forward and seeing how we need to change the accumulated collection at each time
.
Differential dataflow operators are cleverly implemented to be correct. That means that whenever an input collection changes, reflected by some updates (data, time, diff)
, the operator knows how to re-imagine its computation to produce a corresponding set of (data, time, diff)
in its output. The operator ensures that for each time
, its output accumulates to the collection that is its operator logic applied to the accumulated input.
These operators may need to be stateful. Not all operators are stateful; certain easy folks like map
and filter
and concat
can simply pass along updates, perhaps with modifications or restrictions. But operators like join
and reduce
will need to retain an accumulated form of their inputs, so that they can correctly respond when they receive new inputs.
In all cases so far, the accumulated state that operators maintain looks like all of the received updates, indexed by data
. Roughly, for any data
we should be able to get access to the history of the record: a sequence of (time, diff)
. Fortunately, we can compact down this history as soon as no users of the state can distinguish between two historical times.
For example, if we have updates
("frank", 17, +1)
("frank", 19, -1)
then once the operators using the collection, perhaps a reduce
that is counting distinct names in the input, reach a time like 20
, the operators can no longer distinguish between the updates, and we can coalesce them, and in this case totally cancel out the updates and reclaim memory.
With these two handy concepts, collections as streams of updates, and operators that automatically correct their output, we are able to compose larger dataflows of computations.
Let's take a simple example that we will return to in just a bit: computing triangles in a graph:
type Node = u32;
type Edge = (Node, Node);
/// Tranform a collection of `(src,dst)` into `(a,b,c)` where the collection
/// contains each of `(a,b)`, `(a,c)`, and `(b,c)`.
fn triangles<G>(edges: &Collection<G, Edge>)
-> Collection<G, (Node, Node, Node)>
where
G: Scope,
G::Timestamp: Lattice+Hash+Ord,
{
// First join (a,b) and (a,c), then test for
// (b,c) by semijoining against the edge set.
edges.join_map(&edges, |a,b,c| ((b,c),a))
.semijoin(&edges)
.map(|((b,c),a)| (a,b,c))
}
What a delightful computation!
This computation produces a collection which contains the triples (a,b,c)
where each of (a,b)
, (a,c)
, and (b,c)
existing in the input edges
collection. If you change edges
, those changes will flow through this computation and update the set of triangles appropriately.
What's not to like?
Well, try running it. Let me know how it goes.
Differential dataflow's stateful operators will keep a fairly substantial amount of state around. The first join_map
will maintain two indexed copies of edges
, and while we could optimize that down to one (shared arrangements!) this is just proportional to the input and not a huge deal. On the other hand, the semijoin
operator will maintain indexed representations of its inputs, one of which is edges
and is fine, but the other of which is the ouput of the first join and IT IS NOT FINE.
It can be enormous.
Consider what happens if some node a
has degree 1,000. The first join produces all triples ((b,c),a)
where (a,b)
and (a,c)
exist, which would be one million triples (you square the degree). In the 2012 common crawl data one node has degree 45,000,000, which you may square at your leisure. The result does not fit in RAM, or on anything short of a few petabytes.
So there could be an issue with state in some of these computations.
Rather than wax philosophical, let's try and get to the meat right away.
We are going to introduce the methods differentiate
and integrate
on collections. Why, and what should they do? The first is a great question to skip for the moment, but we should be able to figure out the second from our understanding of calculus.
Think of an initially empty collection, to which we add the string "frank"
, maybe twice, and then remove both of the copies. Here is an excellent high-tech time-series diagram of the count associated with "frank"
:
2 ------ |
1 -- |
0 -------------- ----|
t
With this in mind, what would a "derivative" look like? This isn't some continuous function, so that isn't going to work. But, we can define a collection whose contents at each time are the changes the source collection undergoes.
I'm thinking something like this for the above plot:
2 |
1 - - |
0 -------------- - ----- ---|
-1 |
-2 - |
t
Here we see two blips where there is a +1
up tick, corresponding to the moments where the count increased by one, and one blip of -2
where the counts got erased back down to zero. Everywhere else in this "derivative" collection is zero, which makes sense because most of the time the source collection isn't changing.
It turns out differentiation is super easy in a system called differential dataflow. You could probably see that coming. Imagine a dead-simple operator whose behavior is that for each input update triple
(data, time, diff)
it produces in its output two updates
(data, time, diff)
(data, time + 1, -diff)
Each input update exists in the output, but just for a moment. By the very next time, it is erased.
That derivative collection is now six updates, two +1
upblips each followed immediately by a -1
downblip, and one -2
followed immediately by a +2
.
It turns out it is a bit harder to do the reverse, what would be "integration".
Integration would be accomplished by just erasing the "cancellations", but we can't really tell which subtractions were introduced by differentiation, and which subtractions were actual changes to the source data. For example, there will be two -1
updates corresponding to the downblips of the first two increments, and we want to remove them, but the -2
update is real and we want to remove the +2
that immediately follows it.
One cheesy hack, which gives the right intuition, is to think of time
as being only the even integers. We reserve the odd integers for subtracting out changes at the even integers. The updates happen at even integers, and their retractions happen at odd integers. Integration is now just dropping updates at odd times. Our careful use of times allowed us to distinguish source changes from their retractions.
Now, we can't exactly do the even-odd hack, because times in differential dataflow can be arbitrary well-founded lattices, but there is a neat way to accomplish the same thing using the dogsdogsdogs::altneu::AltNeu
timestamp combinator.
/// Timestamps with two moments, one just before the other.
pub struct AltNeu<T> {
pub time: T,
pub neu: bool, // alt < neu in timestamp comparisons.
}
Informally, this type wraps any timestamp type to provide two "moments" at each time, one just before the other, but both indistinguishable to any other time. This is spelled out most clearly in its implementation of the less_equal
method that orders AltNeu<T>
elements using T::less_equal
.
// Implement timely dataflow's `PartialOrder` trait.
use timely::order::PartialOrder;
impl<T: PartialOrder> PartialOrder for AltNeu<T> {
fn less_equal(&self, other: &Self) -> bool {
if self.time.eq(&other.time) {
self.neu <= other.neu
}
else {
self.time.less_equal(&other.time)
}
}
}
Here we see that if the two times are equal, we compare the neu
field which orders alt
less than neu
, and if the two times are not equal we just compare them as usual.
There is a bit more work to produce the join
and meet
lattice operators correctly, which I think I've done and which you can check out on your own.
We will use the alt
variant for the moment at which collections change, and the neu
variant for the interstitial moment just afterwards but before the next moment at which a collection might change.
The actual implementations of differentiate
and integrate
can be found in dogsdogsdogs/calculus.rs
, but we are going to reproduce them for you.
In an attempt to reduce errors, the differentiate
method takes collections with a timestamp T
and re-wraps them with an AltNeu<T>
timestamp. The integrate
method takes collections with an AltNeu<T>
timestamp and pops them back up to a T
timestamp. This means we have to bounce into and out of timely dataflow scopes, and that is going to make the signatures a bit scary.
These are extension traits, written this way to present as inherent methods on collections.
/// Produce a collection containing the changes at the moments they happen.
pub trait Differentiate<G: Scope, D: Data, R: Abelian> {
fn differentiate<'a>(&self, scope: &Child<'a, G, AltNeu<G::Timestamp>>) -> Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R>;
}
/// Collect instantaneous changes back in to a collection.
pub trait Integrate<G: Scope, D: Data, R: Abelian> {
fn integrate(&self) -> Collection<G, D, R>;
}
The first trait is the wordier one to implement, but let's take a quick peek nonetheless. It follows the plan up above pretty much exactly.
impl<G, D, R> Differentiate<G, D, R> for Collection<G, D, R>
where
G: Scope,
D: Data,
R: Abelian,
{
// For each (data, Alt(time), diff) we add a (data, Neu(time), -diff).
fn differentiate<'a>(&self, child: &Child<'a, G, AltNeu<G::Timestamp>>)
-> Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R>
{
self.enter(child)
.inner
.flat_map(|(data, time, diff)| {
let neu = (
data.clone(),
AltNeu::neu(time.time.clone()),
diff.clone().neg(),
);
let alt = (data, time, diff);
Some(alt).into_iter().chain(Some(neu))
})
.as_collection()
}
}
The second trait is more concise, but also follows the plan up above.
impl<'a, G, D, R> Integrate<G, D, R> for Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R>
where
G: Scope,
D: Data,
R: Abelian,
{
// We discard each `neu` variant and strip off the `alt` wrapper.
fn integrate(&self) -> Collection<G, D, R> {
self.inner
.filter(|(_d,t,_r)| !t.neu)
.as_collection()
.leave()
}
}
Well done us. If you apply differentiate
and then integrate
in sequence, you get exactly the same collection back (and no "up to an additive constant" nonsense, either; homework: explain why not).
So what do we do with this? Other than apply the two in-order to do nothing, that is.
An "as-of" join is a type of join used on timestamped data, where the timestamp on one input is used to fix the retrieved contents from the other input. You can check out the linked documentation, but it comes from a community in which each single keypress is treated as a personal tragedy. We'll do our own explanation.
Imagine you maintain two collections: prices for things you sell, and orders from people who want to buy some of those things. The most natural computation you might write would be to join the orders with the prices, and track what customers should pay you.
let pay_me = orders.join(prices)
This is a great start, but your customers will start complaining as soon as you change your prices, as differential dataflow will update the join and send them new bills. Imagine: you are sitting at home, and get a phone call explaining that the 283 lattes you had in college just had a $1 price bump, and would you please forward along the cash promptly?
Instead, the as-of join picks out the value at the logical timestamp associated with each input, and locks it in. Whatever the price was at that time, that's what you get forever. Subsequent changes to prices
do not result in changes to pay_me
.
Makes sense, but it's not what differential dataflow does.
But, we can make that be what differential dataflow does, by using differentiate
:
// Perform an as-of join with `prices`.
let pay_me = dataflow.scope(|inner| {
orders.differentiate(inner)
.join(prices)
.integrate()
};
That might be a lot at once. Or perhaps, not quite enough at once. So let's unpack what is going on here.
First off, syntactically: while we know that differentiate()
followed by integrate()
is a no-op, that is not true when we put computation between them. This does something other than orders.join(prices)
, and our job is to unpack what that difference is.
To see what is going on, we could probably use an example. Let's say that an order comes in for "bacon"
at time
.
("bacon", time, 1)
The differentiate
operator is going to change this to be
("bacon", Alt(time), 1)
("bacon", Neu(time), -1)
This is the slightest of blips of "bacon"
. At any moment that isn't time
itself the two cannot be distinguished, and both accumulate to zero. But the blip exists for long enough that a correct differential dataflow will produce the correct corresponding output blip. What will that blip be?
A correct differential dataflow join
will produce at Alt(time)
the price of bacon at that time, and at Neu(time)
produce the same price with a subtraction.
(("bacon", price@time), Alt(time), 1)
(("bacon", price@time), Neu(time), -1)
Once time
has passed, all subsequent times cannot distinguish between these updates and will not produce any outputs in response to this blip. In fact, quite quickly we should expect that these two will cancel and simply be deleted from history. They are entirely forgotten, except that the output blip makes its way to the integrate
operator.
The integrate
operator will discard the neu
update, and produce only
(("bacon", price@time), time, 1)
which is the change we want to see.
Homework What happens if in the future we retract "bacon"
from orders
? Does it undo the computation and retract price@time
? No, it does not! What does it do? Write an short essay on how we have just broken differential's "incremental view maintenance" property, and how one should use this sort of stuff with care.
Ok, we talked about triangles up above, and this post is a pretty long walk to return to that and show you how to do it more efficiently. Recall that we described a three-way join up above that helped us compute triangles. It was great, except that it took more memory than are contained in one million MacBooks (Apple, call me). That join looked like
(EdgesAB join EdgesAC) join EdgesBC
where I've given different names to our re-use of the same edges
collection, for reasons of clarity that should soon become apparent. As the above collections change, the output will change. This is what we love about differential dataflow. We just wish it took less memory.
Let's talk through what happens when each input collection updates.
When an input changes, say EdgesAB
, we have a bit of work to do. We have to join the change against EdgesAC
, and then if that produces any results join them against EdgesBC
. This doesn't actually require lots of memory, just indexed forms of EdgesAC
and EdgesBC
. The same is true of EdgesAC
: when it changes we just need to join against EdgesAB
and then with the results against EdgesBC
.
The complicated one is EdgesBC
. When it changes, according to the plan up above, we need to join its changes against the contents of (EdgesAB join EdgesAC)
, which is enormous. We really don't want to do that. We really wish we could have just joined against EdgesAB
and then EdgesAC
. Or the other order, just not both at once please.
When we write down the three rules we'd like to use to update the triangles (△) when there are changes in each of the inputs, the rules look like [ed: someone just discovered unicode]:
d△/dAB := dEdgesAB ⋈ EdgesAC ⋈ EdgesBC
d△/dAC := dEdgesAC ⋈ EdgesAB ⋈ EdgesBC
d△/dBC := dEdgesBC ⋈ EdgesAB ⋈ EdgesAC
Notice that for each rule I've written its input with a d
prefix, because really we just want to take the changes and join them against the whole other two collections (not just their changes).
In the dogsdogsdogs
project, we used this intuition to write a bunch of clever custom operators that allow this but ... we could totally just use differentiate
to get the d
collections, and then join, and then integrate()
the results.
/// Tranform a collection of `(src,dst)` into `(a,b,c)` where the collection
/// contains each of `(a,b)`, `(a,c)`, and `(b,c)`.
fn triangles<G>(edges: &Collection<G, Edge>)
-> Collection<G, (Node, Node, Node)>
where
G: Scope,
G::Timestamp: Lattice+Hash+Ord,
{
edges.scope().scoped(|inner| {
// Bring in both `edges` and its derivative.
let edges = edges.enter(inner);
let d_edges = edges.differentiate(inner);
// d△/dAB := dEdgesAB ⋈ EdgesAC ⋈ EdgesBC
let d△dAB =
d_edges
.join_map(&edges, |a,b,c| ((b,c),a))
.semijoin(&edges)
.map(|((b,c),a)| (a,b,c));
// d△/dAC := dEdgesAC ⋈ EdgesAB ⋈ EdgesBC
let d△dAC =
d_edges
.join_map(&edges, |a,c,b| ((b,c),a))
.semijoin(&edges)
.map(|((b,c),a)| (a,b,c));
// d△/dBC := dEdgesBC ⋈ EdgesAB ⋈ EdgesAC
let d△dBC =
d_edges
.join_map(&edges.map(|(a,b)| (b,a)), |b,c,a| ((a,c),b))
.semijoin(&edges)
.map(|((a,c),b)| (a,b,c));
d△dAB.concat(d△dAC).concat(d△dBC).integrate()
}
}
That's a bit wordier than up above, but this behaves much better. Each of the three joins are between streams of blips and input collections (edges
). The streams of blips may be transiently substantial, but they vanish as soon as each time is complete. The edges
collections are necessary, but at least they aren't petabytes.*
Caveat: The above doesn't actually work, due to double counting of simultaneous updates. Consider adding the edges (1,2)
, (1,3)
, and (2,3)
, from which we expect one new triangle (1,2,3)
. The dataflow above will give us three copies, because each dataflow finds that its update leads to a triangle (true, but already accounted for). The answer is to do the same trick that dogsdogsdogs
does, which is to "nudge" the uses of edges
in the AltNeu
timespace so that the three rules appear to execute in sequence, rather than concurrently.
/// Tranform a collection of `(src,dst)` into `(a,b,c)` where the collection
/// contains each of `(a,b)`, `(a,c)`, and `(b,c)`.
fn triangles<G>(edges: &Collection<G, Edge>)
-> Collection<G, (Node, Node, Node)>
where
G: Scope,
G::Timestamp: Lattice+Hash+Ord,
{
edges.scope().scoped(|inner| {
// Bring in both `edges` and its derivative.
let edges_alt = edges.enter(inner);
let edges_neu = edges_alt.delay(|t| t.neu = true);
let d_edges = edges.differentiate(inner);
// d△/dAB := dEdgesAB ⋈ EdgesAC ⋈ EdgesBC
let d△dAB =
d_edges
.join_map(&edges_neu, |a,b,c| ((b,c),a))
.semijoin(&edges_neu)
.map(|((b,c),a)| (a,b,c));
// d△/dAC := dEdgesAC ⋈ EdgesAB ⋈ EdgesBC
let d△dAC =
d_edges
.join_map(&edges_alt, |a,c,b| ((b,c),a))
.semijoin(&edges_neu)
.map(|((b,c),a)| (a,b,c));
// d△/dBC := dEdgesBC ⋈ EdgesAB ⋈ EdgesAC
let d△dBC =
d_edges
.join_map(&edges_alt.map(|(a,b)| (b,a)), |b,c,a| ((a,c),b))
.semijoin(&edges_alt)
.map(|((a,c),b)| (a,b,c));
d△dAB.concat(d△dAC).concat(d△dBC).integrate()
}
}
This sequencing means that in the first rule, concurrent updates to edges
are "hidden" from the alt
updates in d_edges
. In the second rule, we only hide concurrent updates to EdgesBC
and expose concurrent updates to EdgesAB
. In the third rule, each of the prior relations have concurrent updates exposed.
Sorry for not telling the truth earlier.
*: Actually they may be petabytes, but only transiently. We won't accumulate the results, but we will need to materialize the results briefly. You could get petabytes if you add all 45,000,000 edges all at the same time
. If you add them more slowly it won't spike up so high. Sorry again, about the truth!
I'm personally delighted that the differentiate
and integrate
methods allow us to do so much more with differential dataflow than were originally intended. I'm especially pleased that there is now a way to do low-memory streaming joins within iterative contexts (the prior dogsdogsdogs
construction was only correct for totally ordered timestamps).
I've got to catch up on a vlog post I did a while back where I was on a roll and then had to bail out when I tried to use this stuff in an iterate
and wasn't getting the correct results (it was just wrong; so embarassing!). But now I can head back, record some more esoteric talky content, and see if we can compute trusses any faster than previously!
Also, we totally want to try and wire this up with Worst-Case Optimal Joins, which are another thing the dogsdogsdogs
project does. The triangle join plan we just talked through, for example, doesn't sit on nearly as much memory as before but it still does way too much work. You can do both at the same time, at least with bespoke operators, and I bet you can do so with native differential operators now that we have differentiate
and integrate
.