Revert "Revert "[Cluster Events] Add basic job events. (#29164)" (#29… (
Browse files Browse the repository at this point in the history
#29196)

This reverts the revert PR and fixes the test failures.

This also fixes a bug in the _monitor_job API: _monitor_job can be called on the same job twice, and because an event is recorded at the end of the monitor loop marking the job as completed, the same completed event could be reported twice. With this fix, a second monitor call for a job that is already being monitored is ignored.
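A minimal sketch of that de-duplication guard (the JobMonitor class name here is hypothetical and for illustration only; the actual change adds a monitored_jobs set to JobManager, shown in the job_manager.py diff below):

class JobMonitor:
    def __init__(self):
        # Job ids that already have an active monitor loop.
        self.monitored_jobs = set()

    async def _monitor_job(self, job_id, job_supervisor=None):
        # Without this guard, a second call for the same job would start a
        # second monitor loop and report the same "completed" event twice.
        if job_id in self.monitored_jobs:
            return

        self.monitored_jobs.add(job_id)
        try:
            await self._monitor_job_internal(job_id, job_supervisor)
        finally:
            # Allow the job to be monitored again once this loop has exited.
            self.monitored_jobs.remove(job_id)

    async def _monitor_job_internal(self, job_id, job_supervisor):
        ...  # poll the supervisor, then record the terminal job event exactly once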
rkooo567 authored Nov 2, 2022
1 parent 9a020a2 commit 0540b1f
Showing 5 changed files with 123 additions and 26 deletions.
79 changes: 66 additions & 13 deletions dashboard/modules/event/tests/test_event.py
@@ -31,6 +31,8 @@
from ray.dashboard.modules.event.event_utils import (
    monitor_events,
)
from ray.job_submission import JobSubmissionClient
from pprint import pprint

logger = logging.getLogger(__name__)

@@ -381,21 +383,72 @@ def verify():
    cluster.shutdown()


# def test_jobs_cluster_events(shutdown_only):
#     ray.init()
#     address = ray._private.worker._global_node.webui_url
#     address = format_web_url(address)
#     client = JobSubmissionClient(address)
#     client.submit_job(entrypoint="ls")
def test_jobs_cluster_events(shutdown_only):
    ray.init()
    address = ray._private.worker._global_node.webui_url
    address = format_web_url(address)
    client = JobSubmissionClient(address)
    submission_id = client.submit_job(entrypoint="ls")

    def verify():
        events = list_cluster_events()
        assert len(events) == 2
        start_event = events[0]
        completed_event = events[1]

        assert start_event["source_type"] == "JOBS"
        assert f"Started a ray job {submission_id}" in start_event["message"]
        assert start_event["severity"] == "INFO"
        assert completed_event["source_type"] == "JOBS"
        assert (
            f"Completed a ray job {submission_id} with a status SUCCEEDED."
            == completed_event["message"]
        )
        assert completed_event["severity"] == "INFO"
        return True

    print("Test successful job run.")
    wait_for_condition(verify)
    pprint(list_cluster_events())

    # Test the failure case. In this part, job fails because the runtime env
    # creation fails.
    submission_id = client.submit_job(
        entrypoint="ls",
        runtime_env={"pip": ["nonexistent_dep"]},
    )

    def verify():
        events = list_cluster_events(detail=True)
        failed_events = []

        for e in events:
            if (
                "submission_id" in e["custom_fields"]
                and e["custom_fields"]["submission_id"] == submission_id
            ):
                failed_events.append(e)

        assert len(failed_events) == 2
        failed_start = failed_events[0]
        failed_completed = failed_events[1]

        assert failed_start["source_type"] == "JOBS"
        assert f"Started a ray job {submission_id}" in failed_start["message"]
        assert failed_completed["source_type"] == "JOBS"
        assert failed_completed["severity"] == "ERROR"
        assert (
            f"Completed a ray job {submission_id} with a status FAILED."
            in failed_completed["message"]
        )

# def verify():
#     assert len(list_cluster_events()) == 3
#     for e in list_cluster_events():
#         e["source_type"] = "JOBS"
#     return True
        # Make sure the error message is included.
        assert "ERROR: No matching distribution found" in failed_completed["message"]
        return True

# wait_for_condition(verify)
# print(list_cluster_events())
    print("Test failed (runtime_env failure) job run.")
    wait_for_condition(verify, timeout=30)
    pprint(list_cluster_events())


if __name__ == "__main__":
4 changes: 3 additions & 1 deletion dashboard/modules/job/job_agent.py
@@ -158,7 +158,9 @@ async def tail_job_logs(self, req: Request) -> Response:

    def get_job_manager(self):
        if not self._job_manager:
            self._job_manager = JobManager(self._dashboard_agent.gcs_aio_client)
            self._job_manager = JobManager(
                self._dashboard_agent.gcs_aio_client, self._dashboard_agent.log_dir
            )
        return self._job_manager

    async def run(self, server):
51 changes: 45 additions & 6 deletions dashboard/modules/job/job_manager.py
@@ -33,7 +33,8 @@
from ray.dashboard.modules.job.utils import file_tail_iterator
from ray.exceptions import RuntimeEnvSetupError
from ray.job_submission import JobStatus

from ray._private.event.event_logger import get_event_logger
from ray.core.generated.event_pb2 import Event

logger = logging.getLogger(__name__)

@@ -410,12 +411,17 @@ class JobManager:
    LOG_TAIL_SLEEP_S = 1
    JOB_MONITOR_LOOP_PERIOD_S = 1

    def __init__(self, gcs_aio_client: GcsAioClient):
    def __init__(self, gcs_aio_client: GcsAioClient, logs_dir: str):
        self._gcs_aio_client = gcs_aio_client
        self._job_info_client = JobInfoStorageClient(gcs_aio_client)
        self._gcs_address = gcs_aio_client._channel._gcs_address
        self._log_client = JobLogStorageClient()
        self._supervisor_actor_cls = ray.remote(JobSupervisor)
        self.monitored_jobs = set()
        try:
            self.event_logger = get_event_logger(Event.SourceType.JOBS, logs_dir)
        except Exception:
            self.event_logger = None

        create_task(self._recover_running_jobs())

@@ -447,6 +453,19 @@ async def _monitor_job(
        This is necessary because we need to handle the case where the
        JobSupervisor dies unexpectedly.
        """
        if job_id in self.monitored_jobs:
            logger.debug(f"Job {job_id} is already being monitored.")
            return

        self.monitored_jobs.add(job_id)
        try:
            await self._monitor_job_internal(job_id, job_supervisor)
        finally:
            self.monitored_jobs.remove(job_id)

    async def _monitor_job_internal(
        self, job_id: str, job_supervisor: Optional[ActorHandle] = None
    ):
        is_alive = True
        if job_supervisor is None:
            job_supervisor = self._get_actor_for_job(job_id)
@@ -467,27 +486,43 @@
            except Exception as e:
                is_alive = False
                job_status = await self._job_info_client.get_status(job_id)
                job_error_message = None
                if job_status.is_terminal():
                    # If the job is already in a terminal state, then the actor
                    # exiting is expected.
                    pass
                elif isinstance(e, RuntimeEnvSetupError):
                    logger.info(f"Failed to set up runtime_env for job {job_id}.")
                    job_error_message = f"runtime_env setup failed: {e}"
                    job_status = JobStatus.FAILED
                    await self._job_info_client.put_status(
                        job_id,
                        JobStatus.FAILED,
                        message=f"runtime_env setup failed: {e}",
                        job_status,
                        message=job_error_message,
                    )
                else:
                    logger.warning(
                        f"Job supervisor for job {job_id} failed unexpectedly: {e}."
                    )
                    job_error_message = f"Unexpected error occurred: {e}"
                    job_status = JobStatus.FAILED
                    await self._job_info_client.put_status(
                        job_id,
                        JobStatus.FAILED,
                        message=f"Unexpected error occurred: {e}",
                        job_status,
                        message=job_error_message,
                    )

                # Log events
                if self.event_logger:
                    event_log = (
                        f"Completed a ray job {job_id} with a status {job_status}."
                    )
                    if job_error_message:
                        event_log += f" {job_error_message}"
                        self.event_logger.error(event_log, submission_id=job_id)
                    else:
                        self.event_logger.info(event_log, submission_id=job_id)

                # Kill the actor defensively to avoid leaking actors in unexpected error cases.
                if job_supervisor is not None:
                    ray.kill(job_supervisor, no_restart=True)
@@ -631,6 +666,10 @@ async def submit_job(
        # up.
        try:
            scheduling_strategy = await self._get_scheduling_strategy()
            if self.event_logger:
                self.event_logger.info(
                    f"Started a ray job {submission_id}.", submission_id=submission_id
                )
            supervisor = self._supervisor_actor_cls.options(
                lifetime="detached",
                name=JOB_ACTOR_NAME_TEMPLATE.format(job_id=submission_id),
14 changes: 8 additions & 6 deletions dashboard/modules/job/tests/test_job_manager.py
@@ -36,14 +36,16 @@
["""ray start --head"""],
indirect=True,
)
async def test_get_scheduling_strategy(call_ray_start, monkeypatch):  # noqa: F811
async def test_get_scheduling_strategy(
    call_ray_start, monkeypatch, tmp_path  # noqa: F811
):
    monkeypatch.setenv(RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR, "0")
    address_info = ray.init(address=call_ray_start)
    gcs_aio_client = GcsAioClient(
        address=address_info["gcs_address"], nums_reconnect_retry=0
    )

    job_manager = JobManager(gcs_aio_client)
    job_manager = JobManager(gcs_aio_client, tmp_path)

    # If no head node id is found, we should use "DEFAULT".
    await gcs_aio_client.internal_kv_del(
@@ -73,14 +75,14 @@ async def test_get_scheduling_strategy(call_ray_start, monkeypatch): # noqa: F8
["""ray start --head --resources={"TestResourceKey":123}"""],
indirect=True,
)
async def test_submit_no_ray_address(call_ray_start):  # noqa: F811
async def test_submit_no_ray_address(call_ray_start, tmp_path):  # noqa: F811
    """Test that a job script with an unspecified Ray address works."""

    address_info = ray.init(address=call_ray_start)
    gcs_aio_client = GcsAioClient(
        address=address_info["gcs_address"], nums_reconnect_retry=0
    )
    job_manager = JobManager(gcs_aio_client)
    job_manager = JobManager(gcs_aio_client, tmp_path)

    init_ray_no_address_script = """
import ray
@@ -120,12 +122,12 @@ def shared_ray_instance():

@pytest.mark.asyncio
@pytest.fixture
async def job_manager(shared_ray_instance):
async def job_manager(shared_ray_instance, tmp_path):
    address_info = shared_ray_instance
    gcs_aio_client = GcsAioClient(
        address=address_info["gcs_address"], nums_reconnect_retry=0
    )
    yield JobManager(gcs_aio_client)
    yield JobManager(gcs_aio_client, tmp_path)


def _driver_script_path(file_name: str) -> str:
1 change: 1 addition & 0 deletions python/ray/experimental/state/common.py
@@ -461,6 +461,7 @@ class ClusterEventState(StateSchema):
    source_type: str = state_column(filterable=True)
    message: str = state_column(filterable=False)
    event_id: int = state_column(filterable=True)
    custom_fields: dict = state_column(filterable=False, detail=True)


@dataclass(init=True)
