Skip to content

Commit

Permalink
[serve] Add support for deploy refactor (#34845)
Browse files Browse the repository at this point in the history
Some preparation for upcoming deploy refactor PRs.
- Rename fields of `ApplicationState` as private fields
- Add route prefix to deployment info (it will be used by application state to deploy deployments in a reconciler loop)
- Stop setting num replicas from autoscaling in `deployment_config.num_replicas`, instead put it in a separate field of deployment info.
- determine initial autoscaled num replicas in deployment state manager .deploy()
  • Loading branch information
zcin authored May 1, 2023
1 parent 7071a4f commit d87a251
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 181 deletions.
180 changes: 101 additions & 79 deletions python/ray/serve/_private/application_state.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import traceback
from typing import Dict, List
from typing import Dict, List, Optional
from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag
from ray.serve._private.common import ApplicationStatus
from ray.serve._private.deployment_state import DeploymentStateManager
Expand Down Expand Up @@ -38,29 +38,61 @@ def __init__(
deploy_obj_ref: Task ObjRef of deploying application.
deployment_time: Deployment timestamp
"""

self._name = name
self._deploy_obj_ref = deploy_obj_ref
self._app_msg = ""
self._deployment_state_manager = deployment_state_manager
self._deployment_params: List[Dict] = []
# This set tracks old deployments that are being deleted
self._deployments_to_delete = set()
self._ready_to_be_deleted = False
self._route_prefix = None
self._docs_path = None

if deploy_obj_ref:
self.status: ApplicationStatus = ApplicationStatus.DEPLOYING
self._status: ApplicationStatus = ApplicationStatus.DEPLOYING
else:
self.status: ApplicationStatus = ApplicationStatus.NOT_STARTED
self.name = name
self.deployment_params: List[Dict] = []
self.ready_to_be_deleted = False
self.deployment_state_manager = deployment_state_manager
self._status: ApplicationStatus = ApplicationStatus.NOT_STARTED
if deployment_time:
self.deployment_timestamp = deployment_time
self._deployment_timestamp = deployment_time
else:
self.deployment_timestamp = time.time()
self.deploy_obj_ref = deploy_obj_ref
self.app_msg = ""
self.route_prefix = None
self.docs_path = None
self._deployment_timestamp = time.time()

# This set tracks old deployments that are being deleted
self.deployments_to_delete = set()
@property
def ready_to_be_deleted(self) -> bool:
return self._ready_to_be_deleted

@property
def route_prefix(self) -> Optional[str]:
return self._route_prefix

@property
def docs_path(self) -> Optional[str]:
return self._docs_path

@property
def status(self) -> ApplicationStatus:
return self._status

@property
def deployment_timestamp(self) -> int:
return self._deployment_timestamp

@property
def deploy_obj_ref(self) -> Optional[ObjectRef]:
return self._deploy_obj_ref

@property
def deployments(self) -> List[str]:
"""Return all deployments name from the application"""
if self._deployment_params is None:
return []
return [params["name"] for params in self._deployment_params]

def delete(self):
"""Delete the application"""
self.status = ApplicationStatus.DELETING
self._status = ApplicationStatus.DELETING

def deploy(self, deployment_params: List[Dict]) -> List[str]:
"""Deploy the application.
Expand All @@ -76,60 +108,60 @@ def deploy(self, deployment_params: List[Dict]) -> List[str]:
# that are not used in the new deployment_params
to_be_deployed_deployments = {params["name"] for params in deployment_params}
cur_deployments_to_delete = []
for deployment_name in self.get_all_deployments():
for deployment_name in self.deployments:
if deployment_name not in to_be_deployed_deployments:
cur_deployments_to_delete.append(deployment_name)
self.deployments_to_delete.add(deployment_name)
self.deployment_params = deployment_params
self._deployments_to_delete.add(deployment_name)
self._deployment_params = deployment_params

# Update route prefix for application
num_route_prefixes = 0
num_docs_paths = 0
for deploy_param in deployment_params:
if deploy_param.get("route_prefix") is not None:
self.route_prefix = deploy_param["route_prefix"]
self._route_prefix = deploy_param["route_prefix"]
num_route_prefixes += 1

if deploy_param.get("docs_path") is not None:
self.docs_path = deploy_param["docs_path"]
self._docs_path = deploy_param["docs_path"]
num_docs_paths += 1
if num_route_prefixes > 1:
raise RayServeException(
f'Found multiple route prefix from application "{self.name}",'
f'Found multiple route prefix from application "{self._name}",'
" Please specify only one route prefix for the application "
"to avoid this issue."
)
# NOTE(zcin) This will not catch multiple FastAPI deployments in the application
# if user sets the docs path to None in their FastAPI app.
if num_docs_paths > 1:
raise RayServeException(
f'Found multiple deployments in application "{self.name}" that have '
f'Found multiple deployments in application "{self._name}" that have '
"a docs path. This may be due to using multiple FastAPI deployments "
"in your application. Please only include one deployment with a docs "
"path in your application to avoid this issue."
)

self.status = ApplicationStatus.DEPLOYING
self._status = ApplicationStatus.DEPLOYING
return cur_deployments_to_delete

def update_obj_ref(self, deploy_obj_ref: ObjectRef, deployment_time: int):
self.deploy_obj_ref = deploy_obj_ref
self.deployment_timestamp = deployment_time
self.status = ApplicationStatus.DEPLOYING
self._deploy_obj_ref = deploy_obj_ref
self._deployment_timestamp = deployment_time
self._status = ApplicationStatus.DEPLOYING

def _process_terminating_deployments(self):
"""Update the tracking for all deployments being deleted
When a deployment's status is None, the deployment will be
removed from application.
"""
for name in list(self.deployments_to_delete):
if self.deployment_state_manager.get_deployment(name):
for name in list(self._deployments_to_delete):
if self._deployment_state_manager.get_deployment(name):
logger.warning(
f"Deleting deployment {name} from application {self.name}."
f"Deleting deployment {name} from application {self._name}."
)
else:
self.deployments_to_delete.remove(name)
self._deployments_to_delete.remove(name)

def update(self):
"""Update the application status, maintain the ApplicationStatus.
Expand All @@ -141,87 +173,77 @@ def update(self):
DELETING: Mark ready_to_be_deleted as True when all deployments are gone.
"""

if self.ready_to_be_deleted:
if self._ready_to_be_deleted:
return

if self.status == ApplicationStatus.DELETING:
if self._status == ApplicationStatus.DELETING:
mark_delete = True
# Application won't be deleted until all deployments get cleaned up
for name in self.get_all_deployments():
if self.deployment_state_manager.get_deployment(name):
for name in self.deployments:
if self._deployment_state_manager.get_deployment(name):
logger.debug(
f"Deleting deployment {name} from application {self.name}."
f"Deleting deployment {name} from application {self._name}."
)
mark_delete = False
break
if self.deployments_to_delete:
if self._deployments_to_delete:
mark_delete = False
self.ready_to_be_deleted = mark_delete
self._ready_to_be_deleted = mark_delete
self._process_terminating_deployments()
return

if self.status == ApplicationStatus.DEPLOYING:
if self.deploy_obj_ref:
finished, pending = ray.wait([self.deploy_obj_ref], timeout=0)
if self._status == ApplicationStatus.DEPLOYING:
if self._deploy_obj_ref:
finished, pending = ray.wait([self._deploy_obj_ref], timeout=0)
if pending:
return
try:
ray.get(finished[0])
logger.info(f"Deploy task for app '{self.name}' ran successfully.")
logger.info(f"Deploy task for app '{self._name}' ran successfully.")
except RayTaskError as e:
self.status = ApplicationStatus.DEPLOY_FAILED
self._status = ApplicationStatus.DEPLOY_FAILED
# NOTE(zcin): we should use str(e) instead of traceback.format_exc()
# here because the full details of the error is not displayed
# properly with traceback.format_exc(). RayTaskError has its own
# custom __str__ function.
self.app_msg = f"Deploying app '{self.name}' failed:\n{str(e)}"
self.deploy_obj_ref = None
logger.warning(self.app_msg)
self._app_msg = f"Deploying app '{self._name}' failed:\n{str(e)}"
self._deploy_obj_ref = None
logger.warning(self._app_msg)
return
except RuntimeEnvSetupError:
self.status = ApplicationStatus.DEPLOY_FAILED
self.app_msg = (
f"Runtime env setup for app '{self.name}' "
self._status = ApplicationStatus.DEPLOY_FAILED
self._app_msg = (
f"Runtime env setup for app '{self._name}' "
f"failed:\n{traceback.format_exc()}"
)
self.deploy_obj_ref = None
logger.warning(self.app_msg)
self._deploy_obj_ref = None
logger.warning(self._app_msg)
return
deployments_statuses = (
self.deployment_state_manager.get_deployment_statuses(
self.get_all_deployments()
)
self._deployment_state_manager.get_deployment_statuses(self.deployments)
)
num_health_deployments = 0
for deployment_status in deployments_statuses:
if deployment_status.status == DeploymentStatus.UNHEALTHY:
self.status = ApplicationStatus.DEPLOY_FAILED
self._status = ApplicationStatus.DEPLOY_FAILED
return
if deployment_status.status == DeploymentStatus.HEALTHY:
num_health_deployments += 1
if num_health_deployments == len(deployments_statuses):
self.status = ApplicationStatus.RUNNING
self._status = ApplicationStatus.RUNNING

self._process_terminating_deployments()

def get_all_deployments(self) -> List[str]:
"""Return all deployments name from the application"""
if self.deployment_params is None:
return []
return [params["name"] for params in self.deployment_params]

def get_deployments_statuses(self) -> List[DeploymentStatusInfo]:
"""Return all deployment status information"""
return self.deployment_state_manager.get_deployment_statuses(
self.get_all_deployments()
)
return self._deployment_state_manager.get_deployment_statuses(self.deployments)

def get_application_status_info(self) -> ApplicationStatusInfo:
"""Return the application status information"""
return ApplicationStatusInfo(
self.status,
message=self.app_msg,
deployment_timestamp=self.deployment_timestamp,
self._status,
message=self._app_msg,
deployment_timestamp=self._deployment_timestamp,
)

def list_deployment_details(self) -> Dict[str, DeploymentDetails]:
Expand All @@ -235,15 +257,15 @@ def list_deployment_details(self) -> Dict[str, DeploymentDetails]:
been deleted.
"""
details = {
name: self.deployment_state_manager.get_deployment_details(name)
for name in self.get_all_deployments()
name: self._deployment_state_manager.get_deployment_details(name)
for name in self.deployments
}
return {k: v for k, v in details.items() if v is not None}


class ApplicationStateManager:
def __init__(self, deployment_state_manager):
self.deployment_state_manager = deployment_state_manager
self._deployment_state_manager = deployment_state_manager
self._application_states: Dict[str, ApplicationState] = {}

def delete_application(self, name: str):
Expand Down Expand Up @@ -287,7 +309,7 @@ def deploy_application(self, name: str, deployment_args: List[Dict]):
if name not in self._application_states:
self._application_states[name] = ApplicationState(
name,
self.deployment_state_manager,
self._deployment_state_manager,
)
record_extra_usage_tag(
TagKey.SERVE_NUM_APPS, str(len(self._application_states))
Expand All @@ -298,7 +320,7 @@ def get_deployments(self, app_name: str) -> List[str]:
"""Return all deployment names by app name"""
if app_name not in self._application_states:
return []
return self._application_states[app_name].get_all_deployments()
return self._application_states[app_name].deployments

def get_deployments_statuses(self, app_name: str) -> List[DeploymentStatusInfo]:
"""Return all deployment statuses by app name"""
Expand All @@ -315,6 +337,11 @@ def get_app_status(self, name: str) -> ApplicationStatusInfo:
)
return self._application_states[name].get_application_status_info()

def get_deployment_timestamp(self, name: str) -> float:
if name not in self._application_states:
return -1
return self._application_states[name].deployment_timestamp

def get_docs_path(self, app_name: str):
return self._application_states[app_name].docs_path

Expand Down Expand Up @@ -360,16 +387,11 @@ def create_application_state(
else:
self._application_states[name] = ApplicationState(
name,
self.deployment_state_manager,
self._deployment_state_manager,
deploy_obj_ref=deploy_obj_ref,
deployment_time=deployment_time,
)

def get_deployment_timestamp(self, name: str) -> float:
if name not in self._application_states:
return -1
return self._application_states[name].deployment_timestamp

def update(self):
"""Update each application state"""
apps_to_be_deleted = []
Expand Down
Loading

0 comments on commit d87a251

Please sign in to comment.