
[ADAG] Double channel registration at remote node fails assertion #46411

Closed
ruisearch42 opened this issue Jul 3, 2024 · 0 comments · Fixed by #46417

ruisearch42 commented Jul 3, 2024

What happened + What you expected to happen

In a multi-node setup, ADAG registers a channel twice at a remote node, which fails an assertion.

Background: when a channel is registered with remote readers, a reader_ref is created on the remote node and an entry mapping the writer_ref to the reader_ref (plus additional info) is inserted into a hash map. There is a check that the writer_ref does not already exist in the map before the insertion. This check fails, causing the raylet to crash.
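
The invariant that trips can be pictured with a small sketch (illustrative Python only; the actual check is the RAY_CHECK in the raylet's C++ MutableObjectProvider, and the names below merely mirror it):

# Illustrative sketch of the registration invariant described above, not the
# actual raylet code: on a given node, each writer ref may be registered at
# most once.
remote_writer_object_to_local_reader = {}

def handle_register_mutable_object(writer_ref, reader_info):
    # A reader_ref is created on the remote node and recorded under the
    # writer_ref; a second registration for the same writer_ref must not occur.
    assert writer_ref not in remote_writer_object_to_local_reader, (
        f"writer ref {writer_ref!r} already registered on this node"
    )
    remote_writer_object_to_local_reader[writer_ref] = reader_info

handle_register_mutable_object("writer_ref_A", "reader_ref_A")      # ok
try:
    handle_register_mutable_object("writer_ref_A", "reader_ref_A")  # double registration
except AssertionError as e:
    print(e)  # this is the situation that crashes the raylet in the C++ code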

Stack trace:

(raylet) Raylet is terminated. Termination is unexpected. Possible reasons include: (1) SIGKILL by the user or system OOM killer, (2) Invalid memory access from Raylet causing SIGSEGV or SIGBUS, (3) Other termination signals. Last 20 lines of the Raylet logs:
    [2024-07-03 08:41:58,353 D 69285 72216582] (raylet) client.cc:312: IncrementObjectCount 00d3905a616924a8671a482b0aafdba522ccdaf70100000002e1f505 count is now: 2
    [2024-07-03 08:41:58,353 D 69285 72216582] (raylet) client.cc:713: Decrement object count 00d3905a616924a8671a482b0aafdba522ccdaf70100000002e1f505 count is now 1
    [2024-07-03 08:41:58,355 D 69285 72216582] (raylet) node_manager.cc:1219: [Worker] Message NotifyUnblocked(11) from worker with PID nil
    [2024-07-03 08:41:58,355 D 69285 72216582] (raylet) dependency_manager.cc:149: Canceling get request for worker 01000000ffffffffffffffffffffffffffffffffffffffffffffffff
    [2024-07-03 08:41:58,369 C 69285 72216582] (raylet) experimental_mutable_object_provider.cc:97:  Check failed: success
    *** StackTrace Information ***
    0   raylet                              0x000000010581077c _ZN3raylsERNSt3__113basic_ostreamIcNS0_11char_traitsIcEEEERKNS_10StackTraceE + 84 ray::operator<<()
    1   raylet                              0x0000000105813658 _ZN3ray6RayLogD2Ev + 84 ray::RayLog::~RayLog()
    2   raylet                              0x0000000104f369b8 _ZN3ray4core12experimental21MutableObjectProvider27HandleRegisterMutableObjectERKNS_8ObjectIDExS5_ + 232 ray::core::experimental::MutableObjectProvider::HandleRegisterMutableObject()
    3   raylet                              0x0000000104d96d70 _ZN3ray6raylet11NodeManager27HandleRegisterMutableObjectENS_3rpc28RegisterMutableObjectRequestEPNS2_26RegisterMutableObjectReplyENSt3__18functionIFvNS_6StatusENS7_IFvvEEESA_EEE + 100 ray::raylet::NodeManager::HandleRegisterMutableObject()
    4   raylet                              0x0000000104e1f51c _ZN3ray3rpc14ServerCallImplINS0_25NodeManagerServiceHandlerENS0_28RegisterMutableObjectRequestENS0_26RegisterMutableObjectReplyELNS0_8AuthTypeE0EE17HandleRequestImplEb + 164 ray::rpc::ServerCallImpl<>::HandleRequestImpl()
    5   raylet                              0x00000001051e288c _ZN12EventTracker15RecordExecutionERKNSt3__18functionIFvvEEENS0_10shared_ptrI11StatsHandleEE + 228 EventTracker::RecordExecution()
    6   raylet                              0x00000001051dbedc _ZNSt3__110__function6__funcIZN23instrumented_io_context4postENS_8functionIFvvEEENS_12basic_stringIcNS_11char_traitsIcEENS_9allocatorIcEEEExE3$_0NS9_ISC_EES4_EclEv + 56 std::__1::__function::__func<>::operator()()
    7   raylet                              0x00000001051db73c _ZN5boost4asio6detail18completion_handlerINSt3__18functionIFvvEEENS0_10io_context19basic_executor_typeINS3_9allocatorIvEELm0EEEE11do_completeEPvPNS1_19scheduler_operationERKNS_6system10error_codeEm + 236 boost::asio::detail::completion_handler<>::do_complete()
    8   raylet                              0x00000001059335dc _ZN5boost4asio6detail9scheduler10do_run_oneERNS1_27conditionally_enabled_mutex11scoped_lockERNS1_21scheduler_thread_infoERKNS_6system10error_codeE + 664 boost::asio::detail::scheduler::do_run_one()
    9   raylet                              0x0000000105928a0c _ZN5boost4asio6detail9scheduler3runERNS_6system10error_codeE + 200 boost::asio::detail::scheduler::run()
    10  raylet                              0x00000001059288f4 _ZN5boost4asio10io_context3runEv + 32 boost::asio::io_context::run()
    11  raylet                              0x0000000104d3c784 main + 6032 main
    12  dyld                                0x00000001923be0e0 start + 2360 start

Versions / Dependencies

latest Ray, ray-2.31.0

Reproduction script

import ray
from ray.dag import InputNode, MultiOutputNode


# ray_start_cluster is the multi-node cluster fixture from Ray's pytest suite.
def test_pp(ray_start_cluster):
    cluster = ray_start_cluster
    # This node is for the driver.
    cluster.add_node(num_cpus=0)
    ray.init(address=cluster.address)
    TP = 2
    # This node is for the PP stage 1.
    cluster.add_node(resources={"pp1": TP})
    # This node is for the PP stage 2.
    cluster.add_node(resources={"pp2": TP})

    @ray.remote
    class Worker:
        def __init__(self):
            pass

        def execute_model(self, val):
            return val

    pp1_workers = [Worker.options(num_cpus=0, resources={"pp1": 1}).remote() for _ in range(TP)]
    pp2_workers = [Worker.options(num_cpus=0, resources={"pp2": 1}).remote() for _ in range(TP)]

    with InputNode() as inp:
        outputs = [inp for _ in range(TP)]
        outputs = [pp1_workers[i].execute_model.bind(outputs[i]) for i in range(TP)]
        outputs = [pp2_workers[i].execute_model.bind(outputs[i]) for i in range(TP)]
        dag = MultiOutputNode(outputs)
    
    compiled_dag = dag.experimental_compile()
    ref = compiled_dag.execute(1)
    ray.get(ref)
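
For reference, a standalone sketch of the same scenario that does not rely on the pytest fixture (assuming ray.cluster_utils.Cluster is acceptable as a stand-in for ray_start_cluster); on affected versions this should hit the raylet crash shown above:

# Standalone sketch of the repro; assumes ray.cluster_utils.Cluster can stand
# in for the pytest ray_start_cluster fixture used by the test above.
import ray
from ray.cluster_utils import Cluster
from ray.dag import InputNode, MultiOutputNode

TP = 2
cluster = Cluster()
cluster.add_node(num_cpus=0)             # driver node
ray.init(address=cluster.address)
cluster.add_node(resources={"pp1": TP})  # PP stage 1 node
cluster.add_node(resources={"pp2": TP})  # PP stage 2 node


@ray.remote
class Worker:
    def execute_model(self, val):
        return val


pp1_workers = [Worker.options(num_cpus=0, resources={"pp1": 1}).remote() for _ in range(TP)]
pp2_workers = [Worker.options(num_cpus=0, resources={"pp2": 1}).remote() for _ in range(TP)]

with InputNode() as inp:
    outputs = [pp1_workers[i].execute_model.bind(inp) for i in range(TP)]
    outputs = [pp2_workers[i].execute_model.bind(outputs[i]) for i in range(TP)]
    dag = MultiOutputNode(outputs)

compiled_dag = dag.experimental_compile()
ray.get(compiled_dag.execute(1))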

Issue Severity

None

ruisearch42 added the bug and compiled-graphs labels on Jul 3, 2024
ruisearch42 self-assigned this on Jul 3, 2024
rkooo567 pushed a commit that referenced this issue on Jul 4, 2024:
When an ADAG channel is pickled, it currently does not include the _writer_registered flag. However, when the channel is deserialized and passed to another node, the channel may be registered a second time, causing runtime failures.

Using the repro script of #46411 as an example:

1. The first registration (ensure_registered_as_writer()) happens when the driver calls do_allocate_channel() on the actor; _writer_registered is set to True.
2. However, when the driver calls ray.get() on the channel, the deserialized copy has _writer_registered == False, because the field is not pickled.
3. The second registration happens when the driver calls do_exec_tasks() (-> _prep_task() -> output_writer.start() -> _output_channel.ensure_registered_as_writer()) on the actor; the task's output channel is passed in from the driver with _writer_registered == False.
When the reader is on a remote node, ensure_registered_as_writer() eventually calls ExperimentalRegisterMutableObjectReaderRemote() (-> HandleRegisterMutableObject()) on that node, which inserts an entry into a hash map keyed by writer_object_id. If there is already an entry with the same key, the check fails, as shown in the following snippet:
  bool success =
      remote_writer_object_to_local_reader_.insert({writer_object_id, info}).second;
  RAY_CHECK(success);
This PR fixes the issue by including this state when the channel is pickled. A new test, test_pp, is added to verify the fix.
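
A minimal sketch of the pitfall and the shape of the fix (illustrative Python; FakeChannel is a hypothetical stand-in, not Ray's channel implementation):

import pickle

class FakeChannel:
    """Illustrative stand-in for an ADAG channel, not Ray's actual class."""

    def __init__(self):
        self._writer_registered = False

    def ensure_registered_as_writer(self):
        if self._writer_registered:
            return  # already registered; skip the RPC to the remote raylet
        # ... registration with the remote node would happen here ...
        self._writer_registered = True

    def __getstate__(self):
        # If the flag is dropped on serialization, a deserialized copy
        # re-registers and trips RAY_CHECK(success) on the remote node.
        # Keeping the flag in the pickled state makes re-registration a no-op.
        return self.__dict__.copy()

ch = FakeChannel()
ch.ensure_registered_as_writer()       # first registration (do_allocate_channel path)
copy = pickle.loads(pickle.dumps(ch))  # channel travels driver <-> actor via pickling
copy.ensure_registered_as_writer()     # no-op because _writer_registered survived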

This PR also introduces test_multi_node_dag and moves several tests out of test_accelerated_dag, since that file has grown large.