From 972f4302f7313a153d392b90ebea111aa6e51007 Mon Sep 17 00:00:00 2001
From: Yi Cheng <74173148+iycheng@users.noreply.github.com>
Date: Sat, 22 Apr 2023 23:15:49 +0000
Subject: [PATCH 01/15] fix

Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com>
---
 BUILD.bazel                                        | 25 -------
 .../ray/runtime/task/native_task_submitter.cc      |  7 +-
 python/ray/_raylet.pyx                             |  7 +-
 python/ray/includes/libcoreworker.pxd              |  5 +-
 src/ray/common/status.h                            |  5 ++
 src/ray/core_worker/core_worker.cc                 | 14 ++--
 src/ray/core_worker/core_worker.h                  |  9 ++-
 .../transport/direct_actor_task_submitter.h        | 12 ++--
 src/ray/internal/internal.cc                       | 68 -------------------
 src/ray/internal/internal.h                        | 46 -------------
 10 files changed, 39 insertions(+), 159 deletions(-)
 delete mode 100644 src/ray/internal/internal.cc
 delete mode 100644 src/ray/internal/internal.h

diff --git a/BUILD.bazel b/BUILD.bazel
index c7b30fc61892..b7cb26e07bd0 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -479,29 +479,6 @@ cc_library(
     ],
 )
 
-# This header is used to warp some internal code so we can reduce suspicious
-# symbols export.
-cc_library(
-    name = "exported_internal",
-    srcs = glob(
-        [
-            "src/ray/internal/internal.cc",
-        ],
-    ),
-    hdrs = glob(
-        [
-            "src/ray/internal/internal.h",
-        ],
-    ),
-    copts = COPTS,
-    strip_include_prefix = "src",
-    visibility = ["//visibility:public"],
-    deps = [
-        ":core_worker_lib",
-    ],
-    alwayslink = 1,
-)
-
 cc_binary(
     name = "raylet",
     srcs = ["src/ray/raylet/main.cc"],
@@ -2813,7 +2790,6 @@ pyx_library(
     ),
     deps = [
         "//:core_worker_lib",
-        "//:exported_internal",
         "//:global_state_accessor_lib",
         "//:ray_util",
         "//:raylet_lib",
@@ -2848,7 +2824,6 @@ cc_binary(
     visibility = ["//java:__subpackages__"],
     deps = [
         "//:core_worker_lib",
-        "//:exported_internal",
         "//:global_state_accessor_lib",
         "//:src/ray/ray_exported_symbols.lds",
         "//:src/ray/ray_version_script.lds",
diff --git a/cpp/src/ray/runtime/task/native_task_submitter.cc b/cpp/src/ray/runtime/task/native_task_submitter.cc
index 3e3228a50da4..53a923297cf0 100644
--- a/cpp/src/ray/runtime/task/native_task_submitter.cc
+++ b/cpp/src/ray/runtime/task/native_task_submitter.cc
@@ -68,9 +68,10 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation,
   options.serialized_runtime_env_info = call_options.serialized_runtime_env_info;
   std::optional<std::vector<rpc::ObjectReference>> return_refs;
   if (invocation.task_type == TaskType::ACTOR_TASK) {
-    return_refs = core_worker.SubmitActorTask(
-        invocation.actor_id, BuildRayFunction(invocation), invocation.args, options);
-    if (!return_refs.has_value()) {
+    auto status = core_worker.SubmitActorTask(
+        invocation.actor_id, BuildRayFunction(invocation), invocation.args, options,
+        return_refs);
+    if (!status.ok() || !return_refs.has_value()) {
       return ObjectID::Nil();
     }
   } else {
diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index 89a80aff8bef..45b5e7b8efb5 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -2419,12 +2419,15 @@ cdef class CoreWorker:
                     &incremented_put_arg_ids)
 
             with nogil:
-                return_refs = CCoreWorkerProcess.GetCoreWorker().SubmitActorTask(
+                status = CCoreWorkerProcess.GetCoreWorker().SubmitActorTask(
                     c_actor_id,
                     ray_function,
                     args_vector,
                     CTaskOptions(
-                        name, num_returns, c_resources, concurrency_group_name))
+                        name, num_returns, c_resources, concurrency_group_name),
+                        return_refs)
+            if not status.ok():
+                raise Exception(f"Failed to submit task to actor {actor_id} due to {status.message()}")
 
             # These arguments were serialized and put into the local object
            # store during task submission. The backend increments their local
            # ref count initially to ensure that they remain in scope until we
diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd
index 41e29f58012e..6a25da36d281 100644
--- a/python/ray/includes/libcoreworker.pxd
+++ b/python/ray/includes/libcoreworker.pxd
@@ -117,10 +117,11 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
             const CPlacementGroupID &placement_group_id)
         CRayStatus WaitPlacementGroupReady(
             const CPlacementGroupID &placement_group_id, int64_t timeout_seconds)
-        optional[c_vector[CObjectReference]] SubmitActorTask(
+        CRayStatus SubmitActorTask(
             const CActorID &actor_id, const CRayFunction &function,
             const c_vector[unique_ptr[CTaskArg]] &args,
-            const CTaskOptions &options)
+            const CTaskOptions &options,
+            optional[c_vector[CObjectReference]]&)
         CRayStatus KillActor(
             const CActorID &actor_id, c_bool force_kill,
             c_bool no_restart)
diff --git a/src/ray/common/status.h b/src/ray/common/status.h
index c0477e652383..11fac3fec455 100644
--- a/src/ray/common/status.h
+++ b/src/ray/common/status.h
@@ -114,6 +114,7 @@ enum class StatusCode : char {
   OutOfDisk = 28,
   ObjectUnknownOwner = 29,
   RpcError = 30,
+  OutOfResource = 31
 };
 
 #if defined(__clang__)
@@ -241,6 +242,10 @@ class RAY_EXPORT Status {
     return Status(StatusCode::RpcError, msg, rpc_code);
   }
 
+  static Status OutOfResource(const std::string &msg) {
+    return Status(StatusCode::OutOfResource, msg);
+  }
+
   static StatusCode StringToCode(const std::string &str);
 
   // Returns true iff the status indicates success.
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index b84d91909dad..9bc1ad95a5c7 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -2175,17 +2175,22 @@ Status CoreWorker::WaitPlacementGroupReady(const PlacementGroupID &placement_gro
   }
 }
 
-std::optional<std::vector<rpc::ObjectReference>> CoreWorker::SubmitActorTask(
+Status CoreWorker::SubmitActorTask(
     const ActorID &actor_id,
     const RayFunction &function,
     const std::vector<std::unique_ptr<TaskArg>> &args,
-    const TaskOptions &task_options) {
+    const TaskOptions &task_options,
+    std::optional<std::vector<rpc::ObjectReference>> &task_returns) {
   absl::ReleasableMutexLock lock(&actor_task_mutex_);
+  task_returns = std::nullopt;
+  if(!direct_actor_submitter_->IsActorAlive(actor_id)) {
+    return Status::NotFound("Can't find this actor. It might be dead or it's from a different cluster");
+  }
   /// Check whether backpressure may happen at the very beginning of submitting a task.
   if (direct_actor_submitter_->PendingTasksFull(actor_id)) {
     RAY_LOG(DEBUG) << "Back pressure occurred while submitting the task to " << actor_id
                    << ". " << direct_actor_submitter_->DebugString(actor_id);
-    return std::nullopt;
+    return Status::OutOfResource("Too many tasks pending to be executed. Please try later");
   }
 
   auto actor_handle = actor_manager_->GetActorHandle(actor_id);
@@ -2248,7 +2253,8 @@ std::optional<std::vector<rpc::ObjectReference>> CoreWorker::SubmitActorTask(
         rpc_address_, task_spec, CurrentCallSite(), actor_handle->MaxTaskRetries());
     RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(task_spec));
   }
-  return {std::move(returned_refs)};
+  task_returns = {std::move(returned_refs)};
+  return Status::OK();
 }
 
 Status CoreWorker::CancelTask(const ObjectID &object_id,
diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h
index 088fd620644e..3373a5c7fd8b 100644
--- a/src/ray/core_worker/core_worker.h
+++ b/src/ray/core_worker/core_worker.h
@@ -806,12 +806,15 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   /// \param[in] function The remote function to execute.
   /// \param[in] args Arguments of this task.
   /// \param[in] task_options Options for this task.
-  /// \return ObjectRefs returned by this task.
-  std::optional<std::vector<rpc::ObjectReference>> SubmitActorTask(
+  /// \param[out] task_returns The object returned by this task
+  ///
+  /// \return Status of this submission
+  Status SubmitActorTask(
       const ActorID &actor_id,
       const RayFunction &function,
-      const std::vector<std::unique_ptr<TaskArg>> &args,
-      const TaskOptions &task_options);
+      const std::vector<std::unique_ptr<TaskArg>> &args,
+      const TaskOptions &task_options,
+      std::optional<std::vector<rpc::ObjectReference>>& task_returns);
 
   /// Tell an actor to exit immediately, without completing outstanding work.
   ///
diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.h b/src/ray/core_worker/transport/direct_actor_task_submitter.h
index fa28fc485824..6ecd8b88e0bb 100644
--- a/src/ray/core_worker/transport/direct_actor_task_submitter.h
+++ b/src/ray/core_worker/transport/direct_actor_task_submitter.h
@@ -157,6 +157,12 @@ class CoreWorkerDirectActorTaskSubmitter
   /// \return string.
   std::string DebugString(const ActorID &actor_id) const;
 
+
+  /// Whether the specified actor is alive.
+  ///
+  /// \param[in] actor_id The actor ID.
+  /// \return Whether this actor is alive.
+  bool IsActorAlive(const ActorID &actor_id) const;
  private:
   /// A helper function to get task finisher without holding mu_
   /// We should use this function when access
@@ -280,12 +286,6 @@ class CoreWorkerDirectActorTaskSubmitter
       const absl::flat_hash_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
           &inflight_task_callbacks) LOCKS_EXCLUDED(mu_);
 
-  /// Whether the specified actor is alive.
-  ///
-  /// \param[in] actor_id The actor ID.
-  /// \return Whether this actor is alive.
-  bool IsActorAlive(const ActorID &actor_id) const;
-
   /// Pool for producing new core worker clients.
   rpc::CoreWorkerClientPool &core_worker_client_pool_;
 
diff --git a/src/ray/internal/internal.cc b/src/ray/internal/internal.cc
deleted file mode 100644
index 7821c2b1ff86..000000000000
--- a/src/ray/internal/internal.cc
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright 2020 The Ray Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "ray/internal/internal.h"
-
-#include "ray/core_worker/core_worker.h"
-
-namespace ray {
-namespace internal {
-
-using ray::core::CoreWorkerProcess;
-using ray::core::TaskOptions;
-
-std::vector<rpc::ObjectReference> SendInternal(const ActorID &peer_actor_id,
-                                               std::shared_ptr<LocalMemoryBuffer> buffer,
-                                               RayFunction &function,
-                                               int return_num) {
-  std::unordered_map<std::string, double> resources;
-  std::string name = function.GetFunctionDescriptor()->DefaultTaskName();
-  TaskOptions options{name, return_num, resources};
-
-  char meta_data[3] = {'R', 'A', 'W'};
-  std::shared_ptr<LocalMemoryBuffer> meta =
-      std::make_shared<LocalMemoryBuffer>((uint8_t *)meta_data, 3, true);
-
-  std::vector<std::unique_ptr<TaskArg>> args;
-  if (function.GetLanguage() == Language::PYTHON) {
-    auto dummy = "__RAY_DUMMY__";
-    std::shared_ptr<LocalMemoryBuffer> dummyBuffer =
-        std::make_shared<LocalMemoryBuffer>((uint8_t *)dummy, 13, true);
-    args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
-        std::move(dummyBuffer), meta, std::vector<rpc::ObjectReference>(), true)));
-  }
-  args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
-      std::move(buffer), meta, std::vector<rpc::ObjectReference>(), true)));
-
-  std::vector<std::shared_ptr<RayObject>> results;
-  auto result = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
-      peer_actor_id, function, args, options);
-  if (!result.has_value()) {
-    RAY_CHECK(false) << "Back pressure should not be enabled.";
-  }
-  return result.value();
-}
-
-const ray::stats::TagKeyType TagRegister(const std::string tag_name) {
-  return ray::stats::TagKeyType::Register(tag_name);
-}
-
-const ActorID &GetCurrentActorID() {
-  return CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentActorID();
-}
-
-bool IsInitialized() { return CoreWorkerProcess::IsInitialized(); }
-
-} // namespace internal
-} // namespace ray
diff --git a/src/ray/internal/internal.h b/src/ray/internal/internal.h
deleted file mode 100644
index 0eb58062c03c..000000000000
--- a/src/ray/internal/internal.h
+++ /dev/null
@@ -1,46 +0,0 @@
-// Copyright 2020 The Ray Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#pragma once
-#include "ray/common/buffer.h"
-#include "ray/common/id.h"
-#include "ray/core_worker/common.h"
-#include "ray/stats/metric.h"
-
-// This header is used to warp some internal code so we can reduce suspicious
-// symbols export.
-namespace ray {
-namespace internal {
-
-using ray::core::RayFunction;
-
-/// Send buffer internal
-/// \param[in] buffer buffer to be sent.
-/// \param[in] function the function descriptor of peer's function.
-/// \param[in] return_num return value number of the call.
-/// \param[out] return_ids return ids from SubmitActorTask.
-std::vector<rpc::ObjectReference> SendInternal(const ActorID &peer_actor_id,
-                                               std::shared_ptr<LocalMemoryBuffer> buffer,
-                                               RayFunction &function,
-                                               int return_num);
-
-const stats::TagKeyType TagRegister(const std::string tag_name);
-
-/// Get current actor id via internal.
-const ActorID &GetCurrentActorID();
-
-/// Get core worker initialization flag via internal.
-bool IsInitialized();
-} // namespace internal
-} // namespace ray
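
The substance of patch 01 above is the new calling convention for SubmitActorTask: a Status return with the object references handed back through an out-parameter, instead of a bare std::optional where an empty result could only hint at one implicit cause (back pressure, per the RAY_CHECK in the deleted internal.cc). The following self-contained C++17 sketch illustrates that convention; it is illustrative only, with Status and ObjectReference as simplified stand-ins for ray::Status and rpc::ObjectReference, not the real Ray types.

#include <iostream>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-ins for ray::Status and rpc::ObjectReference.
struct Status {
  bool ok_;
  std::string msg_;
  bool ok() const { return ok_; }
  const std::string &message() const { return msg_; }
  static Status OK() { return {true, ""}; }
  static Status OutOfResource(std::string m) { return {false, std::move(m)}; }
};
struct ObjectReference {
  std::string object_id;
};

// Patch-01 shape: the Status carries the failure reason, and the results
// come back through the out-parameter, which is reset on entry as in the patch.
Status SubmitActorTask(bool backpressured,
                       std::optional<std::vector<ObjectReference>> &returns) {
  returns = std::nullopt;
  if (backpressured) {
    return Status::OutOfResource("Too many tasks pending to be executed.");
  }
  returns = std::vector<ObjectReference>{{"object-1"}};
  return Status::OK();
}

int main() {
  std::optional<std::vector<ObjectReference>> returns;
  Status status = SubmitActorTask(/*backpressured=*/true, returns);
  if (!status.ok() || !returns.has_value()) {
    // The caller can now report, or branch on, the concrete failure reason.
    std::cout << "submit failed: " << status.message() << std::endl;
  }
  return 0;
}

Making the reason explicit is what lets the Python layer, later in the series, raise a targeted PendingCallsLimitExceeded instead of assuming every empty result meant back pressure.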
From 46431f978ffff52900d341a8b40f2f73eae6e0e3 Mon Sep 17 00:00:00 2001
From: Yi Cheng <74173148+iycheng@users.noreply.github.com>
Date: Sat, 22 Apr 2023 23:20:01 +0000
Subject: [PATCH 02/15] fix

Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com>
---
 cpp/src/ray/runtime/task/native_task_submitter.cc |  8 +++++---
 python/ray/_raylet.pyx                            |  5 +++--
 src/ray/core_worker/core_worker.cc                |  8 +++++---
 src/ray/core_worker/core_worker.h                 | 11 +++++------
 .../transport/direct_actor_task_submitter.h       |  2 +-
 5 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/cpp/src/ray/runtime/task/native_task_submitter.cc b/cpp/src/ray/runtime/task/native_task_submitter.cc
index 53a923297cf0..f6d84ba32a07 100644
--- a/cpp/src/ray/runtime/task/native_task_submitter.cc
+++ b/cpp/src/ray/runtime/task/native_task_submitter.cc
@@ -68,9 +68,10 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation,
   options.serialized_runtime_env_info = call_options.serialized_runtime_env_info;
   std::optional<std::vector<rpc::ObjectReference>> return_refs;
   if (invocation.task_type == TaskType::ACTOR_TASK) {
-    auto status = core_worker.SubmitActorTask(
-        invocation.actor_id, BuildRayFunction(invocation), invocation.args, options,
-        return_refs);
+    auto status = core_worker.SubmitActorTask(invocation.actor_id,
+                                              BuildRayFunction(invocation),
+                                              invocation.args,
+                                              options,
+                                              return_refs);
     if (!status.ok() || !return_refs.has_value()) {
       return ObjectID::Nil();
     }
diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index 45b5e7b8efb5..e8714c4e3288 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -2425,9 +2425,10 @@ cdef class CoreWorker:
                     args_vector,
                     CTaskOptions(
                         name, num_returns, c_resources, concurrency_group_name),
-                        return_refs)
+                    return_refs)
             if not status.ok():
-                raise Exception(f"Failed to submit task to actor {actor_id} due to {status.message()}")
+                raise Exception(f"Failed to submit task to actor {actor_id} "
+                                f"due to {status.message()}")
 
             # These arguments were serialized and put into the local object
             # store during task submission. The backend increments their local
             # ref count initially to ensure that they remain in scope until we
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 9bc1ad95a5c7..f08fa352a4c2 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -2183,14 +2183,16 @@ Status CoreWorker::SubmitActorTask(
     std::optional<std::vector<rpc::ObjectReference>> &task_returns) {
   absl::ReleasableMutexLock lock(&actor_task_mutex_);
   task_returns = std::nullopt;
-  if(!direct_actor_submitter_->IsActorAlive(actor_id)) {
-    return Status::NotFound("Can't find this actor. It might be dead or it's from a different cluster");
+  if (!direct_actor_submitter_->IsActorAlive(actor_id)) {
+    return Status::NotFound(
+        "Can't find this actor. It might be dead or it's from a different cluster");
   }
   /// Check whether backpressure may happen at the very beginning of submitting a task.
   if (direct_actor_submitter_->PendingTasksFull(actor_id)) {
     RAY_LOG(DEBUG) << "Back pressure occurred while submitting the task to " << actor_id
                    << ". " << direct_actor_submitter_->DebugString(actor_id);
-    return Status::OutOfResource("Too many tasks pending to be executed. Please try later");
+    return Status::OutOfResource(
+        "Too many tasks pending to be executed. Please try later");
   }
 
   auto actor_handle = actor_manager_->GetActorHandle(actor_id);
diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h
index 3373a5c7fd8b..03669b14db58 100644
--- a/src/ray/core_worker/core_worker.h
+++ b/src/ray/core_worker/core_worker.h
@@ -809,12 +809,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   /// \param[out] task_returns The object returned by this task
   ///
   /// \return Status of this submission
-  Status SubmitActorTask(
-      const ActorID &actor_id,
-      const RayFunction &function,
-      const std::vector<std::unique_ptr<TaskArg>> &args,
-      const TaskOptions &task_options,
-      std::optional<std::vector<rpc::ObjectReference>>& task_returns);
+  Status SubmitActorTask(const ActorID &actor_id,
+                         const RayFunction &function,
+                         const std::vector<std::unique_ptr<TaskArg>> &args,
+                         const TaskOptions &task_options,
+                         std::optional<std::vector<rpc::ObjectReference>> &task_returns);
 
   /// Tell an actor to exit immediately, without completing outstanding work.
   ///
diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.h b/src/ray/core_worker/transport/direct_actor_task_submitter.h
index 6ecd8b88e0bb..85e73b3b6a89 100644
--- a/src/ray/core_worker/transport/direct_actor_task_submitter.h
+++ b/src/ray/core_worker/transport/direct_actor_task_submitter.h
@@ -157,12 +157,12 @@ class CoreWorkerDirectActorTaskSubmitter
   /// \return string.
   std::string DebugString(const ActorID &actor_id) const;
 
-
   /// Whether the specified actor is alive.
   ///
   /// \param[in] actor_id The actor ID.
   /// \return Whether this actor is alive.
   bool IsActorAlive(const ActorID &actor_id) const;
+
  private:
   /// A helper function to get task finisher without holding mu_
   /// We should use this function when access
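
For readers tracing the Status::OutOfResource calls that patch 02 reformats: the status code itself was introduced by patch 01 in src/ray/common/status.h as an enum entry plus a factory method, and patch 09 later adds the matching IsOutOfResource() predicate. A minimal sketch of that three-part pattern follows. It is a simplification: apart from OutOfResource = 31, which is taken from the patch, the enum values here are illustrative, and the real ray::Status carries many more codes plus an rpc error code.

#include <cassert>
#include <string>
#include <utility>

// One enum entry, one factory, one predicate per status code.
enum class StatusCode : char { OK = 0, NotFound = 1, OutOfResource = 31 };

class Status {
 public:
  static Status OK() { return Status(StatusCode::OK, ""); }
  static Status OutOfResource(const std::string &msg) {
    return Status(StatusCode::OutOfResource, msg);
  }
  bool ok() const { return code_ == StatusCode::OK; }
  bool IsOutOfResource() const { return code_ == StatusCode::OutOfResource; }
  const std::string &message() const { return msg_; }

 private:
  Status(StatusCode code, std::string msg) : code_(code), msg_(std::move(msg)) {}
  StatusCode code_;
  std::string msg_;
};

int main() {
  Status s = Status::OutOfResource("queue full");
  assert(!s.ok() && s.IsOutOfResource());
  return 0;
}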
From 85394fbcc713bbe2d9f932fb3f8aef4f2af62c04 Mon Sep 17 00:00:00 2001
From: Yi Cheng <74173148+iycheng@users.noreply.github.com>
Date: Mon, 24 Apr 2023 18:23:14 +0000
Subject: [PATCH 03/15] fix

Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com>
---
 python/ray/tests/test_failure_4.py                 | 20 +++++++++++++++
 src/ray/core_worker/core_worker.cc                 |  2 +-
 ...io_ray_runtime_task_NativeTaskSubmitter.cc      |  8 ++++----
 .../transport/direct_actor_task_submitter.cc       |  5 +++++
 .../transport/direct_actor_task_submitter.h        |  7 +++++++
 5 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/python/ray/tests/test_failure_4.py b/python/ray/tests/test_failure_4.py
index f56f2e802b0d..eddaf2b18b80 100644
--- a/python/ray/tests/test_failure_4.py
+++ b/python/ray/tests/test_failure_4.py
@@ -700,6 +700,26 @@ def sleeper():
         assert raylet["NodeManagerAddress"] in message
 
 
+def test_accessing_actor_after_cluster_crashed(shutdown_only):
+    ray.init()
+    @ray.remote
+    class A:
+      def f(self):
+        return
+
+    a = A.remote()
+
+    ray.get(a.f.remote())
+
+    ray.shutdown()
+    ray.init()
+    with pytest.raises(Exception) as exc_info:
+        ray.get(a.f.remote())
+
+    assert "It might be dead or it's from a different cluster" \
+        in exc_info.args[0]
+
+
 if __name__ == "__main__":
     import os
 
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index f08fa352a4c2..592142520276 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -2183,7 +2183,7 @@ Status CoreWorker::SubmitActorTask(
     std::optional<std::vector<rpc::ObjectReference>> &task_returns) {
   absl::ReleasableMutexLock lock(&actor_task_mutex_);
   task_returns = std::nullopt;
-  if (!direct_actor_submitter_->IsActorAlive(actor_id)) {
+  if (!direct_actor_submitter_->CheckActorExists(actor_id)) {
     return Status::NotFound(
        "Can't find this actor. It might be dead or it's from a different cluster");
   }
diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc
index a5542eb700fb..c8e3e6eefb72 100644
--- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc
+++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc
@@ -439,10 +439,10 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask(
   auto task_args = ToTaskArgs(env, args);
   RAY_CHECK(callOptions != nullptr);
   auto task_options = ToTaskOptions(env, numReturns, callOptions);
-
-  auto return_refs = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
-      actor_id, ray_function, task_args, task_options);
-  if (!return_refs.has_value()) {
+  std::optional<std::vector<rpc::ObjectReference>> return_refs;
+  auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
+      actor_id, ray_function, task_args, task_options, return_refs);
+  if (!status.ok() || !return_refs.has_value()) {
     std::stringstream ss;
     ss << "The task " << ray_function.GetFunctionDescriptor()->ToString()
        << " could not be submitted to " << actor_id;
diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.cc b/src/ray/core_worker/transport/direct_actor_task_submitter.cc
index cab04a6cebe5..e5162c982117 100644
--- a/src/ray/core_worker/transport/direct_actor_task_submitter.cc
+++ b/src/ray/core_worker/transport/direct_actor_task_submitter.cc
@@ -599,6 +599,11 @@ bool CoreWorkerDirectActorTaskSubmitter::PendingTasksFull(const ActorID &actor_i
          it->second.cur_pending_calls >= it->second.max_pending_calls;
 }
 
+bool CoreWorkerDirectActorTaskSubmitter::CheckActorExists(const ActorID &actor_id) const {
+  absl::MutexLock lock(&mu_);
+  return client_queues_.find(actor_id) != client_queues_.end();
+}
+
 std::string CoreWorkerDirectActorTaskSubmitter::DebugString(
     const ActorID &actor_id) const {
   absl::MutexLock lock(&mu_);
diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.h b/src/ray/core_worker/transport/direct_actor_task_submitter.h
index 85e73b3b6a89..5340f17f9005 100644
--- a/src/ray/core_worker/transport/direct_actor_task_submitter.h
+++ b/src/ray/core_worker/transport/direct_actor_task_submitter.h
@@ -151,6 +151,13 @@ class CoreWorkerDirectActorTaskSubmitter
   /// \return Whether the corresponding client queue is full or not.
   bool PendingTasksFull(const ActorID &actor_id) const;
 
+  /// Check whether the actor exists
+  ///
+  /// \param[in] actor_id Actor id.
+  ///
+  /// \return Return true if the actor exists.
+  bool CheckActorExists(const ActorID &actor_id) const;
+
   /// Returns debug string for class.
   ///
   /// \param[in] actor_id The actor whose debug string to return.
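
The CheckActorExists guard that patch 03 adds is a plain membership test on the submitter's per-actor client-queue map, taken under the same mutex that protects the rest of the queue state. The standalone sketch below uses simplified types; the real map is client_queues_, keyed by ActorID, inside CoreWorkerDirectActorTaskSubmitter.

#include <iostream>
#include <mutex>
#include <string>
#include <unordered_map>

// Sketch of the submitter-side existence check: an actor handle carried
// over from a previous cluster has no client queue registered here, so a
// lookup miss lets submission fail fast with NotFound instead of hanging.
class Submitter {
 public:
  void AddActorQueueIfNotExists(const std::string &actor_id) {
    std::lock_guard<std::mutex> lock(mu_);
    queues_.emplace(actor_id, 0);  // value: current pending-call count
  }
  bool CheckActorExists(const std::string &actor_id) const {
    std::lock_guard<std::mutex> lock(mu_);
    return queues_.find(actor_id) != queues_.end();
  }

 private:
  mutable std::mutex mu_;
  std::unordered_map<std::string, int> queues_;
};

int main() {
  Submitter submitter;
  submitter.AddActorQueueIfNotExists("actor-in-this-cluster");
  std::cout << submitter.CheckActorExists("actor-in-this-cluster") << "\n";   // 1
  std::cout << submitter.CheckActorExists("actor-from-old-cluster") << "\n";  // 0
  return 0;
}

This is exactly the situation test_accessing_actor_after_cluster_crashed above constructs: the handle survives ray.shutdown() and ray.init(), the queue does not, so the second ray.get raises.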
From 424f916e84f954226fb6d9b18824d1fd69633c24 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Mon, 24 Apr 2023 18:24:00 +0000 Subject: [PATCH 04/15] format Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/tests/test_failure_4.py | 8 ++--- ...io_ray_runtime_task_NativeTaskSubmitter.cc | 35 +++++++++---------- 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/python/ray/tests/test_failure_4.py b/python/ray/tests/test_failure_4.py index eddaf2b18b80..3bc606d34a57 100644 --- a/python/ray/tests/test_failure_4.py +++ b/python/ray/tests/test_failure_4.py @@ -702,10 +702,11 @@ def sleeper(): def test_accessing_actor_after_cluster_crashed(shutdown_only): ray.init() + @ray.remote class A: - def f(self): - return + def f(self): + return a = A.remote() @@ -716,8 +717,7 @@ def f(self): with pytest.raises(Exception) as exc_info: ray.get(a.f.remote()) - assert "It might be dead or it's from a different cluster" \ - in exc_info.args[0] + assert "It might be dead or it's from a different cluster" in exc_info.args[0] if __name__ == "__main__": diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index c8e3e6eefb72..bd14c9707b7f 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -187,8 +187,8 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env, max_restarts = env->GetIntField(actorCreationOptions, java_actor_creation_options_max_restarts); - max_task_retries = - env->GetIntField(actorCreationOptions, java_actor_creation_options_max_task_retries); + max_task_retries = env->GetIntField(actorCreationOptions, + java_actor_creation_options_max_task_retries); jobject java_resources = env->GetObjectField(actorCreationOptions, java_base_task_options_resources); resources = ToResources(env, java_resources); @@ -278,22 +278,21 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env, placement_options.second); placement_group_scheduling_strategy->set_placement_group_capture_child_tasks(false); } - ActorCreationOptions actor_creation_options{ - max_restarts, - max_task_retries, - static_cast(max_concurrency), - resources, - resources, - dynamic_worker_options, - is_detached, - name, - ray_namespace, - is_async, - /*scheduling_strategy=*/scheduling_strategy, - serialized_runtime_env, - concurrency_groups, - /*execute_out_of_order*/ false, - max_pending_calls}; + ActorCreationOptions actor_creation_options{max_restarts, + max_task_retries, + static_cast(max_concurrency), + resources, + resources, + dynamic_worker_options, + is_detached, + name, + ray_namespace, + is_async, + /*scheduling_strategy=*/scheduling_strategy, + serialized_runtime_env, + concurrency_groups, + /*execute_out_of_order*/ false, + max_pending_calls}; return actor_creation_options; } From f2fb8ce6146e76e6e56bae2242130b50ae4b3fed Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Mon, 24 Apr 2023 11:27:17 -0700 Subject: [PATCH 05/15] fix test --- python/ray/tests/test_failure_4.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/ray/tests/test_failure_4.py b/python/ray/tests/test_failure_4.py index 3bc606d34a57..75081cdd662e 100644 --- a/python/ray/tests/test_failure_4.py +++ b/python/ray/tests/test_failure_4.py @@ -716,9 +716,8 @@ def f(self): ray.init() 
with pytest.raises(Exception) as exc_info: ray.get(a.f.remote()) - - assert "It might be dead or it's from a different cluster" in exc_info.args[0] - + assert "It might be dead or it's from a different cluster" in exc_info.value.args[0] + if __name__ == "__main__": import os From c96e73b03618543e1f3e64e11a250ccf18152f4a Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Mon, 24 Apr 2023 11:30:26 -0700 Subject: [PATCH 06/15] format --- python/ray/tests/test_failure_4.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tests/test_failure_4.py b/python/ray/tests/test_failure_4.py index 75081cdd662e..17d5a9940d49 100644 --- a/python/ray/tests/test_failure_4.py +++ b/python/ray/tests/test_failure_4.py @@ -717,7 +717,7 @@ def f(self): with pytest.raises(Exception) as exc_info: ray.get(a.f.remote()) assert "It might be dead or it's from a different cluster" in exc_info.value.args[0] - + if __name__ == "__main__": import os From dfb388450c43c7eeb7982cf19e4a3144393f487e Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Mon, 24 Apr 2023 18:58:22 -0700 Subject: [PATCH 07/15] fix win --- src/ray/core_worker/test/core_worker_test.cc | 38 ++++++++++++-------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index cba46a14d733..3259a11f0a83 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -202,10 +202,10 @@ int CoreWorkerTest::GetActorPid(const ActorID &actor_id, TaskOptions options{"", 1, resources}; RayFunction func{Language::PYTHON, FunctionDescriptorBuilder::BuildPython("GetWorkerPid", "", "", "")}; - - auto return_ids = ObjectRefsToIds(CoreWorkerProcess::GetCoreWorker() - .SubmitActorTask(actor_id, func, args, options) - .value()); + std::optional> task_returns; + auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( + actor_id, func, args, options, task_returns); + auto return_ids = ObjectRefsToIds(task_returns.value()); std::vector> results; RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().Get(return_ids, -1, &results)); @@ -298,8 +298,10 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso Language::PYTHON, FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", "")); - auto return_ids = - ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value()); + std::optional> task_returns; + auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( + actor_id, func, args, options, task_returns); + auto return_ids = ObjectRefsToIds(task_returns.value()); ASSERT_EQ(return_ids.size(), 1); std::vector> results; @@ -344,8 +346,10 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso RayFunction func( Language::PYTHON, FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", "")); - auto return_ids = - ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value()); + std::optional> task_returns; + auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( + actor_id, func, args, options, task_returns); + auto return_ids = ObjectRefsToIds(task_returns.value()); ASSERT_EQ(return_ids.size(), 1); @@ -409,8 +413,10 @@ void CoreWorkerTest::TestActorRestart( Language::PYTHON, FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", "")); - auto return_ids = - ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, 
options).value()); + std::optional> task_returns; + auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( + actor_id, func, args, options, task_returns); + auto return_ids = ObjectRefsToIds(task_returns.value()); ASSERT_EQ(return_ids.size(), 1); // Verify if it's expected data. std::vector> results; @@ -453,8 +459,10 @@ void CoreWorkerTest::TestActorFailure( Language::PYTHON, FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", "")); - auto return_ids = - ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value()); + std::optional> task_returns; + auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( + actor_id, func, args, options, task_returns); + auto return_ids = ObjectRefsToIds(task_returns.value()); ASSERT_EQ(return_ids.size(), 1); all_results.emplace_back(std::make_pair(return_ids[0], buffer1)); @@ -611,8 +619,10 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { Language::PYTHON, FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", "")); - auto return_ids = - ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value()); + std::optional> task_returns; + auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( + actor_id, func, args, options, task_returns); + auto return_ids = ObjectRefsToIds(task_returns.value()); ASSERT_EQ(return_ids.size(), 1); object_ids.emplace_back(return_ids[0]); } From f3ca96cc3d9502e97ce80a25ec11c7f75d275c7c Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 25 Apr 2023 13:42:53 -0700 Subject: [PATCH 08/15] fix comments --- src/ray/core_worker/core_worker.cc | 13 +++++++++---- .../transport/direct_actor_task_submitter.cc | 8 ++++++++ .../transport/direct_actor_task_submitter.h | 9 ++++++++- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 592142520276..bc59bbb9c878 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -20,6 +20,7 @@ #include +#include "absl/strings/str_format.h" #include "boost/fiber/all.hpp" #include "ray/common/bundle_spec.h" #include "ray/common/ray_config.h" @@ -2184,15 +2185,19 @@ Status CoreWorker::SubmitActorTask( absl::ReleasableMutexLock lock(&actor_task_mutex_); task_returns = std::nullopt; if (!direct_actor_submitter_->CheckActorExists(actor_id)) { - return Status::NotFound( - "Can't find this actor. It might be dead or it's from a different cluster"); + std::string err_msg = absl::StrFormat( + "Can't find actor %s. It might be dead or it's from a different cluster", + actor_id.Hex()); + return Status::NotFound(std::move(err_msg)); } /// Check whether backpressure may happen at the very beginning of submitting a task. if (direct_actor_submitter_->PendingTasksFull(actor_id)) { RAY_LOG(DEBUG) << "Back pressure occurred while submitting the task to " << actor_id << ". " << direct_actor_submitter_->DebugString(actor_id); - return Status::OutOfResource( - "Too many tasks pending to be executed. Please try later"); + return Status::OutOfResource(absl::StrFormat( + "Too many tasks (%d) pending to be executed for actor %s. 
Please try later", + direct_actor_submitter_->NumPendingTasks(actor_id), + actor_id.Hex())); } auto actor_handle = actor_manager_->GetActorHandle(actor_id); diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.cc b/src/ray/core_worker/transport/direct_actor_task_submitter.cc index e5162c982117..0451e5c0ae1a 100644 --- a/src/ray/core_worker/transport/direct_actor_task_submitter.cc +++ b/src/ray/core_worker/transport/direct_actor_task_submitter.cc @@ -599,6 +599,14 @@ bool CoreWorkerDirectActorTaskSubmitter::PendingTasksFull(const ActorID &actor_i it->second.cur_pending_calls >= it->second.max_pending_calls; } +size_t CoreWorkerDirectActorTaskSubmitter::NumPendingTasks( + const ActorID &actor_id) const { + absl::MutexLock lock(&mu_); + auto it = client_queues_.find(actor_id); + RAY_CHECK(it != client_queues_.end()); + return it->second.cur_pending_calls; +} + bool CoreWorkerDirectActorTaskSubmitter::CheckActorExists(const ActorID &actor_id) const { absl::MutexLock lock(&mu_); return client_queues_.find(actor_id) != client_queues_.end(); diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.h b/src/ray/core_worker/transport/direct_actor_task_submitter.h index 5340f17f9005..e67cfd49f61b 100644 --- a/src/ray/core_worker/transport/direct_actor_task_submitter.h +++ b/src/ray/core_worker/transport/direct_actor_task_submitter.h @@ -39,7 +39,7 @@ namespace ray { namespace core { -/// In direct actor call task submitter and receiver, a task is directly submitted +/// In direct actor call submitter and receiver, a task is directly submitted /// to the actor that will execute it. // Interface for testing. @@ -90,6 +90,7 @@ class CoreWorkerDirectActorTaskSubmitter /// /// \param[in] actor_id The actor for whom to add a queue. /// \param[in] max_pending_calls The max pending calls for the actor to be added. + /// \param[in] execute_out_of_order Whether to execute tasks out of order. /// \param[in] fail_if_actor_unreachable Whether to fail newly submitted tasks /// immediately when the actor is unreachable. void AddActorQueueIfNotExists(const ActorID &actor_id, @@ -151,6 +152,12 @@ class CoreWorkerDirectActorTaskSubmitter /// \return Whether the corresponding client queue is full or not. bool PendingTasksFull(const ActorID &actor_id) const; + /// Get the number of pending tasks in the queue. + /// + /// \param[in] actor_id Actor id. + /// \return The number of pending tasks in the queue. + size_t NumPendingTasks(const ActorID &actor_id) const; + /// Check whether the actor exists /// /// \param[in] actor_id Actor id. 
From 2855b23c22379163b1c790a4d5272c525dc2f0d1 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 25 Apr 2023 14:08:13 -0700 Subject: [PATCH 09/15] fix --- .../ray/runtime/task/native_task_submitter.cc | 4 ++-- python/ray/_raylet.pyx | 12 +++++++----- python/ray/includes/common.pxd | 1 + python/ray/includes/libcoreworker.pxd | 2 +- src/ray/common/status.h | 2 ++ src/ray/core_worker/core_worker.cc | 15 +++++++-------- src/ray/core_worker/core_worker.h | 2 +- .../io_ray_runtime_task_NativeTaskSubmitter.cc | 6 +++--- src/ray/core_worker/test/core_worker_test.cc | 16 ++++++++++------ .../transport/direct_actor_task_submitter.h | 2 +- 10 files changed, 35 insertions(+), 27 deletions(-) diff --git a/cpp/src/ray/runtime/task/native_task_submitter.cc b/cpp/src/ray/runtime/task/native_task_submitter.cc index f6d84ba32a07..017d33e2f0e4 100644 --- a/cpp/src/ray/runtime/task/native_task_submitter.cc +++ b/cpp/src/ray/runtime/task/native_task_submitter.cc @@ -66,14 +66,14 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation, options.name = call_options.name; options.resources = call_options.resources; options.serialized_runtime_env_info = call_options.serialized_runtime_env_info; - std::optional> return_refs; + std::vector return_refs; if (invocation.task_type == TaskType::ACTOR_TASK) { auto status = core_worker.SubmitActorTask(invocation.actor_id, BuildRayFunction(invocation), invocation.args, options, return_refs); - if (!status.ok() || !return_refs.has_value()) { + if (!status.ok()) { return ObjectID::Nil(); } } else { diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index e8714c4e3288..0021bc31365e 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2426,9 +2426,6 @@ cdef class CoreWorker: CTaskOptions( name, num_returns, c_resources, concurrency_group_name), return_refs) - if not status.ok(): - raise Exception(f"Failed to submit task to actor {actor_id} " - f"due to {status.message()}") # These arguments were serialized and put into the local object # store during task submission. The backend increments their local # ref count initially to ensure that they remain in scope until we @@ -2438,7 +2435,7 @@ cdef class CoreWorker: CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference( put_arg_id) - if return_refs.has_value(): + if status.ok(): # The initial local reference is already acquired internally # when adding the pending task. return VectorToObjectRefs(return_refs.value(), @@ -2447,7 +2444,8 @@ cdef class CoreWorker: actor = self.get_actor_handle(actor_id) actor_handle = (CCoreWorkerProcess.GetCoreWorker() .GetActorHandle(c_actor_id)) - raise PendingCallsLimitExceeded("The task {} could not be " + if status.IsOutOfResource(): + raise PendingCallsLimitExceeded("The task {} could not be " "submitted to {} because more " "than {} tasks are queued on " "the actor. 
This limit " @@ -2460,6 +2458,10 @@ cdef class CoreWorker: (dereference(actor_handle) .MaxPendingCalls()) )) + else: + raise Exception(f"Failed to submit task to actor {actor_id} " + f"due to {status.message()}") + def kill_actor(self, ActorID actor_id, c_bool no_restart): cdef: diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 5d5f3ab593ea..8de7c38c4cd2 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -117,6 +117,7 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil: c_bool IsNotFound() c_bool IsObjectUnknownOwner() c_bool IsRpcError() + c_bool IsOutOfResource() c_string ToString() c_string CodeAsString() diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 6a25da36d281..c0aba4ca6a45 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -121,7 +121,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const CActorID &actor_id, const CRayFunction &function, const c_vector[unique_ptr[CTaskArg]] &args, const CTaskOptions &options, - optional[c_vector[CObjectReference]]&) + c_vector[CObjectReference]&) CRayStatus KillActor( const CActorID &actor_id, c_bool force_kill, c_bool no_restart) diff --git a/src/ray/common/status.h b/src/ray/common/status.h index 11fac3fec455..bda9860ddc4a 100644 --- a/src/ray/common/status.h +++ b/src/ray/common/status.h @@ -292,6 +292,8 @@ class RAY_EXPORT Status { bool IsRpcError() const { return code() == StatusCode::RpcError; } + bool IsOutOfResource() const { return code() == StatusCode::OutOfResource; } + // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. std::string ToString() const; diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index bc59bbb9c878..baae03c8427b 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2176,14 +2176,13 @@ Status CoreWorker::WaitPlacementGroupReady(const PlacementGroupID &placement_gro } } -Status CoreWorker::SubmitActorTask( - const ActorID &actor_id, - const RayFunction &function, - const std::vector> &args, - const TaskOptions &task_options, - std::optional> &task_returns) { +Status CoreWorker::SubmitActorTask(const ActorID &actor_id, + const RayFunction &function, + const std::vector> &args, + const TaskOptions &task_options, + std::vector &task_returns) { absl::ReleasableMutexLock lock(&actor_task_mutex_); - task_returns = std::nullopt; + task_returns.clear(); if (!direct_actor_submitter_->CheckActorExists(actor_id)) { std::string err_msg = absl::StrFormat( "Can't find actor %s. 
It might be dead or it's from a different cluster", @@ -2260,7 +2259,7 @@ Status CoreWorker::SubmitActorTask( rpc_address_, task_spec, CurrentCallSite(), actor_handle->MaxTaskRetries()); RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(task_spec)); } - task_returns = {std::move(returned_refs)}; + task_returns = std::move(returned_refs); return Status::OK(); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 03669b14db58..e4e694610e1a 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -813,7 +813,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const RayFunction &function, const std::vector> &args, const TaskOptions &task_options, - std::optional> &task_returns); + std::vector &task_returns); /// Tell an actor to exit immediately, without completing outstanding work. /// diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index bd14c9707b7f..13a420f16972 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -438,10 +438,10 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask( auto task_args = ToTaskArgs(env, args); RAY_CHECK(callOptions != nullptr); auto task_options = ToTaskOptions(env, numReturns, callOptions); - std::optional> return_refs; + std::vector return_refs; auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( actor_id, ray_function, task_args, task_options, return_refs); - if (!status.ok() || !return_refs.has_value()) { + if (!status.ok()) { std::stringstream ss; ss << "The task " << ray_function.GetFunctionDescriptor()->ToString() << " could not be submitted to " << actor_id; @@ -455,7 +455,7 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask( } std::vector return_ids; - for (const auto &ref : return_refs.value()) { + for (const auto &ref : return_refs) { return_ids.push_back(ObjectID::FromBinary(ref.object_id())); } diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 3259a11f0a83..109487fec4c0 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -202,7 +202,8 @@ int CoreWorkerTest::GetActorPid(const ActorID &actor_id, TaskOptions options{"", 1, resources}; RayFunction func{Language::PYTHON, FunctionDescriptorBuilder::BuildPython("GetWorkerPid", "", "", "")}; - std::optional> task_returns; + std::vector task_returns; + ; auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( actor_id, func, args, options, task_returns); auto return_ids = ObjectRefsToIds(task_returns.value()); @@ -298,7 +299,8 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso Language::PYTHON, FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", "")); - std::optional> task_returns; + std::vector task_returns; + ; auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( actor_id, func, args, options, task_returns); auto return_ids = ObjectRefsToIds(task_returns.value()); @@ -346,7 +348,8 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso RayFunction func( Language::PYTHON, FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", "")); - std::optional> task_returns; + std::vector task_returns; + ; auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( 
actor_id, func, args, options, task_returns); auto return_ids = ObjectRefsToIds(task_returns.value()); @@ -413,7 +416,7 @@ void CoreWorkerTest::TestActorRestart( Language::PYTHON, FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", "")); - std::optional> task_returns; + std::vector task_returns; auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( actor_id, func, args, options, task_returns); auto return_ids = ObjectRefsToIds(task_returns.value()); @@ -459,7 +462,8 @@ void CoreWorkerTest::TestActorFailure( Language::PYTHON, FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", "")); - std::optional> task_returns; + std::vector task_returns; + ; auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( actor_id, func, args, options, task_returns); auto return_ids = ObjectRefsToIds(task_returns.value()); @@ -619,7 +623,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { Language::PYTHON, FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", "")); - std::optional> task_returns; + std::vector task_returns; auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( actor_id, func, args, options, task_returns); auto return_ids = ObjectRefsToIds(task_returns.value()); diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.h b/src/ray/core_worker/transport/direct_actor_task_submitter.h index e67cfd49f61b..add2bd2fda91 100644 --- a/src/ray/core_worker/transport/direct_actor_task_submitter.h +++ b/src/ray/core_worker/transport/direct_actor_task_submitter.h @@ -39,7 +39,7 @@ namespace ray { namespace core { -/// In direct actor call submitter and receiver, a task is directly submitted +/// In direct actor call task submitter and receiver, a task is directly submitted /// to the actor that will execute it. // Interface for testing. From f4580e993aea86a6e55ec4ded3f92eca96811042 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 25 Apr 2023 14:12:07 -0700 Subject: [PATCH 10/15] fix --- python/ray/_raylet.pyx | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 0021bc31365e..6221ebca6ed8 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2406,7 +2406,7 @@ cdef class CoreWorker: unordered_map[c_string, double] c_resources CRayFunction ray_function c_vector[unique_ptr[CTaskArg]] args_vector - optional[c_vector[CObjectReference]] return_refs + c_vector[CObjectReference] return_refs c_vector[CObjectID] incremented_put_arg_ids with self.profile_event(b"submit_task"): @@ -2438,20 +2438,20 @@ cdef class CoreWorker: if status.ok(): # The initial local reference is already acquired internally # when adding the pending task. - return VectorToObjectRefs(return_refs.value(), + return VectorToObjectRefs(return_refs, skip_adding_local_ref=True) else: actor = self.get_actor_handle(actor_id) actor_handle = (CCoreWorkerProcess.GetCoreWorker() .GetActorHandle(c_actor_id)) if status.IsOutOfResource(): - raise PendingCallsLimitExceeded("The task {} could not be " - "submitted to {} because more " - "than {} tasks are queued on " - "the actor. This limit " - "can be adjusted with the " - "`max_pending_calls` actor " - "option.".format( + raise PendingCallsLimitExceeded("The task {} could not be " + "submitted to {} because more " + "than {} tasks are queued on " + "the actor. 
This limit " + "can be adjusted with the " + "`max_pending_calls` actor " + "option.".format( function_descriptor .function_name, repr(actor), From 2457323d9eb23e8ee763eb798301829098980c34 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 25 Apr 2023 21:14:56 +0000 Subject: [PATCH 11/15] lint Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/_raylet.pyx | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 6221ebca6ed8..fbdc9443c0c8 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2445,24 +2445,16 @@ cdef class CoreWorker: actor_handle = (CCoreWorkerProcess.GetCoreWorker() .GetActorHandle(c_actor_id)) if status.IsOutOfResource(): - raise PendingCallsLimitExceeded("The task {} could not be " - "submitted to {} because more " - "than {} tasks are queued on " - "the actor. This limit " - "can be adjusted with the " - "`max_pending_calls` actor " - "option.".format( - function_descriptor - .function_name, - repr(actor), - (dereference(actor_handle) - .MaxPendingCalls()) - )) + raise PendingCallsLimitExceeded( + f"The task {function_descriptor..function_name} could not be " + f"submitted to {repr(actor)} because more than" + f" {(dereference(actor_handle).MaxPendingCalls())}" + " tasks are queued on the actor. This limit can be adjusted" + " with the `max_pending_calls` actor option.") else: raise Exception(f"Failed to submit task to actor {actor_id} " f"due to {status.message()}") - def kill_actor(self, ActorID actor_id, c_bool no_restart): cdef: CActorID c_actor_id = actor_id.native() From 64d924351ae6e97abf0759f8c92a3a7f272014cb Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 25 Apr 2023 14:20:14 -0700 Subject: [PATCH 12/15] fix --- cpp/src/ray/runtime/task/native_task_submitter.cc | 2 +- python/ray/_raylet.pyx | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/ray/runtime/task/native_task_submitter.cc b/cpp/src/ray/runtime/task/native_task_submitter.cc index 017d33e2f0e4..e69cf61fa164 100644 --- a/cpp/src/ray/runtime/task/native_task_submitter.cc +++ b/cpp/src/ray/runtime/task/native_task_submitter.cc @@ -98,7 +98,7 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation, ""); } std::vector return_ids; - for (const auto &ref : return_refs.value()) { + for (const auto &ref : return_refs) { return_ids.push_back(ObjectID::FromBinary(ref.object_id())); } return return_ids[0]; diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index fbdc9443c0c8..3c6f3262aaad 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2446,7 +2446,7 @@ cdef class CoreWorker: .GetActorHandle(c_actor_id)) if status.IsOutOfResource(): raise PendingCallsLimitExceeded( - f"The task {function_descriptor..function_name} could not be " + f"The task {function_descriptor.function_name} could not be " f"submitted to {repr(actor)} because more than" f" {(dereference(actor_handle).MaxPendingCalls())}" " tasks are queued on the actor. 
This limit can be adjusted" From 78fd9882803dbc83e25cc22bff87cfb35db648a6 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 25 Apr 2023 16:52:14 -0700 Subject: [PATCH 13/15] fix cpp --- src/ray/core_worker/test/core_worker_test.cc | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 109487fec4c0..31a97db7bd4f 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -203,10 +203,9 @@ int CoreWorkerTest::GetActorPid(const ActorID &actor_id, RayFunction func{Language::PYTHON, FunctionDescriptorBuilder::BuildPython("GetWorkerPid", "", "", "")}; std::vector task_returns; - ; auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( actor_id, func, args, options, task_returns); - auto return_ids = ObjectRefsToIds(task_returns.value()); + auto return_ids = ObjectRefsToIds(task_returns); std::vector> results; RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().Get(return_ids, -1, &results)); @@ -300,10 +299,9 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", "")); std::vector task_returns; - ; auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( actor_id, func, args, options, task_returns); - auto return_ids = ObjectRefsToIds(task_returns.value()); + auto return_ids = ObjectRefsToIds(task_returns); ASSERT_EQ(return_ids.size(), 1); std::vector> results; @@ -349,10 +347,9 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso Language::PYTHON, FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", "")); std::vector task_returns; - ; auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( actor_id, func, args, options, task_returns); - auto return_ids = ObjectRefsToIds(task_returns.value()); + auto return_ids = ObjectRefsToIds(task_returns); ASSERT_EQ(return_ids.size(), 1); @@ -419,7 +416,7 @@ void CoreWorkerTest::TestActorRestart( std::vector task_returns; auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( actor_id, func, args, options, task_returns); - auto return_ids = ObjectRefsToIds(task_returns.value()); + auto return_ids = ObjectRefsToIds(task_returns); ASSERT_EQ(return_ids.size(), 1); // Verify if it's expected data. 
std::vector> results; @@ -463,10 +460,9 @@ void CoreWorkerTest::TestActorFailure( FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", "")); std::vector task_returns; - ; auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( actor_id, func, args, options, task_returns); - auto return_ids = ObjectRefsToIds(task_returns.value()); + auto return_ids = ObjectRefsToIds(task_returns); ASSERT_EQ(return_ids.size(), 1); all_results.emplace_back(std::make_pair(return_ids[0], buffer1)); @@ -626,7 +622,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { std::vector task_returns; auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( actor_id, func, args, options, task_returns); - auto return_ids = ObjectRefsToIds(task_returns.value()); + auto return_ids = ObjectRefsToIds(task_returns); ASSERT_EQ(return_ids.size(), 1); object_ids.emplace_back(return_ids[0]); } From 4fd62d1e064f9c841171e8e4ed1d343fdfcd3e48 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 25 Apr 2023 17:28:52 -0700 Subject: [PATCH 14/15] fix failure --- python/ray/_raylet.pyx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 3c6f3262aaad..f50db1628cb9 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2441,10 +2441,10 @@ cdef class CoreWorker: return VectorToObjectRefs(return_refs, skip_adding_local_ref=True) else: - actor = self.get_actor_handle(actor_id) - actor_handle = (CCoreWorkerProcess.GetCoreWorker() - .GetActorHandle(c_actor_id)) if status.IsOutOfResource(): + actor = self.get_actor_handle(actor_id) + actor_handle = (CCoreWorkerProcess.GetCoreWorker() + .GetActorHandle(c_actor_id)) raise PendingCallsLimitExceeded( f"The task {function_descriptor.function_name} could not be " f"submitted to {repr(actor)} because more than" From 48afb2ee7386082a353417bc1c0f800005991d5f Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 26 Apr 2023 02:02:22 +0000 Subject: [PATCH 15/15] fix lint Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/_raylet.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index f50db1628cb9..9e2f06e31b77 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2444,7 +2444,7 @@ cdef class CoreWorker: if status.IsOutOfResource(): actor = self.get_actor_handle(actor_id) actor_handle = (CCoreWorkerProcess.GetCoreWorker() - .GetActorHandle(c_actor_id)) + .GetActorHandle(c_actor_id)) raise PendingCallsLimitExceeded( f"The task {function_descriptor.function_name} could not be " f"submitted to {repr(actor)} because more than"
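
Where the series lands, as a self-contained C++17 sketch (simplified stand-in types, not Ray code): after patch 09 the out-parameter is a plain std::vector that is cleared on entry rather than an std::optional, and callers branch on the returned Status. On the Python side, IsOutOfResource() is mapped to PendingCallsLimitExceeded and any other failure to a generic exception.

#include <iostream>
#include <string>
#include <vector>

// Simplified stand-ins for ray::Status and rpc::ObjectReference.
enum class StatusCode { OK, NotFound, OutOfResource };
struct Status {
  StatusCode code;
  std::string msg;
  bool ok() const { return code == StatusCode::OK; }
  bool IsOutOfResource() const { return code == StatusCode::OutOfResource; }
};
struct ObjectReference {
  std::string object_id;
};

// Final shape: Status return, plain vector out-parameter cleared on entry.
Status SubmitActorTask(bool actor_exists, bool backpressured,
                       std::vector<ObjectReference> &task_returns) {
  task_returns.clear();
  if (!actor_exists) {
    return {StatusCode::NotFound, "Can't find actor."};
  }
  if (backpressured) {
    return {StatusCode::OutOfResource, "Too many tasks pending."};
  }
  task_returns.push_back({"object-1"});
  return {StatusCode::OK, ""};
}

int main() {
  std::vector<ObjectReference> returns;
  Status status = SubmitActorTask(/*actor_exists=*/true,
                                  /*backpressured=*/true, returns);
  if (status.IsOutOfResource()) {
    // _raylet.pyx raises PendingCallsLimitExceeded for this branch.
    std::cout << "backpressured: " << status.msg << std::endl;
  } else if (!status.ok()) {
    std::cout << "failed: " << status.msg << std::endl;
  } else {
    std::cout << "submitted, " << returns.size() << " return ref(s)" << std::endl;
  }
  return 0;
}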