
Create MutableMapping for automatic compression #3656

Open
mrocklin opened this issue Mar 28, 2020 · 8 comments · May be fixed by #3702

@mrocklin
Member

In some workloads with highly compressible data we would like to trade off some computation time for more in-memory storage automatically. Dask workers store data in a MutableMapping (the superclass of dict). So in principle all we would need to do is make a MutableMapping subclass that overrides the getitem and setitem methods to compress and decompress data on demand.

This would be an interesting task for someone who wants to help Dask and learn some of its internals but doesn't know a lot just yet. I'm marking this as a good first issue. This is an interesting and useful task that doesn't require deep knowledge of Dask internals.

Here is a conceptual prototype of such a MutableMapping. This is completely untested, but maybe gives a sense of how I think about this problem. It's probably not ideal though so I would encourage others to come up with their own design.

import collections.abc
from typing import Callable, Dict, Tuple

class TypeCompression(collections.abc.MutableMapping):
    def __init__(
        self,
        types: Dict[type, Tuple[Callable, Callable]],
        storage=dict,
    ):
        self.types = types  # map value type -> (compress, decompress)
        self.storage = collections.defaultdict(storage)  # one inner mapping per type

    def __setitem__(self, key, value):
        typ = type(value)
        if typ in self.types:
            compress, decompress = self.types[typ]
            value = compress(value)
        self.storage[typ][key] = value

    def __getitem__(self, key):
        for typ, d in self.storage.items():
            if key in d:
                value = d[key]
                break
        else:
            raise KeyError(key)

        if typ in self.types:
            compress, decompress = self.types[typ]
            value = decompress(value)

        return value

    def __delitem__(self, key):
        for d in self.storage.values():
            if key in d:
                del d[key]
                return
        raise KeyError(key)

    def __iter__(self):
        for d in self.storage.values():
            yield from d

    def __len__(self):
        return sum(map(len, self.storage.values()))
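To make the registry idea concrete, here is a self-contained sketch of how the `types` argument could be built and used on its own. The registry contents are purely illustrative (zlib standing in for whatever compressor one would actually pick):

```python
import zlib

# Hypothetical registry: map a value type to a (compress, decompress) pair.
types = {
    bytes: (zlib.compress, zlib.decompress),
}

compress, decompress = types[bytes]
payload = b"dask" * 25_000           # a large, highly compressible value
stored = compress(payload)

assert decompress(stored) == payload  # round-trips losslessly
assert len(stored) < len(payload)     # stored form is much smaller
```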

This came up in #3624 . cc @madsbk and @jakirkham from that PR. cc also @eric-czech who was maybe curious about automatic compression/decompression.

People looking at compression might want to look at and use Dask's serialization and compression machinery in distributed.protocol (maybe start by looking at the dumps, serialize, and maybe_compress functions).
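For readers who can't dig into distributed.protocol right away, the spirit of maybe_compress (compress only when the payload is big enough and actually shrinks) can be sketched with the stdlib alone. The function name, thresholds, and use of zlib below are all made up for illustration, not Dask's actual implementation:

```python
import zlib

def maybe_compress_sketch(payload: bytes, min_size: int = 10_000, min_ratio: float = 0.9):
    """Return (compression_name_or_None, bytes), compressing only when it pays off."""
    if len(payload) < min_size:
        return None, payload             # too small to be worth the CPU time
    compressed = zlib.compress(payload)
    if len(compressed) > min_ratio * len(payload):
        return None, payload             # barely shrinks; keep the original
    return "zlib", compressed

scheme, out = maybe_compress_sketch(b"ab" * 50_000)   # repetitive, compresses well
assert scheme == "zlib" and len(out) < 100_000

scheme, out = maybe_compress_sketch(b"tiny")          # below the size threshold
assert scheme is None and out == b"tiny"
```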

@mrocklin mrocklin added the good first issue Clearly described and easy to accomplish. Good for beginners to the project. label Mar 28, 2020
@mrocklin
Member Author

Also cc @prasunanand and @andersy005 who have both asked about good first issues in the past. I think that this would be fun.

@prasunanand
Contributor

@mrocklin I would love to work on it. :)

@mrocklin
Member Author

mrocklin commented Mar 28, 2020 via email

@prasunanand
Contributor

prasunanand commented Apr 9, 2020

Hi, I need a little help.

Do I need to modify the logic in dumps, loads ( link ) ?

Does types in TypeCompress refer to int, double, etc. or to snappy, blosc, lz4 etc. ? If it refers to int, double, etc where are the corresponding compressors and decompressors ?

cc @jrbourbeau

@mrocklin
Member Author

mrocklin commented Apr 9, 2020

Does types in TypeCompress refer to int, double, etc. or to snappy, blosc, lz4 etc. ?

You don't have to use the structure I started with. I encourage you to think about this on your own and how you would design it. If you blindly follow my design you probably won't develop a high-level understanding of the problem. What I put up there was just an idea, and not a fully formed one; whoever solves this task will need to think about the problem much more deeply than I did.

@prasunanand prasunanand linked a pull request Apr 12, 2020 that will close this issue
@mrocklin
Member Author

To add some more context here, this is an object that would replace the MutableMapping currently used in Worker.data. It would expect to receive any user generated Python object as a value. We would want to take those values, serialize, and maybe compress them when we put them into the underlying mapping.

So for example we would want something like the following to work:

import numpy as np

x = np.ones(1000000)  # a large but compressible piece of data

d = MyMapping()
d["x"] = x      # put a Python object into d
out = d["x"]    # get the object back out

assert str(out) == str(x)  # the two objects should match

# assuming here that the underlying bytes are stored in something like a
# `.storage` attribute, but this isn't required; we check that the amount
# of actual data stored is small
assert sum(map(len, d.storage.values())) < x.nbytes

In Dask, one would test this by putting it into a Worker:

import pytest
import dask.array as da
from distributed import Client, Scheduler, Worker

@pytest.mark.asyncio
async def test_compression():
    async with Scheduler() as s:
        async with Worker(s.address, data=MyMapping) as worker:
            async with Client(s.address, asynchronous=True) as c:
                x = da.ones((10000, 10000))
                y = await x.persist()  # put data in memory
                y = await (x + x.T).mean().persist()  # do some work
                assert sum(map(len, worker.data.storage.values())) < x.nbytes

(None of the code here was tested, and may have bugs. I wouldn't trust it too much)

@fjetter
Member

fjetter commented Apr 14, 2020

The mutable mapping looks like something which would be well suited for https://github.com/dask/zict. The integration into distributed would then look similar to how spill-to-disk is implemented at the moment.
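The zict suggestion fits naturally because zict provides mappings that apply a dump function on write and a load function on read. Here is a stdlib-only sketch of that wrapping pattern; the class below is hypothetical and only mirrors the idea, it is not zict's actual code:

```python
import collections.abc
import zlib

class FuncMapping(collections.abc.MutableMapping):
    """Apply `dump` on the way in and `load` on the way out,
    in the style of zict's function-wrapping mapping."""

    def __init__(self, dump, load, d):
        self.dump, self.load, self.d = dump, load, d

    def __setitem__(self, key, value):
        self.d[key] = self.dump(value)

    def __getitem__(self, key):
        return self.load(self.d[key])

    def __delitem__(self, key):
        del self.d[key]

    def __iter__(self):
        return iter(self.d)

    def __len__(self):
        return len(self.d)

m = FuncMapping(zlib.compress, zlib.decompress, {})
m["x"] = b"spam" * 10_000
assert m["x"] == b"spam" * 10_000        # values round-trip
assert len(m.d["x"]) < 40_000            # stored bytes are compressed
```

Spill-to-disk composes the same way: swap the inner dict for an on-disk mapping and the dump/load pair for serialization functions.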

@jakirkham
Member

The PR ( #3702 ) seems to be going in the right direction. It is probably the best place to move this forward at the moment.
