Client now retries .map() failures #2734
When two items had the same timestamp, we would try to sort by the actual item value, which breaks for types that don't support comparison. Instead use a nonce when inserting an item, to ensure that we never have to compare the item value itself.
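As an illustration of the tie-breaking idea, here is a minimal sketch using `heapq`. The class name `TimestampedQueue` and its methods are hypothetical and are not taken from this PR; the point is only that a monotonically increasing nonce sits between the timestamp and the item, so tuple comparison never reaches the item itself.

```python
import heapq
import itertools


class TimestampedQueue:
    """Hypothetical sketch: a heap ordered by (timestamp, nonce)."""

    def __init__(self):
        self._heap = []
        # Each insert gets a unique, increasing nonce.
        self._nonce = itertools.count()

    def put(self, timestamp, item):
        # Tuples compare element by element. Because the nonce is unique,
        # the comparison is always decided before reaching `item`.
        heapq.heappush(self._heap, (timestamp, next(self._nonce), item))

    def get(self):
        timestamp, _nonce, item = heapq.heappop(self._heap)
        return timestamp, item


q = TimestampedQueue()
# dicts do not support `<`, so without the nonce these two entries
# (same timestamp) would raise TypeError inside heappush.
q.put(1.0, {"a": 1})
q.put(1.0, {"b": 2})
first = q.get()
second = q.get()
```

A side effect of this choice is that items with equal timestamps come out in insertion order.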
Though very unlikely outside of unit tests, it's possible to have an output returned before the corresponding retry context has been put into the `pending_outputs` dict.
Once the input queue filled up, we had no more room to put pending retries. And since we had no more room to put retries, we stopped fetching new outputs. And since we stopped fetching new outputs, the server stopped accepting new inputs. As a result, the input queue would never burn down. Instead, use a semaphore to ensure we never have more than 1000 items outstanding.
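The bounding idea can be sketched with `asyncio.Semaphore`. This is not the PR's code: `run_map`, `process`, and the `peak` bookkeeping are hypothetical names added for illustration, and the limit of 1000 is the figure from the commit message. A permit is taken before an item is dispatched and returned only when that item finishes, so the producer blocks instead of overfilling a queue.

```python
import asyncio

MAX_OUTSTANDING = 1000  # figure from the commit message


async def run_map(inputs, process, limit=MAX_OUTSTANDING):
    """Hypothetical sketch: never more than `limit` items outstanding."""
    sem = asyncio.Semaphore(limit)
    results = {}
    in_flight = 0
    peak = 0  # tracked only to demonstrate the bound

    async def worker(idx, item):
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        try:
            results[idx] = await process(item)
        finally:
            in_flight -= 1
            # Return the permit whether the item succeeded or failed.
            sem.release()

    tasks = []
    for idx, item in enumerate(inputs):
        # Blocks here once `limit` items are outstanding.
        await sem.acquire()
        tasks.append(asyncio.create_task(worker(idx, item)))
    await asyncio.gather(*tasks)
    return [results[i] for i in range(len(tasks))], peak
```

Because the permit is released in `finally`, a failed item cannot leak capacity and stall the producer.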
Instead of using a priority queue, just use the event loop to schedule retries in the future. This significantly simplifies the implementation and makes it much more like the original. Note that we still do have a semaphore that ensures that no more than 1K inputs are in flight (i.e., sent to the server but not completed).
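A rough sketch of what "use the event loop to schedule retries" can look like: the coroutine that owns an item simply sleeps until the retry is due, and the event loop's own timer handling replaces the priority queue. The function name `call_with_retries`, the retry limit, and the exponential backoff are assumptions made for illustration; the PR text does not specify the retry policy.

```python
import asyncio


async def call_with_retries(fn, item, max_retries=3, base_delay=0.01):
    """Hypothetical sketch: retry `fn(item)` with a delay between attempts."""
    attempt = 0
    while True:
        try:
            return await fn(item)
        except Exception:
            if attempt >= max_retries:
                # Retries exhausted: surface the last error to the caller.
                raise
            # The event loop wakes this coroutine when the delay has passed;
            # no separate queue of pending retries is needed.
            await asyncio.sleep(base_delay * 2**attempt)
            attempt += 1
```

In this shape, each in-flight item holds its own retry state on the coroutine stack, which is why a global cap on in-flight items is still useful.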
There were some unit tests on the priority queue that could be restored.
modal/parallel_map.py
if timestamp_seconds == self._MAX_PRIORITY:
    return None
await self._queue.put((timestamp_seconds, idx))
await asyncio.sleep(1)
I think using an `asyncio.Condition` like the previous implementation is better than `asyncio.sleep(1)`, since the latter becomes a busy wait.
modal-client/modal/_utils/async_utils.py
Lines 806 to 810 in afdc7f8
# wait until either the timeout or a new item is added
try:
    await asyncio.wait_for(self.condition.wait(), timeout=sleep_time)
except asyncio.TimeoutError:
    continue
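To show the pattern in context, here is a self-contained sketch of a delay queue built around the same `wait_for(condition.wait(), timeout=...)` idiom. It is not the code from `async_utils.py`; the class `DelayQueue` and its method names are hypothetical. A consumer sleeps until the earliest item is due, but is woken immediately if a new item arrives, so there is no fixed polling interval.

```python
import asyncio
import heapq
import itertools
import time


class DelayQueue:
    """Hypothetical sketch: items become available at `ready_at`."""

    def __init__(self):
        self._heap = []
        self._nonce = itertools.count()  # tie-breaker for equal times
        self._condition = asyncio.Condition()

    async def put(self, ready_at, item):
        async with self._condition:
            heapq.heappush(self._heap, (ready_at, next(self._nonce), item))
            # Wake waiters: the new item may be due earlier than the
            # one they are currently sleeping on.
            self._condition.notify_all()

    async def get(self):
        async with self._condition:
            while True:
                if self._heap:
                    sleep_time = self._heap[0][0] - time.monotonic()
                    if sleep_time <= 0:
                        return heapq.heappop(self._heap)[2]
                else:
                    sleep_time = None  # nothing queued: wait for a put()
                # Wait until either the timeout or a new item is added.
                try:
                    await asyncio.wait_for(
                        self._condition.wait(), timeout=sleep_time
                    )
                except asyncio.TimeoutError:
                    continue
```

`Condition.wait()` re-acquires the lock before the timeout propagates, so the loop body always runs with the lock held.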
ah good point. I'll bring back that original implementation.
Describe your changes
Closes SVC-180.
Adds client retries to `.map()`. We already did this for `.remote()` in #2403.
For now, retries are enabled only if the `MODAL_CLIENT_RETRIES` flag is set to true.
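For readers unfamiliar with this kind of gating, a minimal sketch of an environment-variable feature flag follows. How the Modal client actually reads and parses `MODAL_CLIENT_RETRIES` is not shown in this PR description; the helper `client_retries_enabled` and the rule that only the string "true" (case-insensitive) enables the feature are assumptions.

```python
import os


def client_retries_enabled(environ=None):
    """Hypothetical helper: True only when the flag is set to "true"."""
    env = os.environ if environ is None else environ
    # Assumption: any other value, or an unset variable, leaves retries off.
    return env.get("MODAL_CLIENT_RETRIES", "").strip().lower() == "true"
```

Defaulting to off means existing `.map()` callers see no behavior change unless they opt in.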