From 76250750358b57e572e78bb07fa03927bc460003 Mon Sep 17 00:00:00 2001 From: Raphael Jin Date: Thu, 30 May 2024 07:29:38 +0000 Subject: [PATCH] [Deploy] Avoid re-download the same model serving package. --- .../model_scheduler/worker_job_runner.py | 116 ++++++++---------- 1 file changed, 53 insertions(+), 63 deletions(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py b/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py index 332dab2547..831064b591 100755 --- a/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py +++ b/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py @@ -146,10 +146,10 @@ def run_impl(self, run_extend_queue_list, sender_message_center, logging.info(f"[Worker] Received model deployment request from master for endpoint {run_id}.") self.replica_handler = FedMLDeviceReplicaHandler(self.edge_id, self.request_json) if self.replica_handler is not None: - logging.info(f"=================Worker replica Handler ======================" - f"Reconcile with num diff {self.replica_handler.replica_num_diff} " - f"and version diff {self.replica_handler.replica_version_diff}." - f"=============================================================") + logging.info("\n================= Worker replica Handler ======================\n" + f"Reconcile with num diff {self.replica_handler.replica_num_diff}\n" + f"and version diff {self.replica_handler.replica_version_diff}\n" + "===============================================================\n") else: logging.error(f"[Worker] Replica handler is None.") return False @@ -178,39 +178,13 @@ def run_impl(self, run_extend_queue_list, sender_message_center, logging.info("[Worker] No need to reconcile.") return True - logging.info( - f"================Worker Reconcile Operations ======================\n" - f" op: {op}; op num: {op_num}.\n" - f"==================================================================\n") - - # If not rollback, download package from MLOps; otherwise, use the backup package - if op != "rollback": - logging.info("Download and unzip model to local...") - unzip_package_path, _, _ = \ - self.update_local_fedml_config(run_id, model_config, model_config_parameters) - if unzip_package_path is None: - logging.info("Failed to update local fedml config.") - self.check_runner_stop_event() - self.status_reporter.report_client_id_status( - self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED, - is_from_model=True, run_id=run_id) - return False - - if not os.path.exists(unzip_package_path): - logging.info("Failed to unzip file.") - self.check_runner_stop_event() - self.status_reporter.report_client_id_status( - self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED, - is_from_model=True, run_id=run_id) - return False - else: - logging.info("Try to use backup package to rollback...") - # Find folder under "~/.fedml/fedml-model-client/fedml/model_packages \ - # /${end_point_id}_${end_point_name}_${model_name}_${model_version}" - backup_folder_full_path = None - models_root_dir = ClientConstants.get_model_package_dir() + logging.info("\n================ Worker Reconcile Operations ======================\n" + f" op: {op}; op num: {op_num}.\n" + "===================================================================\n") + if op == "rollback": # Find the version (notified by master) to rollback + logging.info("Try to use backup package to rollback...") version_diff_dict = self.request_json["replica_version_diff"][str(self.edge_id)] version_rollback_to = None for replica_no, rollback_ops in version_diff_dict.items(): @@ -222,39 +196,38 @@ def run_impl(self, run_extend_queue_list, sender_message_center, return False model_version = version_rollback_to - # Format the version to match the folder name - model_version_formatted = version_rollback_to.replace(" ", "-") - model_version_formatted = model_version_formatted.replace(":", "-") - - last_run_folder_sub_fd = f"{run_id}_{end_point_name}_{model_name}_{model_version_formatted}" - for folder in os.listdir(models_root_dir): - if last_run_folder_sub_fd in folder: - backup_folder_full_path = os.path.join(models_root_dir, folder) - break - if backup_folder_full_path is None: - logging.error(f"No backup folder found for run_id: {self.run_id} edge_id: {self.edge_id} " - f"under {models_root_dir} with sub folder {last_run_folder_sub_fd}, rollback failed.") - return False + # Construct the parent folder name for the package + model_version_formatted = model_version.replace(" ", "-") + model_version_formatted = model_version_formatted.replace(":", "-") + models_root_dir = ClientConstants.get_model_package_dir() + parent_fd = f"{run_id}_{end_point_name}_{model_name}_{model_version_formatted}" - # Inside backup folder, find unzipped package with prefix unzip_fedml_run - unzip_package_path_parent = None - for folder in os.listdir(backup_folder_full_path): - if folder.startswith("unzip_fedml_run"): - unzip_package_path_parent = os.path.join(backup_folder_full_path, folder) - break - - # Inside unzip folder, find the unzipped package, should be the only one - unzip_package_path = None - for folder in os.listdir(unzip_package_path_parent): - if os.path.isdir(os.path.join(unzip_package_path_parent, folder)): - unzip_package_path = os.path.join(unzip_package_path_parent, folder) - break + # Check if the package is already downloaded + unzip_package_path = "" + if os.path.exists(os.path.join(models_root_dir, parent_fd)): + unzip_package_path = self.find_previous_downloaded_pkg(os.path.join(models_root_dir, parent_fd)) + # Download the package if not found + if unzip_package_path == "": + logging.info("Download and unzip model to local...") + unzip_package_path, _, _ = \ + self.update_local_fedml_config(run_id, model_config, model_config_parameters) if unzip_package_path is None: - logging.error(f"No unzipped package found for run_id: {self.run_id} edge_id: {self.edge_id} " - f"under {backup_folder_full_path}, rollback failed.") + logging.info("Failed to update local fedml config.") + self.check_runner_stop_event() + self.status_reporter.report_client_id_status( + self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED, + is_from_model=True, run_id=run_id) return False + if not os.path.exists(unzip_package_path): + logging.info("Failed to unzip file.") + self.check_runner_stop_event() + self.status_reporter.report_client_id_status( + self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED, + is_from_model=True, run_id=run_id) + return False + self.check_runner_stop_event() running_model_name, inference_output_url, inference_model_version, model_metadata, model_config = \ @@ -535,3 +508,20 @@ def build_dynamic_args(self, run_id, run_config, package_conf_object, base_dir): # Override def build_dynamic_constrain_variables(self, run_id, run_config): pass + + @staticmethod + def find_previous_downloaded_pkg(parent_dir) -> str: + unzip_fd = "" + res = "" + + for folder in os.listdir(parent_dir): + if folder.startswith("unzip_fedml_run"): + unzip_fd = os.path.join(parent_dir, folder) + break + + for folder in os.listdir(unzip_fd): + if os.path.isdir(os.path.join(unzip_fd, folder)): + res = os.path.join(unzip_fd, folder) + break + + return res