
[core][cgraph] Collapse other params into max_inflight_executions and adjust execution_index counting #49565

Open · wants to merge 14 commits into master

Conversation

@dayshah (Contributor) commented Jan 3, 2025:

Why are these changes needed?

This removes the asyncio_max_queue_size parameter and the max_buffered_results parameter, since max_inflight_executions covers the same use cases.

asyncio_max_queue_size restricts the size of the queue in the AwaitableBackgroundWriter. Items are added to the queue whenever execute is called and removed as the writer runs: it pops an item off the queue and then runs it. max_inflight_executions is checked right before the write happens, so we should never hit a case where the queue size actually exceeds max_inflight_executions.

max_buffered_results checks against the size of the result buffer. An item is removed from the result buffer whenever get is called on the CompiledDAGRef or await is called on a future. This doesn't correspond exactly to max_inflight_executions, because that check compares the (possibly newest) execution_index against max_finished_execution_index. That difference doesn't map exactly onto the result_buffer size: items are only popped from the result buffer when get is called on them, while max_finished_execution_index is just the highest index that get has been called on.
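
For clarity, here is a minimal standalone sketch (plain Python, not Ray code) of the bookkeeping described above: in-flight executions are the newest submitted execution index minus the highest index whose result has been retrieved, and the submit path rejects new work once that difference reaches the limit. All names here (InflightModel, next_execution_index) are illustrative only, not Ray's internals.

```python
class InflightModel:
    """Toy model of the max_inflight_executions bound (illustrative only)."""

    def __init__(self, max_inflight_executions: int):
        self.max_inflight_executions = max_inflight_executions
        self.next_execution_index = 0           # index the next execute() will get
        self.max_finished_execution_index = -1  # highest index already retrieved

    def execute(self) -> int:
        inflight = self.next_execution_index - (self.max_finished_execution_index + 1)
        if inflight >= self.max_inflight_executions:
            raise RuntimeError(
                f"Cannot have more than {self.max_inflight_executions} in-flight "
                "executions; retrieve earlier results first."
            )
        idx = self.next_execution_index
        self.next_execution_index += 1
        return idx

    def get(self, execution_index: int) -> None:
        # Retrieving a result advances the finished watermark.
        self.max_finished_execution_index = max(
            self.max_finished_execution_index, execution_index
        )


model = InflightModel(max_inflight_executions=2)
i0 = model.execute()
i1 = model.execute()
# A third execute() here would raise until one of the results is retrieved.
model.get(i0)
i2 = model.execute()
```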

Related issue number

Closes #49051

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

f"buffered results is {self._max_buffered_results}; call "
"ray.get() on previous CompiledDAGRefs to free them up "
"from buffer."
)
@dayshah (Contributor, Author) commented on this code:

cache execution results is used by both async and standard execution, so if we put the check here it will apply to both.

@dayshah dayshah added the go add ONLY when ready to merge, run all tests label Jan 3, 2025

@kevin85421 (Member) left a comment:

asyncio_max_queue_size and max_buffered_results have different definitions.

  • asyncio_max_queue_size represents the maximum number of "inputs" that have not yet been written into the DAG.

  • max_buffered_results is the number of execution "results" that have already been retrieved by the driver but have not yet been consumed by the users.

Hence, removing asyncio_max_queue_size is a breaking change. It's fine to have a breaking change because the RayCG API is currently in alpha status, but we should have a clear plan for handling the gap (e.g., using max_inflight_executions to prevent users from sending unlimited inputs to the asyncio queue).

Would you mind writing more details in the PR description and adding tests if needed?
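
A rough sketch of the plan mentioned above, assuming the in-flight check is applied on the async submit path before anything is enqueued for the background writer. None of these names (AsyncDagModel, execute_async, _writer_queue) are Ray's actual internals; the point is only the control flow: with the check before the put, the queue can never grow beyond max_inflight_executions, so a separate asyncio_max_queue_size becomes redundant.

```python
import asyncio


class AsyncDagModel:
    """Illustrative stand-in, not Ray's AwaitableBackgroundWriter."""

    def __init__(self, max_inflight_executions: int):
        self._max_inflight_executions = max_inflight_executions
        self._next_execution_index = 0
        self._max_finished_execution_index = -1
        self._writer_queue: asyncio.Queue = asyncio.Queue()

    def _raise_if_too_many_inflight(self) -> None:
        inflight = self._next_execution_index - (self._max_finished_execution_index + 1)
        if inflight >= self._max_inflight_executions:
            raise RuntimeError("Too many in-flight executions; await earlier results first.")

    async def execute_async(self, value) -> int:
        # Check before enqueueing, so the queue is bounded by the in-flight
        # limit even without a separate asyncio_max_queue_size.
        self._raise_if_too_many_inflight()
        idx = self._next_execution_index
        self._next_execution_index += 1
        await self._writer_queue.put((idx, value))
        return idx


async def _demo() -> None:
    dag = AsyncDagModel(max_inflight_executions=2)
    await dag.execute_async("a")
    await dag.execute_async("b")
    # A third execute_async() here would raise until a result is awaited.


asyncio.run(_demo())
```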

@ruisearch42 ruisearch42 self-assigned this Jan 6, 2025
@ruisearch42 (Contributor) commented:

@kevin85421 @stephanie-wang Correct me if wrong, but I thought we want to collapse both max_buffered_results and asyncio_max_queue_size into max_inflight_executions?

@dayshah (Contributor, Author) commented Jan 6, 2025:

> @kevin85421 @stephanie-wang Correct me if wrong, but I thought we want to collapse both max_buffered_results and asyncio_max_queue_size into max_inflight_executions?

Ya, that makes sense. Getting the results removes them from the result_buffer, and all that max_buffered_results controls is the result buffer size, which can only grow up to max_inflight_executions anyway, so I'll remove both and keep only max_inflight_executions.

@dayshah dayshah changed the title [core][cgraph] Remove asyncio_max_queue_size and use max_buffered_results for async [core][cgraph] Remove asyncio_max_queue_size and max_buffered_results Jan 6, 2025
@dayshah (Contributor, Author) commented Jan 6, 2025:

> asyncio_max_queue_size and max_buffered_results have different definitions.
>
>   • asyncio_max_queue_size represents the maximum number of "inputs" that have not yet been written into the DAG.
>   • max_buffered_results is the number of execution "results" that have already been retrieved by the driver but have not yet been consumed by the users.
>
> Hence, removing asyncio_max_queue_size is a breaking change. It's fine to have a breaking change because the RayCG API is currently in alpha status, but we should have a clear plan for handling the gap (e.g., using max_inflight_executions to prevent users from sending unlimited inputs to the asyncio queue).
>
> Would you mind writing more details in the PR description and adding tests if needed?

Updated the description with the comparison. I think the test that currently exists for max_inflight_executions seems OK.

@kevin85421 (Member) commented:

> @kevin85421 @stephanie-wang Correct me if wrong, but I thought we want to collapse both max_buffered_results and asyncio_max_queue_size into max_inflight_executions?

As I recall, this was not the conclusion, but I think removing both and using only max_inflight_executions would be better. The reason is that max_buffered_results is hard for users to understand, and it depends on the speed of the compiled graphs. Additionally, it is difficult for users to determine when to call ray.get because it depends on the processing speed of the DAG.

@ruisearch42 (Contributor) left a comment:

Otherwise LGTM

@dayshah (Contributor, Author) commented Jan 9, 2025:

Adjusted execution_index counting, since before the counts were off by one. It still worked because we only checked > rather than the >= it should have been. Also added tests for both exceptions.
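
To make the off-by-one concrete, here is a small illustration (not the actual Ray code). The direction of the off-by-one is inferred from the comment above: with the old counting the tracked value was one higher than the true number of in-flight executions, and the strict > check compensated for it; the corrected counting with >= gives the same bound with an exact count.

```python
LIMIT = 2


def too_many_inflight_old(counted: int, limit: int = LIMIT) -> bool:
    # Pre-fix: the count was one higher than the true in-flight number,
    # compensated by using a strict ">" comparison.
    return counted > limit


def too_many_inflight_new(inflight: int, limit: int = LIMIT) -> bool:
    # Post-fix: exact in-flight count, checked with ">=".
    return inflight >= limit


# Both formulations reject a third submission when two executions are in flight...
assert too_many_inflight_old(counted=3) and too_many_inflight_new(inflight=2)
# ...and both allow a second submission when only one execution is in flight.
assert not too_many_inflight_old(counted=2) and not too_many_inflight_new(inflight=1)
```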

@dayshah dayshah changed the title [core][cgraph] Remove asyncio_max_queue_size and max_buffered_results [core][cgraph] Collapse parameters into max_inflight_executions and adjust execution_index counting Jan 9, 2025
@dayshah dayshah changed the title [core][cgraph] Collapse parameters into max_inflight_executions and adjust execution_index counting [core][cgraph] Collapse other params into max_inflight_executions and adjust execution_index counting Jan 9, 2025
ref3 = compiled_dag.execute(1)
# to show variables are being used and avoid destruction since
# CompiledDagRef __del__ will release buffers and
# increment _max_finished_execution_index
@dayshah (Contributor, Author) commented on this code, Jan 9, 2025:

This seems prone to issues, e.g. if destruction does not happen in the same order execute was called:

if not self._ray_get_called:
    self._dag.release_output_channel_buffers(self._execution_index)

The continuation of this in the SynchronousReader also just releases all buffers, so anything that was executed up to that point would also be gone. For example, this script fails:

import ray
from ray.dag import InputNode

# Actor is assumed here to be a Ray actor class with sleep and inc methods,
# as used in the compiled-graphs tests.
a = Actor.remote(0)
with InputNode() as inp:
    dag = a.sleep.bind(inp)
    dag = a.inc.bind(dag)
compiled_dag = dag.experimental_compile()
ref = compiled_dag.execute(1)
compiled_dag.execute(2)
ray.get(ref)

this is also inconsistent behavior vs. async

@ruisearch42 (Contributor) commented:

Yeah that looks problematic. cc @kevin85421 who reviewed the skip deserialization PR.

One idea is to deserialize the prior indices and save the results to the buffer, and only skip deserialization for the exact index: when a ref goes out of scope, first check whether its result is in the buffer; if so, delete it, otherwise do the above.
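
A self-contained sketch of that idea, for clarity. All names here (PendingResults, read_and_buffer_up_to, release_exact_index) are hypothetical stand-ins rather than Ray's actual internals; only the control flow mirrors the suggestion.

```python
class PendingResults:
    """Hypothetical stand-in for the ref-deletion bookkeeping; not Ray's API."""

    def __init__(self):
        self.result_buffer: dict[int, object] = {}

    def read_and_buffer_up_to(self, execution_index: int) -> None:
        # Stand-in for "deserialize the prior indices and save the results
        # to the buffer" so earlier, still-live refs remain usable.
        for idx in range(execution_index + 1):
            self.result_buffer.setdefault(idx, f"result-{idx}")

    def release_exact_index(self, execution_index: int) -> None:
        # Stand-in for releasing this index's channel buffers without
        # deserializing the value.
        pass

    def on_ref_deleted(self, execution_index: int, ray_get_called: bool) -> None:
        if ray_get_called:
            return  # the result was already consumed; nothing to clean up
        if execution_index in self.result_buffer:
            # Already buffered while reading a later ref: just drop it.
            del self.result_buffer[execution_index]
            return
        # Otherwise buffer everything before this index, then skip
        # deserialization only for this exact index.
        self.read_and_buffer_up_to(execution_index - 1)
        self.release_exact_index(execution_index)


pending = PendingResults()
pending.on_ref_deleted(execution_index=1, ray_get_called=False)
assert 0 in pending.result_buffer and 1 not in pending.result_buffer
```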

@dayshah (Contributor, Author) commented:

Ya, that sounds OK. There's the overhead of needing to get the prior indices, though there's not really any way to get around that. One thing is that Python might not call __del__ in order, so we may be forcing deserialization for refs that are also being deleted, but I'm not sure we can do anything about that.

Should I open an issue to track this? Is it a beta blocker, or should I just do this in a follow-up PR with no need to create an issue?

@ruisearch42 (Contributor) commented:

I think we should ensure correctness first. But open an issue for optimization. We can leave the optimization out of beta for now.

f"{self._max_inflight_executions}. Retrieve the output using "
"ray.get before submitting more requests or increase "
"`max_inflight_executions`. "
f"You cannot execute more than {self._max_inflight_executions} "
@kevin85421 (Member) commented on this code:

Maybe change to "The compiled DAG cannot execute more than ..." instead?

@dayshah (Contributor, Author) commented:

The user is the one doing the executing and calling execute, though. We could do something like "you can't execute the compiled graph more than x times", but I feel like it's pretty obvious it's a compiled graph execution, because the error will point to compiled_graph.execute.

python/ray/experimental/compiled_dag_ref.py (review thread resolved; outdated)

@kevin85421 (Member) left a comment:

Others LGTM

Labels
go add ONLY when ready to merge, run all tests
Development

Successfully merging this pull request may close these issues.

[core][compiled graphs] Cleanup for DAG context parameters
3 participants