-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Constrained spill #5543
Constrained spill #5543
Conversation
The whole |
Co-authored-by: crusaderky <[email protected]>
… into constrained_spill
Once dask/zict#48 is merged this is ready for review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add test in test_worker.py
distributed/spill.py
Outdated
pickled = self.dump(value) | ||
pickled_size = sum(len(frame) for frame in pickled) | ||
|
||
if self.max_weight and self.total_weight + pickled_size > self.max_weight: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if self.max_weight and self.total_weight + pickled_size > self.max_weight: | |
if self.max_weight is not False and self.total_weight + pickled_size > self.max_weight: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I'm missing something, but what is the point of explicitly asking for self.max_weight is not False and ...
, when if self.max_weight and ...
will only be True when not False and a value is provided.
distributed/tests/test_spill.py
Outdated
buf["a"] = a | ||
assert not buf.memory | ||
assert not buf.fast.weights | ||
assert set(buf.disk) == {"a"} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert set(buf.disk) == {"a"} | |
assert buf.disk.keys() == {"a"} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used this assert set(buf.disk) == {"a"}
to be consistent with what already existed in tests. This same pattern is followed in all the zict
tests.
Co-authored-by: crusaderky <[email protected]>
Co-authored-by: crusaderky <[email protected]>
Co-authored-by: crusaderky <[email protected]>
Co-authored-by: crusaderky <[email protected]>
Co-authored-by: crusaderky <[email protected]>
Further code review: ncclementi#3 |
This PR breaks keys update with zict 2.0. Please fix. |
As @crusaderky mentions, this PR is breaking with zict 2.0. I implemented changes that work both versions of zict, although I'm unsure if this is the best way of handling compatibility Looking at the CI there were two tests failing.
and with the version of zict 2.0, those keys are dropped, hence the error. On Is the solution to skip this test for zict 2.0? That would mean that having a bad key would stop the cluster, or do we want to adapt the code to behave differently depending on which version of zict we have?
The question is do we skip this test, or do we want to adapt the test to behave differently depending on which version of zict we have? |
Co-authored-by: crusaderky <[email protected]>
distributed/spill.py
Outdated
assert key not in self.weight_by_key | ||
else: | ||
self.d.pop(key, 0) | ||
self.total_weight -= self.weight_by_key.pop(key, PickledSize(0, 0)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@crusaderky it seems this is breaking the linting, what is PickledSize in here, it's not defined?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my bad, it's from #5805
Merging with a regression in test_memory, which is fixed by #5805 |
IMPORTANT NOTE🚨 🚨 🚨 🚨 🚨 🚨 |
Based on #5529
Closes #5364
Blocked by dask/zict#48
Followed up by #5805
Currently with this PR and the dask/zict#47 things work except when
self.weight(key, value) > self.n
since it never triggers the changes made on LRU, it will try to write toslow
according to thishttps://github.com/dask/zict/blob/b760997b98aad9aca8ab37fc4e8b5bb06809283d/zict/buffer.py#L88-L91
It will raise an exception, and ultimately it'll pass and we will lose the keys, because of this.
https://github.com/ncclementi/distributed/blob/67d067e34958c319c21d37cc50959b813af72fe3/distributed/spill.py#L33-L45
One option is to modify the
zict.Buffer
setitem
to catch the exception and if it does write tofast
as shown in dask/zict#48This ultimately handles the problem but it tries to move the keys multiple times, raising Exception up to 3 times in the case of two keys, and I don't think this is the desired behavior.
For example adding the try/except on the Buffer setitem we see:
Setting some print around to track the key, I see:
Which results with
But I don't think we want to experience all that moving.
The second option would be to modify the SpillBuffer setitem to do something similar than the Buffer but when the weight of the key is bigger than both target and max_spill to write it on fast. However, I wasn't able to find a way to get the size the key on disk, something like what we do here https://github.com/ncclementi/distributed/blob/67d067e34958c319c21d37cc50959b813af72fe3/distributed/spill.py#L89-L92 but before hitting the setitem on slow.
cc: @crusaderky do you have any ideas or suggestions. ?