When designing or picking the “right” stream processor, we often focus on performance on the “happy path”, that is, how low the end-to-end latency is and how high the throughput is during failure-free execution. Although important, especially for time-critical pipelines, this metric does not necessarily tell the whole story.
In stateful distributed systems, fault tolerance is a critical component and, oftentimes, one that can severely impact performance. To understand the relationship between the two, one has to keep in mind that “performance” measures how fast a system makes progress, while fault tolerance is the ability to make any progress at all in the face of failures.
In this post I will focus on systems like Apache Flink which rely on global consistent state snapshots for fault-tolerance and I will study analytically the impact of scale on their ability to make progress in the face of failures. More precisely, I will show that the probability of an unfortunate series of failures and restarts that leads to the system making no progress becomes far from negligible as the scale grows.
I will focus on Apache Flink as it is the framework I am most familiar with. I am a committer and PMC member for the project, with contributions spread over more than 5 years, and this is a question that I have had in my mind for quite some time.
To be clear, I am not sure how often the scenario we are studying here is encountered in production, as there are ways to limit its impact. In fact, Flink is already being used at large scale in production without any issues, and many companies trust it for many of their revenue-critical tasks. Alibaba trusts Flink for its Singles’ Day shopping spree, and you can find even more on how Flink performs at scale by watching some of the Flink Forward talks where companies like Alibaba, Netflix, Uber, Epic Games (the company behind Fortnite, among others) and many others share their experience.
Nevertheless, I believe that the findings here are worth sharing and taking into consideration when configuring Flink or when designing a new data processing system.
With that said, let’s dive into the discussion…
Flink’s fault tolerance mechanism is based on taking consistent global snapshots of the state of a pipeline using a variation of the Chandy-Lamport algorithm. These snapshots are then used to bootstrap Flink’s state upon recovery.
This section presents Flink’s checkpointing mechanism in a nutshell. The thing to keep in mind for the remainder of the discussion is the all-or-nothing contract that is attached to Flink’s snapshotting mechanism. All the tasks of a pipeline have to be active at the same time in order to take such a consistent snapshot, and all tasks fall back to the state in the snapshot upon recovery. Everything in-between, such as only the failed task(s) resuming execution from the state in the snapshot while the rest keep running, would lead to inconsistent results.
If you know enough about Flink to understand the statement above, then feel free to go to the next paragraph. If not, please keep reading…
To take a consistent snapshot, Flink injects into its input streams special records called “checkpoint barriers”. These barriers travel along with the data from source(s) to sink(s) without overtaking data records [1].
Checkpoint barriers are injected into the sources of the pipeline by the Checkpoint Coordinator. When an operator receives the barrier for checkpoint c from all of its input streams, it emits a barrier for that snapshot on all of its outgoing edges, writes its working state to persistent storage, and acknowledges to the coordinator that the barrier for checkpoint c was processed successfully. When all operators have acknowledged successful completion of checkpoint c, then checkpoint c is considered successful. From that point on, in case of a failure, Flink will resume from the latest successful checkpoint, which cannot be earlier than c. This means that all processing done up to checkpoint c is safe, as the results computed until that point will never be rolled back. For more details, refer to Flink’s documentation on checkpointing.
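To make the barrier protocol a bit more concrete, here is a deliberately simplified sketch of the alignment logic of a single operator. This is illustrative Python only, not Flink’s actual implementation; the class and callback names (AligningOperator, on_barrier, and the injected emit_barrier / snapshot_state / ack_coordinator callables) are made up for this post.

```python
# A deliberately simplified model of barrier alignment in ONE operator.
# Illustrative only; NOT Flink's implementation. All names are made up.

class AligningOperator:
    def __init__(self, input_channels, emit_barrier, snapshot_state, ack_coordinator):
        self.input_channels = set(input_channels)   # e.g. {"in-0", "in-1"}
        self.emit_barrier = emit_barrier             # forwards a barrier on every outgoing edge
        self.snapshot_state = snapshot_state         # writes the working state to durable storage
        self.ack_coordinator = ack_coordinator       # acknowledges the checkpoint coordinator
        self.seen = {}                               # checkpoint_id -> channels that delivered it

    def on_barrier(self, channel, checkpoint_id):
        seen = self.seen.setdefault(checkpoint_id, set())
        seen.add(channel)
        if seen == self.input_channels:              # barrier received from ALL inputs
            self.emit_barrier(checkpoint_id)         # 1. emit the barrier downstream
            self.snapshot_state(checkpoint_id)       # 2. persist the working state
            self.ack_coordinator(checkpoint_id)      # 3. report success for this checkpoint
            del self.seen[checkpoint_id]


# Tiny usage example with two input channels:
op = AligningOperator(
    input_channels={"in-0", "in-1"},
    emit_barrier=lambda c: print(f"forward barrier {c} downstream"),
    snapshot_state=lambda c: print(f"snapshot state for checkpoint {c}"),
    ack_coordinator=lambda c: print(f"ack checkpoint {c} to the coordinator"),
)
op.on_barrier("in-0", checkpoint_id=7)   # still waiting for the barrier from in-1
op.on_barrier("in-1", checkpoint_id=7)   # now: forward, snapshot, acknowledge
```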
Summary: For checkpointing to work, all the tasks of the pipeline have to be active at the same time. For checkpoint c to be complete, all operators must have processed and acknowledged it. In addition, the snapshot taken as a result of a checkpoint reflects the state of the whole pipeline at some point in time. As such, if one task fails, the whole pipeline has to roll back its state to the point of the last successful checkpoint and resume processing from that point onwards.
Upon failure, a pipeline will fall back to its last successful checkpoint c. Work done between the time c was taken and the moment of the failure will be lost. Furthermore, if a failure happens after the pipeline has recovered but before it has successfully completed checkpoint c + 1, any results computed after recovery will be lost and the system will roll back to checkpoint c… again. This means that in an unfortunate sequence of events, our pipeline may end up in a loop of failures and recoveries that leads to making no progress.
Window of Vulnerability: Assuming that our pipeline needs T sec between the time the first task of the recovered incarnation of our pipeline starts running and the “next” checkpoint — i.e. the first checkpoint after recovery — is considered complete, this leaves the system with a “window of vulnerability” of size T sec. If another failure occurs within T sec, then we will have to fall back to checkpoint c, thus making no progress.
Problem Statement: Here, we compute how likely it is to have a failed task within T sec (or within its “window of vulnerability”) for a pipeline with S tasks running on a cluster of n nodes that has a probability of a node failure within a period of T sec equal to p. Remember that even if a single task fails, the whole pipeline will have to restart, and thus, roll back its state to the last successful checkpoint.
Assumptions: We assume that (i) tasks are assigned to nodes independently and at random and (ii) each node in the cluster can hold at most one task from a given pipeline. Although Flink allows us to collocate more than one task on the same node, assumption (ii) does not affect the applicability of the findings, as we can adjust S to reflect the number of unique nodes running tasks of a given pipeline. Assumption (ii) allows us to assume independent failures, which is a rather common assumption in the literature and makes our computations here tractable. Finally, we account for failures that are equally likely to happen to any pipeline and not pipeline-specific ones, such as bugs in the pipeline code. These can be failures due to power outages, network failures, disk failures, etc.
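If you prefer simulation to algebra, the model described by these assumptions can also be checked empirically. Below is a small Monte Carlo sketch of my own (the function name and parameters are made up for this post): it fails each node independently with probability p, places the S tasks on distinct nodes at random, and counts how often at least one task ends up on a failed node.

```python
import random

def estimate_restart_probability(n, S, p, trials=10_000):
    """Monte Carlo estimate of the probability that at least one of S tasks,
    placed on distinct nodes of an n-node cluster chosen uniformly at random,
    sits on a node that fails within the window of vulnerability.
    (Illustrative sketch for this post, not production code.)"""
    restarts = 0
    for _ in range(trials):
        failed = {node for node in range(n) if random.random() < p}  # independent node failures
        task_nodes = random.sample(range(n), S)                      # assumptions (i) and (ii)
        if failed.intersection(task_nodes):                          # any task on a failed node?
            restarts += 1
    return restarts / trials

# Illustrative run with the post's example p = 0.1%:
print(estimate_restart_probability(n=2_000, S=500, p=0.001))  # roughly 0.39
```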
As we will show in the remainder of this post, it turns out that the probability of a pipeline having to restart within T sec increases rapidly with the number of tasks it contains, even if the probability of a node failure within T sec is low.
The results of the analysis are presented in the graph below, while the analysis itself is presented in the next paragraph. Here I assume that p = 0.1%. This is an arbitrary number that I came up with for this example and may not reflect the reality in your settings. In real settings, p will depend on the size of your cluster and the duration of your pipeline’s “window of vulnerability”. This 0.1% only serves to show that, counterintuitively, although a node failure within T sec may not be so likely in your cluster, this does not mean that your pipeline is unlikely to have to restart.
The above graph shows the probability of a pipeline having to restart due to one of its tasks failing within its window of vulnerability, for pipelines with S = 10, 100, 500 and 1000 tasks. In addition, for each of the cases above, we vary the size of the cluster from 1 to 10000 nodes to show the impact of the size of the cluster. For each pipeline, we start from a cluster size n equal to S. This is because we assumed that a node can accommodate at most one task from a given pipeline, so if n < S, the cluster does not have enough resources to accommodate the pipeline.
The graph shows that even though a node failure within T sec has a probability of only p = 0.1%, the probability of the pipeline having to restart within the same period is a bit below 40% for a pipeline with 500 tasks and surpasses 60% for a pipeline with 1000 tasks, irrespective of the size of the cluster. These numbers are far from negligible. In fact, 60% means that such task failures are the rule rather than the exception. Intuitively, the reason for this is that although the probability of a single node failing is low, a pipeline with 1000 tasks will have to restart if any of the 1000 nodes that execute its tasks fails.
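As a quick back-of-the-envelope check of these numbers (my own approximation, not part of the original analysis): since the S distinct nodes hosting the tasks fail independently, each with probability p, the chance that at least one of them fails within the window of vulnerability is about

$$1 - (1 - p)^{S}, \qquad \text{e.g.}\quad 1 - 0.999^{500} \approx 0.39 \quad\text{and}\quad 1 - 0.999^{1000} \approx 0.63,$$

which lines up with the curves described above and already hints at why the cluster size n has little effect.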
Furthermore, the graph shows that for the same cluster size, the probability of a pipeline having to restart increases as the number of tasks in the pipeline grows. This is rather expected: the more tasks a pipeline has, the higher the probability that at least one of them - or, more precisely, a node that executes one of these tasks - will fail.
Finally, we see that the probability of having a task failure is practically unaffected by the size of the cluster. The reason we also vary the cluster size is that the analytical model from which the graph was drawn (see Eq. 4 below) includes it as one of the parameters affecting the probability of a task failing within the pipeline’s window of vulnerability.
If you blindly trust my math, then feel free to skip this paragraph and go straight to the Discussion. If not, here we will calculate the probability of at least one of our S tasks being scheduled on one of the f failed nodes in the cluster, where f can range from 1 to n, the size of the cluster. Or, in a more formal way:

$$P(\text{restart}) \;=\; \sum_{f=1}^{n} P\big(\text{at least one task on a failed node} \,\cap\, \text{exactly } f \text{ nodes failed}\big)$$
To calculate this, we will make use of the definition of conditional probability, which says that:

$$P(A \cap B) = P(A \mid B) \cdot P(B)$$
In this equation, A is the event of having at least one failed task and B is the event of having f failed nodes in the cluster. This translates into P(B) being the probability of having f failed nodes in the cluster, while P(A|B) is the probability of at least one of our tasks being scheduled on one of the f failed nodes. Combining the two equations above gives us:
$$P(\text{restart}) = \sum_{f=1}^{n} P(A \mid B) \cdot P(B)$$

Equation 1: Probability of a restart.
Intuition: The above formula says that the probability of a pipeline having to restart is equal to the probability of having f node failures in the cluster, multiplied by the probability of at least one of our tasks being scheduled on one of the nodes that failed, summed over all possible numbers of failed nodes f. The multiplication is due to the fact that “selecting” which nodes fail is independent of the selection of where to put the tasks of a pipeline.
Have f failed nodes: In a cluster with n nodes, where the probability of a node failing within T time units is equal to p, the probability of having f failures follows the binomial distribution, or:
$$P(B) = \binom{n}{f}\, p^{f} (1-p)^{\,n-f}$$

Equation 2: Probability of f nodes failing.
The binomial distribution describes the probability of having f successes in a sequence of n independent experiments.
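For instance, with the illustrative p = 0.1% from the graph and a cluster of, say, n = 1000 nodes (a number I picked just for this worked example), the probability of exactly one node failing within the window would be

$$P(B) = \binom{1000}{1}\,(0.001)^{1}\,(0.999)^{999} \approx 0.37 \qquad \text{for } f = 1.$$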
Have at least one of our S tasks on a failed node: As said earlier, we assume that tasks are assigned to nodes independently and at random and that each node can host at most one task from a given pipeline.
With this in mind, if the number of tasks, S, plus the number of failures, f, is greater than the number of nodes in the cluster, n, or S + f > n, then we are certain that at least S + f - n tasks have failed, so the pipeline will have to restart. So:

$$P(A \mid B) = 1, \qquad \text{if } S + f > n$$
For the case where S + f ≤ n, we have that there are “n choose S” (see binomial coefficient) ways of assigning the S tasks to the n nodes of the cluster, and all combinations are equally likely to happen. In addition, given that we have f failed nodes, and thus n - f non-failed ones, we have “(n - f) choose S” task assignment schemes that place all S tasks on non-failed nodes. So, the probability of selecting a task assignment scheme where all tasks are on healthy nodes is:

$$\frac{\binom{n-f}{S}}{\binom{n}{S}}$$
This gives us that the probability of having at least one task scheduled on a failed node is equal to 1 minus the above, so:

$$P(A \mid B) = 1 - \frac{\binom{n-f}{S}}{\binom{n}{S}}, \qquad \text{if } S + f \le n$$
Combining the equations above, we have:

$$P(A \mid B) = \begin{cases} 1, & \text{if } S + f > n \\[6pt] 1 - \dfrac{\binom{n-f}{S}}{\binom{n}{S}}, & \text{if } S + f \le n \end{cases}$$

Equation 3: Probability of at least one task failing.
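As a tiny sanity check with made-up numbers (n = 10 nodes, f = 2 failed nodes, S = 3 tasks):

$$P(A \mid B) = 1 - \frac{\binom{8}{3}}{\binom{10}{3}} = 1 - \frac{56}{120} \approx 0.53,$$

i.e. with a fifth of this toy cluster down, a 3-task pipeline has slightly better than even odds of being hit.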
Putting Equations (2) and (3) together in Equation (1), and using the convention that “(n − f) choose S” equals 0 whenever S > n − f (which covers the first branch of Equation 3), gives us:

$$P(\text{restart}) = \sum_{f=1}^{n} \binom{n}{f}\, p^{f} (1-p)^{\,n-f} \left(1 - \frac{\binom{n-f}{S}}{\binom{n}{S}}\right)$$

Equation 4: Probability of a pipeline restart.
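If you want to reproduce the curves yourself, here is a small sketch of my own (the function name p_restart is made up) that evaluates Equation 4 with Python’s standard library. It computes the binomial probabilities of Equation 2 incrementally to avoid huge intermediate numbers, and relies on the fact that math.comb(n - f, S) is zero whenever S > n - f, which covers the first branch of Equation 3.

```python
from math import comb

def p_restart(n, S, p):
    """Equation 4: probability that at least one of S tasks, placed on distinct
    nodes of an n-node cluster, sits on a node that fails within the pipeline's
    window of vulnerability, given per-node failure probability p."""
    choose_all = comb(n, S)              # number of ways to place the S tasks
    pmf = (1.0 - p) ** n                 # P(exactly 0 node failures)
    total = 0.0
    for f in range(1, n + 1):
        # Binomial pmf of Equation 2, updated incrementally: P(exactly f failures).
        pmf *= (n - f + 1) / f * p / (1.0 - p)
        # Equation 3: P(at least one task on a failed node | f failures).
        p_hit = 1.0 - comb(n - f, S) / choose_all
        total += pmf * p_hit             # Equation 1: sum over all f
    return total

# The headline numbers from the graph (p = 0.1%):
print(p_restart(n=10_000, S=500, p=0.001))    # roughly 0.39
print(p_restart(n=10_000, S=1_000, p=0.001))  # roughly 0.63
```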
The above analysis resembles that of the probability of a node failure in the context of replication strategies in distributed storage systems. A good read on the topic can be found here.
The analysis above shows that the probability of a pipeline having to restart within a given period of time increases with the size of the pipeline’s “window of vulnerability” (the bigger the T, the higher the p) and with the number of tasks in the pipeline, S. Both of these parameters are pipeline-specific.
Focusing on the “window of vulnerability”, T: this depends on the size of the pipeline’s state — the larger the state, the longer it will take to load it during recovery — as well as on its parallelism and its consistency requirements. Computing T accurately can be tedious, as it requires historical data from previous runs of the same pipeline with the same parallelism and the same state/node/runtime characteristics.
Nevertheless, knowing the factors that affect the ability of our pipeline to make progress in the face of failures may help us extract some useful hints on how to reduce the probability of pipeline restarts, even for large-scale jobs. For example, we have not discussed the impact of allowing a node to host more than one task of the same pipeline. The number of tasks a node can accommodate in Flink is controlled by the number of slots per task manager. Collocating tasks on nodes reduces the number of nodes that execute tasks of the pipeline and thus reduces the S in Equation 4, moving us to a lower curve in Figure 1.
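To put a rough number on that, reusing the hypothetical p_restart sketch from the analysis above with the same illustrative p = 0.1%: if, say, four tasks of a 1000-task pipeline share each node, the number of distinct nodes hosting the pipeline drops to 250, and the restart probability drops with it.

```python
# Hypothetical continuation of the earlier p_restart sketch (p = 0.1%, n = 10_000):
print(p_restart(n=10_000, S=1_000, p=0.001))  # one task per node:   roughly 0.63
print(p_restart(n=10_000, S=250, p=0.001))    # four tasks per node: roughly 0.22
```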
On a higher level, another outcome of this study is that although checkpointing may lead to a fast "happy path", as it interferes minimally with the normal record-processing path, it ties all the tasks of a pipeline together. This may have negative consequences as the scale grows. Given this, one could think of different fault-tolerance strategies that decouple tasks from one another and see if their potential overheads are compensated by performance gains during recovery. It may even be that small to medium-parallelism pipelines have more to gain from a checkpointing-like mechanism, whereas larger-scale deployments could opt for a more decoupled architecture.
When it comes to performance at large scale, latency / throughput measurements do not tell the whole story. As failures become the rule rather than the exception, fault-tolerance plays an important role not only for correctness but also for guaranteeing that the computation is able to make any progress at all. Here we studied Flink’s checkpointing mechanism and showed that its ability to make progress can be hindered as the pipeline’s “window of vulnerability” grows and as the number of its tasks increases. Different fault-tolerance strategies offer different tradeoffs so maybe in the future I will dive into another strategy to see if there is something interesting there.
Special thanks to Paris Carbone, Mike Winters, Fabian Hueske and Aljoscha Krettek for their comments when writing this post.
[1]: Although the latter has changed with unaligned checkpoints, this does not affect our findings.