[CHORE] Refactor RayRunner so that we can add tracing #3163
Conversation
CodSpeed Performance Report: Merging #3163 will not alter performance.
- if dispatches_allowed == 0 or next_step is None:
+ # Break the dispatch batching/dispatch loop if no more dispatches allowed, or physical plan
+ # needs work for forward progress
+ if dispatches_allowed == 0 or not has_next:
NOTE: behavior change here. We now use a `has_next` variable, returned from `self._construct_dispatch_batch`, to figure out whether or not we should break the DispatchBatching -> Dispatch loop.
Previously the scheduling loop had to keep track of `next_step`, which was super weird and unwieldy, and caused us to call `next(tasks)` in a bunch of random places scattered throughout the loop.
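As a rough sketch (names here are illustrative; only `has_next`, `dispatches_allowed`, and `_construct_dispatch_batch` come from the diff), the new shape of the loop looks something like this:

```python
from typing import Iterator, List, Optional, Tuple

# Illustrative stand-in for a unit of work (the real runner uses Daft's
# partition task objects).
Task = object

MAX_BATCH_SIZE = 16  # illustrative batching granularity


def construct_dispatch_batch(
    plan: Iterator[Optional[Task]], max_batch_size: int
) -> Tuple[List[Task], bool]:
    """Pull up to max_batch_size tasks off the plan iterator.

    Returns (batch, has_next): has_next is False once the plan is exhausted
    or yields None, i.e. it needs completed results for forward progress.
    """
    batch: List[Task] = []
    while len(batch) < max_batch_size:
        step = next(plan, None)
        if step is None:
            return batch, False
        batch.append(step)
    return batch, True


def dispatch_loop(plan: Iterator[Optional[Task]], dispatches_allowed: int) -> None:
    # Iteration over `plan` happens ONLY inside construct_dispatch_batch;
    # this loop just consumes its (batch, has_next) result.
    while True:
        batch, has_next = construct_dispatch_batch(
            plan, min(dispatches_allowed, MAX_BATCH_SIZE)
        )
        for task in batch:
            pass  # dispatch the task to Ray here
        dispatches_allowed -= len(batch)
        # Break if no more dispatches are allowed, or the physical plan
        # needs completed work before it can emit more tasks.
        if dispatches_allowed == 0 or not has_next:
            break
```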
    num_returns=1,
    timeout=None,
    fetch_local=False,
)
NOTE: slight behavior change here compared to the previous awaiting logic. I call a `ray.wait` here but discard the outputs, just to wait on one item to be ready with `timeout=None`. Then I subsequently call `ray.wait` again with a `0.01` timeout to actually retrieve a batch of ready tasks. I think this logic is a little easier to follow, and it gets rid of the weird loop over `("next_one", "next_batch")` that we had earlier. It also shouldn't have too much of a performance impact.
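Sketched out (the function name and the `pending` list are illustrative; the `ray.wait` keyword arguments are Ray's actual API), the two-phase wait is roughly:

```python
from typing import List, Tuple

import ray


def await_ready_batch(pending: List["ray.ObjectRef"]) -> Tuple[list, list]:
    # Phase 1: block until at least one task is ready. timeout=None means
    # wait indefinitely; the returned refs are deliberately discarded.
    ray.wait(pending, num_returns=1, timeout=None, fetch_local=False)

    # Phase 2: a near-zero timeout sweeps up everything that is ready right
    # now, so we collect a whole batch rather than a single task.
    ready, not_ready = ray.wait(
        pending,
        num_returns=len(pending),
        timeout=0.01,
        fetch_local=False,
    )
    return ready, not_ready
```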
try:
    self.results_by_df[result_uuid].put(item, timeout=0.1)
    break
except Full:
    pass
Got rid of these weird locally-defined callbacks by making them methods, so that I can call them elsewhere without having to pass them around.
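A hypothetical sketch of that change (the class and attribute names are stand-ins; only `_is_active`, `_place_in_queue`, and the put-with-retry pattern come from the diff):

```python
from queue import Full, Queue
from typing import Any, Dict


class Scheduler:
    def __init__(self) -> None:
        self.active_by_df: Dict[str, bool] = {}
        self.results_by_df: Dict[str, Queue] = {}

    # Previously these were closures defined inside the scheduling loop; as
    # methods, they can be called from anywhere without being passed around.
    def _is_active(self, result_uuid: str) -> bool:
        """Whether the dataframe's execution is still being consumed."""
        return self.active_by_df.get(result_uuid, False)

    def _place_in_queue(self, result_uuid: str, item: Any) -> None:
        """Put a result on the consumer's queue, retrying while the queue is
        full and giving up once the consumer is no longer active."""
        while self._is_active(result_uuid):
            try:
                self.results_by_df[result_uuid].put(item, timeout=0.1)
                break
            except Full:
                pass
```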
Codecov Report
Attention: Patch coverage is …
Additional details and impacted files:

@@            Coverage Diff             @@
##             main    #3163      +/-   ##
==========================================
- Coverage   79.00%   78.61%   -0.40%
==========================================
  Files         634      634
  Lines       76943    77961    +1018
==========================================
+ Hits        60789    61289     +500
- Misses      16154    16672     +518
LGTM
This PR refactors the RayRunner so that it is easier to add tracing.
I also added more docstrings to make it clearer what is happening.
Code Changes
Below, I highlight the changes made in the code for easier review:
- Removed the `next_step` state, and instead exposed a new `has_next` return variable from `self._construct_dispatch_batch`. This cleans up the code because iteration on the physical plan now ONLY happens inside of `self._construct_dispatch_batch` instead of being scattered across the scheduling loop.
- Reworked `self._await_tasks` to explicitly wait on one task (with `timeout=None`) so as to first wait for any task to complete, and then perform an actual wait on all tasks (with `timeout=0.01`) to retrieve the tasks that are ready.
- Made `self._is_active` and `self._place_in_queue` into methods, instead of them being locally-defined functions.
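Since the point of the refactor is to make tracing easy to add, one can imagine wrapping each now-discrete method in a span. This is purely a hypothetical sketch of that idea, not the tracing that this PR enables; `trace_span` and the commented call sites are invented for illustration:

```python
import contextlib
import time


@contextlib.contextmanager
def trace_span(name: str):
    """Hypothetical tracing hook: time a single phase of the scheduling loop."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed_ms = (time.perf_counter() - start) * 1000
        print(f"[trace] {name}: {elapsed_ms:.2f}ms")


# With batching, dispatching, and awaiting factored into discrete methods,
# each phase of the scheduling loop can be wrapped independently, e.g.:
#
#     with trace_span("construct_dispatch_batch"):
#         batch, has_next = self._construct_dispatch_batch(tasks)
#     with trace_span("await_tasks"):
#         ready = self._await_tasks(inflight)
```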