2024-11-25.md

Understanding Consistency in Materialize

Materialize draws data in from multiple external transactional sources of data, and provides a "consistent" view over the ensemble of data. In fact, one of its primary contributions is the introduction of structure such that it both:

  1. faithfully reflects the transactional state of each input, and
  2. brings together multiple transactional inputs into a single timeline.

How Materialize pulls this off is both subtle and, at the same time, surprisingly straightforward. We'll unpack how this unfolds, starting from your transactional source of truth, on through integrations with other sources, and across many independently authored and maintained SQL views.

By the end of the post, you should have a clear understanding of how Virtual Time can provide concurrency control mechanisms that compose with each other, and how Materialize uses Virtual Time to align its input transactional data, and provide always consistent outputs.

Consistency and Change Data Capture (CDC)

Let's start with a hypothetical transactional source of business data (e.g., PostgreSQL). It will contain three tables, product, client, and sales, each containing the current state of the relevant information. As time passes these tables may change, but the changes happen "transactionally", in that some changes to multiple tables are meant to happen at the exact same moment.

Let's draw a sparkline indicating the moments at which these tables change.

OLTP DB
-------
product 0----------*----*-----**-------->
 client 0----------*----*----*-*-------->
  sales 0----------*----*----***-------->

What we've drawn here for each is a line going from left to right. Each starts at some initial moment 0--, experiences updates at each -*-, up to its current state indicated by -->.

The vertical stacking of the lines is meant to suggest transactional consistency: tables may update at exactly the same time. A "serializable" database is one where there is such a linear timeline: each transaction occurs atomically, in some total order. Moreover, anyone looking at the data sees it at some moment in this timeline. Let's represent this with a vertical line to indicate a transactionally consistent view.

OLTP DB          <- consistent view ->
-------                    |
product 0----------*----*--|--**-------->
 client 0----------*----*--|-*-*-------->
  sales 0----------*----*--|-***-------->

I've dropped a vertical line at an arbitrary aesthetically appealing location, but everything we'll discuss checks out for any vertical line. The vertical lines will define what it means to be transactionally consistent, for this post at least.

One of the most appealing properties of a database is that it masks the complexity of continually updating data, and presents as if your data moves through a sequence of consistent states. If you were to drop in to the OLTP database and issue a query, the answer would be as if we stopped the world for long enough to get the precise answer at some moment.

Consider for example, a reporting query like so:

  -- Avg of big-ticket sales, by client.
  SELECT client.name, AVG(sales.amount)
    FROM client, sales
   WHERE client.c_id = sales.c_id
     AND sales.amount > 100
GROUP BY client.name;

Although this brings together information from client and sales, with each record potentially altering some result, the output would be as if executed atomically at some vertical line dropped through the timelines of the tables. If every sales.c_id has a corresponding client.c_id, we will be sure to incorporate each of them. If multiple sales were part of the same transaction, we'll see either all of them or none of them.

However, providing the appearance of transactional updates is taxing for an OLTP database. Ad-hoc query processing interferes with the continual updates to the source tables, and the longer a query needs to run the greater the skew between its results and reality. This is where Materialize comes in.

Differential Dataflow and Virtual Time

The timelines we've drawn are not only a helpful way of thinking about transactional systems, they are also a tool for ensuring consistency. Specifically, Virtual Time is a concurrency control mechanism that asks for all updates to be explicitly timestamped, where the stamped times fully spell out the order in which commands are applied. In Materialize, and Differential Dataflow (the engine on which it builds), these timestamps are the tool for ensuring consistency.

Recall our sparkline from above, annotated with OLTP to remind us where that comes from.

product 0----------*----*-----**-------->   \
 client 0----------*----*----*-*-------->   | OLTP
  sales 0----------*----*----***-------->   /

Although not necessarily the case, imagine that each update -*- happens at an explicitly recorded moment in time. Databases do not necessarily record updates by time, perhaps using sequence numbers instead, but we will. Materialize will assign explicit times to each update to ensure transactional consistency: all updates for any one transaction get an identical timestamp.

Concretely, Materialize represents all updates as triples (data, time, diff). The data component is the row that experiences a change. The time component is the timeline position of the moment the update occurs. The diff component is best thought of as either "insert" or "delete". Transactional consistency is provided by having updates in a transaction use identical time coordinates.
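To make the triples concrete, here is a minimal Python sketch. It is not Materialize's implementation: the rows, the integer times, and the `snapshot` helper are all assumptions made for illustration. It shows how accumulating diffs up to a chosen time recovers the contents of a table at that moment, and how updates that share a time appear together or not at all.

```python
from collections import Counter

# Hypothetical update stream for `sales`: (data, time, diff) triples.
# data is a (c_id, amount) row; diff is +1 for insert, -1 for delete.
updates = [
    (("c1", 150), 1, +1),
    (("c2", 80), 1, +1),    # same transaction as the row above: same time
    (("c2", 80), 3, -1),    # changing a row is modeled as a delete...
    (("c2", 120), 3, +1),   # ...plus an insert, sharing one time
]

def snapshot(updates, as_of):
    """Accumulate all diffs with time <= as_of into a multiset of rows."""
    counts = Counter()
    for data, time, diff in updates:
        if time <= as_of:
            counts[data] += diff
    # Rows whose accumulated count is zero are not present.
    return {row: n for row, n in counts.items() if n != 0}

print(snapshot(updates, 2))  # both rows from time 1
print(snapshot(updates, 3))  # c2's amount has changed from 80 to 120
```

Any `as_of` value plays the role of a dropped vertical line: it either includes every update of a transaction or none of them.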

These times are not just a helpful consistency idiom, but they tell us exactly what we need to compute to respond to a query at a time. Let's start with what might be the first part of the SQL query above: restricting our attention to sales with an amount of more than 100. This is only a function of sales, but we can place the result in the context of all of the data.

product 0----------*----*-----**-------->   \
 client 0----------*----*----*-*-------->   | OLTP
  sales 0----------*----*----***-------->   /

WHERE sales.amount > 100                    \ Differential
        0----------*-----------*-------->   / Dataflow (DD)

The WHERE clause gets its own timeline, consistent with all the other timelines. This timeline is exactly determined from the timeline of the sales table. Each -*- update in sales may (or may not) result in a corresponding update in the result. We can determine the exact timeline, conceptually at least, by moving through time moment by moment, and observing how the output must change as a function of the input and the query logic.
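As a rough illustration of how the filtered timeline follows from the sales timeline, the sketch below applies the predicate to each update and keeps its time untouched. The data and the helper names (`where_amount_over`, `change_times`) are hypothetical, and this is a simplification of what Differential Dataflow does rather than its actual code.

```python
# Hypothetical `sales` updates as (data, time, diff); data is (c_id, amount).
sales_updates = [
    (("c1", 150), 1, +1),
    (("c2", 80), 1, +1),
    (("c2", 80), 2, -1),    # c2's amount changes from 80 to 90 at time 2
    (("c2", 90), 2, +1),
    (("c3", 300), 3, +1),
]

def where_amount_over(updates, threshold):
    """Keep only updates whose row satisfies the predicate; times are unchanged."""
    return [(data, time, diff) for data, time, diff in updates
            if data[1] > threshold]

def change_times(updates):
    """The distinct times at which a collection changes (the -*- marks)."""
    return sorted({time for _, time, _ in updates})

big = where_amount_over(sales_updates, 100)
print(change_times(sales_updates))  # times at which sales changes
print(change_times(big))            # a subset: time 2 never touches a big sale
```

The update at time 2 only involves amounts of 100 or less, so it produces no -*- on the output timeline, while the updates at times 1 and 3 carry through at exactly the same times.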

Were we to drop a vertical line for some view of the data, consistency still checks out.

OLTP DB          <- consistent view ->
-------                    |
product 0----------*----*--|--**-------->   \
 client 0----------*----*--|-*-*-------->   | OLTP
  sales 0----------*----*--|-***-------->   /
                           |
WHERE sales.amount > 100   |                \ Differential
        0----------*-------|---*-------->   / Dataflow (DD)

The timeline for the output of the WHERE clause aligns exactly with the timeline for the input.


This is the "subtle, but also simple" moment.

Materialize sets up a framework that tells us what the correct answer needs to be for every time. It then uses distributed, streaming, scale-out infrastructure to determine these correct answers. And although the system internals are fascinating and nuanced, the user experience and outcomes are meant to be simple and clear. Your query results will be as if we stopped the world to compute them for you, and we'll shoulder the burden of doing it more efficiently than that.


Let's add the operators that correspond to our SQL view into the stack of timelines:

                 <- consistent view ->
                           |
product 0----------*----*--|--**-------->   \
 client 0----------*----*--|-*-*-------->   | OLTP
  sales 0----------*----*--|-***-------->   /
                           |
WHERE sales.amount > 100   |                \
        0----------*-------|---*-------->   |
JOIN client ON (c_id)      |                | SQL
        0----------*-------|-*-*-------->   | view
GROUP BY client.name       |                |
        0----------*-------|-*-*-------->   /

Once we have filtered sales we join with client, producing a collection that may change whenever either input changes. We then group by client name and aggregate the results, producing a collection that can only change when its input does. As before, each timeline is exactly determined from its input timelines and the query logic.

Also as before, the exact correspondence is a basis for consistency. If we drop a vertical line, we are able to align a consistent view over the inputs and their corresponding outputs. This consistency comes despite the OLTP inputs and the SQL view computation being on two potentially independent systems. The explicit timelines are the only mechanism coordinating the two systems. They are nonetheless powerful enough to exactly correlate input data and output results.
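One way to see that the output timeline is "exactly determined" is to compute it the slow way: evaluate the view on the input snapshots at every time, and record how the answer differs from the previous one. The Python sketch below does exactly that. It is the conceptual stop-the-world definition, not how Differential Dataflow computes results, and all rows and helper names are assumptions for illustration.

```python
from collections import Counter

def snapshot(updates, as_of):
    """Rows present at time as_of, accumulated from (data, time, diff) triples."""
    counts = Counter()
    for data, time, diff in updates:
        if time <= as_of:
            counts[data] += diff
    return [row for row, n in counts.items() for _ in range(n)]

def view(client_rows, sales_rows):
    """Average of sales over 100, grouped by client name."""
    names = dict(client_rows)  # c_id -> name
    sums = {}
    for c_id, amount in sales_rows:
        if amount > 100 and c_id in names:
            total, count = sums.get(names[c_id], (0, 0))
            sums[names[c_id]] = (total + amount, count + 1)
    return {name: total / count for name, (total, count) in sums.items()}

def output_updates(client_updates, sales_updates):
    """Derive the view's (data, time, diff) triples by diffing successive answers."""
    times = sorted({t for _, t, _ in client_updates + sales_updates})
    previous, out = {}, []
    for t in times:
        current = view(snapshot(client_updates, t), snapshot(sales_updates, t))
        out += [((k, v), t, -1) for k, v in previous.items() if current.get(k) != v]
        out += [((k, v), t, +1) for k, v in current.items() if previous.get(k) != v]
        previous = current
    return out

# Hypothetical inputs.
client_updates = [(("c1", "Ada"), 1, +1), (("c2", "Bob"), 1, +1)]
sales_updates = [(("c1", 150), 1, +1), (("c2", 80), 2, +1), (("c1", 250), 3, +1)]

for update in output_updates(client_updates, sales_updates):
    print(update)
```

The sale at time 2 is below the threshold, so the output has no update there. The output changes only at times 1 and 3, each a time at which some input changed.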

Materialize

Differential Dataflow provides the building blocks for transforming timelines, but Materialize is what assembles those blocks into a full SQL experience.

Stepping back, there are several tasks Materialize performs that we'll want to call out in order to build a fuller system.

  1. Ingest each OLTP input as transitions on a common timeline.

    Our examples above used a single OLTP input, with multiple tables, but you may have tables from multiple independent sources you are bringing together. Materialize cannot make independent sources become consistent (a very hard distributed systems problem), but it can place all of them on a common timeline. Each input will be internally consistent (i.e., transactions respected by Materialize), with an opinionated take about how their timelines interleave.

  2. Maintain the consistent timelines for any composition of derived views.

    We saw an example of a relatively simple SQL view above, and how one might reason about the relationship between its inputs and outputs. This only gets more complicated with multiple views, authored independently, executing across independent compute hardware. Nonetheless, our building blocks say that we can do this, and this is a responsibility that Materialize shoulders.

  3. Determine how to respond to user queries.

    We dropped a vertical line to indicate the consistent moment at which we might provide a query response. Who chooses where to drop this vertical line, and what are the trade-offs in responsiveness, freshness, and consistency? If we can't respond immediately because the results aren't yet ready, how should we communicate this?

  4. Keep users up to date on the progress of their queries.

    If we are unable to respond immediately to your query, how should we communicate the progress the system is making towards its determination? Is there something simpler to show you than the various timelines, arrowheads, and dropped vertical lines?

Let's unpack these tasks.

Task 1: Data Ingestion

Materialize's CREATE SOURCE command allows you to bring in a collection of transactionally consistent tables from an external upstream source. The source is Materialize's unit of internal consistency: all tables from the same source will update in lock-step with the transitions of their input tables, always consistent with one another. Updates to tables from different sources will be put in an order, by virtue of being put in a timeline, but that order may not reflect external causal constraints.

                 <- consistent view ->
                           |
product 0----------*----*--|--**-------->   \
 client 0----------*----*--|-*-*-------->   | OLTP 1
  sales 0----------*----*--|-***-------->   /
                           |
reports 0---------*----*---|-**--------->   \
  deals 0---------*----*---|*-*--------->   | OLTP 2
  money 0---------*----*---|***--------->   /

Notice how there is a bit of skew between what might be related update events. Consistency is nonetheless defined by the vertical line through timelines.

By putting all source updates onto a common timeline, Materialize introduces foundational structure that is otherwise missing. It is admittedly guessing a bit, about how updates to unrelated sources interleave, but having done so there is now one view of all sources, shared by all users. Materialize resolves and locks down one source of ambiguity, so that all downstream uses can be consistent with each other, and with each source individually.
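The sketch below illustrates the idea of placing independent sources on one timeline. It rests on a loud assumption: that each transaction is stamped with the wall-clock time at which it was observed, nudged forward if needed so that each source's own commit order is preserved. The function name and the data are hypothetical, and this is not a description of Materialize's actual ingestion logic.

```python
def place_on_timeline(observed):
    """observed: (wall_clock_ms, source, rows) tuples, in per-source commit order.

    Every row of a transaction receives the same timestamp, and timestamps
    never go backwards within a single source.
    """
    last = {}       # most recent timestamp assigned to each source
    timeline = []
    for wall_clock, source, rows in observed:
        ts = max(wall_clock, last.get(source, 0))
        last[source] = ts
        timeline.extend(((source, row), ts, +1) for row in rows)
    # Interleave all sources by their assigned timestamps (stable sort).
    return sorted(timeline, key=lambda update: update[1])

# Hypothetical observations from two unrelated OLTP sources.
observed = [
    (100, "oltp1", ["sale#1", "client#1"]),  # one transaction, two rows
    (98, "oltp2", ["deal#1"]),
    (105, "oltp1", ["sale#2"]),
]

for update in place_on_timeline(observed):
    print(update)
```

The interleaving of `oltp1` and `oltp2` is a guess in the sense described above, but once the timestamps are assigned, every downstream reader sees the same order.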

The problem of putting multiple unrelated sources in a consistent order is fundamentally hard. While you may know that you update your MySQL before your PostgreSQL, no one else knows this. Database systems don't yet have great hooks for exposing these levels of cross-system constraints, and most solutions are bespoke (e.g., causality tokens). Materialize's common timelines are one way to introduce this structure, and make it available going forward.

Task 2: View Maintenance

Materialize maintains views using Differential Dataflow (DD), which - as sketched above - translates input timelines to output timelines. While DD ensures that the input and output timelines align perfectly, this comes at a cost: the output timelines are likely not immediately available.

Let's return to our example from before, but pay attention to the arrowheads --> indicating the extent of completed work.

                 <- consistent view ->
                           |
product 0----------*----*--|--**-------->   \
 client 0----------*----*--|-*-*-------->   | OLTP
  sales 0----------*----*--|-***-------->   /
                           |
WHERE sales.amount > 100   |                \
        0----------*-------|---*------>     |
JOIN client ON (c_id)      |                | SQL
        0----------*-------|-*-*---->       | view
GROUP BY client.name       |                |
        0----------*-------|-*-*-->         /

Each arrowhead necessarily lags the arrowheads of its immediate inputs. In order to know the answer for some dropped vertical, the input must also be known at that time, and then some (ideally small) amount of real time is required to establish and express confidence in the answer.
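The lag relationship can be sketched as a small calculation: an operator's arrowhead is bounded by the least advanced of its inputs, minus however long the operator still needs. The per-operator `lag` values and the integer positions below are invented for illustration only.

```python
def arrowheads(sources, operators, lag):
    """Compute each operator's arrowhead from its inputs.

    sources:   {name: arrowhead position} for the input tables
    operators: list of (name, [input names]) in dependency order
    lag:       {name: how far the operator trails its slowest input}
    """
    frontier = dict(sources)
    for name, inputs in operators:
        # An output can be known no further than its least advanced input.
        frontier[name] = min(frontier[i] for i in inputs) - lag[name]
    return frontier

# Hypothetical positions matching the shape of the diagram above.
sources = {"sales": 40, "client": 40}
operators = [
    ("where", ["sales"]),
    ("join", ["where", "client"]),
    ("group_by", ["join"]),
]
lag = {"where": 2, "join": 2, "group_by": 2}

print(arrowheads(sources, operators, lag))
```

Shrinking the `lag` values is the optimization target: the bound imposed by `min` over the inputs cannot be beaten, but the extra delay each operator adds can be made small.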

Understandably, you are probably most interested in results for verticals near the arrowheads of your input data: these represent the freshest views of your data. At the same time, these are also where the lag, however slight, prevents you from immediately knowing the answer.

Materialize, and DD underlying it, are optimized around reducing the lag of these arrowheads. As much work as possible is done ahead of time, so that when an input arrowhead advances we can advance the output arrowheads in the smallest amount of real time.

Everything we've said about individual DD operators generalizes to entire SQL views.

                 <- consistent view ->
                           |
product 0----------*----*--|--**-------->   \
 client 0----------*----*--|-*-*-------->   | OLTP
  sales 0----------*----*--|-***-------->   /
                           |
CREATE VIEW big_sales AS   |
SELECT * FROM sales WHERE sales.amount > 100
        0----------*-------|-***--->
                           |
CREATE VIEW client_vol AS  |
SELECT c_id, COUNT(*) FROM client GROUP BY c_id
        0----------*-------|--*------>
                           |
SELECT * FROM big_sales JOIN client_vol ON (c_id)
        0----------*-------|--**-->

Here we've created several views, each with an output timeline, as well as a query that brings the views together. Despite no knowledge that the views were designed to be used together, Materialize ensures that they can only be viewed consistently.

I've also shifted the arrowheads --> around to remind you that their order may not be what you expect. Although each view cannot be farther ahead than its inputs, their progress isn't necessarily in any other order. Materialize is in charge of tracking what outputs are locked in for each view on your behalf, which it uses to inform how it should respond to your queries.

Task 3: Timestamp Selection

Finally, who chooses these vertical lines?

Each dropped vertical line corresponds to a "timestamp" on the common timeline. How we choose timestamps reflects three facets of Materialize's product principle of trust: responsiveness, freshness, and consistency. These three are often in tension, but let's see what each corresponds to in isolation:

  1. Responsiveness: Always choose a timestamp to the left of (before) the arrowhead of the query output. This ensures that Materialize is always able to immediately answer your question; no waiting!
  2. Freshness: Always choose a timestamp to the right of (after) all input arrowheads. This ensures that Materialize only responds with results that reflect the most recent input.
  3. Consistency: Always choose a timestamp to the right of (after) all previously chosen timestamps. This ensures that Materialize presents as moving forward through the common timeline, rather than jumping forwards and backwards.

You can now see how these might be in tension.
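A small sketch can make the tension concrete. The function below is hypothetical, not Materialize's selection logic. It assumes integer timestamps, treats an arrowhead as the latest time whose results are complete, and reduces the choice to a single `want_fresh` flag.

```python
def choose_timestamp(output_frontier, input_frontiers, last_chosen, want_fresh):
    """Pick a query timestamp and report whether the query must wait.

    output_frontier: latest time at which the query's output is complete
    input_frontiers: latest complete time of each input
    last_chosen:     the largest timestamp handed out so far (consistency)
    want_fresh:      prefer freshness over responsiveness
    """
    # Freshness: reach the most advanced input. Responsiveness: stay
    # where the output is already complete.
    candidate = max(input_frontiers) if want_fresh else output_frontier
    # Consistency: never move left of a previously chosen timestamp.
    chosen = max(candidate, last_chosen)
    must_wait = chosen > output_frontier
    return chosen, must_wait

# Output complete through 27, inputs complete through 40.
print(choose_timestamp(27, [40, 40], last_chosen=0, want_fresh=False))
print(choose_timestamp(27, [40, 40], last_chosen=0, want_fresh=True))
# After someone else was answered at 36, even a "responsive" query must wait.
print(choose_timestamp(27, [40, 40], last_chosen=36, want_fresh=False))
```

The last call shows the conflict: honoring consistency after a fresher read forces a wait on a query that would otherwise have been answered immediately.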

Recall that multiple people may be using Materialize at the same time, and they may have different goals. Let's look more closely at how three candidate query timestamps interact, labeled T0, T1, and T2 in the figure below.

                           T0       T1   T2
                           |        |    |
product 0----------*----*--|--**----|--->|  \
 client 0----------*----*--|-*-*----|--->|  | OLTP
  sales 0----------*----*--|-***----|--->|  /
                           |        |    |
CREATE VIEW big_sales AS   |        |    |
SELECT * FROM sales WHERE sales.amount > 100
        0----------*-------|-***->  |    |
                           |        |    |
CREATE VIEW client_vol AS  |        |    |
SELECT c_id, COUNT(*) FROM client GROUP BY c_id
        0----------*-------|--*-----|>   |
                           |        |    |
SELECT * FROM big_sales JOIN client_vol ON (c_id)
        0----------*-------|>       |    |

Each of these query timestamps provides a different qualitative experience to the users.

The T0 timestamp is great for someone who wants to access a complex query (the last one in the diagram) interactively. The corresponding result is not as up to date as others, but it is still available at a consistent moment. As long as the user looks at the same view they can keep going right, providing the experience of moving only forward in time.

The T1 timestamp is great for someone who wants to access the client_vol view. While also not immediately up to date, it is available at a relatively recent timestamp. Combined with the T0 motivation, it should be clear how ensuring consistency (always go right) puts T1's freshness in conflict with T0's responsiveness. A user who wants to see both can't get what they want at the same time, without some give.

The T2 timestamp is for a freshness absolutist, who needs to be sure that they are seeing results that reflect reality as of when the query was submitted. Imagine presenting a bank balance back to a customer, or checking inventory levels before confirming a purchase. While the freshness is great, as good as it gets, there are significant responsiveness limitations. This level of freshness can be ensured by the "zero-staleness" feature, which provides "real-time recency" guarantees.

Task 4: The Query Lifecycle

We've seen a bunch of complex pictures of common timelines, as they form the backbone of consistency in Materialize. Most users, however, don't want to have to flip through these things to figure out what is going on with their query. Fortunately, there is a simpler way to understand Materialize's operation in the context of individual queries.

Each query that arrives at Materialize is first assigned a timestamp. The timestamp corresponds to the vertical line, and its choice is a reflection of the transaction isolation levels of the user's session. There is some explaining to do about how your timestamp is chosen, which you can consult as you wait for your results.

But why are you waiting? We've chosen a timestamp; what prevents the immediate presentation of that information? The information you are looking for is essentially the progress bar for which arrowheads have passed the dropped vertical line.

Let's return to the example above, and the experience of a user assigned the T1 timestamp.

                                    T1   
                                    |     
product 0----------*----*-----**----|--->   \
 client 0----------*----*----*-*----|--->   | OLTP
  sales 0----------*----*----***----|--->   /
                                    |     
CREATE VIEW big_sales AS            |     
SELECT * FROM sales WHERE sales.amount > 100
        0----------*---------***->  |     
                                    |     
CREATE VIEW client_vol AS           |     
SELECT c_id, COUNT(*) FROM client GROUP BY c_id
        0----------*----------*-----|>    
                                    |
SELECT * FROM big_sales JOIN client_vol ON (c_id)
        0----------*-------->       |
                                    |

With T1 locked in, Materialize can use its view of the arrowhead state, and the dependences among sources and views, to produce (and keep up to date) the following output:

> SELECT * FROM big_sales JOIN client_vol ON (c_id);
query timestamp: T1

 type  | name       | status
-------+------------+------------
 input | sales      | ready
 input | client     | ready
 view  | client_vol | ready
 view  | big_sales  | refreshing
 query |            | pending

As time advances and arrowheads move rightwards, the arrowhead of big_sales will pass T1, changing big_sales to ready and moving query to the refreshing state, until the query's arrowhead passes T1 as well. More and more of the query steps transition to ready, until they are all ready, at which point your response should be imminent.

This view of the system boils everything down to how it relates to T1. Work whose arrowhead has passed T1 is "ready", work whose inputs are all "ready" but which is not itself ready is "refreshing", and all other work is "pending". This looks (to me) closest to what a person who wants the answer to their query wants to know: of the work that needs to get done, which parts are done, which are in progress, and which still need to be done afterwards.
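These three states can be derived mechanically from the arrowheads and the dependency structure. The sketch below is a hypothetical rendering of that rule: the integer positions, the `deps` map, and the choice to treat "passed T1" as "at or beyond T1" are all assumptions.

```python
def statuses(frontier, deps, query_ts):
    """Classify each piece of work relative to the query timestamp.

    frontier: {name: arrowhead position}
    deps:     {name: [names it reads from]}, listed in dependency order
    """
    status = {}
    for name, inputs in deps.items():
        if frontier[name] >= query_ts:
            status[name] = "ready"
        elif all(status[i] == "ready" for i in inputs):
            status[name] = "refreshing"
        else:
            status[name] = "pending"
    return status

# Hypothetical arrowhead positions for the T1 example, with T1 = 36.
frontier = {"sales": 40, "client": 40, "client_vol": 37, "big_sales": 33, "query": 28}
deps = {
    "sales": [],
    "client": [],
    "client_vol": ["client"],
    "big_sales": ["sales"],
    "query": ["big_sales", "client_vol"],
}

for name, state in statuses(frontier, deps, query_ts=36).items():
    print(f"{name:<10} | {state}")
```

With these positions the classification matches the table above: both inputs and client_vol are ready, big_sales is refreshing, and the query itself is pending.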

Wrapping up

Materialize's value proposition is that it digests a complex world of continually changing data, and presents it back to you and yours in a more manageable form.

Your OLTP data continually evolves based on your business, its rules, and other considerations outside of Materialize's purview. Nonetheless, Materialize uses change data capture to present the data as if you were in the OLTP database itself. Updates are always consistent, and the state of the system moves continually forward.

Your SQL business logic is potentially highly complex, and may rely on multiple sources of data. Materialize uses the structure of Virtual Time to get a head start on your queries, precomputing their results and keeping them up to date as time advances. Virtual Time also allows the integration of multiple upstream sources: once brought on to the same timeline, SQL queries across multiple inputs have specific answers Materialize can compute and incrementally maintain.

Your interactions with Materialize, queries specifically, also inhabit the same timeline, and result in precisely correct answers at the chosen times. The way in which Materialize chooses query times reflects the isolation guarantees you've requested, trading off responsiveness, freshness, and consistency. The timeline also provides a useful idiom for Materialize to report progress back to you, as a sequence of tasks that "complete" as they pass the query timestamp.

Although Materialize is complex under the hood, fascinatingly complex, it fundamentally aims to provide simplicity back to you. Virtual Time and the consistent timelines it produces are the backbone of this simplicity.