Commit

Reorganized paragraphs
akalash committed Apr 25, 2022
1 parent 4fb6076 commit 2f5be59
Showing 1 changed file with 28 additions and 78 deletions.
106 changes: 28 additions & 78 deletions _posts/2022-04-05-buffer-debloat.md
@@ -9,29 +9,14 @@ excerpt: Apache Flink adjust buffer size automatically in order to keep a balanc
---

## What is this article about?
One of the most important features of Flink is providing a streaming experience with the maximum possible throughput and the minimum possible memory overhead.
In other words, the main goal is to keep the operators always busy and avoid idleness.
The obvious way to achieve that is to use buffers, which collect records while the incoming stream rate is greater than the processing speed and hand them over once the operator catches up. This approach helps to handle temporary spikes in the data flow,
but it also brings complexity to the configuration of the buffers: on the one hand,
the bigger the buffers, the longer the spike that can be handled; on the other hand, memory has limited capacity and
should be configured in an optimal way.
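
To make this trade-off concrete, here is a toy back-of-the-envelope calculation, with purely hypothetical numbers, of how long a load spike a fixed buffer capacity can absorb:

```java
// Toy illustration: how long can buffering absorb a spike where the input
// rate exceeds the processing rate? All numbers here are hypothetical.
public final class SpikeAbsorption {
    public static void main(String[] args) {
        double incomingRate = 1_200;   // records/s during the spike
        double processingRate = 1_000; // records/s the operator can sustain
        double bufferCapacity = 1_000; // records that fit into the buffers

        // The buffers fill up at the difference of the two rates.
        double secondsUntilFull = bufferCapacity / (incomingRate - processingRate);
        System.out.printf("The spike is absorbed for %.0f seconds before backpressure%n",
                secondsUntilFull);
    }
}
```

With these numbers, the buffers hide the spike for 5 seconds; a longer spike would require proportionally more memory. But what does "maximum possible throughput" actually mean? Let's take a look at an ideal scenario: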

Suppose we have a job and an environment that provide constant processing and sending times for every record.
In this case, the job processing looks like this:

<div class="row front-graphic">
<img src="{{ site.baseurl }}/img/blog/2022-03-28-buffer-debloat/ideal_case.gif"/>
</div>

As the picture shows, there is no delay between processing two neighboring records since the processing and sending times are equal.
This allows the system to keep the maximum possible throughput.

Unfortunately, such conditions are not reachable in real life, since all operators have different processing times (due to different logic, different record sizes, etc.),
and there are also different kinds of issues with the environment (network, server, software) that can lead to unpredictable delays.

<div class="row front-graphic">
<img src="{{ site.baseurl }}/img/blog/2022-03-28-buffer-debloat/simple_problem.gif"/>
</div>

As a result, it is not trivial to sustain the maximum possible throughput in the presence of these different types of delays.

This article explains how Flink uses different approaches to level out these instabilities and minimize the idleness of operators, and how it can help to find a good balance between the maximum throughput and the minimum memory usage.

## Network stack

@@ -42,8 +27,8 @@ Here we just recall a couple of important things which explains how they minimiz
### Network buffer

Logically, Flink's unit of data is a record, which is handled by each subtask. But the overhead of sending a single record
between subtasks would be too high, which is why, physically, Flink buffers outbound records into so-called network buffers. These are,
roughly speaking, byte arrays of limited size, and they represent the smallest unit of data that is then sent over the network.
A network buffer does not always contain several records:
since the network buffer has a statically configured size, the record size determines how many records fit into one buffer,
and a single large record can even be split across several buffers.
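
The buffer size is that statically configured option. As a minimal sketch, assuming the documented `taskmanager.memory.segment-size` option and its 32 KiB default (in a real deployment this key belongs in the TaskManager configuration rather than in code):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class NetworkBufferSizeExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Static size of a single network buffer; 32 KiB is Flink's default.
        conf.setString("taskmanager.memory.segment-size", "32kb");
        // Passing the configuration like this only takes effect for local (mini-cluster) runs.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define and execute the job ...
    }
}
```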
@@ -85,63 +70,27 @@ the buffer can be sent by timeout even if it isn't full and the queue contains t
But in general, the picture shows that despite the varying speed of data processing at different times, the downstream subtask is unlikely to idle during short instabilities, since it has data buffered in the queue for processing.
</div>
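
The flush-by-timeout behavior mentioned above is exposed through Flink's public API. A minimal sketch (100 ms is Flink's documented default buffer timeout):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class BufferTimeoutExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flush a network buffer after 100 ms even if it is not full yet,
        // trading a bit of throughput for lower latency.
        env.setBufferTimeout(100);
        // ... define and execute the job ...
    }
}
```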

Does this resolve all of our problems? Or is something else left?

## What else can go wrong?

Unfortunately, the approach above has a number of drawbacks. But before we dive into them, let's recall a couple of things.
First, the term [backpressure](https://flink.apache.org/2021/07/07/backpressure.html) refers to the situation where a system receives data at a higher rate than it can process during a temporary load spike.
Second, a couple of notes about [checkpoints](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#checkpointing):

- [Checkpoint barriers](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#barriers) for aligned checkpoints travel to the next subtask along with the other network buffers, with the same priority.
- All in-flight data for [unaligned checkpoints](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#unaligned-checkpointing) must be stored along with the operators' state.

Now, let's try to answer the question: how much in-flight data should be configured?
The amount of in-flight data is determined by the number of network buffers and the size of each buffer.
Since both the buffer size and the total number of buffers can only be set statically in the configuration,
the maximum total size of all buffers cannot be changed at runtime (a back-of-the-envelope estimate follows the scenarios below). With this in mind, we can consider two scenarios:

1. Suppose we configure the in-flight data to a relatively large amount. Then, during high backpressure, all queues reach their maximum capacity,
and the subtasks require significant time to work through all of the in-flight data. This leads:

- to a high, unpredictable checkpoint time for aligned checkpoints (since, as mentioned above, checkpoint barriers are stuck in the same queue along with the other buffers);
- to a large stored checkpoint size for unaligned checkpoints, since all in-flight data must be stored.

2. If we configure the in-flight data to a small amount, then in case of network or load volatility,
the network buffer queue fills up too fast while the subtask is busy and contains too little data when the subtask is ready to process again,
which leads to high idle time, or, in other words, less throughput than possible.

<div class="row front-graphic">
<img src="{{ site.baseurl }}/img/blog/2022-03-28-buffer-debloat/checkpoint-barrier.png"/>
</div>

<div class="alert alert-info">
As the picture shows, if the network buffers are large enough and Subtask B.1 processes slowly,
the checkpoint barrier gets stuck in the network buffer queue for a while. But if the network buffers are small,
Subtask B.1 can process all buffers from the queue faster than Subtask A.1 produces a new one,
which can lead to the idling of B.1 and, as a result, to decreased throughput.
</div>
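
Here is the promised back-of-the-envelope estimate of how much in-flight data the static configuration allows on the input side of one subtask. The formula uses Flink's documented defaults (2 exclusive buffers per channel, 8 floating buffers per gate, 32 KiB segments); the channel count is a hypothetical example:

```java
// Rough upper bound of in-flight data on the input side of one subtask.
public final class InFlightDataEstimate {
    public static void main(String[] args) {
        int segmentSize = 32 * 1024;      // taskmanager.memory.segment-size (default 32 KiB)
        int buffersPerChannel = 2;        // taskmanager.network.memory.buffers-per-channel
        int floatingBuffersPerGate = 8;   // taskmanager.network.memory.floating-buffers-per-gate
        int inputChannels = 100;          // hypothetical: one channel per upstream subtask

        long maxBuffers = (long) inputChannels * buffersPerChannel + floatingBuffersPerGate;
        long maxBytes = maxBuffers * segmentSize;
        System.out.printf("Up to %d buffers, i.e. %.1f MiB of in-flight data%n",
                maxBuffers, maxBytes / (1024.0 * 1024.0));
    }
}
```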

In conclusion, the main problem is finding the optimal amount of in-flight data that yields the highest throughput along with minimum overhead.

## How to minimize the memory usage?

As described above, high throughput is reached when the in-flight data is configured to a large enough value;
at the same time, memory is limited, so it makes sense to find the optimal size beyond which the throughput won't improve anyway.
Obviously, this value can be found manually by experiment, but let's discuss how Flink can help with that.

As we saw before, it is not possible to avoid in-flight data entirely, since that would lead to dramatically low throughput.
Instead, Flink can be switched to focusing on the time for which the in-flight data can be processed rather than on the size of this data.
For example, instead of configuring 1 GB of in-flight data per subtask,
it makes more sense to configure 1 second as the maximum expected time for processing all in-flight data on one subtask.
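
The arithmetic behind this reformulation is simple; a sketch with hypothetical numbers:

```java
// Time-based sizing: the amount of in-flight data follows the throughput.
public final class TimeBasedSizing {
    public static void main(String[] args) {
        double targetSeconds = 1.0;             // desired drain time for in-flight data
        double throughput = 50.0 * 1024 * 1024; // hypothetical: 50 MiB/s currently observed

        long inFlightBytes = (long) (throughput * targetSeconds);
        System.out.printf("Keep at most %.0f MiB in flight to meet the %.0f s target%n",
                inFlightBytes / (1024.0 * 1024.0), targetSeconds);
    }
}
```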
Then, the following behaviors will be observed:

- In case of [backpressure](https://flink.apache.org/2021/07/07/backpressure.html):
  - The checkpoint time for [aligned checkpoints](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#checkpointing) becomes more predictable, since we know that every task holds only as much in-flight data as can be processed within the configured time. This matters because [aligned checkpoint barriers](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#barriers) travel along with the other buffers, with the same priority.
  - The size of the files on disk for [unaligned checkpoints](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#unaligned-checkpointing) decreases, since during backpressure the throughput drops, which means that to stay within the configured processing time Flink has to keep less in-flight data.
- In case of zero [backpressure](https://flink.apache.org/2021/07/07/backpressure.html):
  - Zero backpressure means that the processing speed is high enough, so Flink can keep more in-flight data while still meeting the configured processing time, which prepares the subtask for possible instabilities.

This feature of Flink is called buffer debloating: it automatically adjusts the amount of in-flight data based on the current throughput,
which allows keeping the processing time close to the configured one.
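
Enabling the feature is a matter of two configuration options. These keys are documented on the [network memory tuning](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/network_mem_tuning/) page; the sketch below sets them in code for a local run, while in a real deployment they belong in the TaskManager configuration:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class BufferDebloatExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.network.memory.buffer-debloat.enabled", "true");
        // The targeted total time to process the in-flight data of one subtask.
        conf.setString("taskmanager.network.memory.buffer-debloat.target", "1s");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define and execute the job ...
    }
}
```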

## How does buffer debloating work?
Expand Down Expand Up @@ -171,15 +120,16 @@ It is why some specific cases can be handled worse than expected.
## Should I use it always?

Buffer debloating tries to predict the future load based on the current throughput. It handles different types of spikes pretty well
and provides predictable [checkpoint](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#checkpointing) times.
But since buffer debloating has some inertia in its decisions about the new buffer size,
there is always a risk that the adaptation to new conditions will not be fast enough, which can lead to lower throughput than theoretically possible.
In any case, buffer debloating can be used as a default option, and if it proves to work poorly in certain cases,
the feature can be disabled. Buffer debloating is especially helpful if you:

- Have [aligned checkpoints](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#checkpointing) with volatile (or unexpectedly long) checkpoint times and want to make them more predictable
- Have [unaligned checkpoints](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#unaligned-checkpointing) with undesirably large checkpoint files on disk and want to decrease the size of these files
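
For the second case, a minimal sketch of combining unaligned checkpoints with buffer debloating; the checkpoint interval is a hypothetical example, and `enableUnalignedCheckpoints` is part of Flink's public checkpointing API:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class UnalignedWithDebloatExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.network.memory.buffer-debloat.enabled", "true");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.enableCheckpointing(60_000); // hypothetical: checkpoint every minute
        env.getCheckpointConfig().enableUnalignedCheckpoints();
        // With debloating enabled, less in-flight data ends up persisted
        // in every unaligned checkpoint.
    }
}
```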

Despite buffer debloating working well enough in most cases and being usable as the default solution, it is not perfect, and it has a couple of [limitations](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/network_mem_tuning/#limitations):

### Buffer size and number of buffers

