Deserialization of compressed data is sluggish and causes memory flares #7433
Notably, a lot of the tests in coiled-runtime start from
Replicated on Python: 3.11.0
TL;DR: it's death by deep copy. We're spending 85ms doing actual decompression and 49ms doing deep copies.
The 128 MiB input buffer is split into two frames (
Fixing this will require an upstream PR that lets us pass an
If I increase
We can see that numpy is performing a deep copy - which should not happen. So even here we should be able to shave 19ms off numpy deserialization and 9ms off PyBytes_FromStringAndSize (9ms is the duration of a raw deep copy on my computer).
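For scale, the 9ms figure for a raw deep copy is easy to reproduce with a minimal timing sketch (nothing distributed-specific here; numbers will vary by machine):

```python
import numpy
from time import perf_counter

a = numpy.ones(128 * 2**20 // 8)  # 128 MiB of float64

t0 = perf_counter()
b = a.copy()  # one extra pass over the 128 MiB buffer
t1 = perf_counter()
print(f"plain 128 MiB copy: {(t1 - t0) * 1000:.1f} ms")
```

Every avoidable copy in the deserialization path costs roughly this much on top of the unavoidable decompression work.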
Worth noting that blosc (removed in #5269) does offer deserialization to already existing memory (albeit in an unpythonic and unsafe way).
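For reference, this is the kind of API being alluded to - a minimal sketch using `blosc.decompress_ptr`, which writes straight into memory the caller already owns instead of allocating a new bytes object (unsafe in the sense that nothing checks the destination buffer is large enough):

```python
import blosc
import numpy

a = numpy.ones(128 * 2**20 // 8)  # 128 MiB, highly compressible
mv = memoryview(a)
c = blosc.compress(mv, typesize=mv.itemsize, clevel=5, cname="blosclz")

out = numpy.empty_like(a)  # preallocated destination buffer
nbytes = blosc.decompress_ptr(c, out.ctypes.data)  # decompress into it, no intermediate bytes object
assert nbytes == a.nbytes
numpy.testing.assert_array_equal(a, out)
```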
This works, thanks to #5208:

```python
import dask
import distributed
import numpy

c = distributed.Client(n_workers=2, memory_limit="8 GiB")
x = c.submit(numpy.random.random, 4 * 2**30 // 8, workers=[0])
y = c.submit(lambda x: None, x, workers=[1])
```

Replace
Another important note:
I would be curious to see if turning off compression entirely affects the benchmark suite. That might give us a better holistic understanding of how much impact this has. Is that easy to test?
Yes, but a lot of data in the benchmark suite is created with numpy.random.random - which is incompressible - so it's not necessarily (I'd dare say, unlikely to be) representative of real-world data.
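For whoever wants to try it: disabling compression for a benchmark run should just be a config switch. The sketch below assumes the usual `distributed.comm.compression` key and that `None`/`False` disables it; note that it only covers comms, since compression on spill is not configurable (see the issue description).

```python
import dask

# Assumed knob: distributed.comm.compression ("auto" by default; None/False to disable).
# Depending on the distributed version this may need to be set before the cluster starts,
# e.g. via DASK_DISTRIBUTED__COMM__COMPRESSION=None in the workers' environment.
dask.config.set({"distributed.comm.compression": None})
```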
I think the whole thing can be dropped from 143ms to 18ms. I think we were too hasty to drop support for blosc - the performance benefit is huge.

```python
import pickle
from time import time

import blosc
import numpy


def compress(frame):
    if isinstance(frame, pickle.PickleBuffer):
        mv = memoryview(frame)
        return blosc.compress(mv, clevel=5, cname="blosclz", typesize=mv.itemsize)
    else:
        return blosc.compress(frame, clevel=5, cname="blosclz")


# a = numpy.linspace(0, 1, 128 * 2**20 // 8)
a = numpy.ones(128 * 2**20 // 8)

t0 = time()
frames = []
pik = pickle.dumps(a, protocol=5, buffer_callback=frames.append)
t1 = time()
frames.insert(0, pik)
print([len(memoryview(frame)) for frame in frames])
cframes = [compress(frame) for frame in frames]
print([len(frame) for frame in cframes])
t2 = time()
frames2 = [blosc.decompress(frame) for frame in cframes]
t3 = time()
a2 = pickle.loads(frames2[0], buffers=frames2[1:])
t4 = time()
numpy.testing.assert_array_equal(a, a2)

print("dumps", t1 - t0)
print("compress", t2 - t1)
print("decompress", t3 - t2)
print("loads", t4 - t3)
```
Same as above, but with
linspace does not compress at all with lz4, while it compresses to 6% with blosc.

On a separate topic - what's the purpose of sharding? I can't see how it makes us avoid any memory spikes anyway, as we always have all the sharded frames in memory before we do anything with them?
While it's true that we won't be able to see the benefits of the compressor, we will be able to see the costs pretty clearly. If we turn off compression and nothing changes, then we gain the option of ignoring this for a while.

Sharding is important as data sizes grow to 2GB, when many algorithms break down. Below that, we're guessing that it's nice (smoother, maybe).
…On Fri, Dec 23, 2022, 5:04 PM crusaderky wrote:

> I think the whole thing can be dropped from 143ms to 18ms. I think we were too hasty to drop support for blosc - the performance benefit is huge.
>
> [same benchmark code as in the comment above, with the linspace input active and `blosc.decompress(frame, as_bytearray=True)`]
>
> ```
> [120, 16777216]
> [136, 1069658]
> dumps 0.0005013942718505859
> compress 0.006549835205078125
> decompress 0.012061119079589844
> loads 0.0056269168853759766
> ```
>
> On a separate topic - what's the purpose of sharding? I can't see how it makes us avoid any memory spikes anyway, as we always have all the sharded frames in memory before we do anything with them?
Actually, we won't.
Are you talking about the network stack?
Ah, good point:

```
In [1]: import lz4.block

In [2]: lz4.block.compress(b'0' * int(3e9))
---------------------------------------------------------------------------
OverflowError                             Traceback (most recent call last)
Cell In [2], line 1
----> 1 lz4.block.compress(b'0' * int(3e9))

OverflowError: Input too large for LZ4 API
```
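That 2 GiB ceiling is essentially what sharding works around: split large frames into sub-limit shards and compress each one independently. Purely illustrative sketch - `SHARD_SIZE` and `split_frame` are made-up names, not distributed's actual implementation:

```python
import lz4.block

SHARD_SIZE = 2**26  # 64 MiB; arbitrary for the sketch


def split_frame(frame) -> list:
    mv = memoryview(frame)
    return [mv[i : i + SHARD_SIZE] for i in range(0, len(mv), SHARD_SIZE)]


big = b"0" * int(3e9)  # the buffer that overflows lz4 in one go (~9 GB RAM needed to run this)
compressed = [lz4.block.compress(shard) for shard in split_frame(big)]
restored = b"".join(lz4.block.decompress(c) for c in compressed)
assert restored == big
```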
Then I think we should seriously reconsider blosc. Besides being 5x faster, it's also the only API that allows decompressing to an already existing bytearray. With sharding upstream of compression, that's the only way to avoid deep copies.
To recap the discussion: both the time spent and the memory flares are caused by the conjunction of the following:
Additionally,
I don't strongly object. If we add blosc again, I think we should try to add it in a normal way, similar to other compressors, rather than the way it was implemented before, as part of the serializer. That caused frequent complications.

My recollection is that it's faster than 1GB/s, and so probably very cheap relative to other things in the pipeline. "Slower than blosc" doesn't seem motivating to me. I think that the thing to beat here is "a common bottleneck in common workloads".

I agree that having benchmark results would be helpful in motivating the prioritization of this work. Right now the original motivation (dot product with spill on compressible data) feels a little niche to me given the amount of effort. Of course, if you're having fun then don't let me stop you 🙂
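Re: "add it in a normal way similar to other compressors" - a rough sketch of what that could look like, assuming distributed still exposes its internal compressor registry as `distributed.protocol.compression.compressions` (internal API; names and mechanics may differ across versions):

```python
import blosc
from distributed.protocol import compression


def _blosc_compress(data) -> bytes:
    mv = memoryview(data)
    return blosc.compress(mv, typesize=mv.itemsize, clevel=5, cname="blosclz")


def _blosc_decompress(data) -> bytes:
    return blosc.decompress(data)


# Register alongside lz4/zstd/zlib instead of hooking into the serializer itself.
compression.compressions["blosc"] = {
    "compress": _blosc_compress,
    "decompress": _blosc_decompress,
}
# Selecting it (e.g. through the distributed.comm.compression config value) would then
# follow the same wiring as the existing compressors; exact details are version-dependent.
```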
Of course, maybe this actually applies in lots of common cases (every time we move compressible data bigger than a few megabytes and every time we spill) and has been having a large but silent impact. If so, hooray 🎉. In general, +1 to benchmarking. I would hope that this would come up in some of the realistic benchmarks? Vorticity? Dataframe joins?
> 400 MiB/s compression

Which is, in fact, extremely noticeable - discussion and code in #7655.
If lz4 is installed and a chunk can be compressed efficiently, it is compressed automatically.
This happens both in network comms (configurable) and upon spilling (not configurable).
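For context, "can be compressed efficiently" is decided by a cheap check before committing to compressing the whole frame; a hedged sketch of that kind of heuristic (thresholds and sampling strategy made up here, not distributed's exact code):

```python
import lz4.frame


def maybe_compress_sketch(frame: bytes, min_size=10_000, sample_size=10_000, min_ratio=0.9):
    """Return (compression_name, payload); keep compression only if it pays off."""
    if len(frame) < min_size:
        return None, frame  # tiny frame: not worth the overhead
    sample = bytes(frame[:sample_size])
    if len(lz4.frame.compress(sample)) > min_ratio * len(sample):
        return None, frame  # sample barely shrinks: assume incompressible, send as-is
    compressed = lz4.frame.compress(frame)
    if len(compressed) > min_ratio * len(frame):
        return None, frame
    return "lz4", compressed
```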
While working on coiled/benchmarks#629, I found out that deserializing a compressed frame is abysmally slow:
- When deserialization is triggered by network comms, it is offloaded to a separate thread for any message larger than 10 MiB (`distributed.comm.offload`; see the sketch just below this list). Note that a message will contain multiple small keys if possible. I did not test if the GIL is released while this happens.
- When deserialization is triggered by unspilling, the whole event loop is temporarily blocked.
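The 10 MiB threshold is the `distributed.comm.offload` setting mentioned above; e.g. (assuming it accepts dask's usual byte-size strings):

```python
import dask

# Raise the threshold so only messages above 50 MiB are (de)serialized off the event loop.
dask.config.set({"distributed.comm.offload": "50 MiB"})
```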
Microbenchmark
Serializing a 128 MiB compressible numpy array takes 9.96ms; there's a modest overhead on top of the raw call to lz4, which takes 9.75ms.
Deserializing the same takes a whopping 144ms. The raw lz4 decompress takes an already huge 83ms.
At the very least we should be able to trim down the overhead. Also, the raw decompression speed is suspiciously slow - typically, compression takes much longer than decompression, so I suspect that something must be going on upstream.
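The measurement snippet itself did not survive the copy/paste into this page; the following is a rough reconstruction rather than the original code. It assumes `serialize_bytelist` / `deserialize_bytes` from `distributed.protocol.serialize` (the functions named in the note below) and that joining the frames into one blob is a faithful stand-in for what the spill path does:

```python
import numpy
from time import perf_counter

from distributed.protocol.serialize import deserialize_bytes, serialize_bytelist

a = numpy.ones(128 * 2**20 // 8)  # 128 MiB, trivially compressible with lz4

t0 = perf_counter()
frames = serialize_bytelist(a)
blob = b"".join(frames)  # roughly what ends up on disk when spilling
t1 = perf_counter()
a2 = deserialize_bytes(blob)
t2 = perf_counter()

numpy.testing.assert_array_equal(a, a2)
print(f"serialize + compress:     {(t1 - t0) * 1000:.1f} ms")
print(f"decompress + deserialize: {(t2 - t1) * 1000:.1f} ms")
```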
Output:
`serialize_bytelist` and `deserialize_bytes` in the code above are exactly what `distributed.spill` uses. However, I measured the timings in `distributed.comm.tcp` and I got the same number (134ms).