Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Deploy] Avoid re-download the same model serving package. #2139

Merged
merged 1 commit into from
May 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 53 additions & 63 deletions python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
logging.info(f"[Worker] Received model deployment request from master for endpoint {run_id}.")
self.replica_handler = FedMLDeviceReplicaHandler(self.edge_id, self.request_json)
if self.replica_handler is not None:
logging.info(f"=================Worker replica Handler ======================"
f"Reconcile with num diff {self.replica_handler.replica_num_diff} "
f"and version diff {self.replica_handler.replica_version_diff}."
f"=============================================================")
logging.info("\n================= Worker replica Handler ======================\n"
f"Reconcile with num diff {self.replica_handler.replica_num_diff}\n"
f"and version diff {self.replica_handler.replica_version_diff}\n"
"===============================================================\n")
else:
logging.error(f"[Worker] Replica handler is None.")
return False
Expand Down Expand Up @@ -178,39 +178,13 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
logging.info("[Worker] No need to reconcile.")
return True

logging.info(
f"================Worker Reconcile Operations ======================\n"
f" op: {op}; op num: {op_num}.\n"
f"==================================================================\n")

# If not rollback, download package from MLOps; otherwise, use the backup package
if op != "rollback":
logging.info("Download and unzip model to local...")
unzip_package_path, _, _ = \
self.update_local_fedml_config(run_id, model_config, model_config_parameters)
if unzip_package_path is None:
logging.info("Failed to update local fedml config.")
self.check_runner_stop_event()
self.status_reporter.report_client_id_status(
self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED,
is_from_model=True, run_id=run_id)
return False

if not os.path.exists(unzip_package_path):
logging.info("Failed to unzip file.")
self.check_runner_stop_event()
self.status_reporter.report_client_id_status(
self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED,
is_from_model=True, run_id=run_id)
return False
else:
logging.info("Try to use backup package to rollback...")
# Find folder under "~/.fedml/fedml-model-client/fedml/model_packages \
# /${end_point_id}_${end_point_name}_${model_name}_${model_version}"
backup_folder_full_path = None
models_root_dir = ClientConstants.get_model_package_dir()
logging.info("\n================ Worker Reconcile Operations ======================\n"
f" op: {op}; op num: {op_num}.\n"
"===================================================================\n")

if op == "rollback":
# Find the version (notified by master) to rollback
logging.info("Try to use backup package to rollback...")
version_diff_dict = self.request_json["replica_version_diff"][str(self.edge_id)]
version_rollback_to = None
for replica_no, rollback_ops in version_diff_dict.items():
Expand All @@ -222,39 +196,38 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
return False
model_version = version_rollback_to

# Format the version to match the folder name
model_version_formatted = version_rollback_to.replace(" ", "-")
model_version_formatted = model_version_formatted.replace(":", "-")

last_run_folder_sub_fd = f"{run_id}_{end_point_name}_{model_name}_{model_version_formatted}"
for folder in os.listdir(models_root_dir):
if last_run_folder_sub_fd in folder:
backup_folder_full_path = os.path.join(models_root_dir, folder)
break
if backup_folder_full_path is None:
logging.error(f"No backup folder found for run_id: {self.run_id} edge_id: {self.edge_id} "
f"under {models_root_dir} with sub folder {last_run_folder_sub_fd}, rollback failed.")
return False
# Construct the parent folder name for the package
model_version_formatted = model_version.replace(" ", "-")
model_version_formatted = model_version_formatted.replace(":", "-")
models_root_dir = ClientConstants.get_model_package_dir()
parent_fd = f"{run_id}_{end_point_name}_{model_name}_{model_version_formatted}"

# Inside backup folder, find unzipped package with prefix unzip_fedml_run
unzip_package_path_parent = None
for folder in os.listdir(backup_folder_full_path):
if folder.startswith("unzip_fedml_run"):
unzip_package_path_parent = os.path.join(backup_folder_full_path, folder)
break

# Inside unzip folder, find the unzipped package, should be the only one
unzip_package_path = None
for folder in os.listdir(unzip_package_path_parent):
if os.path.isdir(os.path.join(unzip_package_path_parent, folder)):
unzip_package_path = os.path.join(unzip_package_path_parent, folder)
break
# Check if the package is already downloaded
unzip_package_path = ""
if os.path.exists(os.path.join(models_root_dir, parent_fd)):
unzip_package_path = self.find_previous_downloaded_pkg(os.path.join(models_root_dir, parent_fd))

# Download the package if not found
if unzip_package_path == "":
logging.info("Download and unzip model to local...")
unzip_package_path, _, _ = \
self.update_local_fedml_config(run_id, model_config, model_config_parameters)
if unzip_package_path is None:
logging.error(f"No unzipped package found for run_id: {self.run_id} edge_id: {self.edge_id} "
f"under {backup_folder_full_path}, rollback failed.")
logging.info("Failed to update local fedml config.")
self.check_runner_stop_event()
self.status_reporter.report_client_id_status(
self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED,
is_from_model=True, run_id=run_id)
return False

if not os.path.exists(unzip_package_path):
logging.info("Failed to unzip file.")
self.check_runner_stop_event()
self.status_reporter.report_client_id_status(
self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED,
is_from_model=True, run_id=run_id)
return False

self.check_runner_stop_event()

running_model_name, inference_output_url, inference_model_version, model_metadata, model_config = \
Expand Down Expand Up @@ -535,3 +508,20 @@ def build_dynamic_args(self, run_id, run_config, package_conf_object, base_dir):
# Override
def build_dynamic_constrain_variables(self, run_id, run_config):
pass

@staticmethod
def find_previous_downloaded_pkg(parent_dir) -> str:
unzip_fd = ""
res = ""

for folder in os.listdir(parent_dir):
if folder.startswith("unzip_fedml_run"):
unzip_fd = os.path.join(parent_dir, folder)
break

for folder in os.listdir(unzip_fd):
if os.path.isdir(os.path.join(unzip_fd, folder)):
res = os.path.join(unzip_fd, folder)
break

return res
Loading