diff --git a/labs/kubeflow/image-classify/eks/STEP3_distributed_k8s_training.ipynb b/labs/kubeflow/image-classify/eks/STEP3_distributed_k8s_training.ipynb index 2e510fc..7c4afbd 100644 --- a/labs/kubeflow/image-classify/eks/STEP3_distributed_k8s_training.ipynb +++ b/labs/kubeflow/image-classify/eks/STEP3_distributed_k8s_training.ipynb @@ -67,9 +67,9 @@ "from kubeflow.training import constants\n", "from kubeflow.training.utils import utils\n", "from kubeflow.training import V1ReplicaSpec\n", - "from kubeflow.training import V1PyTorchJob\n", - "from kubeflow.training import V1PyTorchJobSpec\n", - "from kubeflow.training import PyTorchJobClient\n", + "from kubeflow.training import KubeflowOrgV1PyTorchJob\n", + "from kubeflow.training import KubeflowOrgV1PyTorchJobSpec\n", + "from kubeflow.training import TrainingClient\n", "from kubeflow.training import V1RunPolicy\n", "\n", "import kfp\n", @@ -248,11 +248,11 @@ "outputs": [], "source": [ "# Define PyTorchJob custom resource manifest \n", - "pytorchjob = V1PyTorchJob(\n", + "pytorchjob = KubeflowOrgV1PyTorchJob(\n", " api_version=\"kubeflow.org/v1\",\n", " kind=\"PyTorchJob\",\n", " metadata=V1ObjectMeta(name=pytorch_distributed_jobname,namespace=user_namespace),\n", - " spec=V1PyTorchJobSpec(\n", + " spec=KubeflowOrgV1PyTorchJobSpec(\n", " run_policy=V1RunPolicy(clean_pod_policy=\"None\"),\n", " pytorch_replica_specs={\"Master\": master,\n", " \"Worker\": worker}\n", @@ -269,7 +269,7 @@ }, "outputs": [], "source": [ - "pytorchjob_client = PyTorchJobClient()\n", + "pytorchjob_client = TrainingClient()\n", "\n", "try:\n", - " if(pytorchjob_client.get(pytorch_distributed_jobname, namespace=user_namespace)):\n", + " if(pytorchjob_client.get_pytorchjob(pytorch_distributed_jobname, namespace=user_namespace)):\n", @@ -287,7 +287,7 @@ "outputs": [], "source": [ "# Creates and Submits PyTorchJob custom resource file to Kubernetes\n", - "pytorch_job_manifest=pytorchjob_client.create(pytorchjob)" + "pytorch_job_manifest=pytorchjob_client.create_pytorchjob(pytorchjob, namespace=user_namespace)" ] }, { @@ -347,7 +347,7 @@ "source": [ "# Check if the job 
succeeded\n", "\n", - "pytorchjob_client.is_job_succeeded(pytorch_distributed_jobname, user_namespace)" + "pytorchjob_client.is_job_succeeded(pytorch_distributed_jobname, user_namespace, job_kind=constants.PYTORCHJOB_KIND)" ] }, { @@ -395,7 +395,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.13" + "version": "3.10.8" } }, "nbformat": 4,
diff --git a/labs/kubeflow/image-classify/eks/pytorch_dist_utility.py b/labs/kubeflow/image-classify/eks/pytorch_dist_utility.py
new file mode 100644
index 0000000..b13ac32
--- /dev/null
+++ b/labs/kubeflow/image-classify/eks/pytorch_dist_utility.py
@@ -0,0 +1,113 @@
+from kubeflow.training import TrainingClient
+from kubeflow.training.constants import constants
+
+
+def save_master_worker_spec(pytorch_client: TrainingClient, pytorch_jobname: str) -> str:
+    """Save the Master and Worker replica specs of a PyTorchJob as YAML files
+    under ./pipeline_yaml_specifications (created if missing).
+
+    Raises:
+        ValueError: if pytorch_client or pytorch_jobname is missing or empty.
+    """
+    import yaml
+    import os
+
+    # Fail fast instead of only printing a warning and crashing later;
+    # the None check must run before len() to avoid a TypeError.
+    if pytorch_client is None:
+        raise ValueError("Please pass a valid pytorch client")
+    if pytorch_jobname is None or not len(pytorch_jobname):
+        raise ValueError("Please pass a valid job name")
+
+    pytorchjob = pytorch_client.get_pytorchjob(pytorch_jobname)
+
+    master_spec = pytorchjob.spec.pytorch_replica_specs['Master']
+    worker_spec = pytorchjob.spec.pytorch_replica_specs['Worker']
+
+    # exist_ok avoids the check-then-create race of the original code.
+    os.makedirs("pipeline_yaml_specifications", exist_ok=True)
+
+    # NOTE(review): yaml.dump serializes the SDK model objects directly;
+    # confirm downstream pipeline steps expect this representation.
+    with open('pipeline_yaml_specifications/pipeline_master_spec.yml', 'w') as yaml_outfile_file:
+        yaml.dump(master_spec, yaml_outfile_file, default_flow_style=False)
+
+    with open('pipeline_yaml_specifications/pipeline_worker_spec.yml', 'w') as yaml_outfile_file:
+        yaml.dump(worker_spec, yaml_outfile_file, default_flow_style=False)
+
+    return "specs saved in ./pipeline_yaml_specifications folder"
+
+
+def read_logs(pytorch_client: TrainingClient, jobname: str, namespace: str, log_type: str) -> None:
+    """Wait for the PyTorchJob pods to reach Running, print job status and
+    pod names, then fetch logs from master and worker pods.
+
+    log_type: all, worker:all, master:all, worker:0, worker:1
+
+    Raises:
+        ValueError: if any argument is missing/empty or log_type is malformed.
+    """
+    # Validate arguments BEFORE the first client call (the original only
+    # validated after it had already used the client).
+    if pytorch_client is None:
+        raise ValueError("Please pass a valid pytorch client")
+    if jobname is None or not len(jobname):
+        raise ValueError("Please pass a valid job name")
+    if namespace is None or not len(namespace):
+        raise ValueError("Please pass a valid namespace")
+    if (log_type is None or not len(log_type)
+            or (":" not in log_type and "all" not in log_type)):
+        raise ValueError("Please pass a valid log_type name which is not empty and has ':'. e.g all, worker:all, master:all, worker:0, worker:1")
+
+    print("Waiting for Pod condition to be Running")
+
+    pytorch_client.wait_for_job_conditions(
+        jobname,
+        expected_conditions=set(["Running"]),
+        job_kind=constants.PYTORCHJOB_KIND,
+        namespace=namespace
+    )
+
+    print("Master and Worker Pods are Running now")
+
+    print("**** PyTorchJob status **** ")
+    print(pytorch_client.get_job_conditions(jobname, namespace, job_kind=constants.PYTORCHJOB_KIND))
+    print("*************************** \n")
+
+    print("\n**** Pod names of the PyTorchJob **** ")
+    print(pytorch_client.get_job_pod_names(jobname, namespace))
+    print("*************************** \n")
+
+    log_type_list = log_type.split(":")
+
+    if log_type_list[0] in ['worker', 'master']:
+        if log_type_list[1] == 'all':
+            # All replicas of this type; follow the streams.
+            pytorch_client.get_job_logs(
+                jobname,
+                namespace=namespace,
+                replica_type=log_type_list[0],
+                container=constants.PYTORCHJOB_CONTAINER,
+                is_master=False,
+                follow=True
+            )
+        else:
+            # A single replica index, e.g. worker:1.
+            pytorch_client.get_job_logs(
+                jobname,
+                namespace=namespace,
+                replica_type=log_type_list[0],
+                replica_index=log_type_list[1],
+                container=constants.PYTORCHJOB_CONTAINER,
+                is_master=False,
+                follow=False
+            )
+    else:
+        # log_type == "all": logs from all pods of the job.
+        pytorch_client.get_job_logs(
+            jobname,
+            namespace=namespace,
+            container=constants.PYTORCHJOB_CONTAINER,
+            is_master=False,
+            follow=False
+        )