[core][cgraph] Collapse other params into max_inflight_executions and adjust execution_index counting #49565
Conversation
python/ray/dag/compiled_dag_node.py
f"buffered results is {self._max_buffered_results}; call " | ||
"ray.get() on previous CompiledDAGRefs to free them up " | ||
"from buffer." | ||
) |
cache execution results is used by both the async and standard paths, so if we put the check here it covers both.
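For illustration, a minimal sketch of a shared result-buffer cap along these lines (illustrative names mirroring the error message above, not Ray's actual internals):

result_buffer: dict[int, object] = {}  # execution_index -> result
max_buffered_results = 8

def cache_execution_result(execution_index: int, result: object) -> None:
    # Called from both the sync and async read paths, so the cap is
    # enforced for both.
    if len(result_buffer) >= max_buffered_results:
        raise ValueError(
            "Too many buffered results: the size limit of buffered results "
            f"is {max_buffered_results}; call ray.get() on previous "
            "CompiledDAGRefs to free them up from buffer."
        )
    result_buffer[execution_index] = result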
asyncio_max_queue_size and max_buffered_results have different definitions.

- asyncio_max_queue_size represents the maximum number of "inputs" that have not yet been written into the DAG.
- max_buffered_results is the number of execution "results" that have already been retrieved by the driver but have not yet been consumed by the users.

Hence, removing asyncio_max_queue_size is a breaking change. It's fine to have a breaking change because the RayCG API is currently in alpha status, but we should have a clear plan for handling the gap (e.g., using max_inflight_executions to prevent users from sending unlimited inputs to the asyncio queue).
Would you mind writing more details in the PR description and adding tests if needed?
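To illustrate the two semantics being contrasted, a hedged sketch with made-up structure (not Ray's implementation):

import asyncio

# input_queue plays the role asyncio_max_queue_size bounds: inputs that
# have been submitted via execute() but not yet written into the DAG.
input_queue: asyncio.Queue = asyncio.Queue(maxsize=8)

# result_buffer plays the role max_buffered_results bounds: results the
# driver has already retrieved but the user has not yet consumed.
result_buffer: dict[int, object] = {}

async def submit(execution_index: int, inp: object) -> None:
    # Back-pressure on submission: blocks once maxsize inputs are pending.
    await input_queue.put((execution_index, inp))

def consume(execution_index: int) -> object:
    # Consuming a result is the only way result_buffer shrinks.
    return result_buffer.pop(execution_index)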
@kevin85421 @stephanie-wang Correct me if wrong, but I thought we want to collapse both into max_inflight_executions.
Ya, that makes sense actually: getting the results removes them from the result_buffer, and all max_buffered_results controls is the result buffer size, which can only grow up to max_inflight_executions anyway. So I will remove both and only keep max_inflight_executions.
Updated with a description of the comparison. I think the test that currently exists for max_inflight_executions seems ok.
As I recall, this was not the conclusion, but I think removing both and using only max_inflight_executions is fine.
Otherwise LGTM
Adjusted execution_index counting: before, the counts were off by one. It still worked because we only checked > rather than >=, which it should have been. Also added tests for both exceptions.
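A hedged sketch of the corrected check (a standalone helper for illustration, not the actual Ray code):

def ensure_can_execute(execution_index: int,
                       max_finished_execution_index: int,
                       max_inflight_executions: int) -> None:
    # Executions submitted but not yet retrieved with ray.get().
    inflight = execution_index - max_finished_execution_index
    # With the old off-by-one counting, `>` let one extra execution slip
    # through; with the adjusted counting the comparison must be `>=`.
    if inflight >= max_inflight_executions:
        raise ValueError(
            f"There are {inflight} in-flight executions; retrieve outputs "
            "with ray.get() before submitting more, or increase "
            "max_inflight_executions."
        )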
ref3 = compiled_dag.execute(1)
# to show variables are being used and avoid destruction since
# CompiledDagRef __del__ will release buffers and
# increment _max_finished_execution_index
this seems prone to issues, e.g., if destruction does not happen in the same order execute was called
ray/python/ray/experimental/compiled_dag_ref.py, lines 103 to 104 in ea4e315:

if not self._ray_get_called:
    self._dag.release_output_channel_buffers(self._execution_index)
The continuation of this in the SynchronousReader also just releases all buffers, so anything that was executed up to that point would also be gone, e.g., this script fails:
# Assumes Actor is the test actor used elsewhere in the suite, with
# sleep and inc methods.
import ray
from ray.dag import InputNode

a = Actor.remote(0)
with InputNode() as inp:
    dag = a.sleep.bind(inp)
    dag = a.inc.bind(dag)

compiled_dag = dag.experimental_compile()
ref = compiled_dag.execute(1)
compiled_dag.execute(2)  # returned ref is dropped immediately
ray.get(ref)  # fails: releasing the dropped ref's buffers also discards ref's result
this is also inconsistent behavior vs. async
Yeah, that looks problematic. cc @kevin85421, who reviewed the skip-deserialization PR.
One idea is to deserialize the prior indexes and save the results to the buffer, and only skip deserialization for the exact index:
a ref goes out of scope => first check if its result is in the buffer; if so, delete it, otherwise do the above.
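A hedged sketch of that idea (get_result and the attribute layout are assumptions for illustration; release_output_channel_buffers comes from the snippet above):

def release_ref(dag, execution_index: int, result_buffer: dict) -> None:
    if execution_index in result_buffer:
        # Result was already fetched and buffered: just drop it.
        del result_buffer[execution_index]
        return
    # Deserialize and buffer the results of all prior indices so they
    # survive, instead of releasing every outstanding buffer...
    while dag.max_finished_execution_index < execution_index - 1:
        idx = dag.max_finished_execution_index + 1
        result_buffer[idx] = dag.get_result(idx)  # hypothetical helper
        dag.max_finished_execution_index = idx
    # ...and skip deserialization only for the exact index being released.
    dag.release_output_channel_buffers(execution_index)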
Ya, that sounds ok. There's the overhead of needing to get the prior indices, though, but not really any way to get around that. One thing is that Python might not call __del__ in order, so we may be forcing deserialization for refs that are also being deleted, but I'm not sure we can do anything about that.
Should I open an issue to track this? Is it a beta blocker, or should I just do this in a follow-up PR with no issue needed?
I think we should ensure correctness first. But open an issue for optimization. We can leave the optimization out of beta for now.
f"{self._max_inflight_executions}. Retrieve the output using " | ||
"ray.get before submitting more requests or increase " | ||
"`max_inflight_executions`. " | ||
f"You cannot execute more than {self._max_inflight_executions} " |
Maybe change to "The compiled DAG cannot execute more than ..." instead?
The user is the one doing the executing and calling execute, though. We could do something like "you can't execute the compiled graph more than x times", but I feel like it's pretty obvious it's a compiled graph execution because the error will point to compiled_graph.execute.
Others LGTM
Why are these changes needed?
This removes the asyncio_max_queue_size parameter and the max_buffered_results parameter, as max_inflight_executions solves for a similar case.

asyncio_max_queue_size restricts the size of the queue in the AwaitableBackgroundWriter. Items are added to the queue whenever execute is called, and items are removed as the writer runs, popping an item and then running it. max_inflight_executions is checked right before the write happens, so we should never hit a case where the queue size actually goes over max_inflight_executions.

max_buffered_results checks against the size of the result buffer. An item is removed from the result buffer whenever get is called on the DAG ref or await is called on a future. This doesn't exactly correlate with max_inflight_executions, because that checks against execution_index (of the possibly newest execution) - max_finished_execution_index, which doesn't correlate exactly with the result_buffer size: things are only popped from the result buffer when get is called on them, while max_finished_execution_index is just the max index that get was called on.
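A hedged usage sketch of the surviving knob (the experimental_compile keyword name here is an assumption for illustration; check the current API for the authoritative signature):

import ray
from ray.dag import InputNode

@ray.remote
class Echo:
    def echo(self, x):
        return x

actor = Echo.remote()
with InputNode() as inp:
    dag = actor.echo.bind(inp)

# _max_inflight_executions is an assumed keyword for illustration.
compiled_dag = dag.experimental_compile(_max_inflight_executions=2)

refs = [compiled_dag.execute(i) for i in range(2)]
# A third execute() before any ray.get() would now raise, since two
# executions are already in flight and no result has been retrieved.
print(ray.get(refs[0]), ray.get(refs[1]))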
Related issue number
Closes #49051
Checks
- I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If this change requires documentation for a new or modified method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.