Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Add support for deploy refactor #34845

Merged
merged 14 commits into from
May 1, 2023
180 changes: 101 additions & 79 deletions python/ray/serve/_private/application_state.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import traceback
from typing import Dict, List
from typing import Dict, List, Optional
from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag
from ray.serve._private.common import ApplicationStatus
from ray.serve._private.deployment_state import DeploymentStateManager
Expand Down Expand Up @@ -38,29 +38,61 @@ def __init__(
deploy_obj_ref: Task ObjRef of deploying application.
deployment_time: Deployment timestamp
"""

self._name = name
self._deploy_obj_ref = deploy_obj_ref
self._app_msg = ""
self._deployment_state_manager = deployment_state_manager
self._deployment_params: List[Dict] = []
# This set tracks old deployments that are being deleted
self._deployments_to_delete = set()
self._ready_to_be_deleted = False
self._route_prefix = None
self._docs_path = None

if deploy_obj_ref:
self.status: ApplicationStatus = ApplicationStatus.DEPLOYING
self._status: ApplicationStatus = ApplicationStatus.DEPLOYING
else:
self.status: ApplicationStatus = ApplicationStatus.NOT_STARTED
self.name = name
self.deployment_params: List[Dict] = []
self.ready_to_be_deleted = False
self.deployment_state_manager = deployment_state_manager
self._status: ApplicationStatus = ApplicationStatus.NOT_STARTED
if deployment_time:
self.deployment_timestamp = deployment_time
self._deployment_timestamp = deployment_time
else:
self.deployment_timestamp = time.time()
self.deploy_obj_ref = deploy_obj_ref
self.app_msg = ""
self.route_prefix = None
self.docs_path = None
self._deployment_timestamp = time.time()

# This set tracks old deployments that are being deleted
self.deployments_to_delete = set()
@property
def ready_to_be_deleted(self) -> bool:
return self._ready_to_be_deleted

@property
def route_prefix(self) -> Optional[str]:
return self._route_prefix

@property
def docs_path(self) -> Optional[str]:
return self._docs_path

@property
def status(self) -> ApplicationStatus:
return self._status

@property
def deployment_timestamp(self) -> int:
return self._deployment_timestamp

@property
def deploy_obj_ref(self) -> Optional[ObjectRef]:
return self._deploy_obj_ref

@property
def deployments(self) -> List[str]:
"""Return all deployments name from the application"""
if self._deployment_params is None:
return []
return [params["name"] for params in self._deployment_params]

def delete(self):
"""Delete the application"""
self.status = ApplicationStatus.DELETING
self._status = ApplicationStatus.DELETING

def deploy(self, deployment_params: List[Dict]) -> List[str]:
"""Deploy the application.
Expand All @@ -76,60 +108,60 @@ def deploy(self, deployment_params: List[Dict]) -> List[str]:
# that are not used in the new deployment_params
to_be_deployed_deployments = {params["name"] for params in deployment_params}
cur_deployments_to_delete = []
for deployment_name in self.get_all_deployments():
for deployment_name in self.deployments:
if deployment_name not in to_be_deployed_deployments:
cur_deployments_to_delete.append(deployment_name)
self.deployments_to_delete.add(deployment_name)
self.deployment_params = deployment_params
self._deployments_to_delete.add(deployment_name)
self._deployment_params = deployment_params

# Update route prefix for application
num_route_prefixes = 0
num_docs_paths = 0
for deploy_param in deployment_params:
if deploy_param.get("route_prefix") is not None:
self.route_prefix = deploy_param["route_prefix"]
self._route_prefix = deploy_param["route_prefix"]
num_route_prefixes += 1

if deploy_param.get("docs_path") is not None:
self.docs_path = deploy_param["docs_path"]
self._docs_path = deploy_param["docs_path"]
num_docs_paths += 1
if num_route_prefixes > 1:
raise RayServeException(
f'Found multiple route prefix from application "{self.name}",'
f'Found multiple route prefix from application "{self._name}",'
" Please specify only one route prefix for the application "
"to avoid this issue."
)
# NOTE(zcin) This will not catch multiple FastAPI deployments in the application
# if user sets the docs path to None in their FastAPI app.
if num_docs_paths > 1:
raise RayServeException(
f'Found multiple deployments in application "{self.name}" that have '
f'Found multiple deployments in application "{self._name}" that have '
"a docs path. This may be due to using multiple FastAPI deployments "
"in your application. Please only include one deployment with a docs "
"path in your application to avoid this issue."
)

self.status = ApplicationStatus.DEPLOYING
self._status = ApplicationStatus.DEPLOYING
return cur_deployments_to_delete

def update_obj_ref(self, deploy_obj_ref: ObjectRef, deployment_time: int):
self.deploy_obj_ref = deploy_obj_ref
self.deployment_timestamp = deployment_time
self.status = ApplicationStatus.DEPLOYING
self._deploy_obj_ref = deploy_obj_ref
self._deployment_timestamp = deployment_time
self._status = ApplicationStatus.DEPLOYING

def _process_terminating_deployments(self):
"""Update the tracking for all deployments being deleted

When a deployment's status is None, the deployment will be
removed from application.
"""
for name in list(self.deployments_to_delete):
if self.deployment_state_manager.get_deployment(name):
for name in list(self._deployments_to_delete):
if self._deployment_state_manager.get_deployment(name):
logger.warning(
f"Deleting deployment {name} from application {self.name}."
f"Deleting deployment {name} from application {self._name}."
)
else:
self.deployments_to_delete.remove(name)
self._deployments_to_delete.remove(name)

def update(self):
"""Update the application status, maintain the ApplicationStatus.
Expand All @@ -141,87 +173,77 @@ def update(self):
DELETING: Mark ready_to_be_deleted as True when all deployments are gone.
"""

if self.ready_to_be_deleted:
if self._ready_to_be_deleted:
return

if self.status == ApplicationStatus.DELETING:
if self._status == ApplicationStatus.DELETING:
mark_delete = True
# Application won't be deleted until all deployments get cleaned up
for name in self.get_all_deployments():
if self.deployment_state_manager.get_deployment(name):
for name in self.deployments:
if self._deployment_state_manager.get_deployment(name):
logger.debug(
f"Deleting deployment {name} from application {self.name}."
f"Deleting deployment {name} from application {self._name}."
)
mark_delete = False
break
if self.deployments_to_delete:
if self._deployments_to_delete:
mark_delete = False
self.ready_to_be_deleted = mark_delete
self._ready_to_be_deleted = mark_delete
self._process_terminating_deployments()
return

if self.status == ApplicationStatus.DEPLOYING:
if self.deploy_obj_ref:
finished, pending = ray.wait([self.deploy_obj_ref], timeout=0)
if self._status == ApplicationStatus.DEPLOYING:
if self._deploy_obj_ref:
finished, pending = ray.wait([self._deploy_obj_ref], timeout=0)
if pending:
return
try:
ray.get(finished[0])
logger.info(f"Deploy task for app '{self.name}' ran successfully.")
logger.info(f"Deploy task for app '{self._name}' ran successfully.")
except RayTaskError as e:
self.status = ApplicationStatus.DEPLOY_FAILED
self._status = ApplicationStatus.DEPLOY_FAILED
# NOTE(zcin): we should use str(e) instead of traceback.format_exc()
# here because the full details of the error is not displayed
# properly with traceback.format_exc(). RayTaskError has its own
# custom __str__ function.
self.app_msg = f"Deploying app '{self.name}' failed:\n{str(e)}"
self.deploy_obj_ref = None
logger.warning(self.app_msg)
self._app_msg = f"Deploying app '{self._name}' failed:\n{str(e)}"
self._deploy_obj_ref = None
logger.warning(self._app_msg)
return
except RuntimeEnvSetupError:
self.status = ApplicationStatus.DEPLOY_FAILED
self.app_msg = (
f"Runtime env setup for app '{self.name}' "
self._status = ApplicationStatus.DEPLOY_FAILED
self._app_msg = (
f"Runtime env setup for app '{self._name}' "
f"failed:\n{traceback.format_exc()}"
)
self.deploy_obj_ref = None
logger.warning(self.app_msg)
self._deploy_obj_ref = None
logger.warning(self._app_msg)
return
deployments_statuses = (
self.deployment_state_manager.get_deployment_statuses(
self.get_all_deployments()
)
self._deployment_state_manager.get_deployment_statuses(self.deployments)
)
num_health_deployments = 0
for deployment_status in deployments_statuses:
if deployment_status.status == DeploymentStatus.UNHEALTHY:
self.status = ApplicationStatus.DEPLOY_FAILED
self._status = ApplicationStatus.DEPLOY_FAILED
return
if deployment_status.status == DeploymentStatus.HEALTHY:
num_health_deployments += 1
if num_health_deployments == len(deployments_statuses):
self.status = ApplicationStatus.RUNNING
self._status = ApplicationStatus.RUNNING

self._process_terminating_deployments()

def get_all_deployments(self) -> List[str]:
"""Return all deployments name from the application"""
if self.deployment_params is None:
return []
return [params["name"] for params in self.deployment_params]

def get_deployments_statuses(self) -> List[DeploymentStatusInfo]:
"""Return all deployment status information"""
return self.deployment_state_manager.get_deployment_statuses(
self.get_all_deployments()
)
return self._deployment_state_manager.get_deployment_statuses(self.deployments)

def get_application_status_info(self) -> ApplicationStatusInfo:
"""Return the application status information"""
return ApplicationStatusInfo(
self.status,
message=self.app_msg,
deployment_timestamp=self.deployment_timestamp,
self._status,
message=self._app_msg,
deployment_timestamp=self._deployment_timestamp,
)

def list_deployment_details(self) -> Dict[str, DeploymentDetails]:
Expand All @@ -235,15 +257,15 @@ def list_deployment_details(self) -> Dict[str, DeploymentDetails]:
been deleted.
"""
details = {
name: self.deployment_state_manager.get_deployment_details(name)
for name in self.get_all_deployments()
name: self._deployment_state_manager.get_deployment_details(name)
for name in self.deployments
}
return {k: v for k, v in details.items() if v is not None}


class ApplicationStateManager:
def __init__(self, deployment_state_manager):
self.deployment_state_manager = deployment_state_manager
self._deployment_state_manager = deployment_state_manager
self._application_states: Dict[str, ApplicationState] = {}

def delete_application(self, name: str):
Expand Down Expand Up @@ -287,7 +309,7 @@ def deploy_application(self, name: str, deployment_args: List[Dict]):
if name not in self._application_states:
self._application_states[name] = ApplicationState(
name,
self.deployment_state_manager,
self._deployment_state_manager,
)
record_extra_usage_tag(
TagKey.SERVE_NUM_APPS, str(len(self._application_states))
Expand All @@ -298,7 +320,7 @@ def get_deployments(self, app_name: str) -> List[str]:
"""Return all deployment names by app name"""
if app_name not in self._application_states:
return []
return self._application_states[app_name].get_all_deployments()
return self._application_states[app_name].deployments

def get_deployments_statuses(self, app_name: str) -> List[DeploymentStatusInfo]:
"""Return all deployment statuses by app name"""
Expand All @@ -315,6 +337,11 @@ def get_app_status(self, name: str) -> ApplicationStatusInfo:
)
return self._application_states[name].get_application_status_info()

def get_deployment_timestamp(self, name: str) -> float:
if name not in self._application_states:
return -1
return self._application_states[name].deployment_timestamp

def get_docs_path(self, app_name: str):
return self._application_states[app_name].docs_path

Expand Down Expand Up @@ -360,16 +387,11 @@ def create_application_state(
else:
self._application_states[name] = ApplicationState(
name,
self.deployment_state_manager,
self._deployment_state_manager,
deploy_obj_ref=deploy_obj_ref,
deployment_time=deployment_time,
)

def get_deployment_timestamp(self, name: str) -> float:
if name not in self._application_states:
return -1
return self._application_states[name].deployment_timestamp

def update(self):
"""Update each application state"""
apps_to_be_deleted = []
Expand Down
Loading