Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Add more message when accessing a dead actor. #34697

Merged
merged 17 commits into from
Apr 26, 2023
25 changes: 0 additions & 25 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -479,29 +479,6 @@ cc_library(
],
)

# This header is used to wrap some internal code so we can reduce suspicious
# symbol exports.
cc_library(
name = "exported_internal",
srcs = glob(
[
"src/ray/internal/internal.cc",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess these are unrelated changes? They look okay to include in this PR to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it is... A method used in this file was impacted by this PR, so compilation failed :(
I noticed the file doesn't seem useful, so I just deleted it.

],
),
hdrs = glob(
[
"src/ray/internal/internal.h",
],
),
copts = COPTS,
strip_include_prefix = "src",
visibility = ["//visibility:public"],
deps = [
":core_worker_lib",
],
alwayslink = 1,
)

cc_binary(
name = "raylet",
srcs = ["src/ray/raylet/main.cc"],
Expand Down Expand Up @@ -2813,7 +2790,6 @@ pyx_library(
),
deps = [
"//:core_worker_lib",
"//:exported_internal",
"//:global_state_accessor_lib",
"//:ray_util",
"//:raylet_lib",
Expand Down Expand Up @@ -2848,7 +2824,6 @@ cc_binary(
visibility = ["//java:__subpackages__"],
deps = [
"//:core_worker_lib",
"//:exported_internal",
"//:global_state_accessor_lib",
"//:src/ray/ray_exported_symbols.lds",
"//:src/ray/ray_version_script.lds",
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/ray/runtime/task/native_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,12 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation,
options.serialized_runtime_env_info = call_options.serialized_runtime_env_info;
std::optional<std::vector<rpc::ObjectReference>> return_refs;
if (invocation.task_type == TaskType::ACTOR_TASK) {
return_refs = core_worker.SubmitActorTask(
invocation.actor_id, BuildRayFunction(invocation), invocation.args, options);
if (!return_refs.has_value()) {
auto status = core_worker.SubmitActorTask(invocation.actor_id,
BuildRayFunction(invocation),
invocation.args,
options,
return_refs);
if (!status.ok() || !return_refs.has_value()) {
return ObjectID::Nil();
}
} else {
Expand Down
8 changes: 6 additions & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2419,12 +2419,16 @@ cdef class CoreWorker:
&incremented_put_arg_ids)

with nogil:
return_refs = CCoreWorkerProcess.GetCoreWorker().SubmitActorTask(
status = CCoreWorkerProcess.GetCoreWorker().SubmitActorTask(
c_actor_id,
ray_function,
args_vector,
CTaskOptions(
name, num_returns, c_resources, concurrency_group_name))
name, num_returns, c_resources, concurrency_group_name),
return_refs)
if not status.ok():
raise Exception(f"Failed to submit task to actor {actor_id} "
f"due to {status.message()}")
# These arguments were serialized and put into the local object
# store during task submission. The backend increments their local
# ref count initially to ensure that they remain in scope until we
Expand Down
5 changes: 3 additions & 2 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,11 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const CPlacementGroupID &placement_group_id)
CRayStatus WaitPlacementGroupReady(
const CPlacementGroupID &placement_group_id, int64_t timeout_seconds)
optional[c_vector[CObjectReference]] SubmitActorTask(
CRayStatus SubmitActorTask(
const CActorID &actor_id, const CRayFunction &function,
const c_vector[unique_ptr[CTaskArg]] &args,
const CTaskOptions &options)
const CTaskOptions &options,
optional[c_vector[CObjectReference]]&)
CRayStatus KillActor(
const CActorID &actor_id, c_bool force_kill,
c_bool no_restart)
Expand Down
19 changes: 19 additions & 0 deletions python/ray/tests/test_failure_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,25 @@ def sleeper():
assert raylet["NodeManagerAddress"] in message


def test_accessing_actor_after_cluster_crashed(shutdown_only):
    """A stale actor handle from a torn-down cluster must raise on use.

    The raised error's message should tell the user that the actor might
    be dead or belong to a different cluster.
    """
    ray.init()

    @ray.remote
    class A:
        def f(self):
            return None

    actor_handle = A.remote()
    # Sanity check: the actor is reachable while its cluster is alive.
    ray.get(actor_handle.f.remote())

    # Simulate a cluster crash/restart; the handle now refers to an actor
    # that the freshly started cluster has never heard of.
    ray.shutdown()
    ray.init()

    with pytest.raises(Exception) as exc_info:
        ray.get(actor_handle.f.remote())
    assert "It might be dead or it's from a different cluster" in exc_info.value.args[0]


if __name__ == "__main__":
import os

Expand Down
5 changes: 5 additions & 0 deletions src/ray/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ enum class StatusCode : char {
OutOfDisk = 28,
ObjectUnknownOwner = 29,
RpcError = 30,
OutOfResource = 31
};

#if defined(__clang__)
Expand Down Expand Up @@ -241,6 +242,10 @@ class RAY_EXPORT Status {
return Status(StatusCode::RpcError, msg, rpc_code);
}

/// Create a status indicating that a resource limit was exceeded,
/// e.g. too many tasks pending on an actor's submission queue.
static Status OutOfResource(const std::string &msg) {
return Status(StatusCode::OutOfResource, msg);
}

static StatusCode StringToCode(const std::string &str);

// Returns true iff the status indicates success.
Expand Down
16 changes: 12 additions & 4 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2175,17 +2175,24 @@ Status CoreWorker::WaitPlacementGroupReady(const PlacementGroupID &placement_gro
}
}

std::optional<std::vector<rpc::ObjectReference>> CoreWorker::SubmitActorTask(
Status CoreWorker::SubmitActorTask(
const ActorID &actor_id,
const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args,
const TaskOptions &task_options) {
const TaskOptions &task_options,
std::optional<std::vector<rpc::ObjectReference>> &task_returns) {
absl::ReleasableMutexLock lock(&actor_task_mutex_);
task_returns = std::nullopt;
if (!direct_actor_submitter_->CheckActorExists(actor_id)) {
return Status::NotFound(
"Can't find this actor. It might be dead or it's from a different cluster");
fishbone marked this conversation as resolved.
Show resolved Hide resolved
}
/// Check whether backpressure may happen at the very beginning of submitting a task.
if (direct_actor_submitter_->PendingTasksFull(actor_id)) {
RAY_LOG(DEBUG) << "Back pressure occurred while submitting the task to " << actor_id
<< ". " << direct_actor_submitter_->DebugString(actor_id);
return std::nullopt;
return Status::OutOfResource(
"Too many tasks pending to be executed. Please try later");
fishbone marked this conversation as resolved.
Show resolved Hide resolved
}

auto actor_handle = actor_manager_->GetActorHandle(actor_id);
Expand Down Expand Up @@ -2248,7 +2255,8 @@ std::optional<std::vector<rpc::ObjectReference>> CoreWorker::SubmitActorTask(
rpc_address_, task_spec, CurrentCallSite(), actor_handle->MaxTaskRetries());
RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(task_spec));
}
return {std::move(returned_refs)};
task_returns = {std::move(returned_refs)};
return Status::OK();
}

Status CoreWorker::CancelTask(const ObjectID &object_id,
Expand Down
14 changes: 8 additions & 6 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -806,12 +806,14 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[in] function The remote function to execute.
/// \param[in] args Arguments of this task.
/// \param[in] task_options Options for this task.
/// \return ObjectRefs returned by this task.
std::optional<std::vector<rpc::ObjectReference>> SubmitActorTask(
const ActorID &actor_id,
const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args,
const TaskOptions &task_options);
/// \param[out] task_returns The ObjectRefs returned by this task.
///
/// \return Status of this submission.
Status SubmitActorTask(const ActorID &actor_id,
const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args,
const TaskOptions &task_options,
std::optional<std::vector<rpc::ObjectReference>> &task_returns);

/// Tell an actor to exit immediately, without completing outstanding work.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,

max_restarts =
env->GetIntField(actorCreationOptions, java_actor_creation_options_max_restarts);
max_task_retries =
env->GetIntField(actorCreationOptions, java_actor_creation_options_max_task_retries);
max_task_retries = env->GetIntField(actorCreationOptions,
java_actor_creation_options_max_task_retries);
jobject java_resources =
env->GetObjectField(actorCreationOptions, java_base_task_options_resources);
resources = ToResources(env, java_resources);
Expand Down Expand Up @@ -278,22 +278,21 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
placement_options.second);
placement_group_scheduling_strategy->set_placement_group_capture_child_tasks(false);
}
ActorCreationOptions actor_creation_options{
max_restarts,
max_task_retries,
static_cast<int>(max_concurrency),
resources,
resources,
dynamic_worker_options,
is_detached,
name,
ray_namespace,
is_async,
/*scheduling_strategy=*/scheduling_strategy,
serialized_runtime_env,
concurrency_groups,
/*execute_out_of_order*/ false,
max_pending_calls};
ActorCreationOptions actor_creation_options{max_restarts,
max_task_retries,
static_cast<int>(max_concurrency),
resources,
resources,
dynamic_worker_options,
is_detached,
name,
ray_namespace,
is_async,
/*scheduling_strategy=*/scheduling_strategy,
serialized_runtime_env,
concurrency_groups,
/*execute_out_of_order*/ false,
max_pending_calls};
return actor_creation_options;
}

Expand Down Expand Up @@ -439,10 +438,10 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask(
auto task_args = ToTaskArgs(env, args);
RAY_CHECK(callOptions != nullptr);
auto task_options = ToTaskOptions(env, numReturns, callOptions);

auto return_refs = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
actor_id, ray_function, task_args, task_options);
if (!return_refs.has_value()) {
std::optional<std::vector<rpc::ObjectReference>> return_refs;
auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
actor_id, ray_function, task_args, task_options, return_refs);
if (!status.ok() || !return_refs.has_value()) {
std::stringstream ss;
ss << "The task " << ray_function.GetFunctionDescriptor()->ToString()
<< " could not be submitted to " << actor_id;
Expand Down
38 changes: 24 additions & 14 deletions src/ray/core_worker/test/core_worker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ int CoreWorkerTest::GetActorPid(const ActorID &actor_id,
TaskOptions options{"", 1, resources};
RayFunction func{Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("GetWorkerPid", "", "", "")};

auto return_ids = ObjectRefsToIds(CoreWorkerProcess::GetCoreWorker()
.SubmitActorTask(actor_id, func, args, options)
.value());
std::optional<std::vector<rpc::ObjectReference>> task_returns;
auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
actor_id, func, args, options, task_returns);
auto return_ids = ObjectRefsToIds(task_returns.value());

std::vector<std::shared_ptr<RayObject>> results;
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().Get(return_ids, -1, &results));
Expand Down Expand Up @@ -298,8 +298,10 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", ""));

auto return_ids =
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value());
std::optional<std::vector<rpc::ObjectReference>> task_returns;
auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
actor_id, func, args, options, task_returns);
auto return_ids = ObjectRefsToIds(task_returns.value());
ASSERT_EQ(return_ids.size(), 1);

std::vector<std::shared_ptr<RayObject>> results;
Expand Down Expand Up @@ -344,8 +346,10 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
RayFunction func(
Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", ""));
auto return_ids =
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value());
std::optional<std::vector<rpc::ObjectReference>> task_returns;
auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
actor_id, func, args, options, task_returns);
auto return_ids = ObjectRefsToIds(task_returns.value());

ASSERT_EQ(return_ids.size(), 1);

Expand Down Expand Up @@ -409,8 +413,10 @@ void CoreWorkerTest::TestActorRestart(
Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", ""));

auto return_ids =
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value());
std::optional<std::vector<rpc::ObjectReference>> task_returns;
auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
actor_id, func, args, options, task_returns);
auto return_ids = ObjectRefsToIds(task_returns.value());
ASSERT_EQ(return_ids.size(), 1);
// Verify if it's expected data.
std::vector<std::shared_ptr<RayObject>> results;
Expand Down Expand Up @@ -453,8 +459,10 @@ void CoreWorkerTest::TestActorFailure(
Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", ""));

auto return_ids =
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value());
std::optional<std::vector<rpc::ObjectReference>> task_returns;
auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
actor_id, func, args, options, task_returns);
auto return_ids = ObjectRefsToIds(task_returns.value());

ASSERT_EQ(return_ids.size(), 1);
all_results.emplace_back(std::make_pair(return_ids[0], buffer1));
Expand Down Expand Up @@ -611,8 +619,10 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) {
Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("MergeInputArgsAsOutput", "", "", ""));

auto return_ids =
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value());
std::optional<std::vector<rpc::ObjectReference>> task_returns;
auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
actor_id, func, args, options, task_returns);
auto return_ids = ObjectRefsToIds(task_returns.value());
ASSERT_EQ(return_ids.size(), 1);
object_ids.emplace_back(return_ids[0]);
}
Expand Down
5 changes: 5 additions & 0 deletions src/ray/core_worker/transport/direct_actor_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,11 @@ bool CoreWorkerDirectActorTaskSubmitter::PendingTasksFull(const ActorID &actor_i
it->second.cur_pending_calls >= it->second.max_pending_calls;
}

/// An actor is considered to exist iff it has a client queue entry.
/// Thread-safe: takes mu_ for the duration of the lookup.
bool CoreWorkerDirectActorTaskSubmitter::CheckActorExists(const ActorID &actor_id) const {
  absl::MutexLock lock(&mu_);
  return client_queues_.count(actor_id) > 0;
}

std::string CoreWorkerDirectActorTaskSubmitter::DebugString(
const ActorID &actor_id) const {
absl::MutexLock lock(&mu_);
Expand Down
19 changes: 13 additions & 6 deletions src/ray/core_worker/transport/direct_actor_task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,25 @@ class CoreWorkerDirectActorTaskSubmitter
/// \return Whether the corresponding client queue is full or not.
bool PendingTasksFull(const ActorID &actor_id) const;

/// Check whether the actor exists
///
/// \param[in] actor_id Actor id.
///
/// \return Return true if the actor exists.
bool CheckActorExists(const ActorID &actor_id) const;

/// Returns debug string for class.
///
/// \param[in] actor_id The actor whose debug string to return.
/// \return string.
std::string DebugString(const ActorID &actor_id) const;

/// Whether the specified actor is alive.
///
/// \param[in] actor_id The actor ID.
/// \return Whether this actor is alive.
bool IsActorAlive(const ActorID &actor_id) const;

private:
/// A helper function to get task finisher without holding mu_
/// We should use this function when access
Expand Down Expand Up @@ -280,12 +293,6 @@ class CoreWorkerDirectActorTaskSubmitter
const absl::flat_hash_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
&inflight_task_callbacks) LOCKS_EXCLUDED(mu_);

/// Whether the specified actor is alive.
///
/// \param[in] actor_id The actor ID.
/// \return Whether this actor is alive.
bool IsActorAlive(const ActorID &actor_id) const;

/// Pool for producing new core worker clients.
rpc::CoreWorkerClientPool &core_worker_client_pool_;

Expand Down
Loading