What happened + What you expected to happen
In a multi-node setup, ADAG registers a channel twice at a remote node, which trips an assertion.
Background: when a channel is registered with remote readers, the remote node creates a reader_ref and adds an entry mapping writer_ref to reader_ref (plus additional info) to a hash map. Before the insertion, a check asserts that writer_ref is not already present. On the second registration this check fails, crashing the raylet.
Stack trace:
(raylet) Raylet is terminated. Termination is unexpected. Possible reasons include: (1) SIGKILL by the user or system OOM killer, (2) Invalid memory access from Raylet causing SIGSEGV or SIGBUS, (3) Other termination signals. Last 20 lines of the Raylet logs:
[2024-07-03 08:41:58,353 D 69285 72216582] (raylet) client.cc:312: IncrementObjectCount 00d3905a616924a8671a482b0aafdba522ccdaf70100000002e1f505 count is now: 2
[2024-07-03 08:41:58,353 D 69285 72216582] (raylet) client.cc:713: Decrement object count 00d3905a616924a8671a482b0aafdba522ccdaf70100000002e1f505 count is now 1
[2024-07-03 08:41:58,355 D 69285 72216582] (raylet) node_manager.cc:1219: [Worker] Message NotifyUnblocked(11) from worker with PID nil
[2024-07-03 08:41:58,355 D 69285 72216582] (raylet) dependency_manager.cc:149: Canceling get request for worker 01000000ffffffffffffffffffffffffffffffffffffffffffffffff
[2024-07-03 08:41:58,369 C 69285 72216582] (raylet) experimental_mutable_object_provider.cc:97: Check failed: success
*** StackTrace Information ***
0 raylet 0x000000010581077c _ZN3raylsERNSt3__113basic_ostreamIcNS0_11char_traitsIcEEEERKNS_10StackTraceE + 84 ray::operator<<()
1 raylet 0x0000000105813658 _ZN3ray6RayLogD2Ev + 84 ray::RayLog::~RayLog()
2 raylet 0x0000000104f369b8 _ZN3ray4core12experimental21MutableObjectProvider27HandleRegisterMutableObjectERKNS_8ObjectIDExS5_ + 232 ray::core::experimental::MutableObjectProvider::HandleRegisterMutableObject()
3 raylet 0x0000000104d96d70 _ZN3ray6raylet11NodeManager27HandleRegisterMutableObjectENS_3rpc28RegisterMutableObjectRequestEPNS2_26RegisterMutableObjectReplyENSt3__18functionIFvNS_6StatusENS7_IFvvEEESA_EEE + 100 ray::raylet::NodeManager::HandleRegisterMutableObject()
4 raylet 0x0000000104e1f51c _ZN3ray3rpc14ServerCallImplINS0_25NodeManagerServiceHandlerENS0_28RegisterMutableObjectRequestENS0_26RegisterMutableObjectReplyELNS0_8AuthTypeE0EE17HandleRequestImplEb + 164 ray::rpc::ServerCallImpl<>::HandleRequestImpl()
5 raylet 0x00000001051e288c _ZN12EventTracker15RecordExecutionERKNSt3__18functionIFvvEEENS0_10shared_ptrI11StatsHandleEE + 228 EventTracker::RecordExecution()
6 raylet 0x00000001051dbedc _ZNSt3__110__function6__funcIZN23instrumented_io_context4postENS_8functionIFvvEEENS_12basic_stringIcNS_11char_traitsIcEENS_9allocatorIcEEEExE3$_0NS9_ISC_EES4_EclEv + 56 std::__1::__function::__func<>::operator()()
7 raylet 0x00000001051db73c _ZN5boost4asio6detail18completion_handlerINSt3__18functionIFvvEEENS0_10io_context19basic_executor_typeINS3_9allocatorIvEELm0EEEE11do_completeEPvPNS1_19scheduler_operationERKNS_6system10error_codeEm + 236 boost::asio::detail::completion_handler<>::do_complete()
8 raylet 0x00000001059335dc _ZN5boost4asio6detail9scheduler10do_run_oneERNS1_27conditionally_enabled_mutex11scoped_lockERNS1_21scheduler_thread_infoERKNS_6system10error_codeE + 664 boost::asio::detail::scheduler::do_run_one()
9 raylet 0x0000000105928a0c _ZN5boost4asio6detail9scheduler3runERNS_6system10error_codeE + 200 boost::asio::detail::scheduler::run()
10 raylet 0x00000001059288f4 _ZN5boost4asio10io_context3runEv + 32 boost::asio::io_context::run()
11 raylet 0x0000000104d3c784 main + 6032 main
12 dyld 0x00000001923be0e0 start + 2360 start
Versions / Dependencies
latest Ray, ray-2.31.0
Reproduction script
import ray
from ray.dag import InputNode, MultiOutputNode


# ray_start_cluster is a pytest fixture from Ray's test suite.
def test_pp(ray_start_cluster):
    cluster = ray_start_cluster
    # This node is for the driver.
    cluster.add_node(num_cpus=0)
    ray.init(address=cluster.address)
    TP = 2
    # This node is for the PP stage 1.
    cluster.add_node(resources={"pp1": TP})
    # This node is for the PP stage 2.
    cluster.add_node(resources={"pp2": TP})

    @ray.remote
    class Worker:
        def __init__(self):
            pass

        def execute_model(self, val):
            return val

    pp1_workers = [
        Worker.options(num_cpus=0, resources={"pp1": 1}).remote() for _ in range(TP)
    ]
    pp2_workers = [
        Worker.options(num_cpus=0, resources={"pp2": 1}).remote() for _ in range(TP)
    ]

    # Each input flows through one stage-1 worker, then one stage-2 worker.
    with InputNode() as inp:
        outputs = [inp for _ in range(TP)]
        outputs = [pp1_workers[i].execute_model.bind(outputs[i]) for i in range(TP)]
        outputs = [pp2_workers[i].execute_model.bind(outputs[i]) for i in range(TP)]
        dag = MultiOutputNode(outputs)

    compiled_dag = dag.experimental_compile()
    ref = compiled_dag.execute(1)
    ray.get(ref)
Issue Severity
None
When an ADAG channel is pickled, the _writer_registered flag is currently not included. As a result, when the channel is deserialized and passed to another node, it may be registered a second time, causing runtime failures.
Using the repro script of #46411 as an example:
The first registration (ensure_registered_as_writer()) happens when the driver calls do_allocate_channel() on the actor; _writer_registered is set to True.
However, when the driver calls ray.get() on the channel, the returned copy has _writer_registered set to False because the field is not pickled.
The second registration happens when the driver calls do_exec_tasks() (-> _prep_task() -> output_writer.start() -> _output_channel.ensure_registered_as_writer()) on the actor; the task's output channel is passed in from the driver (with _writer_registered == False).
When the reader is on a remote node, ensure_registered_as_writer() eventually calls ExperimentalRegisterMutableObjectReaderRemote() (-> HandleRegisterMutableObject()) on the remote node, which inserts an entry into a hash map keyed by writer_object_id. If an entry with the same key already exists, the check fails, as shown in the following snippet:
bool success =
remote_writer_object_to_local_reader_.insert({writer_object_id, info}).second;
RAY_CHECK(success);
This PR fixes the issue by including the registration state in pickling. A new test, test_pp, is added to verify the fix.
This PR also introduces test_multi_node_dag and moves several tests into it from test_accelerated_dag, which had grown large.
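The double registration and the pickling fix described above can be illustrated with a minimal, self-contained sketch. It does not use Ray: ToyChannel, REMOTE_REGISTRY, register_remote_reader, and the pickle_flag switch are hypothetical stand-ins invented for illustration, and only the _writer_registered flag and the ensure_registered_as_writer() name mirror the description. The real channel class carries more state than this.

```python
import pickle

# Hypothetical stand-in for the remote raylet's map keyed by writer object ID.
REMOTE_REGISTRY = {}


def register_remote_reader(writer_id):
    """Mimic the insert-then-check: a duplicate key is treated as fatal."""
    if writer_id in REMOTE_REGISTRY:
        raise RuntimeError(f"writer {writer_id} is already registered")
    REMOTE_REGISTRY[writer_id] = "reader-info"


class ToyChannel:
    """Toy channel (assumption: not the real Ray channel class)."""

    def __init__(self, writer_id, pickle_flag):
        self.writer_id = writer_id
        self.pickle_flag = pickle_flag  # True models the fixed behavior
        self._writer_registered = False

    def ensure_registered_as_writer(self):
        if self._writer_registered:
            return
        register_remote_reader(self.writer_id)
        self._writer_registered = True

    def __getstate__(self):
        state = self.__dict__.copy()
        if not self.pickle_flag:
            # Buggy variant: the flag is left out of the pickled state.
            del state["_writer_registered"]
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # A missing flag falls back to False on the receiving side.
        self.__dict__.setdefault("_writer_registered", False)


def round_trip_and_register(pickle_flag):
    """Register, send the channel through pickle, then register the copy."""
    REMOTE_REGISTRY.clear()
    chan = ToyChannel("w1", pickle_flag)
    chan.ensure_registered_as_writer()       # first registration
    copy = pickle.loads(pickle.dumps(chan))  # channel travels via pickle
    try:
        copy.ensure_registered_as_writer()   # second call, on the copy
        return "ok"
    except RuntimeError:
        return "double registration"
```

With pickle_flag=False the copy forgets that it was registered and attempts a second insert, which the registry rejects. With pickle_flag=True the flag survives the round trip and the second call is a no-op.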