Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Constrained spill #5543

Merged
merged 127 commits into from
Feb 16, 2022
Merged

Constrained spill #5543

merged 127 commits into from
Feb 16, 2022

Conversation

ncclementi
Copy link
Member

@ncclementi ncclementi commented Nov 23, 2021

Based on #5529
Closes #5364
Blocked by dask/zict#48
Followed up by #5805

Currently with this PR and the dask/zict#47 things work except when self.weight(key, value) > self.n since it never triggers the changes made on LRU, it will try to write to slow according to this
https://github.com/dask/zict/blob/b760997b98aad9aca8ab37fc4e8b5bb06809283d/zict/buffer.py#L88-L91

It will raise an exception, and ultimately it'll pass and we will lose the keys, because of this.
https://github.com/ncclementi/distributed/blob/67d067e34958c319c21d37cc50959b813af72fe3/distributed/spill.py#L33-L45

One option is to modify the zict.Buffer setitem to catch the exception and if it does write to fast as shown in dask/zict#48

    def __setitem__(self, key, value):
        # Avoid useless movement for heavy values
        if self.weight(key, value) <= self.n:
            if key in self.slow:
                del self.slow[key]
            self.fast[key] = value
        else:
            if key in self.fast:
                del self.fast[key]
            try:
                self.slow[key] = value
            except Exception:
                self.fast[key] = value

This ultimately handles the problem but it tries to move the keys multiple times, raising Exception up to 3 times in the case of two keys, and I don't think this is the desired behavior.

For example adding the try/except on the Buffer setitem we see:

buf = SpillBuffer("TEST_buff", target=200, max_spill=300)
a, b, c = "a" * 100, "b" * 200

buf["a"] = a #this goes to fast no problem

#try to add b
buf["b"] = b  #this bigger than target and it's bigger than max spill

Setting some print around to track the key, I see:

distributed.spill - WARNING - Spill file on disk reached capacity; keeping data in memory
distributed.spill - WARNING - Spill file on disk reached capacity; keeping data in memory
distributed.spill - WARNING - Spill file on disk reached capacity; keeping data in memory

pickled_size= 329
Im here in BUFFER line 96

Im here in LRU line 72
key= 'b'

Im here in BUFFER line 60
key= 'b'

pickled_size= 329
Im here in LRU line 77

Im here in LRU line 65

k= 'a' priority= 1
I'm here LRU line 98

Im here in BUFFER line 60
key= 'a'

pickled_size= 229
k= 'a' priority= 1 weight= 149
Im here in LRU line 65

k= 'b' priority= 2
I'm here LRU line 98

Im here in BUFFER line 60
key= 'b'

pickled_size= 329
I'm here LRU line 101

Which results with

>>> set(buf.fast)
{'b'}
>>> set(buf.slow)
{'a'}

But I don't think we want to experience all that moving.

The second option would be to modify the SpillBuffer setitem to do something similar than the Buffer but when the weight of the key is bigger than both target and max_spill to write it on fast. However, I wasn't able to find a way to get the size the key on disk, something like what we do here https://github.com/ncclementi/distributed/blob/67d067e34958c319c21d37cc50959b813af72fe3/distributed/spill.py#L89-L92 but before hitting the setitem on slow.

cc: @crusaderky do you have any ideas or suggestions. ?

@ncclementi ncclementi mentioned this pull request Nov 23, 2021
1 task
distributed/spill.py Outdated Show resolved Hide resolved
distributed/spill.py Outdated Show resolved Hide resolved
@crusaderky
Copy link
Collaborator

crusaderky commented Nov 24, 2021

This ultimately handles the problem but it tries to move the keys multiple times, raising Exception up to 3 times in the case of two keys, and I don't think this is the desired behavior.

The whole Buffer.__setitem__ looks redundant with LRU.__setitem__. You should be able to just replace the former with a one-liner self.fast[key] = value.
This gets rid of 1 of the 3 exceptions.
See my comment on dask/zict#47 on why you're getting the second one.

@ncclementi ncclementi marked this pull request as ready for review November 29, 2021 22:21
@ncclementi
Copy link
Member Author

Once dask/zict#48 is merged this is ready for review.

cc: @jrbourbeau @crusaderky

Copy link
Collaborator

@crusaderky crusaderky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add test in test_worker.py

distributed/distributed-schema.yaml Outdated Show resolved Hide resolved
distributed/distributed.yaml Outdated Show resolved Hide resolved
distributed/spill.py Outdated Show resolved Hide resolved
distributed/spill.py Outdated Show resolved Hide resolved
pickled = self.dump(value)
pickled_size = sum(len(frame) for frame in pickled)

if self.max_weight and self.total_weight + pickled_size > self.max_weight:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if self.max_weight and self.total_weight + pickled_size > self.max_weight:
if self.max_weight is not False and self.total_weight + pickled_size > self.max_weight:

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing something, but what is the point of explicitly asking for self.max_weight is not False and ..., when if self.max_weight and ... will only be True when not False and a value is provided.

distributed/tests/test_spill.py Show resolved Hide resolved
buf["a"] = a
assert not buf.memory
assert not buf.fast.weights
assert set(buf.disk) == {"a"}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert set(buf.disk) == {"a"}
assert buf.disk.keys() == {"a"}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used this assert set(buf.disk) == {"a"} to be consistent with what already existed in tests. This same pattern is followed in all the zict tests.

distributed/tests/test_spill.py Outdated Show resolved Hide resolved
distributed/worker.py Outdated Show resolved Hide resolved
distributed/worker.py Outdated Show resolved Hide resolved
@crusaderky
Copy link
Collaborator

Further code review: ncclementi#3

crusaderky added a commit to crusaderky/distributed that referenced this pull request Feb 14, 2022
@crusaderky
Copy link
Collaborator

crusaderky commented Feb 14, 2022

This PR breaks keys update with zict 2.0. Please fix.

@ncclementi
Copy link
Member Author

As @crusaderky mentions, this PR is breaking with zict 2.0. I implemented changes that work both versions of zict, although I'm unsure if this is the best way of handling compatibility

Looking at the CI there were two tests failing.

  1. test_worker.py::test_fail_write_to_disk_target_1
    The problem here is that we are checking that the key remains in fast when an error is raised see https://github.com/ncclementi/distributed/blob/4591390549ed2192e3ccdcb84249ba52fedc367e/distributed/spill.py#L138-L140

and with the version of zict 2.0, those keys are dropped, hence the error. On spill.py we have couple of places where we do assert key_e in self.fast and also we also try to delete the key, which is a problem with zict 2.0 as that key doesn't exist.

Is the solution to skip this test for zict 2.0? That would mean that having a bad key would stop the cluster, or do we want to adapt the code to behave differently depending on which version of zict we have?

  1. test_worker.py::test_fail_write_to_disk_target_2
    The issue is similar than before, with zict 2.0 we lose the key, hence we have a failure in assert set(a.data.memory) == {"x", "y"}

The question is do we skip this test, or do we want to adapt the test to behave differently depending on which version of zict we have?

distributed/spill.py Show resolved Hide resolved
distributed/spill.py Outdated Show resolved Hide resolved
distributed/spill.py Outdated Show resolved Hide resolved
distributed/spill.py Outdated Show resolved Hide resolved
distributed/spill.py Outdated Show resolved Hide resolved
distributed/spill.py Outdated Show resolved Hide resolved
assert key not in self.weight_by_key
else:
self.d.pop(key, 0)
self.total_weight -= self.weight_by_key.pop(key, PickledSize(0, 0))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@crusaderky it seems this is breaking the linting, what is PickledSize in here, it's not defined?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad, it's from #5805

distributed/spill.py Outdated Show resolved Hide resolved
distributed/spill.py Outdated Show resolved Hide resolved
crusaderky added a commit to crusaderky/distributed that referenced this pull request Feb 15, 2022
@crusaderky
Copy link
Collaborator

Merging with a regression in test_memory, which is fixed by #5805
All other test failures are unrelated.

@crusaderky crusaderky merged commit b6e637f into dask:main Feb 16, 2022
@crusaderky
Copy link
Collaborator

IMPORTANT NOTE

🚨 🚨 🚨 🚨 🚨 🚨
All open PRs in dask/distributed will start failing on CI on Python 3.9 due to (deliberate) forward incompatibility with zict git tip. The solution is to merge from main.
🚨 🚨 🚨 🚨 🚨 🚨

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Spill to constrained disk space
4 participants